This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 57c07ee68d IGNITE-22465 Recover zones' replicas and pending/stable 
events on a node restart (#4043)
57c07ee68d is described below

commit 57c07ee68d5d838a25f1a1d6dbefb244dfbf9b0d
Author: Mirza Aliev <[email protected]>
AuthorDate: Tue Jul 9 17:07:27 2024 +0400

    IGNITE-22465 Recover zones' replicas and pending/stable events on a node 
restart (#4043)
---
 .../DistributionZoneRebalanceEngineV2.java         |  46 ++-
 .../rebalance/ZoneRebalanceUtil.java               |  32 +-
 modules/partition-replicator/build.gradle          |   1 +
 .../replicator/ItReplicaLifecycleTest.java         | 345 ++++++++++++++++-----
 .../PartitionReplicaLifecycleManager.java          | 142 ++++++++-
 .../ignite/internal/replicator/ReplicaManager.java |   6 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |   1 +
 .../internal/table/distributed/TableManager.java   |   2 +-
 8 files changed, 469 insertions(+), 106 deletions(-)

diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineV2.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineV2.java
index f409e9b316..1f6b1f53d3 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineV2.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineV2.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.distributionzones.rebalance;
 
+import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static 
org.apache.ignite.internal.catalog.events.CatalogEvent.ZONE_ALTER;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.filterDataNodes;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.parseDataNodes;
@@ -25,9 +27,11 @@ import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalan
 import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.triggerZonePartitionsRebalance;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
@@ -108,7 +112,14 @@ public class DistributionZoneRebalanceEngineV2 {
             // TODO: IGNITE-18694 - Recovery for the case when zones watch 
listener processed event but assignments were not updated.
             metaStorageManager.registerPrefixWatch(zoneDataNodesKey(), 
dataNodesListener);
 
-            return nullCompletedFuture();
+            CompletableFuture<Long> recoveryFinishFuture = 
metaStorageManager.recoveryFinishedFuture();
+
+            // At the moment of the start of this manager, it is guaranteed 
that Meta Storage has been recovered.
+            assert recoveryFinishFuture.isDone();
+
+            long recoveryRevision = recoveryFinishFuture.join();
+
+            return rebalanceTriggersRecovery(recoveryRevision);
         });
     }
 
@@ -161,7 +172,8 @@ public class DistributionZoneRebalanceEngineV2 {
                             zoneDescriptor,
                             filteredDataNodes,
                             evt.entryEvent().newEntry().revision(),
-                            metaStorageManager
+                            metaStorageManager,
+                            busyLock
                     );
                 });
             }
@@ -206,8 +218,36 @@ public class DistributionZoneRebalanceEngineV2 {
                             zoneDescriptor,
                             dataNodes,
                             causalityToken,
-                            metaStorageManager
+                            metaStorageManager,
+                            busyLock
                     );
                 }));
     }
+
+    /**
+     * Run the update of rebalance metastore's state.
+     *
+     * @param recoveryRevision Recovery revision.
+     */
+    // TODO: https://issues.apache.org/jira/browse/IGNITE-21058 At the moment 
this method produce many metastore multi-invokes
+    // TODO: which can be avoided by the local logic, which mirror the logic 
of metastore invokes.
+    // TODO: And then run the remote invoke, only if needed.
+    private CompletableFuture<Void> rebalanceTriggersRecovery(long 
recoveryRevision) {
+        if (recoveryRevision > 0) {
+            List<CompletableFuture<Void>> zonesRecoveryFutures = 
catalogService.zones(catalogService.latestCatalogVersion())
+                    .stream()
+                    .map(zoneDesc ->
+                            
recalculateAssignmentsAndTriggerZonePartitionsRebalance(
+                                    zoneDesc,
+                                    recoveryRevision,
+                                    catalogService.latestCatalogVersion()
+                            )
+                    )
+                    .collect(Collectors.toUnmodifiableList());
+
+            return allOf(zonesRecoveryFutures.toArray(new 
CompletableFuture[0]));
+        } else {
+            return completedFuture(null);
+        }
+    }
 }
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java
index 23fea49067..61087071bb 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java
@@ -38,6 +38,7 @@ import static 
org.apache.ignite.internal.metastorage.dsl.Operations.remove;
 import static org.apache.ignite.internal.metastorage.dsl.Statements.iif;
 import static 
