This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 209bac0c5f IGNITE-20727 Pass schema version in each read/write 
ReplicaRequest (#2747)
209bac0c5f is described below

commit 209bac0c5f7111ef2ec1cb8c2a33625c92cddd14
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Wed Oct 25 13:21:18 2023 +0400

    IGNITE-20727 Pass schema version in each read/write ReplicaRequest (#2747)
---
 .../message/SchemaVersionAwareReplicaRequest.java} |  19 +--
 .../sql/engine/exec/UpdatableTableImpl.java        |  19 +--
 .../ItInternalTableReadOnlyOperationsTest.java     |  30 ++--
 .../ignite/distributed/ItTablePersistenceTest.java |   7 +-
 .../ignite/distributed/ReplicaUnavailableTest.java |  58 +++----
 .../ignite/internal/table/ItColocationTest.java    |  18 +-
 .../request/MultipleRowPkReplicaRequest.java       |   4 +-
 .../request/MultipleRowReplicaRequest.java         |  21 +--
 .../request/SingleRowPkReplicaRequest.java         |   4 +-
 .../request/SingleRowReplicaRequest.java           |  10 +-
 .../replication/request/SwapRowReplicaRequest.java |  16 +-
 .../replicator/PartitionReplicaListener.java       |  35 ++--
 .../distributed/storage/InternalTableImpl.java     | 189 +++++++++++++--------
 .../PartitionReplicaListenerIndexLockingTest.java  |   9 +-
 .../replication/PartitionReplicaListenerTest.java  |  64 ++++---
 15 files changed, 285 insertions(+), 218 deletions(-)

diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowPkReplicaRequest.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/SchemaVersionAwareReplicaRequest.java
similarity index 60%
copy from 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowPkReplicaRequest.java
copy to 
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/SchemaVersionAwareReplicaRequest.java
index e60076022d..03a79787a6 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowPkReplicaRequest.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/SchemaVersionAwareReplicaRequest.java
@@ -15,19 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.table.distributed.replication.request;
-
-import java.nio.ByteBuffer;
-import org.apache.ignite.internal.replicator.message.ReplicaRequest;
-import 
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
-import org.apache.ignite.network.annotations.Marshallable;
+package org.apache.ignite.internal.replicator.message;
 
 /**
- * Single row replica request involving a table's Primary Key.
+ * Requests that are aware of the schema version that must be used when 
processing them.
  */
-public interface SingleRowPkReplicaRequest extends ReplicaRequest {
-    ByteBuffer primaryKey();
-
-    @Marshallable
-    RequestType requestType();
+public interface SchemaVersionAwareReplicaRequest extends ReplicaRequest {
+    /**
+     * Schema version in which input values are marshalled and that must be 
used when processing the request.
+     */
+    int schemaVersion();
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
index d1b92dbe4f..9272cc1123 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
@@ -52,7 +52,6 @@ import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.sql.engine.util.TypeUtils;
 import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
 import 
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
-import 
org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteMultiRowReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
 import org.apache.ignite.internal.table.distributed.storage.RowBatch;
@@ -189,7 +188,8 @@ public final class UpdatableTableImpl implements 
UpdatableTable {
             ReplicaRequest request = 
MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
                     .groupId(partGroupId)
                     
.commitPartitionId(serializeTablePartitionId(commitPartitionId))
-                    
.binaryRowMessages(serializeBinaryRows(partToRows.getValue()))
+                    
.schemaVersion(partToRows.getValue().get(0).schemaVersion())
+                    .binaryTuples(binaryRowsToBuffers(partToRows.getValue()))
                     .transactionId(txAttributes.id())
                     .term(nodeWithTerm.term())
                     .requestType(RequestType.RW_UPSERT_ALL)
@@ -203,16 +203,11 @@ public final class UpdatableTableImpl implements 
UpdatableTable {
         return CompletableFuture.allOf(futures);
     }
 
-    private static List<BinaryRowMessage> 
serializeBinaryRows(Collection<BinaryRow> rows) {
-        var result = new ArrayList<BinaryRowMessage>(rows.size());
+    private static List<ByteBuffer> binaryRowsToBuffers(Collection<BinaryRow> 
rows) {
+        var result = new ArrayList<ByteBuffer>(rows.size());
 
         for (BinaryRow row : rows) {
-            BinaryRowMessage message = MESSAGES_FACTORY.binaryRowMessage()
-                    .binaryTuple(row.tupleSlice())
-                    .schemaVersion(row.schemaVersion())
-                    .build();
-
-            result.add(message);
+            result.add(row.tupleSlice());
         }
 
         return result;
@@ -269,7 +264,8 @@ public final class UpdatableTableImpl implements 
UpdatableTable {
             ReadWriteMultiRowReplicaRequest request = 
MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
                     .groupId(partGroupId)
                     
.commitPartitionId(serializeTablePartitionId(commitPartitionId))
-                    
.binaryRowMessages(serializeBinaryRows(rowBatch.requestedRows))
+                    
.schemaVersion(rowBatch.requestedRows.get(0).schemaVersion())
+                    .binaryTuples(binaryRowsToBuffers(rowBatch.requestedRows))
                     .transactionId(txAttributes.id())
                     .term(nodeWithTerm.term())
                     .requestType(RequestType.RW_INSERT_ALL)
@@ -337,6 +333,7 @@ public final class UpdatableTableImpl implements 
UpdatableTable {
             ReplicaRequest request = 
MESSAGES_FACTORY.readWriteMultiRowPkReplicaRequest()
                     .groupId(partGroupId)
                     
.commitPartitionId(serializeTablePartitionId(commitPartitionId))
+                    
.schemaVersion(partToRows.getValue().get(0).schemaVersion())
                     .primaryKeys(serializePrimaryKeys(partToRows.getValue()))
                     .transactionId(txAttributes.id())
                     .term(nodeWithTerm.term())
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
index 0f383329b7..bded985578 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowConverter;
+import org.apache.ignite.internal.schema.BinaryRowEx;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.ColumnsExtractor;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
@@ -103,6 +104,9 @@ public class ItInternalTableReadOnlyOperationsTest extends 
IgniteAbstractTest {
     /** Internal table to test. */
     private InternalTable internalTbl;
 
+    @Mock
+    private BinaryRowEx someRow;
+
     /**
      * Prepare test environment using DummyInternalTableImpl and Mocked 
storage.
      */
@@ -238,19 +242,19 @@ public class ItInternalTableReadOnlyOperationsTest 
extends IgniteAbstractTest {
         when(tx.isReadOnly()).thenReturn(true);
 
         List<Executable> executables = List.of(
-                () -> internalTbl.delete(null, tx).get(),
-                () -> internalTbl.deleteAll(null, tx).get(),
-                () -> internalTbl.deleteExact(null, tx).get(),
-                () -> internalTbl.deleteAllExact(null, tx).get(),
-                () -> internalTbl.getAndDelete(null, tx).get(),
-                () -> internalTbl.getAndReplace(null, tx).get(),
-                () -> internalTbl.getAndUpsert(null, tx).get(),
-                () -> internalTbl.upsert(null, tx).get(),
-                () -> internalTbl.upsertAll(null, tx).get(),
-                () -> internalTbl.insert(null, tx).get(),
-                () -> internalTbl.insertAll(null, tx).get(),
-                () -> internalTbl.replace(null, tx).get(),
-                () -> internalTbl.replace(null, null, tx).get()
+                () -> internalTbl.delete(someRow, tx).get(),
+                () -> internalTbl.deleteAll(List.of(someRow), tx).get(),
+                () -> internalTbl.deleteExact(someRow, tx).get(),
+                () -> internalTbl.deleteAllExact(List.of(someRow), tx).get(),
+                () -> internalTbl.getAndDelete(someRow, tx).get(),
+                () -> internalTbl.getAndReplace(someRow, tx).get(),
+                () -> internalTbl.getAndUpsert(someRow, tx).get(),
+                () -> internalTbl.upsert(someRow, tx).get(),
+                () -> internalTbl.upsertAll(List.of(someRow), tx).get(),
+                () -> internalTbl.insert(someRow, tx).get(),
+                () -> internalTbl.insertAll(List.of(someRow), tx).get(),
+                () -> internalTbl.replace(someRow, tx).get(),
+                () -> internalTbl.replace(someRow, someRow, tx).get()
         );
 
         executables.forEach(executable -> {
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
index 537754ccc6..1297653efc 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
@@ -310,7 +310,12 @@ public class ItTablePersistenceTest extends 
ItAbstractListenerSnapshotTest<Parti
                         .txId(req0.transactionId())
                         .tablePartitionId(tablePartitionId(new 
TablePartitionId(1, 0)))
                         .rowUuid(new RowId(0).uuid())
-                        .rowMessage(req0.binaryRowMessage())
+                        .rowMessage(
+                                msgFactory.binaryRowMessage()
+                                        .schemaVersion(req0.schemaVersion())
+                                        .binaryTuple(req0.binaryTuple())
+                                        .build()
+                        )
                         .safeTimeLong(hybridClock.nowLong())
                         .txCoordinatorId(UUID.randomUUID().toString())
                         .build();
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
index 083b847f2a..0d8af5898e 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
@@ -61,7 +61,6 @@ import org.apache.ignite.internal.schema.row.RowAssembler;
 import org.apache.ignite.internal.table.distributed.TableMessageGroup;
 import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
 import 
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
-import 
org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
@@ -149,14 +148,7 @@ public class ReplicaUnavailableTest extends 
IgniteAbstractTest {
 
         TablePartitionId tablePartitionId = new TablePartitionId(1, 1);
 
-        ReadWriteSingleRowReplicaRequest request = 
tableMessagesFactory.readWriteSingleRowReplicaRequest()
-                .groupId(tablePartitionId)
-                .transactionId(TestTransactionIds.newTransactionId())
-                .commitPartitionId(tablePartitionId())
-                .timestampLong(clock.nowLong())
-                .binaryRowMessage(createKeyValueRow(1L, 1L))
-                .requestType(RequestType.RW_GET)
-                .build();
+        ReadWriteSingleRowReplicaRequest request = 
getRequest(tablePartitionId);
 
         
clusterService.messagingService().addMessageHandler(ReplicaMessageGroup.class,
                 (message, sender, correlationId) -> {
@@ -185,20 +177,27 @@ public class ReplicaUnavailableTest extends 
IgniteAbstractTest {
         assertEquals(5, respFur.get().result());
     }
 
-    @Test
-    public void testStopReplicaException() {
-        ClusterNode clusterNode = 
clusterService.topologyService().localMember();
+    private ReadWriteSingleRowReplicaRequest getRequest(TablePartitionId 
tablePartitionId) {
+        BinaryRow binaryRow = createKeyValueRow(1L, 1L);
 
-        TablePartitionId tablePartitionId = new TablePartitionId(1, 1);
-
-        ReadWriteSingleRowReplicaRequest request = 
tableMessagesFactory.readWriteSingleRowReplicaRequest()
+        return tableMessagesFactory.readWriteSingleRowReplicaRequest()
                 .groupId(tablePartitionId)
                 .transactionId(TestTransactionIds.newTransactionId())
                 .commitPartitionId(tablePartitionId())
                 .timestampLong(clock.nowLong())
-                .binaryRowMessage(createKeyValueRow(1L, 1L))
+                .schemaVersion(binaryRow.schemaVersion())
+                .binaryTuple(binaryRow.tupleSlice())
                 .requestType(RequestType.RW_GET)
                 .build();
+    }
+
+    @Test
+    public void testStopReplicaException() {
+        ClusterNode clusterNode = 
clusterService.topologyService().localMember();
+
+        TablePartitionId tablePartitionId = new TablePartitionId(1, 1);
+
+        ReadWriteSingleRowReplicaRequest request = 
getRequest(tablePartitionId);
 
         
clusterService.messagingService().addMessageHandler(ReplicaMessageGroup.class,
                 (message, sender, correlationId) -> {
@@ -223,14 +222,7 @@ public class ReplicaUnavailableTest extends 
IgniteAbstractTest {
 
         TablePartitionId tablePartitionId = new TablePartitionId(1, 1);
 
-        ReadWriteSingleRowReplicaRequest request = 
tableMessagesFactory.readWriteSingleRowReplicaRequest()
-                .groupId(tablePartitionId)
-                .transactionId(TestTransactionIds.newTransactionId())
-                .commitPartitionId(tablePartitionId())
-                .timestampLong(clock.nowLong())
-                .binaryRowMessage(createKeyValueRow(1L, 1L))
-                .requestType(RequestType.RW_GET)
-                .build();
+        ReadWriteSingleRowReplicaRequest request = 
getRequest(tablePartitionId);
 
         Exception e0 = null;
         Exception e1 = null;
@@ -280,14 +272,7 @@ public class ReplicaUnavailableTest extends 
IgniteAbstractTest {
                 }
         );
 
-        ReadWriteSingleRowReplicaRequest request = 
tableMessagesFactory.readWriteSingleRowReplicaRequest()
-                .groupId(tablePartitionId)
-                .transactionId(TestTransactionIds.newTransactionId())
-                .commitPartitionId(tablePartitionId())
-                .timestampLong(clock.nowLong())
-                .binaryRowMessage(createKeyValueRow(1L, 1L))
-                .requestType(RequestType.RW_GET)
-                .build();
+        ReadWriteSingleRowReplicaRequest request = 
getRequest(tablePartitionId);
 
         Exception e0 = null;
 
@@ -302,18 +287,13 @@ public class ReplicaUnavailableTest extends 
IgniteAbstractTest {
         assertEquals(REPLICA_TIMEOUT_ERR, ((ReplicationTimeoutException) 
unwrapCause(e0)).code());
     }
 
-    private BinaryRowMessage createKeyValueRow(long id, long value) {
+    private BinaryRow createKeyValueRow(long id, long value) {
         RowAssembler rowBuilder = new RowAssembler(SCHEMA);
 
         rowBuilder.appendLong(id);
         rowBuilder.appendLong(value);
 
-        BinaryRow row = rowBuilder.build();
-
-        return tableMessagesFactory.binaryRowMessage()
-                .binaryTuple(row.tupleSlice())
-                .schemaVersion(row.schemaVersion())
-                .build();
+        return rowBuilder.build();
     }
 
     private TablePartitionIdMessage tablePartitionId() {
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index e7afd3ee1d..611d33fe36 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -36,6 +36,7 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.nio.ByteBuffer;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
@@ -61,6 +62,7 @@ import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import 
org.apache.ignite.internal.replicator.message.SchemaVersionAwareReplicaRequest;
 import org.apache.ignite.internal.schema.BinaryRowEx;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.NullBinaryRow;
@@ -204,8 +206,9 @@ public class ItColocationTest extends 
BaseIgniteAbstractTest {
             RaftGroupService r = groupRafts.get(request.groupId());
 
             if (request instanceof ReadWriteMultiRowReplicaRequest) {
-                Map<UUID, BinaryRowMessage> rows = 
((ReadWriteMultiRowReplicaRequest) request).binaryRowMessages()
-                        .stream()
+                ReadWriteMultiRowReplicaRequest multiRowReplicaRequest = 
(ReadWriteMultiRowReplicaRequest) request;
+                Map<UUID, BinaryRowMessage> rows = 
multiRowReplicaRequest.binaryTuples().stream()
+                        .map(tupleBuffer -> binaryRowMessage(tupleBuffer, 
multiRowReplicaRequest))
                         .collect(toMap(row -> 
TestTransactionIds.newTransactionId(), Function.identity()));
 
                 return r.run(MSG_FACTORY.updateAllCommand()
@@ -221,6 +224,8 @@ public class ItColocationTest extends 
BaseIgniteAbstractTest {
             } else {
                 assertThat(request, 
is(instanceOf(ReadWriteSingleRowReplicaRequest.class)));
 
+                ReadWriteSingleRowReplicaRequest singleRowReplicaRequest = 
(ReadWriteSingleRowReplicaRequest) request;
+
                 return r.run(MSG_FACTORY.updateCommand()
                         .tablePartitionId(
                                 MSG_FACTORY.tablePartitionIdMessage()
@@ -229,7 +234,7 @@ public class ItColocationTest extends 
BaseIgniteAbstractTest {
                                         .build()
                         )
                         .rowUuid(UUID.randomUUID())
-                        .rowMessage(((ReadWriteSingleRowReplicaRequest) 
request).binaryRowMessage())
+                        
.rowMessage(binaryRowMessage(singleRowReplicaRequest.binaryTuple(), 
singleRowReplicaRequest))
                         .txId(TestTransactionIds.newTransactionId())
                         .txCoordinatorId(node.id())
                         .build());
@@ -252,6 +257,13 @@ public class ItColocationTest extends 
BaseIgniteAbstractTest {
         );
     }
 
+    private static BinaryRowMessage binaryRowMessage(ByteBuffer tupleBuffer, 
SchemaVersionAwareReplicaRequest request) {
+        return MSG_FACTORY.binaryRowMessage()
+                .schemaVersion(request.schemaVersion())
+                .binaryTuple(tupleBuffer)
+                .build();
+    }
+
     @AfterAll
     static void afterAllTests() throws Exception {
         if (txManager != null) {
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/MultipleRowPkReplicaRequest.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/MultipleRowPkReplicaRequest.java
index be25a12970..8de1971b36 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/MultipleRowPkReplicaRequest.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/MultipleRowPkReplicaRequest.java
@@ -19,14 +19,14 @@ package 
org.apache.ignite.internal.table.distributed.replication.request;
 
 import java.nio.ByteBuffer;
 import java.util.List;
-import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import 
org.apache.ignite.internal.replicator.message.SchemaVersionAwareReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
 import org.apache.ignite.network.annotations.Marshallable;
 
 /**
  * Multiple row replica request involving table's Primary Keys.
  */
-public interface MultipleRowPkReplicaRequest extends ReplicaRequest {
+public interface MultipleRowPkReplicaRequest extends 
SchemaVersionAwareReplicaRequest {
     List<ByteBuffer> primaryKeys();
 
     @Marshallable
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/MultipleRowReplicaRequest.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/MultipleRowReplicaRequest.java
index 43cbaf1188..6872aaac00 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/MultipleRowReplicaRequest.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/MultipleRowReplicaRequest.java
@@ -17,32 +17,33 @@
 
 package org.apache.ignite.internal.table.distributed.replication.request;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import 
org.apache.ignite.internal.replicator.message.SchemaVersionAwareReplicaRequest;
 import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryRowImpl;
 import 
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
 import org.apache.ignite.network.annotations.Marshallable;
 
 /**
  * Multiple row replica request.
  */
-public interface MultipleRowReplicaRequest extends ReplicaRequest {
-    List<BinaryRowMessage> binaryRowMessages();
+public interface MultipleRowReplicaRequest extends 
SchemaVersionAwareReplicaRequest {
+    List<ByteBuffer> binaryTuples();
 
     /**
-     * Deserializes binary row byte buffers into binary rows.
+     * Returns {@link BinaryRow}s contained in the request.
      */
     default List<BinaryRow> binaryRows() {
-        List<BinaryRowMessage> binaryRowMessages = binaryRowMessages();
+        List<ByteBuffer> tuples = binaryTuples();
+        List<BinaryRow> rows = new ArrayList<>(tuples.size());
 
-        var result = new ArrayList<BinaryRow>(binaryRowMessages.size());
-
-        for (BinaryRowMessage message : binaryRowMessages) {
-            result.add(message.asBinaryRow());
+        for (ByteBuffer tuple : tuples) {
+            rows.add(new BinaryRowImpl(schemaVersion(), tuple));
         }
 
-        return result;
+        return rows;
     }
 
     @Marshallable
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowPkReplicaRequest.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowPkReplicaRequest.java
index e60076022d..3cb6e76b4d 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowPkReplicaRequest.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowPkReplicaRequest.java
@@ -18,14 +18,14 @@
 package org.apache.ignite.internal.table.distributed.replication.request;
 
 import java.nio.ByteBuffer;
-import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import 
org.apache.ignite.internal.replicator.message.SchemaVersionAwareReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
 import org.apache.ignite.network.annotations.Marshallable;
 
 /**
  * Single row replica request involving a table's Primary Key.
  */
-public interface SingleRowPkReplicaRequest extends ReplicaRequest {
+public interface SingleRowPkReplicaRequest extends 
SchemaVersionAwareReplicaRequest {
     ByteBuffer primaryKey();
 
     @Marshallable
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowReplicaRequest.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowReplicaRequest.java
index a2f033cf0e..a15fdf38d3 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowReplicaRequest.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowReplicaRequest.java
@@ -17,19 +17,21 @@
 
 package org.apache.ignite.internal.table.distributed.replication.request;
 
-import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import java.nio.ByteBuffer;
+import 
org.apache.ignite.internal.replicator.message.SchemaVersionAwareReplicaRequest;
 import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryRowImpl;
 import 
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
 import org.apache.ignite.network.annotations.Marshallable;
 
 /**
  * Single-row replica request.
  */
-public interface SingleRowReplicaRequest extends ReplicaRequest {
-    BinaryRowMessage binaryRowMessage();
+public interface SingleRowReplicaRequest extends 
SchemaVersionAwareReplicaRequest {
+    ByteBuffer binaryTuple();
 
     default BinaryRow binaryRow() {
-        return binaryRowMessage().asBinaryRow();
+        return new BinaryRowImpl(schemaVersion(), binaryTuple());
     }
 
     @Marshallable
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SwapRowReplicaRequest.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SwapRowReplicaRequest.java
index 1b5a82f6ec..b9b6e1fd58 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SwapRowReplicaRequest.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SwapRowReplicaRequest.java
@@ -17,25 +17,27 @@
 
 package org.apache.ignite.internal.table.distributed.replication.request;
 
-import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import java.nio.ByteBuffer;
+import 
org.apache.ignite.internal.replicator.message.SchemaVersionAwareReplicaRequest;
 import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryRowImpl;
 import 
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
 import org.apache.ignite.network.annotations.Marshallable;
 
 /**
  * Dual row replica request.
  */
-public interface SwapRowReplicaRequest extends ReplicaRequest {
-    BinaryRowMessage binaryRowMessage();
+public interface SwapRowReplicaRequest extends 
SchemaVersionAwareReplicaRequest {
+    ByteBuffer newBinaryTuple();
 
-    default BinaryRow binaryRow() {
-        return binaryRowMessage().asBinaryRow();
+    default BinaryRow newBinaryRow() {
+        return new BinaryRowImpl(schemaVersion(), newBinaryTuple());
     }
 
-    BinaryRowMessage oldBinaryRowMessage();
+    ByteBuffer oldBinaryTuple();
 
     default BinaryRow oldBinaryRow() {
-        return oldBinaryRowMessage().asBinaryRow();
+        return new BinaryRowImpl(schemaVersion(), oldBinaryTuple());
     }
 
     @Marshallable
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index afb264d848..91e86650a2 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -93,6 +93,7 @@ import 
org.apache.ignite.internal.replicator.message.ReadOnlyDirectReplicaReques
 import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import 
org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest;
+import 
org.apache.ignite.internal.replicator.message.SchemaVersionAwareReplicaRequest;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.BinaryTuplePrefix;
@@ -458,6 +459,10 @@ public class PartitionReplicaListener implements 
ReplicaListener {
     }
 
     private CompletableFuture<?> processRequest(ReplicaRequest request, 
@Nullable Boolean isPrimary, String senderId) {
+        if (request instanceof SchemaVersionAwareReplicaRequest) {
+            assert ((SchemaVersionAwareReplicaRequest) 
request).schemaVersion() > 0 : "No schema version passed?";
+        }
+
         if (request instanceof CommittableTxRequest) {
             var req = (CommittableTxRequest) request;
 
@@ -479,7 +484,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @param request Request that's being processed.
      */
     private CompletableFuture<Void> waitForSchemasBeforeReading(ReplicaRequest 
request) {
-        // TODO: IGNITE-20106 - validate that input rows schema version 
matches the tx-bound schema version.
+        // TODO: IGNITE-20715 - validate that input rows schema version 
matches the tx-bound schema version.
 
         HybridTimestamp tsToWaitForSchemas;
 
@@ -2021,10 +2026,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                     Map<UUID, BinaryRowMessage> convertedMap = 
rowsToInsert.entrySet().stream()
                             .collect(toMap(
                                     e -> e.getKey().uuid(),
-                                    e -> MSG_FACTORY.binaryRowMessage()
-                                            
.binaryTuple(e.getValue().tupleSlice())
-                                            
.schemaVersion(e.getValue().schemaVersion())
-                                            .build()
+                                    e -> binaryRowMessage(e.getValue())
                             ));
 
                     return allOf(insertLockFuts)
@@ -2075,14 +2077,13 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 }
 
                 return allOf(rowIdFuts).thenCompose(ignore -> {
-                    List<BinaryRowMessage> searchRowMessages = 
request.binaryRowMessages();
-                    Map<UUID, BinaryRowMessage> rowsToUpdate = 
IgniteUtils.newHashMap(searchRowMessages.size());
+                    Map<UUID, BinaryRowMessage> rowsToUpdate = 
IgniteUtils.newHashMap(searchRows.size());
                     List<RowId> rows = new ArrayList<>();
 
-                    for (int i = 0; i < searchRowMessages.size(); i++) {
+                    for (int i = 0; i < searchRows.size(); i++) {
                         RowId lockedRow = rowIdFuts[i].join().get1();
 
-                        rowsToUpdate.put(lockedRow.uuid(), 
searchRowMessages.get(i));
+                        rowsToUpdate.put(lockedRow.uuid(), 
binaryRowMessage(searchRows.get(i)));
 
                         rows.add(lockedRow);
                     }
@@ -3024,7 +3025,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             ReadWriteSwapRowReplicaRequest request,
             String txCoordinatorId
     ) {
-        BinaryRow newRow = request.binaryRow();
+        BinaryRow newRow = request.newBinaryRow();
         BinaryRow expectedRow = request.oldBinaryRow();
         TablePartitionIdMessage commitPartitionId = 
request.commitPartitionId();
 
@@ -3373,17 +3374,19 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         }
 
         if (row != null) {
-            BinaryRowMessage rowMessage = MSG_FACTORY.binaryRowMessage()
-                    .binaryTuple(row.tupleSlice())
-                    .schemaVersion(row.schemaVersion())
-                    .build();
-
-            bldr.rowMessage(rowMessage);
+            bldr.rowMessage(binaryRowMessage(row));
         }
 
         return bldr.build();
     }
 
+    private static BinaryRowMessage binaryRowMessage(BinaryRow row) {
+        return MSG_FACTORY.binaryRowMessage()
+                .binaryTuple(row.tupleSlice())
+                .schemaVersion(row.schemaVersion())
+                .build();
+    }
+
     private static UpdateAllCommand updateAllCommand(
             Map<UUID, BinaryRowMessage> rowsToUpdate,
             Map<UUID, HybridTimestamp> lastCommitTimes,
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index cea79a364e..ab34b3265a 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -21,6 +21,7 @@ import static 
it.unimi.dsi.fastutil.ints.Int2ObjectMaps.emptyMap;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static 
org.apache.ignite.internal.table.distributed.replicator.action.RequestType.RW_DELETE_ALL;
 import static 
org.apache.ignite.internal.table.distributed.replicator.action.RequestType.RW_GET;
 import static 
org.apache.ignite.internal.table.distributed.replicator.action.RequestType.RW_GET_ALL;
 import static 
org.apache.ignite.internal.table.distributed.storage.RowBatch.allResultFutures;
@@ -42,10 +43,12 @@ import java.util.BitSet;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -87,12 +90,13 @@ import 
org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
 import 
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
-import 
org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
 import 
org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage;
 import 
org.apache.ignite.internal.table.distributed.replication.request.MultipleRowPkReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.MultipleRowReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowPkReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyScanRetrieveBatchReplicaRequest;
+import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteMultiRowPkReplicaRequest;
+import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteMultiRowReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteScanRetrieveBatchReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteScanRetrieveBatchReplicaRequestBuilder;
 import 
org.apache.ignite.internal.table.distributed.replication.request.SingleRowPkReplicaRequest;
@@ -306,7 +310,9 @@ public class InternalTableImpl implements InternalTable {
     private <T> CompletableFuture<T> enlistInTx(
             Collection<BinaryRowEx> keyRows,
             @Nullable InternalTransaction tx,
-            IgnitePentaFunction<Collection<BinaryRow>, InternalTransaction, 
ReplicationGroupId, Long, Boolean, ReplicaRequest> fac,
+            IgnitePentaFunction<
+                    Collection<? extends BinaryRow>, InternalTransaction, 
ReplicationGroupId, Long, Boolean, ReplicaRequest
+            > fac,
             Function<Collection<RowBatch>, CompletableFuture<T>> reducer,
             BiPredicate<T, ReplicaRequest> noOpChecker
     ) {
@@ -727,6 +733,7 @@ public class InternalTableImpl implements InternalTable {
                     (groupId, consistencyToken) -> 
tableMessagesFactory.readOnlyDirectSingleRowReplicaRequest()
                             .groupId(groupId)
                             .enlistmentConsistencyToken(consistencyToken)
+                            .schemaVersion(keyRow.schemaVersion())
                             .primaryKey(keyRow.tupleSlice())
                             .requestType(RequestType.RO_GET)
                             .build()
@@ -743,6 +750,7 @@ public class InternalTableImpl implements InternalTable {
                 tx,
                 (txo, groupId, term) -> 
tableMessagesFactory.readWriteSingleRowPkReplicaRequest()
                         .groupId(groupId)
+                        .schemaVersion(keyRow.schemaVersion())
                         .primaryKey(keyRow.tupleSlice())
                         
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
                         .transactionId(txo.id())
@@ -766,6 +774,7 @@ public class InternalTableImpl implements InternalTable {
 
         return replicaSvc.invoke(recipientNode, 
tableMessagesFactory.readOnlySingleRowPkReplicaRequest()
                 .groupId(partGroupId)
+                .schemaVersion(keyRow.schemaVersion())
                 .primaryKey(keyRow.tupleSlice())
                 .requestType(RequestType.RO_GET)
                 .readTimestampLong(readTimestamp.longValue())
@@ -808,7 +817,8 @@ public class InternalTableImpl implements InternalTable {
                     (groupId, consistencyToken) -> 
tableMessagesFactory.readOnlyDirectMultiRowReplicaRequest()
                             .groupId(groupId)
                             .enlistmentConsistencyToken(consistencyToken)
-                            .primaryKeys(serializePrimaryKeys(keyRows))
+                            
.schemaVersion(keyRows.iterator().next().schemaVersion())
+                            .primaryKeys(serializeBinaryTuples(keyRows))
                             .requestType(RequestType.RO_GET_ALL)
                             .build()
             );
@@ -824,16 +834,9 @@ public class InternalTableImpl implements InternalTable {
         return enlistInTx(
                 keyRows,
                 tx,
-                    (keyRows0, txo, groupId, term, full) -> 
tableMessagesFactory.readWriteMultiRowPkReplicaRequest()
-                .groupId(groupId)
-                .primaryKeys(serializePrimaryKeys(keyRows0))
-                
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
-                .transactionId(txo.id())
-                .term(term)
-                .requestType(RW_GET_ALL)
-                .timestampLong(clock.nowLong())
-                .full(full)
-                .build(),
+                    (keyRows0, txo, groupId, term, full) -> {
+                        return readWriteMultiRowPkReplicaRequest(RW_GET_ALL, 
keyRows0, txo, groupId, term, full);
+                    },
                 InternalTableImpl::collectMultiRowsResponsesWithRestoreOrder,
                 (res, req) -> false
         );
@@ -853,7 +856,8 @@ public class InternalTableImpl implements InternalTable {
 
             ReadOnlyMultiRowPkReplicaRequest request = 
tableMessagesFactory.readOnlyMultiRowPkReplicaRequest()
                     .groupId(partGroupId)
-                    
.primaryKeys(serializePrimaryKeys(partitionRowBatch.getValue().requestedRows))
+                    
.schemaVersion(partitionRowBatch.getValue().requestedRows.get(0).schemaVersion())
+                    
.primaryKeys(serializeBinaryTuples(partitionRowBatch.getValue().requestedRows))
                     .requestType(RequestType.RO_GET_ALL)
                     .readTimestampLong(readTimestamp.longValue())
                     .build();
@@ -864,24 +868,60 @@ public class InternalTableImpl implements InternalTable {
         return 
collectMultiRowsResponsesWithRestoreOrder(rowBatchByPartitionId.values());
     }
 
-    private List<BinaryRowMessage> serializeBinaryRows(Collection<? extends 
BinaryRow> rows) {
-        var result = new ArrayList<BinaryRowMessage>(rows.size());
+    private ReadWriteMultiRowPkReplicaRequest 
readWriteMultiRowPkReplicaRequest(
+            RequestType requestType,
+            Collection<? extends BinaryRow> rows,
+            InternalTransaction tx,
+            ReplicationGroupId groupId,
+            Long term,
+            boolean full
+    ) {
+        assert allSchemaVersionsSame(rows) : "Different schema versions 
encountered: " + uniqueSchemaVersions(rows);
+
+        return tableMessagesFactory.readWriteMultiRowPkReplicaRequest()
+                .groupId(groupId)
+                
.commitPartitionId(serializeTablePartitionId(tx.commitPartition()))
+                .schemaVersion(rows.iterator().next().schemaVersion())
+                .primaryKeys(serializeBinaryTuples(rows))
+                .transactionId(tx.id())
+                .term(term)
+                .requestType(requestType)
+                .timestampLong(clock.nowLong())
+                .full(full)
+                .build();
+    }
+
+    private static boolean allSchemaVersionsSame(Collection<? extends 
BinaryRow> rows) {
+        int schemaVersion = -1;
+        boolean first = true;
 
         for (BinaryRow row : rows) {
-            result.add(serializeBinaryRow(row));
+            if (first) {
+                schemaVersion = row.schemaVersion();
+                first = false;
+
+                continue;
+            }
+
+            if (row.schemaVersion() != schemaVersion) {
+                return false;
+            }
         }
 
-        return result;
+        return true;
     }
 
-    private BinaryRowMessage serializeBinaryRow(BinaryRow row) {
-        return tableMessagesFactory.binaryRowMessage()
-                .binaryTuple(row.tupleSlice())
-                .schemaVersion(row.schemaVersion())
-                .build();
+    private static Set<Integer> uniqueSchemaVersions(Collection<? extends 
BinaryRow> rows) {
+        Set<Integer> set = new HashSet<>();
+
+        for (BinaryRow row : rows) {
+            set.add(row.schemaVersion());
+        }
+
+        return set;
     }
 
-    private static List<ByteBuffer> serializePrimaryKeys(Collection<? extends 
BinaryRow> keys) {
+    private static List<ByteBuffer> serializeBinaryTuples(Collection<? extends 
BinaryRow> keys) {
         var result = new ArrayList<ByteBuffer>(keys.size());
 
         for (BinaryRow row : keys) {
@@ -907,7 +947,8 @@ public class InternalTableImpl implements InternalTable {
                 (txo, groupId, term) -> 
tableMessagesFactory.readWriteSingleRowReplicaRequest()
                         .groupId(groupId)
                         
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
-                        .binaryRowMessage(serializeBinaryRow(row))
+                        .schemaVersion(row.schemaVersion())
+                        .binaryTuple(row.tupleSlice())
                         .transactionId(txo.id())
                         .term(term)
                         .requestType(RequestType.RW_UPSERT)
@@ -957,7 +998,8 @@ public class InternalTableImpl implements InternalTable {
                 (txo, groupId, term) -> 
tableMessagesFactory.readWriteSingleRowReplicaRequest()
                         .groupId(groupId)
                         
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
-                        .binaryRowMessage(serializeBinaryRow(row))
+                        .schemaVersion(row.schemaVersion())
+                        .binaryTuple(row.tupleSlice())
                         .transactionId(txo.id())
                         .term(term)
                         .requestType(RequestType.RW_GET_AND_UPSERT)
@@ -977,7 +1019,8 @@ public class InternalTableImpl implements InternalTable {
                 (txo, groupId, term) -> 
tableMessagesFactory.readWriteSingleRowReplicaRequest()
                         .groupId(groupId)
                         
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
-                        .binaryRowMessage(serializeBinaryRow(row))
+                        .schemaVersion(row.schemaVersion())
+                        .binaryTuple(row.tupleSlice())
                         .transactionId(txo.id())
                         .term(term)
                         .requestType(RequestType.RW_INSERT)
@@ -994,16 +1037,9 @@ public class InternalTableImpl implements InternalTable {
         return enlistInTx(
                 rows,
                 tx,
-                (keyRows0, txo, groupId, term, full) -> 
tableMessagesFactory.readWriteMultiRowReplicaRequest()
-                        .groupId(groupId)
-                        
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
-                        .binaryRowMessages(serializeBinaryRows(keyRows0))
-                        .transactionId(txo.id())
-                        .term(term)
-                        .requestType(RequestType.RW_INSERT_ALL)
-                        .timestampLong(clock.nowLong())
-                        .full(full)
-                        .build(),
+                (keyRows0, txo, groupId, term, full) -> {
+                    return 
readWriteMultiRowReplicaRequest(RequestType.RW_INSERT_ALL, keyRows0, txo, 
groupId, term, full);
+                },
                 
InternalTableImpl::collectRejectedRowsResponsesWithRestoreOrder,
                 (res, req) -> {
                     for (BinaryRow row : res) {
@@ -1018,6 +1054,29 @@ public class InternalTableImpl implements InternalTable {
         );
     }
 
+    private ReadWriteMultiRowReplicaRequest readWriteMultiRowReplicaRequest(
+            RequestType requestType,
+            Collection<? extends BinaryRow> rows,
+            InternalTransaction tx,
+            ReplicationGroupId groupId,
+            Long term,
+            boolean full
+    ) {
+        assert allSchemaVersionsSame(rows) : "Different schema versions 
encountered: " + uniqueSchemaVersions(rows);
+
+        return tableMessagesFactory.readWriteMultiRowReplicaRequest()
+                .groupId(groupId)
+                
.commitPartitionId(serializeTablePartitionId(tx.commitPartition()))
+                .schemaVersion(rows.iterator().next().schemaVersion())
+                .binaryTuples(serializeBinaryTuples(rows))
+                .transactionId(tx.id())
+                .term(term)
+                .requestType(requestType)
+                .timestampLong(clock.nowLong())
+                .full(full)
+                .build();
+    }
+
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<Boolean> replace(BinaryRowEx row, 
InternalTransaction tx) {
@@ -1027,7 +1086,8 @@ public class InternalTableImpl implements InternalTable {
                 (txo, groupId, term) -> 
tableMessagesFactory.readWriteSingleRowReplicaRequest()
                         .groupId(groupId)
                         
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
-                        .binaryRowMessage(serializeBinaryRow(row))
+                        .schemaVersion(row.schemaVersion())
+                        .binaryTuple(row.tupleSlice())
                         .transactionId(txo.id())
                         .term(term)
                         .requestType(RequestType.RW_REPLACE_IF_EXIST)
@@ -1041,14 +1101,18 @@ public class InternalTableImpl implements InternalTable 
{
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<Boolean> replace(BinaryRowEx oldRow, BinaryRowEx 
newRow, InternalTransaction tx) {
+        assert oldRow.schemaVersion() == newRow.schemaVersion()
+                : "Mismatching schema versions: old " + oldRow.schemaVersion() 
+ ", new " + newRow.schemaVersion();
+
         return enlistInTx(
                 newRow,
                 tx,
                 (txo, groupId, term) -> 
tableMessagesFactory.readWriteSwapRowReplicaRequest()
                         .groupId(groupId)
                         
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
-                        .oldBinaryRowMessage(serializeBinaryRow(oldRow))
-                        .binaryRowMessage(serializeBinaryRow(newRow))
+                        .schemaVersion(oldRow.schemaVersion())
+                        .oldBinaryTuple(oldRow.tupleSlice())
+                        .newBinaryTuple(newRow.tupleSlice())
                         .transactionId(txo.id())
                         .term(term)
                         .requestType(RequestType.RW_REPLACE)
@@ -1068,7 +1132,8 @@ public class InternalTableImpl implements InternalTable {
                 (txo, groupId, term) -> 
tableMessagesFactory.readWriteSingleRowReplicaRequest()
                         .groupId(groupId)
                         
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
-                        .binaryRowMessage(serializeBinaryRow(row))
+                        .schemaVersion(row.schemaVersion())
+                        .binaryTuple(row.tupleSlice())
                         .transactionId(txo.id())
                         .term(term)
                         .requestType(RequestType.RW_GET_AND_REPLACE)
@@ -1088,6 +1153,7 @@ public class InternalTableImpl implements InternalTable {
                 (txo, groupId, term) -> 
tableMessagesFactory.readWriteSingleRowPkReplicaRequest()
                         .groupId(groupId)
                         
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
+                        .schemaVersion(keyRow.schemaVersion())
                         .primaryKey(keyRow.tupleSlice())
                         .transactionId(txo.id())
                         .term(term)
@@ -1108,7 +1174,8 @@ public class InternalTableImpl implements InternalTable {
                 (txo, groupId, term) -> 
tableMessagesFactory.readWriteSingleRowReplicaRequest()
                         .groupId(groupId)
                         
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
-                        .binaryRowMessage(serializeBinaryRow(oldRow))
+                        .schemaVersion(oldRow.schemaVersion())
+                        .binaryTuple(oldRow.tupleSlice())
                         .transactionId(txo.id())
                         .term(term)
                         .requestType(RequestType.RW_DELETE_EXACT)
@@ -1128,6 +1195,7 @@ public class InternalTableImpl implements InternalTable {
                 (txo, groupId, term) -> 
tableMessagesFactory.readWriteSingleRowPkReplicaRequest()
                         .groupId(groupId)
                         
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
+                        .schemaVersion(row.schemaVersion())
                         .primaryKey(row.tupleSlice())
                         .transactionId(txo.id())
                         .term(term)
@@ -1145,16 +1213,9 @@ public class InternalTableImpl implements InternalTable {
         return enlistInTx(
                 rows,
                 tx,
-                (keyRows0, txo, groupId, term, full) -> 
tableMessagesFactory.readWriteMultiRowPkReplicaRequest()
-                        .groupId(groupId)
-                        
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
-                        .primaryKeys(serializePrimaryKeys(keyRows0))
-                        .transactionId(txo.id())
-                        .term(term)
-                        .requestType(RequestType.RW_DELETE_ALL)
-                        .timestampLong(clock.nowLong())
-                        .full(full)
-                        .build(),
+                (keyRows0, txo, groupId, term, full) -> {
+                    return readWriteMultiRowPkReplicaRequest(RW_DELETE_ALL, 
keyRows0, txo, groupId, term, full);
+                },
                 
InternalTableImpl::collectRejectedRowsResponsesWithRestoreOrder,
                 (res, req) -> {
                     for (BinaryRow row : res) {
@@ -1178,16 +1239,9 @@ public class InternalTableImpl implements InternalTable {
         return enlistInTx(
                 rows,
                 tx,
-                (keyRows0, txo, groupId, term, full) -> 
tableMessagesFactory.readWriteMultiRowReplicaRequest()
-                        .groupId(groupId)
-                        
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
-                        .binaryRowMessages(serializeBinaryRows(keyRows0))
-                        .transactionId(txo.id())
-                        .term(term)
-                        .requestType(RequestType.RW_DELETE_EXACT_ALL)
-                        .timestampLong(clock.nowLong())
-                        .full(full)
-                        .build(),
+                (keyRows0, txo, groupId, term, full) -> {
+                    return 
readWriteMultiRowReplicaRequest(RequestType.RW_DELETE_EXACT_ALL, keyRows0, txo, 
groupId, term, full);
+                },
                 
InternalTableImpl::collectRejectedRowsResponsesWithRestoreOrder,
                 (res, req) -> {
                     for (BinaryRow row : res) {
@@ -1978,16 +2032,7 @@ public class InternalTableImpl implements InternalTable {
     ) {
         assert serializeTablePartitionId(txo.commitPartition()) != null;
 
-        return tableMessagesFactory.readWriteMultiRowReplicaRequest()
-                .groupId(groupId)
-                
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
-                .binaryRowMessages(serializeBinaryRows(keyRows0))
-                .transactionId(txo.id())
-                .term(term)
-                .requestType(RequestType.RW_UPSERT_ALL)
-                .timestampLong(clock.nowLong())
-                .full(full)
-                .build();
+        return readWriteMultiRowReplicaRequest(RequestType.RW_UPSERT_ALL, 
keyRows0, txo, groupId, term, full);
     }
 
     @Override
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
index bcd5149663..41d46aa1b2 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
@@ -19,6 +19,7 @@ package 
org.apache.ignite.internal.table.distributed.replication;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toList;
+import static 
org.apache.ignite.internal.table.distributed.replication.PartitionReplicaListenerTest.binaryRowsToBuffers;
 import static 
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener.tablePartitionId;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
 import static org.apache.ignite.internal.tx.TxState.checkTransitionCorrectness;
@@ -307,6 +308,7 @@ public class PartitionReplicaListenerIndexLockingTest 
extends IgniteAbstractTest
                         .term(1L)
                         .commitPartitionId(tablePartitionId(PARTITION_ID))
                         .transactionId(TRANSACTION_ID)
+                        .schemaVersion(testPk.schemaVersion())
                         .primaryKey(testPk.tupleSlice())
                         .requestType(arg.type)
                         .build();
@@ -324,7 +326,8 @@ public class PartitionReplicaListenerIndexLockingTest 
extends IgniteAbstractTest
                         .term(1L)
                         .commitPartitionId(tablePartitionId(PARTITION_ID))
                         .transactionId(TRANSACTION_ID)
-                        .binaryRowMessage(binaryRowMessage(testBinaryRow))
+                        .schemaVersion(testBinaryRow.schemaVersion())
+                        .binaryTuple(testBinaryRow.tupleSlice())
                         .requestType(arg.type)
                         .build();
                 break;
@@ -387,6 +390,7 @@ public class PartitionReplicaListenerIndexLockingTest 
extends IgniteAbstractTest
                         .term(1L)
                         .commitPartitionId(tablePartitionId(PARTITION_ID))
                         .transactionId(TRANSACTION_ID)
+                        .schemaVersion(pks.iterator().next().schemaVersion())
                         
.primaryKeys(pks.stream().map(BinaryRow::tupleSlice).collect(toList()))
                         .requestType(arg.type)
                         .build();
@@ -401,7 +405,8 @@ public class PartitionReplicaListenerIndexLockingTest 
extends IgniteAbstractTest
                         .term(1L)
                         .commitPartitionId(tablePartitionId(PARTITION_ID))
                         .transactionId(TRANSACTION_ID)
-                        
.binaryRowMessages(rows.stream().map(PartitionReplicaListenerIndexLockingTest::binaryRowMessage).collect(toList()))
+                        .schemaVersion(rows.iterator().next().schemaVersion())
+                        .binaryTuples(binaryRowsToBuffers(rows))
                         .requestType(arg.type)
                         .build();
 
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 029ae0cc5f..bb1c861bfd 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -587,34 +587,34 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     public void testReadOnlySingleRowReplicaRequestEmptyResult() throws 
Exception {
         BinaryRow testBinaryKey = nextBinaryKey();
 
-        ByteBuffer pk = testBinaryKey.tupleSlice();
-
-        CompletableFuture<ReplicaResult> fut = doReadOnlySingleGet(pk);
+        CompletableFuture<ReplicaResult> fut = 
doReadOnlySingleGet(testBinaryKey);
 
         BinaryRow binaryRow = (BinaryRow) fut.get(1, 
TimeUnit.SECONDS).result();
 
         assertNull(binaryRow);
     }
 
-    private CompletableFuture<ReplicaResult> doReadOnlySingleGet(ByteBuffer 
pk) {
+    private CompletableFuture<ReplicaResult> doReadOnlySingleGet(BinaryRow pk) 
{
         return doReadOnlySingleGet(pk, clock.now());
     }
 
-    private CompletableFuture<ReplicaResult> doReadOnlySingleGet(ByteBuffer 
pk, HybridTimestamp readTimestamp) {
+    private CompletableFuture<ReplicaResult> doReadOnlySingleGet(BinaryRow pk, 
HybridTimestamp readTimestamp) {
         ReadOnlySingleRowPkReplicaRequest request = 
TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest()
                 .groupId(grpId)
                 .readTimestampLong(readTimestamp.longValue())
-                .primaryKey(pk)
+                .schemaVersion(pk.schemaVersion())
+                .primaryKey(pk.tupleSlice())
                 .requestType(RequestType.RO_GET)
                 .build();
 
         return partitionReplicaListener.invoke(request, localNode.id());
     }
 
-    private CompletableFuture<ReplicaResult> 
doReadOnlyDirectSingleGet(ByteBuffer pk) {
+    private CompletableFuture<ReplicaResult> 
doReadOnlyDirectSingleGet(BinaryRow pk) {
         ReadOnlyDirectSingleRowReplicaRequest request = 
TABLE_MESSAGES_FACTORY.readOnlyDirectSingleRowReplicaRequest()
                 .groupId(grpId)
-                .primaryKey(pk)
+                .schemaVersion(pk.schemaVersion())
+                .primaryKey(pk.tupleSlice())
                 .requestType(RequestType.RO_GET)
                 .enlistmentConsistencyToken(1L)
                 .build();
@@ -633,7 +633,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID, 
PART_ID);
         testMvPartitionStorage.commitWrite(rowId, clock.now());
 
-        CompletableFuture<ReplicaResult> fut = 
doReadOnlySingleGet(testBinaryKey.tupleSlice());
+        CompletableFuture<ReplicaResult> fut = 
doReadOnlySingleGet(testBinaryKey);
 
         BinaryRow binaryRow = (BinaryRow) fut.get(1, 
TimeUnit.SECONDS).result();
 
@@ -652,7 +652,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID, 
PART_ID);
         txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.COMMITED, 
localNode.id(), clock.now()));
 
-        CompletableFuture<ReplicaResult> fut = 
doReadOnlySingleGet(testBinaryKey.tupleSlice());
+        CompletableFuture<ReplicaResult> fut = 
doReadOnlySingleGet(testBinaryKey);
 
         BinaryRow binaryRow = (BinaryRow) fut.get(1, 
TimeUnit.SECONDS).result();
 
@@ -670,7 +670,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID, 
PART_ID);
         txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.PENDING, 
localNode.id(), null));
 
-        CompletableFuture<ReplicaResult> fut = doReadOnlySingleGet(testBinaryKey.tupleSlice());
+        CompletableFuture<ReplicaResult> fut = doReadOnlySingleGet(testBinaryKey);
 
         BinaryRow binaryRow = (BinaryRow) fut.get(1, 
TimeUnit.SECONDS).result();
 
@@ -689,7 +689,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID, 
PART_ID);
         txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.ABORTED, 
localNode.id(), null));
 
-        CompletableFuture<ReplicaResult> fut = doReadOnlySingleGet(testBinaryKey.tupleSlice());
+        CompletableFuture<ReplicaResult> fut = doReadOnlySingleGet(testBinaryKey);
 
         BinaryRow binaryRow = (BinaryRow) fut.get(1, 
TimeUnit.SECONDS).result();
 
@@ -1111,7 +1111,8 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                         .groupId(grpId)
                         .transactionId(txId)
                         .requestType(requestType)
-                        .binaryRowMessage(binaryRowMessage(binaryRow))
+                        .schemaVersion(binaryRow.schemaVersion())
+                        .binaryTuple(binaryRow.tupleSlice())
                         .term(1L)
                         .commitPartitionId(commitPartitionId())
                         .full(full)
@@ -1129,6 +1130,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                         .groupId(grpId)
                         .transactionId(txId)
                         .requestType(requestType)
+                        .schemaVersion(binaryRow.schemaVersion())
                         .primaryKey(binaryRow.tupleSlice())
                         .term(1L)
                         .commitPartitionId(commitPartitionId())
@@ -1154,7 +1156,8 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                         .groupId(grpId)
                         .transactionId(txId)
                         .requestType(requestType)
-                        .binaryRowMessages(binaryRows.stream().map(PartitionReplicaListenerTest::binaryRowMessage).collect(toList()))
+                        .schemaVersion(binaryRows.iterator().next().schemaVersion())
+                        .binaryTuples(binaryRowsToBuffers(binaryRows))
                         .term(1L)
                         .commitPartitionId(commitPartitionId())
                         .full(full)
@@ -1163,6 +1166,10 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         );
     }
 
+    static List<ByteBuffer> binaryRowsToBuffers(Collection<BinaryRow> binaryRows) {
+        return binaryRows.stream().map(BinaryRow::tupleSlice).collect(toList());
+    }
+
     private CompletableFuture<?> doMultiRowPkRequest(UUID txId, Collection<BinaryRow> binaryRows, RequestType requestType) {
         return doMultiRowPkRequest(txId, binaryRows, requestType, false);
     }
@@ -1172,7 +1179,8 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                         .groupId(grpId)
                         .transactionId(txId)
                         .requestType(requestType)
-                        .primaryKeys(binaryRows.stream().map(BinaryRow::tupleSlice).collect(toList()))
+                        .schemaVersion(binaryRows.iterator().next().schemaVersion())
+                        .primaryKeys(binaryRowsToBuffers(binaryRows))
                         .term(1L)
                         .commitPartitionId(commitPartitionId())
                         .full(full)
@@ -1195,7 +1203,8 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                             .groupId(grpId)
                             .transactionId(txId)
                             .requestType(RequestType.RW_INSERT)
-                            .binaryRowMessage(binaryRowMessage(binaryRow))
+                            .schemaVersion(binaryRow.schemaVersion())
+                            .binaryTuple(binaryRow.tupleSlice())
                             .term(1L)
                             .commitPartitionId(commitPartitionId())
                             .build();
@@ -1222,7 +1231,8 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                             .groupId(grpId)
                             .transactionId(txId)
                             .requestType(RequestType.RW_UPSERT_ALL)
-                            .binaryRowMessages(asList(binaryRowMessage(binaryRow0), binaryRowMessage(binaryRow1)))
+                            .schemaVersion(binaryRow0.schemaVersion())
+                            .binaryTuples(asList(binaryRow0.tupleSlice(), binaryRow1.tupleSlice()))
                             .term(1L)
                             .commitPartitionId(commitPartitionId())
                             .build();
@@ -1681,8 +1691,9 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                         .groupId(grpId)
                         .transactionId(targetTxId)
                         .requestType(RequestType.RW_REPLACE)
-                        .oldBinaryRowMessage(binaryRowMessage(oldRow))
-                        .binaryRowMessage(binaryRowMessage(newRow))
+                        .schemaVersion(oldRow.schemaVersion())
+                        .oldBinaryTuple(oldRow.tupleSlice())
+                        .newBinaryTuple(newRow.tupleSlice())
                         .term(1L)
                         .commitPartitionId(commitPartitionId())
                         .full(full)
@@ -2149,9 +2160,9 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     ) {
         testRoOperationFailsIfTableWasDropped(onExistingRow, (targetTxId, readTimestamp, key) -> {
             if (direct) {
-                return doReadOnlyDirectSingleGet(marshalQuietly(key, kvMarshaller).tupleSlice());
+                return doReadOnlyDirectSingleGet(marshalQuietly(key, kvMarshaller));
             } else {
-                return doReadOnlySingleGet(marshalQuietly(key, kvMarshaller).tupleSlice(), readTimestamp);
+                return doReadOnlySingleGet(marshalQuietly(key, kvMarshaller), readTimestamp);
             }
         });
     }
@@ -2254,7 +2265,8 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                         .groupId(grpId)
                         .requestType(RequestType.RW_UPSERT)
                         .transactionId(txId)
-                        .binaryRowMessage(binaryRowMessage(row))
+                        .schemaVersion(row.schemaVersion())
+                        .binaryTuple(row.tupleSlice())
                         .term(1L)
                         .commitPartitionId(commitPartitionId())
                         .build(),
@@ -2267,6 +2279,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                         .groupId(grpId)
                         .requestType(RequestType.RW_DELETE)
                         .transactionId(txId)
+                        .schemaVersion(row.schemaVersion())
                         .primaryKey(row.tupleSlice())
                         .term(1L)
                         .commitPartitionId(commitPartitionId())
@@ -2280,6 +2293,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                         .groupId(grpId)
                         .requestType(RequestType.RO_GET)
                         .readTimestampLong(readTimestamp)
+                        .schemaVersion(row.schemaVersion())
                         .primaryKey(row.tupleSlice())
                         .build(),
                 localNode.id()
@@ -2301,7 +2315,8 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 .groupId(grpId)
                 .requestType(RequestType.RO_GET_ALL)
                 .readTimestampLong(readTimestamp.longValue())
-                .primaryKeys(rows.stream().map(BinaryRow::tupleSlice).collect(toList()))
+                .schemaVersion(rows.iterator().next().schemaVersion())
+                .primaryKeys(binaryRowsToBuffers(rows))
                 .build();
 
         return partitionReplicaListener.invoke(request, localNode.id());
@@ -2311,7 +2326,8 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         ReadOnlyDirectMultiRowReplicaRequest request = TABLE_MESSAGES_FACTORY.readOnlyDirectMultiRowReplicaRequest()
                 .groupId(grpId)
                 .requestType(RequestType.RO_GET_ALL)
-                .primaryKeys(rows.stream().map(BinaryRow::tupleSlice).collect(toList()))
+                .schemaVersion(rows.iterator().next().schemaVersion())
+                .primaryKeys(binaryRowsToBuffers(rows))
                 .enlistmentConsistencyToken(1L)
                 .build();
 

Reply via email to