This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 9ad100c341 IGNITE-22977 Reduced latency for txn preparing phase.
9ad100c341 is described below

commit 9ad100c34183071024ececb790b939bfa27435e5
Author: Alexey Scherbakov <[email protected]>
AuthorDate: Tue Aug 20 11:27:34 2024 +0300

    IGNITE-22977 Reduced latency for txn preparing phase.
---
 .../ignite/client/handler/FakePlacementDriver.java |   7 +
 .../ignite/internal/hlc/HybridClockImpl.java       |   4 +-
 .../internal/lang/IgniteSystemProperties.java      |   6 +
 .../ignite/internal/util/FastTimestamps.java       |   5 +-
 .../ignite/internal/hlc/HybridClockTest.java       |  21 +-
 .../IndexNodeFinishedRwTransactionsChecker.java    |   2 +-
 .../ignite/internal/index/TestPlacementDriver.java |   6 +
 .../checkpoint/CheckpointMetricsTrackerTest.java   |  16 +-
 .../replicator/utils/TestPlacementDriver.java      |   6 +
 .../placementdriver/LeasePlacementDriver.java      |  10 +
 .../placementdriver/TestPlacementDriver.java       |   6 +
 .../placementdriver/PlacementDriverManager.java    |   5 +
 .../placementdriver/leases/LeaseTracker.java       |  11 +
 .../ignite/internal/raft/RaftGroupServiceImpl.java |  48 +++-
 .../ignite/raft/jraft/option/NodeOptions.java      |   2 +-
 modules/runner/build.gradle                        |  10 +
 .../benchmark/AbstractMultiNodeBenchmark.java      |   6 +-
 .../internal/benchmark/UpsertKvBenchmark.java      | 109 ++++++++
 .../ignite/internal/table/RowIdGenerator.java      |  36 +++
 .../replicator/PartitionReplicaListener.java       | 284 ++++++++++-----------
 .../distributed/storage/InternalTableImpl.java     |  40 ++-
 .../wrappers/DelegatingPlacementDriver.java        |   6 +
 .../internal/tx/impl/ReadWriteTransactionImpl.java |  21 +-
 .../ignite/internal/tx/impl/TxManagerImpl.java     |  19 +-
 24 files changed, 467 insertions(+), 219 deletions(-)

diff --git a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
index 8f1bdc5835..70a295668c 100644
--- a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
+++ b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
@@ -100,6 +100,13 @@ public class FakePlacementDriver extends AbstractEventProducer<PrimaryReplicaEve
         return awaitPrimaryReplica(replicationGroupId, timestamp, 0, 
TimeUnit.MILLISECONDS);
     }
 
