This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 7873c18a68 IGNITE-23303 Improve transaction performance.
7873c18a68 is described below

commit 7873c18a6820a1136b0ca4e6a32aa2134d3e7f02
Author: Alexey Scherbakov <[email protected]>
AuthorDate: Wed Nov 20 19:00:24 2024 +0300

    IGNITE-23303 Improve transaction performance.
---
 .../handler/ClientPrimaryReplicaTrackerTest.java   |  12 +-
 .../apache/ignite/client/fakes/FakeTxManager.java  |   9 +-
 .../apache/ignite/internal/hlc/HybridClock.java    |   4 +-
 .../ignite/internal/hlc/HybridClockImpl.java       |  61 ++--
 .../internal/util/IgniteStripedReadWriteLock.java  |  10 +
 .../util/PendingComparableValuesTracker.java       |  23 +-
 .../ignite/internal/hlc/HybridClockTest.java       |  41 +--
 .../util/PendingComparableValuesTrackerTest.java   |   4 +-
 .../apache/ignite/internal/TestHybridClock.java    | 116 +------
 .../apache/ignite/raft/jraft/core/ItNodeTest.java  |  80 -----
 .../apache/ignite/raft/jraft/core/NodeImpl.java    |  19 --
 .../apache/ignite/raft/jraft/core/Replicator.java  |   6 -
 ...caResult.java => CommandApplicationResult.java} |  37 +--
 .../ignite/internal/replicator/ReplicaManager.java |  42 +--
 .../ignite/internal/replicator/ReplicaResult.java  |  16 +-
 .../ignite/internal/replicator/ReplicaService.java |  32 +-
 .../benchmark/AbstractMultiNodeBenchmark.java      |   2 +-
 .../internal/benchmark/UpsertKvBenchmark.java      |  13 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |   8 +-
 .../ItTxObservableTimePropagationTest.java         |  89 +++++
 .../ignite/internal/table/ItColocationTest.java    |  43 ++-
 .../table/distributed/raft/PartitionListener.java  |   4 +-
 .../replicator/PartitionReplicaListener.java       | 119 ++++---
 .../distributed/storage/InternalTableImpl.java     |  34 +-
 .../apache/ignite/distributed/ItTxTestCluster.java |  17 +-
 .../ignite/internal/table/TxAbstractTest.java      | 297 +----------------
 .../internal/table/TxInfrastructureTest.java       | 363 +++++++++++++++++++++
 .../table/impl/DummyInternalTableImpl.java         |  43 ++-
 .../org/apache/ignite/internal/tx/TxManager.java   |   7 +-
 .../internal/tx/impl/ReadWriteTransactionImpl.java |   6 +-
 .../ignite/internal/tx/impl/TxManagerImpl.java     |  13 +-
 31 files changed, 805 insertions(+), 765 deletions(-)

diff --git 
a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTrackerTest.java
 