org.apache.ignite.internal.util.ByteUtils.longToBytesKeepingOrder;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
 
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
@@ -62,6 +63,7 @@ import org.apache.ignite.internal.metastorage.dsl.Condition;
 import org.apache.ignite.internal.metastorage.dsl.Iif;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
 import org.apache.ignite.internal.util.ExceptionUtils;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -273,7 +275,8 @@ public class ZoneRebalanceUtil {
             CatalogZoneDescriptor zoneDescriptor,
             Set<String> dataNodes,
             long storageRevision,
-            MetaStorageManager metaStorageManager
+            MetaStorageManager metaStorageManager,
+            IgniteSpinBusyLock busyLock
     ) {
         CompletableFuture<Map<Integer, Assignments>> zoneAssignmentsFut = 
zoneAssignments(
                 metaStorageManager,
@@ -289,19 +292,20 @@ public class ZoneRebalanceUtil {
 
             int finalPartId = partId;
 
-            partitionFutures[partId] = 
zoneAssignmentsFut.thenCompose(zoneAssignments ->
-                    // TODO https://issues.apache.org/jira/browse/IGNITE-19763 
We should distinguish empty stable assignments on
-                    // TODO node recovery in case of interrupted table 
creation, and moving from empty assignments to non-empty.
-                    zoneAssignments.isEmpty() ? nullCompletedFuture() : 
updatePendingAssignmentsKeys(
-                            zoneDescriptor,
-                            replicaGrpId,
-                            dataNodes,
-                            zoneDescriptor.replicas(),
-                            storageRevision,
-                            metaStorageManager,
-                            finalPartId,
-                            zoneAssignments.get(finalPartId).nodes()
-                    ));
+            partitionFutures[partId] = 
zoneAssignmentsFut.thenCompose(zoneAssignments -> inBusyLockAsync(busyLock, () 
-> {
+                // TODO https://issues.apache.org/jira/browse/IGNITE-19763 We 
should distinguish empty stable assignments on
+                // TODO node recovery in case of interrupted table creation, 
and moving from empty assignments to non-empty.
+                return zoneAssignments.isEmpty() ? nullCompletedFuture() : 
updatePendingAssignmentsKeys(
+                        zoneDescriptor,
+                        replicaGrpId,
+                        dataNodes,
+                        zoneDescriptor.replicas(),
+                        storageRevision,
+                        metaStorageManager,
+                        finalPartId,
+                        zoneAssignments.get(finalPartId).nodes()
+                );
+            }));
         }
 
         // This set is used to deduplicate exceptions (if there is an 
exception from upstream, for instance,
diff --git a/modules/partition-replicator/build.gradle 
b/modules/partition-replicator/build.gradle
index 1911df7768..b6b52014b8 100644
--- a/modules/partition-replicator/build.gradle
+++ b/modules/partition-replicator/build.gradle
@@ -39,6 +39,7 @@ dependencies {
     implementation project(':ignite-schema')
     implementation project(':ignite-transactions')
     implementation project(':ignite-storage-api')
+    implementation project(':ignite-low-watermark')
 
     integrationTestImplementation testFixtures(project(':ignite-runner'))
     integrationTestImplementation testFixtures(project(':ignite-core'))
diff --git 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
index f0a435f281..77a90e9aff 100644
--- 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
+++ 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
@@ -27,6 +27,7 @@ import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZo
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.alterZone;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertValueInStorage;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.REBALANCE_SCHEDULER_POOL_SIZE;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.STABLE_ASSIGNMENTS_PREFIX;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.stablePartAssignmentsKey;
 import static 
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager.FEATURE_FLAG_NAME;
 import static org.apache.ignite.internal.sql.SqlCommon.DEFAULT_SCHEMA_NAME;
@@ -35,6 +36,7 @@ import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCo
 import static 
org.apache.ignite.internal.testframework.TestIgnitionManager.DEFAULT_MAX_CLOCK_SKEW_MS;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
+import static org.apache.ignite.internal.util.ByteUtils.toByteArray;
 import static org.apache.ignite.internal.util.IgniteUtils.stopAsync;
 import static org.apache.ignite.sql.ColumnType.INT32;
 import static org.apache.ignite.sql.ColumnType.INT64;
@@ -44,18 +46,23 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 import java.util.function.LongFunction;
 import java.util.function.LongSupplier;
@@ -103,6 +110,8 @@ import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import 
org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration;
+import org.apache.ignite.internal.metastorage.dsl.Condition;
+import org.apache.ignite.internal.metastorage.dsl.Operation;
 import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
 import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
 import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
@@ -219,10 +228,21 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
     @InjectConfiguration("mock.profiles = {" + DEFAULT_STORAGE_PROFILE + 
".engine = \"aipersist\", test.engine=\"test\"}")
     private static StorageConfiguration storageConfiguration;
 
+    private final List<NetworkAddress> nodeAddresses = new ArrayList<>();
+
+    private final List<NodeAttributesConfiguration> 
nodeAttributesConfigurations = new ArrayList<>(
+            List.of(nodeAttributes1, nodeAttributes2, nodeAttributes3)
+    );
+
+    /**
+     * Interceptor of {@link MetaStorageManager#invoke(Condition, Collection, 
Collection)}.
+     */
+    private final Map<Integer, InvokeInterceptor> 
metaStorageInvokeInterceptorByNode = new ConcurrentHashMap<>();
+
     @WorkDirectory
     private Path workDir;
 
-    private List<Node> nodes;
+    private Map<Integer, Node> nodes;
 
     private TestPlacementDriver placementDriver;
 
@@ -243,28 +263,29 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
     }
 
     @BeforeEach
-    void before(TestInfo testInfo) throws Exception {
-        nodes = new ArrayList<>();
+    void before(TestInfo testInfo) {
+        nodes = new HashMap<>();
 
         placementDriver = new TestPlacementDriver();
 
-        List<NetworkAddress> nodeAddresses = new ArrayList<>();
-
-        List<NodeAttributesConfiguration> nodeAttributesConfigurations = new 
ArrayList<>();
-        nodeAttributesConfigurations.addAll(List.of(nodeAttributes1, 
nodeAttributes2, nodeAttributes3));
-
         for (int i = 0; i < NODE_COUNT; i++) {
             nodeAddresses.add(new NetworkAddress(HOST, BASE_PORT + i));
         }
 
         finder = new StaticNodeFinder(nodeAddresses);
+    }
 
-        int i = 0;
+    @AfterEach
+    void after() {
+        metaStorageInvokeInterceptorByNode.clear();
+        nodes.values().forEach(Node::stop);
+    }
 
-        for (NetworkAddress addr : nodeAddresses) {
-            var node = new Node(testInfo, addr, 
nodeAttributesConfigurations.get(i++));
+    private void startNodes(TestInfo testInfo, int amount) throws 
NodeStoppingException, InterruptedException {
+        for (int i = 0; i < amount; i++) {
+            var node = new Node(testInfo, i);
 
-            nodes.add(node);
+            nodes.put(i, node);
 
             node.start();
         }
@@ -273,10 +294,10 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
 
         node0.cmgManager.initCluster(List.of(node0.name), List.of(node0.name), 
"cluster");
 
-        nodes.forEach(Node::waitWatches);
+        nodes.values().forEach(Node::waitWatches);
 
         assertThat(
-                allOf(nodes.stream().map(n -> 
n.cmgManager.onJoinReady()).toArray(CompletableFuture[]::new)),
+                allOf(nodes.values().stream().map(n -> 
n.cmgManager.onJoinReady()).toArray(CompletableFuture[]::new)),
                 willCompleteSuccessfully()
         );
 
@@ -286,21 +307,40 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
 
                     assertThat(logicalTopologyFuture, 
willCompleteSuccessfully());
 
-                    return logicalTopologyFuture.join().nodes().size() == 
NODE_COUNT;
+                    return logicalTopologyFuture.join().nodes().size() == 
amount;
                 },
                 AWAIT_TIMEOUT_MILLIS
         ));
     }
 
-    @AfterEach
-    void after() {
-        nodes.forEach(Node::stop);
+    private Node startNode(TestInfo testInfo, int idx) {
+        var node = new Node(testInfo, idx);
+
+        nodes.put(idx, node);
+
+        node.start();
+
+        node.waitWatches();
+
+        assertThat(node.cmgManager.onJoinReady(), willCompleteSuccessfully());
+
+        return node;
+    }
+
+    private void stopNode(int idx) {
+        Node node = getNode(idx);
+
+        node.stop();
+
+        nodes.remove(idx);
     }
 
     @Test
-    public void testEmptyReplicaListener() throws NodeStoppingException {
+    public void testEmptyReplicaListener(TestInfo testInfo) throws Exception {
+        startNodes(testInfo, 3);
+
         Assignment replicaAssignment = (Assignment) 
AffinityUtils.calculateAssignmentForPartition(
-                nodes.stream().map(n -> n.name).collect(Collectors.toList()), 
0, 1).toArray()[0];
+                nodes.values().stream().map(n -> 
n.name).collect(Collectors.toList()), 0, 1).toArray()[0];
 
         Node node = getNode(replicaAssignment.consistentId());
 
@@ -323,13 +363,10 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
     }
 
     @Test
-    void testAlterReplicaTrigger() throws Exception {
-        Assignment replicaAssignment = (Assignment) 
AffinityUtils.calculateAssignmentForPartition(
-                nodes.stream().map(n -> n.name).collect(Collectors.toList()), 
0, 1).toArray()[0];
-
-        Node node = getNode(replicaAssignment.consistentId());
+    void testAlterReplicaTrigger(TestInfo testInfo) throws Exception {
+        startNodes(testInfo, 3);
 
-        
placementDriver.setPrimary(node.clusterService.topologyService().localMember());
+        Node node = getNode(0);
 
         createZone(node, "test_zone", 1, 3);
 
@@ -344,7 +381,7 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
                 stablePartAssignmentsKey(partId),
                 (v) -> Assignments.fromBytes(v).nodes()
                         
.stream().map(Assignment::consistentId).collect(Collectors.toSet()),
-                nodes.stream().map(n -> n.name).collect(Collectors.toSet()),
+                nodes.values().stream().map(n -> 
n.name).collect(Collectors.toSet()),
                 20_000L
         );
 
@@ -360,24 +397,13 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
                 2,
                 20_000L
         );
-
-        assertValueInStorage(
-                metaStorageManager,
-                stablePartAssignmentsKey(partId),
-                (v) -> 
Assignments.fromBytes(v).nodes().contains(replicaAssignment),
-                true,
-                20_000L
-        );
     }
 
     @Test
-    void testAlterReplicaTriggerDefaultZone() throws Exception {
-        Assignment replicaAssignment = (Assignment) 
AffinityUtils.calculateAssignmentForPartition(
-                nodes.stream().map(n -> n.name).collect(Collectors.toList()), 
0, 1).toArray()[0];
+    void testAlterReplicaTriggerDefaultZone(TestInfo testInfo) throws 
Exception {
+        startNodes(testInfo, 3);
 
-        Node node = getNode(replicaAssignment.consistentId());
-
-        
placementDriver.setPrimary(node.clusterService.topologyService().localMember());
+        Node node = getNode(0);
 
         CatalogManager catalogManager = node.catalogManager;
 
@@ -408,22 +434,13 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
                 2,
                 20_000L
         );
-
-        assertValueInStorage(
-                metaStorageManager,
-                stablePartAssignmentsKey(partId),
-                (v) -> 
Assignments.fromBytes(v).nodes().contains(replicaAssignment),
-                true,
-                20_000L
-        );
     }
 
     @Test
-    void testAlterReplicaExtensionTrigger() throws Exception {
-        Assignment replicaAssignment = (Assignment) 
AffinityUtils.calculateAssignmentForPartition(
-                nodes.stream().map(n -> n.name).collect(Collectors.toList()), 
0, 1).toArray()[0];
+    void testAlterReplicaExtensionTrigger(TestInfo testInfo) throws Exception {
+        startNodes(testInfo, 3);
 
-        Node node = getNode(replicaAssignment.consistentId());
+        Node node = getNode(0);
 
         
placementDriver.setPrimary(node.clusterService.topologyService().localMember());
 
@@ -446,14 +463,6 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
                 20_000L
         );
 
-        assertValueInStorage(
-                metaStorageManager,
-                stablePartAssignmentsKey(partId),
-                (v) -> 
Assignments.fromBytes(v).nodes().contains(replicaAssignment),
-                true,
-                20_000L
-        );
-
         CatalogManager catalogManager = node.catalogManager;
 
         alterZone(catalogManager, "test_zone", 3);
@@ -463,17 +472,16 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
                 stablePartAssignmentsKey(partId),
                 (v) -> Assignments.fromBytes(v).nodes()
                         
.stream().map(Assignment::consistentId).collect(Collectors.toSet()),
-                nodes.stream().map(n -> n.name).collect(Collectors.toSet()),
+                nodes.values().stream().map(n -> 
n.name).collect(Collectors.toSet()),
                 20_000L
         );
     }
 
     @Test