+    @Override
+    public ReplicaMeta getCurrentPrimaryReplica(ReplicationGroupId 
replicationGroupId, HybridTimestamp timestamp) {
+        TablePartitionId id = (TablePartitionId) replicationGroupId;
+
+        return primaryReplicas.get(id.partitionId());
+    }
+
     @Override
     public CompletableFuture<Void> previousPrimaryExpired(ReplicationGroupId 
grpId) {
         return nullCompletedFuture();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java
index 843dcb7e18..cfb82efac7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.hlc;
 
 import static java.lang.Math.max;
-import static java.time.Clock.systemUTC;
 import static 
org.apache.ignite.internal.hlc.HybridTimestamp.LOGICAL_TIME_BITS_SIZE;
 import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
 
@@ -29,6 +28,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.FastTimestamps;
 
 /**
  * A Hybrid Logical Clock implementation.
@@ -61,7 +61,7 @@ public class HybridClockImpl implements HybridClock {
     }
 
     private static long currentTime() {
-        return systemUTC().instant().toEpochMilli() << LOGICAL_TIME_BITS_SIZE;
+        return FastTimestamps.coarseCurrentTimeMillis() << 
LOGICAL_TIME_BITS_SIZE;
     }
 
     @Override
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/lang/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/internal/lang/IgniteSystemProperties.java
index 1ebfecb2c0..798ec2f377 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/lang/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/lang/IgniteSystemProperties.java
@@ -73,6 +73,12 @@ public final class IgniteSystemProperties {
     /** Name of the property controlling whether, when a thread assertion is 
triggered, it should also be written to the log. */
     public static final String THREAD_ASSERTIONS_LOG_BEFORE_THROWING = 
"THREAD_ASSERTIONS_LOG_BEFORE_THROWING";
 
+    /** Skip replication in a benchmark. */
+    public static final String IGNITE_SKIP_REPLICATION_IN_BENCHMARK = 
"IGNITE_SKIP_REPLICATION_IN_BENCHMARK";
+
+    /** Skip storage update in a benchmark. */
+    public static final String IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK = 
"IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK";
+
     /**
      * Enforces singleton.
      */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/FastTimestamps.java b/modules/core/src/main/java/org/apache/ignite/internal/util/FastTimestamps.java
index 6079d05da6..c95b683971 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/FastTimestamps.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/FastTimestamps.java
@@ -23,7 +23,8 @@ package org.apache.ignite.internal.util;
 public class FastTimestamps {
     private static volatile long coarseCurrentTimeMillis = 
System.currentTimeMillis();
 
-    private static final long UPDATE_FREQUENCY_MS = 10;
+    /** Note: don't change this value, because it's crucial for a timestamp 
generation. */
+    private static final long UPDATE_FREQUENCY_MS = 1;
 
     static {
         startUpdater();
@@ -46,7 +47,7 @@ public class FastTimestamps {
         };
 
         updater.setDaemon(true);
-        updater.setPriority(10);
+        updater.setPriority(Thread.MAX_PRIORITY);
         updater.start();
     }
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/hlc/HybridClockTest.java b/modules/core/src/test/java/org/apache/ignite/internal/hlc/HybridClockTest.java
index 5cb248e1c2..cb35d8a9f5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/hlc/HybridClockTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/hlc/HybridClockTest.java
@@ -21,15 +21,12 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
-import java.time.Clock;
-import java.time.Instant;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.util.FastTimestamps;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -46,7 +43,7 @@ class HybridClockTest extends BaseIgniteAbstractTest {
     /**
      * Mock of a system clock.
      */
-    private static MockedStatic<Clock> clockMock;
+    private static MockedStatic<FastTimestamps> clockMock;
 
     @Mock
     private ClockUpdateListener updateListener;
@@ -61,7 +58,7 @@ class HybridClockTest extends BaseIgniteAbstractTest {
      */
     @Test
     public void testNow() {
-        clockMock = mockToEpochMilli(100);
+        clockMock = mockCurrentTimestamp(100);
 
         HybridClock clock = new HybridClockImpl();
 
@@ -79,7 +76,7 @@ class HybridClockTest extends BaseIgniteAbstractTest {
      */
     @Test
     public void testTick() {
-        clockMock = mockToEpochMilli(100);
+        clockMock = mockCurrentTimestamp(100);
 
         HybridClock clock = new HybridClockImpl();
 
@@ -108,7 +105,7 @@ class HybridClockTest extends BaseIgniteAbstractTest {
     private void assertTimestampEquals(long sysTime, HybridTimestamp expTs, 
Supplier<HybridTimestamp> clo) {
         closeClockMock();
 
-        clockMock = mockToEpochMilli(sysTime);
+        clockMock = mockCurrentTimestamp(sysTime);
 
         assertEquals(expTs, clo.get());
     }
@@ -167,12 +164,10 @@ class HybridClockTest extends BaseIgniteAbstractTest {
         verify(updateListener, never()).onUpdate(anyLong());
     }
 
-    private static MockedStatic<Clock> mockToEpochMilli(long expected) {
-        Clock spyClock = spy(Clock.class);
-        MockedStatic<Clock> clockMock = mockStatic(Clock.class);
+    private static MockedStatic<FastTimestamps> mockCurrentTimestamp(long 
expected) {
+        MockedStatic<FastTimestamps> clockMock = 
mockStatic(FastTimestamps.class);
 
-        clockMock.when(Clock::systemUTC).thenReturn(spyClock);
-        when(spyClock.instant()).thenReturn(Instant.ofEpochMilli(expected));
+        
clockMock.when(FastTimestamps::coarseCurrentTimeMillis).thenReturn(expected);
 
         return clockMock;
     }
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexNodeFinishedRwTransactionsChecker.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexNodeFinishedRwTransactionsChecker.java
index 301a0f0f48..f11732cc2f 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexNodeFinishedRwTransactionsChecker.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexNodeFinishedRwTransactionsChecker.java
@@ -187,7 +187,7 @@ public class IndexNodeFinishedRwTransactionsChecker implements LocalRwTxCounter,
     }
 
     /**
-     * Returns {@code true} iff the requested catalog version is active and 
all RW transactions started on versions strictly before that
+     * Returns {@code true} if the requested catalog version is active and all 
RW transactions started on versions strictly before that
      * version have finished on the node.
      *
      * @param catalogVersion Catalog version of interest.
diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java b/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java
index f0650073ca..5fadd2b1be 100644
--- a/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java
+++ b/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
 import 
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.jetbrains.annotations.Nullable;
 
 /** Implementation for tests. */
 class TestPlacementDriver extends AbstractEventProducer<PrimaryReplicaEvent, 
PrimaryReplicaEventParameters> implements PlacementDriver {
@@ -53,6 +54,11 @@ class TestPlacementDriver extends AbstractEventProducer<PrimaryReplicaEvent, Pri
         return primaryReplicaMetaFutureById.get(replicationGroupId);
     }
 
+    @Override
+    public @Nullable ReplicaMeta getCurrentPrimaryReplica(ReplicationGroupId 
replicationGroupId, HybridTimestamp timestamp) {
+        return primaryReplicaMetaFutureById.get(replicationGroupId).join();
+    }
+
     @Override
     public CompletableFuture<Void> previousPrimaryExpired(ReplicationGroupId 
grpId) {
         throw new UnsupportedOperationException();
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMetricsTrackerTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMetricsTrackerTest.java
index 46849b4b30..b6f8fe614b 100644
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMetricsTrackerTest.java
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMetricsTrackerTest.java
@@ -50,7 +50,7 @@ public class CheckpointMetricsTrackerTest {
 
         assertThat(tracker.checkpointStartTime(), 
equalTo(checkpointStartTime));
 
-        assertThat(tracker.totalDuration(), greaterThanOrEqualTo(10L));
+        assertThat(tracker.totalDuration(), greaterThanOrEqualTo(1L));
     }
 
     @Test
@@ -97,7 +97,7 @@ public class CheckpointMetricsTrackerTest {
 
         tracker.onSplitAndSortCheckpointPagesEnd();
 
-        assertThat(tracker.splitAndSortCheckpointPagesDuration(), 
greaterThanOrEqualTo(10L));
+        assertThat(tracker.splitAndSortCheckpointPagesDuration(), 
greaterThanOrEqualTo(1L));
     }
 
     @Test
@@ -114,7 +114,7 @@ public class CheckpointMetricsTrackerTest {
 
         tracker.onCheckpointEnd();
 
-        assertThat(tracker.fsyncDuration(), greaterThanOrEqualTo(10L));
+        assertThat(tracker.fsyncDuration(), greaterThanOrEqualTo(1L));
     }
 
     @Test
@@ -131,7 +131,7 @@ public class CheckpointMetricsTrackerTest {
 
         tracker.onFsyncStart();
 
-        assertThat(tracker.pagesWriteDuration(), greaterThanOrEqualTo(10L));
+        assertThat(tracker.pagesWriteDuration(), greaterThanOrEqualTo(1L));
     }
 
     @Test
@@ -148,7 +148,7 @@ public class CheckpointMetricsTrackerTest {
 
         tracker.onMarkCheckpointBeginEnd();
 
-        assertThat(tracker.onMarkCheckpointBeginDuration(), 
greaterThanOrEqualTo(10L));
+        assertThat(tracker.onMarkCheckpointBeginDuration(), 
greaterThanOrEqualTo(1L));
     }
 
     @Test
@@ -165,7 +165,7 @@ public class CheckpointMetricsTrackerTest {
 
         long beforeWriteLockDuration = tracker.beforeWriteLockDuration();
 
-        assertThat(beforeWriteLockDuration, greaterThanOrEqualTo(10L));
+        assertThat(beforeWriteLockDuration, greaterThanOrEqualTo(1L));
         assertThat(tracker.writeLockWaitDuration(), lessThan(0L));
         assertThat(tracker.writeLockHoldDuration(), equalTo(0L));
 
@@ -176,7 +176,7 @@ public class CheckpointMetricsTrackerTest {
         long writeLockWaitDuration = tracker.writeLockWaitDuration();
 
         assertThat(tracker.beforeWriteLockDuration(), 
equalTo(beforeWriteLockDuration));
-        assertThat(writeLockWaitDuration, greaterThanOrEqualTo(10L));
+        assertThat(writeLockWaitDuration, greaterThanOrEqualTo(1L));
         assertThat(tracker.writeLockHoldDuration(), lessThan(0L));
 
         waitForChangeCoarseCurrentTimeMillis();
@@ -185,7 +185,7 @@ public class CheckpointMetricsTrackerTest {
 
         assertThat(tracker.beforeWriteLockDuration(), 
equalTo(beforeWriteLockDuration));
         assertThat(tracker.writeLockWaitDuration(), 
equalTo(writeLockWaitDuration));
-        assertThat(tracker.writeLockHoldDuration(), greaterThanOrEqualTo(10L));
+        assertThat(tracker.writeLockHoldDuration(), greaterThanOrEqualTo(1L));
     }
 
     private void waitForChangeCoarseCurrentTimeMillis() throws Exception {
diff --git a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/utils/TestPlacementDriver.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/utils/TestPlacementDriver.java
index aa2c0bcc15..fa70085543 100644
--- a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/utils/TestPlacementDriver.java
+++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/utils/TestPlacementDriver.java
@@ -62,6 +62,12 @@ public class TestPlacementDriver extends AbstractEventProducer<PrimaryReplicaEve
         return getPrimaryReplicaMeta();
     }
 
+    @Override
+    public ReplicaMeta getCurrentPrimaryReplica(ReplicationGroupId 
replicationGroupId,
+            HybridTimestamp timestamp) {
+        return getPrimaryReplicaMeta().join();
+    }
+
     @Override
     public CompletableFuture<Void> previousPrimaryExpired(ReplicationGroupId 
grpId) {
         return nullCompletedFuture();
diff --git a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/LeasePlacementDriver.java b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/LeasePlacementDriver.java
index ea83862d99..94864b1ac2 100644
--- a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/LeasePlacementDriver.java
+++ b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/LeasePlacementDriver.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
 import 
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Service that provides an ability to await and retrieve primary replicas for 
replication groups.
@@ -70,6 +71,15 @@ public interface LeasePlacementDriver extends EventProducer<PrimaryReplicaEvent,
      */
     CompletableFuture<ReplicaMeta> getPrimaryReplica(ReplicationGroupId 
replicationGroupId, HybridTimestamp timestamp);
 
+    /**
+     * Returns the current primary replica without waiting.
+     *
+     * @param replicationGroupId Replication group id.
+     * @param timestamp The timestamp.
+     * @return Metadata information or null if not available.
+     */
+    @Nullable ReplicaMeta getCurrentPrimaryReplica(ReplicationGroupId 
replicationGroupId, HybridTimestamp timestamp);
+
     /**
      * Returns a future that completes when all expiration event {@link 
PrimaryReplicaEvent#PRIMARY_REPLICA_EXPIRED} listeners of previous
      * primary complete.
diff --git a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
index d91e9499ff..143dcf9d78 100644
--- a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
+++ b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
 import 
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
 /**
@@ -75,6 +76,11 @@ public class TestPlacementDriver extends AbstractEventProducer<PrimaryReplicaEve
         return getReplicaMetaFuture();
     }
 
+    @Override
+    public @Nullable ReplicaMeta getCurrentPrimaryReplica(ReplicationGroupId 
replicationGroupId, HybridTimestamp timestamp) {
+        return getReplicaMetaFuture().join();
+    }
+
     @Override
     public CompletableFuture<Void> previousPrimaryExpired(ReplicationGroupId 
grpId) {
         return nullCompletedFuture();
diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
index e7bae39bd6..c13658a542 100644
--- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
+++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
@@ -299,6 +299,11 @@ public class PlacementDriverManager implements IgniteComponent {
                 return leaseTracker.getPrimaryReplica(replicationGroupId, 
timestamp);
             }
 
+            @Override
+            public ReplicaMeta getCurrentPrimaryReplica(ReplicationGroupId 
replicationGroupId, HybridTimestamp timestamp) {
+                return 
leaseTracker.getCurrentPrimaryReplica(replicationGroupId, timestamp);
+            }
+
             @Override
             public CompletableFuture<Void> 
previousPrimaryExpired(ReplicationGroupId replicationGroupId) {
                 return leaseTracker.previousPrimaryExpired(replicationGroupId);
diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
index ea1be1d3df..c2de87ca8d 100644
--- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
+++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
@@ -300,6 +300,17 @@ public class LeaseTracker extends AbstractEventProducer<PrimaryReplicaEvent, Pri
         });
     }
 
+    @Override
+    public @Nullable ReplicaMeta getCurrentPrimaryReplica(ReplicationGroupId 
replicationGroupId, HybridTimestamp timestamp) {
+        Lease lease = getLease(replicationGroupId);
+
+        if (lease.isAccepted() && 
clockService.after(lease.getExpirationTime(), timestamp)) {
+            return lease;
+        }
+
+        return null;
+    }
+
     /**
      * Helper method that checks whether tracker for given groupId is present 
in {@code primaryReplicaWaiters} map, whether it's empty
      * and removes it if it's true.
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
index 13065a1c78..19c21a8379 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
@@ -18,9 +18,11 @@
 package org.apache.ignite.internal.raft;
 
 import static java.lang.System.currentTimeMillis;
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.ThreadLocalRandom.current;
 import static java.util.stream.Collectors.toList;
 import static 
org.apache.ignite.internal.tostring.IgniteToStringBuilder.includeSensitive;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
 import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddLearnersRequest;
@@ -53,6 +55,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
 import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.lang.IgniteSystemProperties;
 import org.apache.ignite.internal.lang.SafeTimeReorderException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
@@ -171,19 +174,40 @@ public class RaftGroupServiceImpl implements RaftGroupService {
             ScheduledExecutorService executor,
             Marshaller commandsMarshaller
     ) {
-        var service = new RaftGroupServiceImpl(
-                groupId,
-                cluster,
-                factory,
-                configuration,
-                membersConfiguration,
-                null,
-                executor,
-                commandsMarshaller
-        );
+        boolean inBenchmark = 
IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_SKIP_REPLICATION_IN_BENCHMARK);
+
+        RaftGroupServiceImpl service;
+        if (inBenchmark) {
+            service = new RaftGroupServiceImpl(
+                    groupId,
+                    cluster,
+                    factory,
+                    configuration,
+                    membersConfiguration,
+                    null,
+                    executor,
+                    commandsMarshaller
+            ) {
+                @Override
+                public <R> CompletableFuture<R> run(Command cmd) {
+                    return 
cmd.getClass().getSimpleName().contains("UpdateCommand") ? 
nullCompletedFuture() : super.run(cmd);
+                }
+            };
+        } else {
+            service = new RaftGroupServiceImpl(
+                    groupId,
+                    cluster,
+                    factory,
+                    configuration,
+                    membersConfiguration,
+                    null,
+                    executor,
+                    commandsMarshaller
+            );
+        }
 
         if (!getLeader) {
-            return CompletableFuture.completedFuture(service);
+            return completedFuture(service);
         }
 
         return service.refreshLeader().handle((unused, throwable) -> {
@@ -816,6 +840,6 @@ public class RaftGroupServiceImpl implements RaftGroupService {
             return CompletableFuture.failedFuture(new 
PeerUnavailableException(peer.consistentId()));
         }
 
-        return CompletableFuture.completedFuture(node);
+        return completedFuture(node);
     }
 }
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
index e444800edb..8df09366d3 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
@@ -52,7 +52,7 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
     private static final int DEFAULT_STRIPES = Utils.cpus();
 
     /** This value is used by default to determine the count of stripes for 
log manager. */
-    private static final int DEFAULT_LOG_STRIPES_COUNT = 4;
+    private static final int DEFAULT_LOG_STRIPES_COUNT = Math.min(4, 
DEFAULT_STRIPES);
 
     // A follower would become a candidate if it doesn't receive any message
     // from the leader in |election_timeout_ms| milliseconds
diff --git a/modules/runner/build.gradle b/modules/runner/build.gradle
index 8ad1bc82d8..fd622e9930 100644
--- a/modules/runner/build.gradle
+++ b/modules/runner/build.gradle
@@ -243,6 +243,16 @@ tasks.register("runnerPlatformBenchmark", JavaExec) {
     enableAssertions = true
 }
 
+tasks.register("runUpsertBenchmark", JavaExec) {
+    mainClass = "org.apache.ignite.internal.benchmark.UpsertKvBenchmark"
+
+    jvmArgs += defaultJvmArgs
+
+    classpath = sourceSets.integrationTest.runtimeClasspath
+
+    enableAssertions = true
+}
+
 jar {
     manifest {
         attributes(
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
index b5cd6e1797..f4c746b673 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
@@ -78,7 +78,7 @@ public class AbstractMultiNodeBenchmark {
      * Starts ignite node and creates table {@link #TABLE_NAME}.
      */
     @Setup
-    public final void nodeSetUp() throws Exception {
+    public void nodeSetUp() throws Exception {
         System.setProperty("jraft.available_processors", "2");
         startCluster();
 
@@ -191,6 +191,10 @@ public class AbstractMultiNodeBenchmark {
                 + "      \"netClusterNodes\": [ {} ]\n"
                 + "    }\n"
                 + "  },\n"
+                + "  storage.profiles: {"
+                + "        " + DEFAULT_STORAGE_PROFILE + ".engine: aipersist, "
+                + "        " + DEFAULT_STORAGE_PROFILE + ".size: 2073741824 " 
// Avoid page replacement.
+                + "  },\n"
                 + "  clientConnector: { port:{} },\n"
                 + "  rest.port: {},\n"
                 + "  raft.fsync = " + fsync
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/UpsertKvBenchmark.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/UpsertKvBenchmark.java
new file mode 100644
index 0000000000..c5d2d18fc6
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/UpsertKvBenchmark.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmark;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.lang.IgniteSystemProperties;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.Tuple;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+/**
+ * Benchmark for a single upsert operation via KV API with a possibility to 
disable updates via RAFT and to storage.
+ */
+@State(Scope.Benchmark)
+@Fork(1)
+@Threads(1)
+@Warmup(iterations = 10, time = 2)
+@Measurement(iterations = 20, time = 2)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MICROSECONDS)
+public class UpsertKvBenchmark extends AbstractMultiNodeBenchmark {
+    private final Tuple tuple = Tuple.create();
+
+    private int id = 0;
+
+    private static KeyValueView<Tuple, Tuple> kvView;
+
+    @Override
+    public void nodeSetUp() throws Exception {
+        
System.setProperty(IgniteSystemProperties.IGNITE_SKIP_REPLICATION_IN_BENCHMARK, 
"true");
+        
System.setProperty(IgniteSystemProperties.IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK,
 "true");
+        super.nodeSetUp();
+    }
+
+    /**
+     * Initializes the tuple.
+     */
+    @Setup
+    public void setUp() {
+        kvView = clusterNode.tables().table(TABLE_NAME).keyValueView();
+        for (int i = 1; i < 11; i++) {
+            tuple.set("field" + i, FIELD_VAL);
+        }
+    }
+
+    @TearDown
+    public void tearDown() {
+        System.out.println("Inserted " + id + " tuples");
+    }
+
+    /**
+     * Benchmark for KV upsert via embedded client.
+     */
+    @Benchmark
+    public void upsert() {
+        kvView.put(null, Tuple.create().set("ycsb_key", id++), tuple);
+    }
+
+    /**
+     * Benchmark's entry point.
+     */
+    public static void main(String[] args) throws RunnerException {
+        Options opt = new OptionsBuilder()
+                .include(".*" + UpsertKvBenchmark.class.getSimpleName() + ".*")
+                .build();
+
+        new Runner(opt).run();
+    }
+
+    @Override
+    protected int nodes() {
+        return 1;
+    }
+
+    @Override
+    protected int partitionCount() {
+        return 8;
+    }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/RowIdGenerator.java b/modules/table/src/main/java/org/apache/ignite/internal/table/RowIdGenerator.java
new file mode 100644
index 0000000000..94bea7cd97
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/RowIdGenerator.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.internal.util.FastTimestamps;
+
+/**
+ * Allocates the UUID part of new row ids.
+ *
+ * <p>Each generated UUID is built with {@code new UUID(mostSigBits, leastSigBits)}:
+ * <ul>
+ *     <li>the most significant 64 bits are the value of {@link FastTimestamps#coarseCurrentTimeMillis()};</li>
+ *     <li>the least significant 64 bits are a {@link ThreadLocalRandom} value.</li>
+ * </ul>
+ *
+ * <p>Unlike {@link UUID#randomUUID()}, which uses a cryptographically strong generator, this uses a per-thread
+ * pseudo-random generator. Ids obtained with the same timestamp value differ only in the 64 random bits. No RFC 4122
+ * version/variant bits are set explicitly.
+ */
+public final class RowIdGenerator {
+    /** Static utility class: not meant to be instantiated. */
+    private RowIdGenerator() {
+    }
+
+    /**
+     * Generates the next row id.
+     *
+     * @return New UUID whose high 64 bits are {@link FastTimestamps#coarseCurrentTimeMillis()} and whose low 64 bits
+     *         are random.
+     */
+    public static UUID next() {
+        return new UUID(FastTimestamps.coarseCurrentTimeMillis(), ThreadLocalRandom.current().nextLong());
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index ccc49bf2f6..eb6ee1856b 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -49,6 +49,7 @@ import static 
org.apache.ignite.internal.util.CompletableFutures.allOfToList;
 import static 
org.apache.ignite.internal.util.CompletableFutures.emptyCollectionCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.emptyListCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
+import static 
org.apache.ignite.internal.util.CompletableFutures.isCompletedSuccessfully;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
 import static org.apache.ignite.internal.util.IgniteUtils.findAny;
@@ -79,6 +80,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
@@ -93,6 +95,7 @@ import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.lang.IgniteSystemProperties;
 import org.apache.ignite.internal.lang.IgniteTriFunction;
 import org.apache.ignite.internal.lang.SafeTimeReorderException;
 import org.apache.ignite.internal.logger.IgniteLogger;
@@ -129,6 +132,7 @@ import 
org.apache.ignite.internal.partition.replicator.network.replication.Reque
 import 
org.apache.ignite.internal.partition.replicator.network.replication.ScanCloseReplicaRequest;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.UpdateMinimumActiveTxBeginTimeReplicaRequest;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
 import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.ExecutorInclinedRaftCommandRunner;
 import org.apache.ignite.internal.raft.service.RaftCommandRunner;
@@ -163,6 +167,7 @@ import org.apache.ignite.internal.storage.index.IndexRow;
 import org.apache.ignite.internal.storage.index.IndexRowImpl;
 import org.apache.ignite.internal.storage.index.IndexStorage;
 import org.apache.ignite.internal.storage.index.SortedIndexStorage;
+import org.apache.ignite.internal.table.RowIdGenerator;
 import org.apache.ignite.internal.table.distributed.IndexLocker;
 import org.apache.ignite.internal.table.distributed.SortedIndexLocker;
 import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
@@ -417,7 +422,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
         this.replicationGroupId = new TablePartitionId(tableId, partId);
 
-        schemaCompatValidator = new 
SchemaCompatibilityValidator(validationSchemasSource, catalogService, 
schemaSyncService);
+        this.schemaCompatValidator = new 
SchemaCompatibilityValidator(validationSchemasSource, catalogService, 
schemaSyncService);
 
         prepareIndexBuilderTxRwOperationTracker();
     }
@@ -484,7 +489,9 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
     private CompletableFuture<?> processRequest(ReplicaRequest request, 
@Nullable Boolean isPrimary, String senderId,
             @Nullable Long leaseStartTime) {
-        if (request instanceof SchemaVersionAwareReplicaRequest) {
+        boolean hasSchemaVersion = request instanceof 
SchemaVersionAwareReplicaRequest;
+
+        if (hasSchemaVersion) {
             assert ((SchemaVersionAwareReplicaRequest) 
request).schemaVersion() > 0 : "No schema version passed?";
         }
 
@@ -514,13 +521,38 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             return processGetEstimatedSizeRequest();
         }
 
-        HybridTimestamp opTsIfDirectRo = (request instanceof 
ReadOnlyDirectReplicaRequest) ? clockService.now() : null;
+        @Nullable HybridTimestamp opTs = getTxOpTimestamp(request);
+        @Nullable HybridTimestamp opTsIfDirectRo = (request instanceof 
ReadOnlyDirectReplicaRequest) ? opTs : null;
+        @Nullable HybridTimestamp txTs = getTxStartTimestamp(request);
+        if (txTs == null) {
+            txTs = opTsIfDirectRo;
+        }
+
+        // Don't need to validate schema.
+        if (opTs == null) {
+            assert opTsIfDirectRo == null;
+            return processOperationRequestWithTxRwCounter(senderId, request, 
isPrimary, null, leaseStartTime);
+        }
+
+        assert txTs != null && opTs.compareTo(txTs) >= 0 : "Invalid request 
timestamps";
+
+        @Nullable HybridTimestamp finalTxTs = txTs;
+        Runnable validateClo = () -> {
+            schemaCompatValidator.failIfTableDoesNotExistAt(opTs, tableId());
+
+            if (hasSchemaVersion) {
+                SchemaVersionAwareReplicaRequest versionAwareRequest = 
(SchemaVersionAwareReplicaRequest) request;
+
+                schemaCompatValidator.failIfRequestSchemaDiffersFromTxTs(
+                        finalTxTs,
+                        versionAwareRequest.schemaVersion(),
+                        tableId()
+                );
+            }
+        };
 
-        return validateTableExistence(request, opTsIfDirectRo)
-                .thenCompose(unused -> validateSchemaMatch(request, 
opTsIfDirectRo))
-                .thenCompose(unused -> waitForSchemasBeforeReading(request, 
opTsIfDirectRo))
-                .thenCompose(unused ->
-                        processOperationRequestWithTxRwCounter(senderId, 
request, isPrimary, opTsIfDirectRo, leaseStartTime));
+        return 
schemaSyncService.waitForMetadataCompleteness(opTs).thenRun(validateClo).thenCompose(ignored
 ->
+                processOperationRequestWithTxRwCounter(senderId, request, 
isPrimary, opTsIfDirectRo, leaseStartTime));
     }
 
     private CompletableFuture<Long> processGetEstimatedSizeRequest() {
@@ -592,7 +624,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
     }
 
     /**
-     * Validates that the table exists at a timestamp corresponding to the 
request operation.
+     * Returns the txn operation timestamp.
      *
      * <ul>
      *     <li>For a read/write in an RW transaction, it's 'now'</li>
@@ -600,88 +632,25 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      *     <li>For a direct read in an RO implicit transaction, it's the 
timestamp chosen (as 'now') to process the request</li>
      * </ul>
      *
-     * <p>For other requests, the validation is skipped.
+     * <p>For other requests, op timestamp is not applicable and the 
validation is skipped.
      *
-     * @param request Replica request corresponding to the operation.
-     * @param opTsIfDirectRo Operation timestamp for a direct RO, {@code null} 
otherwise.
-     * @return Future completed when the validation is finished.
+     * @param request The request.
+     * @return The timestamp or {@code null} if not a tx operation request.
      */
-    private CompletableFuture<Void> validateTableExistence(ReplicaRequest 
request, @Nullable HybridTimestamp opTsIfDirectRo) {
+    private @Nullable HybridTimestamp getTxOpTimestamp(ReplicaRequest request) 
{
         HybridTimestamp opStartTs;
 
-        if (request instanceof ScanCloseReplicaRequest) {
-            // We don't need to validate close request for table existence.
-            opStartTs = null;
-        } else if (request instanceof ReadWriteReplicaRequest) {
+        if (request instanceof ReadWriteReplicaRequest) {
             opStartTs = clockService.now();
         } else if (request instanceof ReadOnlyReplicaRequest) {
             opStartTs = ((ReadOnlyReplicaRequest) request).readTimestamp();
         } else if (request instanceof ReadOnlyDirectReplicaRequest) {
-            assert opTsIfDirectRo != null;
-
-            opStartTs = opTsIfDirectRo;
+            opStartTs = clockService.now();
         } else {
             opStartTs = null;
         }
 
-        if (opStartTs == null) {
-            return nullCompletedFuture();
-        }
-
-        return schemaSyncService.waitForMetadataCompleteness(opStartTs)
-                .thenRun(() -> 
schemaCompatValidator.failIfTableDoesNotExistAt(opStartTs, tableId()));
-    }
-
-    /**
-     * Makes sure that {@link 
SchemaVersionAwareReplicaRequest#schemaVersion()} sent in a request matches 
table schema version
-     * corresponding to the operation.
-     *
-     * @param request Replica request corresponding to the operation.
-     * @param opTsIfDirectRo Operation timestamp for a direct RO, {@code null} 
otherwise.
-     * @return Future completed when the validation is finished.
-     */
-    private CompletableFuture<Void> validateSchemaMatch(ReplicaRequest 
request, @Nullable HybridTimestamp opTsIfDirectRo) {
-        if (!(request instanceof SchemaVersionAwareReplicaRequest)) {
-            return nullCompletedFuture();
-        }
-
-        HybridTimestamp tsToWaitForSchema = getTxStartTimestamp(request);
-        if (tsToWaitForSchema == null) {
-            tsToWaitForSchema = opTsIfDirectRo;
-        }
-
-        if (tsToWaitForSchema == null) {
-            return nullCompletedFuture();
-        }
-
-        HybridTimestamp finalTsToWaitForSchema = tsToWaitForSchema;
-        return 
schemaSyncService.waitForMetadataCompleteness(finalTsToWaitForSchema)
-                .thenRun(() -> {
-                    SchemaVersionAwareReplicaRequest versionAwareRequest = 
(SchemaVersionAwareReplicaRequest) request;
-
-                    schemaCompatValidator.failIfRequestSchemaDiffersFromTxTs(
-                            finalTsToWaitForSchema,
-                            versionAwareRequest.schemaVersion(),
-                            tableId()
-                    );
-                });
-    }
-
-    /**
-     * Makes sure that we have schemas corresponding to the moment of tx 
start; this makes PK extraction safe WRT
-     * {@link SchemaRegistry#schema(int)}.
-     *
-     * @param request Replica request corresponding to the operation.
-     * @param opTsIfDirectRo Operation timestamp for a direct RO, {@code null} 
otherwise.
-     * @return Future completed when the validation is finished.
-     */
-    private CompletableFuture<Void> waitForSchemasBeforeReading(ReplicaRequest 
request, @Nullable HybridTimestamp opTsIfDirectRo) {
-        HybridTimestamp tsToWaitForSchema = getTxStartTimestamp(request);
-        if (tsToWaitForSchema == null) {
-            tsToWaitForSchema = opTsIfDirectRo;
-        }
-
-        return tsToWaitForSchema == null ? nullCompletedFuture() : 
schemaSyncService.waitForMetadataCompleteness(tsToWaitForSchema);
+        return opStartTs;
     }
 
     /**
@@ -1985,28 +1954,34 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
         assert pkLocker != null;
 
-        return pkLocker.locksForLookupByKey(txId, pk)
-                .thenCompose(ignored -> {
+        CompletableFuture<Void> lockFut = pkLocker.locksForLookupByKey(txId, 
pk);
 
-                    boolean cursorClosureSetUp = false;
-                    Cursor<RowId> cursor = null;
+        Supplier<CompletableFuture<T>> sup = () -> {
+            boolean cursorClosureSetUp = false;
+            Cursor<RowId> cursor = null;
 
-                    try {
-                        cursor = getFromPkIndex(pk);
+            try {
+                cursor = getFromPkIndex(pk);
 
-                        Cursor<RowId> finalCursor = cursor;
-                        CompletableFuture<T> resolvingFuture = 
continueResolvingByPk(cursor, txId, action)
-                                .whenComplete((res, ex) -> 
finalCursor.close());
+                Cursor<RowId> finalCursor = cursor;
+                CompletableFuture<T> resolvingFuture = 
continueResolvingByPk(cursor, txId, action)
+                        .whenComplete((res, ex) -> finalCursor.close());
 
-                        cursorClosureSetUp = true;
+                cursorClosureSetUp = true;
 
-                        return resolvingFuture;
-                    } finally {
-                        if (!cursorClosureSetUp && cursor != null) {
-                            cursor.close();
-                        }
-                    }
-                });
+                return resolvingFuture;
+            } finally {
+                if (!cursorClosureSetUp && cursor != null) {
+                    cursor.close();
+                }
+            }
+        };
+
+        if (isCompletedSuccessfully(lockFut)) {
+            return sup.get();
+        } else {
+            return lockFut.thenCompose(ignored -> sup.get());
+        }
     }
 
     private <T> CompletableFuture<T> continueResolvingByPk(
@@ -2353,7 +2328,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                         RowId lockedRow = pkReadLockFuts[i].join();
 
                         if (lockedRow == null && 
uniqueKeys.add(pks.get(i).byteBuffer())) {
-                            rowsToInsert.put(new RowId(partId(), 
UUID.randomUUID()), row);
+                            rowsToInsert.put(new RowId(partId(), 
RowIdGenerator.next()), row);
 
                             result.add(new NullBinaryRow());
                         } else {
@@ -2457,7 +2432,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                         }
 
                         boolean insert = rowId == null;
-                        RowId rowId0 = insert ? new RowId(partId(), 
UUID.randomUUID()) : rowId;
+                        RowId rowId0 = insert ? new RowId(partId(), 
RowIdGenerator.next()) : rowId;
 
                         return insert
                                 ? takeLocksForInsert(searchRow, rowId0, txId)
@@ -2798,7 +2773,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 return resultFuture.thenApply(res -> {
                     UpdateCommandResult updateCommandResult = 
(UpdateCommandResult) res;
 
-                    if (full && !updateCommandResult.isPrimaryReplicaMatch()) {
+                    if (full && updateCommandResult != null && 
!updateCommandResult.isPrimaryReplicaMatch()) {
                         throw new PrimaryReplicaMissException(txId, 
cmd.leaseStartTime(), updateCommandResult.currentLeaseStartTime());
                     }
 
@@ -2806,18 +2781,20 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                     // Try to avoid double write if an entry is already 
replicated.
                     synchronized (safeTime) {
                         if (cmd.safeTime().compareTo(safeTime.current()) > 0) {
-                            // We don't need to take the partition snapshots 
read lock, see #INTERNAL_DOC_PLACEHOLDER why.
-                            storageUpdateHandler.handleUpdate(
-                                    cmd.txId(),
-                                    cmd.rowUuid(),
-                                    
cmd.tablePartitionId().asTablePartitionId(),
-                                    cmd.rowToUpdate(),
-                                    false,
-                                    null,
-                                    cmd.safeTime(),
-                                    null,
-                                    indexIdsAtRwTxBeginTs(txId)
-                            );
+                            if 
(!IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK))
 {
+                                // We don't need to take the partition 
snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why.
+                                storageUpdateHandler.handleUpdate(
+                                        cmd.txId(),
+                                        cmd.rowUuid(),
+                                        
cmd.tablePartitionId().asTablePartitionId(),
+                                        cmd.rowToUpdate(),
+                                        false,
+                                        null,
+                                        cmd.safeTime(),
+                                        null,
+                                        indexIdsAtRwTxBeginTs(txId)
+                                );
+                            }
 
                             
updateTrackerIgnoringTrackerClosedException(safeTime, cmd.safeTime());
                         }
@@ -3071,7 +3048,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                         return completedFuture(new ReplicaResult(false, null));
                     }
 
-                    RowId rowId0 = new RowId(partId(), UUID.randomUUID());
+                    RowId rowId0 = new RowId(partId(), RowIdGenerator.next());
 
                     return takeLocksForInsert(searchRow, rowId0, txId)
                             .thenCompose(rowIdLock -> 
validateWriteAgainstSchemaAfterTakingLocks(request.transactionId())
@@ -3098,7 +3075,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 return resolveRowByPk(extractPk(searchRow), txId, (rowId, row, 
lastCommitTime) -> {
                     boolean insert = rowId == null;
 
-                    RowId rowId0 = insert ? new RowId(partId(), 
UUID.randomUUID()) : rowId;
+                    RowId rowId0 = insert ? new RowId(partId(), 
RowIdGenerator.next()) : rowId;
 
                     CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> 
lockFut = insert
                             ? takeLocksForInsert(searchRow, rowId0, txId)
@@ -3130,7 +3107,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 return resolveRowByPk(extractPk(searchRow), txId, (rowId, row, 
lastCommitTime) -> {
                     boolean insert = rowId == null;
 
-                    RowId rowId0 = insert ? new RowId(partId(), 
UUID.randomUUID()) : rowId;
+                    RowId rowId0 = insert ? new RowId(partId(), 
RowIdGenerator.next()) : rowId;
 
                     CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> 
lockFut = insert
                             ? takeLocksForInsert(searchRow, rowId0, txId)
@@ -3306,11 +3283,10 @@ public class PartitionReplicaListener implements 
ReplicaListener {
     }
 
     /**
-     *  Wait for the async cleanup of the provided row to finish.
+     * Wait for the async cleanup of the provided row to finish.
      *
      * @param rowId Row Ids of existing row that the transaction affects.
      * @param result The value that the returned future will wrap.
-     *
      * @param <T> Type of the {@code result}.
      */
     private <T> CompletableFuture<T> awaitCleanup(@Nullable RowId rowId, T 
result) {
@@ -3323,7 +3299,6 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      *
      * @param rowIds Row Ids of existing rows that the transaction affects.
      * @param result The value that the returned future will wrap.
-     *
      * @param <T> Type of the {@code result}.
      */
     private <T> CompletableFuture<T> awaitCleanup(Collection<RowId> rowIds, T 
result) {
@@ -3588,42 +3563,49 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         if (request instanceof PrimaryReplicaRequest) {
             Long enlistmentConsistencyToken = ((PrimaryReplicaRequest) 
request).enlistmentConsistencyToken();
 
-            return placementDriver.getPrimaryReplica(replicationGroupId, now)
-                    .thenCompose(primaryReplicaMeta -> {
-                        if (primaryReplicaMeta == null) {
-                            return failedFuture(
-                                    new PrimaryReplicaMissException(
-                                            localNode.name(),
-                                            null,
-                                            localNode.id(),
-                                            null,
-                                            enlistmentConsistencyToken,
-                                            null,
-                                            null
-                                    )
-                            );
-                        }
+            Function<ReplicaMeta, IgniteBiTuple<Boolean, Long>> validateClo = 
primaryReplicaMeta -> {
+                if (primaryReplicaMeta == null) {
+                    throw new PrimaryReplicaMissException(
+                            localNode.name(),
+                            null,
+                            localNode.id(),
+                            null,
+                            enlistmentConsistencyToken,
+                            null,
+                            null
+                    );
+                }
 
-                        long currentEnlistmentConsistencyToken = 
primaryReplicaMeta.getStartTime().longValue();
-
-                        if (enlistmentConsistencyToken != 
currentEnlistmentConsistencyToken
-                                || 
clockService.before(primaryReplicaMeta.getExpirationTime(), now)
-                                || 
!isLocalPeer(primaryReplicaMeta.getLeaseholderId())
-                        ) {
-                            return failedFuture(
-                                    new PrimaryReplicaMissException(
-                                            localNode.name(),
-                                            
primaryReplicaMeta.getLeaseholder(),
-                                            localNode.id(),
-                                            
primaryReplicaMeta.getLeaseholderId(),
-                                            enlistmentConsistencyToken,
-                                            currentEnlistmentConsistencyToken,
-                                            null)
-                            );
-                        }
+                long currentEnlistmentConsistencyToken = 
primaryReplicaMeta.getStartTime().longValue();
+
+                if (enlistmentConsistencyToken != 
currentEnlistmentConsistencyToken
+                        || 
clockService.before(primaryReplicaMeta.getExpirationTime(), now)
+                        || !isLocalPeer(primaryReplicaMeta.getLeaseholderId())
+                ) {
+                    throw new PrimaryReplicaMissException(
+                            localNode.name(),
+                            primaryReplicaMeta.getLeaseholder(),
+                            localNode.id(),
+                            primaryReplicaMeta.getLeaseholderId(),
+                            enlistmentConsistencyToken,
+                            currentEnlistmentConsistencyToken,
+                            null);
+                }
 
-                        return completedFuture(new IgniteBiTuple<>(null, 
primaryReplicaMeta.getStartTime().longValue()));
-                    });
+                return new IgniteBiTuple<>(null, 
primaryReplicaMeta.getStartTime().longValue());
+            };
+
+            ReplicaMeta meta = 
placementDriver.getCurrentPrimaryReplica(replicationGroupId, now);
+
+            if (meta != null) {
+                try {
+                    return completedFuture(validateClo.apply(meta));
+                } catch (Exception e) {
+                    return failedFuture(e);
+                }
+            }
+
+            return placementDriver.getPrimaryReplica(replicationGroupId, 
now).thenApply(validateClo);
         } else if (request instanceof ReadOnlyReplicaRequest || request 
instanceof ReplicaSafeTimeSyncRequest) {
             return placementDriver.getPrimaryReplica(replicationGroupId, now)
                     .thenApply(primaryReplica -> new IgniteBiTuple<>(
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 296cf73133..39900f36e9 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -1923,33 +1923,49 @@ public class InternalTableImpl implements InternalTable 
{
      * @return The enlist future (then will a leader become known).
      */
     protected CompletableFuture<IgniteBiTuple<ClusterNode, Long>> enlist(int partId, InternalTransaction tx) {
+        HybridTimestamp now = clock.now();
+
         TablePartitionId tablePartitionId = new TablePartitionId(tableId, partId);
         tx.assignCommitPartition(tablePartitionId);
 
-        return partitionMeta(tablePartitionId).thenApply(meta -> {
+        // Fast path: the primary replica the placement driver already knows for 'now' (no future, may be null).
+        ReplicaMeta meta = placementDriver.getCurrentPrimaryReplica(tablePartitionId, now);
+
+        // Resolves the enlistment state (primary node + enlistment consistency token) and records it in the transaction.
+        Function<ReplicaMeta, IgniteBiTuple<ClusterNode, Long>> enlistClo = replicaMeta -> {
             TablePartitionId partGroupId = new TablePartitionId(tableId, partId);
 
-            return tx.enlist(partGroupId, new IgniteBiTuple<>(
-                    getClusterNode(meta),
-                    enlistmentConsistencyToken(meta))
-            );
-        });
+            IgniteBiTuple<ClusterNode, Long> enlistState = new IgniteBiTuple<>(getClusterNode(replicaMeta),
+                    enlistmentConsistencyToken(replicaMeta));
+
+            tx.enlist(partGroupId, enlistState);
+
+            return enlistState;
+        };
+
+        if (meta != null) {
+            try {
+                return completedFuture(enlistClo.apply(meta));
+            } catch (IgniteException e) {
+                // REPLICA_UNAVAILABLE_ERR falls through to the slow path below, which awaits the primary replica.
+                if (e.code() != REPLICA_UNAVAILABLE_ERR) {
+                    return failedFuture(e);
+                }
+            } catch (RuntimeException e) {
+                // Report any other failure through the returned future, as the slow path (thenApply) does,
+                // instead of throwing it synchronously to the caller.
+                return failedFuture(e);
+            }
+        }
+
+        // Slow path: await the primary replica as of 'now', then enlist.
+        return partitionMeta(tablePartitionId, now).thenApply(enlistClo);
     }
 
     @Override
     public CompletableFuture<ClusterNode> partitionLocation(TablePartitionId tablePartitionId) {
-        return partitionMeta(tablePartitionId).thenApply(this::getClusterNode);
+        // Await the primary replica as of the current clock reading and map it to its cluster node.
+        HybridTimestamp now = clock.now();
+
+        return partitionMeta(tablePartitionId, now).thenApply(this::getClusterNode);
     }
 
-    private CompletableFuture<ReplicaMeta> partitionMeta(TablePartitionId 
tablePartitionId) {
-        HybridTimestamp now = clock.now();
-
-        return awaitPrimaryReplica(tablePartitionId, now)
+    private CompletableFuture<ReplicaMeta> partitionMeta(TablePartitionId 
tablePartitionId, HybridTimestamp at) {
+        return awaitPrimaryReplica(tablePartitionId, at)
                 .exceptionally(e -> {
                     throw withCause(
                             TransactionException::new,
                             REPLICA_UNAVAILABLE_ERR,
-                            "Failed to get the primary replica 
[tablePartitionId=" + tablePartitionId + ", awaitTimestamp=" + now + ']',
+                            "Failed to get the primary replica 
[tablePartitionId=" + tablePartitionId + ", awaitTimestamp=" + at + ']',
                             e
                     );
                 });
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/wrappers/DelegatingPlacementDriver.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/wrappers/DelegatingPlacementDriver.java
index c5989ec45a..aba6e5430b 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/wrappers/DelegatingPlacementDriver.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/wrappers/DelegatingPlacementDriver.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.placementdriver.ReplicaMeta;
 import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
 import 
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * A base for a {@link PlacementDriver} that delegates some of its methods to 
another {@link PlacementDriver}.
@@ -60,6 +61,11 @@ abstract class DelegatingPlacementDriver implements 
PlacementDriver {
         return delegate.getPrimaryReplica(replicationGroupId, timestamp);
     }
 
+    @Override
+    public @Nullable ReplicaMeta getCurrentPrimaryReplica(ReplicationGroupId replicationGroupId, HybridTimestamp timestamp) {
+        // Pure pass-through to the wrapped placement driver; the result may be null.
+        ReplicaMeta currentPrimary = delegate.getCurrentPrimaryReplica(replicationGroupId, timestamp);
+
+        return currentPrimary;
+    }
+
     @Override
     public CompletableFuture<Void> previousPrimaryExpired(ReplicationGroupId 
grpId) {
         return delegate.previousPrimaryExpired(grpId);
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
index b2ea186b42..84eed4c7e6 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
@@ -101,9 +101,11 @@ public class ReadWriteTransactionImpl extends 
IgniteAbstractTransactionImpl {
             TablePartitionId tablePartitionId,
             IgniteBiTuple<ClusterNode, Long> nodeAndConsistencyToken
     ) {
-        checkEnlistPossibility();
-
-        enlistPartitionLock.readLock().lock();
+        // No need to wait for lock if commit is in progress.
+        if (!enlistPartitionLock.readLock().tryLock()) {
+            failEnlist();
+            assert false; // Not reachable.
+        }
 
         try {
             checkEnlistPossibility();
@@ -114,15 +116,22 @@ public class ReadWriteTransactionImpl extends IgniteAbstractTransactionImpl {
         }
     }
 
+    /**
+     * Fails the operation.
+     */
+    private void failEnlist() {
+        throw new TransactionException(
+                TX_ALREADY_FINISHED_ERR,
+                format("Transaction is already finished [id={}, state={}].", 
id(), state()));
+    }
+
     /**
      * Checks that this transaction was not finished and will be able to 
enlist another partition.
      */
     private void checkEnlistPossibility() {
         if (finishFuture != null) {
             // This means that the transaction is either in final or FINISHING 
state.
-            throw new TransactionException(
-                    TX_ALREADY_FINISHED_ERR,
-                    format("Transaction is already finished [id={}, 
state={}].", id(), state()));
+            failEnlist();
         }
     }
 
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 542952d0a6..03d3b3308a 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -56,8 +56,8 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.LongAdder;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.LongSupplier;
@@ -165,14 +165,13 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
      * Total number of started transaction.
      * TODO: IGNITE-21440 Implement transaction metrics.
      */
-    private final AtomicInteger startedTxs = new AtomicInteger();
+    private final LongAdder startedTxs = new LongAdder();
 
     /**
      * Total number of finished transaction.
      * TODO: IGNITE-21440 Implement transaction metrics.
      */
-    private final AtomicInteger finishedTxs = new AtomicInteger();
-
+    private final LongAdder finishedTxs = new LongAdder();
 
     /** Busy lock to stop synchronously. */
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
@@ -393,7 +392,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
         HybridTimestamp beginTimestamp = readOnly ? clockService.now() : 
createBeginTimestampWithIncrementRwTxCounter();
         UUID txId = transactionIdGenerator.transactionIdFor(beginTimestamp, 
priority);
 
-        startedTxs.incrementAndGet();
+        startedTxs.add(1);
 
         if (!readOnly) {
             txStateVolatileStorage.initialize(txId, localNodeId);
@@ -463,7 +462,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
     public void finishFull(HybridTimestampTracker timestampTracker, UUID txId, 
boolean commit) {
         TxState finalState;
 
-        finishedTxs.incrementAndGet();
+        finishedTxs.add(1);
 
         if (commit) {
             timestampTracker.update(clockService.now());
@@ -498,7 +497,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
     ) {
         LOG.debug("Finish [commit={}, txId={}, groups={}].", commitIntent, 
txId, enlistedGroups);
 
-        finishedTxs.incrementAndGet();
+        finishedTxs.add(1);
 
         assert enlistedGroups != null;
 
@@ -751,12 +750,12 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
 
     @Override
     public int finished() {
-        return finishedTxs.get();
+        return finishedTxs.intValue();
     }
 
     @Override
     public int pending() {
-        return startedTxs.get() - finishedTxs.get();
+        return startedTxs.intValue() - finishedTxs.intValue();
     }
 
     @Override
@@ -867,7 +866,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
     }
 
     CompletableFuture<Void> completeReadOnlyTransactionFuture(TxIdAndTimestamp 
txIdAndTimestamp) {
-        finishedTxs.incrementAndGet();
+        finishedTxs.add(1);
 
         CompletableFuture<Void> readOnlyTxFuture = 
readOnlyTxFutureById.remove(txIdAndTimestamp);
 

Reply via email to