This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new fb3c5e2033 IGNITE-15927 One-phase commit - Fixes #2329.
fb3c5e2033 is described below

commit fb3c5e2033629f9b5b92e23d9bb999b08d6c3c7b
Author: Alexey Scherbakov <[email protected]>
AuthorDate: Mon Aug 7 18:31:46 2023 +0300

    IGNITE-15927 One-phase commit - Fixes #2329.
    
    Signed-off-by: Alexey Scherbakov <[email protected]>
---
 .../ignite/internal/util/ExceptionUtils.java       |   8 +-
 ...eFiveFunction.java => IgnitePentaFunction.java} |   2 +-
 ...eTetraFunction.java => IgniteQuadFunction.java} |   2 +-
 .../ignite/internal/replicator/ReplicaManager.java |  25 ++-
 .../ignite/internal/replicator/ReplicaService.java |   2 +-
 .../runner/app/ItIgniteNodeRestartTest.java        |   1 +
 .../ItAbstractInternalTableScanTest.java           |   4 +-
 .../table/distributed/StorageUpdateHandler.java    |  40 +++--
 .../distributed/command/PartitionCommand.java      |   5 +
 .../distributed/raft/PartitionDataStorage.java     |   9 +
 .../table/distributed/raft/PartitionListener.java  |  27 ++-
 .../SnapshotAwarePartitionDataStorage.java         |   8 +
 .../request/ReadWriteMultiRowReplicaRequest.java   |   5 +
 .../ReadWriteScanRetrieveBatchReplicaRequest.java  |   4 +
 .../request/ReadWriteSingleRowReplicaRequest.java  |   5 +
 .../request/ReadWriteSwapRowReplicaRequest.java    |   5 +
 .../replicator/PartitionReplicaListener.java       | 183 ++++++++++++-------
 .../distributed/storage/InternalTableImpl.java     | 195 ++++++++++++---------
 .../apache/ignite/internal/table/TxLocalTest.java  |  56 ++++--
 .../internal/table/distributed/IndexBaseTest.java  |   7 +-
 .../distributed/StorageUpdateHandlerTest.java      |   2 +
 .../distributed/TestPartitionDataStorage.java      |   5 +
 .../ignite/internal/table/TxAbstractTest.java      |  31 +++-
 .../table/impl/DummyInternalTableImpl.java         |  79 ++++-----
 .../ignite/internal/tx/InternalTransaction.java    |   1 -
 .../internal/tx/impl/ReadWriteTransactionImpl.java |  29 +--
 .../apache/ignite/internal/tx/TxManagerTest.java   |   5 +-
 27 files changed, 471 insertions(+), 274 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
index 30dec96249..0f66d776cf 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
@@ -43,7 +43,7 @@ import org.apache.ignite.lang.IgniteCheckedException;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.IgniteInternalCheckedException;
 import org.apache.ignite.lang.IgniteInternalException;
-import org.apache.ignite.lang.IgniteTetraFunction;
+import org.apache.ignite.lang.IgniteQuadFunction;
 import org.apache.ignite.lang.IgniteTriFunction;
 import org.apache.ignite.lang.TraceableException;
 import org.jetbrains.annotations.Nullable;