-    void testAlterFilterTrigger() throws Exception {
-        Assignment replicaAssignment = (Assignment) 
AffinityUtils.calculateAssignmentForPartition(
-                nodes.stream().map(n -> n.name).collect(Collectors.toList()), 
0, 1).toArray()[0];
+    void testAlterFilterTrigger(TestInfo testInfo) throws Exception {
+        startNodes(testInfo, 3);
 
-        Node node = getNode(replicaAssignment.consistentId());
+        Node node = getNode(0);
 
         
placementDriver.setPrimary(node.clusterService.topologyService().localMember());
 
@@ -490,7 +498,7 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
                 stablePartAssignmentsKey(partId),
                 (v) -> Assignments.fromBytes(v).nodes()
                         
.stream().map(Assignment::consistentId).collect(Collectors.toSet()),
-                nodes.stream().map(n -> n.name).collect(Collectors.toSet()),
+                nodes.values().stream().map(n -> 
n.name).collect(Collectors.toSet()),
                 20_000L
         );
 
@@ -508,14 +516,168 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
                 Set.of(nodes.get(0).name),
                 20_000L
         );
+    }
+
+    @Test
+    void testReplicaIsStartedOnNodeStart(TestInfo testInfo) throws Exception {
+        startNodes(testInfo, 3);
+
+        Node node0 = getNode(0);
+
+        createZone(node0, "test_zone", 2, 3);
+
+        int zoneId = DistributionZonesTestUtil.getZoneId(node0.catalogManager, 
"test_zone", node0.hybridClock.nowLong());
+
+        MetaStorageManager metaStorageManager = node0.metaStorageManager;
+
+        ZonePartitionId partId = new ZonePartitionId(zoneId, 0);
+
+        assertValueInStorage(
+                metaStorageManager,
+                stablePartAssignmentsKey(partId),
+                (v) -> Assignments.fromBytes(v).nodes()
+                        
.stream().map(Assignment::consistentId).collect(Collectors.toSet()),
+                nodes.values().stream().map(n -> 
n.name).collect(Collectors.toSet()),
+                20_000L
+        );
+
+        stopNode(2);
+
+        Node node2 = startNode(testInfo, 2);
+
+        assertTrue(waitForCondition(() -> 
node2.replicaManager.isReplicaStarted(partId), 10_000L));
+    }
+
+    @Test
+    void testStableAreWrittenAfterRestart(TestInfo testInfo) throws Exception {
+        startNodes(testInfo, 1);
+
+        Node node0 = getNode(0);
+
+        AtomicBoolean reached = new AtomicBoolean();
+
+        metaStorageInvokeInterceptorByNode.put(0, (condition, success, 
failure) -> {
+            if (skipMetaStorageInvoke(success, STABLE_ASSIGNMENTS_PREFIX)) {
+                reached.set(true);
+
+                return true;
+            }
+
+            return null;
+        });
+
+        createZone(node0, "test_zone", 2, 3);
+
+        int zoneId = DistributionZonesTestUtil.getZoneId(node0.catalogManager, 
"test_zone", node0.hybridClock.nowLong());
+
+        MetaStorageManager metaStorageManager = node0.metaStorageManager;
+
+        ZonePartitionId partId = new ZonePartitionId(zoneId, 0);
+
+        assertTrue(reached.get());
+
+        assertValueInStorage(
+                metaStorageManager,
+                stablePartAssignmentsKey(partId),
+                Assignments::fromBytes,
+                null,
+                20_000L
+        );
+
+        stopNode(0);
+
+        metaStorageInvokeInterceptorByNode.clear();
+
+        startNodes(testInfo, 1);
+
+        node0 = getNode(0);
+
+        metaStorageManager = node0.metaStorageManager;
+
+        assertValueInStorage(
+                metaStorageManager,
+                stablePartAssignmentsKey(partId),
+                (v) -> Assignments.fromBytes(v).nodes()
+                        
.stream().map(Assignment::consistentId).collect(Collectors.toSet()),
+                nodes.values().stream().map(n -> 
n.name).collect(Collectors.toSet()),
+                20_000L
+        );
+
+        assertTrue(waitForCondition(() -> 
getNode(0).replicaManager.isReplicaStarted(partId), 10_000L));
+    }
+
+    @Test
+    void testStableAreWrittenAfterRestartAndConcurrentStableUpdate(TestInfo 
testInfo) throws Exception {
+        startNodes(testInfo, 1);
+
+        Node node0 = getNode(0);
+
+        AtomicBoolean reached = new AtomicBoolean();
+
+        metaStorageInvokeInterceptorByNode.put(0, (condition, success, 
failure) -> {
+            if (skipMetaStorageInvoke(success, STABLE_ASSIGNMENTS_PREFIX)) {
+                reached.set(true);
+
+                return true;
+            }
+
+            return null;
+        });
+
+        createZone(node0, "test_zone", 1, 3);
+
+        int zoneId = DistributionZonesTestUtil.getZoneId(node0.catalogManager, 
"test_zone", node0.hybridClock.nowLong());
+
+        MetaStorageManager metaStorageManager = node0.metaStorageManager;
+
+        ZonePartitionId partId = new ZonePartitionId(zoneId, 0);
+
+        assertTrue(reached.get());
+
+        reached.set(false);
+
+        assertValueInStorage(
+                metaStorageManager,
+                stablePartAssignmentsKey(partId),
+                Assignments::fromBytes,
+                null,
+                20_000L
+        );
+
+        stopNode(0);
+
+        CountDownLatch latch = new CountDownLatch(1);
+
+        metaStorageInvokeInterceptorByNode.put(0, (condition, success, 
failure) -> {
+            if (skipMetaStorageInvoke(success, 
stablePartAssignmentsKey(partId).toString())) {
+                reached.set(true);
+
+                Node node = nodes.get(0);
+
+                node.metaStorageManager.put(stablePartAssignmentsKey(partId), 
Assignments.of(Assignment.forPeer(node.name)).toBytes());
+            }
+
+            return null;
+        });
+
+        startNodes(testInfo, 1);
+
+        node0 = getNode(0);
+
+        metaStorageManager = node0.metaStorageManager;
+
+        assertTrue(reached.get());
 
         assertValueInStorage(
                 metaStorageManager,
                 stablePartAssignmentsKey(partId),
-                (v) -> 
Assignments.fromBytes(v).nodes().contains(replicaAssignment),
-                true,
+                (v) -> Assignments.fromBytes(v).nodes()
+                        
.stream().map(Assignment::consistentId).collect(Collectors.toSet()),
+                nodes.values().stream().map(n -> 
n.name).collect(Collectors.toSet()),
                 20_000L
         );
+
+        assertTrue(waitForCondition(() -> 
getNode(0).replicaManager.isReplicaStarted(partId), 10_000L));
     }
 
     private Node getNode(int nodeIndex) {
@@ -523,7 +685,7 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
     }
 
     private Node getNode(String nodeName) {
-        return nodes.stream().filter(n -> 
n.name.equals(nodeName)).findFirst().get();
+        return nodes.values().stream().filter(n -> 
n.name.equals(nodeName)).findFirst().get();
     }
 
     private static void createZone(Node node, String zoneName, int partitions, 
int replicas) {
@@ -635,10 +797,10 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
         /**
          * Constructor that simply creates a subset of components of this node.
          */
-        Node(TestInfo testInfo, NetworkAddress addr, 
NodeAttributesConfiguration nodeAttributes) {
-            networkAddress = addr;
+        Node(TestInfo testInfo, int idx) {
+            networkAddress = nodeAddresses.get(idx);
 
-            name = testNodeName(testInfo, addr.port());
+            name = testNodeName(testInfo, networkAddress.port());
 
             Path dir = workDir.resolve(name);
 
@@ -672,7 +834,7 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
 
             clusterService = ClusterServiceTestUtils.clusterService(
                     testInfo,
-                    addr.port(),
+                    networkAddress.port(),
                     finder
             );
 
@@ -711,7 +873,7 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
                     clusterStateStorage,
                     logicalTopology,
                     clusterManagementConfiguration,
-                    new NodeAttributesCollector(nodeAttributes, 
storageConfiguration),
+                    new 
NodeAttributesCollector(nodeAttributesConfigurations.get(idx), 
storageConfiguration),
                     failureProcessor
             );
 
@@ -738,7 +900,26 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
                     metaStorageConfiguration,
                     raftConfiguration.retryTimeout(),
                     completedFuture(() -> DEFAULT_MAX_CLOCK_SKEW_MS)
-            );
+            ) {
+                @Override
+                public CompletableFuture<Boolean> invoke(
+                        Condition condition,
+                        Collection<Operation> success,
+                        Collection<Operation> failure
+                ) {
+                    InvokeInterceptor invokeInterceptor = 
metaStorageInvokeInterceptorByNode.get(idx);
+
+                    if (invokeInterceptor != null) {
+                        var res = invokeInterceptor.invoke(condition, success, 
failure);
+
+                        if (res != null) {
+                            return completedFuture(res);
+                        }
+                    }
+
+                    return super.invoke(condition, success, failure);
+                }
+            };
 
             threadPoolsManager = new ThreadPoolsManager(name);
 