b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTrackerTest.java
index 01e1f6b0b2..8f9ccef1bc 100644
--- 
a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTrackerTest.java
+++ 
b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTrackerTest.java
@@ -89,10 +89,10 @@ class ClientPrimaryReplicaTrackerTest extends 
BaseIgniteAbstractTest {
     public void testUpdateByEvent() {
         tracker.start();
 
-        assertEquals(1, tracker.maxStartTime());
-        driver.updateReplica("s3", TABLE_ID, 0, 2);
-
         assertEquals(2, tracker.maxStartTime());
+        driver.updateReplica("s3", TABLE_ID, 0, 3);
+
+        assertEquals(3, tracker.maxStartTime());
 
         PrimaryReplicasResult replicas = 
tracker.primaryReplicasAsync(TABLE_ID, null).join();
         assertEquals(PARTITIONS, replicas.nodeNames().size());
@@ -105,10 +105,10 @@ class ClientPrimaryReplicaTrackerTest extends 
BaseIgniteAbstractTest {
         driver.updateReplica(null, TABLE_ID, 0, 2);
         tracker.start();
 
-        assertEquals(1, tracker.maxStartTime());
-        driver.updateReplica(null, TABLE_ID, 1, 2);
-
         assertEquals(2, tracker.maxStartTime());
+        driver.updateReplica(null, TABLE_ID, 1, 3);
+
+        assertEquals(3, tracker.maxStartTime());
 
         PrimaryReplicasResult replicas = 
tracker.primaryReplicasAsync(TABLE_ID, null).join();
         assertEquals(PARTITIONS, replicas.nodeNames().size());
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
index 7c08564e70..4d8c00d9cc 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -183,10 +183,6 @@ public class FakeTxManager implements TxManager {
         return CompletableFuture.runAsync(runnable);
     }
 
-    @Override
-    public void finishFull(HybridTimestampTracker timestampTracker, UUID txId, 
boolean commit) {
-    }
-
     @Override
     public CompletableFuture<Void> finish(
             HybridTimestampTracker timestampTracker,
@@ -239,4 +235,9 @@ public class FakeTxManager implements TxManager {
     public int pending() {
         return 0;
     }
+
+    @Override
+    public void finishFull(HybridTimestampTracker timestampTracker, UUID txId, 
HybridTimestamp ts, boolean commit) {
+        // No-op.
+    }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClock.java 
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClock.java
index 6c3460d99e..d8a16f595f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClock.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClock.java
@@ -22,7 +22,7 @@ package org.apache.ignite.internal.hlc;
  */
 public interface HybridClock {
     /**
-     * Creates a timestamp for new event.
+     * Creates a timestamp for new event. A timestamp is guarantied to be 
unique and monotonically grown.
      *
      * @return The hybrid timestamp.
      */
@@ -37,7 +37,7 @@ public interface HybridClock {
     long currentLong();
 
     /**
-     * Creates a timestamp for new event.
+     * Creates a timestamp for new event. A timestamp is guarantied to be 
unique and monotonically grown.
      *
      * @return The hybrid timestamp.
      */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java
index 47e4d2b956..4960824454 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java
@@ -21,10 +21,9 @@ import static java.lang.Math.max;
 import static 
org.apache.ignite.internal.hlc.HybridTimestamp.LOGICAL_TIME_BITS_SIZE;
 import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
 
-import java.lang.invoke.MethodHandles;
-import java.lang.invoke.VarHandle;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.tostring.S;
@@ -38,55 +37,42 @@ public class HybridClockImpl implements HybridClock {
     /**
      * Var handle for {@link #latestTime}.
      */
-    private static final VarHandle LATEST_TIME;
-
-    static {
-        try {
-            LATEST_TIME = 
MethodHandles.lookup().findVarHandle(HybridClockImpl.class, "latestTime", 
long.class);
-        } catch (NoSuchFieldException | IllegalAccessException e) {
-            throw new ExceptionInInitializerError(e);
-        }
-    }
+    private static final AtomicLongFieldUpdater<HybridClockImpl> LATEST_TIME = 
AtomicLongFieldUpdater.newUpdater(HybridClockImpl.class,
+            "latestTime");
 
     private volatile long latestTime;
 
     private final List<ClockUpdateListener> updateListeners = new 
CopyOnWriteArrayList<>();
 
     /**
-     * The constructor which initializes the latest time to current time by 
system clock.
-     */
-    public HybridClockImpl() {
-        this.latestTime = currentTime();
-    }
-
-    /**
-     * System current time in milliseconds shifting left to free insignificant 
bytes.
-     * This method is marked with a public modifier to mock in tests because 
there is no way to mock currentTimeMillis.
+     * Returns current physical time in milliseconds.
      *
-     * @return Current time in milliseconds shifted right on two bytes.
+     * @return Current time.
      */
-    public static long currentTime() {
-        return System.currentTimeMillis() << LOGICAL_TIME_BITS_SIZE;
+    protected long physicalTime() {
+        return System.currentTimeMillis();
     }
 
     @Override
-    public long nowLong() {
+    public final long nowLong() {
         while (true) {
             long now = currentTime();
 
             // Read the latest time after accessing UTC time to reduce 
contention.
             long oldLatestTime = latestTime;
 
-            long newLatestTime = max(oldLatestTime + 1, now);
+            if (oldLatestTime >= now) {
+                return LATEST_TIME.incrementAndGet(this);
+            }
 
-            if (LATEST_TIME.compareAndSet(this, oldLatestTime, newLatestTime)) 
{
-                return newLatestTime;
+            if (LATEST_TIME.compareAndSet(this, oldLatestTime, now)) {
+                return now;
             }
         }
     }
 
     @Override
-    public long currentLong() {
+    public final long currentLong() {
         long current = currentTime();
 
         return max(latestTime, current);
@@ -107,33 +93,34 @@ public class HybridClockImpl implements HybridClock {
     }
 
     @Override
-    public HybridTimestamp now() {
+    public final HybridTimestamp now() {
         return hybridTimestamp(nowLong());
     }
 
     @Override
-    public HybridTimestamp current() {
+    public final HybridTimestamp current() {
         return hybridTimestamp(currentLong());
     }
 
     /**
-     * Updates the clock in accordance with an external event timestamp. If 
the supplied timestamp is ahead of the
-     * current clock timestamp, the clock gets adjusted to make sure it never 
returns any timestamp before (or equal to)
-     * the supplied external timestamp.
+     * Updates the clock in accordance with an external event timestamp. If 
the supplied timestamp is ahead of the current clock timestamp,
+     * the clock gets adjusted to make sure it never returns any timestamp 
before (or equal to) the supplied external timestamp.
      *
      * @param requestTime Timestamp from request.
      * @return The resulting timestamp (guaranteed to exceed both previous 
clock 'currentTs' and the supplied external ts).
      */
     @Override
-    public HybridTimestamp update(HybridTimestamp requestTime) {
+    public final HybridTimestamp update(HybridTimestamp requestTime) {
         while (true) {
             long now = currentTime();
 
             // Read the latest time after accessing UTC time to reduce 
contention.
-            long oldLatestTime = this.latestTime;
+            long oldLatestTime = latestTime;
 
             long newLatestTime = max(requestTime.longValue() + 1, max(now, 
oldLatestTime + 1));
 
+            // TODO https://issues.apache.org/jira/browse/IGNITE-23707 avoid 
CAS on logical part update.
+
             if (LATEST_TIME.compareAndSet(this, oldLatestTime, newLatestTime)) 
{
                 notifyUpdateListeners(newLatestTime);
 
@@ -142,6 +129,10 @@ public class HybridClockImpl implements HybridClock {
         }
     }
 
+    private long currentTime() {
+        return physicalTime() << LOGICAL_TIME_BITS_SIZE;
+    }
+
     @Override
     public void addUpdateListener(ClockUpdateListener listener) {
         updateListeners.add(listener);
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteStripedReadWriteLock.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteStripedReadWriteLock.java
index 796b557e1d..b799cc59f8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteStripedReadWriteLock.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteStripedReadWriteLock.java
@@ -30,6 +30,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
  * supports reentrancy semantics like {@link ReentrantReadWriteLock}.
  */
 public class IgniteStripedReadWriteLock implements ReadWriteLock {
+    /** Default concurrency. */
+    private static final int CONCURRENCY = Math.max(1, 
Runtime.getRuntime().availableProcessors() / 2);
+
     /** Index generator. */
     private static final AtomicInteger IDX_GEN = new AtomicInteger();
 
@@ -42,6 +45,13 @@ public class IgniteStripedReadWriteLock implements 
ReadWriteLock {
     /** Composite write lock. */
     private final WriteLock writeLock;
 
+    /**
+     * Creates a new instance with default concurrency level.
+     */
+    public IgniteStripedReadWriteLock() {
+        this(CONCURRENCY);
+    }
+
     /**
      * Creates a new instance with given concurrency level.
      *
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java
index 794641d3ce..4563e3b791 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java
@@ -24,6 +24,7 @@ import java.lang.invoke.MethodHandles;
 import java.lang.invoke.VarHandle;
 import java.util.Comparator;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -61,7 +62,7 @@ public class PendingComparableValuesTracker<T extends 
Comparable<T>, R> implemen
     private volatile boolean closeGuard;
 
     /** Busy lock to close synchronously. */
-    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+    private final IgniteStripedReadWriteLock busyLock = new 
IgniteStripedReadWriteLock();
 
     private final Comparator<Map.Entry<T, @Nullable R>> comparator;
 
@@ -86,7 +87,7 @@ public class PendingComparableValuesTracker<T extends 
Comparable<T>, R> implemen
      */
     public void update(T newValue, @Nullable R futureResult) {
         while (true) {
-            if (!busyLock.enterBusy()) {
+            if (!busyLock.readLock().tryLock()) {
                 throw new TrackerClosedException();
             }
 
@@ -104,7 +105,7 @@ public class PendingComparableValuesTracker<T extends 
Comparable<T>, R> implemen
                     break;
                 }
             } finally {
-                busyLock.leaveBusy();
+                busyLock.readLock().unlock();
             }
         }
     }
@@ -118,18 +119,20 @@ public class PendingComparableValuesTracker<T extends 
Comparable<T>, R> implemen
      * @param valueToWait Value to wait.
      */
     public CompletableFuture<R> waitFor(T valueToWait) {
-        if (!busyLock.enterBusy()) {
+        if (!busyLock.readLock().tryLock()) {
             return failedFuture(new TrackerClosedException());
         }
 
         try {
-            if (current.getKey().compareTo(valueToWait) >= 0) {
-                return completedFuture(current.getValue());
+            Entry<T, @Nullable R> currentKeyValue = current;
+
+            if (currentKeyValue.getKey().compareTo(valueToWait) >= 0) {
+                return completedFuture(currentKeyValue.getValue());
             }
 
             return addNewWaiter(valueToWait);
         } finally {
-            busyLock.leaveBusy();
+            busyLock.readLock().unlock();
         }
     }
 
@@ -139,14 +142,14 @@ public class PendingComparableValuesTracker<T extends 
Comparable<T>, R> implemen
      * @throws TrackerClosedException if the tracker is closed.
      */
     public T current() {
-        if (!busyLock.enterBusy()) {
+        if (!busyLock.readLock().tryLock()) {
             throw new TrackerClosedException();
         }
 
         try {
             return current.getKey();
         } finally {
-            busyLock.leaveBusy();
+            busyLock.readLock().unlock();
         }
     }
 
@@ -156,7 +159,7 @@ public class PendingComparableValuesTracker<T extends 
Comparable<T>, R> implemen
             return;
         }
 
-        busyLock.block();
+        busyLock.writeLock().lock();
 
         TrackerClosedException trackerClosedException = new 
TrackerClosedException();
 
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/hlc/HybridClockTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/hlc/HybridClockTest.java
index f4954882d0..f7d3b68e3a 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/hlc/HybridClockTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/hlc/HybridClockTest.java
@@ -17,21 +17,18 @@
 
 package org.apache.ignite.internal.hlc;
 
-import static 
org.apache.ignite.internal.hlc.HybridTimestamp.LOGICAL_TIME_BITS_SIZE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
+import org.apache.ignite.internal.TestHybridClock;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
-import org.mockito.MockedStatic;
 import org.mockito.junit.jupiter.MockitoExtension;
 
 /**
@@ -40,27 +37,19 @@ import org.mockito.junit.jupiter.MockitoExtension;
  */
 @ExtendWith(MockitoExtension.class)
 class HybridClockTest extends BaseIgniteAbstractTest {
-    /**
-     * Mock of a system clock.
-     */
-    private static MockedStatic<HybridClockImpl> clockMock;
-
     @Mock
     private ClockUpdateListener updateListener;
 
-    @AfterEach
-    public void afterEach() {
-        closeClockMock();
-    }
+    private long mockedTime;
 
     /**
      * Tests a {@link HybridClock#now()}.
      */
     @Test
     public void testNow() {
-        clockMock = mockToEpochMilli(100);
+        mockedTime = 100;
 
-        HybridClock clock = new HybridClockImpl();
+        HybridClock clock = new TestHybridClock(() -> mockedTime);
 
         assertTimestampEquals(100, new HybridTimestamp(100, 1), clock::now);
 
@@ -76,9 +65,9 @@ class HybridClockTest extends BaseIgniteAbstractTest {
      */
     @Test
     public void testTick() {
-        clockMock = mockToEpochMilli(100);
+        mockedTime = 100;
 
-        HybridClock clock = new HybridClockImpl();
+        HybridClock clock = new TestHybridClock(() -> mockedTime);
 
         assertTimestampEquals(100, new HybridTimestamp(100, 1),
                 () -> clock.update(new HybridTimestamp(50, 1)));
@@ -103,19 +92,11 @@ class HybridClockTest extends BaseIgniteAbstractTest {
     }
 
     private void assertTimestampEquals(long sysTime, HybridTimestamp expTs, 
Supplier<HybridTimestamp> clo) {
-        closeClockMock();
-
-        clockMock = mockToEpochMilli(sysTime);
+        mockedTime = sysTime;
 
         assertEquals(expTs, clo.get());
     }
 
-    private void closeClockMock() {
-        if (clockMock != null && !clockMock.isClosed()) {
-            clockMock.close();
-        }
-    }
-
     @Test
     void updateListenerIsNotNotifiedOnNowCall() {
         HybridClock clock = new HybridClockImpl();
@@ -163,12 +144,4 @@ class HybridClockTest extends BaseIgniteAbstractTest {
 
         verify(updateListener, never()).onUpdate(anyLong());
     }
-
-    private static MockedStatic<HybridClockImpl> mockToEpochMilli(long 
expected) {
-        MockedStatic<HybridClockImpl> clockMock = 
mockStatic(HybridClockImpl.class);
-
-        clockMock.when(HybridClockImpl::currentTime).thenReturn(expected << 
LOGICAL_TIME_BITS_SIZE);
-
-        return clockMock;
-    }
 }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/util/PendingComparableValuesTrackerTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/util/PendingComparableValuesTrackerTest.java
index 2fcf257814..03530ef753 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/util/PendingComparableValuesTrackerTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/util/PendingComparableValuesTrackerTest.java
@@ -40,6 +40,7 @@ import java.util.stream.Stream;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
@@ -256,7 +257,8 @@ public class PendingComparableValuesTrackerTest {
 
         CompletableFuture<Void> future0 = tracker.waitFor(2);
 
-        tracker.close();
+        // Close is called from dedicated stop worker.
+        IgniteTestUtils.runAsync(tracker::close).join();
 
         assertThrows(TrackerClosedException.class, tracker::current);
         assertThrows(TrackerClosedException.class, () -> tracker.update(2, 
null));
diff --git 
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/TestHybridClock.java
 
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/TestHybridClock.java
index eeab11539f..65c830280b 100644
--- 
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/TestHybridClock.java
+++ 
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/TestHybridClock.java
@@ -17,127 +17,23 @@
 
 package org.apache.ignite.internal;
 
-import static java.lang.Math.max;
-import static 
org.apache.ignite.internal.hlc.HybridTimestamp.LOGICAL_TIME_BITS_SIZE;
-import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
-
-import java.lang.invoke.MethodHandles;
-import java.lang.invoke.VarHandle;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.function.LongSupplier;
-import org.apache.ignite.internal.hlc.ClockUpdateListener;
-import org.apache.ignite.internal.hlc.HybridClock;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
 
 /**
- * Test hybrid clock with custom supplier of current time.
+ * Test hybrid clock with custom supplier of current time. TODO delete
  */
-public class TestHybridClock implements HybridClock {
+public class TestHybridClock extends HybridClockImpl {
     /** Supplier of current time in milliseconds. */
     private final LongSupplier currentTimeMillisSupplier;
 
-    /** Latest time. */
-    private volatile long latestTime;
-
-    private final List<ClockUpdateListener> updateListeners = new 
CopyOnWriteArrayList<>();
-
-    /**
-     * Var handle for {@link #latestTime}.
-     */
-    private static final VarHandle LATEST_TIME;
-
-    static {
-        try {
-            LATEST_TIME = 
MethodHandles.lookup().findVarHandle(TestHybridClock.class, "latestTime", 
long.class);
-        } catch (NoSuchFieldException | IllegalAccessException e) {
-            throw new ExceptionInInitializerError(e);
-        }
-    }
-
     public TestHybridClock(LongSupplier currentTimeMillisSupplier) {
         this.currentTimeMillisSupplier = currentTimeMillisSupplier;
-        this.latestTime = currentTime();
-    }
-
-    private long currentTime() {
-        return currentTimeMillisSupplier.getAsLong() << LOGICAL_TIME_BITS_SIZE;
-    }
-
-    @Override
-    public long nowLong() {
-        while (true) {
-            long now = currentTime();
-
-            // Read the latest time after accessing UTC time to reduce 
contention.
-            long oldLatestTime = latestTime;
-
-            long newLatestTime = max(oldLatestTime + 1, now);
-
-            if (LATEST_TIME.compareAndSet(this, oldLatestTime, newLatestTime)) 
{
-                return newLatestTime;
-            }
-        }
-    }
-
-    @Override
-    public long currentLong() {
-        long current = currentTime();
-
-        return max(latestTime, current);
-    }
-
-    private void notifyUpdateListeners(long newLatestTime) {
-        updateListeners.forEach(listener -> listener.onUpdate(newLatestTime));
-    }
-
-    @Override
-    public HybridTimestamp now() {
-        return hybridTimestamp(nowLong());
-    }
-
-    @Override
-    public HybridTimestamp current() {
-        return hybridTimestamp(currentLong());
-    }
-
-    /**
-     * Creates a timestamp for a received event.
-     *
-     * @param requestTime Timestamp from request.
-     * @return The hybrid timestamp.
-     */
-    @Override
-    public HybridTimestamp update(HybridTimestamp requestTime) {
-        while (true) {
-            long now = currentTime();
-
-            // Read the latest time after accessing UTC time to reduce 
contention.
-            long oldLatestTime = this.latestTime;
-
-            long newLatestTime = max(requestTime.longValue() + 1, max(now, 
oldLatestTime + 1));
-
-            if (LATEST_TIME.compareAndSet(this, oldLatestTime, newLatestTime)) 
{
-                notifyUpdateListeners(newLatestTime);
-
-                return hybridTimestamp(newLatestTime);
-            }
-        }
-    }
-
-    @Override
-    public void addUpdateListener(ClockUpdateListener listener) {
-        updateListeners.add(listener);
-    }
-
-    @Override
-    public void removeUpdateListener(ClockUpdateListener listener) {
-        updateListeners.remove(listener);
+        now();
     }
 
     @Override
-    public String toString() {
-        return S.toString(HybridClock.class, this);
+    protected long physicalTime() {
+        return currentTimeMillisSupplier.getAsLong();
     }
 }
diff --git 
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
 
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
index 892e79603d..18d0512ba4 100644
--- 
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
+++ 
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
@@ -76,8 +76,6 @@ import java.util.function.BiPredicate;
 import java.util.function.BooleanSupplier;
 import java.util.function.Consumer;
 import java.util.stream.IntStream;
-import org.apache.ignite.internal.hlc.HybridClock;
-import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.manager.ComponentContext;
@@ -119,8 +117,6 @@ import org.apache.ignite.raft.jraft.option.BootstrapOptions;
 import org.apache.ignite.raft.jraft.option.NodeOptions;
 import org.apache.ignite.raft.jraft.option.RaftOptions;
 import org.apache.ignite.raft.jraft.option.ReadOnlyOption;
-import org.apache.ignite.raft.jraft.rpc.AppendEntriesRequestImpl;
-import org.apache.ignite.raft.jraft.rpc.AppendEntriesResponseImpl;
 import org.apache.ignite.raft.jraft.rpc.RpcClientEx;
 import org.apache.ignite.raft.jraft.rpc.RpcRequests;
 import org.apache.ignite.raft.jraft.rpc.RpcServer;
@@ -3815,82 +3811,6 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
         assertTrue(res.get().isOk());
     }
 
-    /**
-     * Tests propagation of HLC on heartbeat request and response.
-     */
-    @Test
-    public void testHlcPropagation() throws Exception {
-        List<TestPeer> peers = TestUtils.generatePeers(testInfo, 2);
-
-        cluster = new TestCluster("unitest", dataPath, peers, 3_000, testInfo);
-
-        for (TestPeer peer : peers) {
-            RaftOptions opts = new RaftOptions();
-            opts.setElectionHeartbeatFactor(4); // Election timeout divisor.
-            HybridClock clock = new HybridClockImpl();
-            assertTrue(cluster.start(peer, false, 300, false, null, opts, 
clock));
-        }
-
-        List<NodeImpl> nodes = cluster.getNodes();
-
-        for (NodeImpl node : nodes) {
-            RpcClientEx rpcClientEx = sender(node);
-            rpcClientEx.recordMessages((msg, nodeId) -> {
-                if (msg instanceof AppendEntriesRequestImpl ||
-                    msg instanceof AppendEntriesResponseImpl) {
-                    return true;
-                }
-
-                return false;
-
-            });
-        }
-
-        Node leader = cluster.waitAndGetLeader();
-        cluster.ensureLeader(leader);
-
-        RpcClientEx client = sender(leader);
-
-        AtomicBoolean heartbeatRequest = new AtomicBoolean(false);
-        AtomicBoolean appendEntriesRequest = new AtomicBoolean(false);
-        AtomicBoolean heartbeatResponse = new AtomicBoolean(false);
-        AtomicBoolean appendEntriesResponse = new AtomicBoolean(false);
-
-        waitForCondition(() -> {
-            client.recordedMessages().forEach(msgs -> {
-                if (msgs[0] instanceof AppendEntriesRequestImpl) {
-                    AppendEntriesRequestImpl msg = (AppendEntriesRequestImpl) 
msgs[0];
-
-                    if (msg.entriesList() == null && msg.data() == null) {
-                        heartbeatRequest.set(true);
-                    } else {
-                        appendEntriesRequest.set(true);
-                    }
-
-                    assertTrue(msg.timestamp() != null);
-                } else if (msgs[0] instanceof AppendEntriesResponseImpl) {
-                    AppendEntriesResponseImpl msg = 
(AppendEntriesResponseImpl) msgs[0];
-                    if (msg.timestamp() == null) {
-                        appendEntriesResponse.set(true);
-                    } else {
-                        heartbeatResponse.set(true);
-                    }
-                }
-            });
-
-            return heartbeatRequest.get() &&
-                    appendEntriesRequest.get() &&
-                    heartbeatResponse.get() &&
-                    appendEntriesResponse.get();
-        },
-                5000);
-
-        assertTrue(heartbeatRequest.get());
-        assertTrue(appendEntriesRequest.get());
-        assertTrue(heartbeatResponse.get());
-        assertTrue(appendEntriesResponse.get());
-    }
-
     @Test
     public void 
exLeaderDoesntBecomeLeaderIfExternallyEnforcedConfigDoesNotContainIt() throws 
Exception {
         final long configFromResetIndex = 2L;
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index c187b8bcf4..1d7bc59143 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -2243,10 +2243,6 @@ public class NodeImpl implements Node, RaftServerService 
{
                         .success(false)
                         .term(this.currTerm);
 
-                if (request.timestamp() != null) {
-                    rb.timestamp(clock.update(request.timestamp()));
-                }
-
                 return rb.build();
             }
 
@@ -2264,10 +2260,6 @@ public class NodeImpl implements Node, RaftServerService 
{
                         .success(false) //
                         .term(request.term() + 1);
 
-                if (request.timestamp() != null) {
-                    rb.timestamp(clock.update(request.timestamp()));
-                }
-
                 return rb.build();
             }
 
@@ -2297,10 +2289,6 @@ public class NodeImpl implements Node, RaftServerService 
{
                         .term(this.currTerm)
                         .lastLogIndex(lastLogIndex);
 
-                if (request.timestamp() != null) {
-                    rb.timestamp(clock.update(request.timestamp()));
-                }
-
                 return rb.build();
             }
 
@@ -2311,9 +2299,6 @@ public class NodeImpl implements Node, RaftServerService {
                     .success(true)
                     .term(this.currTerm)
                     .lastLogIndex(this.logManager.getLastLogIndex());
-                if (request.timestamp() != null) {
-                    respBuilder.timestamp(clock.update(request.timestamp()));
-                }
                 doUnlock = false;
                 this.writeLock.unlock();
                 // see the comments at FollowerStableClosure#run()
@@ -2332,10 +2317,6 @@ public class NodeImpl implements Node, RaftServerService 
{
                         .errorMsg(String.format("Node %s:%s log manager is 
busy.", this.groupId, this.serverId))
                         .term(this.currTerm);
 
-                if (request.timestamp() != null) {
-                    rb.timestamp(clock.update(request.timestamp()));
-                }
-
                 return rb.build();
             }
 
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
index 0028b06253..e088207357 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
@@ -743,7 +743,6 @@ public class Replicator implements ThreadId.OnError {
     private void sendEmptyEntries(final boolean isHeartbeat,
         final RpcResponseClosure<AppendEntriesResponse> heartBeatClosure) {
         final AppendEntriesRequestBuilder rb = 
raftOptions.getRaftMessagesFactory().appendEntriesRequest();
-        rb.timestamp(options.getNode().clockNow());
         if (!fillCommonFields(rb, this.nextIndex - 1, isHeartbeat)) {
             // id is unlock in installSnapshot
             installSnapshot();
@@ -1163,9 +1162,6 @@ public class Replicator implements ThreadId.OnError {
         if ((r = (Replicator) id.lock()) == null) {
             return;
         }
-        if (response != null && response.timestamp() != null) {
-            r.options.getNode().clockUpdate(response.timestamp());
-        }
         boolean doUnlock = true;
         try {
             final boolean isLogDebugEnabled = LOG.isDebugEnabled();
@@ -1671,8 +1667,6 @@ public class Replicator implements ThreadId.OnError {
             RecycleUtil.recycle(byteBufList);
         }
 
-        rb.timestamp(this.options.getNode().clockNow());
-
         final AppendEntriesRequest request = rb.build();
         if (LOG.isDebugEnabled()) {
             LOG.debug(
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaResult.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/CommandApplicationResult.java
similarity index 58%
copy from 
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaResult.java
copy to 
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/CommandApplicationResult.java
index 6a4d850c01..a5e253bf7b 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaResult.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/CommandApplicationResult.java
@@ -18,44 +18,25 @@
 package org.apache.ignite.internal.replicator;
 
 import java.util.concurrent.CompletableFuture;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 
 /**
- * Represents replica execution result.
+ * Replication command application result.
  */
-public class ReplicaResult {
-    /** The result. */
-    private final Object res;
-
-    /** The replication future. */
+public final class CommandApplicationResult {
+    private final HybridTimestamp commitTs;
     private final CompletableFuture<?> repFut;
 
-    /**
-     * Construct a replica result.
-     *
-     * @param res The result.
-     * @param repFut The replication future.
-     */
-    public ReplicaResult(@Nullable Object res, @Nullable CompletableFuture<?> 
repFut) {
-        this.res = res;
+    public CommandApplicationResult(HybridTimestamp commitTs, 
CompletableFuture<?> repFut) {
+        this.commitTs = commitTs;
         this.repFut = repFut;
     }
 
-    /**
-     * Get the result.
-     *
-     * @return The result.
-     */
-    public @Nullable Object result() {
-        return res;
+    public HybridTimestamp getCommitTimestamp() {
+        return commitTs;
     }
 
-    /**
-     * Get the replication future.
-     *
-     * @return The replication future.
-     */
-    public @Nullable CompletableFuture<?> replicationFuture() {
+    public CompletableFuture<?> replicationFuture() {
         return repFut;
     }
 }
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index b213e36f65..50904d3c13 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -123,6 +123,7 @@ import org.apache.ignite.internal.thread.ExecutorChooser;
 import org.apache.ignite.internal.thread.IgniteThreadFactory;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteStripedReadWriteLock;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.lang.IgniteException;
@@ -150,7 +151,7 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
     private static final PlacementDriverMessagesFactory 
PLACEMENT_DRIVER_MESSAGES_FACTORY = new PlacementDriverMessagesFactory();
 
     /** Busy lock to stop synchronously. */
-    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+    private final IgniteStripedReadWriteLock busyLock = new 
IgniteStripedReadWriteLock();
 
     /** Prevents double stopping of the component. */
     private final AtomicBoolean stopGuard = new AtomicBoolean();
@@ -384,7 +385,7 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
     }
 
     private void handleReplicaRequest(ReplicaRequest request, ClusterNode 
sender, @Nullable Long correlationId) {
-        if (!busyLock.enterBusy()) {
+        if (!busyLock.readLock().tryLock()) {
             if (LOG.isInfoEnabled()) {
                 LOG.info("Failed to process replica request (the node is 
stopping) [request={}].", request);
             }
@@ -452,7 +453,7 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
                 NetworkMessage msg;
 
                 if (ex == null) {
-                    msg = prepareReplicaResponse(sendTimestamp, res.result());
+                    msg = prepareReplicaResponse(sendTimestamp, res);
                 } else {
                     if (indicatesUnexpectedProblem(ex)) {
                         LOG.warn("Failed to process replica request 
[request={}].", ex, request);
@@ -471,14 +472,14 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
                     stopLeaseProlongation(groupId, null);
                 }
 
-                if (ex == null && res.replicationFuture() != null) {
-                    res.replicationFuture().whenComplete((res0, ex0) -> {
+                if (ex == null && res.applyResult().replicationFuture() != 
null) {
+                    res.applyResult().replicationFuture().whenComplete((res0, 
ex0) -> {
                         NetworkMessage msg0;
 
                         LOG.debug("Sending delayed response for replica 
request [request={}]", request);
 
                         if (ex0 == null) {
-                            msg0 = prepareReplicaResponse(sendTimestamp, res0);
+                            msg0 = prepareReplicaResponse(sendTimestamp, new 
ReplicaResult(res0, null));
                         } else {
                             LOG.warn("Failed to process delayed response 
[request={}]", ex0, request);
 
@@ -491,7 +492,7 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
                 }
             });
         } finally {
-            busyLock.leaveBusy();
+            busyLock.readLock().unlock();
         }
     }
 
@@ -524,7 +525,7 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
 
         var msg = (PlacementDriverReplicaMessage) msg0;
 
-        if (!busyLock.enterBusy()) {
+        if (!busyLock.readLock().tryLock()) {
             if (LOG.isInfoEnabled()) {
                 LOG.info("Failed to process placement driver message (the node 
is stopping) [msg={}].", msg);
             }
@@ -545,7 +546,7 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
                         }
                     });
         } finally {
-            busyLock.leaveBusy();
+            busyLock.readLock().unlock();
         }
     }
 
@@ -658,7 +659,7 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
             ReplicationGroupId replicaGrpId,
             PeersAndLearners newConfiguration
     ) throws NodeStoppingException {
-        if (!busyLock.enterBusy()) {
+        if (!busyLock.readLock().tryLock()) {
             throw new NodeStoppingException();
         }
 
@@ -682,7 +683,7 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
                     )
             );
         } finally {
-            busyLock.leaveBusy();
+            busyLock.readLock().unlock();
         }
     }
 
@@ -835,14 +836,14 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
      * @throws NodeStoppingException If the node is stopping.
      */
     public CompletableFuture<Boolean> stopReplica(ReplicationGroupId 
replicaGrpId) throws NodeStoppingException {
-        if (!busyLock.enterBusy()) {
+        if (!busyLock.readLock().tryLock()) {
             throw new NodeStoppingException();
         }
 
         try {
             return stopReplicaInternal(replicaGrpId);
         } finally {
-            busyLock.leaveBusy();
+            busyLock.readLock().unlock();
         }
     }
 
@@ -862,7 +863,7 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
                 LOG.error("Error when notifying about BEFORE_REPLICA_STOPPED 
event.", e);
             }
 
-            if (!busyLock.enterBusy()) {
+            if (!busyLock.readLock().tryLock()) {
                 isRemovedFuture.completeExceptionally(new 
NodeStoppingException());
 
                 return;
@@ -895,7 +896,7 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
                     return null;
                 });
             } finally {
-                busyLock.leaveBusy();
+                busyLock.readLock().unlock();
             }
         });
 
@@ -953,7 +954,7 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
             return nullCompletedFuture();
         }
 
-        busyLock.block();
+        busyLock.writeLock().lock();
 
         int shutdownTimeoutSeconds = 10;
 
@@ -1040,17 +1041,18 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
     /**
      * Prepares replica response.
      */
-    private NetworkMessage prepareReplicaResponse(boolean sendTimestamp, 
Object result) {
+    private NetworkMessage prepareReplicaResponse(boolean sendTimestamp, 
ReplicaResult result) {
         if (sendTimestamp) {
+            HybridTimestamp commitTs = 
result.applyResult().getCommitTimestamp();
             return REPLICA_MESSAGES_FACTORY
                     .timestampAwareReplicaResponse()
-                    .result(result)
-                    .timestamp(clockService.now())
+                    .result(result.result())
+                    .timestamp(commitTs == null ? clockService.current() : 
commitTs)
                     .build();
         } else {
             return REPLICA_MESSAGES_FACTORY
                     .replicaResponse()
-                    .result(result)
+                    .result(result.result())
                     .build();
         }
     }
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaResult.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaResult.java
index 6a4d850c01..3f9a8d8187 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaResult.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaResult.java
@@ -17,28 +17,30 @@
 
 package org.apache.ignite.internal.replicator;
 
-import java.util.concurrent.CompletableFuture;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Represents replica execution result.
  */
 public class ReplicaResult {
+    /** Default replication outcome result. */
+    private static final CommandApplicationResult DEFAULT_RESULT = new 
CommandApplicationResult(null, null);
+
     /** The result. */
     private final Object res;
 
     /** The replication future. */
-    private final CompletableFuture<?> repFut;
+    private final CommandApplicationResult commandApplicationResult;
 
     /**
      * Construct a replica result.
      *
      * @param res The result.
-     * @param repFut The replication future.
+     * @param commandApplicationResult The replication result.
      */
-    public ReplicaResult(@Nullable Object res, @Nullable CompletableFuture<?> 
repFut) {
+    public ReplicaResult(@Nullable Object res, @Nullable 
CommandApplicationResult commandApplicationResult) {
         this.res = res;
-        this.repFut = repFut;
+        this.commandApplicationResult = commandApplicationResult == null ? 
DEFAULT_RESULT : commandApplicationResult;
     }
 
     /**
@@ -55,7 +57,7 @@ public class ReplicaResult {
      *
      * @return The replication future.
      */
-    public @Nullable CompletableFuture<?> replicationFuture() {
-        return repFut;
+    public CommandApplicationResult applyResult() {
+        return commandApplicationResult;
     }
 }
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
index 729f88f50d..0a2d2da796 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
@@ -118,18 +118,22 @@ public class ReplicaService {
         this.retryExecutor = retryExecutor;
     }
 
+    private <R> CompletableFuture<R> sendToReplica(String 
targetNodeConsistentId, ReplicaRequest req) {
+        return (CompletableFuture<R>) sendToReplicaRaw(targetNodeConsistentId, 
req).thenApply(res -> res.result());
+    }
+
     /**
-     * Sends request to the replica node.
+     * Sends request to the replica node and provides raw response.
      *
-     * @param targetNodeConsistentId A consistent id of the replica node..
+     * @param targetNodeConsistentId A consistent id of the replica node.
      * @param req Replica request.
      * @return Response future with either evaluation result or completed 
exceptionally.
      * @see NodeStoppingException If either supplier or demander node is 
stopping.
      * @see ReplicaUnavailableException If replica with given replication 
group id doesn't exist or not started yet.
      * @see ReplicationTimeoutException If the response could not be received 
due to a timeout.
      */
-    private <R> CompletableFuture<R> sendToReplica(String 
targetNodeConsistentId, ReplicaRequest req) {
-        CompletableFuture<R> res = new CompletableFuture<>();
+    private CompletableFuture<ReplicaResponse> sendToReplicaRaw(String 
targetNodeConsistentId, ReplicaRequest req) {
+        CompletableFuture<ReplicaResponse> res = new CompletableFuture<>();
 
         messagingService.invoke(
                 targetNodeConsistentId,
@@ -224,11 +228,11 @@ public class ReplicaService {
                                     assert response0 instanceof 
AwaitReplicaResponse :
                                             "Incorrect response type [type=" + 
response0.getClass().getSimpleName() + ']';
 
-                                    sendToReplica(targetNodeConsistentId, 
req).whenComplete((r, e) -> {
+                                    sendToReplicaRaw(targetNodeConsistentId, 
req).whenComplete((r, e) -> {
                                         if (e != null) {
                                             res.completeExceptionally(e);
                                         } else {
-                                            res.complete((R) r);
+                                            res.complete(r);
                                         }
                                     });
                                 }
@@ -249,7 +253,7 @@ public class ReplicaService {
                         }
                     }
                 } else {
-                    res.complete((R) ((ReplicaResponse) response).result());
+                    res.complete((ReplicaResponse) response);
                 }
             }
         });
@@ -268,7 +272,7 @@ public class ReplicaService {
      * @see ReplicationTimeoutException If the response could not be received 
due to a timeout.
      */
     public <R> CompletableFuture<R> invoke(ClusterNode node, ReplicaRequest 
request) {
-        return sendToReplica(node.name(), request);
+        return invokeRaw(node, request).thenApply(r -> (R) r.result());
     }
 
     /**
@@ -300,6 +304,18 @@ public class ReplicaService {
         return sendToReplica(node.name(), request);
     }
 
+    /**
+     * Sends a request to the given replica {@code node} and returns a future 
that will be completed with a raw response.
+     * This can be used to carry additional metadata to the caller.
+     *
+     * @param node Cluster node.
+     * @param request The request.
+     * @return Response future with either evaluation raw response or 
completed exceptionally.
+     */
+    public CompletableFuture<ReplicaResponse> invokeRaw(ClusterNode node, 
ReplicaRequest request) {
+        return sendToReplicaRaw(node.name(), request);
+    }
+
     public MessagingService messagingService() {
         return messagingService;
     }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
index cce6f4386f..cb85f9d8af 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
@@ -77,7 +77,7 @@ public class AbstractMultiNodeBenchmark {
 
     @Nullable
     protected String clusterConfiguration() {
-        return null;
+        return "";
     }
 
     /**
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/UpsertKvBenchmark.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/UpsertKvBenchmark.java
index 507f4d8387..2e1ce6d074 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/UpsertKvBenchmark.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/UpsertKvBenchmark.java
@@ -20,8 +20,8 @@ package org.apache.ignite.internal.benchmark;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.internal.lang.IgniteSystemProperties;
 import org.apache.ignite.table.KeyValueView;
 import org.apache.ignite.table.Tuple;
@@ -66,6 +66,13 @@ public class UpsertKvBenchmark extends 
AbstractMultiNodeBenchmark {
     @Param({"8"})
     private int partitionCount;
 
+    private static final AtomicInteger counter = new AtomicInteger();
+
+    private static final ThreadLocal<Integer> gen = ThreadLocal.withInitial(() 
-> {
+        int id = counter.getAndIncrement();
+        return id * 20_000_000;
+    });
+
     @Override
     public void nodeSetUp() throws Exception {
         
System.setProperty(IgniteSystemProperties.IGNITE_SKIP_REPLICATION_IN_BENCHMARK, 
"true");
@@ -104,7 +111,9 @@ public class UpsertKvBenchmark extends 
AbstractMultiNodeBenchmark {
     }
 
     private int nextId() {
-        return ThreadLocalRandom.current().nextInt();
+        int cur = gen.get() + 1;
+        gen.set(cur);
+        return cur;
     }
 
     /**
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 0a5b14c206..43b3f5a338 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -746,7 +746,7 @@ public class IgniteImpl implements Ignite {
         SchemaSynchronizationConfiguration schemaSyncConfig = 
clusterConfigRegistry
                 
.getConfiguration(SchemaSynchronizationExtensionConfiguration.KEY).schemaSync();
 
-        clockService = new ClockServiceImpl(clock, clockWaiter, new 
SameValueLongSupplier(() -> schemaSyncConfig.maxClockSkew().value()));
+        clockService = new ClockServiceImpl(clock, clockWaiter, () -> 
schemaSyncConfig.maxClockSkew().value());
 
         idempotentCacheVacuumizer = new IdempotentCacheVacuumizer(
                 name,
@@ -1179,12 +1179,12 @@ public class IgniteImpl implements Ignite {
         return Map.copyOf(decoratedEngines);
     }
 
-    private static SameValueLongSupplier 
delayDurationMsSupplier(SchemaSynchronizationConfiguration schemaSyncConfig) {
-        return new SameValueLongSupplier(() -> 
schemaSyncConfig.delayDuration().value());
+    private static LongSupplier 
delayDurationMsSupplier(SchemaSynchronizationConfiguration schemaSyncConfig) {
+        return () -> schemaSyncConfig.delayDuration().value();
     }
 
     private static LongSupplier 
partitionIdleSafeTimePropagationPeriodMsSupplier(ReplicationConfiguration 
replicationConfig) {
-        return new SameValueLongSupplier(() -> 
replicationConfig.idleSafeTimePropagationDuration().value());
+        return () -> 
replicationConfig.idleSafeTimePropagationDuration().value();
     }
 
     private AuthenticationManager createAuthenticationManager() {
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxObservableTimePropagationTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxObservableTimePropagationTest.java
new file mode 100644
index 0000000000..e84d6cb4e6
--- /dev/null
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxObservableTimePropagationTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.distributed;
+
+import static org.apache.ignite.distributed.ItTxTestCluster.NODE_PORT_BASE;
+import static 
org.apache.ignite.internal.tx.impl.ResourceVacuumManager.RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Collection;
+import org.apache.ignite.internal.TestHybridClock;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.table.TxInfrastructureTest;
+import org.apache.ignite.internal.testframework.SystemPropertiesExtension;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
+import org.apache.ignite.internal.tx.TxStateMeta;
+import org.apache.ignite.internal.tx.impl.TxManagerImpl;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.table.RecordView;
+import org.apache.ignite.table.Tuple;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Tests if commit timestamp is propagated to observable time correctly.
+ */
+@ExtendWith(SystemPropertiesExtension.class)
+@WithSystemProperty(key = RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY, 
value = "1000000")
+public class ItTxObservableTimePropagationTest extends TxInfrastructureTest {
+    private static final long CLIENT_FROZEN_PHYSICAL_TIME = 3000;
+
+    private static final int CLIENT_PORT = NODE_PORT_BASE - 1;
+
+    /**
+     * The constructor.
+     *
+     * @param testInfo Test info.
+     */
+    public ItTxObservableTimePropagationTest(TestInfo testInfo) {
+        super(testInfo);
+    }
+
+    @Override
+    protected int nodes() {
+        return 3;
+    }
+
+    @Override
+    protected int replicas() {
+        return 3;
+    }
+
+    @Override
+    protected HybridClock createClock(ClusterNode node) {
+        // Client physical time is frozen in the past, server time advances 
normally.
+        return new TestHybridClock(() -> node.address().port() == CLIENT_PORT 
? CLIENT_FROZEN_PHYSICAL_TIME : System.currentTimeMillis());
+    }
+
+    @Test
+    public void testImplicitObservableTimePropagation() {
+        RecordView<Tuple> view = accounts.recordView();
+        view.upsert(null, makeValue(1, 100.0));
+        TxManagerImpl clientTxManager = (TxManagerImpl) 
txTestCluster.clientTxManager;
+        Collection<TxStateMeta> states = clientTxManager.states();
+        assertEquals(1, states.size());
+        HybridTimestamp commitTs = states.iterator().next().commitTimestamp();
+        assertNotNull(commitTs);
+        assertEquals(commitTs, timestampTracker.get());
+        assertTrue(commitTs.getPhysical() != CLIENT_FROZEN_PHYSICAL_TIME, 
"Client time should be advanced to server time");
+    }
+}
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index de0e2e1a4b..8a733b6aeb 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -57,13 +57,16 @@ import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguratio
 import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.hlc.TestClockService;
 import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.lowwatermark.TestLowWatermark;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.network.SingleClusterNodeResolver;
+import org.apache.ignite.internal.network.serialization.MessageSerializer;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
 import 
org.apache.ignite.internal.partition.replicator.network.command.TimedBinaryRowMessage;
 import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateAllCommand;
@@ -81,6 +84,7 @@ import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import 
org.apache.ignite.internal.replicator.message.SchemaVersionAwareReplicaRequest;
+import 
org.apache.ignite.internal.replicator.message.TimestampAwareReplicaResponse;
 import org.apache.ignite.internal.schema.BinaryRowEx;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.NullBinaryRow;
@@ -114,6 +118,7 @@ import org.apache.ignite.internal.util.CollectionUtils;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.sql.IgniteSql;
 import org.apache.ignite.table.Tuple;
+import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
@@ -121,6 +126,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.stubbing.Answer;
 
 /**
  * Tests for data colocation.
@@ -239,7 +245,7 @@ public class ItColocationTest extends 
BaseIgniteAbstractTest {
             groupRafts.put(new TablePartitionId(tblId, i), r);
         }
 
-        when(replicaService.invoke(any(ClusterNode.class), 
any())).thenAnswer(invocation -> {
+        Answer<CompletableFuture<?>> clo = invocation -> {
             ClusterNode node = invocation.getArgument(0);
             ReplicaRequest request = invocation.getArgument(1);
             var commitPartId = new TablePartitionId(2, 0);
@@ -278,7 +284,40 @@ public class ItColocationTest extends 
BaseIgniteAbstractTest {
                         .txCoordinatorId(node.id())
                         .build());
             }
-        });
+        };
+        when(replicaService.invoke(any(ClusterNode.class), 
any())).thenAnswer(clo);
+        when(replicaService.invokeRaw(any(ClusterNode.class), 
any())).thenAnswer(
+                invocation -> clo.answer(invocation).thenApply(res -> new 
TimestampAwareReplicaResponse() {
+                    @Override
+                    public @Nullable Object result() {
+                        return res;
+                    }
+
+                    @Override
+                    public @Nullable HybridTimestamp timestamp() {
+                        return clock.now();
+                    }
+
+                    @Override
+                    public MessageSerializer<NetworkMessage> serializer() {
+                        return null;
+                    }
+
+                    @Override
+                    public short messageType() {
+                        return 0;
+                    }
+
+                    @Override
+                    public short groupType() {
+                        return 0;
+                    }
+
+                    @Override
+                    public NetworkMessage clone() {
+                        return null;
+                    }
+                }));
 
         intTable = new InternalTableImpl(
                 "PUBLIC.TEST",
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 3788f5475d..acf4dd65cf 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -588,7 +588,7 @@ public class PartitionListener implements 
RaftGroupListener, BeforeApplyHandler
 
             if (maxObservableSafeTime == -1) {
                 maxObservableSafeTime = 
clockService.now().addPhysicalTime(clockService.maxClockSkewMillis()).longValue();
-                LOG.info("maxObservableSafeTime is initialized with [" + 
maxObservableSafeTime + "].");
+                LOG.info("maxObservableSafeTime has been initialized with 
[{}].", HybridTimestamp.hybridTimestamp(maxObservableSafeTime));
             }
 
             // Because of clock.tick it's guaranteed that two different 
commands will have different safe timestamps.
@@ -607,7 +607,7 @@ public class PartitionListener implements 
RaftGroupListener, BeforeApplyHandler
     @Override
     public void onLeaderStop() {
         maxObservableSafeTime = -1;
-        LOG.info("maxObservableSafeTime is set to [" + maxObservableSafeTime + 
"] on leader stop.");
+        LOG.info("maxObservableSafeTime has been reset on leader stop.");
     }
 
     /**
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 61bee26e96..0e63c388e5 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -136,6 +136,7 @@ import 
org.apache.ignite.internal.placementdriver.ReplicaMeta;
 import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.ExecutorInclinedRaftCommandRunner;
 import org.apache.ignite.internal.raft.service.RaftCommandRunner;
+import org.apache.ignite.internal.replicator.CommandApplicationResult;
 import org.apache.ignite.internal.replicator.ReplicaResult;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import 
org.apache.ignite.internal.replicator.command.SafeTimePropagatingCommand;
@@ -642,11 +643,11 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         HybridTimestamp opStartTs;
 
         if (request instanceof ReadWriteReplicaRequest) {
-            opStartTs = clockService.now();
+            opStartTs = clockService.current();
         } else if (request instanceof ReadOnlyReplicaRequest) {
             opStartTs = ((ReadOnlyReplicaRequest) request).readTimestamp();
         } else if (request instanceof ReadOnlyDirectReplicaRequest) {
-            opStartTs = clockService.now();
+            opStartTs = clockService.current();
         } else {
             opStartTs = null;
         }
@@ -1737,7 +1738,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                         throw new TransactionException(commit ? TX_COMMIT_ERR 
: TX_ROLLBACK_ERR, ex);
                     }
 
-                    TransactionResult result = (TransactionResult) 
((ApplyCommandResult) txOutcome).result;
+                    TransactionResult result = (TransactionResult) 
((ResultWrapper) txOutcome).result;
 
                     markFinished(txId, result.transactionState(), 
result.commitTimestamp());
 
@@ -1812,7 +1813,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                                                     catalogVersion
                                             );
 
-                                    return new ReplicaResult(null, 
commandReplicatedFuture);
+                                    return new ReplicaResult(null, new 
CommandApplicationResult(null, commandReplicatedFuture));
                                 });
                     } else {
                         return completedFuture(
@@ -2052,8 +2053,8 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 if (v instanceof ReplicaResult) {
                     ReplicaResult res = (ReplicaResult) v;
 
-                    if (res.replicationFuture() != null) {
-                        res.replicationFuture().whenComplete((v0, th0) -> {
+                    if (res.applyResult().replicationFuture() != null) {
+                        
res.applyResult().replicationFuture().whenComplete((v0, th0) -> {
                             if (th0 != null) {
                                 cleanupReadyFut.completeExceptionally(th0);
                             } else {
@@ -2619,7 +2620,9 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @param cmd Raft command.
      * @return Raft future or raft decorated future with command that was 
processed.
      */
-    private <T> CompletableFuture<T> applyCmdWithExceptionHandling(Command 
cmd, CompletableFuture<T> resultFuture) {
+    private CompletableFuture<ResultWrapper<Object>> 
applyCmdWithExceptionHandling(Command cmd) {
+        CompletableFuture<ResultWrapper<Object>> resultFuture = new 
CompletableFuture<>();
+
         applyCmdWithRetryOnSafeTimeReorderException(cmd, resultFuture);
 
         return resultFuture.exceptionally(throwable -> {
@@ -2674,7 +2677,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                     resultFuture.completeExceptionally(ex);
                 }
             } else {
-                resultFuture.complete((T) new ApplyCommandResult<>(cmd, res));
+                resultFuture.complete((T) new ResultWrapper<>(cmd, res));
             }
         });
     }
@@ -2692,7 +2695,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @param catalogVersion Validated catalog version associated with given 
operation.
      * @return A local update ready future, possibly having a nested 
replication future as a result for delayed ack purpose.
      */
-    private CompletableFuture<CompletableFuture<?>> applyUpdateCommand(
+    private CompletableFuture<CommandApplicationResult> applyUpdateCommand(
             TablePartitionId tablePartId,
             UUID rowUuid,
             @Nullable BinaryRow row,
@@ -2735,16 +2738,11 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                     );
                 }
 
-                CompletableFuture<UUID> fut = 
applyCmdWithExceptionHandling(cmd, new CompletableFuture<>())
-                        .thenApply(res -> cmd.txId());
+                CompletableFuture<UUID> repFut = 
applyCmdWithExceptionHandling(cmd).thenApply(res -> cmd.txId());
 
-                return completedFuture(fut);
+                return completedFuture(new CommandApplicationResult(null, 
repFut));
             } else {
-                CompletableFuture<ApplyCommandResult<Object>> resultFuture = 
new CompletableFuture<>();
-
-                applyCmdWithExceptionHandling(cmd, resultFuture);
-
-                return resultFuture.thenCompose(res -> {
+                return applyCmdWithExceptionHandling(cmd).thenCompose(res -> {
                     UpdateCommandResult updateCommandResult = 
(UpdateCommandResult) res.getResult();
 
                     if (updateCommandResult != null && 
!updateCommandResult.isPrimaryReplicaMatch()) {
@@ -2752,7 +2750,8 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                     }
 
                     if (updateCommandResult != null && 
updateCommandResult.isPrimaryInPeersAndLearners()) {
-                        return safeTime.waitFor(((UpdateCommand) 
res.getCommand()).safeTime()).thenApply(ignored -> null);
+                        return safeTime.waitFor(((UpdateCommand) 
res.getCommand()).safeTime())
+                                .thenApply(ignored -> new 
CommandApplicationResult(((UpdateCommand) res.getCommand()).safeTime(), null));
                     } else {
                         if (!SKIP_UPDATES) {
                             // We don't need to take the partition snapshots 
read lock, see #INTERNAL_DOC_PLACEHOLDER why.
@@ -2769,7 +2768,8 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                             );
                         }
 
-                        return nullCompletedFuture();
+                        // getCommand provides actual assigned safeTime (may 
be reassigned due to reorder)
+                        return completedFuture(new 
CommandApplicationResult(((UpdateCommand) res.getCommand()).safeTime(), null));
                     }
                 });
             }
@@ -2787,7 +2787,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @param leaseStartTime Lease start time.
      * @return A local update ready future, possibly having a nested 
replication future as a result for delayed ack purpose.
      */
-    private CompletableFuture<CompletableFuture<?>> applyUpdateCommand(
+    private CompletableFuture<CommandApplicationResult> applyUpdateCommand(
             ReadWriteSingleRowReplicaRequest request,
             UUID rowUuid,
             @Nullable BinaryRow row,
@@ -2818,9 +2818,9 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @param txCoordinatorId Transaction coordinator id.
      * @param catalogVersion Validated catalog version associated with given 
operation.
      * @param skipDelayedAck {@code true} to disable the delayed ack 
optimization.
-     * @return Raft future, see {@link #applyCmdWithExceptionHandling(Command, 
CompletableFuture)}.
+     * @return Raft future, see {@link 
#applyCmdWithExceptionHandling(Command)}.
      */
-    private CompletableFuture<CompletableFuture<?>> applyUpdateAllCommand(
+    private CompletableFuture<CommandApplicationResult> applyUpdateAllCommand(
             Map<UUID, TimedBinaryRowMessage> rowsToUpdate,
             TablePartitionIdMessage commitPartitionId,
             UUID txId,
@@ -2857,7 +2857,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                             indexIdsAtRwTxBeginTs(txId)
                     );
 
-                    return applyCmdWithExceptionHandling(cmd, new 
CompletableFuture<>()).thenApply(res -> null);
+                    return applyCmdWithExceptionHandling(cmd).thenApply(res -> 
null);
                 } else {
                     // We don't need to take the partition snapshots read 
lock, see #INTERNAL_DOC_PLACEHOLDER why.
                     storageUpdateHandler.handleUpdateAll(
@@ -2869,41 +2869,40 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                             null,
                             indexIdsAtRwTxBeginTs(txId)
                     );
+                }
 
-                    CompletableFuture<Object> fut = 
applyCmdWithExceptionHandling(cmd, new CompletableFuture<>())
-                            .thenApply(res -> cmd.txId());
+                CompletableFuture<UUID> repFut = 
applyCmdWithExceptionHandling(cmd).thenApply(res -> cmd.txId());
 
-                    return completedFuture(fut);
-                }
+                return completedFuture(new CommandApplicationResult(null, 
repFut));
             } else {
-                return applyCmdWithExceptionHandling(cmd, new 
CompletableFuture<ApplyCommandResult<Object>>())
-                        .thenCompose(res -> {
-                            UpdateCommandResult updateCommandResult = 
(UpdateCommandResult) res.getResult();
-
-                            if (!updateCommandResult.isPrimaryReplicaMatch()) {
-                                throw new PrimaryReplicaMissException(
-                                        cmd.txId(),
-                                        cmd.leaseStartTime(),
-                                        
updateCommandResult.currentLeaseStartTime()
-                                );
-                            }
-                            if 
(updateCommandResult.isPrimaryInPeersAndLearners()) {
-                                return safeTime.waitFor(((UpdateAllCommand) 
res.getCommand()).safeTime()).thenApply(ignored -> null);
-                            } else {
-                                // We don't need to take the partition 
snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why.
-                                storageUpdateHandler.handleUpdateAll(
-                                        cmd.txId(),
-                                        cmd.rowsToUpdate(),
-                                        
cmd.tablePartitionId().asTablePartitionId(),
-                                        false,
-                                        null,
-                                        cmd.safeTime(),
-                                        indexIdsAtRwTxBeginTs(txId)
-                                );
+                return applyCmdWithExceptionHandling(cmd).thenCompose(res -> {
+                    UpdateCommandResult updateCommandResult = 
(UpdateCommandResult) res.getResult();
 
-                                return null;
-                            }
-                        });
+                    if (!updateCommandResult.isPrimaryReplicaMatch()) {
+                        throw new PrimaryReplicaMissException(
+                                cmd.txId(),
+                                cmd.leaseStartTime(),
+                                updateCommandResult.currentLeaseStartTime()
+                        );
+                    }
+                    if (updateCommandResult.isPrimaryInPeersAndLearners()) {
+                        return safeTime.waitFor(((UpdateAllCommand) 
res.getCommand()).safeTime())
+                                .thenApply(ignored -> new 
CommandApplicationResult(((UpdateAllCommand) res.getCommand()).safeTime(), 
null));
+                    } else {
+                        // We don't need to take the partition snapshots read 
lock, see #INTERNAL_DOC_PLACEHOLDER why.
+                        storageUpdateHandler.handleUpdateAll(
+                                cmd.txId(),
+                                cmd.rowsToUpdate(),
+                                cmd.tablePartitionId().asTablePartitionId(),
+                                false,
+                                null,
+                                cmd.safeTime(),
+                                indexIdsAtRwTxBeginTs(txId)
+                        );
+
+                        return completedFuture(new 
CommandApplicationResult(((UpdateAllCommand) res.getCommand()).safeTime(), 
null));
+                    }
+                });
             }
         }
     }
@@ -2917,7 +2916,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @param leaseStartTime Lease start time.
      * @return Raft future, see {@link #applyCmdWithExceptionHandling(Command, 
CompletableFuture)}.
      */
-    private CompletableFuture<CompletableFuture<?>> applyUpdateAllCommand(
+    private CompletableFuture<CommandApplicationResult> applyUpdateAllCommand(
             ReadWriteMultiRowReplicaRequest request,
             Map<UUID, TimedBinaryRowMessage> rowsToUpdate,
             int catalogVersion,
@@ -2981,7 +2980,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                     return takeLocksForDeleteExact(searchRow, rowId, row, txId)
                             .thenCompose(validatedRowId -> {
                                 if (validatedRowId == null) {
-                                    return completedFuture(new 
ReplicaResult(false, request.full() ? null : nullCompletedFuture()));
+                                    return completedFuture(new 
ReplicaResult(false, null));
                                 }
 
                                 return 
validateWriteAgainstSchemaAfterTakingLocks(request.transactionId())
@@ -3771,7 +3770,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @return Future that will complete with catalog version associated with 
given operation though the operation timestamp.
      */
     private CompletableFuture<Integer> 
validateWriteAgainstSchemaAfterTakingLocks(UUID txId) {
-        HybridTimestamp operationTimestamp = clockService.now();
+        HybridTimestamp operationTimestamp = clockService.current();
 
         return reliableCatalogVersionFor(operationTimestamp)
                 .thenApply(catalogVersion -> {
@@ -3781,7 +3780,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 });
     }
 
-    private UpdateCommand updateCommand(
+    private static UpdateCommand updateCommand(
             TablePartitionId tablePartId,
             UUID rowUuid,
             @Nullable BinaryRow row,
@@ -4150,11 +4149,11 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * Wrapper for the update(All)Command processing result that besides 
result itself stores actual command that was processed. It helps to
      * manage commands substitutions on SafeTimeReorderException where cloned 
command with adjusted safeTime is sent.
      */
-    private static class ApplyCommandResult<T> {
+    private static class ResultWrapper<T> {
         private final Command command;
         private final T result;
 
-        ApplyCommandResult(Command command, T result) {
+        ResultWrapper(Command command, T result) {
             this.command = command;
             this.result = result;
         }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index fbe5a3d5de..6452594758 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -109,6 +109,7 @@ import 
org.apache.ignite.internal.replicator.exception.ReplicationTimeoutExcepti
 import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
+import org.apache.ignite.internal.replicator.message.TimestampAware;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowEx;
 import org.apache.ignite.internal.schema.BinaryTuple;
@@ -633,7 +634,19 @@ public class InternalTableImpl implements InternalTable {
                 || request instanceof SwapRowReplicaRequest;
 
         if (full) { // Full transaction retries are handled in postEnlist.
-            return replicaSvc.invoke(primaryReplicaAndConsistencyToken.get1(), 
request);
+            return 
replicaSvc.invokeRaw(primaryReplicaAndConsistencyToken.get1(), 
request).handle((r, e) -> {
+                boolean hasError = e != null;
+                assert hasError || r instanceof TimestampAware;
+
+                // Timestamp is set to commit timestamp for full transactions.
+                tx.finish(!hasError, hasError ? null : ((TimestampAware) 
r).timestamp(), true);
+
+                if (e != null) {
+                    sneakyThrow(e);
+                }
+
+                return (R) r.result();
+            });
         } else {
             if (write) { // Track only write requests from explicit 
transactions.
                 if (!transactionInflights.addInflight(tx.id(), false)) {
@@ -714,11 +727,7 @@ public class InternalTableImpl implements InternalTable {
         assert !(autoCommit && full) : "Invalid combination of flags";
 
         return fut.handle((BiFunction<T, Throwable, CompletableFuture<T>>) (r, 
e) -> {
-            if (full) { // Full txn is already finished remotely. Just update 
local state.
-                CompletableFuture<Void> finishFullTxFut = tx0.finish(e == 
null, null, true);
-
-                assert finishFullTxFut.isDone() : "A full-state transaction 
must finish synchronously.";
-
+            if (full) {
                 return e != null ? failedFuture(e) : completedFuture(r);
             }
 
@@ -877,7 +886,7 @@ public class InternalTableImpl implements InternalTable {
         }
 
         if (tx.isReadOnly()) {
-            return evaluateReadOnlyRecipientNode(partitionId(keyRow))
+            return evaluateReadOnlyRecipientNode(partitionId(keyRow), 
tx.readTimestamp())
                     .thenCompose(recipientNode -> get(keyRow, 
tx.readTimestamp(), recipientNode));
         }
 
@@ -968,7 +977,7 @@ public class InternalTableImpl implements InternalTable {
         if (tx != null && tx.isReadOnly()) {
             BinaryRowEx firstRow = keyRows.iterator().next();
 
-            return evaluateReadOnlyRecipientNode(partitionId(firstRow))
+            return evaluateReadOnlyRecipientNode(partitionId(firstRow), 
tx.readTimestamp())
                     .thenCompose(recipientNode -> getAll(keyRows, 
tx.readTimestamp(), recipientNode));
         }
 
@@ -1107,7 +1116,7 @@ public class InternalTableImpl implements InternalTable {
                         .transactionId(txo.id())
                         .enlistmentConsistencyToken(enlistmentConsistencyToken)
                         .requestType(RW_UPSERT)
-                        .timestamp(clockService.now())
+                        .timestamp(txo.startTimestamp()) // TODO 
https://issues.apache.org/jira/browse/IGNITE-23712
                         .full(txo.implicit())
                         .coordinatorId(txo.coordinatorId())
                         .build(),
@@ -1947,7 +1956,7 @@ public class InternalTableImpl implements InternalTable {
      * @return The enlist future (then will a leader become known).
      */
     protected CompletableFuture<IgniteBiTuple<ClusterNode, Long>> enlist(int 
partId, InternalTransaction tx) {
-        HybridTimestamp now = clockService.now();
+        HybridTimestamp now = tx.startTimestamp();
 
         TablePartitionId tablePartitionId = new TablePartitionId(tableId, 
partId);
         tx.assignCommitPartition(tablePartitionId);
@@ -2063,12 +2072,13 @@ public class InternalTableImpl implements InternalTable 
{
      * Evaluated cluster node for read-only request processing.
      *
      * @param partId Partition id.
+     * @param readTimestamp Read timestamp.
      * @return Cluster node to evalute read-only request.
      */
-    protected CompletableFuture<ClusterNode> evaluateReadOnlyRecipientNode(int 
partId) {
+    protected CompletableFuture<ClusterNode> evaluateReadOnlyRecipientNode(int 
partId, @Nullable HybridTimestamp readTimestamp) {
         TablePartitionId tablePartitionId = new TablePartitionId(tableId, 
partId);
 
-        return awaitPrimaryReplica(tablePartitionId, clockService.now())
+        return awaitPrimaryReplica(tablePartitionId, readTimestamp)
                 .handle((res, e) -> {
                     if (e != null) {
                         throw withCause(TransactionException::new, 
REPLICA_UNAVAILABLE_ERR, e);
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index 23428c0d60..7f80bc15f3 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -173,7 +173,6 @@ import org.apache.ignite.internal.util.Lazy;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.NetworkAddress;
-import org.apache.ignite.raft.jraft.RaftMessagesFactory;
 import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
 import org.apache.ignite.sql.IgniteSql;
 import org.apache.ignite.tx.IgniteTransactions;
@@ -210,8 +209,6 @@ public class ItTxTestCluster {
 
     public static final int NODE_PORT_BASE = 20_000;
 
-    private static final RaftMessagesFactory FACTORY = new 
RaftMessagesFactory();
-
     private ClusterService client;
 
     private HybridClock clientClock;
@@ -403,7 +400,7 @@ public class ItTxTestCluster {
 
             ClusterNode node = clusterService.topologyService().localMember();
 
-            HybridClock clock = new HybridClockImpl();
+            HybridClock clock = createClock(node);
             ClockWaiter clockWaiter = new ClockWaiter("test-node" + i, clock);
             assertThat(clockWaiter.startAsync(new ComponentContext()), 
willCompleteSuccessfully());
             TestClockService clockService = new TestClockService(clock, 
clockWaiter);
@@ -465,7 +462,7 @@ public class ItTxTestCluster {
                     Set.of(PartitionReplicationMessageGroup.class, 
TxMessageGroup.class),
                     placementDriver,
                     partitionOperationsExecutor,
-                    () -> 
DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
+                    this::getSafeTimePropagationTimeout,
                     new NoOpFailureManager(),
                     commandMarshaller,
                     raftClientFactory,
@@ -549,6 +546,14 @@ public class ItTxTestCluster {
         assertNotNull(clientTxManager);
     }
 
+    protected HybridClock createClock(ClusterNode node) {
+        return new HybridClockImpl();
+    }
+
+    protected long getSafeTimePropagationTimeout() {
+        return DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
+    }
+
     protected TxManagerImpl newTxManager(
             ClusterService clusterService,
             ReplicaService replicaSvc,
@@ -1027,7 +1032,7 @@ public class ItTxTestCluster {
 
         assertTrue(waitForTopology(client, nodes + 1, 1000));
 
-        clientClock = new HybridClockImpl();
+        clientClock = createClock(client.topologyService().localMember());
         clientClockWaiter = new ClockWaiter("client-node", clientClock);
         assertThat(clientClockWaiter.startAsync(new ComponentContext()), 
willCompleteSuccessfully());
         clientClockService = new TestClockService(clientClock, 
clientClockWaiter);
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
index b05c878d06..ea8bad1208 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
@@ -23,7 +23,6 @@ import static java.util.stream.Collectors.toList;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCode;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
-import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_ERR;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
@@ -36,7 +35,6 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
-import static org.mockito.ArgumentMatchers.any;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -61,47 +59,25 @@ import java.util.concurrent.atomic.LongAdder;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
-import org.apache.ignite.distributed.ItTxTestCluster;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
-import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.manager.ComponentContext;
-import org.apache.ignite.internal.network.ClusterService;
-import org.apache.ignite.internal.network.NodeFinder;
-import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
-import org.apache.ignite.internal.placementdriver.ReplicaMeta;
-import org.apache.ignite.internal.raft.Loza;
-import org.apache.ignite.internal.raft.Peer;
-import org.apache.ignite.internal.raft.RaftNodeId;
-import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
-import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
 import org.apache.ignite.internal.replicator.Replica;
 import org.apache.ignite.internal.replicator.ReplicaImpl;
 import org.apache.ignite.internal.replicator.ReplicaManager;
-import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.replicator.ReplicaTestUtils;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
-import org.apache.ignite.internal.replicator.TablePartitionId;
-import 
org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration;
 import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.schema.Column;
-import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaRegistry;
-import 
org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration;
 import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
 import org.apache.ignite.internal.schema.row.Row;
-import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
 import org.apache.ignite.internal.storage.index.IndexStorage;
 import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
 import 
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
-import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
 import 
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
-import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
-import org.apache.ignite.internal.tx.HybridTimestampTracker;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.Lock;
 import org.apache.ignite.internal.tx.LockException;
@@ -111,261 +87,46 @@ import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.TxPriority;
 import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.tx.TxStateMeta;
-import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
 import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
 import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
 import org.apache.ignite.internal.tx.impl.VolatileTxStateMetaStorage;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
-import org.apache.ignite.internal.type.NativeTypes;
 import org.apache.ignite.internal.util.CollectionUtils;
 import org.apache.ignite.internal.util.Lazy;
 import org.apache.ignite.internal.util.Pair;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.lang.IgniteException;
-import org.apache.ignite.network.ClusterNode;
-import org.apache.ignite.raft.jraft.RaftGroupService;
 import org.apache.ignite.table.KeyValueView;
 import org.apache.ignite.table.RecordView;
 import org.apache.ignite.table.Tuple;
-import org.apache.ignite.tx.IgniteTransactions;
 import org.apache.ignite.tx.Transaction;
 import org.apache.ignite.tx.TransactionException;
 import org.apache.ignite.tx.TransactionOptions;
 import org.jetbrains.annotations.Nullable;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
-import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.mockito.junit.jupiter.MockitoSettings;
 import org.mockito.quality.Strictness;
 
 /**
- * TODO asch IGNITE-15928 validate zero locks after test commit.
+ * Common tx test scenarios set. TODO asch IGNITE-15928 validate zero locks 
after test commit.
  */
 @ExtendWith({MockitoExtension.class, ConfigurationExtension.class})
 @MockitoSettings(strictness = Strictness.LENIENT)
-public abstract class TxAbstractTest extends IgniteAbstractTest {
-    protected static final double BALANCE_1 = 500;
-
-    protected static final double BALANCE_2 = 500;
-
-    protected static final double DELTA = 100;
-
-    protected static final String ACC_TABLE_NAME = "accounts";
-
-    protected static final String CUST_TABLE_NAME = "customers";
-
-    protected static SchemaDescriptor ACCOUNTS_SCHEMA = new SchemaDescriptor(
-            1,
-            new Column[]{new Column("accountNumber".toUpperCase(), 
NativeTypes.INT64, false)},
-            new Column[]{new Column("balance".toUpperCase(), 
NativeTypes.DOUBLE, false)}
-    );
-
-    protected static SchemaDescriptor CUSTOMERS_SCHEMA = new SchemaDescriptor(
-            1,
-            new Column[]{new Column("accountNumber".toUpperCase(), 
NativeTypes.INT64, false)},
-            new Column[]{new Column("name".toUpperCase(), NativeTypes.STRING, 
false)}
-    );
-
-    /** Accounts table id -> balance. */
-    protected TableViewInternal accounts;
-
-    /** Customers table id -> name. */
-    protected TableViewInternal customers;
-
-    protected HybridTimestampTracker timestampTracker = new 
HybridTimestampTracker();
-
-    protected IgniteTransactions igniteTransactions;
-
-    // TODO fsync can be turned on again after 
https://issues.apache.org/jira/browse/IGNITE-20195
-    @InjectConfiguration("mock: { fsync: false }")
-    protected RaftConfiguration raftConfiguration;
-
-    @InjectConfiguration
-    protected TransactionConfiguration txConfiguration;
-
-    @InjectConfiguration
-    protected StorageUpdateConfiguration storageUpdateConfiguration;
-
-    @InjectConfiguration
-    protected ReplicationConfiguration replicationConfiguration;
-
-    protected final TestInfo testInfo;
-
-    protected ItTxTestCluster txTestCluster;
-
-    /**
-     * Returns a count of nodes.
-     *
-     * @return Nodes.
-     */
-    protected abstract int nodes();
-
-    /**
-     * Returns a count of replicas.
-     *
-     * @return Replicas.
-     */
-    protected int replicas() {
-        return 1;
-    }
-
-    /**
-     * Returns {@code true} to disable collocation by using dedicated client 
node.
-     *
-     * @return {@code true} to disable collocation.
-     */
-    protected boolean startClient() {
-        return true;
-    }
-
+public abstract class TxAbstractTest extends TxInfrastructureTest {
     /**
      * The constructor.
      *
      * @param testInfo Test info.
      */
     public TxAbstractTest(TestInfo testInfo) {
-        this.testInfo = testInfo;
-    }
-
-    /**
-     * Initialize the test state.
-     */
-    @BeforeEach
-    public void before() throws Exception {
-        txTestCluster = new ItTxTestCluster(
-                testInfo,
-                raftConfiguration,
-                txConfiguration,
-                storageUpdateConfiguration,
-                workDir,
-                nodes(),
-                replicas(),
-                startClient(),
-                timestampTracker,
-                replicationConfiguration
-        );
-        txTestCluster.prepareCluster();
-
-        this.igniteTransactions = txTestCluster.igniteTransactions();
-
-        accounts = txTestCluster.startTable(ACC_TABLE_NAME, ACCOUNTS_SCHEMA);
-        customers = txTestCluster.startTable(CUST_TABLE_NAME, 
CUSTOMERS_SCHEMA);
-
-        log.info("Tables have been started");
-    }
-
-    /**
-     * Shutdowns all cluster nodes after each test.
-     *
-     * @throws Exception If failed.
-     */
-    @AfterEach
-    public void after() throws Exception {
-        txTestCluster.shutdownCluster();
-        Mockito.framework().clearInlineMocks();
-    }
-
-    /**
-     * Starts a node.
-     *
-     * @param name Node name.
-     * @param port Local port.
-     * @param nodeFinder Node finder.
-     * @return The client cluster view.
-     */
-    public static ClusterService startNode(TestInfo testInfo, String name, int 
port,
-            NodeFinder nodeFinder) {
-        var network = ClusterServiceTestUtils.clusterService(testInfo, port, 
nodeFinder);
-
-        assertThat(network.startAsync(new ComponentContext()), 
willCompleteSuccessfully());
-
-        return network;
-    }
-
-    /** {@inheritDoc} */
-    protected TxManager clientTxManager() {
-        return txTestCluster.clientTxManager();
-    }
-
-    /** {@inheritDoc} */
-    protected TxManager txManager(TableViewInternal t) {
-        String leaseHolder = primaryNode(t);
-
-        assertNotNull(leaseHolder, "Table primary node should not be null");
-
-        TxManager manager = txTestCluster.txManagers().get(leaseHolder);
-
-        assertNotNull(manager);
-
-        return manager;
-    }
-
-    private @Nullable String primaryNode(TableViewInternal t) {
-        CompletableFuture<ReplicaMeta> primaryReplicaFuture = 
txTestCluster.placementDriver().getPrimaryReplica(
-                new TablePartitionId(t.tableId(), 0),
-                
txTestCluster.clocks().get(txTestCluster.localNodeName()).now());
-
-        assertThat(primaryReplicaFuture, willCompleteSuccessfully());
-
-        return primaryReplicaFuture.join().getLeaseholder();
-    }
-
-    /**
-     * Check the storage of partition is the same across all nodes. The 
checking is based on {@link MvPartitionStorage#lastAppliedIndex()}
-     * that is increased on all update storage operation.
-     * TODO: IGNITE-18869 The method must be updated when a proper way to 
compare storages will be implemented.
-     *
-     * @param table The table.
-     * @param partId Partition id.
-     * @return True if {@link MvPartitionStorage#lastAppliedIndex()} is 
equivalent across all nodes, false otherwise.
-     */
-    protected boolean assertPartitionsSame(TableViewInternal table, int 
partId) {
-        long storageIdx = 0;
-
-        for (Map.Entry<String, Loza> entry : 
txTestCluster.raftServers().entrySet()) {
-            Loza svc = entry.getValue();
-
-            var server = (JraftServerImpl) svc.server();
-
-            var groupId = new TablePartitionId(table.tableId(), partId);
-
-            Peer serverPeer = server.localPeers(groupId).get(0);
-
-            RaftGroupService grp = server.raftGroupService(new 
RaftNodeId(groupId, serverPeer));
-
-            var fsm = (JraftServerImpl.DelegatingStateMachine) 
grp.getRaftNode().getOptions().getFsm();
-
-            PartitionListener listener = (PartitionListener) fsm.getListener();
-
-            MvPartitionStorage storage = listener.getMvStorage();
-
-            if (storageIdx == 0) {
-                storageIdx = storage.lastAppliedIndex();
-            } else if (storageIdx != storage.lastAppliedIndex()) {
-                return false;
-            }
-        }
-
-        return true;
-    }
-
-    protected void injectFailureOnNextOperation(TableViewInternal accounts) {
-        InternalTable internalTable = accounts.internalTable();
-        ReplicaService replicaService = 
IgniteTestUtils.getFieldValue(internalTable, "replicaSvc");
-        Mockito.doReturn(CompletableFuture.failedFuture(new 
Exception())).when(replicaService).invoke((String) any(), any());
-        Mockito.doReturn(CompletableFuture.failedFuture(new 
Exception())).when(replicaService).invoke((ClusterNode) any(), any());
-    }
-
-    protected Collection<TxManager> txManagers() {
-        return txTestCluster.txManagers().values();
+        super(testInfo);
     }
 
     @Test
@@ -2056,58 +1817,6 @@ public abstract class TxAbstractTest extends 
IgniteAbstractTest {
         assertEquals(total, total0, "Total amount invariant is not preserved");
     }
 
-    /**
-     * Makes a key.
-     *
-     * @param id The id.
-     * @return The key tuple.
-     */
-    protected Tuple makeKey(long id) {
-        return Tuple.create().set("accountNumber", id);
-    }
-
-    /**
-     * Makes a tuple containing key and value.
-     *
-     * @param id The id.
-     * @param balance The balance.
-     * @return The value tuple.
-     */
-    protected Tuple makeValue(long id, double balance) {
-        return Tuple.create().set("accountNumber", id).set("balance", balance);
-    }
-
-    /**
-     * Makes a tuple containing key and value.
-     *
-     * @param id The id.
-     * @param name The name.
-     * @return The value tuple.
-     */
-    private Tuple makeValue(long id, String name) {
-        return Tuple.create().set("accountNumber", id).set("name", name);
-    }
-
-    /**
-     * Makes a value.
-     *
-     * @param balance The balance.
-     * @return The value tuple.
-     */
-    private Tuple makeValue(double balance) {
-        return Tuple.create().set("balance", balance);
-    }
-
-    /**
-     * Makes a value.
-     *
-     * @param name The name.
-     * @return The value tuple.
-     */
-    private Tuple makeValue(String name) {
-        return Tuple.create().set("name", name);
-    }
-
     /**
      * Get a lock manager on a partition leader.
      *
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxInfrastructureTest.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxInfrastructureTest.java
new file mode 100644
index 0000000000..5d7fafddb2
--- /dev/null
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxInfrastructureTest.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.distributed.ItTxTestCluster;
+import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.internal.network.NodeFinder;
+import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.ReplicatorConstants;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import 
org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import 
org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.tx.HybridTimestampTracker;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
+import org.apache.ignite.internal.type.NativeTypes;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.IgniteTransactions;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+/**
+ * Setup infrastructure for tx related test scenarios.
+ */
+@ExtendWith({MockitoExtension.class, ConfigurationExtension.class})
+@MockitoSettings(strictness = Strictness.LENIENT)
+public abstract class TxInfrastructureTest extends IgniteAbstractTest {
+    protected static final double BALANCE_1 = 500;
+
+    protected static final double BALANCE_2 = 500;
+
+    protected static final double DELTA = 100;
+
+    protected static final String ACC_TABLE_NAME = "accounts";
+
+    protected static final String CUST_TABLE_NAME = "customers";
+
+    protected static SchemaDescriptor ACCOUNTS_SCHEMA = new SchemaDescriptor(
+            1,
+            new Column[]{new Column("accountNumber".toUpperCase(), 
NativeTypes.INT64, false)},
+            new Column[]{new Column("balance".toUpperCase(), 
NativeTypes.DOUBLE, false)}
+    );
+
+    protected static SchemaDescriptor CUSTOMERS_SCHEMA = new SchemaDescriptor(
+            1,
+            new Column[]{new Column("accountNumber".toUpperCase(), 
NativeTypes.INT64, false)},
+            new Column[]{new Column("name".toUpperCase(), NativeTypes.STRING, 
false)}
+    );
+
+    /** Accounts table id -> balance. */
+    protected TableViewInternal accounts;
+
+    /** Customers table id -> name. */
+    protected TableViewInternal customers;
+
+    protected HybridTimestampTracker timestampTracker = new 
HybridTimestampTracker();
+
+    protected IgniteTransactions igniteTransactions;
+
+    // TODO fsync can be turned on again after 
https://issues.apache.org/jira/browse/IGNITE-20195
+    @InjectConfiguration("mock: { fsync: false }")
+    protected RaftConfiguration raftConfiguration;
+
+    @InjectConfiguration
+    protected TransactionConfiguration txConfiguration;
+
+    @InjectConfiguration
+    protected StorageUpdateConfiguration storageUpdateConfiguration;
+
+    @InjectConfiguration
+    protected ReplicationConfiguration replicationConfiguration;
+
+    protected final TestInfo testInfo;
+
+    protected ItTxTestCluster txTestCluster;
+
+    /**
+     * Returns a count of nodes.
+     *
+     * @return Nodes.
+     */
+    protected abstract int nodes();
+
+    /**
+     * Returns a count of replicas.
+     *
+     * @return Replicas.
+     */
+    protected int replicas() {
+        return 1;
+    }
+
+    /**
+     * Returns {@code true} to disable collocation by using dedicated client 
node.
+     *
+     * @return {@code true} to disable collocation.
+     */
+    protected boolean startClient() {
+        return true;
+    }
+
+    /**
+     * The constructor.
+     *
+     * @param testInfo Test info.
+     */
+    public TxInfrastructureTest(TestInfo testInfo) {
+        this.testInfo = testInfo;
+    }
+
+    /**
+     * Initialize the test state.
+     */
+    @BeforeEach
+    public void before() throws Exception {
+        txTestCluster = new ItTxTestCluster(
+                testInfo,
+                raftConfiguration,
+                txConfiguration,
+                storageUpdateConfiguration,
+                workDir,
+                nodes(),
+                replicas(),
+                startClient(),
+                timestampTracker,
+                replicationConfiguration
+        ) {
+            @Override
+            protected HybridClock createClock(ClusterNode node) {
+                return TxInfrastructureTest.this.createClock(node);
+            }
+
+            @Override
+            protected long getSafeTimePropagationTimeout() {
+                return 
TxInfrastructureTest.this.getSafeTimePropagationTimeout();
+            }
+        };
+        txTestCluster.prepareCluster();
+
+        this.igniteTransactions = txTestCluster.igniteTransactions();
+
+        accounts = txTestCluster.startTable(ACC_TABLE_NAME, ACCOUNTS_SCHEMA);
+        customers = txTestCluster.startTable(CUST_TABLE_NAME, 
CUSTOMERS_SCHEMA);
+
+        log.info("Tables have been started");
+    }
+
+    protected HybridClock createClock(ClusterNode node) {
+        return new HybridClockImpl();
+    }
+
+    protected long getSafeTimePropagationTimeout() {
+        return 
ReplicatorConstants.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
+    }
+
+    /**
+     * Shutdowns all cluster nodes after each test.
+     *
+     * @throws Exception If failed.
+     */
+    @AfterEach
+    public void after() throws Exception {
+        txTestCluster.shutdownCluster();
+        Mockito.framework().clearInlineMocks();
+    }
+
+    /**
+     * Starts a node.
+     *
+     * @param name Node name.
+     * @param port Local port.
+     * @param nodeFinder Node finder.
+     * @return The client cluster view.
+     */
+    public static ClusterService startNode(TestInfo testInfo, String name, int 
port,
+            NodeFinder nodeFinder) {
+        var network = ClusterServiceTestUtils.clusterService(testInfo, port, 
nodeFinder);
+
+        assertThat(network.startAsync(new ComponentContext()), 
willCompleteSuccessfully());
+
+        return network;
+    }
+
+    /** {@inheritDoc} */
+    protected TxManager clientTxManager() {
+        return txTestCluster.clientTxManager();
+    }
+
+    /** {@inheritDoc} */
+    protected TxManager txManager(TableViewInternal t) {
+        String leaseHolder = primaryNode(t);
+
+        assertNotNull(leaseHolder, "Table primary node should not be null");
+
+        TxManager manager = txTestCluster.txManagers().get(leaseHolder);
+
+        assertNotNull(manager);
+
+        return manager;
+    }
+
+    protected @Nullable String primaryNode(TableViewInternal t) {
+        CompletableFuture<ReplicaMeta> primaryReplicaFuture = 
txTestCluster.placementDriver().getPrimaryReplica(
+                new TablePartitionId(t.tableId(), 0),
+                
txTestCluster.clocks().get(txTestCluster.localNodeName()).now());
+
+        assertThat(primaryReplicaFuture, willCompleteSuccessfully());
+
+        return primaryReplicaFuture.join().getLeaseholder();
+    }
+
+    /**
+     * Check the storage of partition is the same across all nodes. The 
checking is based on {@link MvPartitionStorage#lastAppliedIndex()}
+     * that is increased on all update storage operation.
+     * TODO: IGNITE-18869 The method must be updated when a proper way to 
compare storages will be implemented.
+     *
+     * @param table The table.
+     * @param partId Partition id.
+     * @return True if {@link MvPartitionStorage#lastAppliedIndex()} is 
equivalent across all nodes, false otherwise.
+     */
+    protected boolean assertPartitionsSame(TableViewInternal table, int 
partId) {
+        long storageIdx = 0;
+
+        for (Map.Entry<String, Loza> entry : 
txTestCluster.raftServers().entrySet()) {
+            Loza svc = entry.getValue();
+
+            var server = (JraftServerImpl) svc.server();
+
+            var groupId = new TablePartitionId(table.tableId(), partId);
+
+            Peer serverPeer = server.localPeers(groupId).get(0);
+
+            RaftGroupService grp = server.raftGroupService(new 
RaftNodeId(groupId, serverPeer));
+
+            var fsm = (JraftServerImpl.DelegatingStateMachine) 
grp.getRaftNode().getOptions().getFsm();
+
+            PartitionListener listener = (PartitionListener) fsm.getListener();
+
+            MvPartitionStorage storage = listener.getMvStorage();
+
+            if (storageIdx == 0) {
+                storageIdx = storage.lastAppliedIndex();
+            } else if (storageIdx != storage.lastAppliedIndex()) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    protected void injectFailureOnNextOperation(TableViewInternal accounts) {
+        InternalTable internalTable = accounts.internalTable();
+        ReplicaService replicaService = 
IgniteTestUtils.getFieldValue(internalTable, "replicaSvc");
+        Mockito.doReturn(CompletableFuture.failedFuture(new 
Exception())).when(replicaService).invoke((String) any(), any());
+        Mockito.doReturn(CompletableFuture.failedFuture(new 
Exception())).when(replicaService).invoke((ClusterNode) any(), any());
+    }
+
+    protected Collection<TxManager> txManagers() {
+        return txTestCluster.txManagers().values();
+    }
+
+    /**
+     * Makes a key.
+     *
+     * @param id The id.
+     * @return The key tuple.
+     */
+    protected Tuple makeKey(long id) {
+        return Tuple.create().set("accountNumber", id);
+    }
+
+    /**
+     * Makes a tuple containing key and value.
+     *
+     * @param id The id.
+     * @param balance The balance.
+     * @return The value tuple.
+     */
+    protected Tuple makeValue(long id, double balance) {
+        return Tuple.create().set("accountNumber", id).set("balance", balance);
+    }
+
+    /**
+     * Makes a tuple containing key and value.
+     *
+     * @param id The id.
+     * @param name The name.
+     * @return The value tuple.
+     */
+    protected Tuple makeValue(long id, String name) {
+        return Tuple.create().set("accountNumber", id).set("name", name);
+    }
+
+    /**
+     * Makes a value.
+     *
+     * @param balance The balance.
+     * @return The value tuple.
+     */
+    protected Tuple makeValue(double balance) {
+        return Tuple.create().set("balance", balance);
+    }
+
+    /**
+     * Makes a value.
+     *
+     * @param name The name.
+     * @return The value tuple.
+     */
+    protected Tuple makeValue(String name) {
+        return Tuple.create().set("name", name);
+    }
+}
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 5eaa8c927c..25d0318ce7 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -60,6 +60,7 @@ import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.network.SingleClusterNodeResolver;
 import org.apache.ignite.internal.network.TopologyService;
+import org.apache.ignite.internal.network.serialization.MessageSerializer;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.placementdriver.ReplicaMeta;
 import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
@@ -77,6 +78,7 @@ import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.replicator.listener.ReplicaListener;
 import 
org.apache.ignite.internal.replicator.message.PrimaryReplicaChangeCommand;
 import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import 
org.apache.ignite.internal.replicator.message.TimestampAwareReplicaResponse;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowConverter;
 import org.apache.ignite.internal.schema.BinaryRowEx;
@@ -292,6 +294,45 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
                         return 
replicaListener.invoke(invocationOnMock.getArgument(1), 
nodeId).thenApply(ReplicaResult::result);
                     })
                     .when(replicaSvc).invoke(anyString(), any());
+
+            lenient()
+                    .doAnswer(invocationOnMock -> {
+                        ClusterNode node = invocationOnMock.getArgument(0);
+
+                        return 
replicaListener.invoke(invocationOnMock.getArgument(1), node.id())
+                                .thenApply(r -> new 
TimestampAwareReplicaResponse() {
+                                    @Override
+                                    public @Nullable Object result() {
+                                        return r.result();
+                                    }
+
+                                    @Override
+                                    public @Nullable HybridTimestamp 
timestamp() {
+                                        return CLOCK.now();
+                                    }
+
+                                    @Override
+                                    public MessageSerializer<NetworkMessage> 
serializer() {
+                                        return null;
+                                    }
+
+                                    @Override
+                                    public short messageType() {
+                                        return 0;
+                                    }
+
+                                    @Override
+                                    public short groupType() {
+                                        return 0;
+                                    }
+
+                                    @Override
+                                    public NetworkMessage clone() {
+                                        return null;
+                                    }
+                                });
+                    })
+                    .when(replicaSvc).invokeRaw(any(ClusterNode.class), any());
         }
 
         AtomicLong raftIndex = new AtomicLong(1);
@@ -544,7 +585,7 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<ClusterNode> evaluateReadOnlyRecipientNode(int 
partId) {
+    public CompletableFuture<ClusterNode> evaluateReadOnlyRecipientNode(int 
partId, @Nullable HybridTimestamp readTimestamp) {
         return completedFuture(LOCAL_NODE);
     }
 
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index e9fd707342..c9b0ead47a 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -116,17 +116,16 @@ public interface TxManager extends IgniteComponent {
      * Finishes a one-phase committed transaction. This method doesn't contain 
any distributed communication.
      *
      * @param timestampTracker Observable timestamp tracker. This tracker is 
used to track an observable timestamp and should be
-     *         updated with commit timestamp of every committed transaction.
+     *         updated with commit timestamp of every committed transaction. 
Not null on commit.
      * @param txId Transaction id.
      * @param commit {@code True} if a commit requested.
      */
-    void finishFull(HybridTimestampTracker timestampTracker, UUID txId, 
boolean commit);
+    void finishFull(HybridTimestampTracker timestampTracker, UUID txId, 
@Nullable HybridTimestamp ts, boolean commit);
 
     /**
      * Finishes a dependant transactions.
      *
-     * @param timestampTracker Observable timestamp tracker is used to track a 
timestamp for either read-write or read-only
-     *         transaction execution. The tracker is also used to determine 
the read timestamp for read-only transactions. Each client
+     * @param timestampTracker Observable timestamp tracker is used to 
determine the read timestamp for read-only transactions. Each client
      *         should pass its own tracker to provide linearizability between 
read-write and read-only transactions started by this client.
      * @param commitPartition Partition to store a transaction state.
      * @param commit {@code true} if a commit requested.
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
index 1bc88d1bdc..3949196f61 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.tx.TransactionIds;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.tx.TransactionException;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * The read-write implementation of an internal transaction.
@@ -157,17 +158,16 @@ public class ReadWriteTransactionImpl extends 
IgniteAbstractTransactionImpl {
     }
 
     @Override
-    public CompletableFuture<Void> finish(boolean commit, HybridTimestamp 
executionTimestamp, boolean full) {
+    public CompletableFuture<Void> finish(boolean commit, @Nullable 
HybridTimestamp executionTimestamp, boolean full) {
         if (finishFuture != null) {
             return finishFuture;
         }
-
         enlistPartitionLock.writeLock().lock();
 
         try {
             if (finishFuture == null) {
                 if (full) {
-                    txManager.finishFull(observableTsTracker, id(), commit);
+                    txManager.finishFull(observableTsTracker, id(), 
executionTimestamp, commit);
 
                     finishFuture = nullCompletedFuture();
                 } else {
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index fdd139022e..281b3f6ae0 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -433,7 +433,12 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler, SystemVi
 
     @Override
     public @Nullable TxStateMeta stateMeta(UUID txId) {
-        return inBusyLock(busyLock, () -> txStateVolatileStorage.state(txId));
+        return txStateVolatileStorage.state(txId);
+    }
+
+    @TestOnly
+    public Collection<TxStateMeta> states() {
+        return txStateVolatileStorage.states();
     }
 
     @Override
@@ -442,13 +447,13 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler, SystemVi
     }
 
     @Override
-    public void finishFull(HybridTimestampTracker timestampTracker, UUID txId, 
boolean commit) {
+    public void finishFull(HybridTimestampTracker timestampTracker, UUID txId, 
@Nullable HybridTimestamp ts, boolean commit) {
         TxState finalState;
 
         finishedTxs.add(1);
 
         if (commit) {
-            timestampTracker.update(clockService.current());
+            timestampTracker.update(ts);
 
             finalState = COMMITTED;
         } else {
@@ -460,7 +465,7 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler, SystemVi
                         finalState,
                         old == null ? null : old.txCoordinatorId(),
                         old == null ? null : old.commitPartitionId(),
-                        old == null ? null : old.commitTimestamp()
+                        ts
                 ));
 
         decrementRwTxCount(txId);

Reply via email to