This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 228b5be6e3 IGNITE-20445 Clean up write intents for RW transaction on replication… (#2657)
228b5be6e3 is described below

commit 228b5be6e3176ce40982b41f0dd3cf5c96452b09
Author: Cyrill <cyrill.si...@gmail.com>
AuthorDate: Wed Oct 11 11:29:02 2023 +0300

    IGNITE-20445 Clean up write intents for RW transaction on replication… (#2657)
---
 ...ava => ItTxDistributedCleanupRecoveryTest.java} | 157 +++-------
 ...xDistributedTestSingleNodeNoCleanupMessage.java |  15 +-
 .../table/distributed/StorageUpdateHandler.java    |  65 ++++-
 .../distributed/command/UpdateAllCommand.java      |  21 ++
 .../table/distributed/command/UpdateCommand.java   |  15 +
 .../table/distributed/raft/PartitionListener.java  |   6 +-
 .../replicator/PartitionReplicaListener.java       | 269 ++++++++++-------
 .../TimedBinaryRow.java}                           |  37 +--
 .../replicator/TransactionStateResolver.java       |   4 +-
 .../internal/table/distributed/IndexBaseTest.java  |  13 +-
 .../table/distributed/StorageCleanupTest.java      | 323 ++++++++++++++++++++-
 .../distributed/StorageUpdateHandlerTest.java      |   5 +-
 .../ignite/internal/table/TxAbstractTest.java      |  27 +-
 13 files changed, 682 insertions(+), 275 deletions(-)

diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java
similarity index 54%
copy from modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
copy to modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java
index 08c7bcee7a..b01d5e14da 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java
@@ -17,19 +17,14 @@
 
 package org.apache.ignite.distributed;
 
-import static java.util.concurrent.CompletableFuture.completedFuture;
-import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNull;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_COMMON_ERR;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
-import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.hlc.HybridClock;
@@ -37,7 +32,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.replicator.ReplicaResult;
-import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.exception.ReplicationException;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.table.distributed.IndexLocker;
@@ -47,18 +42,12 @@ import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaL
 import org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
 import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
 import org.apache.ignite.internal.table.distributed.schema.Schemas;
-import org.apache.ignite.internal.tx.InternalTransaction;
-import org.apache.ignite.internal.tx.LockManager;
 import org.apache.ignite.internal.tx.TxManager;
-import org.apache.ignite.internal.tx.impl.HeapLockManager;
-import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
-import org.apache.ignite.internal.tx.impl.TxManagerImpl;
 import org.apache.ignite.internal.tx.message.TxCleanupReplicaRequest;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import org.apache.ignite.internal.util.Lazy;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.network.ClusterNode;
-import org.apache.ignite.table.Tuple;
 import org.apache.ignite.tx.TransactionException;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
@@ -66,27 +55,31 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 
 /**
- * Test to Simulate missing cleanup action.
+ * Durable cleanup test with successful recovery after the failures.
  */
-public class ItTxDistributedTestSingleNodeNoCleanupMessage extends ItTxDistributedTestSingleNode {
-    /** A list of background cleanup futures. */
-    private final List<CompletableFuture<?>> cleanupFutures = new CopyOnWriteArrayList<>();
+public class ItTxDistributedCleanupRecoveryTest extends ItTxDistributedTestSingleNode {
 
-    /** A flag to drop async cleanup actions.  */
-    private volatile boolean ignoreAsyncCleanup;
+    private AtomicInteger defaultRetryCount;
 
     /**
      * The constructor.
      *
      * @param testInfo Test info.
      */
-    public ItTxDistributedTestSingleNodeNoCleanupMessage(TestInfo testInfo) {
+    public ItTxDistributedCleanupRecoveryTest(TestInfo testInfo) {
         super(testInfo);
     }
 
+    private void setDefaultRetryCount(int count) {
+        defaultRetryCount = new AtomicInteger(count);
+    }
+
     @BeforeEach
     @Override
     public void before() throws Exception {
+        // The value of 3 is less than the allowed number of cleanup retries.
+        setDefaultRetryCount(3);
+
         txTestCluster = new ItTxTestCluster(
                 testInfo,
                 raftConfiguration,
@@ -97,23 +90,6 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage extends ItTxDistribut
                 startClient(),
                 timestampTracker
         ) {
-            @Override
-            protected TxManagerImpl newTxManager(ReplicaService replicaSvc, HybridClock clock, TransactionIdGenerator generator,
-                    ClusterNode node) {
-                return new TxManagerImpl(replicaSvc, new HeapLockManager(), clock, generator, node::id) {
-                    @Override
-                    public CompletableFuture<Void> executeCleanupAsync(Runnable runnable) {
-                        if (ignoreAsyncCleanup) {
-                            return completedFuture(null);
-                        }
-                        CompletableFuture<Void> cleanupFuture = super.executeCleanupAsync(runnable);
-
-                        cleanupFutures.add(cleanupFuture);
-
-                        return cleanupFuture;
-                    }
-                };
-            }
 
             @Override
             protected PartitionReplicaListener newReplicaListener(
@@ -161,15 +137,12 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage extends ItTxDistribut
                 ) {
                     @Override
                     public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, String senderId) {
-                        if (request instanceof TxCleanupReplicaRequest) {
+                        if (request instanceof TxCleanupReplicaRequest && defaultRetryCount.getAndDecrement() > 0) {
                            logger().info("Dropping cleanup request: {}", request);
 
-                            releaseTxLocks(
-                                    ((TxCleanupReplicaRequest) request).txId(),
-                                    txManager.lockManager()
-                            );
-
-                            return completedFuture(new ReplicaResult(null, null));
+                            return failedFuture(new ReplicationException(
+                                    REPLICA_COMMON_ERR,
+                                    "Test Tx Cleanup exception [replicaGroupId=" + request.groupId() + ']'));
                         }
                         return super.invoke(request, senderId);
                     }
@@ -187,86 +160,44 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage extends ItTxDistribut
         log.info("Tables have been started");
     }
 
-    @Disabled("IGNITE-20395, IGNITE-20445, IGNITE-20446")
-    @Test
-    @Override
-    public void testTransactionAlreadyRolledback() {
-        super.testTransactionAlreadyRolledback();
-    }
 
-    @Disabled("IGNITE-20395, IGNITE-20445, IGNITE-20446")
     @Test
     @Override
-    public void testTransactionAlreadyCommitted() {
-        super.testTransactionAlreadyCommitted();
-    }
+    public void testDeleteUpsertCommit() throws TransactionException {
+        // The value of 6 is higher than the default retry count.
+        // So we should give up retrying and crash.
+        setDefaultRetryCount(6);
 
-    @Disabled("IGNITE-20395, IGNITE-20445, IGNITE-20446")
-    @Test
-    @Override
-    public void testBalance() throws InterruptedException {
-        super.testBalance();
+        assertThrows(TransactionException.class, () -> deleteUpsert().commit());
     }
 
-    @Disabled("IGNITE-20395, IGNITE-20445, IGNITE-20446")
+    @Disabled("IGNITE-20560")
     @Test
     @Override
-    public void testDelete() throws TransactionException {
-        super.testDelete();
-    }
-
-    @Disabled("IGNITE-20395, IGNITE-20445, IGNITE-20446")
-    @Test
-    public void testTwoReadWriteTransactions() throws TransactionException {
-        Tuple key = makeKey(1);
-
-        assertFalse(accounts.recordView().delete(null, key));
-        assertNull(accounts.recordView().get(null, key));
-
-        // Disable background cleanup to avoid a race.
-        ignoreAsyncCleanup = true;
-
-        InternalTransaction tx1 = (InternalTransaction) igniteTransactions.begin();
-        accounts.recordView().upsert(tx1, makeValue(1, 100.));
-        tx1.commit();
+    public void testTransactionAlreadyRolledback() {
+        // The value of 6 is higher than the default retry count.
+        // So we should give up retrying and crash.
+        setDefaultRetryCount(6);
 
-        InternalTransaction tx2 = (InternalTransaction) igniteTransactions.begin();
-        accounts.recordView().upsert(tx2, makeValue(1, 200.));
-        tx2.commit();
+        testTransactionAlreadyFinished(false, (transaction, txId) -> {
+            assertThrows(TransactionException.class, transaction::rollback);
 
-        assertEquals(200., accounts.recordView().get(null, makeKey(1)).doubleValue("balance"));
+            log.info("Rolled back transaction {}", txId);
+        });
     }
 
+    @Disabled("IGNITE-20560")
     @Test
-    public void testTwoReadWriteTransactionsWaitForCleanup() throws TransactionException {
-        Tuple key = makeKey(1);
-
-        assertFalse(accounts.recordView().delete(null, key));
-        assertNull(accounts.recordView().get(null, key));
-
-        // Start the first transaction. The values it changes will not be cleaned up.
-        InternalTransaction tx1 = (InternalTransaction) igniteTransactions.begin();
-
-        accounts.recordView().upsert(tx1, makeValue(1, 100.));
-
-        tx1.commit();
-
-        //Now start the seconds transaction and make sure write intent resolution is called  by adding a `get` operaiton.
-        InternalTransaction tx2 = (InternalTransaction) igniteTransactions.begin();
-
-        assertEquals(100., accounts.recordView().get(tx2, makeKey(1)).doubleValue("balance"));
-
-        // Now wait for the background task to finish.
-        cleanupFutures.forEach(completableFuture -> assertThat(completableFuture, willCompleteSuccessfully()));
-
-        accounts.recordView().upsert(tx2, makeValue(1, 200.));
-
-        tx2.commit();
+    @Override
+    public void testTransactionAlreadyCommitted() {
+        // The value of 6 is higher than the default retry count.
+        // So we should give up retrying and crash.
+        setDefaultRetryCount(6);
 
-        assertEquals(200., accounts.recordView().get(null, makeKey(1)).doubleValue("balance"));
-    }
+        testTransactionAlreadyFinished(true, (transaction, txId) -> {
+            assertThrows(TransactionException.class, transaction::commit);
 
-    private static void releaseTxLocks(UUID txId, LockManager lockManager) {
-        lockManager.locks(txId).forEachRemaining(lockManager::release);
+            log.info("Committed transaction {}", txId);
+        });
     }
 }
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
index 08c7bcee7a..c240591ab9 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
@@ -187,35 +187,28 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage extends ItTxDistribut
         log.info("Tables have been started");
     }
 
-    @Disabled("IGNITE-20395, IGNITE-20445, IGNITE-20446")
+    @Disabled("IGNITE-20560")
     @Test
     @Override
     public void testTransactionAlreadyRolledback() {
         super.testTransactionAlreadyRolledback();
     }
 
-    @Disabled("IGNITE-20395, IGNITE-20445, IGNITE-20446")
+    @Disabled("IGNITE-20560")
     @Test
     @Override
     public void testTransactionAlreadyCommitted() {
         super.testTransactionAlreadyCommitted();
     }
 
-    @Disabled("IGNITE-20395, IGNITE-20445, IGNITE-20446")
+    @Disabled("IGNITE-20395")
     @Test
     @Override
     public void testBalance() throws InterruptedException {
         super.testBalance();
     }
 
-    @Disabled("IGNITE-20395, IGNITE-20445, IGNITE-20446")
-    @Test
-    @Override
-    public void testDelete() throws TransactionException {
-        super.testDelete();
-    }
-
-    @Disabled("IGNITE-20395, IGNITE-20445, IGNITE-20446")
+    @Disabled("IGNITE-20395")
     @Test
     public void testTwoReadWriteTransactions() throws TransactionException {
         Tuple key = makeKey(1);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
index 6e91612520..155d8e792d 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
@@ -107,6 +107,7 @@ public class StorageUpdateHandler {
      * @param trackWriteIntent If {@code true} then write intent should be tracked.
      * @param onApplication Callback on application.
      * @param commitTs Commit timestamp to use on autocommit.
+     * @param lastCommitTs The timestamp of last known committed entry.
      */
     public void handleUpdate(
             UUID txId,
@@ -115,7 +116,8 @@ public class StorageUpdateHandler {
             @Nullable BinaryRow row,
             boolean trackWriteIntent,
             @Nullable Runnable onApplication,
-            @Nullable HybridTimestamp commitTs
+            @Nullable HybridTimestamp commitTs,
+            @Nullable HybridTimestamp lastCommitTs
     ) {
         indexUpdateHandler.waitIndexes();
 
@@ -126,6 +128,8 @@ public class StorageUpdateHandler {
 
             locker.lock(rowId);
 
+            performStorageCleanupIfNeeded(txId, rowId, lastCommitTs);
+
             if (commitTs != null) {
                 storage.addWriteCommitted(rowId, row, commitTs);
             } else {
@@ -163,6 +167,7 @@ public class StorageUpdateHandler {
      * @param trackWriteIntent If {@code true} then write intent should be tracked.
      * @param onApplication Callback on application.
      * @param commitTs Commit timestamp to use on autocommit.
+     * @param lastCommitTsMap A map(Row Id -> timestamp) of timestamps of the most recent commits to the affected rows.
      */
     public void handleUpdateAll(
             UUID txId,
@@ -170,7 +175,8 @@ public class StorageUpdateHandler {
             TablePartitionId commitPartitionId,
             boolean trackWriteIntent,
             @Nullable Runnable onApplication,
-            @Nullable HybridTimestamp commitTs
+            @Nullable HybridTimestamp commitTs,
+            Map<UUID, HybridTimestamp> lastCommitTsMap
     ) {
         indexUpdateHandler.waitIndexes();
 
@@ -190,6 +196,8 @@ public class StorageUpdateHandler {
 
                     locker.lock(rowId);
 
+                    performStorageCleanupIfNeeded(txId, rowId, lastCommitTsMap.get(entry.getKey()));
+
                     if (commitTs != null) {
                         storage.addWriteCommitted(rowId, row, commitTs);
                     } else {
@@ -221,6 +229,59 @@ public class StorageUpdateHandler {
         executeBatchGc();
     }
 
+    private void performStorageCleanupIfNeeded(UUID txId, RowId rowId, @Nullable HybridTimestamp lastCommitTs) {
+        // No previously committed value, this action might be an insert. No need to cleanup.
+        if (lastCommitTs == null) {
+            return;
+        }
+
+        try (Cursor<ReadResult> cursor = storage.scanVersions(rowId)) {
+            // Okay, lastCommitTs is not null. It means that we are changing the previously committed data.
+            // However, we could have previously called cleanup for the same row.
+            // If the previous operation was "delete" and it was executed successfully, no data will be present in the storage.
+            if (!cursor.hasNext()) {
+                return;
+            }
+
+            ReadResult item = cursor.next();
+            // If there is a write intent in the storage and this intent was created by a different transaction
+            // then check the previous entry.
+            // Otherwise exit the check - everything's fine.
+            if (item.isWriteIntent() && !txId.equals(item.transactionId())) {
+                if (!cursor.hasNext()) {
+                    // No more data => the write intent we have is actually the first version of this row
+                    // and lastCommitTs is the commit timestamp of it.
+                    // Action: commit this write intent.
+                    performCommitWrite(item.transactionId(), Set.of(rowId), lastCommitTs);
+                    return;
+                }
+                // Otherwise there are other versions in the chain.
+                ReadResult committedItem = cursor.next();
+
+                // They should be regular entries, not write intents.
+                assert !committedItem.isWriteIntent() : "Cannot have more than one write intent per row";
+
+                assert lastCommitTs.compareTo(committedItem.commitTimestamp()) >= 0 :
+                        "Primary commit timestamp " + lastCommitTs + " is earlier than local commit timestamp "
+                                + committedItem.commitTimestamp();
+
+                if (lastCommitTs.compareTo(committedItem.commitTimestamp()) > 0) {
+                    // We see that lastCommitTs is later than the timestamp of the committed value => we need to commit the write intent.
+                    // Action: commit this write intent.
+                    performCommitWrite(item.transactionId(), Set.of(rowId), lastCommitTs);
+                } else {
+                    // lastCommitTs == committedItem.commitTimestamp()
+                    // So we see a write intent from a different transaction, which was not committed on primary.
+                    // Because of transaction locks we cannot have two transactions creating write intents for the same row.
+                    // So if we got up to here, it means that the previous transaction was aborted,
+                    // but the storage was not cleaned after it.
+                    // Action: abort this write intent.
+                    performAbortWrite(item.transactionId(), Set.of(rowId));
+                }
+            }
+        }
+    }
+
     void executeBatchGc() {
         HybridTimestamp lwm = lowWatermark.getLowWatermark();
 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java
index 470333b64f..4d75fd22be 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java
@@ -17,11 +17,16 @@
 
 package org.apache.ignite.internal.table.distributed.command;
 
+import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
+
+import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.table.distributed.TableMessageGroup;
 import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
 import org.apache.ignite.network.annotations.Transferable;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * State machine command for updating a batch of entries.
@@ -33,4 +38,20 @@ public interface UpdateAllCommand extends PartitionCommand {
     Map<UUID, BinaryRowMessage> rowsToUpdate();
 
     String txCoordinatorId();
+
+    // TODO: IGNITE-20609 row id in this map duplicates row id in rowsToUpdate.
+    @Nullable Map<UUID, Long> lastCommitTimestampsLong();
+
+    /**
+     * Returns the timestamps of the last committed entries for each row.
+     */
+    default Map<UUID, HybridTimestamp> lastCommitTimestamps() {
+        Map<UUID, HybridTimestamp> map = new HashMap<>();
+
+        Map<UUID, Long> uuidLongMap = lastCommitTimestampsLong();
+        if (uuidLongMap != null) {
+            uuidLongMap.forEach((uuid, ts) -> map.put(uuid, hybridTimestamp(ts)));
+        }
+        return map;
+    }
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java
index 821e449311..3e47dfd574 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java
@@ -17,7 +17,10 @@
 
 package org.apache.ignite.internal.table.distributed.command;
 
+import static org.apache.ignite.internal.hlc.HybridTimestamp.nullableHybridTimestamp;
+
 import java.util.UUID;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.table.distributed.TableMessageGroup;
 import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
@@ -45,4 +48,16 @@ public interface UpdateCommand extends PartitionCommand {
 
         return message == null ? null : message.asBinaryRow();
     }
+
+    /**
+     * Returns the timestamp of the last committed entry.
+     */
+    long lastCommitTimestampLong();
+
+    /**
+     * Returns the timestamp of the last committed entry.
+     */
+    default @Nullable HybridTimestamp lastCommitTimestamp() {
+        return nullableHybridTimestamp(lastCommitTimestampLong());
+    }
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 6ed5a8464b..9456db2c27 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -238,7 +238,8 @@ public class PartitionListener implements RaftGroupListener {
                 storageUpdateHandler.handleUpdate(cmd.txId(), cmd.rowUuid(), cmd.tablePartitionId().asTablePartitionId(), cmd.row(),
                         !cmd.full(),
                         () -> storage.lastApplied(commandIndex, commandTerm),
-                        cmd.full() ? cmd.safeTime() : null
+                        cmd.full() ? cmd.safeTime() : null,
+                        cmd.lastCommitTimestamp()
                 );
             }
 
@@ -266,7 +267,8 @@ public class PartitionListener implements RaftGroupListener {
                 storageUpdateHandler.handleUpdateAll(cmd.txId(), cmd.rowsToUpdate(), cmd.tablePartitionId().asTablePartitionId(),
                         !cmd.full(),
                         () -> storage.lastApplied(commandIndex, commandTerm),
-                        cmd.full() ? cmd.safeTime() : null
+                        cmd.full() ? cmd.safeTime() : null,
+                        cmd.lastCommitTimestamps()
                 );
 
                updateTrackerIgnoringTrackerClosedException(safeTime, cmd.safeTime());
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index ee6b2183a8..189929cf27 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -18,12 +18,14 @@
 package org.apache.ignite.internal.table.distributed.replicator;
 
 import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
 import static java.util.Objects.requireNonNull;
 import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
 import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static org.apache.ignite.internal.tx.TxState.ABANDONED;
@@ -48,6 +50,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
@@ -59,11 +62,9 @@ import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
-import java.util.stream.Collectors;
 import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
 import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
@@ -71,6 +72,7 @@ import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.lang.IgniteTriFunction;
 import org.apache.ignite.internal.lang.IgniteUuid;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
@@ -514,7 +516,7 @@ public class PartitionReplicaListener implements ReplicaListener {
      * @return Result future.
      */
     private CompletableFuture<List<BinaryRow>> retrieveExactEntriesUntilCursorEmpty(
-            @Nullable UUID txId,
+            UUID txId,
             @Nullable HybridTimestamp readTimestamp,
             IgniteUuid cursorId,
             int count
@@ -522,14 +524,20 @@ public class PartitionReplicaListener implements ReplicaListener {
         @SuppressWarnings("resource") PartitionTimestampCursor cursor = (PartitionTimestampCursor) cursors.computeIfAbsent(cursorId,
                 id -> mvDataStorage.scan(readTimestamp == null ? HybridTimestamp.MAX_VALUE : readTimestamp));
 
-        var resolutionFuts = new ArrayList<CompletableFuture<BinaryRow>>(count);
+        var resolutionFuts = new ArrayList<CompletableFuture<TimedBinaryRow>>(count);
 
         while (resolutionFuts.size() < count && cursor.hasNext()) {
             ReadResult readResult = cursor.next();
             HybridTimestamp newestCommitTimestamp = readResult.newestCommitTimestamp();
 
-            BinaryRow candidate =
-                    newestCommitTimestamp == null || !readResult.isWriteIntent() ? null : cursor.committed(newestCommitTimestamp);
+            TimedBinaryRow candidate;
+            if (newestCommitTimestamp == null || !readResult.isWriteIntent()) {
+                candidate = null;
+            } else {
+                BinaryRow committedRow = cursor.committed(newestCommitTimestamp);
+
+                candidate = committedRow == null ? null : new TimedBinaryRow(committedRow, newestCommitTimestamp);
+            }
 
             resolutionFuts.add(resolveReadResult(readResult, txId, readTimestamp, () -> candidate));
         }
@@ -537,11 +545,11 @@ public class PartitionReplicaListener implements ReplicaListener {
         return allOf(resolutionFuts.toArray(new CompletableFuture[0])).thenCompose(unused -> {
             var rows = new ArrayList<BinaryRow>(count);
 
-            for (CompletableFuture<BinaryRow> resolutionFut : resolutionFuts) {
-                BinaryRow resolvedReadResult = resolutionFut.join();
+            for (CompletableFuture<TimedBinaryRow> resolutionFut : resolutionFuts) {
+                TimedBinaryRow resolvedReadResult = resolutionFut.join();
 
-                if (resolvedReadResult != null) {
-                    rows.add(resolvedReadResult);
+                if (resolvedReadResult != null && resolvedReadResult.binaryRow() != null) {
+                    rows.add(resolvedReadResult.binaryRow());
                 }
             }
 
@@ -965,8 +973,10 @@ public class PartitionReplicaListener implements ReplicaListener {
         RowId rowId = indexRow.rowId();
 
         return resolvePlainReadResult(rowId, null, timestamp).thenComposeAsync(resolvedReadResult -> {
-            if (resolvedReadResult != null && indexRowMatches(indexRow, resolvedReadResult, schemaAwareIndexStorage)) {
-                result.add(resolvedReadResult);
+            if (resolvedReadResult != null
+                    && resolvedReadResult.binaryRow() != null
+                    && indexRowMatches(indexRow, resolvedReadResult.binaryRow(), schemaAwareIndexStorage)) {
+                result.add(resolvedReadResult.binaryRow());
             }
 
             return continueReadOnlyIndexScan(schemaAwareIndexStorage, cursor, timestamp, batchSize, result);
@@ -1009,10 +1019,10 @@ public class PartitionReplicaListener implements ReplicaListener {
                    return lockManager.acquire(txId, new LockKey(tableId(), rowId), LockMode.S)
                             .thenComposeAsync(rowLock -> { // Table row S lock
                                return resolvePlainReadResult(rowId, txId).thenCompose(resolvedReadResult -> {
-                                    if (resolvedReadResult != null) {
-                                        if (indexRowMatches(currentRow, resolvedReadResult, schemaAwareIndexStorage)) {
-                                            result.add(resolvedReadResult);
-                                        }
+                                    if (resolvedReadResult != null
+                                            && resolvedReadResult.binaryRow() != null
+                                            && indexRowMatches(currentRow, resolvedReadResult.binaryRow(), schemaAwareIndexStorage)) {
+                                        result.add(resolvedReadResult.binaryRow());
                                     }
 
                                     // Proceed scan.
@@ -1059,8 +1069,8 @@ public class PartitionReplicaListener implements ReplicaListener {
         return lockManager.acquire(txId, new LockKey(tableId(), rowId), LockMode.S)
                 .thenComposeAsync(rowLock -> { // Table row S lock
                    return resolvePlainReadResult(rowId, txId).thenCompose(resolvedReadResult -> {
-                        if (resolvedReadResult != null) {
-                            result.add(resolvedReadResult);
+                        if (resolvedReadResult != null && resolvedReadResult.binaryRow() != null) {
+                            result.add(resolvedReadResult.binaryRow());
                         }
 
                         // Proceed lookup.
@@ -1077,7 +1087,11 @@ public class PartitionReplicaListener implements ReplicaListener {
      * @param timestamp Read timestamp.
      * @return Future finishes with the resolved binary row.
      */
-    private CompletableFuture<BinaryRow> resolvePlainReadResult(RowId rowId, @Nullable UUID txId, @Nullable HybridTimestamp timestamp) {
+    private CompletableFuture<@Nullable TimedBinaryRow> resolvePlainReadResult(
+            RowId rowId,
+            @Nullable UUID txId,
+            @Nullable HybridTimestamp timestamp
+    ) {
         ReadResult readResult = mvDataStorage.read(rowId, timestamp == null ? HybridTimestamp.MAX_VALUE : timestamp);
 
         return resolveReadResult(readResult, txId, timestamp, () -> {
@@ -1091,7 +1105,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                    "The result is not committed [rowId=" + rowId + ", timestamp="
                             + readResult.newestCommitTimestamp() + ']';
 
-            return committedReadResult.binaryRow();
+            return new TimedBinaryRow(committedReadResult.binaryRow(), committedReadResult.commitTimestamp());
         });
     }
 
@@ -1102,13 +1116,13 @@ public class PartitionReplicaListener implements ReplicaListener {
      * @param txId Transaction id.
      * @return Future finishes with the resolved binary row.
      */
-    private CompletableFuture<BinaryRow> resolvePlainReadResult(RowId rowId, UUID txId) {
+    private CompletableFuture<@Nullable TimedBinaryRow> resolvePlainReadResult(RowId rowId, UUID txId) {
         return resolvePlainReadResult(rowId, txId, null).thenCompose(row -> {
-            if (row == null) {
+            if (row == null || row.binaryRow() == null) {
                 return completedFuture(null);
             }
 
-            return schemaCompatValidator.validateBackwards(row.schemaVersion(), tableId(), txId)
+            return schemaCompatValidator.validateBackwards(row.binaryRow().schemaVersion(), tableId(), txId)
                     .thenApply(validationResult -> {
                         if (validationResult.isSuccessful()) {
                             return row;
@@ -1121,27 +1135,6 @@ public class PartitionReplicaListener implements ReplicaListener {
         });
     }
 
-    private CompletableFuture<Void> continueReadOnlyIndexLookup(
-            Cursor<RowId> indexCursor,
-            HybridTimestamp timestamp,
-            int batchSize,
-            List<BinaryRow> result
-    ) {
-        if (result.size() >= batchSize || !indexCursor.hasNext()) {
-            return completedFuture(null);
-        }
-
-        RowId rowId = indexCursor.next();
-
-        return resolvePlainReadResult(rowId, null, timestamp).thenComposeAsync(resolvedReadResult -> {
-            if (resolvedReadResult != null) {
-                result.add(resolvedReadResult);
-            }
-
-            return continueReadOnlyIndexLookup(indexCursor, timestamp, batchSize, result);
-        }, scanRequestExecutor);
-    }
-
     /**
      * Processes transaction finish request.
      * <ol>
@@ -1433,7 +1426,7 @@ public class PartitionReplicaListener implements ReplicaListener {
     private <T> CompletableFuture<T> resolveRowByPk(
             BinaryTuple pk,
             UUID txId,
-            BiFunction<@Nullable RowId, @Nullable BinaryRow, CompletableFuture<T>> action
+            IgniteTriFunction<@Nullable RowId, @Nullable BinaryRow, @Nullable HybridTimestamp, CompletableFuture<T>> action
     ) {
         IndexLocker pkLocker = indexesLockers.get().get(pkIndexStorage.get().id());
 
@@ -1466,22 +1459,21 @@ public class PartitionReplicaListener implements ReplicaListener {
     private <T> CompletableFuture<T> continueResolvingByPk(
             Cursor<RowId> cursor,
             UUID txId,
-            BiFunction<@Nullable RowId, @Nullable BinaryRow, CompletableFuture<T>> action
+            IgniteTriFunction<@Nullable RowId, @Nullable BinaryRow, @Nullable HybridTimestamp, CompletableFuture<T>> action
     ) {
         if (!cursor.hasNext()) {
-            return action.apply(null, null);
+            return action.apply(null, null, null);
         }
 
         RowId rowId = cursor.next();
 
         return resolvePlainReadResult(rowId, txId).thenCompose(row -> {
-            if (row != null) {
-                return action.apply(rowId, row);
+            if (row != null && row.binaryRow() != null) {
+                return action.apply(rowId, row.binaryRow(), row.commitTimestamp());
             } else {
                 return continueResolvingByPk(cursor, txId, action);
             }
         });
-
     }
 
     /**
@@ -1557,7 +1549,7 @@ public class PartitionReplicaListener implements ReplicaListener {
      * @param ts A timestamp regarding which we need to resolve the given row.
      * @return Result of the given action.
      */
-    private CompletableFuture<BinaryRow> resolveRowByPkForReadOnly(BinaryTuple pk, HybridTimestamp ts) {
+    private CompletableFuture<@Nullable BinaryRow> resolveRowByPkForReadOnly(BinaryTuple pk, HybridTimestamp ts) {
        // Indexes store values associated with different versions of one entry.
         // It's possible to have multiple entries for a particular search key
        // only if we insert, delete and again insert an entry with the same indexed fields.
@@ -1717,14 +1709,20 @@ public class PartitionReplicaListener implements ReplicaListener {
             case RW_DELETE_EXACT_ALL: {
                CompletableFuture<RowId>[] deleteExactLockFuts = new CompletableFuture[searchRows.size()];
 
+                Map<UUID, HybridTimestamp> lastCommitTimes = new HashMap<>();
+
                 for (int i = 0; i < searchRows.size(); i++) {
                     BinaryRow searchRow = searchRows.get(i);
 
-                    deleteExactLockFuts[i] = resolveRowByPk(extractPk(searchRow), txId, (rowId, row) -> {
+                    deleteExactLockFuts[i] = resolveRowByPk(extractPk(searchRow), txId, (rowId, row, lastCommitTime) -> {
                         if (rowId == null) {
                             return completedFuture(null);
                         }
 
+                        if (lastCommitTime != null) {
+                            lastCommitTimes.put(rowId.uuid(), lastCommitTime);
+                        }
+
                        return takeLocksForDeleteExact(searchRow, rowId, row, txId);
                     });
                 }
@@ -1747,7 +1745,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                        return completedFuture(new ReplicaResult(result, null));
                     }
 
-                    return validateAtTimestampAndBuildUpdateAllCommand(request, rowIdsToDelete, txCoordinatorId)
+                    return validateAtTimestampAndBuildUpdateAllCommand(request, rowIdsToDelete, lastCommitTimes, txCoordinatorId)
                            .thenCompose(cmd -> applyUpdateAllCommand(cmd, request.skipDelayedAck()))
                             .thenApply(res -> new ReplicaResult(result, res));
                 });
@@ -1762,7 +1760,7 @@ public class PartitionReplicaListener implements ReplicaListener {
 
                     pks.add(pk);
 
-                    pkReadLockFuts[i] = resolveRowByPk(pk, txId, (rowId, row) -> completedFuture(rowId));
+                    pkReadLockFuts[i] = resolveRowByPk(pk, txId, (rowId, row, lastCommitTime) -> completedFuture(rowId));
                 }
 
                 return allOf(pkReadLockFuts).thenCompose(ignore -> {
@@ -1794,7 +1792,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                     }
 
                    Map<UUID, BinaryRowMessage> convertedMap = rowsToInsert.entrySet().stream()
-                            .collect(Collectors.toMap(
+                            .collect(toMap(
                                     e -> e.getKey().uuid(),
                                    e -> MSG_FACTORY.binaryRowMessage()
                                            .binaryTuple(e.getValue().tupleSlice())
@@ -1803,7 +1801,9 @@ public class PartitionReplicaListener implements ReplicaListener {
                             ));
 
                     return allOf(insertLockFuts)
-                            .thenCompose(ignored -> validateAtTimestampAndBuildUpdateAllCommand(request, convertedMap, txCoordinatorId))
+                            .thenCompose(ignored ->
+                                    // We are inserting completely new rows - no need to cleanup anything in this case, hence empty times.
+                                    validateAtTimestampAndBuildUpdateAllCommand(request, convertedMap, emptyMap(), txCoordinatorId))
                            .thenCompose(cmd -> applyUpdateAllCommand(cmd, request.skipDelayedAck()))
                             .thenApply(res -> {
                                 // Release short term locks.
@@ -1819,14 +1819,20 @@ public class PartitionReplicaListener implements ReplicaListener {
             case RW_UPSERT_ALL: {
                CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>>[] rowIdFuts = new CompletableFuture[searchRows.size()];
 
+                Map<UUID, HybridTimestamp> lastCommitTimes = new HashMap<>();
+
                 for (int i = 0; i < searchRows.size(); i++) {
                     BinaryRow searchRow = searchRows.get(i);
 
-                    rowIdFuts[i] = resolveRowByPk(extractPk(searchRow), txId, (rowId, row) -> {
+                    rowIdFuts[i] = resolveRowByPk(extractPk(searchRow), txId, (rowId, row, lastCommitTime) -> {
                         boolean insert = rowId == null;
 
                        RowId rowId0 = insert ? new RowId(partId(), UUID.randomUUID()) : rowId;
 
+                        if (lastCommitTime != null) {
+                            lastCommitTimes.put(rowId.uuid(), lastCommitTime);
+                        }
+
                         return insert
                                 ? takeLocksForInsert(searchRow, rowId0, txId)
                                 : takeLocksForUpdate(searchRow, rowId0, txId);
@@ -1847,7 +1853,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                         return completedFuture(new ReplicaResult(null, null));
                     }
 
-                    return validateAtTimestampAndBuildUpdateAllCommand(request, rowsToUpdate, txCoordinatorId)
+                    return validateAtTimestampAndBuildUpdateAllCommand(request, rowsToUpdate, lastCommitTimes, txCoordinatorId)
                            .thenCompose(cmd -> applyUpdateAllCommand(cmd, request.skipDelayedAck()))
                             .thenApply(res -> {
                                 // Release short term locks.
@@ -1867,7 +1873,6 @@ public class PartitionReplicaListener implements ReplicaListener {
         }
     }
 
-
     /**
      * Precesses multi request.
      *
@@ -1888,7 +1893,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                CompletableFuture<BinaryRow>[] rowFuts = new CompletableFuture[primaryKeys.size()];
 
                 for (int i = 0; i < primaryKeys.size(); i++) {
-                    rowFuts[i] = resolveRowByPk(primaryKeys.get(i), txId, (rowId, row) -> {
+                    rowFuts[i] = resolveRowByPk(primaryKeys.get(i), txId, (rowId, row, lastCommitTime) -> {
                         if (rowId == null) {
                             return completedFuture(null);
                         }
@@ -1917,12 +1922,18 @@ public class PartitionReplicaListener implements ReplicaListener {
             case RW_DELETE_ALL: {
                CompletableFuture<RowId>[] rowIdLockFuts = new CompletableFuture[primaryKeys.size()];
 
+                Map<UUID, HybridTimestamp> lastCommitTimes = new HashMap<>();
+
                 for (int i = 0; i < primaryKeys.size(); i++) {
-                    rowIdLockFuts[i] = resolveRowByPk(primaryKeys.get(i), txId, (rowId, row) -> {
+                    rowIdLockFuts[i] = resolveRowByPk(primaryKeys.get(i), txId, (rowId, row, lastCommitTime) -> {
                         if (rowId == null) {
                             return completedFuture(null);
                         }
 
+                        if (lastCommitTime != null) {
+                            lastCommitTimes.put(rowId.uuid(), lastCommitTime);
+                        }
+
                         return takeLocksForDelete(row, rowId, txId);
                     });
                 }
@@ -1947,7 +1958,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                        return completedFuture(new ReplicaResult(result, null));
                     }
 
-                    return validateAtTimestampAndBuildUpdateAllCommand(request, rowIdsToDelete, txCoordinatorId)
+                    return validateAtTimestampAndBuildUpdateAllCommand(request, rowIdsToDelete, lastCommitTimes, txCoordinatorId)
                            .thenCompose(cmd -> applyUpdateAllCommand(cmd, request.skipDelayedAck()))
                             .thenApply(res -> new ReplicaResult(result, res));
                 });
@@ -2016,7 +2027,8 @@ public class PartitionReplicaListener implements ReplicaListener {
                     cmd.row(),
                     true,
                     null,
-                    null);
+                    null,
+                    cmd.lastCommitTimestamp());
 
             // TODO: https://issues.apache.org/jira/browse/IGNITE-20124 tmp
             synchronized (safeTime) {
@@ -2039,7 +2051,8 @@ public class PartitionReplicaListener implements ReplicaListener {
                                 cmd.row(),
                                 false,
                                 null,
-                                cmd.safeTime());
+                                cmd.safeTime(),
+                                cmd.lastCommitTimestamp());
 
                        updateTrackerIgnoringTrackerClosedException(safeTime, cmd.safeTime());
                     }
@@ -2065,7 +2078,8 @@ public class PartitionReplicaListener implements ReplicaListener {
                         cmd.tablePartitionId().asTablePartitionId(),
                         true,
                         null,
-                        null);
+                        null,
+                        cmd.lastCommitTimestamps());
 
                 // TODO: https://issues.apache.org/jira/browse/IGNITE-20124 tmp
                 synchronized (safeTime) {
@@ -2091,7 +2105,8 @@ public class PartitionReplicaListener implements ReplicaListener {
                             cmd.tablePartitionId().asTablePartitionId(),
                             true,
                             null,
-                            null);
+                            null,
+                            cmd.lastCommitTimestamps());
 
                    updateTrackerIgnoringTrackerClosedException(safeTime, cmd.safeTime());
                 }
@@ -2110,7 +2125,8 @@ public class PartitionReplicaListener implements ReplicaListener {
                                 cmd.tablePartitionId().asTablePartitionId(),
                                 false,
                                 null,
-                                cmd.safeTime());
+                                cmd.safeTime(),
+                                cmd.lastCommitTimestamps());
 
                        updateTrackerIgnoringTrackerClosedException(safeTime, cmd.safeTime());
                     }
@@ -2155,7 +2171,7 @@ public class PartitionReplicaListener implements ReplicaListener {
 
         switch (request.requestType()) {
             case RW_DELETE_EXACT: {
-                return resolveRowByPk(extractPk(searchRow), txId, (rowId, row) -> {
+                return resolveRowByPk(extractPk(searchRow), txId, (rowId, row, lastCommitTime) -> {
                     if (rowId == null) {
                         return completedFuture(new ReplicaResult(false, null));
                     }
@@ -2166,14 +2182,20 @@ public class PartitionReplicaListener implements ReplicaListener {
                                    return completedFuture(new ReplicaResult(false, request.full() ? null : completedFuture(null)));
                                 }
 
-                                return validateAtTimestampAndBuildUpdateCommand(request, validatedRowId.uuid(), null, txCoordinatorId)
+                                return validateAtTimestampAndBuildUpdateCommand(
+                                        request,
+                                        validatedRowId.uuid(),
+                                        null,
+                                        lastCommitTime,
+                                        txCoordinatorId
+                                )
                                         .thenCompose(this::applyUpdateCommand)
                                        .thenApply(res -> new ReplicaResult(true, res));
                             });
                 });
             }
             case RW_INSERT: {
-                return resolveRowByPk(extractPk(searchRow), txId, (rowId, row) -> {
+                return resolveRowByPk(extractPk(searchRow), txId, (rowId, row, lastCommitTime) -> {
                     if (rowId != null) {
                         return completedFuture(new ReplicaResult(false, null));
                     }
@@ -2182,7 +2204,7 @@ public class PartitionReplicaListener implements ReplicaListener {
 
                     return takeLocksForInsert(searchRow, rowId0, txId)
                            .thenCompose(rowIdLock -> validateAtTimestampAndBuildUpdateCommand(request, rowId0.uuid(), searchRow,
-                                    txCoordinatorId
+                                    lastCommitTime, txCoordinatorId
                             )
                                     .thenCompose(this::applyUpdateCommand)
                                    .thenApply(res -> new IgniteBiTuple<>(res, rowIdLock)))
@@ -2195,7 +2217,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                 });
             }
             case RW_UPSERT: {
-                return resolveRowByPk(extractPk(searchRow), txId, (rowId, row) -> {
+                return resolveRowByPk(extractPk(searchRow), txId, (rowId, row, lastCommitTime) -> {
                     boolean insert = rowId == null;
 
                    RowId rowId0 = insert ? new RowId(partId(), UUID.randomUUID()) : rowId;
@@ -2206,7 +2228,7 @@ public class PartitionReplicaListener implements ReplicaListener {
 
                     return lockFut
                            .thenCompose(rowIdLock -> validateAtTimestampAndBuildUpdateCommand(request, rowId0.uuid(), searchRow,
-                                    txCoordinatorId
+                                    lastCommitTime, txCoordinatorId
                             )
                                     .thenCompose(this::applyUpdateCommand)
                                    .thenApply(res -> new IgniteBiTuple<>(res, rowIdLock)))
@@ -2219,7 +2241,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                 });
             }
             case RW_GET_AND_UPSERT: {
-                return resolveRowByPk(extractPk(searchRow), txId, (rowId, row) -> {
+                return resolveRowByPk(extractPk(searchRow), txId, (rowId, row, lastCommitTime) -> {
                     boolean insert = rowId == null;
 
                    RowId rowId0 = insert ? new RowId(partId(), UUID.randomUUID()) : rowId;
@@ -2230,7 +2252,7 @@ public class PartitionReplicaListener implements ReplicaListener {
 
                     return lockFut
                            .thenCompose(rowIdLock -> validateAtTimestampAndBuildUpdateCommand(request, rowId0.uuid(), searchRow,
-                                    txCoordinatorId
+                                    lastCommitTime, txCoordinatorId
                             )
                                     .thenCompose(this::applyUpdateCommand)
                                    .thenApply(res -> new IgniteBiTuple<>(res, rowIdLock)))
@@ -2243,14 +2265,14 @@ public class PartitionReplicaListener implements ReplicaListener {
                 });
             }
             case RW_GET_AND_REPLACE: {
-                return resolveRowByPk(extractPk(searchRow), txId, (rowId, row) -> {
+                return resolveRowByPk(extractPk(searchRow), txId, (rowId, row, lastCommitTime) -> {
                     if (rowId == null) {
                         return completedFuture(new ReplicaResult(null, null));
                     }
 
                     return takeLocksForUpdate(searchRow, rowId, txId)
                            .thenCompose(rowIdLock -> validateAtTimestampAndBuildUpdateCommand(request, rowId.uuid(), searchRow,
-                                    txCoordinatorId
+                                    lastCommitTime, txCoordinatorId
                             )
                                     .thenCompose(this::applyUpdateCommand)
                                    .thenApply(res -> new IgniteBiTuple<>(res, rowIdLock)))
@@ -2263,14 +2285,14 @@ public class PartitionReplicaListener implements ReplicaListener {
                 });
             }
             case RW_REPLACE_IF_EXIST: {
-                return resolveRowByPk(extractPk(searchRow), txId, (rowId, row) -> {
+                return resolveRowByPk(extractPk(searchRow), txId, (rowId, row, lastCommitTime) -> {
                     if (rowId == null) {
                         return completedFuture(new ReplicaResult(false, null));
                     }
 
                     return takeLocksForUpdate(searchRow, rowId, txId)
                            .thenCompose(rowIdLock -> validateAtTimestampAndBuildUpdateCommand(request, rowId.uuid(), searchRow,
-                                    txCoordinatorId
+                                    lastCommitTime, txCoordinatorId
                             )
                                     .thenCompose(this::applyUpdateCommand)
                                    .thenApply(res -> new IgniteBiTuple<>(res, rowIdLock)))
@@ -2306,7 +2328,7 @@ public class PartitionReplicaListener implements ReplicaListener {
 
         switch (request.requestType()) {
             case RW_GET: {
-                return resolveRowByPk(primaryKey, txId, (rowId, row) -> {
+                return resolveRowByPk(primaryKey, txId, (rowId, row, lastCommitTime) -> {
                     if (rowId == null) {
                         return completedFuture(null);
                     }
@@ -2317,25 +2339,38 @@ public class PartitionReplicaListener implements ReplicaListener {
                 });
             }
             case RW_DELETE: {
-                return resolveRowByPk(primaryKey, txId, (rowId, row) -> {
+                return resolveRowByPk(primaryKey, txId, (rowId, row, 
lastCommitTime) -> {
                     if (rowId == null) {
                         return completedFuture(new ReplicaResult(false, null));
                     }
 
                     return takeLocksForDelete(row, rowId, txId)
-                            .thenCompose(rowLock -> 
validateAtTimestampAndBuildUpdateCommand(request, rowId.uuid(), null, 
txCoordinatorId))
+                            .thenCompose(rowLock ->
+                                    validateAtTimestampAndBuildUpdateCommand(
+                                            request,
+                                            rowId.uuid(),
+                                            null,
+                                            lastCommitTime,
+                                            txCoordinatorId))
                             .thenCompose(this::applyUpdateCommand)
                             .thenApply(res -> new ReplicaResult(true, res));
                 });
             }
             case RW_GET_AND_DELETE: {
-                return resolveRowByPk(primaryKey, txId, (rowId, row) -> {
+                return resolveRowByPk(primaryKey, txId, (rowId, row, 
lastCommitTime) -> {
                     if (rowId == null) {
                         return completedFuture(null);
                     }
 
                     return takeLocksForDelete(row, rowId, txId)
-                            .thenCompose(ignored -> 
validateAtTimestampAndBuildUpdateCommand(request, rowId.uuid(), null, 
txCoordinatorId))
+                            .thenCompose(ignored ->
+                                    validateAtTimestampAndBuildUpdateCommand(
+                                            request,
+                                            rowId.uuid(),
+                                            null,
+                                            lastCommitTime,
+                                            txCoordinatorId)
+                            )
                             .thenCompose(this::applyUpdateCommand)
                             .thenApply(res -> new ReplicaResult(row, res));
                 });
@@ -2506,7 +2541,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         UUID txId = request.transactionId();
 
         if (request.requestType() == RequestType.RW_REPLACE) {
-            return resolveRowByPk(extractPk(newRow), txId, (rowId, row) -> {
+            return resolveRowByPk(extractPk(newRow), txId, (rowId, row, 
lastCommitTime) -> {
                 if (rowId == null) {
                     return completedFuture(new ReplicaResult(false, null));
                 }
@@ -2518,7 +2553,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                             }
 
                             return 
validateAtTimestampAndBuildUpdateCommand(commitPartitionId.asTablePartitionId(),
-                                    rowIdLock.get1().uuid(), newRow, txId, 
request.full(), txCoordinatorId
+                                    rowIdLock.get1().uuid(), newRow, 
lastCommitTime, txId, request.full(), txCoordinatorId
                             )
                                     .thenCompose(this::applyUpdateCommand)
                                     .thenApply(res -> new IgniteBiTuple<>(res, 
rowIdLock))
@@ -2543,7 +2578,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @param txId Transaction id.
      * @return Future completes with tuple {@link RowId} and collection of 
{@link Lock} or {@code null} if there is no suitable row.
      */
-    private CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> 
takeLocksForReplace(BinaryRow expectedRow, BinaryRow oldRow,
+    private CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> 
takeLocksForReplace(BinaryRow expectedRow, @Nullable BinaryRow oldRow,
             BinaryRow newRow, RowId rowId, UUID txId) {
         return lockManager.acquire(txId, new LockKey(tableId()), LockMode.IX)
                 .thenCompose(ignored -> lockManager.acquire(txId, new 
LockKey(tableId(), rowId), LockMode.S))
@@ -2636,24 +2671,24 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @param lastCommitted Action to get the latest committed row.
      * @return Future to resolved binary row.
      */
-    private CompletableFuture<BinaryRow> resolveReadResult(
+    private CompletableFuture<@Nullable TimedBinaryRow> resolveReadResult(
             ReadResult readResult,
             @Nullable UUID txId,
             @Nullable HybridTimestamp timestamp,
-            Supplier<BinaryRow> lastCommitted
+            Supplier<@Nullable TimedBinaryRow> lastCommitted
     ) {
         if (readResult == null) {
             return completedFuture(null);
         } else if (!readResult.isWriteIntent()) {
-            return completedFuture(readResult.binaryRow());
+            return completedFuture(new TimedBinaryRow(readResult.binaryRow(), 
readResult.commitTimestamp()));
         } else {
-            // RW resolution.
+            // RW write intent resolution.
             if (timestamp == null) {
                 UUID retrievedResultTxId = readResult.transactionId();
 
                 if (txId.equals(retrievedResultTxId)) {
-                    // Same transaction - return retrieved value. It may be 
both writeIntent or regular value.
-                    return completedFuture(readResult.binaryRow());
+                    // Same transaction - return the retrieved value. It may 
be either a writeIntent or a regular value.
+                    return completedFuture(new 
TimedBinaryRow(readResult.binaryRow()));
                 }
             }
 
@@ -2669,16 +2704,21 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @param lastCommitted Action to get a last committed row.
      * @return Result future.
      */
-    private CompletableFuture<BinaryRow> resolveWriteIntentAsync(
+    private CompletableFuture<@Nullable TimedBinaryRow> 
resolveWriteIntentAsync(
             ReadResult readResult,
-            HybridTimestamp timestamp,
-            Supplier<BinaryRow> lastCommitted
+            @Nullable HybridTimestamp timestamp,
+            Supplier<@Nullable TimedBinaryRow> lastCommitted
     ) {
         return inBusyLockAsync(busyLock, () ->
                 resolveWriteIntentReadability(readResult, timestamp)
                         .thenApply(writeIntentReadable ->
                                 inBusyLock(busyLock, () ->
-                                        writeIntentReadable ? 
readResult.binaryRow() : lastCommitted.get()
+                                        // TODO: If the write intent is 
readable - it's either committed or aborted.
+                                        //  Local (primary) cleanup should be 
done first.
+                                        //  
https://issues.apache.org/jira/browse/IGNITE-20395
+                                        //  Once the cleanup is performed, the 
timestamp should be added to
+                                        //  `new 
TimedBinaryRow(readResult.binaryRow())`.
+                                        writeIntentReadable ? new 
TimedBinaryRow(readResult.binaryRow()) : lastCommitted.get()
                                 )
                         )
         );
@@ -2726,7 +2766,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @return The future completes with {@code true} when the transaction is committed and commit time <= read time, {@code false}
      *         otherwise (when the transaction is either in progress, or aborted, or committed and commit time > read time).
      */
-    private CompletableFuture<Boolean> 
resolveWriteIntentReadability(ReadResult writeIntent, HybridTimestamp 
timestamp) {
+    private CompletableFuture<Boolean> 
resolveWriteIntentReadability(ReadResult writeIntent, @Nullable HybridTimestamp 
timestamp) {
         UUID txId = writeIntent.transactionId();
 
         return transactionStateResolver.resolveTxState(
@@ -2789,12 +2829,14 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             ReadWriteSingleRowReplicaRequest request,
             UUID rowUuid,
             @Nullable BinaryRow row,
+            @Nullable HybridTimestamp lastCommitTimestamp,
             String txCoordinatorId
     ) {
         return validateAtTimestampAndBuildUpdateCommand(
                 request.commitPartitionId().asTablePartitionId(),
                 rowUuid,
                 row,
+                lastCommitTimestamp,
                 request.transactionId(),
                 request.full(),
                 txCoordinatorId
@@ -2814,12 +2856,14 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             ReadWriteSingleRowPkReplicaRequest request,
             UUID rowUuid,
             @Nullable BinaryRow row,
+            @Nullable HybridTimestamp lastCommitTimestamp,
             String txCoordinatorId
     ) {
         return validateAtTimestampAndBuildUpdateCommand(
                 request.commitPartitionId().asTablePartitionId(),
                 rowUuid,
                 row,
+                lastCommitTimestamp,
                 request.transactionId(),
                 request.full(),
                 txCoordinatorId
@@ -2841,6 +2885,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             TablePartitionId tablePartId,
             UUID rowUuid,
             @Nullable BinaryRow row,
+            @Nullable HybridTimestamp lastCommitTimestamp,
             UUID txId,
             boolean full,
             String txCoordinatorId
@@ -2851,7 +2896,17 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 .thenApply(catalogVersion -> {
                     failIfSchemaChangedSinceTxStart(txId, operationTimestamp);
 
-                    return updateCommand(tablePartId, rowUuid, row, txId, 
full, txCoordinatorId, operationTimestamp, catalogVersion);
+                    return updateCommand(
+                            tablePartId,
+                            rowUuid,
+                            row,
+                            lastCommitTimestamp,
+                            txId,
+                            full,
+                            txCoordinatorId,
+                            operationTimestamp,
+                            catalogVersion
+                    );
                 });
     }
 
@@ -2859,6 +2914,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             TablePartitionId tablePartId,
             UUID rowUuid,
             @Nullable BinaryRow row,
+            @Nullable HybridTimestamp lastCommitTimestamp,
             UUID txId,
             boolean full,
             String txCoordinatorId,
@@ -2874,6 +2930,10 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 .txCoordinatorId(txCoordinatorId)
                 .requiredCatalogVersion(catalogVersion);
 
+        if (lastCommitTimestamp != null) {
+            bldr.lastCommitTimestampLong(lastCommitTimestamp.longValue());
+        }
+
         if (row != null) {
             BinaryRowMessage rowMessage = MSG_FACTORY.binaryRowMessage()
                     .binaryTuple(row.tupleSlice())
@@ -2906,10 +2966,12 @@ public class PartitionReplicaListener implements 
ReplicaListener {
     private CompletableFuture<UpdateAllCommand> 
validateAtTimestampAndBuildUpdateAllCommand(
             ReadWriteMultiRowReplicaRequest request,
             Map<UUID, BinaryRowMessage> rowsToUpdate,
+            Map<UUID, HybridTimestamp> lastCommitTimes,
             String txCoordinatorId
     ) {
         return validateAtTimestampAndBuildUpdateAllCommand(
                 rowsToUpdate,
+                lastCommitTimes,
                 request.commitPartitionId(),
                 request.transactionId(),
                 request.full(),
@@ -2928,10 +2990,12 @@ public class PartitionReplicaListener implements 
ReplicaListener {
     private CompletableFuture<UpdateAllCommand> 
validateAtTimestampAndBuildUpdateAllCommand(
             ReadWriteMultiRowPkReplicaRequest request,
             Map<UUID, BinaryRowMessage> rowsToUpdate,
+            Map<UUID, HybridTimestamp> lastCommitTimes,
             String txCoordinatorId
     ) {
         return validateAtTimestampAndBuildUpdateAllCommand(
                 rowsToUpdate,
+                lastCommitTimes,
                 request.commitPartitionId(),
                 request.transactionId(),
                 request.full(),
@@ -2951,6 +3015,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      */
     private CompletableFuture<UpdateAllCommand> 
validateAtTimestampAndBuildUpdateAllCommand(
             Map<UUID, BinaryRowMessage> rowsToUpdate,
+            Map<UUID, HybridTimestamp> lastCommitTimes,
             TablePartitionIdMessage commitPartitionId,
             UUID transactionId,
             boolean full,
@@ -2970,6 +3035,12 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                             .full(full)
                             .txCoordinatorId(txCoordinatorId)
                             .requiredCatalogVersion(catalogVersion)
+                            .lastCommitTimestampsLong(
+                                    // Also make sure lastCommitTimes contains 
only those entries that match rowsToUpdate.
+                                    lastCommitTimes.entrySet().stream()
+                                            .filter(entry -> 
rowsToUpdate.containsKey(entry.getKey()))
+                                            .collect(toMap(Entry::getKey, 
entry -> entry.getValue().longValue()))
+                            )
                             .build();
                 });
     }
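
A note on the lastCommitTimestampsLong population above: only timestamps whose row id is present in rowsToUpdate are carried into the command. Below is a minimal, self-contained sketch of that filtering step, for illustration only — the class and method names are invented, and plain Long values stand in for HybridTimestamp.longValue():

    import static java.util.stream.Collectors.toMap;

    import java.util.Map;
    import java.util.UUID;

    class LastCommitTimesFilterSketch {
        // Keeps only the last-commit timestamps whose row id is part of the update,
        // mirroring the filter applied when the UpdateAllCommand is built above.
        static Map<UUID, Long> restrictToUpdatedRows(Map<UUID, Long> lastCommitTimes, Map<UUID, ?> rowsToUpdate) {
            return lastCommitTimes.entrySet().stream()
                    .filter(entry -> rowsToUpdate.containsKey(entry.getKey()))
                    .collect(toMap(Map.Entry::getKey, Map.Entry::getValue));
        }

        public static void main(String[] args) {
            UUID updated = UUID.randomUUID();
            UUID unrelated = UUID.randomUUID();

            Map<UUID, Long> lastCommitTimes = Map.of(updated, 100L, unrelated, 50L);
            Map<UUID, String> rowsToUpdate = Map.of(updated, "row payload");

            // Prints only the entry for the row that is actually updated.
            System.out.println(restrictToUpdatedRows(lastCommitTimes, rowsToUpdate));
        }
    }
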
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TimedBinaryRow.java
similarity index 51%
copy from 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java
copy to 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TimedBinaryRow.java
index 821e449311..2dee94ab59 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TimedBinaryRow.java
@@ -15,34 +15,35 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.table.distributed.command;
+package org.apache.ignite.internal.table.distributed.replicator;
 
-import java.util.UUID;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.table.distributed.TableMessageGroup;
-import 
org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
-import org.apache.ignite.network.annotations.Transferable;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * State machine command to update a row specified by a row id.
+ * A binary row paired with the commit timestamp it was resolved with, if any.
  */
-@Transferable(TableMessageGroup.Commands.UPDATE)
-public interface UpdateCommand extends PartitionCommand {
-    TablePartitionIdMessage tablePartitionId();
+class TimedBinaryRow {
 
-    UUID rowUuid();
+    private final @Nullable BinaryRow binaryRow;
 
-    @Nullable
-    BinaryRowMessage rowMessage();
+    private final @Nullable HybridTimestamp commitTimestamp;
 
-    String txCoordinatorId();
+    TimedBinaryRow(@Nullable BinaryRow binaryRow, @Nullable HybridTimestamp 
commitTimestamp) {
+        this.binaryRow = binaryRow;
+        this.commitTimestamp = commitTimestamp;
+    }
+
+    TimedBinaryRow(@Nullable BinaryRow binaryRow) {
+        this(binaryRow, null);
+    }
 
-    /** Returns the row to update or {@code null} if the row should be 
removed. */
-    @Nullable
-    default BinaryRow row() {
-        BinaryRowMessage message = rowMessage();
+    public @Nullable BinaryRow binaryRow() {
+        return binaryRow;
+    }
 
-        return message == null ? null : message.asBinaryRow();
+    public @Nullable HybridTimestamp commitTimestamp() {
+        return commitTimestamp;
     }
 }
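
TimedBinaryRow is a small value holder: a (possibly null) row plus the commit timestamp it was resolved with; the single-argument constructor is used when no commit timestamp is known, e.g. for the caller's own write intent. A standalone sketch of the same idea — String stands in for BinaryRow and Long for HybridTimestamp so the example compiles on its own:

    /** Simplified stand-in for TimedBinaryRow: a row value plus the commit timestamp it was read at. */
    final class TimedRowSketch {
        private final String row;        // stands in for BinaryRow, may be null for tombstones
        private final Long commitTime;   // stands in for HybridTimestamp, null for an uncommitted write intent

        TimedRowSketch(String row, Long commitTime) {
            this.row = row;
            this.commitTime = commitTime;
        }

        /** Used when the caller's own write intent is returned: no commit timestamp exists yet. */
        TimedRowSketch(String row) {
            this(row, null);
        }

        String row() {
            return row;
        }

        Long commitTime() {
            return commitTime;
        }

        public static void main(String[] args) {
            TimedRowSketch committed = new TimedRowSketch("value-1", 42L);
            TimedRowSketch ownWriteIntent = new TimedRowSketch("value-2");

            System.out.println("committed at: " + committed.commitTime());         // 42
            System.out.println("write intent at: " + ownWriteIntent.commitTime()); // null
        }
    }
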
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java
index fdad65a016..956bc69fd6 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java
@@ -140,7 +140,7 @@ public class TransactionStateResolver {
     public CompletableFuture<TransactionMeta> resolveTxState(
             UUID txId,
             ReplicationGroupId commitGrpId,
-            HybridTimestamp timestamp
+            @Nullable HybridTimestamp timestamp
     ) {
         TxStateMeta localMeta = txManager.stateMeta(txId);
 
@@ -176,7 +176,7 @@ public class TransactionStateResolver {
             UUID txId,
             @Nullable TxStateMeta localMeta,
             ReplicationGroupId commitGrpId,
-            HybridTimestamp timestamp,
+            @Nullable HybridTimestamp timestamp,
             CompletableFuture<TransactionMeta> txMetaFuture
     ) {
         assert localMeta == null || !isFinalState(localMeta.txState()) : 
"Unexpected tx meta [txId" + txId + ", meta=" + localMeta + ']';
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
index b604afee6b..db2084f269 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.table.distributed;
 
+import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonMap;
 import static java.util.stream.Collectors.toList;
 import static org.mockito.Mockito.mock;
@@ -170,9 +171,13 @@ public abstract class IndexBaseTest extends 
BaseMvStoragesTest {
     }
 
     static void addWrite(StorageUpdateHandler handler, UUID rowUuid, @Nullable 
BinaryRow row) {
+        addWrite(handler, rowUuid, row, null);
+    }
+
+    static void addWrite(StorageUpdateHandler handler, UUID rowUuid, @Nullable 
BinaryRow row, @Nullable HybridTimestamp lastCommitTime) {
         TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID);
 
-        handler.handleUpdate(TX_ID, rowUuid, partitionId, row, false, null, 
null);
+        handler.handleUpdate(TX_ID, rowUuid, partitionId, row, false, null, 
null, lastCommitTime);
     }
 
     static BinaryRow defaultRow() {
@@ -232,7 +237,8 @@ public abstract class IndexBaseTest extends 
BaseMvStoragesTest {
         USE_UPDATE {
             @Override
             void addWrite(StorageUpdateHandler handler, TablePartitionId 
partitionId, UUID rowUuid, @Nullable BinaryRow row) {
-                handler.handleUpdate(TX_ID, rowUuid, partitionId, row, true, 
null, null);
+                // TODO: perhaps need to pass last commit time as a param
+                handler.handleUpdate(TX_ID, rowUuid, partitionId, row, true, 
null, null, null);
             }
         },
         /** Uses updateAll api. */
@@ -252,7 +258,8 @@ public abstract class IndexBaseTest extends 
BaseMvStoragesTest {
                         partitionId,
                         true,
                         null,
-                        null
+                        null,
+                        emptyMap()
                 );
             }
         };
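
The extra lastCommitTime argument in the test helper above is introduced through an overload that defaults it to null, so existing call sites keep compiling unchanged. A generic sketch of that pattern (names are illustrative, not the Ignite test API):

    import java.util.UUID;

    class OverloadDefaultSketch {
        /** Existing call sites keep using the short form... */
        static void addWrite(UUID rowId, String row) {
            addWrite(rowId, row, null);
        }

        /** ...while new tests supply the last commit time explicitly (Long stands in for HybridTimestamp). */
        static void addWrite(UUID rowId, String row, Long lastCommitTime) {
            System.out.println("write " + rowId + " row=" + row + " lastCommitTime=" + lastCommitTime);
        }

        public static void main(String[] args) {
            addWrite(UUID.randomUUID(), "value");        // old form, lastCommitTime defaults to null
            addWrite(UUID.randomUUID(), "value", 100L);  // new form used by the cleanup tests
        }
    }
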
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java
index 75cf522ac1..de61b6b648 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java
@@ -18,6 +18,9 @@
 package org.apache.ignite.internal.table.distributed;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.clearInvocations;
@@ -92,6 +95,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
     private StorageUpdateHandler storageUpdateHandler;
 
     private GcUpdateHandler gcUpdateHandler;
+    private IndexUpdateHandler indexUpdateHandler;
 
     @BeforeEach
     void setUp(@InjectConfiguration GcConfiguration gcConfig) {
@@ -143,7 +147,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
 
         TestPartitionDataStorage partitionDataStorage = new 
TestPartitionDataStorage(tableId, PARTITION_ID, storage);
 
-        IndexUpdateHandler indexUpdateHandler = new 
IndexUpdateHandler(DummyInternalTableImpl.createTableIndexStoragesSupplier(indexes));
+        indexUpdateHandler = spy(new 
IndexUpdateHandler(DummyInternalTableImpl.createTableIndexStoragesSupplier(indexes)));
 
         gcUpdateHandler = new GcUpdateHandler(
                 partitionDataStorage,
@@ -171,9 +175,9 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
 
         TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID);
 
-        storageUpdateHandler.handleUpdate(txUuid, UUID.randomUUID(), 
partitionId, row1, true, null, null);
-        storageUpdateHandler.handleUpdate(txUuid, UUID.randomUUID(), 
partitionId, row2, true, null, null);
-        storageUpdateHandler.handleUpdate(txUuid, UUID.randomUUID(), 
partitionId, row3, true, null, null);
+        storageUpdateHandler.handleUpdate(txUuid, UUID.randomUUID(), 
partitionId, row1, true, null, null, null);
+        storageUpdateHandler.handleUpdate(txUuid, UUID.randomUUID(), 
partitionId, row2, true, null, null, null);
+        storageUpdateHandler.handleUpdate(txUuid, UUID.randomUUID(), 
partitionId, row3, true, null, null, null);
 
         assertEquals(3, storage.rowsCount());
 
@@ -194,9 +198,9 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
 
         TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID);
 
-        storageUpdateHandler.handleUpdate(txUuid, UUID.randomUUID(), 
partitionId, row1, true, null, null);
-        storageUpdateHandler.handleUpdate(txUuid, UUID.randomUUID(), 
partitionId, row2, true, null, null);
-        storageUpdateHandler.handleUpdate(txUuid, UUID.randomUUID(), 
partitionId, row3, true, null, null);
+        storageUpdateHandler.handleUpdate(txUuid, UUID.randomUUID(), 
partitionId, row1, true, null, null, null);
+        storageUpdateHandler.handleUpdate(txUuid, UUID.randomUUID(), 
partitionId, row2, true, null, null, null);
+        storageUpdateHandler.handleUpdate(txUuid, UUID.randomUUID(), 
partitionId, row3, true, null, null, null);
 
         assertEquals(3, storage.rowsCount());
         // We have three writes to the storage.
@@ -236,9 +240,9 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
         UUID row2Id = UUID.randomUUID();
         UUID row3Id = UUID.randomUUID();
 
-        storageUpdateHandler.handleUpdate(txUuid, row1Id, partitionId, row1, 
false, null, null);
-        storageUpdateHandler.handleUpdate(txUuid, row2Id, partitionId, row2, 
false, null, null);
-        storageUpdateHandler.handleUpdate(txUuid, row3Id, partitionId, row3, 
false, null, null);
+        storageUpdateHandler.handleUpdate(txUuid, row1Id, partitionId, row1, 
false, null, null, null);
+        storageUpdateHandler.handleUpdate(txUuid, row2Id, partitionId, row2, 
false, null, null, null);
+        storageUpdateHandler.handleUpdate(txUuid, row3Id, partitionId, row3, 
false, null, null, null);
 
         assertEquals(3, storage.rowsCount());
 
@@ -264,7 +268,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
         UUID newTxUuid = UUID.randomUUID();
 
         // Insert new write intent in the new transaction.
-        storageUpdateHandler.handleUpdate(newTxUuid, row1Id, partitionId, 
row4, true, null, null);
+        storageUpdateHandler.handleUpdate(newTxUuid, row1Id, partitionId, 
row4, true, null, null, null);
 
         // And concurrently the other two intents were also resolved and 
scheduled for cleanup.
         storageUpdateHandler.handleWriteIntentRead(txUuid, new 
RowId(PARTITION_ID, row2Id));
@@ -279,4 +283,301 @@ public class StorageCleanupTest extends 
BaseMvStoragesTest {
         // Only those two entries will be affected.
         verify(storage, times(2)).commitWrite(any(), any());
     }
+
+    @Test
+    void testCleanupBeforeUpdateNoData() {
+        UUID runningTx = UUID.randomUUID();
+
+        TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID);
+
+        BinaryRow row1 = binaryRow(new TestKey(1, "foo1"), new TestValue(2, 
"bar"));
+
+        UUID rowId = UUID.randomUUID();
+
+        HybridTimestamp commitTs = CLOCK.now();
+
+        storageUpdateHandler.handleUpdate(runningTx, rowId, partitionId, row1, 
false, null, null, commitTs);
+
+        verify(storage, never()).commitWrite(any(), any());
+        verify(storage, never()).abortWrite(any());
+        verify(indexUpdateHandler, never()).tryRemoveFromIndexes(any(), any(), 
any());
+    }
+
+    @Test
+    void testCleanupBeforeUpdateNoWriteIntent() {
+        UUID committedTx = UUID.randomUUID();
+
+        TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID);
+
+        // First commit a row
+
+        BinaryRow row1 = binaryRow(new TestKey(1, "foo1"), new TestValue(2, 
"bar"));
+
+        UUID rowId = UUID.randomUUID();
+
+        HybridTimestamp commitTs = CLOCK.now();
+
+        storageUpdateHandler.handleUpdate(committedTx, rowId, partitionId, 
row1, true, null, null, null);
+
+        storageUpdateHandler.handleTransactionCleanup(committedTx, true, 
commitTs);
+
+        assertEquals(1, storage.rowsCount());
+
+        assertFalse(storage.read(new RowId(PARTITION_ID, rowId), 
HybridTimestamp.MAX_VALUE).isWriteIntent());
+
+        // Now create a new write intent over the committed data. No cleanup 
should be triggered.
+
+        BinaryRow row2 = binaryRow(new TestKey(3, "foo3"), new TestValue(4, 
"baz"));
+
+        UUID runningTx = UUID.randomUUID();
+
+        clearInvocations(storage, indexUpdateHandler);
+
+        storageUpdateHandler.handleUpdate(runningTx, rowId, partitionId, row2, 
true, null, null, commitTs);
+
+        verify(storage, never()).commitWrite(any(), any());
+        verify(storage, never()).abortWrite(any());
+        verify(indexUpdateHandler, never()).tryRemoveFromIndexes(any(), any(), 
any());
+    }
+
+    @Test
+    void testCleanupBeforeUpdateSameTxOnlyWriteIntent() {
+        UUID runningTx = UUID.randomUUID();
+
+        TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID);
+
+        // Create a write intent.
+
+        BinaryRow row1 = binaryRow(new TestKey(1, "foo1"), new TestValue(2, 
"bar"));
+
+        UUID rowId = UUID.randomUUID();
+
+        HybridTimestamp commitTs = CLOCK.now();
+
+        storageUpdateHandler.handleUpdate(runningTx, rowId, partitionId, row1, 
true, null, null, null);
+
+        assertEquals(1, storage.rowsCount());
+
+        assertTrue(storage.read(new RowId(PARTITION_ID, rowId), 
HybridTimestamp.MAX_VALUE).isWriteIntent());
+
+        // Then create another one for the same row in the same transaction. 
The entry will be replaced.
+
+        BinaryRow row2 = binaryRow(new TestKey(3, "foo3"), new TestValue(4, 
"baz"));
+
+        clearInvocations(storage, indexUpdateHandler);
+
+        storageUpdateHandler.handleUpdate(runningTx, rowId, partitionId, row2, 
true, null, null, commitTs);
+
+        assertEquals(1, storage.rowsCount());
+
+        assertTrue(storage.read(new RowId(PARTITION_ID, rowId), 
HybridTimestamp.MAX_VALUE).isWriteIntent());
+
+        verify(storage, never()).commitWrite(any(), any());
+        verify(storage, never()).abortWrite(any());
+        verify(indexUpdateHandler, times(1)).tryRemoveFromIndexes(any(), 
any(), any());
+    }
+
+    @Test
+    void testCleanupBeforeUpdateDifferentTxOnlyWriteIntent() {
+        UUID runningTx = UUID.randomUUID();
+
+        TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID);
+
+        // Create a new write intent.
+
+        BinaryRow row1 = binaryRow(new TestKey(1, "foo1"), new TestValue(2, 
"bar"));
+
+        UUID rowId = UUID.randomUUID();
+
+        HybridTimestamp commitTs = CLOCK.now();
+
+        storageUpdateHandler.handleUpdate(runningTx, rowId, partitionId, row1, 
true, null, null, null);
+
+        assertEquals(1, storage.rowsCount());
+
+        assertTrue(storage.read(new RowId(PARTITION_ID, rowId), 
HybridTimestamp.MAX_VALUE).isWriteIntent());
+
+        // Create another one and pass `last commit time`. The previous value 
should be committed automatically.
+
+        BinaryRow row2 = binaryRow(new TestKey(3, "foo3"), new TestValue(4, 
"baz"));
+
+        clearInvocations(storage, indexUpdateHandler);
+
+        UUID runningTx2 = UUID.randomUUID();
+
+        // Previous value will be committed even though the cleanup was not 
called explicitly.
+        storageUpdateHandler.handleUpdate(runningTx2, rowId, partitionId, 
row2, true, null, null, commitTs);
+
+        assertEquals(1, storage.rowsCount());
+
+        assertTrue(storage.read(new RowId(PARTITION_ID, rowId), 
HybridTimestamp.MAX_VALUE).isWriteIntent());
+
+        verify(storage, times(1)).commitWrite(any(), any());
+        verify(storage, never()).abortWrite(any());
+        verify(indexUpdateHandler, never()).tryRemoveFromIndexes(any(), any(), 
any());
+    }
+
+    @Test
+    void testCleanupBeforeUpdateAbortWriteIntent() {
+        UUID committed1 = UUID.randomUUID();
+
+        TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID);
+
+        // First commit an entry.
+
+        BinaryRow row1 = binaryRow(new TestKey(1, "foo1"), new TestValue(2, 
"bar"));
+
+        UUID rowId = UUID.randomUUID();
+
+        HybridTimestamp commitTs = CLOCK.now();
+
+        storageUpdateHandler.handleUpdate(committed1, rowId, partitionId, 
row1, true, null, null, null);
+
+        storageUpdateHandler.handleTransactionCleanup(committed1, true, 
commitTs);
+
+        assertEquals(1, storage.rowsCount());
+
+        assertFalse(storage.read(new RowId(PARTITION_ID, rowId), 
HybridTimestamp.MAX_VALUE).isWriteIntent());
+
+        // Now add a new write intent.
+
+        BinaryRow row2 = binaryRow(new TestKey(3, "foo3"), new TestValue(4, 
"baz"));
+
+        UUID committed2 = UUID.randomUUID();
+
+        storageUpdateHandler.handleUpdate(committed2, rowId, partitionId, 
row2, true, null, null, null);
+
+        assertTrue(storage.read(new RowId(PARTITION_ID, rowId), 
HybridTimestamp.MAX_VALUE).isWriteIntent());
+
+        // Create another write intent and provide `last commit time`.
+
+        clearInvocations(storage, indexUpdateHandler);
+
+        UUID runningTx = UUID.randomUUID();
+
+        BinaryRow row3 = binaryRow(new TestKey(5, "foo5"), new TestValue(7, 
"zzu"));
+
+        // Last commit time equal to the time of the previously committed 
value => previous write intent will be reverted.
+        storageUpdateHandler.handleUpdate(runningTx, rowId, partitionId, row3, 
true, null, null, commitTs);
+
+        assertEquals(1, storage.rowsCount());
+
+        assertTrue(storage.read(new RowId(PARTITION_ID, rowId), 
HybridTimestamp.MAX_VALUE).isWriteIntent());
+
+        verify(storage, never()).commitWrite(any(), any());
+        verify(storage, times(1)).abortWrite(any());
+        verify(indexUpdateHandler, times(1)).tryRemoveFromIndexes(any(), 
any(), any());
+    }
+
+    @Test
+    void testCleanupBeforeUpdateCommitWriteIntent() {
+        UUID committed1 = UUID.randomUUID();
+
+        TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID);
+
+        // First commit an entry.
+
+        BinaryRow row1 = binaryRow(new TestKey(1, "foo1"), new TestValue(2, 
"bar"));
+
+        UUID rowId = UUID.randomUUID();
+
+        HybridTimestamp commitTs = CLOCK.now();
+
+        storageUpdateHandler.handleUpdate(committed1, rowId, partitionId, 
row1, true, null, null, null);
+
+        storageUpdateHandler.handleTransactionCleanup(committed1, true, 
commitTs);
+
+        assertEquals(1, storage.rowsCount());
+
+        assertFalse(storage.read(new RowId(PARTITION_ID, rowId), 
HybridTimestamp.MAX_VALUE).isWriteIntent());
+
+        // Now add a new write intent.
+
+        BinaryRow row2 = binaryRow(new TestKey(3, "foo3"), new TestValue(4, 
"baz"));
+
+        UUID committed2 = UUID.randomUUID();
+
+        storageUpdateHandler.handleUpdate(committed2, rowId, partitionId, 
row2, true, null, null, null);
+
+        assertTrue(storage.read(new RowId(PARTITION_ID, rowId), 
HybridTimestamp.MAX_VALUE).isWriteIntent());
+
+        // Create another write intent and provide `last commit time`.
+
+        clearInvocations(storage, indexUpdateHandler);
+
+        UUID runningTx = UUID.randomUUID();
+
+        BinaryRow row3 = binaryRow(new TestKey(5, "foo5"), new TestValue(7, 
"zzu"));
+
+        HybridTimestamp lastCommitTs = commitTs.addPhysicalTime(100);
+
+        // Last commit time is after the time of the previously committed 
value => previous write intent will be committed.
+        storageUpdateHandler.handleUpdate(runningTx, rowId, partitionId, row3, 
true, null, null, lastCommitTs);
+
+        assertEquals(1, storage.rowsCount());
+
+        assertTrue(storage.read(new RowId(PARTITION_ID, rowId), 
HybridTimestamp.MAX_VALUE).isWriteIntent());
+
+        verify(storage, times(1)).commitWrite(any(), any());
+        verify(storage, never()).abortWrite(any());
+        verify(indexUpdateHandler, never()).tryRemoveFromIndexes(any(), any(), 
any());
+    }
+
+    @Test
+    void testCleanupBeforeUpdateError() {
+        UUID committed1 = UUID.randomUUID();
+
+        TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID);
+
+        // First commit an entry.
+
+        BinaryRow row1 = binaryRow(new TestKey(1, "foo1"), new TestValue(2, 
"bar"));
+
+        UUID rowId = UUID.randomUUID();
+
+        HybridTimestamp commitTs = CLOCK.now();
+
+        storageUpdateHandler.handleUpdate(committed1, rowId, partitionId, 
row1, true, null, null, null);
+
+        storageUpdateHandler.handleTransactionCleanup(committed1, true, 
commitTs);
+
+        assertEquals(1, storage.rowsCount());
+
+        assertFalse(storage.read(new RowId(PARTITION_ID, rowId), 
HybridTimestamp.MAX_VALUE).isWriteIntent());
+
+        // Now add a new write intent.
+
+        BinaryRow row2 = binaryRow(new TestKey(3, "foo3"), new TestValue(4, 
"baz"));
+
+        UUID committed2 = UUID.randomUUID();
+
+        storageUpdateHandler.handleUpdate(committed2, rowId, partitionId, 
row2, true, null, null, null);
+
+        assertTrue(storage.read(new RowId(PARTITION_ID, rowId), 
HybridTimestamp.MAX_VALUE).isWriteIntent());
+
+        // Create another write intent and provide `last commit time`.
+
+        clearInvocations(storage, indexUpdateHandler);
+
+        UUID runningTx = UUID.randomUUID();
+
+        BinaryRow row3 = binaryRow(new TestKey(5, "foo5"), new TestValue(7, 
"zzu"));
+
+        // This should lead to an exception
+        HybridTimestamp lastCommitTs = commitTs.subtractPhysicalTime(100);
+
+        // Last commit time is before the time of the previously committed 
value => this should not happen.
+        assertThrows(AssertionError.class, () ->
+                storageUpdateHandler.handleUpdate(runningTx, rowId, 
partitionId, row3, true, null, null, lastCommitTs));
+
+        assertEquals(1, storage.rowsCount());
+
+        assertTrue(storage.read(new RowId(PARTITION_ID, rowId), 
HybridTimestamp.MAX_VALUE).isWriteIntent());
+
+        verify(storage, never()).commitWrite(any(), any());
+        verify(storage, never()).abortWrite(any());
+        verify(indexUpdateHandler, never()).tryRemoveFromIndexes(any(), any(), 
any());
+    }
+
 }
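
The new tests above exercise the "cleanup before update" behaviour: when an update arrives with a last commit time and finds a dangling write intent from another transaction, the intent is committed or aborted by comparing that time against the latest committed version. The sketch below is a simplified reading of that decision as the tests exercise it — it is not the actual StorageUpdateHandler code, and Long stands in for HybridTimestamp:

    import java.util.Objects;

    /** Possible outcomes for a dangling write intent found during an update. */
    enum DanglingIntentAction { NONE, COMMIT, ABORT }

    class CleanupBeforeUpdateSketch {
        /**
         * Decides what to do with a write intent left by another transaction, based on the
         * latest committed timestamp in storage and the last commit time supplied with the update.
         */
        static DanglingIntentAction resolve(Long latestCommittedTs, Long lastCommitTime, boolean intentFromOtherTx) {
            if (!intentFromOtherTx) {
                return DanglingIntentAction.NONE; // same transaction: the intent is simply replaced
            }

            if (lastCommitTime == null) {
                return DanglingIntentAction.NONE; // nothing to compare against, leave the intent as is
            }

            if (latestCommittedTs == null || lastCommitTime > latestCommittedTs) {
                return DanglingIntentAction.COMMIT; // the intent's transaction committed at lastCommitTime
            }

            if (Objects.equals(lastCommitTime, latestCommittedTs)) {
                return DanglingIntentAction.ABORT; // storage already holds that commit, so the intent was aborted
            }

            throw new AssertionError("last commit time is older than the latest committed version");
        }

        public static void main(String[] args) {
            System.out.println(resolve(100L, 200L, true)); // COMMIT
            System.out.println(resolve(100L, 100L, true)); // ABORT
            System.out.println(resolve(null, 100L, true)); // COMMIT
        }
    }
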
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java
index 22d981e0f4..1934ed5477 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.table.distributed;
 
+import static java.util.Collections.emptyMap;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -116,6 +117,7 @@ public class StorageUpdateHandlerTest extends 
BaseIgniteAbstractTest {
                 null,
                 false,
                 null,
+                null,
                 null
         );
 
@@ -138,7 +140,8 @@ public class StorageUpdateHandlerTest extends 
BaseIgniteAbstractTest {
                 new TablePartitionId(TABLE_ID, PARTITION_ID),
                 false,
                 null,
-                null
+                null,
+                emptyMap()
         );
 
         verify(partitionStorage).peek(lwm);
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
index 5ccba2992c..028d4a47b9 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
@@ -50,6 +50,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.LongAdder;
+import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.Column;
@@ -1769,7 +1770,7 @@ public abstract class TxAbstractTest extends 
IgniteAbstractTest {
      * @param rows Rows.
      * @param expected Expected values.
      */
-    private static void validateBalance(Collection<Tuple> rows, @Nullable 
Double... expected) {
+    protected static void validateBalance(Collection<Tuple> rows, @Nullable 
Double... expected) {
         assertThat(
                 rows.stream().map(tuple -> tuple == null ? null : 
tuple.doubleValue("balance")).collect(toList()),
                 contains(expected)
@@ -1970,12 +1971,20 @@ public abstract class TxAbstractTest extends 
IgniteAbstractTest {
 
     @Test
     public void testTransactionAlreadyCommitted() {
-        testTransactionAlreadyFinished(true);
+        testTransactionAlreadyFinished(true, (transaction, uuid) -> {
+            transaction.commit();
+
+            log.info("Committed transaction {}", uuid);
+        });
     }
 
     @Test
     public void testTransactionAlreadyRolledback() {
-        testTransactionAlreadyFinished(false);
+        testTransactionAlreadyFinished(false, (transaction, uuid) -> {
+            transaction.rollback();
+
+            log.info("Rolled back transaction {}", uuid);
+        });
     }
 
     @Test
@@ -2075,7 +2084,7 @@ public abstract class TxAbstractTest extends 
IgniteAbstractTest {
      *
      * @param commit True when transaction is committed, false the transaction 
is rolled back.
      */
-    private void testTransactionAlreadyFinished(boolean commit) {
+    protected void testTransactionAlreadyFinished(boolean commit, 
BiConsumer<Transaction, UUID> finisher) {
         Transaction tx = igniteTransactions.begin();
 
         var txId = ((ReadWriteTransactionImpl) tx).id();
@@ -2091,15 +2100,7 @@ public abstract class TxAbstractTest extends 
IgniteAbstractTest {
 
         validateBalance(res, 100., 200.);
 
-        if (commit) {
-            tx.commit();
-
-            log.info("Committed transaction {}", txId);
-        } else {
-            tx.rollback();
-
-            log.info("Rolled back transaction {}", txId);
-        }
+        finisher.accept(tx, txId);
 
         TransactionException ex = assertThrows(TransactionException.class, () -> accountsRv.get(tx, makeKey(1)));
         assertTrue(ex.getMessage().contains("Transaction is already finished."));
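
With testTransactionAlreadyFinished now accepting a BiConsumer<Transaction, UUID> finisher, subclasses can plug in their own way of finishing the transaction. A hypothetical usage sketch — the TxStandIn interface below is a trimmed stand-in for the real Transaction API, defined only so the example is self-contained:

    import java.util.UUID;
    import java.util.function.BiConsumer;

    /** Illustration only: a trimmed stand-in for org.apache.ignite.tx.Transaction. */
    interface TxStandIn {
        void commit();

        void rollback();
    }

    class FinisherHookSketch {
        /** Mirrors the shape of the refactored test: the caller decides how the transaction is finished. */
        static void runScenario(TxStandIn tx, UUID txId, BiConsumer<TxStandIn, UUID> finisher) {
            // ... the real test does its reads and writes here ...
            finisher.accept(tx, txId);
            // ... and then asserts that further operations fail with "Transaction is already finished." ...
        }

        public static void main(String[] args) {
            TxStandIn tx = new TxStandIn() {
                @Override
                public void commit() {
                    System.out.println("commit");
                }

                @Override
                public void rollback() {
                    System.out.println("rollback");
                }
            };

            // Commit flavour, as in testTransactionAlreadyCommitted.
            runScenario(tx, UUID.randomUUID(), (transaction, id) -> transaction.commit());

            // Rollback flavour, as in testTransactionAlreadyRolledback.
            runScenario(tx, UUID.randomUUID(), (transaction, id) -> transaction.rollback());
        }
    }
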