@@ -818,7 +999,7 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
                     replicaSvc,
                     lockManager,
                     clockService,
-                    new TransactionIdGenerator(addr.port()),
+                    new TransactionIdGenerator(networkAddress.port()),
                     placementDriver,
                     partitionIdleSafeTimePropagationPeriodMsSupplier,
                     new TestLocalRwTxCounter(),
@@ -878,6 +1059,7 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
                     distributionZoneManager,
                     metaStorageManager,
                     clusterService.topologyService(),
+                    lowWatermark,
                     threadPoolsManager.tableIoExecutor(),
                     rebalanceScheduler
             );
@@ -1026,4 +1208,13 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
             clusterCfgGenerator.close();
         }
     }
+
+    @FunctionalInterface
+    private interface InvokeInterceptor {
+        Boolean invoke(Condition condition, Collection<Operation> success, 
Collection<Operation> failure);
+    }
+
+    private static boolean skipMetaStorageInvoke(Collection<Operation> ops, 
String prefix) {
+        return ops.stream().anyMatch(op -> new String(toByteArray(op.key()), 
StandardCharsets.UTF_8).startsWith(prefix));
+    }
 }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index fa5fc9679b..1b7e7db986 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -38,6 +38,7 @@ import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalan
 import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.stablePartAssignmentsKey;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.zoneAssignmentsGetLocally;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.zonePartitionAssignmentsGetLocally;