@@ -392,7 +392,7 @@ public final class ExceptionUtils {
      * @return New exception with the given cause.
      */
     public static <T extends Exception> T withCause(
-            IgniteTetraFunction<UUID, Integer, String, Throwable, T> supplier,
+            IgniteQuadFunction<UUID, Integer, String, Throwable, T> supplier,
             int defaultCode,
             String message,
             Throwable t
@@ -432,7 +432,7 @@ public final class ExceptionUtils {
      * @return New exception with the given cause.
      */
     public static <T extends Exception> T withCauseAndCode(
-            IgniteTetraFunction<UUID, Integer, String, Throwable, T> supplier,
+            IgniteQuadFunction<UUID, Integer, String, Throwable, T> supplier,
             int code,
             String message,
             Throwable t
@@ -450,7 +450,7 @@ public final class ExceptionUtils {
      * @return New
      */
     private static <T extends Exception> T withCauseInternal(
-            IgniteTetraFunction<UUID, Integer, String, Throwable, T> supplier,
+            IgniteQuadFunction<UUID, Integer, String, Throwable, T> supplier,
             int defaultCode,
             Throwable t
     ) {
diff --git 
a/modules/core/src/main/java/org/apache/ignite/lang/IgniteFiveFunction.java 
b/modules/core/src/main/java/org/apache/ignite/lang/IgnitePentaFunction.java
similarity index 97%
rename from 
modules/core/src/main/java/org/apache/ignite/lang/IgniteFiveFunction.java
rename to 
modules/core/src/main/java/org/apache/ignite/lang/IgnitePentaFunction.java
index 2372f91e26..fa491b0fac 100644
--- a/modules/core/src/main/java/org/apache/ignite/lang/IgniteFiveFunction.java
+++ b/modules/core/src/main/java/org/apache/ignite/lang/IgnitePentaFunction.java
@@ -36,7 +36,7 @@ import java.util.function.Function;
  * @see Function
  */
 @FunctionalInterface
-public interface IgniteFiveFunction<T, U, V, M, N, R> {
+public interface IgnitePentaFunction<T, U, V, M, N, R> {
     /**
      * Applies this function to the given arguments.
      *
diff --git 
a/modules/core/src/main/java/org/apache/ignite/lang/IgniteTetraFunction.java 
b/modules/core/src/main/java/org/apache/ignite/lang/IgniteQuadFunction.java
old mode 100755
new mode 100644
similarity index 97%
rename from 
modules/core/src/main/java/org/apache/ignite/lang/IgniteTetraFunction.java
rename to 
modules/core/src/main/java/org/apache/ignite/lang/IgniteQuadFunction.java
index ecbece826b..ed6dc6d39f
--- a/modules/core/src/main/java/org/apache/ignite/lang/IgniteTetraFunction.java
+++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteQuadFunction.java
@@ -35,7 +35,7 @@ import java.util.function.Function;
  * @see Function
  */
 @FunctionalInterface
-public interface IgniteTetraFunction<T, U, V, M, R> {
+public interface IgniteQuadFunction<T, U, V, M, R> {
     /**
      * Applies this function to the given arguments.
      *
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index a6db95fbb9..37b1950182 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -226,20 +226,27 @@ public class ReplicaManager implements IgniteComponent {
                 return;
             }
 
+            HybridTimestamp sendTimestamp = null;
+
+            if (requestTimestamp != null) {
+                sendTimestamp = clock.update(requestTimestamp);
+            }
+
             // replicaFut is always completed here.
             Replica replica = replicaFut.join();
 
             CompletableFuture<?> result = replica.processRequest(request);
 
+            HybridTimestamp finalSendTimestamp = sendTimestamp;
             result.handle((res, ex) -> {
                 NetworkMessage msg;
 
                 if (ex == null) {
-                    msg = prepareReplicaResponse(requestTimestamp, res);
+                    msg = prepareReplicaResponse(finalSendTimestamp, res);
                 } else {
                     LOG.warn("Failed to process replica request [request={}]", 
ex, request);
 
-                    msg = prepareReplicaErrorResponse(requestTimestamp, ex);
+                    msg = prepareReplicaErrorResponse(finalSendTimestamp, ex);
                 }
 
                 clusterNetSvc.messagingService().respond(senderConsistentId, 
msg, correlationId);
@@ -361,7 +368,7 @@ public class ReplicaManager implements IgniteComponent {
     /**
      * Internal method for starting a replica.
      *
-     * @param replicaGrpId   Replication group id.
+     * @param replicaGrpId Replication group id.
      * @param whenReplicaReady Future that completes when the replica become 
ready.
      * @param listener Replica listener.
      * @param raftClient Topology aware Raft client.
@@ -558,12 +565,12 @@ public class ReplicaManager implements IgniteComponent {
     /**
      * Prepares replica response.
      */
-    private NetworkMessage prepareReplicaResponse(HybridTimestamp 
requestTimestamp, Object result) {
-        if (requestTimestamp != null) {
+    private NetworkMessage prepareReplicaResponse(@Nullable HybridTimestamp 
sendTimestamp, Object result) {
+        if (sendTimestamp != null) {
             return REPLICA_MESSAGES_FACTORY
                     .timestampAwareReplicaResponse()
                     .result(result)
-                    .timestampLong(clock.update(requestTimestamp).longValue())
+                    .timestampLong(sendTimestamp.longValue())
                     .build();
         } else {
             return REPLICA_MESSAGES_FACTORY
@@ -576,12 +583,12 @@ public class ReplicaManager implements IgniteComponent {
     /**
      * Prepares replica error response.
      */
-    private NetworkMessage prepareReplicaErrorResponse(HybridTimestamp 
requestTimestamp, Throwable ex) {
-        if (requestTimestamp != null) {
+    private NetworkMessage prepareReplicaErrorResponse(@Nullable 
HybridTimestamp sendTimestamp, Throwable ex) {
+        if (sendTimestamp != null) {
             return REPLICA_MESSAGES_FACTORY
                     .errorTimestampAwareReplicaResponse()
                     .throwable(ex)
-                    .timestampLong(clock.update(requestTimestamp).longValue())
+                    .timestampLong(sendTimestamp.longValue())
                     .build();
         } else {
             return REPLICA_MESSAGES_FACTORY
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
index 2360a20a8b..712122fb4c 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
@@ -48,7 +48,7 @@ import org.apache.ignite.network.NetworkMessage;
  */
 public class ReplicaService {
     /** Network timeout. */
-    private static final int RPC_TIMEOUT = 3000;
+    private static final long RPC_TIMEOUT = 3000;
 
     /** Message service. */
     private final MessagingService messagingService;
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index c7b6996dd8..ef6b508582 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -951,6 +951,7 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
      * checks that the table created before node stop, is not available when 
majority if lost.
      */
     @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-20137";)
     public void testOneNodeRestartWithGap() throws InterruptedException {
         IgniteImpl ignite = startNode(0);
 
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
index dabd1d3d2d..69f4accb17 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
@@ -268,7 +268,7 @@ public abstract class ItAbstractInternalTableScanTest 
extends IgniteAbstractTest
             }
         });
 
-        gotExceptionLatch.await();
+        assertTrue(gotExceptionLatch.await(10_000, TimeUnit.MILLISECONDS));
 
         assertEquals(gotException.get().getCause().getClass(), 
StorageException.class);
 
@@ -352,7 +352,7 @@ public abstract class ItAbstractInternalTableScanTest 
extends IgniteAbstractTest
             }
         });
 
-        gotExceptionLatch.await();
+        assertTrue(gotExceptionLatch.await(10_000, TimeUnit.MILLISECONDS));
 
         assertEquals(gotException.get().getClass(), 
IllegalStateException.class);
     }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
index 146ff6620a..0d9ff37d99 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
@@ -103,13 +103,15 @@ public class StorageUpdateHandler {
      * @param commitPartitionId Commit partition id.
      * @param row Row.
      * @param onApplication Callback on application.
+     * @param commitTs Commit timestamp to use on autocommit.
      */
     public void handleUpdate(
             UUID txId,
             UUID rowUuid,
             TablePartitionId commitPartitionId,
             @Nullable BinaryRow row,
-            @Nullable Consumer<RowId> onApplication
+            @Nullable Consumer<RowId> onApplication,
+            @Nullable HybridTimestamp commitTs
     ) {
         indexUpdateHandler.waitIndexes();
 
@@ -120,11 +122,16 @@ public class StorageUpdateHandler {
 
             locker.lock(rowId);
 
-            BinaryRow oldRow = storage.addWrite(rowId, row, txId, commitTblId, 
commitPartId);
+            if (commitTs != null) {
+                storage.addWriteCommitted(rowId, row, commitTs);
+            } else {
+                BinaryRow oldRow = storage.addWrite(rowId, row, txId, 
commitTblId, commitPartId);
 
-            if (oldRow != null) {
-                // Previous uncommitted row should be removed from indexes.
-                tryRemovePreviousWritesIndex(rowId, oldRow);
+                if (oldRow != null) {
+                    assert commitTs == null : String.format("Expecting 
explicit txn: [txId=%s]", txId);
+                    // Previous uncommitted row should be removed from indexes.
+                    tryRemovePreviousWritesIndex(rowId, oldRow);
+                }
             }
 
             indexUpdateHandler.addToIndexes(row, rowId);
@@ -145,13 +152,15 @@ public class StorageUpdateHandler {
      * @param txId Transaction id.
      * @param rowsToUpdate Collection of rows to update.
      * @param commitPartitionId Commit partition id.
-     * @param onReplication On replication callback.
+     * @param onApplication Callback on application.
+     * @param commitTs Commit timestamp to use on autocommit.
      */
     public void handleUpdateAll(
             UUID txId,
             Map<UUID, BinaryRowMessage> rowsToUpdate,
             TablePartitionId commitPartitionId,
-            @Nullable Consumer<Collection<RowId>> onReplication
+            @Nullable Consumer<Collection<RowId>> onApplication,
+            @Nullable HybridTimestamp commitTs
     ) {
         indexUpdateHandler.waitIndexes();
 
@@ -171,19 +180,24 @@ public class StorageUpdateHandler {
 
                     locker.lock(rowId);
 
-                    BinaryRow oldRow = storage.addWrite(rowId, row, txId, 
commitTblId, commitPartId);
+                    if (commitTs != null) {
+                        storage.addWriteCommitted(rowId, row, commitTs);
+                    } else {
+                        BinaryRow oldRow = storage.addWrite(rowId, row, txId, 
commitTblId, commitPartId);
 
-                    if (oldRow != null) {
-                        // Previous uncommitted row should be removed from 
indexes.
-                        tryRemovePreviousWritesIndex(rowId, oldRow);
+                        if (oldRow != null) {
+                            assert commitTs == null : String.format("Expecting 
explicit txn: [txId=%s]", txId);
+                            // Previous uncommitted row should be removed from 
indexes.
+                            tryRemovePreviousWritesIndex(rowId, oldRow);
+                        }
                     }
 
                     rowIds.add(rowId);
                     indexUpdateHandler.addToIndexes(row, rowId);
                 }
 
-                if (onReplication != null) {
-                    onReplication.accept(rowIds);
+                if (onApplication != null) {
+                    onApplication.accept(rowIds);
                 }
             }
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java
index d28ed66ea4..36b776256f 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java
@@ -28,4 +28,9 @@ public interface PartitionCommand extends 
SafeTimePropagatingCommand {
      * Returns a transaction id.
      */
     UUID txId();
+
+    /**
+     * Returns {@code true} if a command represents a full (including all 
keys) transaction.
+     */
+    boolean full();
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
index 8d9f3426d5..945e414282 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
@@ -130,6 +130,15 @@ public interface PartitionDataStorage extends 
ManuallyCloseable {
     @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, UUID 
txId, int commitTableId, int commitPartitionId)
             throws TxIdMismatchException, StorageException;
 
+    /**
+     * Write and commit the row in one step.
+     *
+     * @param rowId Row id.
+     * @param row Row (null to remove existing)
+     * @param commitTs Commit timestamp.
+     */
+    void addWriteCommitted(RowId rowId, @Nullable BinaryRow row, 
HybridTimestamp commitTs);
+
     /**
      * Aborts a pending update of the ongoing uncommitted transaction. Invoked 
during rollback.
      *
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 41c1ee0d3c..d2cdede161 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -233,10 +233,14 @@ public class PartitionListener implements 
RaftGroupListener {
 
         storageUpdateHandler.handleUpdate(cmd.txId(), cmd.rowUuid(), 
cmd.tablePartitionId().asTablePartitionId(), cmd.row(),
                 rowId -> {
-                    txsPendingRowIds.computeIfAbsent(cmd.txId(), entry -> new 
TreeSet<>()).add(rowId);
+                    // Cleanup is not required for one-phase transactions.
+                    if (!cmd.full()) {
+                        txsPendingRowIds.computeIfAbsent(cmd.txId(), entry -> 
new TreeSet<>()).add(rowId);
+                    }
 
                     storage.lastApplied(commandIndex, commandTerm);
-                }
+                },
+                cmd.full() ? cmd.safeTime() : null
         );
     }
 
@@ -253,13 +257,18 @@ public class PartitionListener implements 
RaftGroupListener {
             return;
         }
 
-        storageUpdateHandler.handleUpdateAll(cmd.txId(), cmd.rowsToUpdate(), 
cmd.tablePartitionId().asTablePartitionId(), rowIds -> {
-            for (RowId rowId : rowIds) {
-                txsPendingRowIds.computeIfAbsent(cmd.txId(), entry0 -> new 
TreeSet<>()).add(rowId);
-            }
+        storageUpdateHandler.handleUpdateAll(cmd.txId(), cmd.rowsToUpdate(), 
cmd.tablePartitionId().asTablePartitionId(),
+                rowIds -> {
+                    // Cleanup is not required for one-phase transactions.
+                    if (!cmd.full()) {
+                        for (RowId rowId : rowIds) {
+                            txsPendingRowIds.computeIfAbsent(cmd.txId(), 
entry0 -> new TreeSet<>()).add(rowId);
+                        }
+                    }
 
-            storage.lastApplied(commandIndex, commandTerm);
-        });
+                    storage.lastApplied(commandIndex, commandTerm);
+                },
+                cmd.full() ? cmd.safeTime() : null);
     }
 
     /**
@@ -365,7 +374,7 @@ public class PartitionListener implements RaftGroupListener 
{
      *
      * @param cmd Command.
      * @param commandIndex RAFT index of the command.
-     * @param commandTerm  RAFT term of the command.
+     * @param commandTerm RAFT term of the command.
      */
     private void handleSafeTimeSyncCommand(SafeTimeSyncCommand cmd, long 
commandIndex, long commandTerm) {
         // Skips the write command because the storage has already executed it.
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
index 09144845e0..f1dbcc4404 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
@@ -117,6 +117,14 @@ public class SnapshotAwarePartitionDataStorage implements 
PartitionDataStorage {
         return partitionStorage.addWrite(rowId, row, txId, commitTableId, 
commitPartitionId);
     }
 
+    @Override
+    public void addWriteCommitted(RowId rowId, @Nullable BinaryRow row, 
HybridTimestamp commitTs)
+            throws TxIdMismatchException, StorageException {
+        handleSnapshotInterference(rowId);
+
+        partitionStorage.addWriteCommitted(rowId, row, commitTs);
+    }
+
     @Override
     public @Nullable BinaryRow abortWrite(RowId rowId) throws StorageException 
{
         handleSnapshotInterference(rowId);
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteMultiRowReplicaRequest.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteMultiRowReplicaRequest.java
index f2330dbdc3..8888ee65b6 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteMultiRowReplicaRequest.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteMultiRowReplicaRequest.java
@@ -34,4 +34,9 @@ public interface ReadWriteMultiRowReplicaRequest extends 
MultipleRowReplicaReque
      */
     @Marshallable
     TablePartitionId commitPartitionId();
+
+    /**
+     * Return {@code true} if this is a full transaction.
+     */
+    boolean full();
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteScanRetrieveBatchReplicaRequest.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteScanRetrieveBatchReplicaRequest.java
index 39c29b1e48..6de6825362 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteScanRetrieveBatchReplicaRequest.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteScanRetrieveBatchReplicaRequest.java
@@ -25,4 +25,8 @@ import org.apache.ignite.network.annotations.Transferable;
  */
 @Transferable(TableMessageGroup.RW_SCAN_RETRIEVE_BATCH_REPLICA_REQUEST)
 public interface ReadWriteScanRetrieveBatchReplicaRequest extends 
ScanRetrieveBatchReplicaRequest, ReadWriteReplicaRequest {
+    /**
+     * Return {@code true} if this is a full transaction.
+     */
+    boolean full();
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSingleRowReplicaRequest.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSingleRowReplicaRequest.java
index bd8b1344fd..b9e5edf122 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSingleRowReplicaRequest.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSingleRowReplicaRequest.java
@@ -34,4 +34,9 @@ public interface ReadWriteSingleRowReplicaRequest extends 
SingleRowReplicaReques
      */
     @Marshallable
     TablePartitionId commitPartitionId();
+
+    /**
+     * Return {@code true} if this is a full transaction.
+     */
+    boolean full();
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSwapRowReplicaRequest.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSwapRowReplicaRequest.java
index 1018ad2a99..1f3f325f95 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSwapRowReplicaRequest.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSwapRowReplicaRequest.java
@@ -34,4 +34,9 @@ public interface ReadWriteSwapRowReplicaRequest extends 
SwapRowReplicaRequest, R
      */
     @Marshallable
     TablePartitionId commitPartitionId();
+
+    /**
+     * Return {@code true} if this is a full transaction.
+     */
+    boolean full();
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 1979ef7730..5533971e1f 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -147,6 +147,7 @@ import org.apache.ignite.internal.util.Lazy;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.lang.ErrorGroups.Replicator;
 import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteExceptionUtils;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.network.ClusterNode;
@@ -334,19 +335,31 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         if (request instanceof ReadWriteSingleRowReplicaRequest) {
             var req = (ReadWriteSingleRowReplicaRequest) request;
 
-            return appendTxCommand(req.transactionId(), req.requestType(), () 
-> processSingleEntryAction(req));
+            return appendTxCommand(req.transactionId(), req.requestType(), 
req.full(), () -> processSingleEntryAction(req));
         } else if (request instanceof ReadWriteMultiRowReplicaRequest) {
             var req = (ReadWriteMultiRowReplicaRequest) request;
 
-            return appendTxCommand(req.transactionId(), req.requestType(), () 
-> processMultiEntryAction(req));
+            return appendTxCommand(req.transactionId(), req.requestType(), 
req.full(), () -> processMultiEntryAction(req));
         } else if (request instanceof ReadWriteSwapRowReplicaRequest) {
             var req = (ReadWriteSwapRowReplicaRequest) request;
 
-            return appendTxCommand(req.transactionId(), req.requestType(), () 
-> processTwoEntriesAction(req));
+            return appendTxCommand(req.transactionId(), req.requestType(), 
req.full(), () -> processTwoEntriesAction(req));
         } else if (request instanceof 
ReadWriteScanRetrieveBatchReplicaRequest) {
             var req = (ReadWriteScanRetrieveBatchReplicaRequest) request;
 
-            return appendTxCommand(req.transactionId(), RequestType.RW_SCAN, 
() -> processScanRetrieveBatchAction(req));
+            // Implicit RW scan can be committed locally on a last batch or 
error.
+            return appendTxCommand(req.transactionId(), RequestType.RW_SCAN, 
false, () -> processScanRetrieveBatchAction(req)).handle(
+                    (rows, err) -> {
+                        if (req.full() && (err != null || rows.size() < 
req.batchSize())) {
+                            releaseTxLocks(req.transactionId());
+                        }
+
+                        if (err != null) {
+                            IgniteExceptionUtils.sneakyThrow(err);
+                        }
+
+                        return rows;
+                    });
         } else if (request instanceof ReadWriteScanCloseReplicaRequest) {
             processScanCloseAction((ReadWriteScanCloseReplicaRequest) request);
 
@@ -1338,29 +1351,37 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      *
      * @param txId Transaction id.
      * @param cmdType Command type.
+     * @param full {@code True} if a full transaction and can be immediately 
committed.
      * @param op Operation closure.
      * @param <T> Type of execution result.
      * @return A future object representing the result of the given operation.
      */
-    private <T> CompletableFuture<T> appendTxCommand(UUID txId, RequestType 
cmdType, Supplier<CompletableFuture<T>> op) {
+    private <T> CompletableFuture<T> appendTxCommand(UUID txId, RequestType 
cmdType, boolean full, Supplier<CompletableFuture<T>> op) {
         var fut = new CompletableFuture<T>();
 
-        txCleanupReadyFutures.compute(txId, (id, txOps) -> {
-            if (txOps == null) {
-                txOps = new TxCleanupReadyFutureList();
-            }
+        if (!full) {
+            txCleanupReadyFutures.compute(txId, (id, txOps) -> {
+                if (txOps == null) {
+                    txOps = new TxCleanupReadyFutureList();
+                }
 
-            if (txOps.state == TxState.ABORTED || txOps.state == 
TxState.COMMITED) {
-                fut.completeExceptionally(new 
TransactionException(TX_FAILED_READ_WRITE_OPERATION_ERR, "Transaction is 
already finished."));
-            } else {
-                txOps.futures.computeIfAbsent(cmdType, type -> new 
ArrayList<>()).add(fut);
-            }
+                if (txOps.state == TxState.ABORTED || txOps.state == 
TxState.COMMITED) {
+                    fut.completeExceptionally(
+                            new 
TransactionException(TX_FAILED_READ_WRITE_OPERATION_ERR, "Transaction is 
already finished."));
+                } else {
+                    txOps.futures.computeIfAbsent(cmdType, type -> new 
ArrayList<>()).add(fut);
+                }
 
-            return txOps;
-        });
+                return txOps;
+            });
+        }
 
         if (!fut.isDone()) {
             op.get().whenComplete((v, th) -> {
+                if (full) { // Fast unlock.
+                    releaseTxLocks(txId);
+                }
+
                 if (th != null) {
                     fut.completeExceptionally(th);
                 } else {
@@ -1487,6 +1508,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
     private CompletableFuture<Object> 
processMultiEntryAction(ReadWriteMultiRowReplicaRequest request) {
         UUID txId = request.transactionId();
         TablePartitionId committedPartitionId = request.commitPartitionId();
+        boolean full = request.full();
 
         assert committedPartitionId != null || request.requestType() == 
RequestType.RW_GET_ALL
                 : "Commit partition is null [type=" + request.requestType() + 
']';
@@ -1554,7 +1576,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                         return completedFuture(result);
                     }
 
-                    return 
applyUpdateAllCommand(updateAllCommand(committedPartitionId, rowIdsToDelete, 
txId))
+                    return 
applyUpdateAllCommand(updateAllCommand(committedPartitionId, rowIdsToDelete, 
txId, full))
                             .thenApply(ignored -> result);
                 });
             }
@@ -1590,7 +1612,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                     }
 
                     CompletableFuture<Object> raftFut = 
rowIdsToDelete.isEmpty() ? completedFuture(null)
-                            : 
applyUpdateAllCommand(updateAllCommand(committedPartitionId, rowIdsToDelete, 
txId));
+                            : 
applyUpdateAllCommand(updateAllCommand(committedPartitionId, rowIdsToDelete, 
txId, full));
 
                     return raftFut.thenApply(ignored -> result);
                 });
@@ -1655,7 +1677,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
                     return allOf(insertLockFuts)
                             .thenCompose(ignored -> applyUpdateAllCommand(
-                                    updateAllCommand(committedPartitionId, 
convertedMap, txId)))
+                                    updateAllCommand(committedPartitionId, 
convertedMap, txId, full)))
                             .thenApply(ignored -> {
                                 // Release short term locks.
                                 for (CompletableFuture<IgniteBiTuple<RowId, 
Collection<Lock>>> insertLockFut : insertLockFuts) {
@@ -1699,7 +1721,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                         return completedFuture(null);
                     }
 
-                    return 
applyUpdateAllCommand(updateAllCommand(committedPartitionId, rowsToUpdate, 
txId))
+                    return 
applyUpdateAllCommand(updateAllCommand(committedPartitionId, rowsToUpdate, 
txId, full))
                             .thenApply(ignored -> {
                                 // Release short term locks.
                                 for (CompletableFuture<IgniteBiTuple<RowId, 
Collection<Lock>>> rowIdFut : rowIdFuts) {
@@ -1749,23 +1771,38 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @return Raft future, see {@link 
#applyCmdWithExceptionHandling(Command)}.
      */
     private CompletableFuture<Object> applyUpdateCommand(UpdateCommand cmd) {
-        storageUpdateHandler.handleUpdate(
-                cmd.txId(),
-                cmd.rowUuid(),
-                cmd.tablePartitionId().asTablePartitionId(),
-                cmd.row(),
-                rowId -> txsPendingRowIds.compute(cmd.txId(), (k, v) -> {
-                    if (v == null) {
-                        v = new TreeSet<>();
-                    }
+        if (!cmd.full()) {
+            storageUpdateHandler.handleUpdate(
+                    cmd.txId(),
+                    cmd.rowUuid(),
+                    cmd.tablePartitionId().asTablePartitionId(),
+                    cmd.row(),
+                    rowId -> txsPendingRowIds.compute(cmd.txId(), (k, v) -> {
+                        if (v == null) {
+                            v = new TreeSet<>();
+                        }
 
-                    v.add(rowId);
+                        v.add(rowId);
 
-                    return v;
-                })
-        );
+                        return v;
+                    }),
+                    null);
+        }
 
-        return applyCmdWithExceptionHandling(cmd);
+        return applyCmdWithExceptionHandling(cmd).thenApply(res -> {
+            // Try to avoid double write if an entry is already replicated.
+            if (cmd.full() && cmd.safeTime().compareTo(safeTime.current()) > 
0) {
+                storageUpdateHandler.handleUpdate(
+                        cmd.txId(),
+                        cmd.rowUuid(),
+                        cmd.tablePartitionId().asTablePartitionId(),
+                        cmd.row(),
+                        null,
+                        cmd.safeTime());
+            }
+
+            return res;
+        });
     }
 
     /**
@@ -1775,21 +1812,35 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @return Raft future, see {@link 
#applyCmdWithExceptionHandling(Command)}.
      */
     private CompletableFuture<Object> applyUpdateAllCommand(UpdateAllCommand 
cmd) {
-        storageUpdateHandler.handleUpdateAll(
-                cmd.txId(),
-                cmd.rowsToUpdate(),
-                cmd.tablePartitionId().asTablePartitionId(),
-                rowIds -> txsPendingRowIds.compute(cmd.txId(), (k, v) -> {
-                    if (v == null) {
-                        v = new TreeSet<>();
-                    }
+        if (!cmd.full()) {
+            storageUpdateHandler.handleUpdateAll(
+                    cmd.txId(),
+                    cmd.rowsToUpdate(),
+                    cmd.tablePartitionId().asTablePartitionId(),
+                    rowIds -> txsPendingRowIds.compute(cmd.txId(), (k, v) -> {
+                        if (v == null) {
+                            v = new TreeSet<>();
+                        }
 
-                    v.addAll(rowIds);
+                        v.addAll(rowIds);
 
-                    return v;
-                }));
+                        return v;
+                    }),
+                    null);
+        }
+
+        return applyCmdWithExceptionHandling(cmd).thenApply(res -> {
+            if (cmd.full() && cmd.safeTime().compareTo(safeTime.current()) > 
0) {
+                storageUpdateHandler.handleUpdateAll(
+                        cmd.txId(),
+                        cmd.rowsToUpdate(),
+                        cmd.tablePartitionId().asTablePartitionId(),
+                        null,
+                        cmd.safeTime());
+            }
 
-        return applyCmdWithExceptionHandling(cmd);
+            return res;
+        });
     }
 
     /**
@@ -1802,6 +1853,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         UUID txId = request.transactionId();
         BinaryRow searchRow = request.binaryRow();
         TablePartitionId commitPartitionId = request.commitPartitionId();
+        boolean full = request.full();
 
         assert commitPartitionId != null || request.requestType() == 
RequestType.RW_GET :
                 "Commit partition is null [type=" + request.requestType() + 
']';
@@ -1825,7 +1877,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
                     return takeLocksForDelete(row, rowId, txId)
                             .thenCompose(ignored -> applyUpdateCommand(
-                                    updateCommand(commitPartitionId, 
rowId.uuid(), null, txId)))
+                                    updateCommand(commitPartitionId, 
rowId.uuid(), null, txId, full)))
                             .thenApply(ignored -> true);
                 });
             }
@@ -1837,7 +1889,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
                     return takeLocksForDelete(row, rowId, txId)
                             .thenCompose(ignored -> applyUpdateCommand(
-                                    updateCommand(commitPartitionId, 
rowId.uuid(), null, txId)))
+                                    updateCommand(commitPartitionId, 
rowId.uuid(), null, txId, full)))
                             .thenApply(ignored -> row);
                 });
             }
@@ -1854,7 +1906,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                                 }
 
                                 return applyUpdateCommand(
-                                        updateCommand(commitPartitionId, 
validatedRowId.uuid(), null, txId))
+                                        updateCommand(commitPartitionId, 
validatedRowId.uuid(), null, txId, full))
                                         .thenApply(ignored -> true);
                             });
                 });
@@ -1869,7 +1921,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
                     return takeLocksForInsert(searchRow, rowId0, txId)
                             .thenCompose(rowIdLock -> applyUpdateCommand(
-                                    updateCommand(commitPartitionId, 
rowId0.uuid(), searchRow, txId))
+                                    updateCommand(commitPartitionId, 
rowId0.uuid(), searchRow, txId, full))
                                     .thenApply(ignored -> rowIdLock))
                             .thenApply(rowIdLock -> {
                                 // Release short term locks.
@@ -1891,7 +1943,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
                     return lockFut
                             .thenCompose(rowIdLock -> applyUpdateCommand(
-                                    updateCommand(commitPartitionId, 
rowId0.uuid(), searchRow, txId))
+                                    updateCommand(commitPartitionId, 
rowId0.uuid(), searchRow, txId, full))
                                     .thenApply(ignored -> rowIdLock))
                             .thenApply(rowIdLock -> {
                                 // Release short term locks.
@@ -1913,7 +1965,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
                     return lockFut
                             .thenCompose(rowIdLock -> applyUpdateCommand(
-                                    updateCommand(commitPartitionId, 
rowId0.uuid(), searchRow, txId))
+                                    updateCommand(commitPartitionId, 
rowId0.uuid(), searchRow, txId, full))
                                     .thenApply(ignored -> rowIdLock))
                             .thenApply(rowIdLock -> {
                                 // Release short term locks.
@@ -1931,7 +1983,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
                     return takeLocksForUpdate(searchRow, rowId, txId)
                             .thenCompose(rowIdLock -> applyUpdateCommand(
-                                    updateCommand(commitPartitionId, 
rowId.uuid(), searchRow, txId))
+                                    updateCommand(commitPartitionId, 
rowId.uuid(), searchRow, txId, full))
                                     .thenApply(ignored -> rowIdLock))
                             .thenApply(rowIdLock -> {
                                 // Release short term locks.
@@ -1949,7 +2001,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
                     return takeLocksForUpdate(searchRow, rowId, txId)
                             .thenCompose(rowLock -> applyUpdateCommand(
-                                    updateCommand(commitPartitionId, 
rowId.uuid(), searchRow, txId))
+                                    updateCommand(commitPartitionId, 
rowId.uuid(), searchRow, txId, full))
                                     .thenApply(ignored -> rowLock))
                             .thenApply(rowIdLock -> {
                                 // Release short term locks.
@@ -2111,7 +2163,8 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                             }
 
                             return applyUpdateCommand(
-                                    updateCommand(commitPartitionId, 
validatedRowId.get1().uuid(), newRow, txId))
+                                    updateCommand(commitPartitionId, 
validatedRowId.get1().uuid(), newRow, txId,
+                                            request.full()))
                                     .thenApply(ignored -> validatedRowId)
                                     .thenApply(rowIdLock -> {
                                         // Release short term locks.
@@ -2348,17 +2401,6 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 });
     }
 
-    /**
-     * Compounds a RAFT group unique name.
-     *
-     * @param tblId Table identifier.
-     * @param partition Number of table partitions.
-     * @return A RAFT group name.
-     */
-    private String partitionRaftGroupName(UUID tblId, int partition) {
-        return tblId + "_part_" + partition;
-    }
-
     /**
      * Method to construct {@link UpdateCommand} object.
      *
@@ -2366,13 +2408,15 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @param rowUuid Row UUID.
      * @param row Row.
      * @param txId Transaction ID.
+     * @param full {@code True} if this is a full transaction.
      * @return Constructed {@link UpdateCommand} object.
      */
-    private UpdateCommand updateCommand(TablePartitionId tablePartId, UUID 
rowUuid, @Nullable BinaryRow row, UUID txId) {
+    private UpdateCommand updateCommand(TablePartitionId tablePartId, UUID 
rowUuid, @Nullable BinaryRow row, UUID txId, boolean full) {
         UpdateCommandBuilder bldr = MSG_FACTORY.updateCommand()
                 .tablePartitionId(tablePartitionId(tablePartId))
                 .rowUuid(rowUuid)
                 .txId(txId)
+                .full(full)
                 .safeTimeLong(hybridClock.nowLong());
 
         if (row != null) {
@@ -2393,14 +2437,17 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @param tablePartId {@link TablePartitionId} object to construct {@link 
UpdateCommand} object with.
      * @param rowsToUpdate All {@link BinaryRow}s represented as {@link 
ByteBuffer}s to be updated.
      * @param txId Transaction ID.
+     * @param full {@code True} if full transaction.
      * @return Constructed {@link UpdateAllCommand} object.
      */
-    private UpdateAllCommand updateAllCommand(TablePartitionId tablePartId, 
Map<UUID, BinaryRowMessage> rowsToUpdate, UUID txId) {
+    private UpdateAllCommand updateAllCommand(TablePartitionId tablePartId, 
Map<UUID, BinaryRowMessage> rowsToUpdate, UUID txId,
+            boolean full) {
         return MSG_FACTORY.updateAllCommand()
                 .tablePartitionId(tablePartitionId(tablePartId))
                 .rowsToUpdate(rowsToUpdate)
                 .txId(txId)
                 .safeTimeLong(hybridClock.nowLong())
+                .full(full)
                 .build();
     }
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index ce3b671430..792a5202a8 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -21,6 +21,9 @@ import static 
it.unimi.dsi.fastutil.ints.Int2ObjectMaps.emptyMap;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static 
org.apache.ignite.internal.table.distributed.storage.RowBatch.allResultFutures;
+import static org.apache.ignite.internal.tx.TxState.ABORTED;
+import static org.apache.ignite.internal.tx.TxState.COMMITED;
+import static org.apache.ignite.internal.tx.TxState.PENDING;
 import static org.apache.ignite.internal.util.ExceptionUtils.withCause;
 import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 import static 
org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR;
@@ -90,10 +93,10 @@ import 
org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.internal.utils.PrimaryReplica;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteException;
-import org.apache.ignite.lang.IgniteFiveFunction;
 import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgnitePentaFunction;
 import org.apache.ignite.lang.IgniteStringFormatter;
-import org.apache.ignite.lang.IgniteTetraFunction;
+import org.apache.ignite.lang.IgniteTriFunction;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.tx.TransactionException;
 import org.jetbrains.annotations.Nullable;
@@ -217,14 +220,14 @@ public class InternalTableImpl implements InternalTable {
      * Enlists a single row into a transaction.
      *
      * @param row The row.
-     * @param tx The transaction.
-     * @param op Replica requests factory.
+     * @param tx The transaction, not null if explicit.
+     * @param fac Replica requests factory.
      * @return The future.
      */
     private <R> CompletableFuture<R> enlistInTx(
             BinaryRowEx row,
             @Nullable InternalTransaction tx,
-            IgniteTetraFunction<TablePartitionId, InternalTransaction, 
ReplicationGroupId, Long, ReplicaRequest> op
+            IgniteTriFunction<InternalTransaction, ReplicationGroupId, Long, 
ReplicaRequest> fac
     ) {
         // Check whether proposed tx is read-only. Complete future 
exceptionally if true.
         // Attempting to enlist a read-only in a read-write transaction does 
not corrupt the transaction itself, thus read-write transaction
@@ -251,9 +254,9 @@ public class InternalTableImpl implements InternalTable {
         CompletableFuture<R> fut;
 
         if (primaryReplicaAndTerm != null) {
-            TablePartitionId commitPart = tx.commitPartition();
+            assert !implicit;
 
-            ReplicaRequest request = op.apply(commitPart, tx0, partGroupId, 
primaryReplicaAndTerm.get2());
+            ReplicaRequest request = fac.apply(tx, partGroupId, 
primaryReplicaAndTerm.get2());
 
             try {
                 fut = replicaSvc.invoke(primaryReplicaAndTerm.get1(), request);
@@ -263,15 +266,10 @@ public class InternalTableImpl implements InternalTable {
                 throw new TransactionException("Failed to invoke the replica 
request.");
             }
         } else {
-            fut = enlistWithRetry(
-                    tx0,
-                    partId,
-                    (commitPart, term) -> op.apply(commitPart, tx0, 
partGroupId, term),
-                    ATTEMPTS_TO_ENLIST_PARTITION
-            );
+            fut = enlistWithRetry(tx0, partId, term -> fac.apply(tx0, 
partGroupId, term), ATTEMPTS_TO_ENLIST_PARTITION);
         }
 
-        return postEnlist(fut, implicit, tx0);
+        return postEnlist(fut, false, tx0, implicit);
     }
 
     /**
@@ -279,14 +277,14 @@ public class InternalTableImpl implements InternalTable {
      *
      * @param keyRows Rows.
      * @param tx The transaction.
-     * @param op Replica requests factory.
+     * @param fac Replica requests factory.
      * @param reducer Transform reducer.
      * @return The future.
      */
     private <T> CompletableFuture<T> enlistInTx(
             Collection<BinaryRowEx> keyRows,
             @Nullable InternalTransaction tx,
-            IgniteFiveFunction<TablePartitionId, Collection<BinaryRow>, 
InternalTransaction, ReplicationGroupId, Long, ReplicaRequest> op,
+            IgnitePentaFunction<Collection<BinaryRow>, InternalTransaction, 
ReplicationGroupId, Long, Boolean, ReplicaRequest> fac,
             Function<Collection<RowBatch>, CompletableFuture<T>> reducer
     ) {
         // Check whether proposed tx is read-only. Complete future 
exceptionally if true.
@@ -313,6 +311,8 @@ public class InternalTableImpl implements InternalTable {
 
         Int2ObjectMap<RowBatch> rowBatchByPartitionId = 
toRowBatchByPartitionId(keyRows);
 
+        boolean singlePart = rowBatchByPartitionId.size() == 1;
+
         for (Int2ObjectMap.Entry<RowBatch> partitionRowBatch : 
rowBatchByPartitionId.int2ObjectEntrySet()) {
             int partitionId = partitionRowBatch.getIntKey();
             RowBatch rowBatch = partitionRowBatch.getValue();
@@ -324,9 +324,8 @@ public class InternalTableImpl implements InternalTable {
             CompletableFuture<Object> fut;
 
             if (primaryReplicaAndTerm != null) {
-                TablePartitionId commitPart = tx.commitPartition();
-
-                ReplicaRequest request = op.apply(commitPart, 
rowBatch.requestedRows, tx0, partGroupId, primaryReplicaAndTerm.get2());
+                assert !implicit;
+                ReplicaRequest request = fac.apply(rowBatch.requestedRows, 
tx0, partGroupId, primaryReplicaAndTerm.get2(), false);
 
                 try {
                     fut = replicaSvc.invoke(primaryReplicaAndTerm.get1(), 
request);
@@ -339,7 +338,7 @@ public class InternalTableImpl implements InternalTable {
                 fut = enlistWithRetry(
                         tx0,
                         partitionId,
-                        (commitPart, term) -> op.apply(commitPart, 
rowBatch.requestedRows, tx0, partGroupId, term),
+                        term -> fac.apply(rowBatch.requestedRows, tx0, 
partGroupId, term, implicit && singlePart),
                         ATTEMPTS_TO_ENLIST_PARTITION
                 );
             }
@@ -349,7 +348,7 @@ public class InternalTableImpl implements InternalTable {
 
         CompletableFuture<T> fut = 
reducer.apply(rowBatchByPartitionId.values());
 
-        return postEnlist(fut, implicit, tx0);
+        return postEnlist(fut, implicit && !singlePart, tx0, implicit && 
singlePart);
     }
 
     /**
@@ -364,6 +363,7 @@ public class InternalTableImpl implements InternalTable {
      * @param upperBound Upper search bound.
      * @param flags Control flags. See {@link 
org.apache.ignite.internal.storage.index.SortedIndexStorage} constants.
      * @param columnsToInclude Row projection.
+     * @param implicit {@code True} if the implicit txn.
      * @return Batch of retrieved rows.
      */
     private CompletableFuture<Collection<BinaryRow>> enlistCursorInTx(
@@ -376,7 +376,8 @@ public class InternalTableImpl implements InternalTable {
             @Nullable BinaryTuplePrefix lowerBound,
             @Nullable BinaryTuplePrefix upperBound,
             int flags,
-            @Nullable BitSet columnsToInclude
+            @Nullable BitSet columnsToInclude,
+            boolean implicit
     ) {
         TablePartitionId partGroupId = new TablePartitionId(tableId, partId);
 
@@ -395,6 +396,7 @@ public class InternalTableImpl implements InternalTable {
                 .upperBoundPrefix(binaryTupleMessage(upperBound))
                 .flags(flags)
                 .columnsToInclude(columnsToInclude)
+                .full(implicit) // Intent for one phase commit.
                 .batchSize(batchSize);
 
         if (primaryReplicaAndTerm != null) {
@@ -408,10 +410,10 @@ public class InternalTableImpl implements InternalTable {
                 throw new TransactionException("Failed to invoke the replica 
request.");
             }
         } else {
-            fut = enlistWithRetry(tx, partId, (commitPart, term) -> 
requestBuilder.term(term).build(), ATTEMPTS_TO_ENLIST_PARTITION);
+            fut = enlistWithRetry(tx, partId, term -> 
requestBuilder.term(term).build(), ATTEMPTS_TO_ENLIST_PARTITION);
         }
 
-        return postEnlist(fut, false, tx);
+        return postEnlist(fut, false, tx, false);
     }
 
     private @Nullable BinaryTupleMessage binaryTupleMessage(@Nullable 
BinaryTupleReader binaryTuple) {
@@ -430,14 +432,14 @@ public class InternalTableImpl implements InternalTable {
      *
      * @param tx Internal transaction.
      * @param partId Partition number.
-     * @param requestFunction Function to create replica request with new raft 
term.
+     * @param mapFunc Function to create replica request with new raft term.
      * @param attempts Number of attempts.
      * @return The future.
      */
     private <R> CompletableFuture<R> enlistWithRetry(
             InternalTransaction tx,
             int partId,
-            BiFunction<TablePartitionId, Long, ReplicaRequest> requestFunction,
+            Function<Long, ReplicaRequest> mapFunc,
             int attempts
     ) {
         CompletableFuture<R> result = new CompletableFuture<>();
@@ -447,7 +449,7 @@ public class InternalTableImpl implements InternalTable {
                             try {
                                 return replicaSvc.invoke(
                                         primaryReplicaAndTerm.get1(),
-                                        
requestFunction.apply(tx.commitPartition(), primaryReplicaAndTerm.get2())
+                                        
mapFunc.apply(primaryReplicaAndTerm.get2())
                                 );
                             } catch (PrimaryReplicaMissException e) {
                                 throw new TransactionException(e);
@@ -466,7 +468,7 @@ public class InternalTableImpl implements InternalTable {
                 .handle((res0, e) -> {
                     if (e != null) {
                         if (e.getCause() instanceof 
PrimaryReplicaMissException && attempts > 0) {
-                            return enlistWithRetry(tx, partId, 
requestFunction, attempts - 1).handle((r2, e2) -> {
+                            return enlistWithRetry(tx, partId, mapFunc, 
attempts - 1).handle((r2, e2) -> {
                                 if (e2 != null) {
                                     return result.completeExceptionally(e2);
                                 } else {
@@ -488,13 +490,22 @@ public class InternalTableImpl implements InternalTable {
      * Performs post enlist operation.
      *
      * @param fut The future.
-     * @param implicit {@code true} for implicit tx.
+     * @param autoCommit {@code True} for auto commit.
      * @param tx0 The transaction.
+     * @param full If this is full transaction.
      * @param <T> Operation return type.
      * @return The future.
      */
-    private <T> CompletableFuture<T> postEnlist(CompletableFuture<T> fut, 
boolean implicit, InternalTransaction tx0) {
+    private <T> CompletableFuture<T> postEnlist(CompletableFuture<T> fut, 
boolean autoCommit, InternalTransaction tx0, boolean full) {
+        assert !(autoCommit && full) : "Invalid combination of flags";
+
         return fut.handle((BiFunction<T, Throwable, CompletableFuture<T>>) (r, 
e) -> {
+            if (full) { // Full txn is already finished remotely. Just update 
local state.
+                // TODO: IGNITE-17638 TestOnly code, let's consider using Txn 
state map instead of states.
+                txManager.changeState(tx0.id(), PENDING, e == null ? COMMITED 
: ABORTED);
+                return e != null ? failedFuture(wrapReplicationException(e)) : 
completedFuture(r);
+            }
+
             if (e != null) {
                 RuntimeException e0 = wrapReplicationException(e);
 
@@ -508,7 +519,7 @@ public class InternalTableImpl implements InternalTable {
             } else {
                 tx0.enlistResultFuture(fut);
 
-                if (implicit) {
+                if (autoCommit) {
                     return tx0.commitAsync()
                             .exceptionally(ex -> {
                                 throw wrapReplicationException(ex);
@@ -531,14 +542,15 @@ public class InternalTableImpl implements InternalTable {
             return enlistInTx(
                     keyRow,
                     tx,
-                    (commitPart, txo, groupId, term) -> 
tableMessagesFactory.readWriteSingleRowReplicaRequest()
+                    (txo, groupId, term) -> 
tableMessagesFactory.readWriteSingleRowReplicaRequest()
                             .groupId(groupId)
                             .binaryRowMessage(serializeBinaryRow(keyRow))
-                            .commitPartitionId(commitPart)
+                            .commitPartitionId(txo.commitPartition())
                             .transactionId(txo.id())
                             .term(term)
                             .requestType(RequestType.RW_GET)
                             .timestampLong(clock.nowLong())
+                            .full(tx == null)
                             .build()
             );
         }
@@ -578,14 +590,15 @@ public class InternalTableImpl implements InternalTable {
             return enlistInTx(
                     keyRows,
                     tx,
-                    (commitPart, keyRows0, txo, groupId, term) -> 
tableMessagesFactory.readWriteMultiRowReplicaRequest()
+                    (keyRows0, txo, groupId, term, full) -> 
tableMessagesFactory.readWriteMultiRowReplicaRequest()
                             .groupId(groupId)
                             .binaryRowMessages(serializeBinaryRows(keyRows0))
-                            .commitPartitionId(commitPart)
+                            .commitPartitionId(txo.commitPartition())
                             .transactionId(txo.id())
                             .term(term)
                             .requestType(RequestType.RW_GET_ALL)
                             .timestampLong(clock.nowLong())
+                            .full(full)
                             .build(),
                     
InternalTableImpl::collectMultiRowsResponsesWithRestoreOrder
             );
@@ -640,14 +653,15 @@ public class InternalTableImpl implements InternalTable {
         return enlistInTx(
                 row,
                 tx,
-                (commitPart, txo, groupId, term) -> 
tableMessagesFactory.readWriteSingleRowReplicaRequest()
+                (txo, groupId, term) -> 
tableMessagesFactory.readWriteSingleRowReplicaRequest()
                         .groupId(groupId)
-                        .commitPartitionId(commitPart)
+                        .commitPartitionId(txo.commitPartition())
                         .binaryRowMessage(serializeBinaryRow(row))
                         .transactionId(txo.id())
                         .term(term)
                         .requestType(RequestType.RW_UPSERT)
                         .timestampLong(clock.nowLong())
+                        .full(tx == null)
                         .build());
     }
 
@@ -671,11 +685,11 @@ public class InternalTableImpl implements InternalTable {
         CompletableFuture<Void> fut = enlistWithRetry(
                 tx,
                 partition,
-                (commitPart, term) -> upsertAllInternal(commitPart, rows, tx, 
partGroupId, term),
+                term -> upsertAllInternal(rows, tx, partGroupId, term, true),
                 ATTEMPTS_TO_ENLIST_PARTITION
         );
 
-        return postEnlist(fut, true, tx);
+        return postEnlist(fut, false, tx, true); // Will be committed in one 
RTT.
     }
 
     /** {@inheritDoc} */
@@ -684,14 +698,15 @@ public class InternalTableImpl implements InternalTable {
         return enlistInTx(
                 row,
                 tx,
-                (commitPart, txo, groupId, term) -> 
tableMessagesFactory.readWriteSingleRowReplicaRequest()
+                (txo, groupId, term) -> 
tableMessagesFactory.readWriteSingleRowReplicaRequest()
                         .groupId(groupId)
-                        .commitPartitionId(commitPart)
+                        .commitPartitionId(txo.commitPartition())
                         .binaryRowMessage(serializeBinaryRow(row))
                         .transactionId(txo.id())
                         .term(term)
                         .requestType(RequestType.RW_GET_AND_UPSERT)
                         .timestampLong(clock.nowLong())
+                        .full(tx == null)
                         .build()
         );
     }
@@ -702,14 +717,15 @@ public class InternalTableImpl implements InternalTable {
         return enlistInTx(
                 row,
                 tx,
-                (commitPart, txo, groupId, term) -> 
tableMessagesFactory.readWriteSingleRowReplicaRequest()
+                (txo, groupId, term) -> 
tableMessagesFactory.readWriteSingleRowReplicaRequest()
                         .groupId(groupId)
-                        .commitPartitionId(commitPart)
+                        .commitPartitionId(txo.commitPartition())
                         .binaryRowMessage(serializeBinaryRow(row))
                         .transactionId(txo.id())
                         .term(term)
                         .requestType(RequestType.RW_INSERT)
                         .timestampLong(clock.nowLong())
+                        .full(tx == null)
                         .build()
         );
     }
@@ -720,14 +736,15 @@ public class InternalTableImpl implements InternalTable {
         return enlistInTx(
                 rows,
                 tx,
-                (commitPart, keyRows0, txo, groupId, term) -> 
tableMessagesFactory.readWriteMultiRowReplicaRequest()
+                (keyRows0, txo, groupId, term, full) -> 
tableMessagesFactory.readWriteMultiRowReplicaRequest()
                         .groupId(groupId)
-                        .commitPartitionId(commitPart)
+                        .commitPartitionId(txo.commitPartition())
                         .binaryRowMessages(serializeBinaryRows(keyRows0))
                         .transactionId(txo.id())
                         .term(term)
                         .requestType(RequestType.RW_INSERT_ALL)
                         .timestampLong(clock.nowLong())
+                        .full(full)
                         .build(),
                 InternalTableImpl::collectMultiRowsResponsesWithoutRestoreOrder
         );
@@ -739,14 +756,15 @@ public class InternalTableImpl implements InternalTable {
         return enlistInTx(
                 row,
                 tx,
-                (commitPart, txo, groupId, term) -> 
tableMessagesFactory.readWriteSingleRowReplicaRequest()
+                (txo, groupId, term) -> 
tableMessagesFactory.readWriteSingleRowReplicaRequest()
                         .groupId(groupId)
-                        .commitPartitionId(commitPart)
+                        .commitPartitionId(txo.commitPartition())
                         .binaryRowMessage(serializeBinaryRow(row))
                         .transactionId(txo.id())
                         .term(term)
                         .requestType(RequestType.RW_REPLACE_IF_EXIST)
                         .timestampLong(clock.nowLong())
+                        .full(tx == null)
                         .build()
         );
     }
@@ -757,15 +775,16 @@ public class InternalTableImpl implements InternalTable {
         return enlistInTx(
                 newRow,
                 tx,
-                (commitPart, txo, groupId, term) -> 
tableMessagesFactory.readWriteSwapRowReplicaRequest()
+                (txo, groupId, term) -> 
tableMessagesFactory.readWriteSwapRowReplicaRequest()
                         .groupId(groupId)
-                        .commitPartitionId(commitPart)
+                        .commitPartitionId(txo.commitPartition())
                         .oldBinaryRowMessage(serializeBinaryRow(oldRow))
                         .binaryRowMessage(serializeBinaryRow(newRow))
                         .transactionId(txo.id())
                         .term(term)
                         .requestType(RequestType.RW_REPLACE)
                         .timestampLong(clock.nowLong())
+                        .full(tx == null)
                         .build()
         );
     }
@@ -776,14 +795,15 @@ public class InternalTableImpl implements InternalTable {
         return enlistInTx(
                 row,
                 tx,
-                (commitPart, txo, groupId, term) -> 
tableMessagesFactory.readWriteSingleRowReplicaRequest()
+                (txo, groupId, term) -> 
tableMessagesFactory.readWriteSingleRowReplicaRequest()
                         .groupId(groupId)
-                        .commitPartitionId(commitPart)
+                        .commitPartitionId(txo.commitPartition())
                         .binaryRowMessage(serializeBinaryRow(row))
                         .transactionId(txo.id())
                         .term(term)
                         .requestType(RequestType.RW_GET_AND_REPLACE)
                         .timestampLong(clock.nowLong())
+                        .full(tx == null)
                         .build()
         );
     }
@@ -794,14 +814,15 @@ public class InternalTableImpl implements InternalTable {
         return enlistInTx(
                 keyRow,
                 tx,
-                (commitPart, txo, groupId, term) -> 
tableMessagesFactory.readWriteSingleRowReplicaRequest()
+                (txo, groupId, term) -> 
tableMessagesFactory.readWriteSingleRowReplicaRequest()
                         .groupId(groupId)
-                        .commitPartitionId(commitPart)
+                        .commitPartitionId(txo.commitPartition())
                         .binaryRowMessage(serializeBinaryRow(keyRow))
                         .transactionId(txo.id())
                         .term(term)
                         .requestType(RequestType.RW_DELETE)
                         .timestampLong(clock.nowLong())
+                        .full(tx == null)
                         .build()
         );
     }
@@ -812,14 +833,15 @@ public class InternalTableImpl implements InternalTable {
         return enlistInTx(
                 oldRow,
                 tx,
-                (commitPart, txo, groupId, term) -> 
tableMessagesFactory.readWriteSingleRowReplicaRequest()
+                (txo, groupId, term) -> 
tableMessagesFactory.readWriteSingleRowReplicaRequest()
                         .groupId(groupId)
-                        .commitPartitionId(commitPart)
+                        .commitPartitionId(txo.commitPartition())
                         .binaryRowMessage(serializeBinaryRow(oldRow))
                         .transactionId(txo.id())
                         .term(term)
                         .requestType(RequestType.RW_DELETE_EXACT)
                         .timestampLong(clock.nowLong())
+                        .full(tx == null)
                         .build()
         );
     }
@@ -830,14 +852,15 @@ public class InternalTableImpl implements InternalTable {
         return enlistInTx(
                 row,
                 tx,
-                (commitPart, txo, groupId, term) -> 
tableMessagesFactory.readWriteSingleRowReplicaRequest()
+                (txo, groupId, term) -> 
tableMessagesFactory.readWriteSingleRowReplicaRequest()
                         .groupId(groupId)
-                        .commitPartitionId(commitPart)
+                        .commitPartitionId(txo.commitPartition())
                         .binaryRowMessage(serializeBinaryRow(row))
                         .transactionId(txo.id())
                         .term(term)
                         .requestType(RequestType.RW_GET_AND_DELETE)
                         .timestampLong(clock.nowLong())
+                        .full(tx == null)
                         .build()
         );
     }
@@ -848,14 +871,15 @@ public class InternalTableImpl implements InternalTable {
         return enlistInTx(
                 rows,
                 tx,
-                (commitPart, keyRows0, txo, groupId, term) -> 
tableMessagesFactory.readWriteMultiRowReplicaRequest()
+                (keyRows0, txo, groupId, term, full) -> 
tableMessagesFactory.readWriteMultiRowReplicaRequest()
                         .groupId(groupId)
-                        .commitPartitionId(commitPart)
+                        .commitPartitionId(txo.commitPartition())
                         .binaryRowMessages(serializeBinaryRows(keyRows0))
                         .transactionId(txo.id())
                         .term(term)
                         .requestType(RequestType.RW_DELETE_ALL)
                         .timestampLong(clock.nowLong())
+                        .full(full)
                         .build(),
                 InternalTableImpl::collectMultiRowsResponsesWithoutRestoreOrder
         );
@@ -870,14 +894,15 @@ public class InternalTableImpl implements InternalTable {
         return enlistInTx(
                 rows,
                 tx,
-                (commitPart, keyRows0, txo, groupId, term) -> 
tableMessagesFactory.readWriteMultiRowReplicaRequest()
+                (keyRows0, txo, groupId, term, full) -> 
tableMessagesFactory.readWriteMultiRowReplicaRequest()
                         .groupId(groupId)
-                        .commitPartitionId(commitPart)
+                        .commitPartitionId(txo.commitPartition())
                         .binaryRowMessages(serializeBinaryRows(keyRows0))
                         .transactionId(txo.id())
                         .term(term)
                         .requestType(RequestType.RW_DELETE_EXACT_ALL)
                         .timestampLong(clock.nowLong())
+                        .full(full)
                         .build(),
                 InternalTableImpl::collectMultiRowsResponsesWithoutRestoreOrder
         );
@@ -958,7 +983,7 @@ public class InternalTableImpl implements InternalTable {
                     return replicaSvc.invoke(recipientNode, request);
                 },
                 // TODO: IGNITE-17666 Close cursor tx finish.
-                Function.identity());
+                (unused, fut) -> fut);
     }
 
     @Override
@@ -1011,9 +1036,10 @@ public class InternalTableImpl implements InternalTable {
                         lowerBound,
                         upperBound,
                         flags,
-                        columnsToInclude
+                        columnsToInclude,
+                        implicit
                 ),
-                fut -> postEnlist(fut, implicit, tx0)
+                (commit, fut) -> postEnlist(fut, commit, tx0, implicit && 
!commit)
         );
     }
 
@@ -1060,12 +1086,13 @@ public class InternalTableImpl implements InternalTable 
{
                             .columnsToInclude(columnsToInclude)
                             .batchSize(batchSize)
                             .term(recipient.term())
+                            .full(false) // Set explicitly.
                             .build();
 
                     return replicaSvc.invoke(recipient.node(), request);
                 },
                 // TODO: IGNITE-17666 Close cursor tx finish.
-                Function.identity());
+                (unused, fut) -> fut);
     }
 
     /**
@@ -1348,7 +1375,7 @@ public class InternalTableImpl implements InternalTable {
         private final BiFunction<Long, Integer, 
CompletableFuture<Collection<BinaryRow>>> retrieveBatch;
 
         /** The closure will be invoked before the cursor closed. */
-        Function<CompletableFuture<Void>, CompletableFuture<Void>> onClose;
+        BiFunction<Boolean, CompletableFuture<Void>, CompletableFuture<Void>> 
onClose;
 
         /** True when the publisher has a subscriber, false otherwise. */
         private final AtomicBoolean subscribed;
@@ -1362,7 +1389,7 @@ public class InternalTableImpl implements InternalTable {
          */
         PartitionScanPublisher(
                 BiFunction<Long, Integer, 
CompletableFuture<Collection<BinaryRow>>> retrieveBatch,
-                Function<CompletableFuture<Void>, CompletableFuture<Void>> 
onClose
+                BiFunction<Boolean, CompletableFuture<Void>, 
CompletableFuture<Void>> onClose
         ) {
             this.retrieveBatch = retrieveBatch;
             this.onClose = onClose;
@@ -1420,7 +1447,7 @@ public class InternalTableImpl implements InternalTable {
             @Override
             public void request(long n) {
                 if (n <= 0) {
-                    cancel();
+                    cancel(null, true);
 
                     subscriber.onError(new 
IllegalArgumentException(IgniteStringFormatter
                             .format("Invalid requested amount of items 
[requested={}, minValue=1]", n))
@@ -1447,20 +1474,21 @@ public class InternalTableImpl implements InternalTable 
{
             /** {@inheritDoc} */
             @Override
             public void cancel() {
-                cancel(null);
+                cancel(null, true); // Explicit cancel.
             }
 
             /**
              * After the method is called, a subscriber won't be received 
updates from the publisher.
              *
              * @param t An exception which was thrown when entries were 
retrieving from the cursor.
+             * @param commit {@code True} to commit.
              */
-            public void cancel(Throwable t) {
+            private void cancel(Throwable t, boolean commit) {
                 if (!canceled.compareAndSet(false, true)) {
                     return;
                 }
 
-                onClose.apply(t == null ? completedFuture(null) : 
failedFuture(t)).handle((ignore, th) -> {
+                onClose.apply(commit, t == null ? completedFuture(null) : 
failedFuture(t)).handle((ignore, th) -> {
                     if (th != null) {
                         subscriber.onError(th);
                     } else {
@@ -1482,18 +1510,13 @@ public class InternalTableImpl implements InternalTable 
{
                 }
 
                 retrieveBatch.apply(scanId, n).thenAccept(binaryRows -> {
-                    if (binaryRows == null) {
-                        cancel();
-
-                        return;
-                    } else {
-                        assert binaryRows.size() <= n : "Rows more then 
requested " + binaryRows.size() + " " + n;
+                    assert binaryRows != null;
+                    assert binaryRows.size() <= n : "Rows more then requested 
" + binaryRows.size() + " " + n;
 
-                        binaryRows.forEach(subscriber::onNext);
-                    }
+                    binaryRows.forEach(subscriber::onNext);
 
                     if (binaryRows.size() < n) {
-                        cancel();
+                        cancel(null, false);
                     } else {
                         long remaining = 
requestedItemsCnt.addAndGet(Math.negateExact(binaryRows.size()));
 
@@ -1502,7 +1525,7 @@ public class InternalTableImpl implements InternalTable {
                         }
                     }
                 }).exceptionally(t -> {
-                    cancel(t);
+                    cancel(t, false);
 
                     return null;
                 });
@@ -1620,19 +1643,23 @@ public class InternalTableImpl implements InternalTable 
{
     }
 
     private ReplicaRequest upsertAllInternal(
-            TablePartitionId commitPart,
             Collection<? extends BinaryRow> keyRows0,
             InternalTransaction txo,
             ReplicationGroupId groupId,
-            Long term) {
+            Long term,
+            boolean full
+    ) {
+        assert txo.commitPartition() != null;
+
         return tableMessagesFactory.readWriteMultiRowReplicaRequest()
                 .groupId(groupId)
-                .commitPartitionId(commitPart)
+                .commitPartitionId(txo.commitPartition())
                 .binaryRowMessages(serializeBinaryRows(keyRows0))
                 .transactionId(txo.id())
                 .term(term)
                 .requestType(RequestType.RW_UPSERT_ALL)
                 .timestampLong(clock.nowLong())
+                .full(full)
                 .build();
     }
 }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java 
b/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
index 6c08f536cf..64cdcb87ab 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
@@ -19,19 +19,23 @@ package org.apache.ignite.internal.table;
 
 import static org.mockito.Answers.RETURNS_DEEP_STUBS;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.isNull;
 import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.TestHybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.listener.ReplicaListener;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.replicator.message.TimestampAware;
 import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver;
 import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
 import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
@@ -42,7 +46,6 @@ import 
org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
 import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
 import org.apache.ignite.internal.tx.message.TxStateReplicaRequest;
-import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.MessagingService;
 import org.apache.ignite.table.Table;
@@ -68,19 +71,44 @@ public class TxLocalTest extends TxAbstractTest {
 
         lockManager = new HeapLockManager();
 
-        ReplicaService replicaSvc = mock(ReplicaService.class, 
RETURNS_DEEP_STUBS);
-        PlacementDriver placementDriver = mock(PlacementDriver.class, 
RETURNS_DEEP_STUBS);
+        ReplicaMessagesFactory replicaMessagesFactory = new 
ReplicaMessagesFactory();
+
+        TestHybridClock localClock = new TestHybridClock(() -> 1);
+        MessagingService msgSvc = mock(MessagingService.class, 
RETURNS_DEEP_STUBS);
+        ReplicaService replicaSvc = new ReplicaService(msgSvc, localClock);
 
         Map<ReplicationGroupId, DummyInternalTableImpl> tables = new 
HashMap<>();
+        doAnswer(invocationOnMock -> {
+            ReplicaRequest request = invocationOnMock.getArgument(1);
+            ReplicaListener replicaListener = 
tables.get(request.groupId()).getReplicaListener();
+
+            if (request instanceof TimestampAware) {
+                TimestampAware aware = (TimestampAware) request;
+                HybridTimestamp updated = 
DummyInternalTableImpl.CLOCK.update(aware.timestamp());
+
+                return replicaListener.invoke(request).handle((res, err) -> 
err == null ? replicaMessagesFactory
+                        .timestampAwareReplicaResponse()
+                        .result(res)
+                        .timestampLong(updated.longValue())
+                        .build() :
+                        replicaMessagesFactory
+                                .errorTimestampAwareReplicaResponse()
+                                .throwable(err)
+                                .timestampLong(updated.longValue())
+                                .build());
+            } else {
+                return replicaListener.invoke(request).handle((res, err) -> 
err == null ? replicaMessagesFactory
+                        .replicaResponse()
+                        .result(res)
+                        .build() : replicaMessagesFactory
+                        .errorReplicaResponse()
+                        .throwable(err)
+                        .build());
+            }
 
-        lenient().doAnswer(
-            invocationOnMock -> {
-                    ReplicaRequest request = invocationOnMock.getArgument(1);
-                    ReplicaListener replicaListener = 
tables.get(request.groupId()).getReplicaListener();
+        }).when(msgSvc).invoke((String) isNull(), any(), anyLong());
 
-                    return replicaListener.invoke(request);
-            }
-        ).when(replicaSvc).invoke(any(ClusterNode.class), any());
+        PlacementDriver placementDriver = mock(PlacementDriver.class, 
RETURNS_DEEP_STUBS);
 
         doAnswer(invocationOnMock -> {
             TxStateReplicaRequest request = invocationOnMock.getArgument(1);
@@ -89,7 +117,7 @@ public class TxLocalTest extends TxAbstractTest {
                     
tables.get(request.groupId()).txStateStorage().getTxStateStorage(0).get(request.txId()));
         }).when(placementDriver).sendMetaRequest(any(), any());
 
-        txManager = new TxManagerImpl(replicaSvc, lockManager, new 
HybridClockImpl(), new TransactionIdGenerator(0xdeadbeef));
+        txManager = new TxManagerImpl(replicaSvc, lockManager, localClock, new 
TransactionIdGenerator(0xdeadbeef));
 
         igniteTransactions = new IgniteTransactionsImpl(txManager);
 
@@ -101,8 +129,6 @@ public class TxLocalTest extends TxAbstractTest {
 
         customers = new TableImpl(table2, new 
DummySchemaManagerImpl(CUSTOMERS_SCHEMA), lockManager);
 
-        
when(clusterService.messagingService()).thenReturn(mock(MessagingService.class, 
RETURNS_DEEP_STUBS));
-
         tables.put(table.groupId(), table);
         tables.put(table2.groupId(), table2);
     }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
index 7307f934e4..f51b228ac1 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
@@ -163,7 +163,7 @@ public abstract class IndexBaseTest extends 
BaseMvStoragesTest {
     static void addWrite(StorageUpdateHandler handler, UUID rowUuid, @Nullable 
BinaryRow row) {
         TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID);
 
-        handler.handleUpdate(TX_ID, rowUuid, partitionId, row, (unused) -> {});
+        handler.handleUpdate(TX_ID, rowUuid, partitionId, row, (unused) -> {}, 
null);
     }
 
     static BinaryRow defaultRow() {
@@ -223,7 +223,7 @@ public abstract class IndexBaseTest extends 
BaseMvStoragesTest {
         USE_UPDATE {
             @Override
             void addWrite(StorageUpdateHandler handler, TablePartitionId 
partitionId, UUID rowUuid, @Nullable BinaryRow row) {
-                handler.handleUpdate(TX_ID, rowUuid, partitionId, row, 
(unused) -> {});
+                handler.handleUpdate(TX_ID, rowUuid, partitionId, row, 
(unused) -> {}, null);
             }
         },
         /** Uses updateAll api. */
@@ -241,7 +241,8 @@ public abstract class IndexBaseTest extends 
BaseMvStoragesTest {
                         TX_ID,
                         singletonMap(rowUuid, rowMessage),
                         partitionId,
-                        (unused) -> {}
+                        (unused) -> {},
+                        null
                 );
             }
         };
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java
index 0a0ce0ee33..76308459c7 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java
@@ -111,6 +111,7 @@ public class StorageUpdateHandlerTest {
                 UUID.randomUUID(),
                 new TablePartitionId(1, PARTITION_ID),
                 null,
+                null,
                 null
         );
 
@@ -131,6 +132,7 @@ public class StorageUpdateHandlerTest {
                 UUID.randomUUID(),
                 Map.of(),
                 new TablePartitionId(1, PARTITION_ID),
+                null,
                 null
         );
 
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
index 4e0b7f6a7f..6a18c727ec 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
@@ -98,6 +98,11 @@ public class TestPartitionDataStorage implements 
PartitionDataStorage {
         return partitionStorage.addWrite(rowId, row, txId, commitTableId, 
commitPartitionId);
     }
 
+    @Override
+    public void addWriteCommitted(RowId rowId, @Nullable BinaryRow row, 
HybridTimestamp commitTs) {
+        partitionStorage.addWriteCommitted(rowId, row, commitTs);
+    }
+
     @Override
     public @Nullable BinaryRow abortWrite(RowId rowId) throws StorageException 
{
         return partitionStorage.abortWrite(rowId);
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
index 5457661d54..c39638ebb0 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
@@ -1235,11 +1235,25 @@ public abstract class TxAbstractTest extends 
IgniteAbstractTest {
 
     @Test
     public void testScan() throws Exception {
-        accounts.recordView().upsertAll(null, List.of(makeValue(1, 100.), 
makeValue(2, 200.)));
+        doTestScan(null);
+    }
+
+    @Test
+    public void testScanExplicit() throws Exception {
+        igniteTransactions.runInTransaction(this::doTestScan);
+    }
+
+    /**
+     * Do scan in test.
+     *
+     * @param tx The transaction.
+     */
+    private void doTestScan(@Nullable Transaction tx) {
+        accounts.recordView().upsertAll(tx, List.of(makeValue(1, 100.), 
makeValue(2, 200.)));
 
-        CompletableFuture<List<Tuple>> scanFut = 
scan(accounts.internalTable(), null);
+        CompletableFuture<List<Tuple>> scanFut = 
scan(accounts.internalTable(), tx == null ? null : (InternalTransaction) tx);
 
-        var rows = scanFut.get(10, TimeUnit.SECONDS);
+        var rows = scanFut.join();
 
         Map<Long, Tuple> map = new HashMap<>();
 
@@ -1249,6 +1263,9 @@ public abstract class TxAbstractTest extends 
IgniteAbstractTest {
 
         assertEquals(100., map.get(1L).doubleValue("balance"));
         assertEquals(200., map.get(2L).doubleValue("balance"));
+
+        // Attempt to overwrite.
+        accounts.recordView().upsertAll(tx, List.of(makeValue(1, 300.), 
makeValue(2, 400.)));
     }
 
     /**
@@ -1258,7 +1275,7 @@ public abstract class TxAbstractTest extends 
IgniteAbstractTest {
      * @param internalTx Internal transaction of {@code null}.
      * @return Future to scanning result.
      */
-    private CompletableFuture<List<Tuple>> scan(InternalTable internalTable, 
InternalTransaction internalTx) {
+    private CompletableFuture<List<Tuple>> scan(InternalTable internalTable, 
@Nullable InternalTransaction internalTx) {
         Flow.Publisher<BinaryRow> pub = internalTx != null && 
internalTx.isReadOnly()
                 ? internalTable.scan(0, internalTx.readTimestamp(), 
internalTable.leaderAssignment(0))
                 : internalTable.scan(0, internalTx);
@@ -1912,6 +1929,12 @@ public abstract class TxAbstractTest extends 
IgniteAbstractTest {
         testTransactionAlreadyFinished(false);
     }
 
+    @Test
+    public void testImplicit() {
+        accounts.recordView().upsert(null, makeValue(1, BALANCE_1));
+        assertEquals(BALANCE_1, accounts.recordView().get(null, 
makeKey(1)).doubleValue("balance"));
+    }
+
     /**
      * Checks operations that act after a transaction is committed, are 
finished with exception.
      *
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 99d307c6c0..feb2acbc02 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -31,11 +31,12 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
+import java.util.function.LongSupplier;
 import javax.naming.OperationNotSupportedException;
 import org.apache.ignite.configuration.ConfigurationValue;
 import org.apache.ignite.distributed.TestPartitionDataStorage;
+import org.apache.ignite.internal.TestHybridClock;
 import org.apache.ignite.internal.hlc.HybridClock;
-import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
@@ -95,10 +96,17 @@ import org.jetbrains.annotations.Nullable;
  * Dummy table storage implementation.
  */
 public class DummyInternalTableImpl extends InternalTableImpl {
-    private static final IgniteLogger LOG = 
Loggers.forClass(DummyInternalTableImpl.class);
+    public static final IgniteLogger LOG = 
Loggers.forClass(DummyInternalTableImpl.class);
 
     public static final NetworkAddress ADDR = new NetworkAddress("127.0.0.1", 
2004);
 
+    public static final HybridClock CLOCK = new TestHybridClock(new 
LongSupplier() {
+        @Override
+        public long getAsLong() {
+            return 0;
+        }
+    });
+
     private static final int PART_ID = 0;
 
     private static final SchemaDescriptor SCHEMA = new SchemaDescriptor(
@@ -107,8 +115,6 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
             new Column[]{new Column("value", NativeTypes.INT64, false)}
     );
 
-    private static final HybridClock CLOCK = new HybridClockImpl();
-
     private static final ReplicationGroupId crossTableGroupId = new 
TablePartitionId(333, 0);
 
     private PartitionListener partitionListener;
@@ -118,7 +124,7 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
     private final ReplicationGroupId groupId;
 
     /** The thread updates safe time on the dummy replica. */
-    private Thread safeTimeUpdaterThread;
+    final private PendingComparableValuesTracker<HybridTimestamp, Void> 
safeTime;
 
     private static final AtomicInteger nextTableId = new AtomicInteger(10_001);
 
@@ -131,6 +137,12 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
         this(replicaSvc, SCHEMA);
     }
 
+    /**
+     * Creates a new local table.
+     *
+     * @param replicaSvc Replica service.
+     * @param schema Schema.
+     */
     public DummyInternalTableImpl(ReplicaService replicaSvc, SchemaDescriptor 
schema) {
         this(replicaSvc, new TestMvPartitionStorage(0), schema);
     }
@@ -273,8 +285,7 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
 
         IndexLocker pkLocker = new HashIndexLocker(indexId, true, 
this.txManager.lockManager(), row2Tuple);
 
-        PendingComparableValuesTracker<HybridTimestamp, Void> safeTime =
-                new PendingComparableValuesTracker<>(new HybridTimestamp(1, 
0));
+        safeTime = mock(PendingComparableValuesTracker.class);
         PartitionDataStorage partitionDataStorage = new 
TestPartitionDataStorage(mvPartStorage);
         TableIndexStoragesSupplier indexes = 
createTableIndexStoragesSupplier(Map.of(pkStorage.get().id(), pkStorage.get()));
 
@@ -320,6 +331,9 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
                 mock(TablesConfiguration.class)
         );
 
+        
lenient().when(safeTime.waitFor(any())).thenReturn(completedFuture(null));
+        lenient().when(safeTime.current()).thenReturn(new HybridTimestamp(1, 
0));
+
         partitionListener = new PartitionListener(
                 new TestPartitionDataStorage(mvPartStorage),
                 storageUpdateHandler,
@@ -327,34 +341,22 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
                 safeTime,
                 new PendingComparableValuesTracker<>(0L)
         );
-
-        safeTimeUpdaterThread = new Thread(new SafeTimeUpdater(safeTime), 
"safe-time-updater");
-
-        safeTimeUpdaterThread.start();
     }
 
     /**
-     * A process to update safe time periodically.
+     * Set a safe timestamp.
+     *
+     * @param ts Timestamp.
      */
-    private static class SafeTimeUpdater implements Runnable {
-        PendingComparableValuesTracker<HybridTimestamp, Void> safeTime;
-
-        public SafeTimeUpdater(PendingComparableValuesTracker<HybridTimestamp, 
Void> safeTime) {
-            this.safeTime = safeTime;
-        }
+    public void updateSafeTime(HybridTimestamp ts) {
+        safeTime.update(ts, null);
+    }
 
-        @Override
-        public void run() {
-            while (true) {
-                safeTime.update(CLOCK.now(), null);
-
-                try {
-                    Thread.sleep(1_000);
-                } catch (InterruptedException e) {
-                    LOG.warn("The sfe time updater thread is interrupted");
-                }
-            }
-        }
+    /**
+     * @return Current safe time,
+     */
+    public HybridTimestamp getSafeTime() {
+        return safeTime.current();
     }
 
     /**
@@ -384,12 +386,6 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
         return txManager;
     }
 
-    /** {@inheritDoc} */
-    @Override
-    public @NotNull String name() {
-        return null;
-    }
-
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<BinaryRow> get(BinaryRowEx keyRow, 
InternalTransaction tx) {
@@ -414,17 +410,6 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
         return completedFuture(mock(ClusterNode.class));
     }
 
-    @Override
-    public void close() {
-        super.close();
-
-        if (safeTimeUpdaterThread != null) {
-            safeTimeUpdaterThread.interrupt();
-
-            safeTimeUpdaterThread = null;
-        }
-    }
-
     /**
      * Returns dummy table index storages supplier.
      *
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java
index bf1b4c00bc..3f3695cb5a 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java
@@ -84,7 +84,6 @@ public interface InternalTransaction extends Transaction {
      *
      * @param resultFuture Operation result future.
      */
-    @Deprecated
     void enlistResultFuture(CompletableFuture<?> resultFuture);
 
     /**
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
index fdc4302e15..1c38556e15 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
@@ -30,7 +30,7 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
@@ -46,19 +46,29 @@ import org.jetbrains.annotations.NotNull;
  * The read-write implementation of an internal transaction.
  */
 public class ReadWriteTransactionImpl extends IgniteAbstractTransactionImpl {
+    /** The logger. */
     private static final IgniteLogger LOG = 
Loggers.forClass(InternalTransaction.class);
 
+    /** Commit partition updater. */
+    private static final AtomicReferenceFieldUpdater<ReadWriteTransactionImpl, 
TablePartitionId> COMMIT_PART_UPDATER =
+            
AtomicReferenceFieldUpdater.newUpdater(ReadWriteTransactionImpl.class, 
TablePartitionId.class, "commitPart");
+
+    /** Finish future updater. */
+    @SuppressWarnings("rawtypes")
+    private static final AtomicReferenceFieldUpdater<ReadWriteTransactionImpl, 
CompletableFuture> FINISH_FUT_UPDATER =
+            
AtomicReferenceFieldUpdater.newUpdater(ReadWriteTransactionImpl.class, 
CompletableFuture.class, "finishFut");
+
     /** Enlisted partitions: partition id -> (primary replica node, raft 
term). */
     private final Map<TablePartitionId, IgniteBiTuple<ClusterNode, Long>> 
enlisted = new ConcurrentHashMap<>();
 
     /** Enlisted operation futures in this transaction. */
     private final List<CompletableFuture<?>> enlistedResults = new 
CopyOnWriteArrayList<>();
 
-    /** Reference to the partition that stores the transaction state. */
-    private final AtomicReference<TablePartitionId> commitPartitionRef = new 
AtomicReference<>();
+    /** A partition which stores the transaction state. */
+    private volatile TablePartitionId commitPart;
 
     /** The future used on repeated commit/rollback. */
-    private final AtomicReference<CompletableFuture<Void>> finishFut = new 
AtomicReference<>();
+    private volatile CompletableFuture<Void> finishFut;
 
     /**
      * The constructor.
@@ -73,13 +83,13 @@ public class ReadWriteTransactionImpl extends 
IgniteAbstractTransactionImpl {
     /** {@inheritDoc} */
     @Override
     public boolean assignCommitPartition(TablePartitionId tablePartitionId) {
-        return commitPartitionRef.compareAndSet(null, tablePartitionId);
+        return COMMIT_PART_UPDATER.compareAndSet(this, null, tablePartitionId);
     }
 
     /** {@inheritDoc} */
     @Override
     public TablePartitionId commitPartition() {
-        return commitPartitionRef.get();
+        return commitPart;
     }
 
     /** {@inheritDoc} */
@@ -97,8 +107,8 @@ public class ReadWriteTransactionImpl extends 
IgniteAbstractTransactionImpl {
     /** {@inheritDoc} */
     @Override
     protected CompletableFuture<Void> finish(boolean commit) {
-        if (!finishFut.compareAndSet(null, new CompletableFuture<>())) {
-            return finishFut.get();
+        if (!FINISH_FUT_UPDATER.compareAndSet(this, null, new 
CompletableFuture<>())) {
+            return finishFut;
         }
 
         // TODO: https://issues.apache.org/jira/browse/IGNITE-17688 Add proper 
exception handling.
@@ -123,7 +133,6 @@ public class ReadWriteTransactionImpl extends 
IgniteAbstractTransactionImpl {
                                     }
                                 });
 
-                                TablePartitionId commitPart = 
commitPartitionRef.get();
                                 ClusterNode recipientNode = 
enlisted.get(commitPart).get1();
                                 Long term = enlisted.get(commitPart).get2();
 
@@ -150,7 +159,7 @@ public class ReadWriteTransactionImpl extends 
IgniteAbstractTransactionImpl {
                         }
                 );
 
-        mainFinishFut.handle((res, e) -> finishFut.get().complete(null));
+        mainFinishFut.handle((res, e) -> finishFut.complete(null));
 
         return mainFinishFut;
     }
diff --git 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
index d53bde4ba2..f0165c4d3b 100644
--- 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
+++ 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
@@ -51,6 +51,7 @@ import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NetworkAddress;
+import org.hamcrest.Matchers;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -237,7 +238,7 @@ public class TxManagerTest extends IgniteAbstractTest {
 
     @Test
     public void testObservableTimestamp() {
-        int compareThreshold = 50;
+        long compareThreshold = 50;
         // Check that idle safe time propagation period is significantly 
greater than compareThreshold.
         assertTrue(IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS + CLOCK_SKEW 
> compareThreshold * 5);
 
@@ -260,7 +261,7 @@ public class TxManagerTest extends IgniteAbstractTest {
         tx = txManager.begin(true, timestampInPast);
 
         long readTime = now.getPhysical() - 
IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS - CLOCK_SKEW;
-        assertTrue(abs(readTime - tx.readTimestamp().getPhysical()) < 
compareThreshold);
+        assertThat(abs(readTime - tx.readTimestamp().getPhysical()), 
Matchers.lessThan(compareThreshold));
         tx.commit();
 
         assertThrows(AssertionError.class, () -> txManager.begin(false, now));

Reply via email to