This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new d6d402d84e IGNITE-23379 Pass lease information with Raft snapshot of 
partition (#4725)
d6d402d84e is described below

commit d6d402d84e4c3f0be35e5415e5ee6d0706acefe4
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Nov 15 17:46:50 2024 +0400

    IGNITE-23379 Pass lease information with Raft snapshot of partition (#4725)
---
 .../network/raft/PartitionSnapshotMeta.java        | 10 +++
 .../internal/storage/engine/MvPartitionMeta.java}  | 36 ++++++----
 .../internal/storage/engine/MvTableStorage.java    | 12 +---
 .../storage/engine/PrimitivePartitionMeta.java     | 69 +++++++++++++++++++
 .../engine/ThreadAssertingMvTableStorage.java      |  9 +--
 .../storage/AbstractMvTableStorageTest.java        | 24 +++++--
 .../storage/impl/TestMvPartitionStorage.java       |  9 +--
 .../internal/storage/impl/TestMvTableStorage.java  | 20 ++++--
 .../pagememory/AbstractPageMemoryTableStorage.java | 21 +++---
 .../mv/AbstractPageMemoryMvPartitionStorage.java   | 13 ++++
 .../mv/PersistentPageMemoryMvPartitionStorage.java | 62 ++++++++++-------
 .../mv/VolatilePageMemoryMvPartitionStorage.java   | 17 ++++-
 .../storage/rocksdb/RocksDbMvPartitionStorage.java | 74 +++++++++++++-------
 .../storage/rocksdb/RocksDbTableStorage.java       | 10 +--
 .../raftsnapshot/ItTableRaftSnapshotsTest.java     | 35 +++++++++-
 .../distributed/raft/snapshot/PartitionAccess.java | 29 ++++++--
 .../raft/snapshot/PartitionAccessImpl.java         | 69 ++++++++++++-------
 .../snapshot/PartitionSnapshotStorageFactory.java  | 56 ++++++---------
 .../raft/snapshot/RaftSnapshotPartitionMeta.java   | 79 ++++++++++++++++++++++
 .../snapshot/incoming/IncomingSnapshotCopier.java  |  6 +-
 .../raft/snapshot/outgoing/OutgoingSnapshot.java   | 14 ++--
 .../raft/snapshot/outgoing/SnapshotMetaUtils.java  | 15 +++-
 .../distributed/storage/NullMvTableStorage.java    |  8 +--
 .../table/distributed/TableManagerTest.java        |  2 +
 .../incoming/IncomingSnapshotCopierTest.java       | 31 ++++++++-
 .../outgoing/OutgoingSnapshotCommonTest.java       |  9 ++-
 .../snapshot/outgoing/SnapshotMetaUtilsTest.java   | 25 ++++++-
 27 files changed, 564 insertions(+), 200 deletions(-)

diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/raft/PartitionSnapshotMeta.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/raft/PartitionSnapshotMeta.java
index 8d0258b767..ccfad0b819 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/raft/PartitionSnapshotMeta.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/raft/PartitionSnapshotMeta.java
@@ -19,6 +19,7 @@ package 
org.apache.ignite.internal.partition.replicator.network.raft;
 
 import java.util.Map;
 import java.util.UUID;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.network.annotations.Transferable;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
 import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
@@ -32,4 +33,13 @@ public interface PartitionSnapshotMeta extends SnapshotMeta {
 
     /** Row ID for which the index needs to be built per building index ID at 
the time the snapshot meta was created. */
     @Nullable Map<Integer, UUID> nextRowIdToBuildByIndexId();
+
+    /** Lease start time represented as {@link HybridTimestamp#longValue()}. */
+    long leaseStartTime();
+
+    /** ID of primary replica node ({@code null} if there is no primary). */
+    @Nullable UUID primaryReplicaNodeId();
+
+    /** Name of primary replica node ({@code null} if there is no primary). */
+    @Nullable String primaryReplicaNodeName();
 }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/raft/PartitionSnapshotMeta.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvPartitionMeta.java
similarity index 51%
copy from 
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/raft/PartitionSnapshotMeta.java
copy to 
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvPartitionMeta.java
index 8d0258b767..006f1b2811 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/raft/PartitionSnapshotMeta.java
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvPartitionMeta.java
@@ -15,21 +15,33 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.partition.replicator.network.raft;
+package org.apache.ignite.internal.storage.engine;
 
-import java.util.Map;
 import java.util.UUID;
-import org.apache.ignite.internal.network.annotations.Transferable;
-import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
-import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
 import org.jetbrains.annotations.Nullable;
 
-/** Partition Raft snapshot meta. */
-@Transferable(PartitionReplicationMessageGroup.PARTITION_SNAPSHOT_META)
-public interface PartitionSnapshotMeta extends SnapshotMeta {
-    /** Minimum catalog version that is required for the snapshot to be 
accepted by a follower. */
-    int requiredCatalogVersion();
+/**
+ * Partition metadata for {@link MvTableStorage#finishRebalancePartition(int, 
MvPartitionMeta)}.
+ */
+public class MvPartitionMeta extends PrimitivePartitionMeta {
+    private final byte[] groupConfig;
+
+    /** Constructor. */
+    public MvPartitionMeta(
+            long lastAppliedIndex,
+            long lastAppliedTerm,
+            byte[] groupConfig,
+            long leaseStartTime,
+            @Nullable UUID primaryReplicaNodeId,
+            @Nullable String primaryReplicaNodeName
+    ) {
+        super(lastAppliedIndex, lastAppliedTerm, leaseStartTime, 
primaryReplicaNodeId, primaryReplicaNodeName);
+
+        this.groupConfig = groupConfig;
+    }
 
-    /** Row ID for which the index needs to be built per building index ID at 
the time the snapshot meta was created. */
-    @Nullable Map<Integer, UUID> nextRowIdToBuildByIndexId();
+    /** Returns replication group config as bytes. */
+    public byte[] groupConfig() {
+        return groupConfig;
+    }
 }
diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
index bcb4f58b3a..5f149b2c6e 100644
--- 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
@@ -206,19 +206,13 @@ public interface MvTableStorage extends ManuallyCloseable 
{
      *
      * <p>If rebalance has not started, then {@link StorageRebalanceException} 
will be thrown.
      *
-     * @param lastAppliedIndex Last applied index.
-     * @param lastAppliedTerm Last applied term.
-     * @param groupConfig Replication protocol group configuration (byte 
representation).
+     * @param partitionId ID of the partition.
+     * @param partitionMeta Metadata of the partition.
      * @return Future of the finish rebalance for a multi-version partition 
storage and its indexes.
      * @throws IllegalArgumentException If Partition ID is out of bounds.
      * @throws StorageRebalanceException If there is an error when completing 
rebalance.
      */
-    CompletableFuture<Void> finishRebalancePartition(
-            int partitionId,
-            long lastAppliedIndex,
-            long lastAppliedTerm,
-            byte[] groupConfig
-    );
+    CompletableFuture<Void> finishRebalancePartition(int partitionId, 
MvPartitionMeta partitionMeta);
 
     /**
      * Clears a partition and all associated indices. After the cleaning is 
completed, a partition and all associated indices will be fully
diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/PrimitivePartitionMeta.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/PrimitivePartitionMeta.java
new file mode 100644
index 0000000000..c392dc7851
--- /dev/null
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/PrimitivePartitionMeta.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.engine;
+
+import java.util.UUID;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Partition meta containing values of 'primitive' types (which are the same 
for all representations of partition metadata).
+ */
+public class PrimitivePartitionMeta {
+    private final long lastAppliedIndex;
+    private final long lastAppliedTerm;
+    private final long leaseStartTime;
+    private final @Nullable UUID primaryReplicaNodeId;
+    private final @Nullable String primaryReplicaNodeName;
+
+    /** Constructor. */
+    public PrimitivePartitionMeta(
+            long lastAppliedIndex,
+            long lastAppliedTerm,
+            long leaseStartTime,
+            @Nullable UUID primaryReplicaNodeId,
+            @Nullable String primaryReplicaNodeName
+    ) {
+        this.lastAppliedIndex = lastAppliedIndex;
+        this.lastAppliedTerm = lastAppliedTerm;
+        this.leaseStartTime = leaseStartTime;
+        this.primaryReplicaNodeId = primaryReplicaNodeId;
+        this.primaryReplicaNodeName = primaryReplicaNodeName;
+    }
+
+    public long lastAppliedIndex() {
+        return lastAppliedIndex;
+    }
+
+    public long lastAppliedTerm() {
+        return lastAppliedTerm;
+    }
+
+    public long leaseStartTime() {
+        return leaseStartTime;
+    }
+
+    /** Returns primary replica node ID (or {@code null} if no primary replica 
node is known). */
+    public @Nullable UUID primaryReplicaNodeId() {
+        return primaryReplicaNodeId;
+    }
+
+    /** Returns primary replica node name (or {@code null} if no primary 
replica node is known). */
+    public @Nullable String primaryReplicaNodeName() {
+        return primaryReplicaNodeName;
+    }
+}
diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/ThreadAssertingMvTableStorage.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/ThreadAssertingMvTableStorage.java
index cc80c45bd1..d48875ea88 100644
--- 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/ThreadAssertingMvTableStorage.java
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/ThreadAssertingMvTableStorage.java
@@ -123,15 +123,10 @@ public class ThreadAssertingMvTableStorage implements 
MvTableStorage {
     }
 
     @Override
-    public CompletableFuture<Void> finishRebalancePartition(
-            int partitionId,
-            long lastAppliedIndex,
-            long lastAppliedTerm,
-            byte[] groupConfig
-    ) {
+    public CompletableFuture<Void> finishRebalancePartition(int partitionId, 
MvPartitionMeta partitionMeta) {
         assertThreadAllowsToWrite();
 
-        return tableStorage.finishRebalancePartition(partitionId, 
lastAppliedIndex, lastAppliedTerm, groupConfig);
+        return tableStorage.finishRebalancePartition(partitionId, 
partitionMeta);
     }
 
     @Override
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
index 9ee0693184..1c6433da82 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
@@ -78,6 +78,7 @@ import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.BinaryTupleSchema;
 import org.apache.ignite.internal.schema.BinaryTupleSchema.Element;
+import org.apache.ignite.internal.storage.engine.MvPartitionMeta;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.storage.index.HashIndexStorage;
 import org.apache.ignite.internal.storage.index.IndexRow;
@@ -659,7 +660,7 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
         // Error because rebalance has not yet started for the partition.
         assertThrows(
                 StorageRebalanceException.class,
-                () -> tableStorage.finishRebalancePartition(PARTITION_ID, 100, 
500, BYTE_EMPTY_ARRAY)
+                () -> tableStorage.finishRebalancePartition(PARTITION_ID, 
saneMvPartitionMeta())
         );
 
         List<TestRow> rowsBeforeRebalanceStart = List.of(
@@ -691,18 +692,21 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
         // Partition is out of configuration range.
         assertThrows(
                 IllegalArgumentException.class,
-                () -> 
tableStorage.finishRebalancePartition(getPartitionIdOutOfRange(), 100, 500, 
BYTE_EMPTY_ARRAY)
+                () -> 
tableStorage.finishRebalancePartition(getPartitionIdOutOfRange(), 
saneMvPartitionMeta())
         );
 
         // Partition does not exist.
         assertThrows(
                 StorageRebalanceException.class,
-                () -> tableStorage.finishRebalancePartition(1, 100, 500, 
BYTE_EMPTY_ARRAY)
+                () -> tableStorage.finishRebalancePartition(1, 
saneMvPartitionMeta())
         );
 
         byte[] raftGroupConfig = createRandomRaftGroupConfiguration();
 
-        assertThat(tableStorage.finishRebalancePartition(PARTITION_ID, 10, 20, 
raftGroupConfig), willCompleteSuccessfully());
+        assertThat(
+                tableStorage.finishRebalancePartition(PARTITION_ID, 
saneMvPartitionMeta(10, 20, raftGroupConfig)),
+                willCompleteSuccessfully()
+        );
 
         completeBuiltIndexes(PARTITION_ID, hashIndexStorage, 
sortedIndexStorage);
 
@@ -714,6 +718,14 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
         checkRaftGroupConfigs(raftGroupConfig, 
mvPartitionStorage.committedGroupConfiguration());
     }
 
+    private static MvPartitionMeta saneMvPartitionMeta() {
+        return saneMvPartitionMeta(100, 500, BYTE_EMPTY_ARRAY);
+    }
+
+    private static MvPartitionMeta saneMvPartitionMeta(long lastAppliedIndex, 
long lastAppliedTerm, byte[] groupConfig) {
+        return new MvPartitionMeta(lastAppliedIndex, lastAppliedTerm, 
groupConfig, 333, new UUID(1, 2), "primary");
+    }
+
     @Test
     public void testFailRebalance() throws Exception {
         MvPartitionStorage mvPartitionStorage = 
getOrCreateMvPartition(PARTITION_ID);
@@ -956,7 +968,7 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
     void testNextRowIdToBuildAfterRebalance() throws Exception {
         testNextRowIdToBuildAfterOperation(() -> {
             assertThat(tableStorage.startRebalancePartition(PARTITION_ID), 
willCompleteSuccessfully());
-            assertThat(tableStorage.finishRebalancePartition(PARTITION_ID, 
100, 100, BYTE_EMPTY_ARRAY), willCompleteSuccessfully());
+            assertThat(tableStorage.finishRebalancePartition(PARTITION_ID, 
saneMvPartitionMeta()), willCompleteSuccessfully());
         });
     }
 
@@ -1425,7 +1437,7 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
 
         fillStorages(mvPartitionStorage, null, null, rowsAfterRebalance);
 
-        assertThat(tableStorage.finishRebalancePartition(PARTITION_ID, 42, 42, 
BYTE_EMPTY_ARRAY), willCompleteSuccessfully());
+        assertThat(tableStorage.finishRebalancePartition(PARTITION_ID, 
saneMvPartitionMeta()), willCompleteSuccessfully());
 
         assertThat(mvPartitionStorage.estimatedSize(), is(3L));
     }
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
index c4ab33988e..dd2194b4f8 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
@@ -42,6 +42,7 @@ import 
org.apache.ignite.internal.storage.StorageDestroyedException;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.StorageRebalanceException;
 import org.apache.ignite.internal.storage.TxIdMismatchException;
+import org.apache.ignite.internal.storage.engine.MvPartitionMeta;
 import org.apache.ignite.internal.storage.gc.GcEntry;
 import org.apache.ignite.internal.storage.util.LocalLocker;
 import org.apache.ignite.internal.storage.util.LockByRowId;
@@ -769,16 +770,16 @@ public class TestMvPartitionStorage implements 
MvPartitionStorage {
         clear0();
     }
 
-    void finishRebalance(long lastAppliedIndex, long lastAppliedTerm, byte[] 
groupConfig) {
+    void finishRebalance(MvPartitionMeta partitionMeta) {
         checkStorageClosedForRebalance();
 
         assert rebalance;
 
         rebalance = false;
 
-        this.lastAppliedIndex = lastAppliedIndex;
-        this.lastAppliedTerm = lastAppliedTerm;
-        this.groupConfig = Arrays.copyOf(groupConfig, groupConfig.length);
+        this.lastAppliedIndex = partitionMeta.lastAppliedIndex();
+        this.lastAppliedTerm = partitionMeta.lastAppliedTerm();
+        this.groupConfig = Arrays.copyOf(partitionMeta.groupConfig(), 
partitionMeta.groupConfig().length);
     }
 
     private class ScanVersionsCursor implements Cursor<ReadResult> {
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
index 2fbcbeb80c..5550dfa2b8 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
@@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.engine.MvPartitionMeta;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
 import org.apache.ignite.internal.storage.index.HashIndexStorage;
@@ -243,14 +244,19 @@ public class TestMvTableStorage implements MvTableStorage 
{
     }
 
     @Override
-    public CompletableFuture<Void> finishRebalancePartition(
-            int partitionId,
-            long lastAppliedIndex,
-            long lastAppliedTerm,
-            byte[] groupConfig
-    ) {
+    public CompletableFuture<Void> finishRebalancePartition(int partitionId, 
MvPartitionMeta partitionMeta) {
         return mvPartitionStorages.finishRebalance(partitionId, 
mvPartitionStorage -> {
-            mvPartitionStorage.finishRebalance(lastAppliedIndex, 
lastAppliedTerm, groupConfig);
+            mvPartitionStorage.finishRebalance(partitionMeta);
+
+            if (partitionMeta.primaryReplicaNodeId() != null) {
+                assert partitionMeta.primaryReplicaNodeId() != null;
+
+                mvPartitionStorage.updateLease(
+                        partitionMeta.leaseStartTime(),
+                        partitionMeta.primaryReplicaNodeId(),
+                        partitionMeta.primaryReplicaNodeName()
+                );
+            }
 
             
testHashIndexStorageStream(partitionId).forEach(TestHashIndexStorage::finishRebalance);
 
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
index 53fef1f07b..543e8eb15d 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.pagememory.reuse.ReuseList;
 import org.apache.ignite.internal.pagememory.tree.BplusTree;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.engine.MvPartitionMeta;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
 import org.apache.ignite.internal.storage.index.HashIndexStorage;
@@ -260,17 +261,21 @@ public abstract class AbstractPageMemoryTableStorage 
implements MvTableStorage {
     }
 
     @Override
-    public CompletableFuture<Void> finishRebalancePartition(
-            int partitionId,
-            long lastAppliedIndex,
-            long lastAppliedTerm,
-            byte[] groupConfig
-    ) {
+    public CompletableFuture<Void> finishRebalancePartition(int partitionId, 
MvPartitionMeta partitionMeta) {
         return inBusyLock(busyLock, () -> 
mvPartitionStorages.finishRebalance(partitionId, mvPartitionStorage -> {
             mvPartitionStorage.runConsistently(locker -> {
-                mvPartitionStorage.lastAppliedOnRebalance(lastAppliedIndex, 
lastAppliedTerm);
+                
mvPartitionStorage.lastAppliedOnRebalance(partitionMeta.lastAppliedIndex(), 
partitionMeta.lastAppliedTerm());
+                
mvPartitionStorage.committedGroupConfigurationOnRebalance(partitionMeta.groupConfig());
+
+                if (partitionMeta.primaryReplicaNodeId() != null) {
+                    assert partitionMeta.primaryReplicaNodeName() != null;
 
-                
mvPartitionStorage.committedGroupConfigurationOnRebalance(groupConfig);
+                    mvPartitionStorage.updateLeaseOnRebalance(
+                            partitionMeta.leaseStartTime(),
+                            partitionMeta.primaryReplicaNodeId(),
+                            partitionMeta.primaryReplicaNodeName()
+                    );
+                }
 
                 return null;
             });
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index 07f4eff195..192c9dc702 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -746,6 +746,19 @@ public abstract class AbstractPageMemoryMvPartitionStorage 
implements MvPartitio
      */
     public abstract void committedGroupConfigurationOnRebalance(byte[] config);
 
+    /**
+     * Updates the current lease start time in the storage on rebalance.
+     *
+     * @param leaseStartTime Lease start time.
+     * @param primaryReplicaNodeId Primary replica node id.
+     * @param primaryReplicaNodeName Primary replica node name.
+     */
+    public abstract void updateLeaseOnRebalance(
+            long leaseStartTime,
+            UUID primaryReplicaNodeId,
+            String primaryReplicaNodeName
+    );
+
     /**
      * Prepares the storage and its indexes for cleanup.
      *
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
index d117831b49..9cd8105ab3 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
@@ -321,38 +321,43 @@ public class PersistentPageMemoryMvPartitionStorage 
extends AbstractPageMemoryMv
         busy(() -> {
             throwExceptionIfStorageNotInRunnableState();
 
-            updateMeta((lastCheckpointId, meta) -> {
-                primaryReplicaMetaReadWriteLock.writeLock().lock();
-                try {
-                    if (leaseStartTime <= meta.leaseStartTime()) {
-                        return;
-                    }
+            updateLeaseBusy(leaseStartTime, primaryReplicaNodeId, 
primaryReplicaNodeName);
 
-                    meta.primaryReplicaNodeId(lastCheckpointId, 
primaryReplicaNodeId);
+            return null;
+        });
+    }
 
-                    if (meta.primaryReplicaNodeNameFirstPageId() == 
BlobStorage.NO_PAGE_ID) {
-                        long primaryReplicaNodeNameFirstPageId = 
blobStorage.addBlob(stringToBytes(primaryReplicaNodeName));
+    private void updateLeaseBusy(long leaseStartTime, UUID 
primaryReplicaNodeId, String primaryReplicaNodeName) {
+        updateMeta((lastCheckpointId, meta) -> {
+            primaryReplicaMetaReadWriteLock.writeLock().lock();
 
-                        
meta.primaryReplicaNodeNameFirstPageId(lastCheckpointId, 
primaryReplicaNodeNameFirstPageId);
-                    } else {
-                        
blobStorage.updateBlob(meta.primaryReplicaNodeNameFirstPageId(), 
stringToBytes(primaryReplicaNodeName));
-                    }
+            try {
+                if (leaseStartTime <= meta.leaseStartTime()) {
+                    return;
+                }
 
-                    meta.updateLease(lastCheckpointId, leaseStartTime);
+                meta.primaryReplicaNodeId(lastCheckpointId, 
primaryReplicaNodeId);
 
-                    this.primaryReplicaNodeName = primaryReplicaNodeName;
-                } catch (IgniteInternalCheckedException e) {
-                    throw new StorageException(
-                            "Cannot save lease meta: [tableId={}, 
partitionId={}]",
-                            e,
-                            tableStorage.getTableId(), partitionId
-                    );
-                } finally {
-                    primaryReplicaMetaReadWriteLock.writeLock().unlock();
+                if (meta.primaryReplicaNodeNameFirstPageId() == 
BlobStorage.NO_PAGE_ID) {
+                    long primaryReplicaNodeNameFirstPageId = 
blobStorage.addBlob(stringToBytes(primaryReplicaNodeName));
+
+                    meta.primaryReplicaNodeNameFirstPageId(lastCheckpointId, 
primaryReplicaNodeNameFirstPageId);
+                } else {
+                    
blobStorage.updateBlob(meta.primaryReplicaNodeNameFirstPageId(), 
stringToBytes(primaryReplicaNodeName));
                 }
-            });
 
-            return null;
+                meta.updateLease(lastCheckpointId, leaseStartTime);
+
+                this.primaryReplicaNodeName = primaryReplicaNodeName;
+            } catch (IgniteInternalCheckedException e) {
+                throw new StorageException(
+                        "Cannot save lease meta: [tableId={}, partitionId={}]",
+                        e,
+                        tableStorage.getTableId(), partitionId
+                );
+            } finally {
+                primaryReplicaMetaReadWriteLock.writeLock().unlock();
+            }
         });
     }
 
@@ -535,6 +540,13 @@ public class PersistentPageMemoryMvPartitionStorage 
extends AbstractPageMemoryMv
         committedGroupConfigurationBusy(config);
     }
 
+    @Override
+    public void updateLeaseOnRebalance(long leaseStartTime, UUID 
primaryReplicaNodeId, String primaryReplicaNodeName) {
+        throwExceptionIfStorageNotInProgressOfRebalance(state.get(), 
this::createStorageInfo);
+
+        updateLeaseBusy(leaseStartTime, primaryReplicaNodeId, 
primaryReplicaNodeName);
+    }
+
     private void saveFreeListMetadataBusy(RenewablePartitionStorageState 
localState) {
         try {
             localState.freeList().saveMetadata();
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
index 6db571befb..a65ef9f2f8 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
@@ -216,14 +216,18 @@ public class VolatilePageMemoryMvPartitionStorage extends 
AbstractPageMemoryMvPa
                 return null;
             }
 
-            this.leaseStartTime = leaseStartTime;
-            this.primaryReplicaNodeId = primaryReplicaNodeId;
-            this.primaryReplicaNodeName = primaryReplicaNodeName;
+            updateLeaseBusy(leaseStartTime, primaryReplicaNodeId, 
primaryReplicaNodeName);
 
             return null;
         });
     }
 
+    private void updateLeaseBusy(long leaseStartTime, UUID 
primaryReplicaNodeId, String primaryReplicaNodeName) {
+        this.leaseStartTime = leaseStartTime;
+        this.primaryReplicaNodeId = primaryReplicaNodeId;
+        this.primaryReplicaNodeName = primaryReplicaNodeName;
+    }
+
     @Override
     public long leaseStartTime() {
         return busy(() -> {
@@ -391,6 +395,13 @@ public class VolatilePageMemoryMvPartitionStorage extends 
AbstractPageMemoryMvPa
         this.groupConfig = config;
     }
 
+    @Override
+    public void updateLeaseOnRebalance(long leaseStartTime, UUID 
primaryReplicaNodeId, String primaryReplicaNodeName) {
+        throwExceptionIfStorageNotInProgressOfRebalance(state.get(), 
this::createStorageInfo);
+
+        updateLeaseBusy(leaseStartTime, primaryReplicaNodeId, 
primaryReplicaNodeName);
+    }
+
     @Override
     public long estimatedSize() {
         return estimatedSize;
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 4a56f119a5..60229f23ff 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -79,6 +79,7 @@ import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.StorageRebalanceException;
 import org.apache.ignite.internal.storage.TxIdMismatchException;
+import org.apache.ignite.internal.storage.engine.MvPartitionMeta;
 import org.apache.ignite.internal.storage.gc.GcEntry;
 import org.apache.ignite.internal.storage.rocksdb.GarbageCollector.AddResult;
 import org.apache.ignite.internal.storage.util.LocalLocker;
@@ -1117,32 +1118,35 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
                 return null;
             }
 
-            AbstractWriteBatch writeBatch = requireWriteBatch();
+            saveLease(requireWriteBatch(), leaseStartTime, 
primaryReplicaNodeId, primaryReplicaNodeName);
 
-            try {
-                ByteArrayOutputStream outputStream = new 
ByteArrayOutputStream();
-                outputStream.write(longToBytes(leaseStartTime));
+            return null;
+        });
+    }
 
-                byte[] primaryReplicaNodeIdBytes = 
uuidToBytes(primaryReplicaNodeId);
-                outputStream.write(primaryReplicaNodeIdBytes);
+    private void saveLease(AbstractWriteBatch writeBatch, long leaseStartTime, 
UUID primaryReplicaNodeId, String primaryReplicaNodeName) {
+        // TODO: IGNITE-23683 - switch to VersionedSerialization.
+        try {
+            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+            outputStream.write(longToBytes(leaseStartTime));
 
-                byte[] primaryReplicaNodeNameBytes = 
stringToBytes(primaryReplicaNodeName);
-                
outputStream.write(intToBytes(primaryReplicaNodeNameBytes.length));
-                outputStream.write(primaryReplicaNodeNameBytes);
+            byte[] primaryReplicaNodeIdBytes = 
uuidToBytes(primaryReplicaNodeId);
+            outputStream.write(primaryReplicaNodeIdBytes);
 
-                writeBatch.put(meta, leaseKey, outputStream.toByteArray());
+            byte[] primaryReplicaNodeNameBytes = 
stringToBytes(primaryReplicaNodeName);
+            outputStream.write(intToBytes(primaryReplicaNodeNameBytes.length));
+            outputStream.write(primaryReplicaNodeNameBytes);
 
-                this.leaseStartTime = leaseStartTime;
-                this.primaryReplicaNodeId = primaryReplicaNodeId;
-                this.primaryReplicaNodeName = primaryReplicaNodeName;
-            } catch (IOException e) {
-                throw new StorageException(e);
-            } catch (RocksDBException e) {
-                throw new IgniteRocksDbException(e);
-            }
+            writeBatch.put(meta, leaseKey, outputStream.toByteArray());
 
-            return null;
-        });
+            this.leaseStartTime = leaseStartTime;
+            this.primaryReplicaNodeId = primaryReplicaNodeId;
+            this.primaryReplicaNodeName = primaryReplicaNodeName;
+        } catch (IOException e) {
+            throw new StorageException(e);
+        } catch (RocksDBException e) {
+            throw new IgniteRocksDbException(e);
+        }
     }
 
     @Override
@@ -1653,15 +1657,26 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
      *
      * @throws StorageRebalanceException If there was an error when finishing 
the rebalance.
      */
-    void finishRebalance(WriteBatch writeBatch, long lastAppliedIndex, long 
lastAppliedTerm, byte[] groupConfig) {
+    void finishRebalance(WriteBatch writeBatch, MvPartitionMeta partitionMeta) 
{
         if (!state.compareAndSet(StorageState.REBALANCE, 
StorageState.RUNNABLE)) {
             throwExceptionDependingOnStorageStateOnRebalance(state.get(), 
createStorageInfo());
         }
 
         try {
-            saveLastApplied(writeBatch, lastAppliedIndex, lastAppliedTerm);
+            saveLastApplied(writeBatch, partitionMeta.lastAppliedIndex(), 
partitionMeta.lastAppliedTerm());
+
+            saveGroupConfigurationOnRebalance(writeBatch, 
partitionMeta.groupConfig());
+
+            if (partitionMeta.primaryReplicaNodeId() != null) {
+                assert partitionMeta.primaryReplicaNodeName() != null;
 
-            saveGroupConfigurationOnRebalance(writeBatch, groupConfig);
+                updateLeaseOnRebalance(
+                        writeBatch,
+                        partitionMeta.leaseStartTime(),
+                        partitionMeta.primaryReplicaNodeId(),
+                        partitionMeta.primaryReplicaNodeName()
+                );
+            }
         } catch (RocksDBException e) {
             throw new StorageRebalanceException("Error when trying to abort 
rebalancing storage: " + createStorageInfo(), e);
         }
@@ -1696,6 +1711,19 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
         this.lastGroupConfig = config.clone();
     }
 
+    private void updateLeaseOnRebalance(
+            WriteBatch writeBatch,
+            long leaseStartTime,
+            UUID primaryReplicaNodeId,
+            String primaryReplicaNodeName
+    ) {
+        saveLease(writeBatch, leaseStartTime, primaryReplicaNodeId, 
primaryReplicaNodeName);
+
+        this.leaseStartTime = leaseStartTime;
+        this.primaryReplicaNodeId = primaryReplicaNodeId;
+        this.primaryReplicaNodeName = primaryReplicaNodeName;
+    }
+
     /**
      * Prepares the storage for cleanup.
      *
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index 85d71af2d1..6b67f1f274 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.StorageRebalanceException;
+import org.apache.ignite.internal.storage.engine.MvPartitionMeta;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
 import org.apache.ignite.internal.storage.index.HashIndexStorage;
@@ -329,15 +330,10 @@ public class RocksDbTableStorage implements 
MvTableStorage {
     }
 
     @Override
-    public CompletableFuture<Void> finishRebalancePartition(
-            int partitionId,
-            long lastAppliedIndex,
-            long lastAppliedTerm,
-            byte[] groupConfig
-    ) {
+    public CompletableFuture<Void> finishRebalancePartition(int partitionId, 
MvPartitionMeta partitionMeta) {
         return inBusyLock(busyLock, () -> 
mvPartitionStorages.finishRebalance(partitionId, mvPartitionStorage -> {
             try (WriteBatch writeBatch = new WriteBatch()) {
-                mvPartitionStorage.finishRebalance(writeBatch, 
lastAppliedIndex, lastAppliedTerm, groupConfig);
+                mvPartitionStorage.finishRebalance(writeBatch, partitionMeta);
 
                 indexes.finishRebalance(partitionId);
 
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
index 71b181013f..170f1dc511 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
@@ -23,6 +23,7 @@ import static 
org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIMEM_
 import static 
org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME;
 import static 
org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_ROCKSDB_PROFILE_NAME;
 import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
 import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
 import static org.apache.ignite.internal.raft.util.OptimizedMarshaller.NO_POOL;
 import static 
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.executeUpdate;
@@ -35,7 +36,9 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.everyItem;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.nio.file.Path;
@@ -68,9 +71,11 @@ import org.apache.ignite.internal.raft.server.RaftServer;
 import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
 import 
org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
 import 
org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryStorageEngine;
 import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.table.InternalTable;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.incoming.IncomingSnapshotCopier;
 import 
org.apache.ignite.internal.table.distributed.schema.PartitionCommandsMarshallerImpl;
 import org.apache.ignite.internal.test.WatchListenerInhibitor;
@@ -118,12 +123,13 @@ import org.junit.jupiter.params.provider.ValueSource;
 @SuppressWarnings("resource")
 @Timeout(90)
 @ExtendWith(WorkDirectoryExtension.class)
-@Disabled("https://issues.apache.org/jira/browse/IGNITE-23379";)
 class ItTableRaftSnapshotsTest extends BaseIgniteAbstractTest {
     private static final IgniteLogger LOG = 
Loggers.forClass(ItTableRaftSnapshotsTest.class);
 
     private static final int AWAIT_PRIMARY_REPLICA_SECONDS = 10;
 
+    private static final int PARTITION_ID = 0;
+
     /**
      * Nodes bootstrap configuration pattern.
      *
@@ -200,12 +206,32 @@ class ItTableRaftSnapshotsTest extends 
BaseIgniteAbstractTest {
         transferLeadershipOnSolePartitionTo(2);
 
         assertThat(getFromNode(2, 1), is("one"));
+        assertThat(estimatedSizeFromNode(2), is(1L));
+
+        MvPartitionStorage partitionAtNode0 = mvPartitionAtNode(0);
+        MvPartitionStorage partitionAtNode2 = mvPartitionAtNode(2);
+
+        assertThat(partitionAtNode2.leaseStartTime(), not(0L));
+        assertEquals(partitionAtNode0.primaryReplicaNodeId(), 
partitionAtNode2.primaryReplicaNodeId());
+        assertEquals(partitionAtNode0.primaryReplicaNodeName(), 
partitionAtNode2.primaryReplicaNodeName());
     }
 
     private @Nullable String getFromNode(int clusterNode, int key) {
         return tableViewAt(clusterNode).get(null, key);
     }
 
+    private long estimatedSizeFromNode(int clusterNode) {
+        return mvPartitionAtNode(clusterNode).estimatedSize();
+    }
+
+    private MvPartitionStorage mvPartitionAtNode(int clusterNode) {
+        InternalTable internalTable = 
unwrapTableImpl(tableAt(clusterNode)).internalTable();
+        MvPartitionStorage mvPartition = 
internalTable.storage().getMvPartition(PARTITION_ID);
+
+        assertNotNull(mvPartition);
+        return mvPartition;
+    }
+
     private void feedNode2WithSnapshotOfOneRow() throws InterruptedException {
         feedNode2WithSnapshotOfOneRow(DEFAULT_STORAGE_ENGINE);
     }
@@ -290,10 +316,14 @@ class ItTableRaftSnapshotsTest extends 
BaseIgniteAbstractTest {
     }
 
     private KeyValueView<Integer, String> tableViewAt(int nodeIndex) {
-        Table table = cluster.node(nodeIndex).tables().table("test");
+        Table table = tableAt(nodeIndex);
         return table.keyValueView(Integer.class, String.class);
     }
 
+    private Table tableAt(int nodeIndex) {
+        return cluster.node(nodeIndex).tables().table("test");
+    }
+
     private void knockoutNode(int nodeIndex) {
         cluster.stopNode(nodeIndex);
 
@@ -732,6 +762,7 @@ class ItTableRaftSnapshotsTest extends 
BaseIgniteAbstractTest {
      * rejected, and that, when metadata catches up, the snapshot gets 
successfully installed.
      */
     @Test
+    @Disabled("IGNITE-23677")
     void laggingSchemasOnFollowerPreventSnapshotInstallation() throws 
Exception {
         startAndInitCluster();
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccess.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccess.java
index e86488b3ce..b52a8f99e3 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccess.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccess.java
@@ -135,6 +135,27 @@ public interface PartitionAccess {
      */
     long maxLastAppliedTerm();
 
+    /**
+     * Returns the start time of the known lease for this replication group.
+     *
+     * @return Lease start time.
+     */
+    long leaseStartTime();
+
+    /**
+     * Return the node ID of the known lease for this replication group.
+     *
+     * @return Primary replica node id or {@code null} if there is no 
information about lease in the storage.
+     */
+    @Nullable UUID primaryReplicaNodeId();
+
+    /**
+     * Return the node name of the known lease for this replication group.
+     *
+     * @return Primary replica node name or {@code null} if there is no 
information about lease in the storage.
+     */
+    @Nullable String primaryReplicaNodeName();
+
     /**
      * Prepares partition storages for rebalancing.
      * <ul>
@@ -158,7 +179,7 @@ public interface PartitionAccess {
      * <p>This method must be called before every rebalance and ends with a 
call to one of the methods:
      * <ul>
      *     <li>{@link #abortRebalance()} - in case of errors or cancellation 
of rebalance;</li>
-     *     <li>{@link #finishRebalance(long, long, RaftGroupConfiguration)} - 
in case of successful completion of rebalance.</li>
+     *     <li>{@link #finishRebalance(RaftSnapshotPartitionMeta)} - in case 
of successful completion of rebalance.</li>
      * </ul>
      *
      * @return Future of the operation.
@@ -191,13 +212,11 @@ public interface PartitionAccess {
      *
      * <p>If rebalance has not started, then {@link StorageRebalanceException} 
will be thrown.
      *
-     * @param lastAppliedIndex Last applied index.
-     * @param lastAppliedTerm Last applied term.
-     * @param raftGroupConfig RAFT group configuration.
+     * @param partitionMeta Partition metadata.
      * @return Future of the operation.
      * @throws StorageRebalanceException If there are errors when trying to 
finish rebalancing.
      */
-    CompletableFuture<Void> finishRebalance(long lastAppliedIndex, long 
lastAppliedTerm, RaftGroupConfiguration raftGroupConfig);
+    CompletableFuture<Void> finishRebalance(RaftSnapshotPartitionMeta 
partitionMeta);
 
     /**
      * Returns the row ID for which the index needs to be built, {@code null} 
means that the index building has completed.
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
index 8f179c6c2f..8672dc1611 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
@@ -121,22 +121,22 @@ public class PartitionAccessImpl implements 
PartitionAccess {
 
     @Override
     public Cursor<IgniteBiTuple<UUID, TxMeta>> getAllTxMeta() {
-        return getTxStateStorage(partitionId()).scan();
+        return getTxStateStorage().scan();
     }
 
     @Override
     public void addTxMeta(UUID txId, TxMeta txMeta) {
-        getTxStateStorage(partitionId()).putForRebalance(txId, txMeta);
+        getTxStateStorage().putForRebalance(txId, txMeta);
     }
 
     @Override
     public @Nullable RowId closestRowId(RowId lowerBound) {
-        return getMvPartitionStorage(partitionId()).closestRowId(lowerBound);
+        return getMvPartitionStorage().closestRowId(lowerBound);
     }
 
     @Override
     public List<ReadResult> getAllRowVersions(RowId rowId) {
-        MvPartitionStorage mvPartitionStorage = 
getMvPartitionStorage(partitionId());
+        MvPartitionStorage mvPartitionStorage = getMvPartitionStorage();
 
         return mvPartitionStorage.runConsistently(locker -> {
             locker.lock(rowId);
@@ -149,14 +149,14 @@ public class PartitionAccessImpl implements 
PartitionAccess {
 
     @Override
     public @Nullable RaftGroupConfiguration committedGroupConfiguration() {
-        byte[] configBytes = 
getMvPartitionStorage(partitionId()).committedGroupConfiguration();
+        byte[] configBytes = 
getMvPartitionStorage().committedGroupConfiguration();
 
         return raftGroupConfigurationConverter.fromBytes(configBytes);
     }
 
     @Override
     public void addWrite(RowId rowId, @Nullable BinaryRow row, UUID txId, int 
commitTableId, int commitPartitionId, int catalogVersion) {
-        MvPartitionStorage mvPartitionStorage = 
getMvPartitionStorage(partitionId());
+        MvPartitionStorage mvPartitionStorage = getMvPartitionStorage();
 
         List<IndexIdAndTableVersion> indexIdAndTableVersionList = 
fullStateTransferIndexChooser.chooseForAddWrite(
                 catalogVersion,
@@ -181,7 +181,7 @@ public class PartitionAccessImpl implements PartitionAccess 
{
 
     @Override
     public void addWriteCommitted(RowId rowId, @Nullable BinaryRow row, 
HybridTimestamp commitTimestamp, int catalogVersion) {
-        MvPartitionStorage mvPartitionStorage = 
getMvPartitionStorage(partitionId());
+        MvPartitionStorage mvPartitionStorage = getMvPartitionStorage();
 
         List<IndexIdAndTableVersion> indexIdAndTableVersionList = 
fullStateTransferIndexChooser.chooseForAddWriteCommitted(
                 catalogVersion,
@@ -207,38 +207,53 @@ public class PartitionAccessImpl implements 
PartitionAccess {
     @Override
     public long minLastAppliedIndex() {
         return Math.min(
-                getMvPartitionStorage(partitionId()).lastAppliedIndex(),
-                getTxStateStorage(partitionId()).lastAppliedIndex()
+                getMvPartitionStorage().lastAppliedIndex(),
+                getTxStateStorage().lastAppliedIndex()
         );
     }
 
     @Override
     public long minLastAppliedTerm() {
         return Math.min(
-                getMvPartitionStorage(partitionId()).lastAppliedTerm(),
-                getTxStateStorage(partitionId()).lastAppliedTerm()
+                getMvPartitionStorage().lastAppliedTerm(),
+                getTxStateStorage().lastAppliedTerm()
         );
     }
 
     @Override
     public long maxLastAppliedIndex() {
         return Math.max(
-                getMvPartitionStorage(partitionId()).lastAppliedIndex(),
-                getTxStateStorage(partitionId()).lastAppliedIndex()
+                getMvPartitionStorage().lastAppliedIndex(),
+                getTxStateStorage().lastAppliedIndex()
         );
     }
 
     @Override
     public long maxLastAppliedTerm() {
         return Math.max(
-                getMvPartitionStorage(partitionId()).lastAppliedTerm(),
-                getTxStateStorage(partitionId()).lastAppliedTerm()
+                getMvPartitionStorage().lastAppliedTerm(),
+                getTxStateStorage().lastAppliedTerm()
         );
     }
 
+    @Override
+    public long leaseStartTime() {
+        return getMvPartitionStorage().leaseStartTime();
+    }
+
+    @Override
+    public @Nullable UUID primaryReplicaNodeId() {
+        return getMvPartitionStorage().primaryReplicaNodeId();
+    }
+
+    @Override
+    public @Nullable String primaryReplicaNodeName() {
+        return getMvPartitionStorage().primaryReplicaNodeName();
+    }
+
     @Override
     public CompletableFuture<Void> startRebalance() {
-        TxStateStorage txStateStorage = getTxStateStorage(partitionId());
+        TxStateStorage txStateStorage = getTxStateStorage();
 
         return mvGc.removeStorage(toTablePartitionId(partitionKey))
                 .thenCompose(unused -> CompletableFuture.allOf(
@@ -249,7 +264,7 @@ public class PartitionAccessImpl implements PartitionAccess 
{
 
     @Override
     public CompletableFuture<Void> abortRebalance() {
-        TxStateStorage txStateStorage = getTxStateStorage(partitionId());
+        TxStateStorage txStateStorage = getTxStateStorage();
 
         return CompletableFuture.allOf(
                 mvTableStorage.abortRebalancePartition(partitionId()),
@@ -258,14 +273,14 @@ public class PartitionAccessImpl implements 
PartitionAccess {
     }
 
     @Override
-    public CompletableFuture<Void> finishRebalance(long lastAppliedIndex, long 
lastAppliedTerm, RaftGroupConfiguration raftGroupConfig) {
-        TxStateStorage txStateStorage = getTxStateStorage(partitionId());
+    public CompletableFuture<Void> finishRebalance(RaftSnapshotPartitionMeta 
partitionMeta) {
+        TxStateStorage txStateStorage = getTxStateStorage();
 
-        byte[] configBytes = 
raftGroupConfigurationConverter.toBytes(raftGroupConfig);
+        byte[] configBytes = 
raftGroupConfigurationConverter.toBytes(partitionMeta.raftGroupConfig());
 
         return CompletableFuture.allOf(
-                mvTableStorage.finishRebalancePartition(partitionId(), 
lastAppliedIndex, lastAppliedTerm, configBytes),
-                txStateStorage.finishRebalance(lastAppliedIndex, 
lastAppliedTerm)
+                mvTableStorage.finishRebalancePartition(partitionId(), 
partitionMeta.toMvPartitionMeta(configBytes)),
+                
txStateStorage.finishRebalance(partitionMeta.lastAppliedIndex(), 
partitionMeta.lastAppliedTerm())
         ).thenAccept(unused -> 
mvGc.addStorage(toTablePartitionId(partitionKey), gcUpdateHandler));
     }
 
@@ -276,7 +291,7 @@ public class PartitionAccessImpl implements PartitionAccess 
{
 
     @Override
     public void setNextRowIdToBuildIndex(Map<Integer, RowId> 
nextRowIdToBuildByIndexId) {
-        MvPartitionStorage mvPartitionStorage = 
getMvPartitionStorage(partitionId());
+        MvPartitionStorage mvPartitionStorage = getMvPartitionStorage();
 
         mvPartitionStorage.runConsistently(locker -> {
             
nextRowIdToBuildByIndexId.forEach(indexUpdateHandler::setNextRowIdToBuildIndex);
@@ -290,7 +305,9 @@ public class PartitionAccessImpl implements PartitionAccess 
{
         lowWatermark.updateLowWatermark(newLowWatermark);
     }
 
-    private MvPartitionStorage getMvPartitionStorage(int partitionId) {
+    private MvPartitionStorage getMvPartitionStorage() {
+        int partitionId = partitionId();
+
         MvPartitionStorage mvPartitionStorage = 
mvTableStorage.getMvPartition(partitionId);
 
         assert mvPartitionStorage != null : 
IgniteStringFormatter.format("tableId={}, partitionId={}", tableId(), 
partitionId);
@@ -298,7 +315,9 @@ public class PartitionAccessImpl implements PartitionAccess 
{
         return mvPartitionStorage;
     }
 
-    private TxStateStorage getTxStateStorage(int partitionId) {
+    private TxStateStorage getTxStateStorage() {
+        int partitionId = partitionId();
+
         TxStateStorage txStateStorage = 
txStateTableStorage.getTxStateStorage(partitionId);
 
         assert txStateStorage != null : 
IgniteStringFormatter.format("tableId={}, partitionId={}", tableId(), 
partitionId);
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java
index 93e02b32af..a60b2d819f 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java
@@ -21,15 +21,14 @@ import static 
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoin
 import static 
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.SnapshotMetaUtils.snapshotMetaAt;
 
 import java.util.Map;
+import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.Executor;
 import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.network.TopologyService;
-import org.apache.ignite.internal.raft.RaftGroupConfiguration;
 import org.apache.ignite.internal.raft.storage.SnapshotStorageFactory;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
-import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
 import org.apache.ignite.raft.jraft.option.RaftOptions;
 import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
@@ -58,19 +57,7 @@ public class PartitionSnapshotStorageFactory implements 
SnapshotStorageFactory {
 
     private final CatalogService catalogService;
 
-    /**
-     * RAFT log index, min of {@link MvPartitionStorage#lastAppliedIndex()} 
and {@link TxStateStorage#lastAppliedIndex()}
-     * at the moment of this factory instantiation.
-     */
-    private final long lastIncludedRaftIndex;
-
-    /** Term corresponding to {@link #lastIncludedRaftIndex}. */
-    private final long lastIncludedRaftTerm;
-
-    /** RAFT configuration corresponding to {@link #lastIncludedRaftIndex}. */
-    private final RaftGroupConfiguration lastIncludedConfiguration;
-
-    private final int lastCatalogVersionAtStart;
+    private final @Nullable SnapshotMeta startupSnapshotMeta;
 
     /** Incoming snapshots executor. */
     private final Executor incomingSnapshotsExecutor;
@@ -101,12 +88,25 @@ public class PartitionSnapshotStorageFactory implements 
SnapshotStorageFactory {
 
         // We must choose the minimum applied index for local recovery so that 
we don't skip the raft commands for the storage with the
         // lowest applied index and thus no data loss occurs.
-        lastIncludedRaftIndex = partition.minLastAppliedIndex();
-        lastIncludedRaftTerm = partition.minLastAppliedTerm();
+        long lastIncludedRaftIndex = partition.minLastAppliedIndex();
+        long lastIncludedRaftTerm = partition.minLastAppliedTerm();
 
-        lastIncludedConfiguration = partition.committedGroupConfiguration();
+        int lastCatalogVersionAtStart = catalogService.latestCatalogVersion();
 
-        lastCatalogVersionAtStart = catalogService.latestCatalogVersion();
+        if (lastIncludedRaftIndex == 0) {
+            startupSnapshotMeta = null;
+        } else {
+            startupSnapshotMeta = snapshotMetaAt(
+                    lastIncludedRaftIndex,
+                    lastIncludedRaftTerm,
+                    
Objects.requireNonNull(partition.committedGroupConfiguration()),
+                    lastCatalogVersionAtStart,
+                    
collectNextRowIdToBuildIndexesAtStart(lastCatalogVersionAtStart),
+                    partition.leaseStartTime(),
+                    partition.primaryReplicaNodeId(),
+                    partition.primaryReplicaNodeName()
+            );
+        }
     }
 
     @Override
@@ -118,26 +118,12 @@ public class PartitionSnapshotStorageFactory implements 
SnapshotStorageFactory {
                 raftOptions,
                 partition,
                 catalogService,
-                createStartupSnapshotMeta(),
+                startupSnapshotMeta,
                 incomingSnapshotsExecutor
         );
     }
 
-    private @Nullable SnapshotMeta createStartupSnapshotMeta() {
-        if (lastIncludedRaftIndex == 0) {
-            return null;
-        }
-
-        return snapshotMetaAt(
-                lastIncludedRaftIndex,
-                lastIncludedRaftTerm,
-                lastIncludedConfiguration,
-                lastCatalogVersionAtStart,
-                collectNextRowIdToBuildIndexesAtStart()
-        );
-    }
-
-    private Map<Integer, UUID> collectNextRowIdToBuildIndexesAtStart() {
+    private Map<Integer, UUID> collectNextRowIdToBuildIndexesAtStart(int 
lastCatalogVersionAtStart) {
         return collectNextRowIdToBuildIndexes(catalogService, partition, 
lastCatalogVersionAtStart);
     }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/RaftSnapshotPartitionMeta.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/RaftSnapshotPartitionMeta.java
new file mode 100644
index 0000000000..13a7162ba2
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/RaftSnapshotPartitionMeta.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft.snapshot;
+
+import java.util.UUID;
+import 
org.apache.ignite.internal.partition.replicator.network.raft.PartitionSnapshotMeta;
+import org.apache.ignite.internal.raft.RaftGroupConfiguration;
+import org.apache.ignite.internal.storage.engine.MvPartitionMeta;
+import org.apache.ignite.internal.storage.engine.PrimitivePartitionMeta;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Partition metadata for {@link PartitionAccess}.
+ */
+public class RaftSnapshotPartitionMeta extends PrimitivePartitionMeta {
+    private final RaftGroupConfiguration raftGroupConfig;
+
+    /** Constructs an {@link RaftSnapshotPartitionMeta} from a {@link 
PartitionSnapshotMeta} . */
+    public static RaftSnapshotPartitionMeta 
fromSnapshotMeta(PartitionSnapshotMeta meta, RaftGroupConfiguration 
raftGroupConfig) {
+        return new RaftSnapshotPartitionMeta(
+                meta.lastIncludedIndex(),
+                meta.lastIncludedTerm(),
+                raftGroupConfig,
+                meta.leaseStartTime(),
+                meta.primaryReplicaNodeId(),
+                meta.primaryReplicaNodeName()
+        );
+    }
+
+    /** Constructor. */
+    private RaftSnapshotPartitionMeta(
+            long lastAppliedIndex,
+            long lastAppliedTerm,
+            RaftGroupConfiguration raftGroupConfig,
+            long leaseStartTime,
+            @Nullable UUID primaryReplicaNodeId,
+            @Nullable String primaryReplicaNodeName
+    ) {
+        super(lastAppliedIndex, lastAppliedTerm, leaseStartTime, 
primaryReplicaNodeId, primaryReplicaNodeName);
+
+        this.raftGroupConfig = raftGroupConfig;
+    }
+
+    /** Returns replication group config. */
+    public RaftGroupConfiguration raftGroupConfig() {
+        return raftGroupConfig;
+    }
+
+    /**
+     * Converts this meta to {@link MvPartitionMeta}.
+     *
+     * @param configBytes Group config represented as bytes.
+     */
+    public MvPartitionMeta toMvPartitionMeta(byte[] configBytes) {
+        return new MvPartitionMeta(
+                lastAppliedIndex(),
+                lastAppliedTerm(),
+                configBytes,
+                leaseStartTime(),
+                primaryReplicaNodeId(),
+                primaryReplicaNodeName()
+        );
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
index a8d3d99084..830ac6ddce 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
@@ -62,11 +62,11 @@ import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageRebalanceException;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccess;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionSnapshotStorage;
+import 
org.apache.ignite.internal.table.distributed.raft.snapshot.RaftSnapshotPartitionMeta;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.SnapshotUri;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.network.ClusterNode;
-import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
 import org.apache.ignite.raft.jraft.error.RaftError;
 import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotCopier;
 import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
@@ -470,7 +470,7 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
                 return 
partitionSnapshotStorage.partition().abortRebalance().thenCompose(unused -> 
failedFuture(throwable));
             }
 
-            SnapshotMeta meta = snapshotMeta;
+            PartitionSnapshotMeta meta = snapshotMeta;
 
             RaftGroupConfiguration raftGroupConfig = new 
RaftGroupConfiguration(
                     meta.peersList(),
@@ -487,7 +487,7 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
                     raftGroupConfig
             );
 
-            return 
partitionSnapshotStorage.partition().finishRebalance(meta.lastIncludedIndex(), 
meta.lastIncludedTerm(), raftGroupConfig);
+            return 
partitionSnapshotStorage.partition().finishRebalance(RaftSnapshotPartitionMeta.fromSnapshotMeta(meta,
 raftGroupConfig));
         } finally {
             busyLock.leaveBusy();
         }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java
index 6b5858d01b..a54385158d 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java
@@ -168,9 +168,6 @@ public class OutgoingSnapshot {
     }
 
     private PartitionSnapshotMeta takeSnapshotMeta() {
-        long lastAppliedIndex = partition.maxLastAppliedIndex();
-        long lastAppliedTerm = partition.maxLastAppliedTerm();
-
         RaftGroupConfiguration config = 
partition.committedGroupConfiguration();
 
         assert config != null : "Configuration should never be null when 
installing a snapshot";
@@ -179,7 +176,16 @@ public class OutgoingSnapshot {
 
         Map<Integer, UUID> nextRowIdToBuildByIndexId = 
collectNextRowIdToBuildIndexes(catalogService, partition, catalogVersion);
 
-        return snapshotMetaAt(lastAppliedIndex, lastAppliedTerm, config, 
catalogVersion, nextRowIdToBuildByIndexId);
+        return snapshotMetaAt(
+                partition.maxLastAppliedIndex(),
+                partition.maxLastAppliedTerm(),
+                config,
+                catalogVersion,
+                nextRowIdToBuildByIndexId,
+                partition.leaseStartTime(),
+                partition.primaryReplicaNodeId(),
+                partition.primaryReplicaNodeName()
+        );
     }
 
     /**
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotMetaUtils.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotMetaUtils.java
index bbd2db8ef1..de405eb47f 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotMetaUtils.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotMetaUtils.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
 import 
org.apache.ignite.internal.partition.replicator.network.raft.PartitionSnapshotMeta;
 import 
org.apache.ignite.internal.partition.replicator.network.raft.PartitionSnapshotMetaBuilder;
@@ -31,6 +32,7 @@ import org.apache.ignite.internal.raft.RaftGroupConfiguration;
 import org.apache.ignite.internal.storage.RowId;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccess;
 import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Utils to build {@link SnapshotMeta} instances.
@@ -45,6 +47,9 @@ public class SnapshotMetaUtils {
      * @param requiredCatalogVersion Catalog version that a follower/learner 
must have to have ability to accept this snapshot.
      * @param nextRowIdToBuildByIndexId Row ID for which the index needs to be 
built per building index ID at the time the snapshot meta was
      *      created.
+     * @param leaseStartTime Lease start time (as {@link 
HybridTimestamp#longValue()}).
+     * @param primaryReplicaNodeId Primary replica node ID.
+     * @param primaryReplicaNodeName Primary replica node name.
      * @return SnapshotMeta corresponding to the given log index.
      */
     public static PartitionSnapshotMeta snapshotMetaAt(
@@ -52,7 +57,10 @@ public class SnapshotMetaUtils {
             long term,
             RaftGroupConfiguration config,
             int requiredCatalogVersion,
-            Map<Integer, UUID> nextRowIdToBuildByIndexId
+            Map<Integer, UUID> nextRowIdToBuildByIndexId,
+            long leaseStartTime,
+            @Nullable UUID primaryReplicaNodeId,
+            @Nullable String primaryReplicaNodeName
     ) {
         PartitionSnapshotMetaBuilder metaBuilder = new 
PartitionReplicationMessagesFactory().partitionSnapshotMeta()
                 .lastIncludedIndex(logIndex)
@@ -60,7 +68,10 @@ public class SnapshotMetaUtils {
                 .peersList(config.peers())
                 .learnersList(config.learners())
                 .requiredCatalogVersion(requiredCatalogVersion)
-                .nextRowIdToBuildByIndexId(nextRowIdToBuildByIndexId);
+                .nextRowIdToBuildByIndexId(nextRowIdToBuildByIndexId)
+                .leaseStartTime(leaseStartTime)
+                .primaryReplicaNodeId(primaryReplicaNodeId)
+                .primaryReplicaNodeName(primaryReplicaNodeName);
 
         if (!config.isStable()) {
             //noinspection ConstantConditions
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/NullMvTableStorage.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/NullMvTableStorage.java
index 5fc64644d9..e31ec44ec5 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/NullMvTableStorage.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/NullMvTableStorage.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.table.distributed.storage;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.engine.MvPartitionMeta;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
 import org.apache.ignite.internal.storage.index.HashIndexStorage;
@@ -98,12 +99,7 @@ public class NullMvTableStorage implements MvTableStorage {
     }
 
     @Override
-    public CompletableFuture<Void> finishRebalancePartition(
-            int partitionId,
-            long lastAppliedIndex,
-            long lastAppliedTerm,
-            byte[] groupConfig
-    ) {
+    public CompletableFuture<Void> finishRebalancePartition(int partitionId, 
MvPartitionMeta partitionMeta) {
         return throwNoPartitionsException();
     }
 
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 7d73b2d82d..793538723d 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -683,6 +683,8 @@ public class TableManagerTest extends IgniteAbstractTest {
             
when(mvPartitionStorage.lastAppliedIndex()).thenReturn(MvPartitionStorage.REBALANCE_IN_PROGRESS);
         }
 
+        when(mvPartitionStorage.committedGroupConfiguration()).thenReturn(new 
byte[0]);
+
         
doReturn(mock(PartitionTimestampCursor.class)).when(mvPartitionStorage).scan(any());
         when(txStateStorage.clear()).thenReturn(nullCompletedFuture());
 
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
index 5e8d5e5dda..f701f57230 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
@@ -172,6 +172,10 @@ public class IncomingSnapshotCopierTest extends 
BaseIgniteAbstractTest {
     private final long expLastAppliedTerm = 100L;
     private final RaftGroupConfiguration expLastGroupConfig = 
generateRaftGroupConfig();
 
+    private final long expLeaseStartTime = 3000000;
+    private final UUID expPrimaryReplicaNodeId = new UUID(1, 2);
+    private final String expPrimaryReplicaNodeName = "primary";
+
     private final List<RowId> rowIds = generateRowIds();
     private final List<UUID> txIds = generateTxIds();
 
@@ -238,6 +242,10 @@ public class IncomingSnapshotCopierTest extends 
BaseIgniteAbstractTest {
                 raftGroupConfigurationConverter.toBytes(expLastGroupConfig),
                 outgoingMvPartitionStorage.committedGroupConfiguration()
         );
+        assertEquals(expLeaseStartTime, 
outgoingMvPartitionStorage.leaseStartTime());
+        assertEquals(expPrimaryReplicaNodeId, 
outgoingMvPartitionStorage.primaryReplicaNodeId());
+        assertEquals(expPrimaryReplicaNodeName, 
outgoingMvPartitionStorage.primaryReplicaNodeName());
+
         assertEquals(expLastAppliedIndex, 
outgoingTxStatePartitionStorage.lastAppliedIndex());
         assertEquals(expLastAppliedTerm, 
outgoingTxStatePartitionStorage.lastAppliedTerm());
 
@@ -258,7 +266,16 @@ public class IncomingSnapshotCopierTest extends 
BaseIgniteAbstractTest {
     }
 
     private void fillOriginalStorages() {
-        fillMvPartitionStorage(outgoingMvPartitionStorage, 
expLastAppliedIndex, expLastAppliedTerm, expLastGroupConfig, rowIds);
+        fillMvPartitionStorage(
+                outgoingMvPartitionStorage,
+                expLastAppliedIndex,
+                expLastAppliedTerm,
+                expLastGroupConfig,
+                rowIds,
+                expLeaseStartTime,
+                expPrimaryReplicaNodeId,
+                expPrimaryReplicaNodeName
+        );
         fillTxStatePartitionStorage(outgoingTxStatePartitionStorage, 
expLastAppliedIndex, expLastAppliedTerm, txIds);
     }
 
@@ -319,7 +336,10 @@ public class IncomingSnapshotCopierTest extends 
BaseIgniteAbstractTest {
                         expLastAppliedTerm,
                         expLastGroupConfig,
                         requiredCatalogVersion,
-                        Map.of(indexId, nextRowIdToBuildIndex.uuid())
+                        Map.of(indexId, nextRowIdToBuildIndex.uuid()),
+                        expLeaseStartTime,
+                        expPrimaryReplicaNodeId,
+                        expPrimaryReplicaNodeName
                 ))
                 .build();
     }
@@ -366,7 +386,10 @@ public class IncomingSnapshotCopierTest extends 
BaseIgniteAbstractTest {
             long lastAppliedIndex,
             long lastAppliedTerm,
             RaftGroupConfiguration raftGroupConfig,
-            List<RowId> rowIds
+            List<RowId> rowIds,
+            long leaseStartTime,
+            UUID primaryReplicaNodeId,
+            String primaryReplicaNodeName
     ) {
         assertEquals(0, rowIds.size() % 2, "size=" + rowIds.size());
 
@@ -385,6 +408,8 @@ public class IncomingSnapshotCopierTest extends 
BaseIgniteAbstractTest {
 
             
storage.committedGroupConfiguration(raftGroupConfigurationConverter.toBytes(raftGroupConfig));
 
+            storage.updateLease(leaseStartTime, primaryReplicaNodeId, 
primaryReplicaNodeName);
+
             return null;
         });
     }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotCommonTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotCommonTest.java
index fea847ddf1..8dfb27f193 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotCommonTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotCommonTest.java
@@ -71,9 +71,7 @@ class OutgoingSnapshotCommonTest extends 
BaseIgniteAbstractTest {
     @Test
     void sendsSnapshotMeta() {
         when(partitionAccess.maxLastAppliedIndex()).thenReturn(100L);
-
         when(partitionAccess.maxLastAppliedTerm()).thenReturn(3L);
-
         when(partitionAccess.committedGroupConfiguration()).thenReturn(new 
RaftGroupConfiguration(
                 List.of("peer1:3000", "peer2:3000"),
                 List.of("learner1:3000", "learner2:3000"),
@@ -83,6 +81,10 @@ class OutgoingSnapshotCommonTest extends 
BaseIgniteAbstractTest {
 
         
when(catalogService.latestCatalogVersion()).thenReturn(REQUIRED_CATALOG_VERSION);
 
+        when(partitionAccess.leaseStartTime()).thenReturn(333L);
+        when(partitionAccess.primaryReplicaNodeId()).thenReturn(new UUID(1, 
2));
+        when(partitionAccess.primaryReplicaNodeName()).thenReturn("primary");
+
         snapshot.freezeScopeUnderMvLock();
 
         SnapshotMetaResponse response = getSnapshotMetaResponse();
@@ -94,6 +96,9 @@ class OutgoingSnapshotCommonTest extends 
BaseIgniteAbstractTest {
         assertThat(response.meta().oldPeersList(), is(List.of("peer1:3000")));
         assertThat(response.meta().oldLearnersList(), 
is(List.of("learner1:3000")));
         assertThat(response.meta().requiredCatalogVersion(), 
is(REQUIRED_CATALOG_VERSION));
+        assertThat(response.meta().leaseStartTime(), is(333L));
+        assertThat(response.meta().primaryReplicaNodeId(), is(new UUID(1, 2)));
+        assertThat(response.meta().primaryReplicaNodeName(), is("primary"));
     }
 
     private SnapshotMetaResponse getSnapshotMetaResponse() {
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotMetaUtilsTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotMetaUtilsTest.java
index 97fc4f01ec..411c748242 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotMetaUtilsTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotMetaUtilsTest.java
@@ -67,7 +67,16 @@ class SnapshotMetaUtilsTest extends BaseIgniteAbstractTest {
         UUID nextRowIdToBuild = UUID.randomUUID();
         int indexId = 1;
 
-        PartitionSnapshotMeta meta = snapshotMetaAt(100, 3, config, 42, 
Map.of(indexId, nextRowIdToBuild));
+        PartitionSnapshotMeta meta = snapshotMetaAt(
+                100,
+                3,
+                config,
+                42,
+                Map.of(indexId, nextRowIdToBuild),
+                777L,
+                new UUID(1, 2),
+                "primary"
+        );
 
         assertThat(meta.lastIncludedIndex(), is(100L));
         assertThat(meta.lastIncludedTerm(), is(3L));
@@ -77,11 +86,23 @@ class SnapshotMetaUtilsTest extends BaseIgniteAbstractTest {
         assertThat(meta.oldLearnersList(), is(List.of("learner1:3000")));
         assertThat(meta.requiredCatalogVersion(), is(42));
         assertThat(meta.nextRowIdToBuildByIndexId(), is(Map.of(indexId, 
nextRowIdToBuild)));
+        assertThat(meta.leaseStartTime(), is(777L));
+        assertThat(meta.primaryReplicaNodeId(), is(new UUID(1, 2)));
+        assertThat(meta.primaryReplicaNodeName(), is("primary"));
     }
 
     @Test
     void doesNotIncludeOldConfigWhenItIsNotThere() {
-        PartitionSnapshotMeta meta = snapshotMetaAt(100, 3, new 
RaftGroupConfiguration(List.of(), List.of(), null, null), 42, Map.of());
+        PartitionSnapshotMeta meta = snapshotMetaAt(
+                100,
+                3,
+                new RaftGroupConfiguration(List.of(), List.of(), null, null),
+                42,
+                Map.of(),
+                777L,
+                null,
+                null
+        );
 
         assertThat(meta.oldPeersList(), is(nullValue()));
         assertThat(meta.oldLearnersList(), is(nullValue()));

Reply via email to