+import static 
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
 import static 
org.apache.ignite.internal.lang.IgniteSystemProperties.getBoolean;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
@@ -50,6 +51,7 @@ import static 
org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
 import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -63,6 +65,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.BiFunction;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import org.apache.ignite.internal.affinity.AffinityUtils;
@@ -76,12 +79,14 @@ import 
org.apache.ignite.internal.distributionzones.DistributionZoneManager;
 import org.apache.ignite.internal.distributionzones.rebalance.PartitionMover;
 import org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil;
 import 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceRaftGroupEventsListener;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.lang.IgniteStringFormatter;
 import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.lowwatermark.LowWatermark;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.Entry;
@@ -101,6 +106,7 @@ import org.apache.ignite.internal.replicator.Replica;
 import org.apache.ignite.internal.replicator.ReplicaManager;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -129,6 +135,8 @@ public class PartitionReplicaLifecycleManager implements 
IgniteComponent {
 
     private final TopologyService topologyService;
 
+    private final LowWatermark lowWatermark;
+
     /** Meta storage listener for pending assignments. */
     private final WatchListener pendingAssignmentsRebalanceListener;
 
@@ -170,6 +178,7 @@ public class PartitionReplicaLifecycleManager implements 
IgniteComponent {
             DistributionZoneManager distributionZoneMgr,
             MetaStorageManager metaStorageMgr,
             TopologyService topologyService,
+            LowWatermark lowWatermark,
             ExecutorService ioExecutor,
             ScheduledExecutorService rebalanceScheduler
     ) {
@@ -178,6 +187,7 @@ public class PartitionReplicaLifecycleManager implements 
IgniteComponent {
         this.distributionZoneMgr = distributionZoneMgr;
         this.metaStorageMgr = metaStorageMgr;
         this.topologyService = topologyService;
+        this.lowWatermark = lowWatermark;
         this.ioExecutor = ioExecutor;
         this.rebalanceScheduler = rebalanceScheduler;
 
@@ -192,6 +202,17 @@ public class PartitionReplicaLifecycleManager implements 
IgniteComponent {
             return nullCompletedFuture();
         }
 
+        CompletableFuture<Long> recoveryFinishFuture = 
metaStorageMgr.recoveryFinishedFuture();
+
+        assert recoveryFinishFuture.isDone();
+
+        long recoveryRevision = recoveryFinishFuture.join();
+
+        cleanUpResourcesForDroppedZonesOnRecovery();
+
+        CompletableFuture<Void> processZonesAndAssignmentsOnStart = 
processZonesOnStart(recoveryRevision, lowWatermark.getLowWatermark())
+                .thenCompose(ignored -> 
processAssignmentsOnRecovery(recoveryRevision));
+
         
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(PENDING_ASSIGNMENTS_PREFIX),
 pendingAssignmentsRebalanceListener);
 
         
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX),
 stableAssignmentsRebalanceListener);
