This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 33582cb1b0 IGNITE-20124 Prevent double storage updates within primary 
(#4049)
33582cb1b0 is described below

commit 33582cb1b038a367d6bd5ad9456d7c21ced5c3ad
Author: Alexander Lapin <[email protected]>
AuthorDate: Mon Aug 26 19:03:58 2024 +0300

    IGNITE-20124 Prevent double storage updates within primary (#4049)
---
 .../ignite/internal/replicator/ReplicaImpl.java    |  20 ++-
 .../message/PrimaryReplicaChangeCommand.java       |   6 +
 .../benchmark/AbstractMultiNodeBenchmark.java      |   6 +-
 .../ignite/internal/benchmark/InsertBenchmark.java |   8 +
 .../internal/storage/MvPartitionStorage.java       |  22 ++-
 .../storage/ThreadAssertingMvPartitionStorage.java |  18 +-
 .../storage/AbstractMvPartitionStorageTest.java    |  15 +-
 .../storage/impl/TestMvPartitionStorage.java       |  29 ++-
 .../storage/pagememory/StoragePartitionMeta.java   |  67 ++++++-
 .../pagememory/StoragePartitionMetaFactory.java    |   2 +
 .../storage/pagememory/StoragePartitionMetaIo.java |  55 +++++-
 .../mv/PersistentPageMemoryMvPartitionStorage.java | 124 ++++++++++++-
 .../mv/VolatilePageMemoryMvPartitionStorage.java   |  32 +++-
 .../StoragePartitionMetaManagerTest.java           |   2 +-
 .../pagememory/StoragePartitionMetaTest.java       |   2 +-
 .../storage/rocksdb/RocksDbMvPartitionStorage.java |  73 ++++++--
 .../storage/rocksdb/RocksDbMvTableStorageTest.java |   9 +
 .../ReplicasSafeTimePropagationTest.java           |   3 +-
 .../table/distributed/StorageUpdateHandler.java    |   1 -
 .../internal/table/distributed/TableManager.java   |   3 +-
 .../distributed/raft/PartitionDataStorage.java     |  22 ++-
 .../table/distributed/raft/PartitionListener.java  | 141 +++++++++------
 .../SnapshotAwarePartitionDataStorage.java         |  18 +-
 .../replicator/PartitionReplicaListener.java       | 196 ++++++++-------------
 .../raft/PartitionCommandListenerTest.java         |  65 +++++--
 .../replication/PartitionReplicaListenerTest.java  |   6 +-
 .../apache/ignite/distributed/ItTxTestCluster.java |  38 +++-
 .../distributed/TestPartitionDataStorage.java      |  18 +-
 .../table/impl/DummyInternalTableImpl.java         |  38 +++-
 .../ignite/internal/tx/UpdateCommandResult.java    |  48 +++--
 30 files changed, 832 insertions(+), 255 deletions(-)

diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaImpl.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaImpl.java
index 7f46cd276e..5c7bb1f9c0 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaImpl.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaImpl.java
@@ -218,7 +218,11 @@ public class ReplicaImpl implements Replica {
                 // group leader are received.
 
                 return waitForActualState(msg.leaseStartTime(), 
msg.leaseExpirationTime().getPhysical())
-                        .thenCompose(v -> 
sendPrimaryReplicaChangeToReplicationGroup(msg.leaseStartTime().longValue()))
+                        .thenCompose(v -> 
sendPrimaryReplicaChangeToReplicationGroup(
+                                msg.leaseStartTime().longValue(),
+                                localNode.id(),
+                                localNode.name()
+                        ))
                         .thenCompose(v -> {
                             CompletableFuture<LeaseGrantedMessageResponse> 
respFut =
                                     acceptLease(msg.leaseStartTime(), 
msg.leaseExpirationTime());
@@ -233,7 +237,11 @@ public class ReplicaImpl implements Replica {
             } else {
                 if (leader.equals(localNode)) {
                     return waitForActualState(msg.leaseStartTime(), 
msg.leaseExpirationTime().getPhysical())
-                            .thenCompose(v -> 
sendPrimaryReplicaChangeToReplicationGroup(msg.leaseStartTime().longValue()))
+                            .thenCompose(v -> 
sendPrimaryReplicaChangeToReplicationGroup(
+                                    msg.leaseStartTime().longValue(),
+                                    localNode.id(),
+                                    localNode.name()
+                            ))
                             .thenCompose(v -> 
acceptLease(msg.leaseStartTime(), msg.leaseExpirationTime()));
                 } else {
                     return proposeLeaseRedirect(leader);
@@ -242,9 +250,15 @@ public class ReplicaImpl implements Replica {
         }));
     }
 
-    private CompletableFuture<Void> 
sendPrimaryReplicaChangeToReplicationGroup(long leaseStartTime) {
+    private CompletableFuture<Void> sendPrimaryReplicaChangeToReplicationGroup(
+            long leaseStartTime,
+            String primaryReplicaNodeId,
+            String primaryReplicaNodeName
+    ) {
         PrimaryReplicaChangeCommand cmd = 
REPLICA_MESSAGES_FACTORY.primaryReplicaChangeCommand()
                 .leaseStartTime(leaseStartTime)
+                .primaryReplicaNodeId(primaryReplicaNodeId)
+                .primaryReplicaNodeName(primaryReplicaNodeName)
                 .build();
 
         return raftClient.run(cmd);
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/PrimaryReplicaChangeCommand.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/PrimaryReplicaChangeCommand.java
index 941bd3e2d5..9511a68e2e 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/PrimaryReplicaChangeCommand.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/PrimaryReplicaChangeCommand.java
@@ -30,4 +30,10 @@ import org.apache.ignite.internal.raft.WriteCommand;
 public interface PrimaryReplicaChangeCommand extends WriteCommand {
     /** Lease start time, hybrid timestamp as long, see {@link 
HybridTimestamp#longValue()}. */
     long leaseStartTime();
+
+    /** Primary replica node id. */
+    String primaryReplicaNodeId();
+
+    /** Primary replica node name. */
+    String primaryReplicaNodeName();
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
index 93a011988c..23678e9954 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
@@ -86,7 +86,7 @@ public class AbstractMultiNodeBenchmark {
             var queryEngine = igniteImpl.queryEngine();
 
             var createZoneStatement = "CREATE ZONE IF NOT EXISTS " + ZONE_NAME 
+ " WITH partitions=" + partitionCount()
-                    + ", storage_profiles ='" + DEFAULT_STORAGE_PROFILE + "'";
+                    + ", replicas=" + replicaCount() + ", storage_profiles ='" 
+ DEFAULT_STORAGE_PROFILE + "'";
 
             getAllFromCursor(
                     await(queryEngine.queryAsync(
@@ -244,4 +244,8 @@ public class AbstractMultiNodeBenchmark {
     protected int partitionCount() {
         return CatalogUtils.DEFAULT_PARTITION_COUNT;
     }
+
+    protected int replicaCount() {
+        return CatalogUtils.DEFAULT_REPLICA_COUNT;
+    }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/InsertBenchmark.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/InsertBenchmark.java
index a418b45f67..edc021ea3b 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/InsertBenchmark.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/InsertBenchmark.java
@@ -68,6 +68,9 @@ public class InsertBenchmark extends 
AbstractMultiNodeBenchmark {
     @Param({"1", "2", "4", "8", "16", "32"})
     private int partitionCount;
 
+    @Param({"1", "2", "3"})
+    private int replicaCount;
+
     /**
      * Benchmark for SQL insert via embedded client.
      */
@@ -408,4 +411,9 @@ public class InsertBenchmark extends 
AbstractMultiNodeBenchmark {
     protected int partitionCount() {
         return partitionCount;
     }
+
+    @Override
+    protected int replicaCount() {
+        return replicaCount;
+    }
 }
diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
index a7daeebd92..31c7b5a522 100644
--- 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
@@ -274,8 +274,14 @@ public interface MvPartitionStorage extends 
ManuallyCloseable {
      * Updates the current lease start time in the storage.
      *
      * @param leaseStartTime Lease start time.
+     * @param primaryReplicaNodeId Primary replica node id.
+     * @param primaryReplicaNodeName Primary replica node name.
      */
-    void updateLease(long leaseStartTime);
+    void updateLease(
+            long leaseStartTime,
+            String primaryReplicaNodeId,
+            String primaryReplicaNodeName
+    );
 
     /**
      * Returns the start time of the known lease for this replication group.
@@ -284,6 +290,20 @@ public interface MvPartitionStorage extends 
ManuallyCloseable {
      */
     long leaseStartTime();
 
+    /**
+     * Returns the id of the primary replica node of the known lease for this replication group.
+     *
+     * @return Primary replica node id or null if there is no information 
about the lease in the storage.
+     */
+    @Nullable String primaryReplicaNodeId();
+
+    /**
+     * Returns the name of the primary replica node of the known lease for this replication group.
+     *
+     * @return Primary replica node name or null if there is no information 
about the lease in the storage.
+     */
+    @Nullable String primaryReplicaNodeName();
+
     /**
      * Returns the <em>estimated size</em> of this partition.
      *
diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
index 9d0350d38a..50b40fd5a2 100644
--- 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
@@ -155,10 +155,14 @@ public class ThreadAssertingMvPartitionStorage implements 
MvPartitionStorage, Wr
     }
 
     @Override
-    public void updateLease(long leaseStartTime) {
+    public void updateLease(
+            long leaseStartTime,
+            String primaryReplicaNodeId,
+            String primaryReplicaNodeName
+    ) {
         assertThreadAllowsToWrite();
 
-        partitionStorage.updateLease(leaseStartTime);
+        partitionStorage.updateLease(leaseStartTime, primaryReplicaNodeId, 
primaryReplicaNodeName);
     }
 
     @Override
@@ -166,6 +170,16 @@ public class ThreadAssertingMvPartitionStorage implements 
MvPartitionStorage, Wr
         return partitionStorage.leaseStartTime();
     }
 
+    @Override
+    public @Nullable String primaryReplicaNodeId() {
+        return partitionStorage.primaryReplicaNodeId();
+    }
+
+    @Override
+    public @Nullable String primaryReplicaNodeName() {
+        return partitionStorage.primaryReplicaNodeName();
+    }
+
     @Override
     public long estimatedSize() {
         return partitionStorage.estimatedSize();
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index 252cde5c11..675aec6257 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -1375,20 +1375,29 @@ public abstract class AbstractMvPartitionStorageTest 
extends BaseMvPartitionStor
     public void testLease() {
         storage.runConsistently(locker -> {
             long lst0 = 1000;
+            String nodeId0 = UUID.randomUUID().toString();
 
             long lst1 = 2000;
+            String nodeId1 = UUID.randomUUID().toString();
 
-            storage.updateLease(lst0);
+            storage.updateLease(lst0, nodeId0, nodeId0 + "name");
 
             assertEquals(lst0, storage.leaseStartTime());
+            assertEquals(nodeId0, storage.primaryReplicaNodeId());
+            assertEquals(nodeId0 + "name", storage.primaryReplicaNodeName());
 
-            storage.updateLease(lst1);
+            storage.updateLease(lst1, nodeId1, nodeId1 + "name");
 
             assertEquals(lst1, storage.leaseStartTime());
+            assertEquals(nodeId1, storage.primaryReplicaNodeId());
+            assertEquals(nodeId1 + "name", storage.primaryReplicaNodeName());
 
-            storage.updateLease(0);
+            String nodeIdRandom = UUID.randomUUID().toString();
+            storage.updateLease(0, nodeIdRandom, nodeIdRandom + "name");
 
             assertEquals(lst1, storage.leaseStartTime());
+            assertEquals(nodeId1, storage.primaryReplicaNodeId());
+            assertEquals(nodeId1 + "name", storage.primaryReplicaNodeName());
 
             return null;
         });
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
index 1d979d2776..4c48dc67a7 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
@@ -70,7 +70,12 @@ public class TestMvPartitionStorage implements 
MvPartitionStorage {
 
     private volatile long lastAppliedTerm;
 
-    private volatile long leaseStartTime = 
HybridTimestamp.MIN_VALUE.longValue();
+    // -1 is used as an initial value, in order not to clash with {@code 
ReplicaMeta.getStartTime}
+    private volatile long leaseStartTime = -1;
+
+    private volatile String primaryReplicaNodeId;
+
+    private volatile String primaryReplicaNodeName;
 
     private volatile long estimatedSize;
 
@@ -633,7 +638,11 @@ public class TestMvPartitionStorage implements 
MvPartitionStorage {
     }
 
     @Override
-    public synchronized void updateLease(long leaseStartTime) {
+    public synchronized void updateLease(
+            long leaseStartTime,
+            String primaryReplicaNodeId,
+            String primaryReplicaNodeName
+    ) {
         checkStorageClosed();
 
         if (leaseStartTime <= this.leaseStartTime) {
@@ -641,6 +650,8 @@ public class TestMvPartitionStorage implements 
MvPartitionStorage {
         }
 
         this.leaseStartTime = leaseStartTime;
+        this.primaryReplicaNodeId = primaryReplicaNodeId;
+        this.primaryReplicaNodeName = primaryReplicaNodeName;
     }
 
     @Override
@@ -650,6 +661,20 @@ public class TestMvPartitionStorage implements 
MvPartitionStorage {
         return leaseStartTime;
     }
 
+    @Override
+    public @Nullable String primaryReplicaNodeId() {
+        checkStorageClosed();
+
+        return primaryReplicaNodeId;
+    }
+
+    @Override
+    public @Nullable String primaryReplicaNodeName() {
+        checkStorageClosed();
+
+        return primaryReplicaNodeName;
+    }
+
     @Override
     public long estimatedSize() {
         checkStorageClosed();
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMeta.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMeta.java
index 7e4d94030d..bfeeb4e242 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMeta.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMeta.java
@@ -43,6 +43,10 @@ public class StoragePartitionMeta extends PartitionMeta {
 
     private volatile long leaseStartTime;
 
+    private volatile long primaryReplicaNodeIdFirstPageId;
+
+    private volatile long primaryReplicaNodeNameFirstPageId;
+
     private volatile long freeListRootPageId;
 
     private volatile long versionChainTreeRootPageId;
@@ -59,8 +63,11 @@ public class StoragePartitionMeta extends PartitionMeta {
      * @param pageCount Count of pages in the partition.
      * @param lastAppliedIndex Last applied index value.
      * @param lastAppliedTerm Last applied term value.
-     * @param lastReplicationProtocolGroupConfigFirstPageId ID of the first 
page in a chain storing a blob representing.
+     * @param lastReplicationProtocolGroupConfigFirstPageId ID of the first 
page in a chain storing a blob representing last replication
+     *     protocol group config.
      * @param leaseStartTime Lease start time.
+     * @param primaryReplicaNodeIdFirstPageId ID of the first page in a chain 
storing a blob representing a primary replica node id.
+     * @param primaryReplicaNodeNameFirstPageId ID of the first page in a 
chain storing a blob representing a primary replica node name.
      * @param freeListRootPageId Free list root page ID.
      * @param versionChainTreeRootPageId Version chain tree root page ID.
      * @param indexTreeMetaPageId Index tree meta page ID.
@@ -73,6 +80,8 @@ public class StoragePartitionMeta extends PartitionMeta {
             long lastAppliedTerm,
             long lastReplicationProtocolGroupConfigFirstPageId,
             long leaseStartTime,
+            long primaryReplicaNodeIdFirstPageId,
+            long primaryReplicaNodeNameFirstPageId,
             long freeListRootPageId,
             long versionChainTreeRootPageId,
             long indexTreeMetaPageId,
@@ -84,6 +93,8 @@ public class StoragePartitionMeta extends PartitionMeta {
         this.lastAppliedTerm = lastAppliedTerm;
         this.lastReplicationProtocolGroupConfigFirstPageId = 
lastReplicationProtocolGroupConfigFirstPageId;
         this.leaseStartTime = leaseStartTime;
+        this.primaryReplicaNodeIdFirstPageId = primaryReplicaNodeIdFirstPageId;
+        this.primaryReplicaNodeNameFirstPageId = 
primaryReplicaNodeNameFirstPageId;
         this.freeListRootPageId = freeListRootPageId;
         this.versionChainTreeRootPageId = versionChainTreeRootPageId;
         this.indexTreeMetaPageId = indexTreeMetaPageId;
@@ -264,6 +275,8 @@ public class StoragePartitionMeta extends PartitionMeta {
                 gcQueueMetaPageId,
                 pageCount(),
                 leaseStartTime,
+                primaryReplicaNodeIdFirstPageId,
+                primaryReplicaNodeNameFirstPageId,
                 estimatedSize
         );
     }
@@ -287,10 +300,6 @@ public class StoragePartitionMeta extends PartitionMeta {
     public void updateLease(@Nullable UUID checkpointId, long leaseStartTime) {
         updateSnapshot(checkpointId);
 
-        if (leaseStartTime <= this.leaseStartTime) {
-            return;
-        }
-
         this.leaseStartTime = leaseStartTime;
     }
 
@@ -303,6 +312,44 @@ public class StoragePartitionMeta extends PartitionMeta {
         return leaseStartTime;
     }
 
+    /**
+     * Returns ID of the first page in a chain storing a blob representing 
primary replica node id.
+     */
+    public long primaryReplicaNodeIdFirstPageId() {
+        return primaryReplicaNodeIdFirstPageId;
+    }
+
+    /**
+     * Sets ID of the first page in a chain storing a blob representing 
primary replica node id.
+     *
+     * @param checkpointId Checkpoint ID.
+     * @param pageId PageId.
+     */
+    public void primaryReplicaNodeIdFirstPageId(@Nullable UUID checkpointId, 
long pageId) {
+        updateSnapshot(checkpointId);
+
+        this.primaryReplicaNodeIdFirstPageId = pageId;
+    }
+
+    /**
+     * Returns ID of the first page in a chain storing a blob representing 
primary replica node name.
+     */
+    public long primaryReplicaNodeNameFirstPageId() {
+        return primaryReplicaNodeNameFirstPageId;
+    }
+
+    /**
+     * Sets ID of the first page in a chain storing a blob representing 
primary replica node name.
+     *
+     * @param checkpointId Checkpoint ID.
+     * @param pageId PageId.
+     */
+    public void primaryReplicaNodeNameFirstPageId(@Nullable UUID checkpointId, 
long pageId) {
+        updateSnapshot(checkpointId);
+
+        this.primaryReplicaNodeNameFirstPageId = pageId;
+    }
+
     /**
      * An immutable snapshot of the partition's meta information.
      */
@@ -327,6 +374,10 @@ public class StoragePartitionMeta extends PartitionMeta {
 
         private final long leaseStartTime;
 
+        private final long primaryReplicaNodeIdFirstPageId;
+
+        private final long primaryReplicaNodeNameFirstPageId;
+
         private final long estimatedSize;
 
         private StoragePartitionMetaSnapshot(
@@ -340,6 +391,8 @@ public class StoragePartitionMeta extends PartitionMeta {
                 long gcQueueMetaPageId,
                 int pageCount,
                 long leaseStartTime,
+                long primaryReplicaNodeIdFistPageId,
+                long primaryReplicaNodeNameFistPageId,
                 long estimatedSize
         ) {
             this.checkpointId = checkpointId;
@@ -352,6 +405,8 @@ public class StoragePartitionMeta extends PartitionMeta {
             this.gcQueueMetaPageId = gcQueueMetaPageId;
             this.pageCount = pageCount;
             this.leaseStartTime = leaseStartTime;
+            this.primaryReplicaNodeIdFirstPageId = 
primaryReplicaNodeIdFistPageId;
+            this.primaryReplicaNodeNameFirstPageId = 
primaryReplicaNodeNameFistPageId;
             this.estimatedSize = estimatedSize;
         }
 
@@ -443,6 +498,8 @@ public class StoragePartitionMeta extends PartitionMeta {
             storageMetaIo.setGcQueueMetaPageId(pageAddr, gcQueueMetaPageId);
             storageMetaIo.setPageCount(pageAddr, pageCount);
             storageMetaIo.setLeaseStartTime(pageAddr, leaseStartTime);
+            storageMetaIo.setPrimaryReplicaNodeIdFirstPageId(pageAddr, 
primaryReplicaNodeIdFirstPageId);
+            storageMetaIo.setPrimaryReplicaNodeNameFirstPageId(pageAddr, 
primaryReplicaNodeNameFirstPageId);
             storageMetaIo.setEstimatedSize(pageAddr, estimatedSize);
         }
 
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaFactory.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaFactory.java
index 0f791b8f7c..8f39e66043 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaFactory.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaFactory.java
@@ -36,6 +36,8 @@ public class StoragePartitionMetaFactory implements 
PartitionMetaFactory {
                 metaIo.getLastAppliedTerm(pageAddr),
                 
metaIo.getLastReplicationProtocolGroupConfigFirstPageId(pageAddr),
                 metaIo.getLeaseStartTime(pageAddr),
+                metaIo.getPrimaryReplicaNodeIdFirstPageId(pageAddr),
+                metaIo.getPrimaryReplicaNodeNameFirstPageId(pageAddr),
                 StoragePartitionMetaIo.getFreeListRootPageId(pageAddr),
                 metaIo.getVersionChainTreeRootPageId(pageAddr),
                 metaIo.getIndexTreeMetaPageId(pageAddr),
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaIo.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaIo.java
index 20e3c62bec..9b7da9c977 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaIo.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaIo.java
@@ -48,8 +48,12 @@ public class StoragePartitionMetaIo extends PartitionMetaIo {
 
     private static final int LEASE_START_TIME_OFF = GC_QUEUE_META_PAGE_ID_OFF 
+ Long.BYTES;
 
-    private static final int ESTIMATED_SIZE_OFF = LEASE_START_TIME_OFF + 
Long.BYTES;
+    private static final int PRIMARY_REPLICA_NODE_ID_FIRST_PAGE_ID_OFF = 
LEASE_START_TIME_OFF + Long.BYTES;
 
+    private static final int PRIMARY_REPLICA_NODE_NAME_FIRST_PAGE_ID_OFF = 
PRIMARY_REPLICA_NODE_ID_FIRST_PAGE_ID_OFF + Long.BYTES;
+
+    /** Estimated size here is not the size of the meta, but an approximate row 
count. */
+    private static final int ESTIMATED_SIZE_OFF = 
PRIMARY_REPLICA_NODE_NAME_FIRST_PAGE_ID_OFF + Long.BYTES;
 
     /** I/O versions. */
     public static final IoVersions<StoragePartitionMetaIo> VERSIONS = new 
IoVersions<>(new StoragePartitionMetaIo(1));
@@ -77,6 +81,8 @@ public class StoragePartitionMetaIo extends PartitionMetaIo {
         setGcQueueMetaPageId(pageAddr, 0);
         setPageCount(pageAddr, 0);
         setLeaseStartTime(pageAddr, HybridTimestamp.MIN_VALUE.longValue());
+        setPrimaryReplicaNodeIdFirstPageId(pageAddr, 0);
+        setPrimaryReplicaNodeNameFirstPageId(pageAddr, 0);
         setEstimatedSize(pageAddr, 0);
     }
 
@@ -249,6 +255,51 @@ public class StoragePartitionMetaIo extends 
PartitionMetaIo {
         return getLong(pageAddr, LEASE_START_TIME_OFF);
     }
 
+    /**
+     * Sets the primary replica node id first page id.
+     *
+     * @param pageAddr Page address.
+     * @param primaryReplicaNodeIdFirstPageId Primary replica node id first 
page id.
+     */
+    public void setPrimaryReplicaNodeIdFirstPageId(long pageAddr, long 
primaryReplicaNodeIdFirstPageId) {
+        assertPageType(pageAddr);
+
+        putLong(pageAddr, PRIMARY_REPLICA_NODE_ID_FIRST_PAGE_ID_OFF, 
primaryReplicaNodeIdFirstPageId);
+    }
+
+    /**
+     * Returns the primary replica node id first page id.
+     *
+     * @param pageAddr Page address.
+     * @return Primary replica node id first page id.
+     */
+    public long getPrimaryReplicaNodeIdFirstPageId(long pageAddr) {
+        return getLong(pageAddr, PRIMARY_REPLICA_NODE_ID_FIRST_PAGE_ID_OFF);
+    }
+
+
+    /**
+     * Sets the primary replica node name first page id.
+     *
+     * @param pageAddr Page address.
+     * @param primaryReplicaNodeNameFirstPageId Primary replica node name 
first page id.
+     */
+    public void setPrimaryReplicaNodeNameFirstPageId(long pageAddr, long 
primaryReplicaNodeNameFirstPageId) {
+        assertPageType(pageAddr);
+
+        putLong(pageAddr, PRIMARY_REPLICA_NODE_NAME_FIRST_PAGE_ID_OFF, 
primaryReplicaNodeNameFirstPageId);
+    }
+
+    /**
+     * Returns the primary replica node name first page id.
+     *
+     * @param pageAddr Page address.
+     * @return Primary replica node name first page id.
+     */
+    public long getPrimaryReplicaNodeNameFirstPageId(long pageAddr) {
+        return getLong(pageAddr, PRIMARY_REPLICA_NODE_NAME_FIRST_PAGE_ID_OFF);
+    }
+
     /**
      * Sets the estimated size of this partition.
      *
@@ -282,6 +333,8 @@ public class StoragePartitionMetaIo extends PartitionMetaIo 
{
                 
.app("gcQueueMetaPageId=").appendHex(getGcQueueMetaPageId(addr)).nl()
                 .app("pageCount=").app(getPageCount(addr)).nl()
                 .app("leaseStartTime=").app(getLeaseStartTime(addr)).nl()
+                
.app("primaryReplicaNodeIdFirstPageId=").app(getPrimaryReplicaNodeIdFirstPageId(addr)).nl()
+                
.app("primaryReplicaNodeNameFirstPageId=").app(getPrimaryReplicaNodeNameFirstPageId(addr)).nl()
                 .app("estimatedSize=").app(getEstimatedSize(addr)).nl()
                 .app(']');
     }
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
index 296eb3af35..5fc99d0baa 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.storage.pagememory.mv;
 import static 
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInCleanupOrRebalancedState;
 import static 
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInProgressOfRebalance;
 import static 
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInRunnableOrRebalanceState;
+import static org.apache.ignite.internal.util.ByteUtils.stringToBytes;
 
 import java.util.List;
 import java.util.UUID;
@@ -52,6 +53,7 @@ import 
org.apache.ignite.internal.storage.pagememory.index.meta.IndexMetaTree;
 import 
org.apache.ignite.internal.storage.pagememory.index.sorted.PageMemorySortedIndexStorage;
 import org.apache.ignite.internal.storage.pagememory.mv.gc.GcQueue;
 import org.apache.ignite.internal.storage.util.LocalLocker;
+import org.apache.ignite.internal.util.ByteUtils;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -75,6 +77,19 @@ public class PersistentPageMemoryMvPartitionStorage extends 
AbstractPageMemoryMv
     /** Lock that protects group config read/write. */
     private final ReadWriteLock replicationProtocolGroupConfigReadWriteLock = 
new ReentrantReadWriteLock();
 
+    /** Lock that protects group primary replica meta read/write. */
+    private final ReadWriteLock primaryReplicaMetaReadWriteLock = new 
ReentrantReadWriteLock();
+
+    /**
+     * Cached primary replica node id in order not to touch blobStorage each 
time. Guarded by primaryReplicaMetaReadWriteLock.
+     */
+    private String primaryReplicaNodeId;
+
+    /**
+     * Cached primary replica node name in order not to touch blobStorage each 
time. Guarded by primaryReplicaMetaReadWriteLock.
+     */
+    private String primaryReplicaNodeName;
+
     /**
      * Constructor.
      *
@@ -303,11 +318,50 @@ public class PersistentPageMemoryMvPartitionStorage 
extends AbstractPageMemoryMv
     }
 
     @Override
-    public void updateLease(long leaseStartTime) {
+    public void updateLease(
+            long leaseStartTime,
+            String primaryReplicaNodeId,
+            String primaryReplicaNodeName
+    ) {
         busy(() -> {
             throwExceptionIfStorageNotInRunnableState();
 
-            updateMeta((lastCheckpointId, meta) -> 
meta.updateLease(lastCheckpointId, leaseStartTime));
+            updateMeta((lastCheckpointId, meta) -> {
+                primaryReplicaMetaReadWriteLock.writeLock().lock();
+                try {
+                    if (leaseStartTime <= meta.leaseStartTime()) {
+                        return;
+                    }
+
+                    if (meta.primaryReplicaNodeIdFirstPageId() == 
BlobStorage.NO_PAGE_ID) {
+                        long primaryReplicaNodeIdFirstPageId = 
blobStorage.addBlob(stringToBytes(primaryReplicaNodeId));
+
+                        meta.primaryReplicaNodeIdFirstPageId(lastCheckpointId, 
primaryReplicaNodeIdFirstPageId);
+                    } else {
+                        
blobStorage.updateBlob(meta.primaryReplicaNodeIdFirstPageId(), 
stringToBytes(primaryReplicaNodeId));
+                    }
+                    if (meta.primaryReplicaNodeNameFirstPageId() == 
BlobStorage.NO_PAGE_ID) {
+                        long primaryReplicaNodeNameFirstPageId = 
blobStorage.addBlob(stringToBytes(primaryReplicaNodeName));
+
+                        
meta.primaryReplicaNodeNameFirstPageId(lastCheckpointId, 
primaryReplicaNodeNameFirstPageId);
+                    } else {
+                        
blobStorage.updateBlob(meta.primaryReplicaNodeNameFirstPageId(), 
stringToBytes(primaryReplicaNodeName));
+                    }
+
+                    meta.updateLease(lastCheckpointId, leaseStartTime);
+
+                    this.primaryReplicaNodeId = primaryReplicaNodeId;
+                    this.primaryReplicaNodeName = primaryReplicaNodeName;
+                } catch (IgniteInternalCheckedException e) {
+                    throw new StorageException(
+                            "Cannot save lease meta: [tableId={}, 
partitionId={}]",
+                            e,
+                            tableStorage.getTableId(), partitionId
+                    );
+                } finally {
+                    primaryReplicaMetaReadWriteLock.writeLock().unlock();
+                }
+            });
 
             return null;
         });
@@ -322,6 +376,72 @@ public class PersistentPageMemoryMvPartitionStorage 
extends AbstractPageMemoryMv
         });
     }
 
+    // TODO https://issues.apache.org/jira/browse/IGNITE-15119 nodeId type 
should be changed from String to UUID; after that fix
+    // TODO nodeId will be stored in meta directly and not in the blob storage.
+    @Override
+    public @Nullable String primaryReplicaNodeId() {
+        return busy(() -> {
+            throwExceptionIfStorageNotInRunnableState();
+            primaryReplicaMetaReadWriteLock.readLock().lock();
+
+            try {
+                if (primaryReplicaNodeId == null) {
+                    long primaryReplicaNodeIdFirstPageId = 
meta.primaryReplicaNodeIdFirstPageId();
+
+                    // It's possible to face BlobStorage.NO_PAGE_ID if lease 
information has not yet been recorded in the storage,
+                    // for example, if the primary replica has not yet been 
elected.
+                    if (primaryReplicaNodeIdFirstPageId != 
BlobStorage.NO_PAGE_ID) {
+                        primaryReplicaNodeId = 
ByteUtils.stringFromBytes(blobStorage.readBlob(primaryReplicaNodeIdFirstPageId));
+                    }
+                }
+
+                return primaryReplicaNodeId;
+
+            } catch (IgniteInternalCheckedException e) {
+                throw new StorageException(
+                        "Failed to read primary replica node id: [tableId={}, 
partitionId={}]",
+                        e,
+                        tableStorage.getTableId(), partitionId
+                );
+            } finally {
+                primaryReplicaMetaReadWriteLock.readLock().unlock();
+            }
+
+        });
+    }
+
+    @Override
+    public @Nullable String primaryReplicaNodeName() {
+        return busy(() -> {
+            throwExceptionIfStorageNotInRunnableState();
+
+            primaryReplicaMetaReadWriteLock.readLock().lock();
+
+            try {
+                if (primaryReplicaNodeName == null) {
+                    long primaryReplicaNodeNameFirstPageId = 
meta.primaryReplicaNodeNameFirstPageId();
+
+                    // It's possible to face BlobStorage.NO_PAGE_ID if lease 
information has not yet been recorded in the storage,
+                    // for example, if the primary replica has not yet been 
elected.
+                    if (primaryReplicaNodeNameFirstPageId != 
BlobStorage.NO_PAGE_ID) {
+                        primaryReplicaNodeName = 
ByteUtils.stringFromBytes(blobStorage.readBlob(primaryReplicaNodeNameFirstPageId));
+                    }
+                }
+
+                return primaryReplicaNodeName;
+
+            } catch (IgniteInternalCheckedException e) {
+                throw new StorageException(
+                        "Failed to read primary replica node name: 
[tableId={}, partitionId={}]",
+                        e,
+                        tableStorage.getTableId(), partitionId
+                );
+            } finally {
+                primaryReplicaMetaReadWriteLock.readLock().unlock();
+            }
+        });
+    }
+
     private void committedGroupConfigurationBusy(byte[] groupConfigBytes) {
         updateMeta((lastCheckpointId, meta) -> {
             replicationProtocolGroupConfigReadWriteLock.writeLock().lock();
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
index d9330b1b90..7a50600651 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
@@ -68,6 +68,12 @@ public class VolatilePageMemoryMvPartitionStorage extends 
AbstractPageMemoryMvPa
     /** Lease start time. */
     private volatile long leaseStartTime;
 
+    /** Primary replica node id. */
+    private volatile String primaryReplicaNodeId;
+
+    /** Primary replica node name. */
+    private volatile String primaryReplicaNodeName;
+
     /** Last group configuration. */
     private volatile byte @Nullable [] groupConfig;
 
@@ -197,7 +203,11 @@ public class VolatilePageMemoryMvPartitionStorage extends 
AbstractPageMemoryMvPa
     }
 
     @Override
-    public void updateLease(long leaseStartTime) {
+    public void updateLease(
+            long leaseStartTime,
+            String primaryReplicaNodeId,
+            String primaryReplicaNodeName
+    ) {
         busy(() -> {
             throwExceptionIfStorageNotInRunnableState();
 
@@ -206,6 +216,8 @@ public class VolatilePageMemoryMvPartitionStorage extends 
AbstractPageMemoryMvPa
             }
 
             this.leaseStartTime = leaseStartTime;
+            this.primaryReplicaNodeId = primaryReplicaNodeId;
+            this.primaryReplicaNodeName = primaryReplicaNodeName;
 
             return null;
         });
@@ -220,6 +232,24 @@ public class VolatilePageMemoryMvPartitionStorage extends 
AbstractPageMemoryMvPa
         });
     }
 
+    @Override
+    public @Nullable String primaryReplicaNodeId() {
+        return busy(() -> {
+            throwExceptionIfStorageNotInRunnableState();
+
+            return primaryReplicaNodeId;
+        });
+    }
+
+    @Override
+    public @Nullable String primaryReplicaNodeName() {
+        return busy(() -> {
+            throwExceptionIfStorageNotInRunnableState();
+
+            return primaryReplicaNodeName;
+        });
+    }
+
     @Override
     public void lastAppliedOnRebalance(long lastAppliedIndex, long 
lastAppliedTerm) {
         throwExceptionIfStorageNotInProgressOfRebalance(state.get(), 
this::createStorageInfo);
diff --git 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaManagerTest.java
 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaManagerTest.java
index 5a731c0f6f..678dd723e7 100644
--- 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaManagerTest.java
+++ 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaManagerTest.java
@@ -129,7 +129,7 @@ public class StoragePartitionMetaManagerTest extends 
BaseIgniteAbstractTest {
             try (FilePageStore filePageStore = 
createFilePageStore(testFilePath)) {
                 manager.writeMetaToBuffer(
                         partId,
-                        new StoragePartitionMeta(4, 100, 10, 34, 1000, 900, 
300, 200, 400, 200)
+                        new StoragePartitionMeta(4, 100, 10, 34, 1000, 11, 12, 
900, 300, 200, 400, 200)
                                 .init(null)
                                 .metaSnapshot(null),
                         buffer.rewind()
diff --git 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaTest.java
 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaTest.java
index 62195c21a8..01ca7ce261 100644
--- 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaTest.java
+++ 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaTest.java
@@ -213,7 +213,7 @@ public class StoragePartitionMetaTest {
     }
 
     private static StoragePartitionMeta createMeta() {
-        return new StoragePartitionMeta(0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
+        return new StoragePartitionMeta(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
                 .init(null);
     }
 }
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 15e1a33661..773ecc99c4 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -48,12 +48,17 @@ import static 
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptio
 import static 
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageStateOnRebalance;
 import static 
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageInProgressOfRebalance;
 import static 
org.apache.ignite.internal.storage.util.StorageUtils.transitionToTerminalState;
+import static org.apache.ignite.internal.util.ByteUtils.bytesToInt;
 import static org.apache.ignite.internal.util.ByteUtils.bytesToLong;
+import static org.apache.ignite.internal.util.ByteUtils.intToBytes;
 import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
-import static org.apache.ignite.internal.util.ByteUtils.putLongToBytes;
+import static org.apache.ignite.internal.util.ByteUtils.stringToBytes;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
 import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.UUID;
@@ -198,6 +203,12 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
     /** On-heap-cached lease start time value. */
     private volatile long leaseStartTime;
 
+    /** On-heap-cached lease node id. */
+    private volatile String primaryReplicaNodeId;
+
+    /** On-heap-cached lease node name. */
+    private volatile String primaryReplicaNodeName;
+
     /** On-heap-cached last committed group configuration. */
     private volatile byte @Nullable [] lastGroupConfig;
 
@@ -241,12 +252,29 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
 
             lastGroupConfig = db.get(meta, readOpts, lastGroupConfigKey);
 
-            byte[] leaseStartTimeBytes = db.get(meta, readOpts, leaseKey);
-            ByteBuffer leaseStartTimeBuf = leaseStartTimeBytes == null
-                    ? null
-                    : 
ByteBuffer.wrap(leaseStartTimeBytes).order(ByteOrder.LITTLE_ENDIAN);
+            byte[] leaseBytes = db.get(meta, readOpts, leaseKey);
 
-            leaseStartTime = leaseStartTimeBuf == null ? 
HybridTimestamp.MIN_VALUE.longValue() : leaseStartTimeBuf.getLong();
+            if (leaseBytes == null) {
+                leaseStartTime = HybridTimestamp.MIN_VALUE.longValue();
+            } else {
+                leaseStartTime = bytesToLong(leaseBytes);
+
+                int primaryReplicaNodeIdLength = bytesToInt(leaseBytes, 
Long.BYTES);
+                primaryReplicaNodeId = new String(
+                        leaseBytes,
+                        Long.BYTES + Integer.BYTES,
+                        primaryReplicaNodeIdLength,
+                        StandardCharsets.UTF_8
+                );
+
+                int primaryReplicaNodeNameLength = bytesToInt(leaseBytes, 
Long.BYTES + Integer.BYTES + primaryReplicaNodeIdLength);
+                primaryReplicaNodeName = new String(
+                        leaseBytes,
+                        Long.BYTES + Integer.BYTES + 
primaryReplicaNodeIdLength + Integer.BYTES,
+                        primaryReplicaNodeNameLength,
+                        StandardCharsets.UTF_8
+                );
+            }
 
             byte[] estimatedSizeBytes = db.get(meta, readOpts, 
estimatedSizeKey);
 
@@ -1070,7 +1098,11 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
     }
 
     @Override
-    public void updateLease(long leaseStartTime) {
+    public void updateLease(
+            long leaseStartTime,
+            String primaryReplicaNodeId,
+            String primaryReplicaNodeName
+    ) {
         busy(() -> {
             if (leaseStartTime <= this.leaseStartTime) {
                 return null;
@@ -1079,14 +1111,23 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
             AbstractWriteBatch writeBatch = requireWriteBatch();
 
             try {
-                byte[] leaseBytes = new byte[Long.BYTES];
+                ByteArrayOutputStream outputStream = new 
ByteArrayOutputStream();
+                outputStream.write(longToBytes(leaseStartTime));
+
+                byte[] primaryReplicaNodeIdBytes = 
stringToBytes(primaryReplicaNodeId);
+                
outputStream.write(intToBytes(primaryReplicaNodeIdBytes.length));
+                outputStream.write(primaryReplicaNodeIdBytes);
 
-                putLongToBytes(leaseStartTime, leaseBytes, 0);
+                byte[] primaryReplicaNodeNameBytes = 
stringToBytes(primaryReplicaNodeName);
+                
outputStream.write(intToBytes(primaryReplicaNodeNameBytes.length));
+                outputStream.write(primaryReplicaNodeNameBytes);
 
-                writeBatch.put(meta, leaseKey, leaseBytes);
+                writeBatch.put(meta, leaseKey, outputStream.toByteArray());
 
                 this.leaseStartTime = leaseStartTime;
-            } catch (RocksDBException e) {
+                this.primaryReplicaNodeId = primaryReplicaNodeId;
+                this.primaryReplicaNodeName = primaryReplicaNodeName;
+            } catch (RocksDBException | IOException e) {
                 throw new StorageException(e);
             }
 
@@ -1099,6 +1140,16 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
         return busy(() -> leaseStartTime);
     }
 
+    @Override
+    public @Nullable String primaryReplicaNodeId() {
+        return busy(() -> primaryReplicaNodeId);
+    }
+
+    @Override
+    public @Nullable String primaryReplicaNodeName() {
+        return busy(() -> primaryReplicaNodeName);
+    }
+
     /**
      * Deletes partition data from the storage, using write batch to perform 
the operation.
      */
diff --git 
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
 
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
index 80f0e91efd..26ba21b514 100644
--- 
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
+++ 
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
@@ -23,6 +23,7 @@ import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.Mockito.mock;
 
 import java.nio.file.Path;
@@ -137,10 +138,14 @@ public class RocksDbMvTableStorageTest extends 
AbstractMvTableStorageTest {
         MvPartitionStorage partitionStorage0 = 
getOrCreateMvPartition(PARTITION_ID);
 
         RowId rowId0 = new RowId(PARTITION_ID);
+        long leaseStartTime = 1234567;
+        String primaryReplicaNodeId = UUID.randomUUID().toString();
+        String primaryReplicaNodeName = primaryReplicaNodeId + "name";
 
         partitionStorage0.runConsistently(locker -> {
             locker.lock(rowId0);
 
+            partitionStorage0.updateLease(leaseStartTime, 
primaryReplicaNodeId, primaryReplicaNodeName);
             return partitionStorage0.addWrite(rowId0, testData, txId, 
COMMIT_TABLE_ID, 0);
         });
 
@@ -156,6 +161,10 @@ public class RocksDbMvTableStorageTest extends 
AbstractMvTableStorageTest {
         assertThat(tableStorage.getMvPartition(PARTITION_ID_1), 
is(nullValue()));
         
assertThat(unwrap(tableStorage.getMvPartition(PARTITION_ID).read(rowId0, 
HybridTimestamp.MAX_VALUE).binaryRow()),
                 is(equalTo(unwrap(testData))));
+
+        assertEquals(leaseStartTime, 
tableStorage.getMvPartition(PARTITION_ID).leaseStartTime());
+        assertEquals(primaryReplicaNodeId, 
tableStorage.getMvPartition(PARTITION_ID).primaryReplicaNodeId());
+        assertEquals(primaryReplicaNodeName, 
tableStorage.getMvPartition(PARTITION_ID).primaryReplicaNodeName());
     }
 
     @Test
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java
index cd59061975..ed8f7455db 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java
@@ -302,7 +302,8 @@ public class ReplicasSafeTimePropagationTest extends 
IgniteAbstractTest {
                                     mock(CatalogService.class),
                                     mock(SchemaRegistry.class),
                                     clockService,
-                                    mock(IndexMetaStorage.class)
+                                    mock(IndexMetaStorage.class),
+                                    
clusterService.topologyService().localMember().id()
                             ),
                             RaftGroupEventsListener.noopLsnr,
                             RaftGroupOptions.defaults()
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
index 0be8a177ca..3bef883622 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
@@ -486,7 +486,6 @@ public class StorageUpdateHandler {
 
                 ReadResult item = cursor.next();
 
-                // TODO: https://issues.apache.org/jira/browse/IGNITE-20124 
Prevent double storage updates within primary
                 if (item.isWriteIntent()) {
                     // We are aborting only those write intents that belong to 
the provided transaction.
                     // TODO: 
https://issues.apache.org/jira/browse/IGNITE-20347 to check transaction id in 
the storage
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 166995dd32..a1872c6482 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -1099,7 +1099,8 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                             catalogService,
                             table.schemaView(),
                             clockService,
-                            indexMetaStorage
+                            indexMetaStorage,
+                            topologyService.localMember().id()
                     );
 
                     SnapshotStorageFactory snapshotStorageFactory = 
createSnapshotStorageFactory(replicaGrpId,
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
index d618e8911e..8f2fdc31f0 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
@@ -232,8 +232,14 @@ public interface PartitionDataStorage extends 
ManuallyCloseable {
      * Updates the current lease start time in the storage.
      *
      * @param leaseStartTime Lease start time.
+     * @param primaryReplicaNodeId Primary replica node id.
+     * @param primaryReplicaNodeName Primary replica node name.
      */
-    void updateLease(long leaseStartTime);
+    void updateLease(
+            long leaseStartTime,
+            String primaryReplicaNodeId,
+            String primaryReplicaNodeName
+    );
 
     /**
      * Return the start time of the known lease for this replication group.
@@ -241,4 +247,18 @@ public interface PartitionDataStorage extends 
ManuallyCloseable {
      * @return Lease start time.
      */
     long leaseStartTime();
+
+    /**
+     * Return the node id of the known lease for this replication group.
+     *
+     * @return Primary replica node id.
+     */
+    String primaryReplicaNodeId();
+
+    /**
+     * Return the node name of the known lease for this replication group.
+     *
+     * @return Primary replica node name.
+     */
+    String primaryReplicaNodeName();
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 11fd10dc3e..91fa878221 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -32,8 +32,10 @@ import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -131,6 +133,10 @@ public class PartitionListener implements 
RaftGroupListener, BeforeApplyHandler
 
     private final IndexMetaStorage indexMetaStorage;
 
+    private final String localNodeId;
+
+    private Set<String> currentGroupTopology;
+
     /**
      * Timestamp with minimum starting time among all active RW transactions 
in the cluster.
      * This timestamp is used to prevent the catalog from being dropped, which 
may be used when applying raft commands.
@@ -148,7 +154,8 @@ public class PartitionListener implements 
RaftGroupListener, BeforeApplyHandler
             CatalogService catalogService,
             SchemaRegistry schemaRegistry,
             ClockService clockService,
-            IndexMetaStorage indexMetaStorage
+            IndexMetaStorage indexMetaStorage,
+            String localNodeId
     ) {
         this.txManager = txManager;
         this.storage = partitionDataStorage;
@@ -160,6 +167,7 @@ public class PartitionListener implements 
RaftGroupListener, BeforeApplyHandler
         this.schemaRegistry = schemaRegistry;
         this.clockService = clockService;
         this.indexMetaStorage = indexMetaStorage;
+        this.localNodeId = localNodeId;
     }
 
     @Override
@@ -261,9 +269,7 @@ public class PartitionListener implements 
RaftGroupListener, BeforeApplyHandler
 
                 assert safeTimePropagatingCommand.safeTime() != null;
 
-                synchronized (safeTime) {
-                    updateTrackerIgnoringTrackerClosedException(safeTime, 
safeTimePropagatingCommand.safeTime());
-                }
+                updateTrackerIgnoringTrackerClosedException(safeTime, 
safeTimePropagatingCommand.safeTime());
             }
 
             updateTrackerIgnoringTrackerClosedException(storageIndexTracker, 
commandIndex);
@@ -280,7 +286,7 @@ public class PartitionListener implements 
RaftGroupListener, BeforeApplyHandler
     private UpdateCommandResult handleUpdateCommand(UpdateCommand cmd, long 
commandIndex, long commandTerm) {
         // Skips the write command because the storage has already executed it.
         if (commandIndex <= storage.lastAppliedIndex()) {
-            return new UpdateCommandResult(true);
+            return new UpdateCommandResult(true, isPrimaryInGroupTopology());
         }
 
         if (cmd.leaseStartTime() != null) {
@@ -289,38 +295,42 @@ public class PartitionListener implements 
RaftGroupListener, BeforeApplyHandler
             long storageLeaseStartTime = storage.leaseStartTime();
 
             if (leaseStartTime != storageLeaseStartTime) {
-                return new UpdateCommandResult(false, storageLeaseStartTime);
+                return new UpdateCommandResult(
+                        false,
+                        storageLeaseStartTime,
+                        isPrimaryInGroupTopology()
+                );
             }
         }
 
         UUID txId = cmd.txId();
 
-        // TODO: https://issues.apache.org/jira/browse/IGNITE-20124 Proper 
storage/raft index handling is required.
-        synchronized (safeTime) {
-            if (cmd.safeTime().compareTo(safeTime.current()) > 0) {
-                storageUpdateHandler.handleUpdate(
-                        txId,
-                        cmd.rowUuid(),
-                        cmd.tablePartitionId().asTablePartitionId(),
-                        cmd.rowToUpdate(),
-                        !cmd.full(),
-                        () -> storage.lastApplied(commandIndex, commandTerm),
-                        cmd.full() ? cmd.safeTime() : null,
-                        cmd.lastCommitTimestamp(),
-                        indexIdsAtRwTxBeginTs(catalogService, txId, 
storage.tableId())
-                );
-
-                updateTrackerIgnoringTrackerClosedException(safeTime, 
cmd.safeTime());
-            } else {
-                // We MUST bump information about last updated index+term.
-                // See a comment in #onWrite() for explanation.
-                advanceLastAppliedIndexConsistently(commandIndex, commandTerm);
-            }
+        assert storage.primaryReplicaNodeId() != null;
+        assert localNodeId != null;
+
+        if (cmd.full() || !localNodeId.equals(storage.primaryReplicaNodeId())) 
{
+            storageUpdateHandler.handleUpdate(
+                    txId,
+                    cmd.rowUuid(),
+                    cmd.tablePartitionId().asTablePartitionId(),
+                    cmd.rowToUpdate(),
+                    !cmd.full(),
+                    () -> storage.lastApplied(commandIndex, commandTerm),
+                    cmd.full() ? cmd.safeTime() : null,
+                    cmd.lastCommitTimestamp(),
+                    indexIdsAtRwTxBeginTs(catalogService, txId, 
storage.tableId())
+            );
+        } else {
+            // We MUST bump information about last updated index+term.
+            // See a comment in #onWrite() for explanation.
+            // If we get here, that means that we are collocated with primary 
and data was already inserted there, thus it's only required
+            // to update information about index and term.
+            advanceLastAppliedIndexConsistently(commandIndex, commandTerm);
         }
 
         replicaTouch(txId, cmd.txCoordinatorId(), cmd.full() ? cmd.safeTime() 
: null, cmd.full());
 
-        return new UpdateCommandResult(true);
+        return new UpdateCommandResult(true, isPrimaryInGroupTopology());
     }
 
     /**
@@ -333,7 +343,7 @@ public class PartitionListener implements 
RaftGroupListener, BeforeApplyHandler
     private UpdateCommandResult handleUpdateAllCommand(UpdateAllCommand cmd, 
long commandIndex, long commandTerm) {
         // Skips the write command because the storage has already executed it.
         if (commandIndex <= storage.lastAppliedIndex()) {
-            return new UpdateCommandResult(true);
+            return new UpdateCommandResult(true, isPrimaryInGroupTopology());
         }
 
         if (cmd.leaseStartTime() != null) {
@@ -342,36 +352,37 @@ public class PartitionListener implements 
RaftGroupListener, BeforeApplyHandler
             long storageLeaseStartTime = storage.leaseStartTime();
 
             if (leaseStartTime != storageLeaseStartTime) {
-                return new UpdateCommandResult(false, storageLeaseStartTime);
+                return new UpdateCommandResult(
+                        false,
+                        storageLeaseStartTime,
+                        isPrimaryInGroupTopology()
+                );
             }
         }
 
         UUID txId = cmd.txId();
 
-        // TODO: https://issues.apache.org/jira/browse/IGNITE-20124 Proper 
storage/raft index handling is required.
-        synchronized (safeTime) {
-            if (cmd.safeTime().compareTo(safeTime.current()) > 0) {
-                storageUpdateHandler.handleUpdateAll(
-                        txId,
-                        cmd.rowsToUpdate(),
-                        cmd.tablePartitionId().asTablePartitionId(),
-                        !cmd.full(),
-                        () -> storage.lastApplied(commandIndex, commandTerm),
-                        cmd.full() ? cmd.safeTime() : null,
-                        indexIdsAtRwTxBeginTs(catalogService, txId, 
storage.tableId())
-                );
-
-                updateTrackerIgnoringTrackerClosedException(safeTime, 
cmd.safeTime());
-            } else {
-                // We MUST bump information about last updated index+term.
-                // See a comment in #onWrite() for explanation.
-                advanceLastAppliedIndexConsistently(commandIndex, commandTerm);
-            }
+        if (cmd.full() || !localNodeId.equals(storage.primaryReplicaNodeId())) 
{
+            storageUpdateHandler.handleUpdateAll(
+                    txId,
+                    cmd.rowsToUpdate(),
+                    cmd.tablePartitionId().asTablePartitionId(),
+                    !cmd.full(),
+                    () -> storage.lastApplied(commandIndex, commandTerm),
+                    cmd.full() ? cmd.safeTime() : null,
+                    indexIdsAtRwTxBeginTs(catalogService, txId, 
storage.tableId())
+            );
+        } else {
+            // We MUST bump information about last updated index+term.
+            // See a comment in #onWrite() for explanation.
+            // If we get here, that means that we are collocated with primary 
and data was already inserted there, thus it's only required
+            // to update information about index and term.
+            advanceLastAppliedIndexConsistently(commandIndex, commandTerm);
         }
 
         replicaTouch(txId, cmd.txCoordinatorId(), cmd.full() ? cmd.safeTime() 
: null, cmd.full());
 
-        return new UpdateCommandResult(true);
+        return new UpdateCommandResult(true, isPrimaryInGroupTopology());
     }
 
     /**
@@ -488,6 +499,9 @@ public class PartitionListener implements 
RaftGroupListener, BeforeApplyHandler
 
     @Override
     public void onConfigurationCommitted(CommittedConfiguration config) {
+        currentGroupTopology = new HashSet<>(config.peers());
+        currentGroupTopology.addAll(config.learners());
+
         // Skips the update because the storage has already recorded it.
         if (config.index() <= storage.lastAppliedIndex()) {
             return;
@@ -669,7 +683,7 @@ public class PartitionListener implements 
RaftGroupListener, BeforeApplyHandler
         }
 
         storage.runConsistently(locker -> {
-            storage.updateLease(cmd.leaseStartTime());
+            storage.updateLease(cmd.leaseStartTime(), 
cmd.primaryReplicaNodeId(), cmd.primaryReplicaNodeName());
 
             storage.lastApplied(commandIndex, commandTerm);
 
@@ -798,4 +812,29 @@ public class PartitionListener implements 
RaftGroupListener, BeforeApplyHandler
 
         return upgradedBinaryRow == sourceBinaryRow ? source : new 
BinaryRowAndRowId(upgradedBinaryRow, source.rowId());
     }
+
+    /**
+     * Checks whether the primary replica belongs to the raft group topology 
(peers and learners) within a raft linearized context.
+     * On the primary replica election prior to the lease publication, the 
placement driver sends a PrimaryReplicaChangeCommand that
+     * populates the raft listener and the underneath storage with 
lease-related information, such as primaryReplicaNodeId,
+     * primaryReplicaNodeName and leaseStartTime. In Update(All)Command  
handling, which occurs strictly after PrimaryReplicaChangeCommand
+     * processing, given information is used in order to detect whether 
primary belongs to the raft group topology (peers and learners).
+     *
+     *
+     * @return {@code true} if primary replica belongs to the raft group 
topology: peers and learners, {@code false} otherwise.
+     */
+    private boolean isPrimaryInGroupTopology() {
+        assert currentGroupTopology != null : "Current group topology is null";
+        // TODO https://issues.apache.org/jira/browse/IGNITE-23030 Seems that 
we have a bug. Lease related information is not restored on
+        // TODO snapshot load.
+        if (storage.primaryReplicaNodeName() == null) {
+            return true;
+        } else {
+            // Despite the fact that storage.primaryReplicaNodeName() may 
itself return null it's never expected to happen
+            // while calling isPrimaryInGroupTopology because of HB between 
handlePrimaryReplicaChangeCommand that will populate the storage
+            // with lease information and handleUpdate(All)Command that in 
its turn calls isPrimaryInGroupTopology.
+            assert storage.primaryReplicaNodeName() != null : "Primary replica 
node name is null.";
+            return 
currentGroupTopology.contains(storage.primaryReplicaNodeName());
+        }
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
index 8d122e7015..9483e5b5b3 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
@@ -227,12 +227,26 @@ public class SnapshotAwarePartitionDataStorage implements 
PartitionDataStorage {
     }
 
     @Override
-    public void updateLease(long leaseStartTime) {
-        partitionStorage.updateLease(leaseStartTime);
+    public void updateLease(
+            long leaseStartTime,
+            String primaryReplicaNodeId,
+            String primaryReplicaNodeName
+    ) {
+        partitionStorage.updateLease(leaseStartTime, primaryReplicaNodeId, 
primaryReplicaNodeName);
     }
 
     @Override
     public long leaseStartTime() {
         return partitionStorage.leaseStartTime();
     }
+
+    @Override
+    public String primaryReplicaNodeId() {
+        return partitionStorage.primaryReplicaNodeId();
+    }
+
+    @Override
+    public String primaryReplicaNodeName() {
+        return partitionStorage.primaryReplicaNodeName();
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 42abb134d3..69f61e7a99 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -213,7 +213,6 @@ import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.Lazy;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
-import org.apache.ignite.internal.util.TrackerClosedException;
 import org.apache.ignite.lang.ErrorGroups.Replicator;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.network.ClusterNode;
@@ -2672,22 +2671,6 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
                     HybridTimestamp safeTimeForRetry = clockService.now();
 
-                    // Within primary replica it's required to update safe 
time in order to prevent double storage updates in case of !1PC.
-                    // Otherwise, it may be possible that a newer entry will 
be overwritten by an older one that came as part of the raft
-                    // replication flow:
-                    // tx1 = transactions.begin();
-                    // tx1.put(k1, v1) -> primary.apply(k1,v1) + asynchronous 
raft replication (k1,v1)
-                    // tx1.put(k1, v2) -> primary.apply(k1,v2) + asynchronous 
raft replication (k1,v1)
-                    // (k1,v1) replication overrides newer (k1, v2). 
Eventually (k1,v2) replication will restore proper value.
-                    // However it's possible that tx1.get(k1) will see v1 
instead of v2.
-                    // TODO: 
https://issues.apache.org/jira/browse/IGNITE-20124 Better solution requied. 
Given one is correct, but fragile.
-                    if ((cmd instanceof UpdateCommand && !((UpdateCommand) 
cmd).full())
-                            || (cmd instanceof UpdateAllCommand && 
!((UpdateAllCommand) cmd).full())) {
-                        synchronized (safeTime) {
-                            
updateTrackerIgnoringTrackerClosedException(safeTime, safeTimeForRetry);
-                        }
-                    }
-
                     SafeTimePropagatingCommand 
clonedSafeTimePropagatingCommand =
                             (SafeTimePropagatingCommand) 
safeTimePropagatingCommand.clone();
                     
clonedSafeTimePropagatingCommand.safeTime(safeTimeForRetry);
@@ -2743,23 +2726,18 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             );
 
             if (!cmd.full()) {
-                // TODO: https://issues.apache.org/jira/browse/IGNITE-20124 
Temporary code below
-                synchronized (safeTime) {
-                    // We don't need to take the partition snapshots read 
lock, see #INTERNAL_DOC_PLACEHOLDER why.
-                    storageUpdateHandler.handleUpdate(
-                            cmd.txId(),
-                            cmd.rowUuid(),
-                            cmd.tablePartitionId().asTablePartitionId(),
-                            cmd.rowToUpdate(),
-                            true,
-                            null,
-                            null,
-                            null,
-                            indexIdsAtRwTxBeginTs(txId)
-                    );
-
-                    updateTrackerIgnoringTrackerClosedException(safeTime, 
cmd.safeTime());
-                }
+                // We don't need to take the partition snapshots read lock, 
see #INTERNAL_DOC_PLACEHOLDER why.
+                storageUpdateHandler.handleUpdate(
+                        cmd.txId(),
+                        cmd.rowUuid(),
+                        cmd.tablePartitionId().asTablePartitionId(),
+                        cmd.rowToUpdate(),
+                        true,
+                        null,
+                        null,
+                        null,
+                        indexIdsAtRwTxBeginTs(txId)
+                );
 
                 CompletableFuture<UUID> fut = 
applyCmdWithExceptionHandling(cmd, new CompletableFuture<>())
                         .thenApply(res -> cmd.txId());
@@ -2770,37 +2748,33 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
                 applyCmdWithExceptionHandling(cmd, resultFuture);
 
-                return resultFuture.thenApply(res -> {
+                return resultFuture.thenCompose(res -> {
                     UpdateCommandResult updateCommandResult = 
(UpdateCommandResult) res;
 
-                    if (full && updateCommandResult != null && 
!updateCommandResult.isPrimaryReplicaMatch()) {
+                    if (!updateCommandResult.isPrimaryReplicaMatch()) {
                         throw new PrimaryReplicaMissException(txId, 
cmd.leaseStartTime(), updateCommandResult.currentLeaseStartTime());
                     }
 
-                    // TODO: 
https://issues.apache.org/jira/browse/IGNITE-20124 Temporary code below
-                    // Try to avoid double write if an entry is already 
replicated.
-                    synchronized (safeTime) {
-                        if (cmd.safeTime().compareTo(safeTime.current()) > 0) {
-                            if 
(!IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK))
 {
-                                // We don't need to take the partition 
snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why.
-                                storageUpdateHandler.handleUpdate(
-                                        cmd.txId(),
-                                        cmd.rowUuid(),
-                                        
cmd.tablePartitionId().asTablePartitionId(),
-                                        cmd.rowToUpdate(),
-                                        false,
-                                        null,
-                                        cmd.safeTime(),
-                                        null,
-                                        indexIdsAtRwTxBeginTs(txId)
-                                );
-                            }
-
-                            
updateTrackerIgnoringTrackerClosedException(safeTime, cmd.safeTime());
+                    if (updateCommandResult.isPrimaryInPeersAndLearners()) {
+                        return 
safeTime.waitFor(cmd.safeTime()).thenApply(ignored -> null);
+                    } else {
+                        if 
(!IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK))
 {
+                            // We don't need to take the partition snapshots 
read lock, see #INTERNAL_DOC_PLACEHOLDER why.
+                            storageUpdateHandler.handleUpdate(
+                                    cmd.txId(),
+                                    cmd.rowUuid(),
+                                    
cmd.tablePartitionId().asTablePartitionId(),
+                                    cmd.rowToUpdate(),
+                                    false,
+                                    null,
+                                    cmd.safeTime(),
+                                    null,
+                                    indexIdsAtRwTxBeginTs(txId)
+                            );
                         }
-                    }
 
-                    return null;
+                        return nullCompletedFuture();
+                    }
                 });
             }
         }
@@ -2876,39 +2850,29 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
             if (!cmd.full()) {
                 if (skipDelayedAck) {
-                    // TODO: 
https://issues.apache.org/jira/browse/IGNITE-20124 Temporary code below
-                    synchronized (safeTime) {
-                        // We don't need to take the partition snapshots read 
lock, see #INTERNAL_DOC_PLACEHOLDER why.
-                        storageUpdateHandler.handleUpdateAll(
-                                cmd.txId(),
-                                cmd.rowsToUpdate(),
-                                cmd.tablePartitionId().asTablePartitionId(),
-                                true,
-                                null,
-                                null,
-                                indexIdsAtRwTxBeginTs(txId)
-                        );
-
-                        updateTrackerIgnoringTrackerClosedException(safeTime, 
cmd.safeTime());
-                    }
+                    // We don't need to take the partition snapshots read 
lock, see #INTERNAL_DOC_PLACEHOLDER why.
+                    storageUpdateHandler.handleUpdateAll(
+                            cmd.txId(),
+                            cmd.rowsToUpdate(),
+                            cmd.tablePartitionId().asTablePartitionId(),
+                            true,
+                            null,
+                            null,
+                            indexIdsAtRwTxBeginTs(txId)
+                    );
 
                     return applyCmdWithExceptionHandling(cmd, new 
CompletableFuture<>()).thenApply(res -> null);
                 } else {
-                    // TODO: 
https://issues.apache.org/jira/browse/IGNITE-20124 Temporary code below
-                    synchronized (safeTime) {
-                        // We don't need to take the partition snapshots read 
lock, see #INTERNAL_DOC_PLACEHOLDER why.
-                        storageUpdateHandler.handleUpdateAll(
-                                cmd.txId(),
-                                cmd.rowsToUpdate(),
-                                cmd.tablePartitionId().asTablePartitionId(),
-                                true,
-                                null,
-                                null,
-                                indexIdsAtRwTxBeginTs(txId)
-                        );
-
-                        updateTrackerIgnoringTrackerClosedException(safeTime, 
cmd.safeTime());
-                    }
+                    // We don't need to take the partition snapshots read 
lock, see #INTERNAL_DOC_PLACEHOLDER why.
+                    storageUpdateHandler.handleUpdateAll(
+                            cmd.txId(),
+                            cmd.rowsToUpdate(),
+                            cmd.tablePartitionId().asTablePartitionId(),
+                            true,
+                            null,
+                            null,
+                            indexIdsAtRwTxBeginTs(txId)
+                    );
 
                     CompletableFuture<Object> fut = 
applyCmdWithExceptionHandling(cmd, new CompletableFuture<>())
                             .thenApply(res -> cmd.txId());
@@ -2917,34 +2881,32 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 }
             } else {
                 return applyCmdWithExceptionHandling(cmd, new 
CompletableFuture<>())
-                        .thenApply(res -> {
+                        .thenCompose(res -> {
                             UpdateCommandResult updateCommandResult = 
(UpdateCommandResult) res;
 
-                            if (full && 
!updateCommandResult.isPrimaryReplicaMatch()) {
-                                throw new 
PrimaryReplicaMissException(cmd.txId(), cmd.leaseStartTime(),
-                                        
updateCommandResult.currentLeaseStartTime());
+                            if (!updateCommandResult.isPrimaryReplicaMatch()) {
+                                throw new PrimaryReplicaMissException(
+                                        cmd.txId(),
+                                        cmd.leaseStartTime(),
+                                        
updateCommandResult.currentLeaseStartTime()
+                                );
                             }
+                            if 
(updateCommandResult.isPrimaryInPeersAndLearners()) {
+                                return 
safeTime.waitFor(cmd.safeTime()).thenApply(ignored -> null);
+                            } else {
+                                // We don't need to take the partition 
snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why.
+                                storageUpdateHandler.handleUpdateAll(
+                                        cmd.txId(),
+                                        cmd.rowsToUpdate(),
+                                        
cmd.tablePartitionId().asTablePartitionId(),
+                                        false,
+                                        null,
+                                        cmd.safeTime(),
+                                        indexIdsAtRwTxBeginTs(txId)
+                                );
 
-                            // TODO: 
https://issues.apache.org/jira/browse/IGNITE-20124 Temporary code below
-                            // Try to avoid double write if an entry is 
already replicated.
-                            synchronized (safeTime) {
-                                if 
(cmd.safeTime().compareTo(safeTime.current()) > 0) {
-                                    // We don't need to take the partition 
snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why.
-                                    storageUpdateHandler.handleUpdateAll(
-                                            cmd.txId(),
-                                            cmd.rowsToUpdate(),
-                                            
cmd.tablePartitionId().asTablePartitionId(),
-                                            false,
-                                            null,
-                                            cmd.safeTime(),
-                                            indexIdsAtRwTxBeginTs(txId)
-                                    );
-
-                                    
updateTrackerIgnoringTrackerClosedException(safeTime, cmd.safeTime());
-                                }
+                                return nullCompletedFuture();
                             }
-
-                            return null;
                         });
             }
         }
@@ -3961,18 +3923,6 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         ));
     }
 
-    // TODO: https://issues.apache.org/jira/browse/IGNITE-20124 Temporary code 
below
-    private static <T extends Comparable<T>> void 
updateTrackerIgnoringTrackerClosedException(
-            PendingComparableValuesTracker<T, Void> tracker,
-            T newValue
-    ) {
-        try {
-            tracker.update(newValue, null);
-        } catch (TrackerClosedException ignored) {
-            // No-op.
-        }
-    }
-
     private static BuildIndexCommand 
toBuildIndexCommand(BuildIndexReplicaRequest request, MetaIndexStatusChange 
buildingChangeInfo) {
         return PARTITION_REPLICATION_MESSAGES_FACTORY.buildIndexCommand()
                 .indexId(request.indexId())
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index 9bf90b13be..249168b7ec 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -49,6 +49,7 @@ import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -221,6 +222,8 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
 
     private IndexMetaStorage indexMetaStorage;
 
+    private ClusterService clusterService;
+
     /**
      * Initializes a table listener before tests.
      */
@@ -228,9 +231,11 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
     public void before() {
         NetworkAddress addr = new NetworkAddress("127.0.0.1", 5003);
 
-        ClusterService clusterService = mock(ClusterService.class, 
RETURNS_DEEP_STUBS);
+        clusterService = mock(ClusterService.class, RETURNS_DEEP_STUBS);
 
         
when(clusterService.topologyService().localMember().address()).thenReturn(addr);
+        
when(clusterService.topologyService().localMember().id()).thenReturn(addr.toString());
+        when(clusterService.nodeName()).thenReturn(addr.toString());
 
         safeTimeTracker = new PendingComparableValuesTracker<>(new 
HybridTimestamp(1, 0));
 
@@ -285,8 +290,30 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
                 catalogService,
                 SCHEMA_REGISTRY,
                 clockService,
-                indexMetaStorage
+                indexMetaStorage,
+                clusterService.topologyService().localMember().id()
         );
+
+        // Update(All)Command handling requires both information about raft 
group topology and the primary replica,
+        // thus onConfigurationCommitted and primaryReplicaChangeCommand are 
called.
+        {
+            commandListener.onConfigurationCommitted(new 
CommittedConfiguration(
+                    raftIndex.incrementAndGet(),
+                    1,
+                    List.of(clusterService.nodeName()),
+                    Collections.emptyList(),
+                    null,
+                    null
+            ));
+
+            PrimaryReplicaChangeCommand command = 
REPLICA_MESSAGES_FACTORY.primaryReplicaChangeCommand()
+                    .primaryReplicaNodeName("primary")
+                    .primaryReplicaNodeId(UUID.randomUUID().toString())
+                    
.leaseStartTime(HybridTimestamp.MIN_VALUE.addPhysicalTime(1).longValue())
+                    .build();
+
+            
commandListener.onWrite(List.of(writeCommandCommandClosure(raftIndex.incrementAndGet(),
 1, command)).iterator());
+        }
     }
 
     /**
@@ -386,11 +413,15 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
                 writeCommandCommandClosure(6, 1, primaryReplicaChangeCommand, 
commandClosureResultCaptor)
         ).iterator());
 
-        verify(mvPartitionStorage, 
never()).runConsistently(any(WriteClosure.class));
-        verify(mvPartitionStorage, times(1)).lastApplied(anyLong(), anyLong());
+        // Two storage runConsistently runs are expected: one for 
configuration application and another for primaryReplicaChangeCommand
+        // handling. Both come from the initial configuration preparation in 
@BeforeEach.
+        verify(mvPartitionStorage, 
times(2)).runConsistently(any(WriteClosure.class));
+        verify(mvPartitionStorage, times(3)).lastApplied(anyLong(), anyLong());
 
-        assertThat(updateCommandClosureResultCaptor.getAllValues(), 
containsInAnyOrder(new UpdateCommandResult(true),
-                new UpdateCommandResult(true)));
+        assertThat(updateCommandClosureResultCaptor.getAllValues(),
+                containsInAnyOrder(new UpdateCommandResult(true, false),
+                        new UpdateCommandResult(true, false))
+        );
         assertThat(commandClosureResultCaptor.getAllValues(), 
containsInAnyOrder(new Throwable[]{null, null, null}));
 
         // Checks for TxStateStorage.
@@ -473,7 +504,8 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
                 catalogService,
                 SCHEMA_REGISTRY,
                 clockService,
-                indexMetaStorage
+                indexMetaStorage,
+                clusterService.topologyService().localMember().id()
         );
 
         txStateStorage.lastApplied(3L, 1L);
@@ -593,7 +625,7 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
     @Test
     void updatesGroupConfigurationOnConfigCommit() {
         commandListener.onConfigurationCommitted(new CommittedConfiguration(
-                1, 2, List.of("peer"), List.of("learner"), 
List.of("old-peer"), List.of("old-learner")
+                raftIndex.incrementAndGet(), 2, List.of("peer"), 
List.of("learner"), List.of("old-peer"), List.of("old-learner")
         ));
 
         RaftGroupConfiguration expectedConfig = new RaftGroupConfiguration(
@@ -610,10 +642,10 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
     @Test
     void updatesLastAppliedIndexAndTermOnConfigCommit() {
         commandListener.onConfigurationCommitted(new CommittedConfiguration(
-                1, 2, List.of("peer"), List.of("learner"), 
List.of("old-peer"), List.of("old-learner")
+                3, 2, List.of("peer"), List.of("learner"), 
List.of("old-peer"), List.of("old-learner")
         ));
 
-        verify(mvPartitionStorage).lastApplied(1, 2);
+        verify(mvPartitionStorage).lastApplied(3, 2);
     }
 
     @Test
@@ -624,20 +656,21 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
                 1, 2, List.of("peer"), List.of("learner"), 
List.of("old-peer"), List.of("old-learner")
         ));
 
-        verify(mvPartitionStorage, never()).committedGroupConfiguration(any());
-        verify(mvPartitionStorage, never()).lastApplied(eq(1L), anyLong());
+        // Exactly one call is expected because it's done in @BeforeEach in 
order to prepare initial configuration.
+        verify(mvPartitionStorage, 
times(1)).committedGroupConfiguration(any());
+        verify(mvPartitionStorage, times(1)).lastApplied(eq(1L), anyLong());
     }
 
     @Test
     void locksOnConfigCommit() {
         commandListener.onConfigurationCommitted(new CommittedConfiguration(
-                1, 2, List.of("peer"), List.of("learner"), 
List.of("old-peer"), List.of("old-learner")
+                raftIndex.incrementAndGet(), 2, List.of("peer"), 
List.of("learner"), List.of("old-peer"), List.of("old-learner")
         ));
 
         InOrder inOrder = inOrder(partitionDataStorage);
 
         
inOrder.verify(partitionDataStorage).acquirePartitionSnapshotsReadLock();
-        inOrder.verify(partitionDataStorage).lastApplied(1, 2);
+        inOrder.verify(partitionDataStorage).lastApplied(raftIndex.get(), 2);
         
inOrder.verify(partitionDataStorage).releasePartitionSnapshotsReadLock();
     }
 
@@ -706,10 +739,10 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
                 .build();
 
         commandListener.onWrite(List.of(
-                writeCommandCommandClosure(3, 2, command)
+                writeCommandCommandClosure(raftIndex.incrementAndGet(), 2, 
command)
         ).iterator());
 
-        verify(mvPartitionStorage).lastApplied(3, 2);
+        verify(mvPartitionStorage).lastApplied(raftIndex.get(), 2);
     }
 
     @ParameterizedTest
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index c737a153c1..c865bd856e 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -319,9 +319,9 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 return v;
             });
 
-            return completedFuture(new UpdateCommandResult(true));
+            return completedFuture(new UpdateCommandResult(true, true));
         } else if (cmd instanceof UpdateAllCommand) {
-            return completedFuture(new UpdateCommandResult(true));
+            return completedFuture(new UpdateCommandResult(true, true));
         } else if (cmd instanceof FinishTxCommand) {
             FinishTxCommand command = (FinishTxCommand) cmd;
 
@@ -1573,7 +1573,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
 
             assertFalse(replicaCleanupFut.isDone());
 
-            writeFut.complete(new UpdateCommandResult(true));
+            writeFut.complete(new UpdateCommandResult(true, true));
 
             assertThat(replicaCleanupFut, willSucceedFast());
         } finally {
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index ac02e96bd6..e2c3d32d70 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -91,6 +91,7 @@ import org.apache.ignite.internal.network.StaticNodeFinder;
 import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
 import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
 import org.apache.ignite.internal.raft.Loza;
 import org.apache.ignite.internal.raft.Peer;
@@ -111,6 +112,8 @@ import 
org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import 
org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration;
 import org.apache.ignite.internal.replicator.listener.ReplicaListener;
+import 
org.apache.ignite.internal.replicator.message.PrimaryReplicaChangeCommand;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
 import org.apache.ignite.internal.schema.BinaryRowConverter;
 import org.apache.ignite.internal.schema.ColumnsExtractor;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
@@ -179,6 +182,8 @@ import org.junit.jupiter.api.TestInfo;
 public class ItTxTestCluster {
     private static final int SCHEMA_VERSION = 1;
 
+    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new 
ReplicaMessagesFactory();
+
     private final List<NetworkAddress> localAddresses;
 
     private final NodeFinder nodeFinder;
@@ -713,7 +718,9 @@ public class ItTxTestCluster {
                         catalogService,
                         schemaManager,
                         clockServices.get(assignment),
-                        mock(IndexMetaStorage.class)
+                        mock(IndexMetaStorage.class),
+                        // TODO use proper index.
+                        
clusterServices.get(assignment).topologyService().getByConsistentId(assignment).id()
                 );
 
                 Function<RaftGroupService, ReplicaListener> 
createReplicaListener = raftClient -> newReplicaListener(
@@ -758,6 +765,35 @@ public class ItTxTestCluster {
 
         allOf(partitionReadyFutures.toArray(new CompletableFuture[0])).join();
 
+        for (int p = 0; p < assignments.size(); p++) {
+            TablePartitionId grpId = grpIds.get(p);
+            CompletableFuture<ReplicaMeta> primaryFuture = 
placementDriver.getPrimaryReplica(grpId,
+                    clockServices.values().iterator().next().now());
+
+            // TestPlacementDriver always returns completed futures.
+            assert primaryFuture.isDone();
+
+            ReplicaMeta primary = primaryFuture.join();
+
+            assert primary.getLeaseholderId() != null;
+
+            PrimaryReplicaChangeCommand cmd = 
REPLICA_MESSAGES_FACTORY.primaryReplicaChangeCommand()
+                    .leaseStartTime(primary.getStartTime().longValue())
+                    .primaryReplicaNodeId(primary.getLeaseholderId())
+                    .primaryReplicaNodeName(primary.getLeaseholder())
+                    .build();
+
+            CompletableFuture<RaftGroupService> raftClientFuture = 
getRaftClientForGroup(grpId);
+
+            assertThat(raftClientFuture, willCompleteSuccessfully());
+
+            CompletableFuture<?> primaryReplicaChangePropagationFuture = 
raftClientFuture.join().run(cmd);
+
+            partitionReadyFutures.add(primaryReplicaChangePropagationFuture);
+        }
+
+        allOf(partitionReadyFutures.toArray(new CompletableFuture[0])).join();
+
         return table;
     }
 
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
index 42cfb687b6..449e129ab7 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
@@ -159,12 +159,26 @@ public class TestPartitionDataStorage implements 
PartitionDataStorage {
     }
 
     @Override
-    public void updateLease(long leaseStartTime) {
-        partitionStorage.updateLease(leaseStartTime);
+    public void updateLease(
+            long leaseStartTime,
+            String primaryReplicaNodeId,
+            String primaryReplicaNodeName
+    ) {
+        partitionStorage.updateLease(leaseStartTime, primaryReplicaNodeId, 
primaryReplicaNodeName);
     }
 
     @Override
     public long leaseStartTime() {
         return partitionStorage.leaseStartTime();
     }
+
+    @Override
+    public String primaryReplicaNodeId() {
+        return partitionStorage.primaryReplicaNodeId();
+    }
+
+    @Override
+    public String primaryReplicaNodeName() {
+        return partitionStorage.primaryReplicaNodeName();
+    }
 }
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 5f747189e7..ef8c5822f6 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -60,11 +60,13 @@ import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.network.SingleClusterNodeResolver;
 import org.apache.ignite.internal.network.TopologyService;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
 import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
 import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.WriteCommand;
 import org.apache.ignite.internal.raft.service.CommandClosure;
+import org.apache.ignite.internal.raft.service.CommittedConfiguration;
 import org.apache.ignite.internal.raft.service.LeaderWithTerm;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.replicator.ReplicaResult;
@@ -72,6 +74,8 @@ import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.replicator.listener.ReplicaListener;
+import 
org.apache.ignite.internal.replicator.message.PrimaryReplicaChangeCommand;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowConverter;
 import org.apache.ignite.internal.schema.BinaryRowEx;
@@ -142,6 +146,8 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
 
     private static final ReplicationGroupId crossTableGroupId = new 
TablePartitionId(333, 0);
 
+    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new 
ReplicaMessagesFactory();
+
     private PartitionListener partitionListener;
 
     private ReplicaListener replicaListener;
@@ -285,7 +291,7 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
                     .when(replicaSvc).invoke(anyString(), any());
         }
 
-        AtomicLong raftIndex = new AtomicLong();
+        AtomicLong raftIndex = new AtomicLong(1);
 
         // Delegate directly to listener.
         lenient().doAnswer(
@@ -418,8 +424,36 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
                 catalogService,
                 schemaManager,
                 CLOCK_SERVICE,
-                mock(IndexMetaStorage.class)
+                mock(IndexMetaStorage.class),
+                LOCAL_NODE.id()
         );
+
+        // Update(All)Command handling requires both information about raft 
group topology and the primary replica,
+        // thus onConfigurationCommitted and primaryReplicaChangeCommand are 
called.
+        {
+            partitionListener.onConfigurationCommitted(new 
CommittedConfiguration(
+                    1,
+                    1,
+                    List.of(LOCAL_NODE.name()),
+                    Collections.emptyList(),
+                    null,
+                    null
+            ));
+
+            CompletableFuture<ReplicaMeta> primaryMetaFuture = 
placementDriver.getPrimaryReplica(groupId, CLOCK.now());
+
+            assertThat(primaryMetaFuture, willCompleteSuccessfully());
+
+            ReplicaMeta primary = primaryMetaFuture.join();
+
+            PrimaryReplicaChangeCommand primaryReplicaChangeCommand = 
REPLICA_MESSAGES_FACTORY.primaryReplicaChangeCommand()
+                    .leaseStartTime(primary.getStartTime().longValue())
+                    .primaryReplicaNodeId(primary.getLeaseholderId())
+                    .primaryReplicaNodeName(primary.getLeaseholder())
+                    .build();
+
+            assertThat(svc.run(primaryReplicaChangeCommand), 
willCompleteSuccessfully());
+        }
     }
 
     /**
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/UpdateCommandResult.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/UpdateCommandResult.java
index 5ddf95a66d..01d810969b 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/UpdateCommandResult.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/UpdateCommandResult.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.tx;
 
 import java.io.Serializable;
+import java.util.Objects;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -32,32 +33,43 @@ public class UpdateCommandResult implements Serializable {
     @Nullable
     private final Long currentLeaseStartTime;
 
+    /** {@code true} if primary replica belongs to the raft group topology: 
peers and learners, {@code false} otherwise. */
+    private final boolean primaryInPeersAndLearners;
+
     /**
      * Constructor.
      *
-     * @param primaryReplicaMatch Whether the command should be successfully 
applied on primary replica.
+     * @param primaryReplicaMatch Whether the command was executed 
successfully or failed due to mismatch of primary replica information.
      */
-    public UpdateCommandResult(boolean primaryReplicaMatch) {
-        this(primaryReplicaMatch, null);
+    public UpdateCommandResult(boolean primaryReplicaMatch, boolean 
primaryInPeersAndLearners) {
+        this(primaryReplicaMatch, null, primaryInPeersAndLearners);
     }
 
     /**
      * Constructor.
      *
-     * @param primaryReplicaMatch Whether the command should be successfully 
applied on primary replica.
+     * @param primaryReplicaMatch Whether the command was executed 
successfully or failed due to mismatch of primary replica information.
      * @param currentLeaseStartTime Actual lease start time.
+     * @param primaryInPeersAndLearners {@code true} if primary replica 
belongs to the raft group topology: peers and learners,
+     *     {@code false} otherwise.
      */
-    public UpdateCommandResult(boolean primaryReplicaMatch, @Nullable Long 
currentLeaseStartTime) {
+    public UpdateCommandResult(
+            boolean primaryReplicaMatch,
+            @Nullable Long currentLeaseStartTime,
+            boolean primaryInPeersAndLearners
+    ) {
         assert primaryReplicaMatch || currentLeaseStartTime != null : 
"Incorrect UpdateCommandResult.";
 
         this.primaryReplicaMatch = primaryReplicaMatch;
         this.currentLeaseStartTime = currentLeaseStartTime;
+        this.primaryInPeersAndLearners = primaryInPeersAndLearners;
     }
 
     /**
-     * Whether the command should be successfully applied on primary replica.
+     * Whether the command was executed successfully or failed due to mismatch 
of primary replica information, i.e. lease start time that
+     * was sent along with the command doesn't match the one in raft updated 
by handlePrimaryReplicaChangeCommand.
      *
-     * @return Whether the command should be successfully applied on primary 
replica.
+     * @return Whether the command was executed successfully or failed due to 
mismatch of primary replica information.
      */
     public boolean isPrimaryReplicaMatch() {
         return primaryReplicaMatch;
@@ -73,6 +85,15 @@ public class UpdateCommandResult implements Serializable {
         return currentLeaseStartTime;
     }
 
+    /**
+     * Returns whether primary replica belongs to the raft group topology: 
peers and learners.
+     *
+     * @return {@code true} if primary replica belongs to the raft group 
topology: peers and learners, {@code false} otherwise.
+     */
+    public boolean isPrimaryInPeersAndLearners() {
+        return primaryInPeersAndLearners;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -81,20 +102,13 @@ public class UpdateCommandResult implements Serializable {
         if (o == null || getClass() != o.getClass()) {
             return false;
         }
-
         UpdateCommandResult that = (UpdateCommandResult) o;
-
-        if (primaryReplicaMatch != that.primaryReplicaMatch) {
-            return false;
-        }
-        return currentLeaseStartTime != null ? 
currentLeaseStartTime.equals(that.currentLeaseStartTime)
-                : that.currentLeaseStartTime == null;
+        return primaryReplicaMatch == that.primaryReplicaMatch && 
primaryInPeersAndLearners == that.primaryInPeersAndLearners
+                && Objects.equals(currentLeaseStartTime, 
that.currentLeaseStartTime);
     }
 
     @Override
     public int hashCode() {
-        int result = (primaryReplicaMatch ? 1 : 0);
-        result = 31 * result + (currentLeaseStartTime != null ? 
currentLeaseStartTime.hashCode() : 0);
-        return result;
+        return Objects.hash(primaryReplicaMatch, currentLeaseStartTime, 
primaryInPeersAndLearners);
     }
 }

Reply via email to