This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new ebb092f0ce IGNITE-20609 Move last commit timestamp into 
BinaryRowMessage (#2749)
ebb092f0ce is described below

commit ebb092f0ced589657411d23277606250ef467c6e
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Tue Oct 31 16:52:45 2023 +0300

    IGNITE-20609 Move last commit timestamp into BinaryRowMessage (#2749)
---
 .../ignite/distributed/ItTablePersistenceTest.java |  8 +-
 .../ignite/internal/table/ItColocationTest.java    | 21 +++--
 .../table/distributed/StorageUpdateHandler.java    | 16 ++--
 .../table/distributed/TableMessageGroup.java       |  8 ++
 ...dateCommand.java => TimedBinaryRowMessage.java} | 47 +++++------
 .../distributed/command/UpdateAllCommand.java      | 25 +++---
 .../table/distributed/command/UpdateCommand.java   | 20 ++---
 .../table/distributed/raft/PartitionListener.java  | 14 +++-
 .../replicator/PartitionReplicaListener.java       | 92 +++++++++++-----------
 .../distributed/replicator/TimedBinaryRow.java     |  4 +-
 .../internal/table/distributed/IndexBaseTest.java  |  7 +-
 .../distributed/StorageUpdateHandlerTest.java      |  4 +-
 .../PartitionRaftCommandsSerializationTest.java    | 49 ++++++++----
 .../raft/PartitionCommandListenerTest.java         | 37 ++++++---
 14 files changed, 193 insertions(+), 159 deletions(-)

diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
index 1297653efc..fddf2b1e69 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
@@ -310,12 +310,12 @@ public class ItTablePersistenceTest extends 
ItAbstractListenerSnapshotTest<Parti
                         .txId(req0.transactionId())
                         .tablePartitionId(tablePartitionId(new 
TablePartitionId(1, 0)))
                         .rowUuid(new RowId(0).uuid())
-                        .rowMessage(
-                                msgFactory.binaryRowMessage()
+                        .messageRowToUpdate(msgFactory.timedBinaryRowMessage()
+                                .binaryRowMessage(msgFactory.binaryRowMessage()
                                         .schemaVersion(req0.schemaVersion())
                                         .binaryTuple(req0.binaryTuple())
-                                        .build()
-                        )
+                                        .build())
+                                .build())
                         .safeTimeLong(hybridClock.nowLong())
                         .txCoordinatorId(UUID.randomUUID().toString())
                         .build();
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index 611d33fe36..2b0fe309f9 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -50,7 +50,6 @@ import java.util.Set;
 import java.util.TimeZone;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
@@ -73,6 +72,7 @@ import 
org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
 import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import 
org.apache.ignite.internal.table.distributed.command.TimedBinaryRowMessage;
 import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
 import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
 import 
org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
@@ -207,9 +207,14 @@ public class ItColocationTest extends 
BaseIgniteAbstractTest {
 
             if (request instanceof ReadWriteMultiRowReplicaRequest) {
                 ReadWriteMultiRowReplicaRequest multiRowReplicaRequest = 
(ReadWriteMultiRowReplicaRequest) request;
-                Map<UUID, BinaryRowMessage> rows = 
multiRowReplicaRequest.binaryTuples().stream()
-                        .map(tupleBuffer -> binaryRowMessage(tupleBuffer, 
multiRowReplicaRequest))
-                        .collect(toMap(row -> 
TestTransactionIds.newTransactionId(), Function.identity()));
+
+                Map<UUID, TimedBinaryRowMessage> rows = 
multiRowReplicaRequest.binaryTuples().stream()
+                        .collect(
+                                toMap(tupleBuffer -> 
TestTransactionIds.newTransactionId(),
+                                        tupleBuffer -> 
MSG_FACTORY.timedBinaryRowMessage()
+                                                
.binaryRowMessage(binaryRowMessage(tupleBuffer, multiRowReplicaRequest))
+                                                .build())
+                        );
 
                 return r.run(MSG_FACTORY.updateAllCommand()
                                 
.tablePartitionId(MSG_FACTORY.tablePartitionIdMessage()
@@ -217,7 +222,7 @@ public class ItColocationTest extends 
BaseIgniteAbstractTest {
                                         
.partitionId(commitPartId.partitionId())
                                         .build()
                                 )
-                            .rowsToUpdate(rows)
+                            .messageRowsToUpdate(rows)
                             .txId(UUID.randomUUID())
                             .txCoordinatorId(node.id())
                             .build());
@@ -234,7 +239,9 @@ public class ItColocationTest extends 
BaseIgniteAbstractTest {
                                         .build()
                         )
                         .rowUuid(UUID.randomUUID())
-                        
.rowMessage(binaryRowMessage(singleRowReplicaRequest.binaryTuple(), 
singleRowReplicaRequest))
+                        .messageRowToUpdate(MSG_FACTORY.timedBinaryRowMessage()
+                                
.binaryRowMessage(binaryRowMessage(singleRowReplicaRequest.binaryTuple(), 
singleRowReplicaRequest))
+                                .build())
                         .txId(TestTransactionIds.newTransactionId())
                         .txCoordinatorId(node.id())
                         .build());
@@ -390,7 +397,7 @@ public class ItColocationTest extends 
BaseIgniteAbstractTest {
             assertEquals(partsMap.get(p), cmd.rowsToUpdate().size(), () -> 
"part=" + p + ", set=" + set);
 
             cmd.rowsToUpdate().values().forEach(rowMessage -> {
-                Row r = Row.wrapBinaryRow(schema, rowMessage.asBinaryRow());
+                Row r = Row.wrapBinaryRow(schema, rowMessage.binaryRow());
 
                 assertEquals(intTable.partition(r), p);
             });
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
index 155d8e792d..b7c4f9f367 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
@@ -35,8 +35,8 @@ import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler;
 import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
-import 
org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
 import org.apache.ignite.internal.table.distributed.replicator.PendingRows;
+import org.apache.ignite.internal.table.distributed.replicator.TimedBinaryRow;
 import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.Nullable;
 
@@ -167,16 +167,14 @@ public class StorageUpdateHandler {
      * @param trackWriteIntent If {@code true} then write intent should be 
tracked.
      * @param onApplication Callback on application.
      * @param commitTs Commit timestamp to use on autocommit.
-     * @param lastCommitTsMap A map(Row Id -> timestamp) of timestamps of the 
most recent commits to the affected rows.
      */
     public void handleUpdateAll(
             UUID txId,
-            Map<UUID, BinaryRowMessage> rowsToUpdate,
+            Map<UUID, TimedBinaryRow> rowsToUpdate,
             TablePartitionId commitPartitionId,
             boolean trackWriteIntent,
             @Nullable Runnable onApplication,
-            @Nullable HybridTimestamp commitTs,
-            Map<UUID, HybridTimestamp> lastCommitTsMap
+            @Nullable HybridTimestamp commitTs
     ) {
         indexUpdateHandler.waitIndexes();
 
@@ -188,15 +186,15 @@ public class StorageUpdateHandler {
                 List<RowId> rowIds = new ArrayList<>();
 
                 // Sort IDs to prevent deadlock. Natural UUID order matches 
RowId order within the same partition.
-                SortedMap<UUID, BinaryRowMessage> sortedRowsToUpdateMap = new 
TreeMap<>(rowsToUpdate);
+                SortedMap<UUID, TimedBinaryRow> sortedRowsToUpdateMap = new 
TreeMap<>(rowsToUpdate);
 
-                for (Map.Entry<UUID, BinaryRowMessage> entry : 
sortedRowsToUpdateMap.entrySet()) {
+                for (Map.Entry<UUID, TimedBinaryRow> entry : 
sortedRowsToUpdateMap.entrySet()) {
                     RowId rowId = new RowId(partitionId, entry.getKey());
-                    BinaryRow row = entry.getValue() == null ? null : 
entry.getValue().asBinaryRow();
+                    BinaryRow row = entry.getValue() == null ? null : 
entry.getValue().binaryRow();
 
                     locker.lock(rowId);
 
-                    performStorageCleanupIfNeeded(txId, rowId, 
lastCommitTsMap.get(entry.getKey()));
+                    performStorageCleanupIfNeeded(txId, rowId, 
entry.getValue() == null ? null : entry.getValue().commitTimestamp());
 
                     if (commitTs != null) {
                         storage.addWriteCommitted(rowId, row, commitTs);
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
index 38ddd35f0c..9e697fa2d5 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
@@ -22,6 +22,7 @@ import static 
org.apache.ignite.internal.table.distributed.TableMessageGroup.GRO
 import org.apache.ignite.internal.table.distributed.command.BuildIndexCommand;
 import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
 import 
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
+import 
org.apache.ignite.internal.table.distributed.command.TimedBinaryRowMessage;
 import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
 import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
 import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
@@ -37,6 +38,8 @@ import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.Snapsh
 import 
org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
 import 
org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage;
 import 
org.apache.ignite.internal.table.distributed.replication.request.BuildIndexReplicaRequest;
+import 
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyDirectMultiRowReplicaRequest;
+import 
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyDirectSingleRowReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowPkReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyScanRetrieveBatchReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlySingleRowPkReplicaRequest;
@@ -175,6 +178,11 @@ public interface TableMessageGroup {
      */
     short RO_DIRECT_MULTI_ROW_REPLICA_REQUEST = 23;
 
+    /**
+     * Message type for {@link TimedBinaryRowMessage}.
+     */
+    short TIMED_BINARY_ROW_MESSAGE = 24;
+
     /**
      * Message types for Table module RAFT commands.
      *
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/TimedBinaryRowMessage.java
similarity index 54%
copy from 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java
copy to 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/TimedBinaryRowMessage.java
index 3e47dfd574..602650c0b1 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/TimedBinaryRowMessage.java
@@ -17,47 +17,38 @@
 
 package org.apache.ignite.internal.table.distributed.command;
 
-import static 
org.apache.ignite.internal.hlc.HybridTimestamp.nullableHybridTimestamp;
-
-import java.util.UUID;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.table.distributed.TableMessageGroup;
 import 
org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
+import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.annotations.Transferable;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * State machine command to update a row specified by a row id.
+ * The message type represents a binary row and a timestamp.
  */
-@Transferable(TableMessageGroup.Commands.UPDATE)
-public interface UpdateCommand extends PartitionCommand {
-    TablePartitionIdMessage tablePartitionId();
-
-    UUID rowUuid();
-
-    @Nullable
-    BinaryRowMessage rowMessage();
-
-    String txCoordinatorId();
-
-    /** Returns the row to update or {@code null} if the row should be 
removed. */
-    @Nullable
-    default BinaryRow row() {
-        BinaryRowMessage message = rowMessage();
-
-        return message == null ? null : message.asBinaryRow();
-    }
+@Transferable(TableMessageGroup.TIMED_BINARY_ROW_MESSAGE)
+public interface TimedBinaryRowMessage extends NetworkMessage {
+    /**
+     * Gets a binary row message.
+     *
+     * @return binary row message.
+     */
+    @Nullable BinaryRowMessage binaryRowMessage();
 
     /**
-     * Returns the timestamp of the last committed entry.
+     * Gets a timestamp.
+     *
+     * @return Timestamp as long.
      */
-    long lastCommitTimestampLong();
+    long timestamp();
 
     /**
-     * Returns the timestamp of the last committed entry.
+     * Gets a binary row from this message or {@code null} if the binary row 
message is {@code null}.
+     *
+     * @return Binary row or {@code null}.
      */
-    default @Nullable HybridTimestamp lastCommitTimestamp() {
-        return nullableHybridTimestamp(lastCommitTimestampLong());
+    default @Nullable BinaryRow binaryRow() {
+        return binaryRowMessage() == null ? null : 
binaryRowMessage().asBinaryRow();
     }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java
index 4d75fd22be..e230361c4f 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java
@@ -17,16 +17,15 @@
 
 package org.apache.ignite.internal.table.distributed.command;
 
-import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
+import static 
org.apache.ignite.internal.hlc.HybridTimestamp.nullableHybridTimestamp;
 
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.table.distributed.TableMessageGroup;
-import 
org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
+import org.apache.ignite.internal.table.distributed.replicator.TimedBinaryRow;
+import org.apache.ignite.internal.util.CollectionUtils;
 import org.apache.ignite.network.annotations.Transferable;
-import org.jetbrains.annotations.Nullable;
 
 /**
  * State machine command for updating a batch of entries.
@@ -35,23 +34,23 @@ import org.jetbrains.annotations.Nullable;
 public interface UpdateAllCommand extends PartitionCommand {
     TablePartitionIdMessage tablePartitionId();
 
-    Map<UUID, BinaryRowMessage> rowsToUpdate();
+    Map<UUID, TimedBinaryRowMessage> messageRowsToUpdate();
 
     String txCoordinatorId();
 
-    // TODO: IGNITE-20609 row id in this map duplicates row id in rowsToUpdate.
-    @Nullable Map<UUID, Long> lastCommitTimestampsLong();
-
     /**
      * Returns the timestamps of the last committed entries for each row.
      */
-    default Map<UUID, HybridTimestamp> lastCommitTimestamps() {
-        Map<UUID, HybridTimestamp> map = new HashMap<>();
+    default Map<UUID, TimedBinaryRow> rowsToUpdate() {
+        Map<UUID, TimedBinaryRow> map = new HashMap<>();
+
+        Map<UUID, TimedBinaryRowMessage> timedRowMap = messageRowsToUpdate();
 
-        Map<UUID, Long> uuidLongMap = lastCommitTimestampsLong();
-        if (uuidLongMap != null) {
-            uuidLongMap.forEach((uuid, ts) -> map.put(uuid, 
hybridTimestamp(ts)));
+        if (!CollectionUtils.nullOrEmpty(timedRowMap)) {
+            timedRowMap.forEach(
+                    (uuid, trMsg) -> map.put(uuid, new 
TimedBinaryRow(trMsg.binaryRow(), nullableHybridTimestamp(trMsg.timestamp()))));
         }
+
         return map;
     }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java
index 3e47dfd574..f283fa6ceb 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java
@@ -23,7 +23,6 @@ import java.util.UUID;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.table.distributed.TableMessageGroup;
-import 
org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
 import org.apache.ignite.network.annotations.Transferable;
 import org.jetbrains.annotations.Nullable;
 
@@ -36,28 +35,23 @@ public interface UpdateCommand extends PartitionCommand {
 
     UUID rowUuid();
 
-    @Nullable
-    BinaryRowMessage rowMessage();
+    @Nullable TimedBinaryRowMessage messageRowToUpdate();
 
     String txCoordinatorId();
 
     /** Returns the row to update or {@code null} if the row should be 
removed. */
-    @Nullable
-    default BinaryRow row() {
-        BinaryRowMessage message = rowMessage();
+    default @Nullable BinaryRow rowToUpdate() {
+        TimedBinaryRowMessage tsRoMsg = messageRowToUpdate();
 
-        return message == null ? null : message.asBinaryRow();
+        return tsRoMsg == null ? null : tsRoMsg.binaryRow();
     }
 
-    /**
-     * Returns the timestamp of the last committed entry.
-     */
-    long lastCommitTimestampLong();
-
     /**
      * Returns the timestamp of the last committed entry.
      */
     default @Nullable HybridTimestamp lastCommitTimestamp() {
-        return nullableHybridTimestamp(lastCommitTimestampLong());
+        TimedBinaryRowMessage tsRoMsg = messageRowToUpdate();
+
+        return tsRoMsg == null ? null : 
nullableHybridTimestamp(tsRoMsg.timestamp());
     }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 9456db2c27..3713402199 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -235,7 +235,11 @@ public class PartitionListener implements 
RaftGroupListener {
         // TODO: https://issues.apache.org/jira/browse/IGNITE-20124 Proper 
storage/raft index handling is required.
         synchronized (safeTime) {
             if (cmd.safeTime().compareTo(safeTime.current()) > 0) {
-                storageUpdateHandler.handleUpdate(cmd.txId(), cmd.rowUuid(), 
cmd.tablePartitionId().asTablePartitionId(), cmd.row(),
+                storageUpdateHandler.handleUpdate(
+                        cmd.txId(),
+                        cmd.rowUuid(),
+                        cmd.tablePartitionId().asTablePartitionId(),
+                        cmd.rowToUpdate(),
                         !cmd.full(),
                         () -> storage.lastApplied(commandIndex, commandTerm),
                         cmd.full() ? cmd.safeTime() : null,
@@ -264,11 +268,13 @@ public class PartitionListener implements 
RaftGroupListener {
         // TODO: https://issues.apache.org/jira/browse/IGNITE-20124 Proper 
storage/raft index handling is required.
         synchronized (safeTime) {
             if (cmd.safeTime().compareTo(safeTime.current()) > 0) {
-                storageUpdateHandler.handleUpdateAll(cmd.txId(), 
cmd.rowsToUpdate(), cmd.tablePartitionId().asTablePartitionId(),
+                storageUpdateHandler.handleUpdateAll(
+                        cmd.txId(),
+                        cmd.rowsToUpdate(),
+                        cmd.tablePartitionId().asTablePartitionId(),
                         !cmd.full(),
                         () -> storage.lastApplied(commandIndex, commandTerm),
-                        cmd.full() ? cmd.safeTime() : null,
-                        cmd.lastCommitTimestamps()
+                        cmd.full() ? cmd.safeTime() : null
                 );
 
                 updateTrackerIgnoringTrackerClosedException(safeTime, 
cmd.safeTime());
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 94a16bd373..1ca4fd8b1e 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.table.distributed.replicator;
 
 import static java.util.Collections.emptyList;
-import static java.util.Collections.emptyMap;
 import static java.util.Objects.requireNonNull;
 import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
@@ -26,6 +25,7 @@ import static 
java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toMap;
+import static 
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static org.apache.ignite.internal.tx.TxState.ABANDONED;
 import static org.apache.ignite.internal.tx.TxState.ABORTED;
@@ -50,7 +50,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
@@ -115,6 +114,8 @@ import 
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage
 import org.apache.ignite.internal.table.distributed.command.BuildIndexCommand;
 import 
org.apache.ignite.internal.table.distributed.command.FinishTxCommandBuilder;
 import 
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
+import 
org.apache.ignite.internal.table.distributed.command.TimedBinaryRowMessage;
+import 
org.apache.ignite.internal.table.distributed.command.TimedBinaryRowMessageBuilder;
 import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
 import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
 import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
@@ -2006,7 +2007,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 }
 
                 return allOf(deleteExactLockFuts).thenCompose(ignore -> {
-                    Map<UUID, BinaryRowMessage> rowIdsToDelete = new 
HashMap<>();
+                    Map<UUID, TimedBinaryRowMessage> rowIdsToDelete = new 
HashMap<>();
                     // TODO:IGNITE-20669 Replace the result to BitSet.
                     Collection<BinaryRow> result = new ArrayList<>();
                     List<RowId> rows = new ArrayList<>();
@@ -2015,7 +2016,9 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                         RowId lockedRowId = deleteExactLockFuts[i].join();
 
                         if (lockedRowId != null) {
-                            rowIdsToDelete.put(lockedRowId.uuid(), null);
+                            rowIdsToDelete.put(lockedRowId.uuid(), 
MSG_FACTORY.timedBinaryRowMessage()
+                                    
.timestamp(hybridTimestampToLong(lastCommitTimes.get(lockedRowId.uuid())))
+                                    .build());
 
                             result.add(new NullBinaryRow());
 
@@ -2035,7 +2038,6 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                                     catalogVersion -> applyUpdateAllCommand(
                                             request,
                                             rowIdsToDelete,
-                                            lastCommitTimes,
                                             txCoordinatorId,
                                             catalogVersion
                                     )
@@ -2087,10 +2089,12 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                         insertLockFuts[idx++] = 
takeLocksForInsert(entry.getValue(), entry.getKey(), txId);
                     }
 
-                    Map<UUID, BinaryRowMessage> convertedMap = 
rowsToInsert.entrySet().stream()
+                    Map<UUID, TimedBinaryRowMessage> convertedMap = 
rowsToInsert.entrySet().stream()
                             .collect(toMap(
                                     e -> e.getKey().uuid(),
-                                    e -> binaryRowMessage(e.getValue())
+                                    e -> MSG_FACTORY.timedBinaryRowMessage()
+                                            
.binaryRowMessage(binaryRowMessage(e.getValue()))
+                                            .build()
                             ));
 
                     return allOf(insertLockFuts)
@@ -2101,7 +2105,6 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                             .thenCompose(catalogVersion -> 
applyUpdateAllCommand(
                                             request,
                                             convertedMap,
-                                            emptyMap(),
                                             txCoordinatorId,
                                             catalogVersion
                                     )
@@ -2141,13 +2144,17 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 }
 
                 return allOf(rowIdFuts).thenCompose(ignore -> {
-                    Map<UUID, BinaryRowMessage> rowsToUpdate = 
IgniteUtils.newHashMap(searchRows.size());
+                    Map<UUID, TimedBinaryRowMessage> rowsToUpdate = 
IgniteUtils.newHashMap(searchRows.size());
                     List<RowId> rows = new ArrayList<>();
 
                     for (int i = 0; i < searchRows.size(); i++) {
                         RowId lockedRow = rowIdFuts[i].join().get1();
 
-                        rowsToUpdate.put(lockedRow.uuid(), 
binaryRowMessage(searchRows.get(i)));
+                        rowsToUpdate.put(lockedRow.uuid(),
+                                MSG_FACTORY.timedBinaryRowMessage()
+                                        
.binaryRowMessage(binaryRowMessage(searchRows.get(i)))
+                                        
.timestamp(hybridTimestampToLong(lastCommitTimes.get(lockedRow.uuid())))
+                                        .build());
 
                         rows.add(lockedRow);
                     }
@@ -2162,7 +2169,6 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                                     catalogVersion -> applyUpdateAllCommand(
                                             request,
                                             rowsToUpdate,
-                                            lastCommitTimes,
                                             txCoordinatorId,
                                             catalogVersion
                                     )
@@ -2251,7 +2257,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 }
 
                 return allOf(rowIdLockFuts).thenCompose(ignore -> {
-                    Map<UUID, BinaryRowMessage> rowIdsToDelete = new 
HashMap<>();
+                    Map<UUID, TimedBinaryRowMessage> rowIdsToDelete = new 
HashMap<>();
                     // TODO:IGNITE-20669 Replace the result to BitSet.
                     Collection<BinaryRow> result = new ArrayList<>();
                     List<RowId> rows = new ArrayList<>();
@@ -2260,7 +2266,9 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                         RowId lockedRowId = lockFut.join();
 
                         if (lockedRowId != null) {
-                            rowIdsToDelete.put(lockedRowId.uuid(), null);
+                            rowIdsToDelete.put(lockedRowId.uuid(), 
MSG_FACTORY.timedBinaryRowMessage()
+                                    
.timestamp(hybridTimestampToLong(lastCommitTimes.get(lockedRowId.uuid())))
+                                    .build());
 
                             rows.add(lockedRowId);
 
@@ -2279,7 +2287,6 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                             .thenCompose(
                                     catalogVersion -> applyUpdateAllCommand(
                                             rowIdsToDelete,
-                                            lastCommitTimes,
                                             request.commitPartitionId(),
                                             request.transactionId(),
                                             request.full(),
@@ -2384,7 +2391,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                                 cmd.txId(),
                                 cmd.rowUuid(),
                                 cmd.tablePartitionId().asTablePartitionId(),
-                                cmd.row(),
+                                cmd.rowToUpdate(),
                                 true,
                                 null,
                                 null,
@@ -2409,7 +2416,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                             cmd.txId(),
                             cmd.rowUuid(),
                             cmd.tablePartitionId().asTablePartitionId(),
-                            cmd.row(),
+                            cmd.rowToUpdate(),
                             false,
                             null,
                             cmd.safeTime(),
@@ -2455,8 +2462,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
     /**
      * Executes an UpdateAll command.
      *
-     * @param rowsToUpdate All {@link BinaryRow}s represented as {@link ByteBuffer}s to be updated.
-     * @param lastCommitTimes All timestamps of the last committed entries for each row.
+     * @param rowsToUpdate All {@link BinaryRow}s represented as {@link TimedBinaryRowMessage}s to be updated.
      * @param commitPartitionId Partition ID that these rows belong to.
      * @param transactionId Transaction ID.
      * @param full {@code true} if this is a single-command transaction.
@@ -2466,8 +2472,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @return Raft future, see {@link 
#applyCmdWithExceptionHandling(Command)}.
      */
     private CompletableFuture<CompletableFuture<?>> applyUpdateAllCommand(
-            Map<UUID, BinaryRowMessage> rowsToUpdate,
-            Map<UUID, HybridTimestamp> lastCommitTimes,
+            Map<UUID, TimedBinaryRowMessage> rowsToUpdate,
             TablePartitionIdMessage commitPartitionId,
             UUID transactionId,
             boolean full,
@@ -2478,7 +2483,6 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         synchronized (commandProcessingLinearizationMutex) {
             UpdateAllCommand cmd = updateAllCommand(
                     rowsToUpdate,
-                    lastCommitTimes,
                     commitPartitionId,
                     transactionId,
                     hybridClock.now(),
@@ -2498,8 +2502,8 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                                     
cmd.tablePartitionId().asTablePartitionId(),
                                     true,
                                     null,
-                                    null,
-                                    emptyMap());
+                                    null
+                            );
 
                             
updateTrackerIgnoringTrackerClosedException(safeTime, cmd.safeTime());
                         }
@@ -2525,8 +2529,8 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                                     
cmd.tablePartitionId().asTablePartitionId(),
                                     true,
                                     null,
-                                    null,
-                                    emptyMap());
+                                    null
+                            );
 
                             
updateTrackerIgnoringTrackerClosedException(safeTime, cmd.safeTime());
                         }
@@ -2548,8 +2552,8 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                             cmd.tablePartitionId().asTablePartitionId(),
                             false,
                             null,
-                            cmd.safeTime(),
-                            emptyMap());
+                            cmd.safeTime()
+                    );
 
                     return null;
                 });
@@ -2561,22 +2565,19 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * Executes an UpdateAll command.
      *
      * @param request Read write multi rows replica request.
-     * @param rowsToUpdate All {@link BinaryRow}s represented as {@link 
ByteBuffer}s to be updated.
-     * @param lastCommitTimes All timestamps of the last committed entries for 
each row.
+     * @param rowsToUpdate All {@link BinaryRow}s represented as {@link 
TimedBinaryRowMessage}s to be updated.
      * @param txCoordinatorId Transaction coordinator id.
      * @param catalogVersion Validated catalog version associated with given 
operation.
      * @return Raft future, see {@link 
#applyCmdWithExceptionHandling(Command)}.
      */
     private CompletableFuture<CompletableFuture<?>> applyUpdateAllCommand(
             ReadWriteMultiRowReplicaRequest request,
-            Map<UUID, BinaryRowMessage> rowsToUpdate,
-            Map<UUID, HybridTimestamp> lastCommitTimes,
+            Map<UUID, TimedBinaryRowMessage> rowsToUpdate,
             String txCoordinatorId,
             int catalogVersion
     ) {
         return applyUpdateAllCommand(
                 rowsToUpdate,
-                lastCommitTimes,
                 request.commitPartitionId(),
                 request.transactionId(),
                 request.full(),
@@ -3433,12 +3434,18 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 .txCoordinatorId(txCoordinatorId)
                 .requiredCatalogVersion(catalogVersion);
 
-        if (lastCommitTimestamp != null) {
-            bldr.lastCommitTimestampLong(lastCommitTimestamp.longValue());
-        }
+        if (lastCommitTimestamp != null || row != null) {
+            TimedBinaryRowMessageBuilder rowMsgBldr = MSG_FACTORY.timedBinaryRowMessage();
+
+            if (lastCommitTimestamp != null) {
+                rowMsgBldr.timestamp(lastCommitTimestamp.longValue());
+            }
+
+            if (row != null) {
+                rowMsgBldr.binaryRowMessage(binaryRowMessage(row));
+            }
 
-        if (row != null) {
-            bldr.rowMessage(binaryRowMessage(row));
+            bldr.messageRowToUpdate(rowMsgBldr.build());
         }
 
         return bldr.build();
@@ -3452,8 +3459,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
     }
 
     private static UpdateAllCommand updateAllCommand(
-            Map<UUID, BinaryRowMessage> rowsToUpdate,
-            Map<UUID, HybridTimestamp> lastCommitTimes,
+            Map<UUID, TimedBinaryRowMessage> rowsToUpdate,
             TablePartitionIdMessage commitPartitionId,
             UUID transactionId,
             HybridTimestamp safeTimeTimestamp,
@@ -3463,18 +3469,12 @@ public class PartitionReplicaListener implements 
ReplicaListener {
     ) {
         return MSG_FACTORY.updateAllCommand()
                 .tablePartitionId(commitPartitionId)
-                .rowsToUpdate(rowsToUpdate)
+                .messageRowsToUpdate(rowsToUpdate)
                 .txId(transactionId)
                 .safeTimeLong(safeTimeTimestamp.longValue())
                 .full(full)
                 .txCoordinatorId(txCoordinatorId)
                 .requiredCatalogVersion(catalogVersion)
-                .lastCommitTimestampsLong(
-                        // Also make sure lastCommitTimes contains only those entries that match rowsToUpdate.
-                        lastCommitTimes.entrySet().stream()
-                                .filter(entry -> rowsToUpdate.containsKey(entry.getKey()))
-                                .collect(toMap(Entry::getKey, entry -> entry.getValue().longValue()))
-                )
                 .build();
     }
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TimedBinaryRow.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TimedBinaryRow.java
index 2dee94ab59..fcecf5cf39 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TimedBinaryRow.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TimedBinaryRow.java
@@ -24,13 +24,13 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Row with time.
  */
-class TimedBinaryRow {
+public class TimedBinaryRow {
 
     private final @Nullable BinaryRow binaryRow;
 
     private final @Nullable HybridTimestamp commitTimestamp;
 
-    TimedBinaryRow(@Nullable BinaryRow binaryRow, @Nullable HybridTimestamp commitTimestamp) {
+    public TimedBinaryRow(@Nullable BinaryRow binaryRow, @Nullable HybridTimestamp commitTimestamp) {
         this.binaryRow = binaryRow;
         this.commitTimestamp = commitTimestamp;
     }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
index bcbed62f92..dfe0808399 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.table.distributed;
 
-import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonMap;
 import static java.util.stream.Collectors.toList;
 import static org.mockito.Mockito.mock;
@@ -49,6 +48,7 @@ import 
org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage;
 import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler;
 import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
 import 
org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
+import org.apache.ignite.internal.table.distributed.replicator.TimedBinaryRow;
 import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
 import org.apache.ignite.internal.type.NativeTypes;
 import org.apache.ignite.internal.util.Cursor;
@@ -254,12 +254,11 @@ public abstract class IndexBaseTest extends 
BaseMvStoragesTest {
 
                 handler.handleUpdateAll(
                         TX_ID,
-                        singletonMap(rowUuid, rowMessage),
+                        singletonMap(rowUuid, new TimedBinaryRow(rowMessage == null ? null : rowMessage.asBinaryRow(), null)),
                         partitionId,
                         true,
                         null,
-                        null,
-                        emptyMap()
+                        null
                 );
             }
         };
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java
index 1934ed5477..e3913313e2 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.table.distributed;
 
-import static java.util.Collections.emptyMap;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -140,8 +139,7 @@ public class StorageUpdateHandlerTest extends 
BaseIgniteAbstractTest {
                 new TablePartitionId(TABLE_ID, PARTITION_ID),
                 false,
                 null,
-                null,
-                emptyMap()
+                null
         );
 
         verify(partitionStorage).peek(lwm);
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
index 307189f1e0..8acabb11f3 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.table.distributed.command;
 
+import static 
org.apache.ignite.internal.hlc.HybridTimestamp.NULL_HYBRID_TIMESTAMP;
+import static 
org.apache.ignite.internal.hlc.HybridTimestamp.nullableHybridTimestamp;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -51,6 +53,9 @@ import org.junit.jupiter.api.Test;
  * Test for partition RAFT commands serialization.
  */
 public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest 
{
+    /** Hybrid clock. */
+    private final HybridClockImpl clock = new HybridClockImpl();
+
     /** Key-value marshaller for tests. */
     private static KvMarshaller<TestKey, TestValue> kvMarshaller;
 
@@ -81,7 +86,9 @@ public class PartitionRaftCommandsSerializationTest extends 
IgniteAbstractTest {
                         .build()
                 )
                 .rowUuid(UUID.randomUUID())
-                .rowMessage(binaryRowMessage(1))
+                .messageRowToUpdate(msgFactory.timedBinaryRowMessage()
+                                .binaryRowMessage(binaryRowMessage(1))
+                                .build())
                 .txId(TestTransactionIds.newTransactionId())
                 .txCoordinatorId(UUID.randomUUID().toString())
                 .build();
@@ -108,15 +115,21 @@ public class PartitionRaftCommandsSerializationTest 
extends IgniteAbstractTest {
 
         assertEquals(cmd.txId(), readCmd.txId());
         assertEquals(cmd.rowUuid(), readCmd.rowUuid());
-        assertNull(readCmd.rowMessage());
+        assertNull(readCmd.rowToUpdate());
     }
 
     @Test
     public void testUpdateAllCommand() throws Exception {
-        Map<UUID, BinaryRowMessage> rowsToUpdate = new HashMap<>();
+        Map<UUID, TimedBinaryRowMessage> rowsToUpdate = new HashMap<>();
 
         for (int i = 0; i < 10; i++) {
-            rowsToUpdate.put(TestTransactionIds.newTransactionId(), 
binaryRowMessage(i));
+            rowsToUpdate.put(
+                    TestTransactionIds.newTransactionId(),
+                    msgFactory.timedBinaryRowMessage()
+                            .binaryRowMessage(binaryRowMessage(i))
+                            .timestamp(i % 2 == 0 ? clock.nowLong() : NULL_HYBRID_TIMESTAMP)
+                            .build()
+            );
         }
 
         var cmd = msgFactory.updateAllCommand()
@@ -125,7 +138,7 @@ public class PartitionRaftCommandsSerializationTest extends 
IgniteAbstractTest {
                         .partitionId(1)
                         .build()
                 )
-                .rowsToUpdate(rowsToUpdate)
+                .messageRowsToUpdate(rowsToUpdate)
                 .txId(UUID.randomUUID())
                 .txCoordinatorId(UUID.randomUUID().toString())
                 .build();
@@ -134,22 +147,28 @@ public class PartitionRaftCommandsSerializationTest 
extends IgniteAbstractTest {
 
         assertEquals(cmd.txId(), readCmd.txId());
 
-        for (Map.Entry<UUID, BinaryRowMessage> entry : 
cmd.rowsToUpdate().entrySet()) {
+        for (Map.Entry<UUID, TimedBinaryRowMessage> entry : 
cmd.messageRowsToUpdate().entrySet()) {
             assertTrue(readCmd.rowsToUpdate().containsKey(entry.getKey()));
 
-            var readVal = readCmd.rowsToUpdate().get(entry.getKey());
-            var val = entry.getValue();
+            var readVal = readCmd.rowsToUpdate().get(entry.getKey()).binaryRow();
+            var val = entry.getValue().binaryRow();
 
             assertEquals(val, readVal);
+
+            var readTs = readCmd.rowsToUpdate().get(entry.getKey()).commitTimestamp();
+            var ts = nullableHybridTimestamp(entry.getValue().timestamp());
+
+            assertEquals(ts, readTs);
         }
     }
 
     @Test
     public void testRemoveAllCommand() throws Exception {
-        Map<UUID, BinaryRowMessage> rowsToRemove = new HashMap<>();
+        Map<UUID, TimedBinaryRowMessage> rowsToRemove = new HashMap<>();
 
         for (int i = 0; i < 10; i++) {
-            rowsToRemove.put(TestTransactionIds.newTransactionId(), null);
+            rowsToRemove.put(TestTransactionIds.newTransactionId(), 
msgFactory.timedBinaryRowMessage()
+                    .build());
         }
 
         var cmd = msgFactory.updateAllCommand()
@@ -158,7 +177,7 @@ public class PartitionRaftCommandsSerializationTest extends 
IgniteAbstractTest {
                         .partitionId(1)
                         .build()
                 )
-                .rowsToUpdate(rowsToRemove)
+                .messageRowsToUpdate(rowsToRemove)
                 .txId(UUID.randomUUID())
                 .txCoordinatorId(UUID.randomUUID().toString())
                 .build();
@@ -167,9 +186,9 @@ public class PartitionRaftCommandsSerializationTest extends 
IgniteAbstractTest {
 
         assertEquals(cmd.txId(), readCmd.txId());
 
-        for (UUID uuid : cmd.rowsToUpdate().keySet()) {
+        for (UUID uuid : cmd.messageRowsToUpdate().keySet()) {
             assertTrue(readCmd.rowsToUpdate().containsKey(uuid));
-            assertNull(readCmd.rowsToUpdate().get(uuid));
+            assertNull(readCmd.rowsToUpdate().get(uuid).binaryRow());
         }
     }
 
@@ -248,7 +267,7 @@ public class PartitionRaftCommandsSerializationTest extends 
IgniteAbstractTest {
                     .txId(updateCommand.txId())
                     .rowUuid(updateCommand.rowUuid())
                     .tablePartitionId(updateCommand.tablePartitionId())
-                    .rowMessage(updateCommand.rowMessage())
+                    .messageRowToUpdate(updateCommand.messageRowToUpdate())
                     .txCoordinatorId(updateCommand.txCoordinatorId())
                     .build();
         } else if (cmd instanceof UpdateAllCommand) {
@@ -256,7 +275,7 @@ public class PartitionRaftCommandsSerializationTest extends 
IgniteAbstractTest {
 
             return (T) msgFactory.updateAllCommand()
                     .txId(updateCommand.txId())
-                    .rowsToUpdate(updateCommand.rowsToUpdate())
+                    .messageRowsToUpdate(updateCommand.messageRowsToUpdate())
                     .tablePartitionId(updateCommand.tablePartitionId())
                     .txCoordinatorId(updateCommand.txCoordinatorId())
                     .build();
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index 4d41882d2c..3773ccea66 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -90,6 +90,7 @@ import 
org.apache.ignite.internal.table.distributed.TableMessagesFactory;
 import 
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
 import org.apache.ignite.internal.table.distributed.command.BuildIndexCommand;
 import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
+import 
org.apache.ignite.internal.table.distributed.command.TimedBinaryRowMessage;
 import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
 import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
 import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler;
@@ -615,12 +616,17 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
      * Inserts all rows.
      */
     private void insertAll() {
-        Map<UUID, BinaryRowMessage> rows = new HashMap<>(KEY_COUNT);
+        Map<UUID, TimedBinaryRowMessage> rows = new HashMap<>(KEY_COUNT);
         UUID txId = TestTransactionIds.newTransactionId();
         var commitPartId = new TablePartitionId(TABLE_ID, PARTITION_ID);
 
         for (int i = 0; i < KEY_COUNT; i++) {
-            rows.put(TestTransactionIds.newTransactionId(), getTestRow(i, i));
+            rows.put(
+                    TestTransactionIds.newTransactionId(),
+                    msgFactory.timedBinaryRowMessage()
+                            .binaryRowMessage(getTestRow(i, i))
+                            .build()
+            );
         }
 
         HybridTimestamp commitTimestamp = hybridClock.now();
@@ -631,7 +637,7 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
                                 .tableId(commitPartId.tableId())
                                 .partitionId(commitPartId.partitionId())
                                 .build())
-                .rowsToUpdate(rows)
+                .messageRowsToUpdate(rows)
                 .txId(txId)
                 .safeTimeLong(hybridClock.nowLong())
                 .txCoordinatorId(UUID.randomUUID().toString())
@@ -654,12 +660,16 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
     private void updateAll(Function<Integer, Integer> keyValueMapper) {
         UUID txId = TestTransactionIds.newTransactionId();
         var commitPartId = new TablePartitionId(TABLE_ID, PARTITION_ID);
-        Map<UUID, BinaryRowMessage> rows = new HashMap<>(KEY_COUNT);
+        Map<UUID, TimedBinaryRowMessage> rows = new HashMap<>(KEY_COUNT);
 
         for (int i = 0; i < KEY_COUNT; i++) {
             ReadResult readResult = readRow(getTestKey(i));
 
-            rows.put(readResult.rowId().uuid(), getTestRow(i, 
keyValueMapper.apply(i)));
+            rows.put(readResult.rowId().uuid(),
+                    msgFactory.timedBinaryRowMessage()
+                            .binaryRowMessage(getTestRow(i, 
keyValueMapper.apply(i)))
+                            .build()
+            );
         }
 
         HybridTimestamp commitTimestamp = hybridClock.now();
@@ -670,7 +680,7 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
                                 .tableId(commitPartId.tableId())
                                 .partitionId(commitPartId.partitionId())
                                 .build())
-                .rowsToUpdate(rows)
+                .messageRowsToUpdate(rows)
                 .txId(txId)
                 .safeTimeLong(hybridClock.nowLong())
                 .txCoordinatorId(UUID.randomUUID().toString())
@@ -691,12 +701,13 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
     private void deleteAll() {
         UUID txId = TestTransactionIds.newTransactionId();
         var commitPartId = new TablePartitionId(TABLE_ID, PARTITION_ID);
-        Map<UUID, BinaryRowMessage> keyRows = new HashMap<>(KEY_COUNT);
+        Map<UUID, TimedBinaryRowMessage> keyRows = new HashMap<>(KEY_COUNT);
 
         for (int i = 0; i < KEY_COUNT; i++) {
             ReadResult readResult = readRow(getTestKey(i));
 
-            keyRows.put(readResult.rowId().uuid(), null);
+            keyRows.put(readResult.rowId().uuid(), 
msgFactory.timedBinaryRowMessage()
+                    .build());
         }
 
         HybridTimestamp commitTimestamp = hybridClock.now();
@@ -707,7 +718,7 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
                                 .tableId(commitPartId.tableId())
                                 .partitionId(commitPartId.partitionId())
                                 .build())
-                .rowsToUpdate(keyRows)
+                .messageRowsToUpdate(keyRows)
                 .txId(txId)
                 .safeTimeLong(hybridClock.nowLong())
                 .txCoordinatorId(UUID.randomUUID().toString())
@@ -745,7 +756,9 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
                                     .tableId(1)
                                     .partitionId(PARTITION_ID).build())
                             .rowUuid(readResult.rowId().uuid())
-                            .rowMessage(row)
+                            
.messageRowToUpdate(msgFactory.timedBinaryRowMessage()
+                                    .binaryRowMessage(row)
+                                    .build())
                             .txId(txId)
                             .safeTimeLong(hybridClock.nowLong())
                             .txCoordinatorId(UUID.randomUUID().toString())
@@ -862,7 +875,9 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
                                     .tableId(1)
                                     .partitionId(PARTITION_ID).build())
                             .rowUuid(UUID.randomUUID())
-                            .rowMessage(getTestRow(i, i))
+                            
.messageRowToUpdate(msgFactory.timedBinaryRowMessage()
+                                    .binaryRowMessage(getTestRow(i, i))
+                                    .build())
                             .txId(txId)
                             .safeTimeLong(hybridClock.nowLong())
                             .txCoordinatorId(UUID.randomUUID().toString())

Reply via email to