@@ -203,22 +224,121 @@ public class PartitionReplicaLifecycleManager implements 
IgniteComponent {
                         inBusyLock(busyLock, () -> 
onCreateZone(parameters).thenApply((ignored) -> false))
         );
 
-        return nullCompletedFuture();
+        return processZonesAndAssignmentsOnStart;
+    }
+
+    private CompletableFuture<Void> processZonesOnStart(long recoveryRevision, 
@Nullable HybridTimestamp lwm) {
+        int earliestCatalogVersion = 
catalogMgr.activeCatalogVersion(hybridTimestampToLong(lwm));
+        // TODO https://issues.apache.org/jira/browse/IGNITE-22679
+        int latestCatalogVersion = catalogMgr.latestCatalogVersion();
+
+        var startedZones = new IntOpenHashSet();
+        var startZoneFutures = new ArrayList<CompletableFuture<?>>();
+
+        for (int ver = latestCatalogVersion; ver >= earliestCatalogVersion; 
ver--) {
+            int ver0 = ver;
+            catalogMgr.zones(ver).stream()
+                    .filter(zone -> startedZones.add(zone.id()))
+                    .forEach(zoneDescriptor -> startZoneFutures.add(
+                            
calculateZoneAssignmentsAndCreateReplicationNodes(recoveryRevision, ver0, 
zoneDescriptor)));
+        }
+
+        return allOf(startZoneFutures.toArray(CompletableFuture[]::new))
+                .whenComplete((unused, throwable) -> {
+                    if (throwable != null) {
+                        LOG.error("Error starting zones", throwable);
+                    } else {
+                        LOG.debug(
+                                "Zones started successfully 
[earliestCatalogVersion={}, latestCatalogVersion={}, startedZoneIds={}]",
+                                earliestCatalogVersion,
+                                latestCatalogVersion,
+                                startedZones
+                        );
+                    }
+                });
+    }
+
+    private CompletableFuture<Void> processAssignmentsOnRecovery(long 
recoveryRevision) {
+        var stableAssignmentsPrefix = new ByteArray(STABLE_ASSIGNMENTS_PREFIX);
+        var pendingAssignmentsPrefix = new 
ByteArray(PENDING_ASSIGNMENTS_PREFIX);
+
+        // It's required to handle stable assignments changes on recovery in 
order to cleanup obsolete resources.
+        CompletableFuture<Void> stableFuture = handleAssignmentsOnRecovery(
+                stableAssignmentsPrefix,
+                recoveryRevision,
+                (entry, rev) -> handleChangeStableAssignmentEvent(entry, rev, 
true),
+                "stable"
+        );
+
+        CompletableFuture<Void> pendingFuture = handleAssignmentsOnRecovery(
+                pendingAssignmentsPrefix,
+                recoveryRevision,
+                (entry, rev) -> handleChangePendingAssignmentEvent(entry, rev, 
true),
+                "pending"
+        );
+
+        return allOf(stableFuture, pendingFuture);
+    }
+
+    private CompletableFuture<Void> handleAssignmentsOnRecovery(
+            ByteArray prefix,
+            long revision,
+            BiFunction<Entry, Long, CompletableFuture<Void>> 
assignmentsEventHandler,
+            String assignmentsType
+    ) {
+        try (Cursor<Entry> cursor = metaStorageMgr.prefixLocally(prefix, 
revision)) {
+            CompletableFuture<?>[] futures = cursor.stream()
+                    .map(entry -> {
+                        if (LOG.isInfoEnabled()) {
+                            LOG.info(
+                                    "Non handled {} assignments for key '{}' 
discovered, performing recovery",
+                                    assignmentsType,
+                                    new String(entry.key(), UTF_8)
+                            );
+                        }
+
+                        return assignmentsEventHandler.apply(entry, revision);
+                    })
+                    .toArray(CompletableFuture[]::new);
+
+            return allOf(futures)
+                    .whenComplete((res, e) -> {
+                        if (e != null) {
+                            LOG.error("Error when performing assignments 
recovery", e);
+                        }
+                    });
+        }
+    }
+
+    private void cleanUpResourcesForDroppedZonesOnRecovery() {
+        // TODO: IGNITE-20384 Clean up abandoned resources for dropped zones 
from vault and metastore
     }
 
     private CompletableFuture<Void> onCreateZone(CreateZoneEventParameters 
createZoneEventParameters) {
         // TODO: https://issues.apache.org/jira/browse/IGNITE-22535 start 
replica must be moved from metastore thread
+        return calculateZoneAssignmentsAndCreateReplicationNodes(
+                createZoneEventParameters.causalityToken(),
+                createZoneEventParameters.catalogVersion(),
+                createZoneEventParameters.zoneDescriptor()
+        );
+    }
+
+    private CompletableFuture<Void> 
calculateZoneAssignmentsAndCreateReplicationNodes(
+            long causalityToken,
+            int catalogVersion,
+            CatalogZoneDescriptor zoneDescriptor
+    ) {
         return inBusyLockAsync(busyLock, () -> {
             CompletableFuture<List<Assignments>> assignmentsFuture = 
getOrCreateAssignments(
-                    createZoneEventParameters.zoneDescriptor(),
-                    createZoneEventParameters.causalityToken(),
-                    createZoneEventParameters.catalogVersion()
+                    zoneDescriptor,
+                    causalityToken,
+                    catalogVersion
             );
 
             CompletableFuture<List<Assignments>> assignmentsFutureAfterInvoke =
-                    
writeZoneAssignmentsToMetastore(createZoneEventParameters.zoneDescriptor().id(),
 assignmentsFuture);
+                    writeZoneAssignmentsToMetastore(zoneDescriptor.id(), 
assignmentsFuture);
 
-            return createZoneReplicationNodes(assignmentsFutureAfterInvoke, 
createZoneEventParameters.zoneDescriptor().id());
+            return createZoneReplicationNodes(assignmentsFutureAfterInvoke, 
zoneDescriptor.id());
         });
     }
 
@@ -260,6 +380,9 @@ public class PartitionReplicaLifecycleManager implements 
IgniteComponent {
             return nullCompletedFuture();
         }
 
+        // TODO: https://issues.apache.org/jira/browse/IGNITE-22522 We need to 
integrate PartitionReplicatorNodeRecovery logic here when
+        //  we in the recovery phase.
+
         PeersAndLearners stablePeersAndLearners = 
fromAssignments(stableAssignments.nodes());
 
         ZonePartitionId replicaGrpId = new ZonePartitionId(zoneId, partId);
@@ -285,7 +408,8 @@ public class PartitionReplicaLifecycleManager implements 
IgniteComponent {
                     new FailFastSnapshotStorageFactory(),
                     stablePeersAndLearners,
                     raftGroupListener,
-                    raftGroupEventsListener
+                    raftGroupEventsListener,
+                    busyLock
             ).thenApply(ignored -> null);
         } catch (NodeStoppingException e) {
             return failedFuture(e);
@@ -772,7 +896,7 @@ public class PartitionReplicaLifecycleManager implements 
IgniteComponent {
             localServicesStartFuture = nullCompletedFuture();
         }
 
-        return localServicesStartFuture.thenRunAsync(() -> {
+        return localServicesStartFuture.thenRunAsync(() -> 
inBusyLock(busyLock, () -> {
             if (!replicaMgr.isReplicaStarted(replicaGrpId)) {
                 return;
             }
@@ -784,7 +908,7 @@ public class PartitionReplicaLifecycleManager implements 
IgniteComponent {
                     : RebalanceUtil.union(pendingAssignmentsNodes, 
stableAssignments.nodes());
 
             
replicaMgr.replica(replicaGrpId).join().raftClient().updateConfiguration(fromAssignments(newAssignments));
-        }, ioExecutor);
+        }), ioExecutor);
     }
 
     private CompletableFuture<Void> changePeersOnRebalance(
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index fd3d919f64..bdf06f50bd 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -608,7 +608,8 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
                 updateTableRaftService,
                 createListener,
                 storageIndexTracker,
-                newRaftClientFut);
+                newRaftClientFut
+        );
     }
 
     /**
@@ -678,7 +679,8 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
             SnapshotStorageFactory snapshotStorageFactory,
             PeersAndLearners newConfiguration,
             RaftGroupListener raftGroupListener,
-            RaftGroupEventsListener raftGroupEventsListener
+            RaftGroupEventsListener raftGroupEventsListener,
+            IgniteSpinBusyLock busyLock
     ) throws NodeStoppingException {
         RaftGroupOptions groupOptions = groupOptionsForPartition(
                 false,
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index bbdaa853d0..736cd84bd2 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -748,6 +748,7 @@ public class IgniteImpl implements Ignite {
                 distributionZoneManager,
                 metaStorageMgr,
                 clusterSvc.topologyService(),
+                lowWatermark,
                 threadPoolsManager.tableIoExecutor(),
                 rebalanceScheduler
         );
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index ff6c7ce7f0..da8639b6a8 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -671,7 +671,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
                     .map(entry -> {
                         if (LOG.isInfoEnabled()) {
                             LOG.info(
-                                    "Missed {} assignments for key '{}' 
discovered, performing recovery",
+                                    "Non handled {} assignments for key '{}' 
discovered, performing recovery",
                                     assignmentsType,
                                     new String(entry.key(), UTF_8)
                             );

Reply via email to