This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 8537e7e45d IGNITE-22916 Validate Metastorage for divergency during 
join (#4624)
8537e7e45d is described below

commit 8537e7e45de6e35434b5455506137cc900ea7049
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Sat Oct 26 10:30:24 2024 +0400

    IGNITE-22916 Validate Metastorage for divergency during join (#4624)
---
 .../java/org/apache/ignite/lang/ErrorGroups.java   |   3 +
 .../management/ClusterManagementGroupManager.java  |  14 +-
 .../cluster/management/MetaStorageInfo.java        |  17 +--
 .../raft/ClusterStateStorageManager.java           |  19 +--
 .../management/raft/CmgRaftGroupListener.java      |   7 +-
 .../cluster/management/raft/CmgRaftService.java    |   8 +-
 .../commands/ChangeMetaStorageInfoCommand.java     |   7 -
 .../management/raft/CmgRaftGroupListenerTest.java  |   9 +-
 .../impl/ItIdempotentCommandCacheTest.java         |   2 +
 .../impl/ItMetaStorageMaintenanceTest.java         |   2 +
 .../impl/ItMetaStorageManagerImplTest.java         |   2 +
 .../ItMetaStorageMultipleNodesAbstractTest.java    |  11 +-
 .../ItMetaStorageMultipleNodesVsStorageTest.java   |  18 +++
 .../metastorage/impl/ItMetaStorageWatchTest.java   |  16 ++-
 .../service/ItAbstractListenerSnapshotTest.java    |   4 +-
 .../metastorage/command/GetChecksumCommand.java    |  35 +++++
 .../command/MetastorageCommandsMessageGroup.java   |   3 +
 .../metastorage/command/response/ChecksumInfo.java |  69 +++++++++
 .../metastorage/impl/MetaStorageManagerImpl.java   | 160 +++++++++++++++------
 .../metastorage/impl/MetaStorageService.java       |   8 ++
 .../metastorage/impl/MetaStorageServiceImpl.java   |  11 ++
 .../impl/MetastorageDivergedException.java         |  31 ++++
 .../impl/MetastorageDivergencyValidator.java       |  60 ++++++++
 .../metastorage/server/ChecksumAndRevisions.java   |  51 +++++++
 .../metastorage/server/KeyValueStorage.java        |   9 ++
 .../server/persistence/RocksDbKeyValueStorage.java |  45 ++++++
 .../server/raft/MetaStorageListener.java           |  11 ++
 .../MetaStorageDeployWatchesCorrectnessTest.java   |   2 +
 .../impl/MetaStorageManagerRecoveryTest.java       |   2 +
 .../impl/MetastorageDivergencyValidatorTest.java   |  87 +++++++++++
 .../server/RocksDbKeyValueStorageTest.java         |  51 +++++++
 .../impl/StandaloneMetaStorageManager.java         |  19 ++-
 .../server/SimpleInMemoryKeyValueStorage.java      |   5 +
 .../MultiActorPlacementDriverTest.java             |   2 +
 .../PlacementDriverManagerTest.java                |   2 +
 modules/platforms/cpp/ignite/common/error_codes.h  |   1 +
 modules/platforms/cpp/ignite/odbc/common_types.cpp |   1 +
 .../platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs |   3 +
 .../ItDistributedConfigurationPropertiesTest.java  |  21 ++-
 .../ItDistributedConfigurationStorageTest.java     |  22 ++-
 .../org/apache/ignite/internal/app/IgniteImpl.java |   9 +-
 modules/system-disaster-recovery/build.gradle      |   1 +
 .../disaster/system/ItCmgDisasterRecoveryTest.java |   4 -
 .../ItMetastorageGroupDisasterRecoveryTest.java    | 107 +++++++++++++-
 .../system/ItSystemGroupDisasterRecoveryTest.java  |   7 +-
 .../rebalance/ItRebalanceDistributedTest.java      |   2 +
 46 files changed, 839 insertions(+), 141 deletions(-)

diff --git a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java 
b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index 327a5972b1..1e48973122 100755
--- a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -305,6 +305,9 @@ public class ErrorGroups {
 
         /** Failed to perform a read operation on the underlying key value 
storage because the revision has already been compacted. */
         public static final int COMPACTED_ERR = 
META_STORAGE_ERR_GROUP.registerErrorCode((short) 6);
+
+        /** Failed to start a node because Metastorage has diverged. The node 
has to be cleared and entered the cluster as a blank node. */
+        public static final int DIVERGED_ERR = 
META_STORAGE_ERR_GROUP.registerErrorCode((short) 7);
     }
 
     /** Index error group. */
diff --git 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index 233b7da41a..d2b2eb023c 100644
--- 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++ 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.cluster.management;
 
-import static java.util.Objects.requireNonNull;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.stream.Collectors.toList;
@@ -1090,25 +1089,14 @@ public class ClusterManagementGroupManager extends 
AbstractEventProducer<Cluster
             return failedFuture(new NodeStoppingException());
         }
 
-        UUID metastorageRepairClusterId = metastorageRepairingConfigIndex == 
null ? null : requiredClusterId();
-
         try {
             return raftServiceAfterJoin()
-                    .thenCompose(service -> service.changeMetastorageNodes(
-                            newMetastorageNodes,
-                            metastorageRepairClusterId,
-                            metastorageRepairingConfigIndex
-                    ));
+                    .thenCompose(service -> 
service.changeMetastorageNodes(newMetastorageNodes, 
metastorageRepairingConfigIndex));
         } finally {
             busyLock.leaveBusy();
         }
     }
 
-    private UUID requiredClusterId() {
-        ClusterState clusterState = clusterStateStorageMgr.getClusterState();
-        return requireNonNull(clusterState, "Still no cluster 
state.").clusterTag().clusterId();
-    }
-
     /**
      * Returns a future resolving to the initial cluster configuration in 
HOCON format. The resulting configuration may be {@code null} if
      * not provided by the user.
diff --git 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/MetaStorageInfo.java
 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/MetaStorageInfo.java
index 387d17c95e..a9c43df86e 100644
--- 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/MetaStorageInfo.java
+++ 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/MetaStorageInfo.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.cluster.management;
 
 import java.io.Serializable;
 import java.util.Set;
-import java.util.UUID;
 import 
org.apache.ignite.internal.cluster.management.network.messages.CmgMessageGroup;
 import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.network.annotations.Transferable;
@@ -35,15 +34,6 @@ public interface MetaStorageInfo extends NetworkMessage, 
Serializable {
      */
     Set<String> metaStorageNodes();
 
-    /**
-     * ID that the cluster had when MG was repaired (if it was repaired for 
this cluster ID), or {@code null} if no MG repair
-     * happened in the current cluster incarnation.
-     *
-     * <p>This can only contain current cluster ID; if there were earlier MG 
repairs, they happened in other incarnations of the cluster,
-     * so they will not leave a trace here.
-     */
-    @Nullable UUID metastorageRepairClusterId();
-
     /**
      * Raft index in the Metastorage group under which the forced 
configuration is (or will be) saved, or {@code null} if no MG
      * repair happened in the current cluster incarnation.
@@ -53,4 +43,11 @@ public interface MetaStorageInfo extends NetworkMessage, 
Serializable {
      * a trace here.
      */
     @Nullable Long metastorageRepairingConfigIndex();
+
+    /**
+     * Returns whether MG was repaired in this cluster incarnation.
+     */
+    default boolean metastorageRepairedInThisClusterIncarnation() {
+        return metastorageRepairingConfigIndex() != null;
+    }
 }
diff --git 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ClusterStateStorageManager.java
 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ClusterStateStorageManager.java
index d4365a105b..895cec9460 100644
--- 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ClusterStateStorageManager.java
+++ 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ClusterStateStorageManager.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.cluster.management.raft;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.ignite.internal.util.ByteUtils.bytesToLong;
-import static org.apache.ignite.internal.util.ByteUtils.bytesToUuid;
 import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
 import static org.apache.ignite.internal.util.ByteUtils.uuidToBytes;
 
@@ -45,7 +44,6 @@ public class ClusterStateStorageManager {
     /** Prefix for validation tokens. */
     private static final byte[] VALIDATED_NODE_PREFIX = 
"validation_".getBytes(UTF_8);
 
-    private static final byte[] METASTORAGE_REPAIR_CLUSTER_ID_KEY = 
"metastorageRepairClusterId".getBytes(UTF_8);
     private static final byte[] METASTORAGE_REPAIRING_CONFIG_INDEX_KEY = 
"metastorageRepairingConfigIndex".getBytes(UTF_8);
 
     private final ClusterStateStorage storage;
@@ -117,23 +115,10 @@ public class ClusterStateStorageManager {
     /**
      * Saves information about Metastorage repair.
      *
-     * @param repairClusterId ID that the cluster has when performaing the 
repair.
      * @param repairingConfigIndex Raft index in the Metastorage group under 
which the forced configuration is (or will be) saved.
      */
-    void saveMetastorageRepairInfo(UUID repairClusterId, long 
repairingConfigIndex) {
-        storage.putAll(
-                List.of(METASTORAGE_REPAIR_CLUSTER_ID_KEY, 
METASTORAGE_REPAIRING_CONFIG_INDEX_KEY),
-                List.of(uuidToBytes(repairClusterId), 
longToBytes(repairingConfigIndex))
-        );
-    }
-
-    /**
-     * Returns ID that the cluster had when MG was repaired (if it was 
repaired for this cluster ID), or {@code null} if no MG repair
-     * happened in the current cluster incarnation.
-     */
-    @Nullable UUID getMetastorageRepairClusterId() {
-        byte[] bytes = storage.get(METASTORAGE_REPAIR_CLUSTER_ID_KEY);
-        return bytes == null ? null : bytesToUuid(bytes);
+    void saveMetastorageRepairInfo(long repairingConfigIndex) {
+        storage.put(METASTORAGE_REPAIRING_CONFIG_INDEX_KEY, 
longToBytes(repairingConfigIndex));
     }
 
     /**
diff --git 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
index 297ad77350..d358e142bd 100644
--- 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
+++ 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
@@ -158,7 +158,6 @@ public class CmgRaftGroupListener implements 
RaftGroupListener {
 
         return cmgMessagesFactory.metaStorageInfo()
                 .metaStorageNodes(clusterState.metaStorageNodes())
-                
.metastorageRepairClusterId(storageManager.getMetastorageRepairClusterId())
                 
.metastorageRepairingConfigIndex(storageManager.getMetastorageRepairingConfigIndex())
                 .build();
     }
@@ -322,10 +321,8 @@ public class CmgRaftGroupListener implements 
RaftGroupListener {
 
         storageManager.putClusterState(newState);
 
-        assert (command.metastorageRepairClusterId() == null) == 
(command.metastorageRepairingConfigIndex() == null)
-                : "Repair-related properties must either all be present or all 
be absent [command=" + command + "]";
-        if (command.metastorageRepairClusterId() != null) {
-            
storageManager.saveMetastorageRepairInfo(command.metastorageRepairClusterId(), 
command.metastorageRepairingConfigIndex());
+        if (command.metastorageRepairingConfigIndex() != null) {
+            
storageManager.saveMetastorageRepairInfo(command.metastorageRepairingConfigIndex());
         }
     }
 
diff --git 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java
 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java
index f632d2ffcf..f2a38a7c8c 100644
--- 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java
+++ 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java
@@ -25,7 +25,6 @@ import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.close.ManuallyCloseable;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
@@ -333,14 +332,9 @@ public class CmgRaftService implements ManuallyCloseable {
      *
      * @return Future that completes when the change is finished.
      */
-    public CompletableFuture<Void> changeMetastorageNodes(
-            Set<String> newMetastorageNodes,
-            @Nullable UUID metastorageRepairClusterId,
-            @Nullable Long metastorageRepairingConfigIndex
-    ) {
+    public CompletableFuture<Void> changeMetastorageNodes(Set<String> 
newMetastorageNodes, @Nullable Long metastorageRepairingConfigIndex) {
         ChangeMetaStorageInfoCommand command = 
msgFactory.changeMetaStorageInfoCommand()
                 .metaStorageNodes(Set.copyOf(newMetastorageNodes))
-                .metastorageRepairClusterId(metastorageRepairClusterId)
                 
.metastorageRepairingConfigIndex(metastorageRepairingConfigIndex)
                 .build();
         return raftService.run(command);
diff --git 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/ChangeMetaStorageInfoCommand.java
 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/ChangeMetaStorageInfoCommand.java
index 5e4046cc82..7337d678d9 100644
--- 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/ChangeMetaStorageInfoCommand.java
+++ 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/ChangeMetaStorageInfoCommand.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.cluster.management.raft.commands;
 
 import java.util.Set;
-import java.util.UUID;
 import 
org.apache.ignite.internal.cluster.management.network.messages.CmgMessageGroup;
 import org.apache.ignite.internal.network.annotations.Transferable;
 import org.apache.ignite.internal.raft.WriteCommand;
@@ -38,11 +37,5 @@ public interface ChangeMetaStorageInfoCommand extends 
WriteCommand {
      * Raft index in the Metastorage group under which the forced 
configuration is (or will be) saved, or {@code null} if no MG
      * repair happened in the current cluster incarnation.
      */
-    @Nullable UUID metastorageRepairClusterId();
-
-    /**
-     * ID that the cluster had when MG was repaired (if it was repaired for 
this cluster ID), or {@code null} if no MG repair
-     * happened in the current cluster incarnation.
-     */
     @Nullable Long metastorageRepairingConfigIndex();
 }
diff --git 
a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListenerTest.java
 
b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListenerTest.java
index 6752c0eb0c..36c0ae34c8 100644
--- 
a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListenerTest.java
+++ 
b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListenerTest.java
@@ -42,7 +42,6 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
-import java.util.UUID;
 import java.util.function.LongConsumer;
 import org.apache.ignite.internal.cluster.management.ClusterIdHolder;
 import org.apache.ignite.internal.cluster.management.ClusterState;
@@ -217,9 +216,7 @@ public class CmgRaftGroupListenerTest extends 
BaseIgniteAbstractTest {
 
     @Test
     void changeMetastorageInfoChangesMsInfo() {
-        UUID metastorageRepairClusterId = randomUUID();
-
-        initCmgAndChangeMgInfo(metastorageRepairClusterId);
+        initCmgAndChangeMgInfo();
 
         ClusterState updatedState = 
listener.storageManager().getClusterState();
         assertThat(updatedState, is(notNullValue()));
@@ -231,11 +228,10 @@ public class CmgRaftGroupListenerTest extends 
BaseIgniteAbstractTest {
         assertThat(updatedState.initialClusterConfiguration(), 
is(state.initialClusterConfiguration()));
         assertThat(updatedState.formerClusterIds(), 
is(state.formerClusterIds()));
 
-        assertThat(listener.storageManager().getMetastorageRepairClusterId(), 
is(metastorageRepairClusterId));
         
assertThat(listener.storageManager().getMetastorageRepairingConfigIndex(), 
is(123L));
     }
 
-    private void initCmgAndChangeMgInfo(UUID metastorageRepairClusterId) {
+    private void initCmgAndChangeMgInfo() {
         listener.onWrite(iterator(
                 msgFactory.initCmgStateCommand()
                         .clusterState(state)
@@ -246,7 +242,6 @@ public class CmgRaftGroupListenerTest extends 
BaseIgniteAbstractTest {
         listener.onWrite(iterator(
                 msgFactory.changeMetaStorageInfoCommand()
                         .metaStorageNodes(Set.of("new-ms-1", "new-ms-2"))
-                        .metastorageRepairClusterId(metastorageRepairClusterId)
                         .metastorageRepairingConfigIndex(123L)
                         .build()
         ));
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
index 3bc09ab916..dc867d2fd7 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
@@ -24,6 +24,7 @@ import static 
org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.ops;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
 import static org.apache.ignite.internal.metastorage.dsl.Statements.iif;
+import static 
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager.configureCmgManagerToStartMetastorage;
 import static 
org.apache.ignite.internal.metastorage.server.raft.MetaStorageWriteHandler.IDEMPOTENT_COMMAND_PREFIX_BYTES;
 import static 
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.clusterService;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
@@ -205,6 +206,7 @@ public class ItIdempotentCommandCacheTest extends 
IgniteAbstractTest {
             );
 
             cmgManager = mock(ClusterManagementGroupManager.class);
+            configureCmgManagerToStartMetastorage(cmgManager);
 
             ComponentWorkingDir metastorageWorkDir = new 
ComponentWorkingDir(workDir.resolve("metastorage" + index));
 
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMaintenanceTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMaintenanceTest.java
index b4e45c0ec1..e01535301e 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMaintenanceTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMaintenanceTest.java
@@ -86,6 +86,8 @@ class ItMetaStorageMaintenanceTest extends 
ItMetaStorageMultipleNodesAbstractTes
                 willCompleteSuccessfully()
         );
 
+        startMetastorageOn(List.of(node0, node1, node2));
+
         node0.waitWatches();
         node1.waitWatches();
         node2.waitWatches();
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
index eae17da0ad..cee185e0e5 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.metastorage.impl;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toList;
+import static 
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager.configureCmgManagerToStartMetastorage;
 import static 
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.clusterService;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
 import static 
org.apache.ignite.internal.testframework.flow.TestFlowUtils.subscribeToList;
@@ -169,6 +170,7 @@ public class ItMetaStorageManagerImplTest extends 
IgniteAbstractTest {
         when(cmgManager.metaStorageInfo()).thenReturn(completedFuture(
                 new 
CmgMessagesFactory().metaStorageInfo().metaStorageNodes(Set.of(clusterService.nodeName())).build()
         ));
+        configureCmgManagerToStartMetastorage(cmgManager);
 
         ComponentWorkingDir metastorageWorkDir = new 
ComponentWorkingDir(workDir.resolve("metastorage"));
 
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
index 5cdbae7848..9e72cbb610 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
@@ -107,6 +107,14 @@ abstract class ItMetaStorageMultipleNodesAbstractTest 
extends IgniteAbstractTest
         return new SimpleInMemoryKeyValueStorage(nodeName, 
readOperationForCompactionTracker);
     }
 
+    void startMetastorageOn(List<Node> nodes) {
+        ComponentContext componentContext = new ComponentContext();
+
+        for (Node node : nodes) {
+            assertThat(node.metaStorageManager.startAsync(componentContext), 
willCompleteSuccessfully());
+        }
+    }
+
     class Node {
         private final VaultManager vaultManager;
 
@@ -234,8 +242,7 @@ abstract class ItMetaStorageMultipleNodesAbstractTest 
extends IgniteAbstractTest
                     raftManager,
                     clusterStateStorage,
                     failureManager,
-                    cmgManager,
-                    metaStorageManager
+                    cmgManager
             );
 
             assertThat(startAsync(new ComponentContext(), components), 
willCompleteSuccessfully());
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesVsStorageTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesVsStorageTest.java
index 78692cd385..eed9a4780e 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesVsStorageTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesVsStorageTest.java
@@ -80,6 +80,8 @@ abstract class ItMetaStorageMultipleNodesVsStorageTest 
extends ItMetaStorageMult
 
         firstNode.cmgManager.initCluster(List.of(firstNode.name()), 
List.of(firstNode.name()), "test");
 
+        startMetastorageOn(List.of(firstNode));
+
         firstNode.waitWatches();
 
         var key = new ByteArray("foo");
@@ -91,6 +93,8 @@ abstract class ItMetaStorageMultipleNodesVsStorageTest 
extends ItMetaStorageMult
 
         Node secondNode = startNode();
 
+        startMetastorageOn(List.of(secondNode));
+
         secondNode.waitWatches();
 
         // Check that reading remote data works correctly.
@@ -142,6 +146,10 @@ abstract class ItMetaStorageMultipleNodesVsStorageTest 
extends ItMetaStorageMult
 
         firstNode.cmgManager.initCluster(List.of(firstNode.name()), 
List.of(firstNode.name()), "test");
 
+        assertThat(allOf(firstNode.cmgManager.onJoinReady(), 
secondNode.cmgManager.onJoinReady()), willCompleteSuccessfully());
+
+        startMetastorageOn(List.of(firstNode, secondNode));
+
         firstNode.waitWatches();
         secondNode.waitWatches();
 
@@ -171,6 +179,8 @@ abstract class ItMetaStorageMultipleNodesVsStorageTest 
extends ItMetaStorageMult
 
         assertThat(allOf(firstNode.cmgManager.onJoinReady(), 
secondNode.cmgManager.onJoinReady()), willCompleteSuccessfully());
 
+        startMetastorageOn(List.of(firstNode, secondNode));
+
         firstNode.waitWatches();
         secondNode.waitWatches();
 
@@ -217,6 +227,8 @@ abstract class ItMetaStorageMultipleNodesVsStorageTest 
extends ItMetaStorageMult
 
         assertThat(allOf(firstNode.cmgManager.onJoinReady(), 
secondNode.cmgManager.onJoinReady()), willCompleteSuccessfully());
 
+        startMetastorageOn(List.of(firstNode, secondNode));
+
         firstNode.waitWatches();
         secondNode.waitWatches();
 
@@ -299,6 +311,8 @@ abstract class ItMetaStorageMultipleNodesVsStorageTest 
extends ItMetaStorageMult
 
         assertThat(allOf(firstNode.cmgManager.onJoinReady(), 
secondNode.cmgManager.onJoinReady()), willCompleteSuccessfully());
 
+        startMetastorageOn(List.of(firstNode, secondNode));
+
         firstNode.waitWatches();
         secondNode.waitWatches();
 
@@ -345,6 +359,8 @@ abstract class ItMetaStorageMultipleNodesVsStorageTest 
extends ItMetaStorageMult
         assertThat(firstNode.cmgManager.onJoinReady(), 
willCompleteSuccessfully());
         assertThat(secondNode.cmgManager.onJoinReady(), 
willCompleteSuccessfully());
 
+        startMetastorageOn(List.of(firstNode, secondNode));
+
         firstNode.waitWatches();
         secondNode.waitWatches();
 
@@ -372,6 +388,8 @@ abstract class ItMetaStorageMultipleNodesVsStorageTest 
extends ItMetaStorageMult
         assertThat(firstNode.cmgManager.onJoinReady(), 
willCompleteSuccessfully());
         assertThat(secondNode.cmgManager.onJoinReady(), 
willCompleteSuccessfully());
 
+        startMetastorageOn(List.of(firstNode, secondNode));
+
         firstNode.waitWatches();
         secondNode.waitWatches();
 
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
index 8452e4f31e..d3f01b1949 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
@@ -245,8 +245,6 @@ public class ItMetaStorageWatchTest extends 
IgniteAbstractTest {
                     msRaftConfigurer,
                     readOperationForCompactionTracker
             );
-
-            components.add(metaStorageManager);
         }
 
         void start() {
@@ -258,12 +256,15 @@ public class ItMetaStorageWatchTest extends 
IgniteAbstractTest {
         }
 
         void stop() throws Exception {
-            Collections.reverse(components);
+            List<IgniteComponent> componentsToStop = new 
ArrayList<>(components);
+            componentsToStop.add(metaStorageManager);
+
+            Collections.reverse(componentsToStop);
 
-            Stream<AutoCloseable> beforeNodeStop = components.stream().map(c 
-> c::beforeNodeStop);
+            Stream<AutoCloseable> beforeNodeStop = 
componentsToStop.stream().map(c -> c::beforeNodeStop);
 
             Stream<AutoCloseable> nodeStop = Stream.of(() ->
-                    assertThat(stopAsync(new ComponentContext(), components), 
willCompleteSuccessfully())
+                    assertThat(stopAsync(new ComponentContext(), 
componentsToStop), willCompleteSuccessfully())
             );
 
             IgniteUtils.closeAll(Stream.concat(beforeNodeStop, nodeStop));
@@ -302,6 +303,11 @@ public class ItMetaStorageWatchTest extends 
IgniteAbstractTest {
 
         nodes.get(0).cmgManager.initCluster(List.of(name), List.of(name), 
"test");
 
+        for (Node node : nodes) {
+            assertThat(node.cmgManager.onJoinReady(), 
willCompleteSuccessfully());
+            assertThat(node.metaStorageManager.startAsync(new 
ComponentContext()), willCompleteSuccessfully());
+        }
+
         for (Node node : nodes) {
             assertThat(node.metaStorageManager.recoveryFinishedFuture(), 
willCompleteSuccessfully());
         }
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java
index 5ec5623e56..848c002ecb 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java
@@ -461,8 +461,8 @@ public abstract class ItAbstractListenerSnapshotTest<T 
extends RaftGroupListener
                 createListener(service, componentWorkDir.dbPath()),
                 defaults()
                         .commandsMarshaller(commandsMarshaller(service))
-                        .setLogStorageFactory(logStorageFactories.get(idx))
-                        .serverDataPath(workingDirs.get(idx).metaPath())
+                        .setLogStorageFactory(partitionsLogStorageFactory)
+                        .serverDataPath(componentWorkDir.metaPath())
         );
 
         return server;
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetChecksumCommand.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetChecksumCommand.java
new file mode 100644
index 0000000000..31f6a3b1a0
--- /dev/null
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetChecksumCommand.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.command;
+
+import org.apache.ignite.internal.metastorage.command.response.ChecksumInfo;
+import org.apache.ignite.internal.network.annotations.Transferable;
+import org.apache.ignite.internal.raft.ReadCommand;
+
+/**
+ * Get command for MetaStorageCommandListener that retrieves information 
({@link ChecksumInfo}) about a checksum.
+ *
+ * @see ChecksumInfo
+ */
+@Transferable(MetastorageCommandsMessageGroup.GET_CHECKSUM)
+public interface GetChecksumCommand extends ReadCommand {
+    /**
+     * Revision for which to obtain a checksum.
+     */
+    long revision();
+}
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java
index 040db1118d..86f87e7846 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java
@@ -41,6 +41,9 @@ public interface MetastorageCommandsMessageGroup {
     /** Message type for {@link GetCurrentRevisionCommand}. */
     short GET_CURRENT_REVISION = 33;
 
+    /** Message type for {@link GetChecksumCommand}. */
+    short GET_CHECKSUM = 34;
+
     /** Message type for {@link PutCommand}. */
     short PUT = 40;
 
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/response/ChecksumInfo.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/response/ChecksumInfo.java
new file mode 100644
index 0000000000..2fd174379a
--- /dev/null
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/response/ChecksumInfo.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.command.response;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Information about checksum for a revision of the Metastorage.
+ */
+public class ChecksumInfo implements Serializable {
+    private static final long serialVersionUID = 8681846172504003981L;
+
+    private final long checksum;
+    private final long minRevision;
+    private final long maxRevision;
+
+    /**
+     * Constructor.
+     */
+    public ChecksumInfo(long checksum, long minRevision, long maxRevision) {
+        this.checksum = checksum;
+        this.minRevision = minRevision;
+        this.maxRevision = maxRevision;
+    }
+
+    /**
+     * The checksum corresponding to the requested revision, or 0 if it does 
not fit the [{@link #minRevision()}-{@link #maxRevision()}]
+     * interval.
+     */
+    public long checksum() {
+        return checksum;
+    }
+
+    /**
+     * Minimum revision for which the leader has checksum info (or 0 if the 
cluster has not been yet initialized).
+     * This is usually first not-yet-compacted revision.
+     */
+    public long minRevision() {
+        return minRevision;
+    }
+
+    /**
+     * Maximum revision for which the leader has checksum info (this matches 
current revision).
+     */
+    public long maxRevision() {
+        return maxRevision;
+    }
+
+    @Override
+    public String toString() {
+        return S.toString(this);
+    }
+}
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index 42006ec065..55bbfecde0 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -44,6 +44,7 @@ import java.util.function.Function;
 import java.util.function.LongConsumer;
 import java.util.function.Supplier;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.ClusterState;
 import org.apache.ignite.internal.cluster.management.MetaStorageInfo;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
 import org.apache.ignite.internal.disaster.system.message.ResetClusterMessage;
@@ -194,6 +195,8 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager, MetastorageGr
 
     private final MetaStorageCompactionTrigger metaStorageCompactionTrigger;
 
+    private final MetastorageDivergencyValidator divergencyValidator = new 
MetastorageDivergencyValidator();
+
     /**
      * The constructor.
      *
@@ -390,15 +393,91 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager, MetastorageGr
         }
     }
 
-    private CompletableFuture<MetaStorageServiceImpl> 
initializeMetaStorage(MetaStorageInfo metaStorageInfo) {
+    private CompletableFuture<MetaStorageServiceImpl> 
reenterIfNeededAndInitializeMetaStorage(
+            MetaStorageInfo metaStorageInfo,
+            UUID currentClusterId
+    ) {
+        if (thisNodeDidNotWitnessMetaStorageRepair(metaStorageInfo, 
currentClusterId)) {
+            return 
tryReenteringMetastorage(metaStorageInfo.metaStorageNodes(), currentClusterId)
+                    .thenCompose(unused -> 
initializeMetastorage(metaStorageInfo));
+        } else {
+            return initializeMetastorage(metaStorageInfo);
+        }
+    }
+
+    /**
+     * Returns whether this node did not witness a Metastorage Repair in the 
current cluster incarnation. 'Did not witness' here means
+     * that the node neither participated in the repair (that is, provided 
information about its Metastorage group index+term to the repair
+     * conductor) nor it was migrated to the repaired cluster after the repair 
and passed through Metastorage vaidation (for divergence)
+     * and re-entry procedure.
+     *
+     * @param metaStorageInfo Information about Metastorage.
+     * @param currentClusterId Current cluster ID.
+     */
+    private boolean thisNodeDidNotWitnessMetaStorageRepair(MetaStorageInfo 
metaStorageInfo, UUID currentClusterId) {
+        UUID locallyWitnessedRepairClusterId = 
metastorageRepairStorage.readWitnessedMetastorageRepairClusterId();
+
+        return metaStorageInfo.metastorageRepairedInThisClusterIncarnation()
+                && !Objects.equals(locallyWitnessedRepairClusterId, 
currentClusterId);
+    }
+
+    private CompletableFuture<Void> tryReenteringMetastorage(Set<String> 
metastorageNodes, UUID currentClusterId) {
+        LOG.info("Trying to reenter Metastorage group");
+
+        return validateMetastorageForDivergence(metastorageNodes)
+                .thenRun(() -> prepareMetaStorageReentry(currentClusterId));
+    }
+
+    private CompletableFuture<Void> 
validateMetastorageForDivergence(Set<String> metastorageNodes) {
+        long localRevision = storage.revision();
+
+        if (localRevision == 0) {
+            // No revisions, so local Metastorage could not diverge.
+            return nullCompletedFuture();
+        }
+
+        return 
doWithOneOffRaftGroupService(PeersAndLearners.fromConsistentIds(metastorageNodes),
 raftClient -> {
+            return createMetaStorageService(raftClient).checksum(localRevision)
+                    .thenAccept(leaderChecksumInfo -> {
+                        long localChecksum = storage.checksum(localRevision);
+
+                        LOG.info(
+                                "Validating Metastorage for divergence 
[localRevision={}, localChecksum={}, leaderChecksumInfo={}",
+                                localRevision, localChecksum, 
leaderChecksumInfo
+                        );
+
+                        divergencyValidator.validate(localRevision, 
localChecksum, leaderChecksumInfo);
+
+                        LOG.info("Metastorage did not diverge, proceeding");
+                    });
+        });
+    }
+
+    private void prepareMetaStorageReentry(UUID currentClusterId) {
+        LOG.info("Preparing storages for reentry [clusterId={}]", 
currentClusterId);
+
         try {
-            if (thisNodeDidNotWitnessMetaStorageRepair(metaStorageInfo)) {
-                prepareMetaStorageReentry(metaStorageInfo);
-            }
+            destroyRaftAndStateMachineStorages();
         } catch (NodeStoppingException e) {
-            return failedFuture(e);
+            throw new RuntimeException(e);
         }
 
+        saveWitnessedMetastorageRepairClusterIdLocally(currentClusterId);
+    }
+
+    private void destroyRaftAndStateMachineStorages() throws 
NodeStoppingException {
+        raftMgr.destroyRaftNodeStorages(raftNodeId(), 
raftGroupOptionsConfigurer);
+
+        storage.clear();
+    }
+
+    private void saveWitnessedMetastorageRepairClusterIdLocally(UUID 
currentClusterId) {
+        assert currentClusterId != null;
+
+        
metastorageRepairStorage.saveWitnessedMetastorageRepairClusterId(currentClusterId);
+    }
+
+    private CompletableFuture<MetaStorageServiceImpl> 
initializeMetastorage(MetaStorageInfo metaStorageInfo) {
         String thisNodeName = clusterService.nodeName();
         var disruptorConfig = new 
RaftNodeDisruptorConfiguration("metastorage", 1);
 
@@ -417,44 +496,18 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager, MetastorageGr
                 .thenApply(raftService -> {
                     raftServiceFuture.complete(raftService);
 
-                    return new MetaStorageServiceImpl(
-                            thisNodeName,
-                            raftService,
-                            busyLock,
-                            clusterTime,
-                            () -> 
clusterService.topologyService().localMember().id());
+                    return createMetaStorageService(raftService);
                 });
     }
 
-    private boolean thisNodeDidNotWitnessMetaStorageRepair(MetaStorageInfo 
metaStorageInfo) {
-        UUID repairClusterIdInCmg = 
metaStorageInfo.metastorageRepairClusterId();
-        UUID locallyWitnessedRepairClusterId = 
metastorageRepairStorage.readWitnessedMetastorageRepairClusterId();
-
-        return repairClusterIdInCmg != null && 
!Objects.equals(locallyWitnessedRepairClusterId, repairClusterIdInCmg);
-    }
-
-    private void prepareMetaStorageReentry(MetaStorageInfo metaStorageInfo) 
throws NodeStoppingException {
-        destroyRaftAndStateMachineStorages();
-        saveWitnessedMetastorageRepairClusterIdLocally(metaStorageInfo);
-    }
-
-    private void destroyRaftAndStateMachineStorages() throws 
NodeStoppingException {
-        raftMgr.destroyRaftNodeStorages(raftNodeId(), 
raftGroupOptionsConfigurer);
-
-        storage.clear();
-
-        // Here, we must destroy the storage, but it's already destroyed in 
the beginning of the startAsync() method (in its own #start()).
-        // Just to make sure this is maintained, we add an assertion.
-        assert storage.revision() == 0 : "It's expected that the storage is 
destroyed at startup, but now it's not (revision is "
-                + storage.revision() + "; if the flow has changed, this 
assertion has to be changed to actual storage destruction.";
-    }
-
-    private void 
saveWitnessedMetastorageRepairClusterIdLocally(MetaStorageInfo metaStorageInfo) 
{
-        UUID repairClusterId = metaStorageInfo.metastorageRepairClusterId();
-
-        assert repairClusterId != null;
-
-        
metastorageRepairStorage.saveWitnessedMetastorageRepairClusterId(repairClusterId);
+    private MetaStorageServiceImpl createMetaStorageService(RaftGroupService 
raftService) {
+        return new MetaStorageServiceImpl(
+                clusterService.nodeName(),
+                raftService,
+                busyLock,
+                clusterTime,
+                () -> clusterService.topologyService().localMember().id()
+        );
     }
 
     private CompletableFuture<? extends RaftGroupService> startVotingNode(
@@ -653,15 +706,19 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager, MetastorageGr
         storage.start();
 
         cmgMgr.metaStorageInfo()
-                .thenCompose(metaStorageInfo -> {
-                    LOG.info("Metastorage info on start is {}", 
metaStorageInfo);
+                .thenCombine(cmgMgr.clusterState(), 
MetaStorageInfoAndClusterState::new)
+                .thenCompose(infoAndState -> {
+                    LOG.info("Metastorage info on start is {}", 
infoAndState.metaStorageInfo);
 
                     if (!busyLock.enterBusy()) {
                         return failedFuture(new NodeStoppingException());
                     }
 
                     try {
-                        return initializeMetaStorage(metaStorageInfo);
+                        return reenterIfNeededAndInitializeMetaStorage(
+                                infoAndState.metaStorageInfo,
+                                
infoAndState.clusterState.clusterTag().clusterId()
+                        );
                     } finally {
                         busyLock.leaveBusy();
                     }
@@ -978,7 +1035,7 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager, MetastorageGr
 
     @Override
     public Publisher<Entry> prefix(ByteArray keyPrefix) {
-        return prefix(keyPrefix, MetaStorageManager.LATEST_REVISION);
+        return prefix(keyPrefix, LATEST_REVISION);
     }
 
     @Override
@@ -1180,6 +1237,11 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager, MetastorageGr
         return inBusyLock(busyLock, storage::getCompactionRevision);
     }
 
+    @TestOnly
+    public KeyValueStorage storage() {
+        return storage;
+    }
+
     private <T> CompletableFuture<T> 
withTrackReadOperationFromLeaderFuture(Supplier<CompletableFuture<T>> 
readFromLeader) {
         long readOperationId = 
readOperationFromLeaderForCompactionTracker.generateReadOperationId();
         long compactionRevision = storage.getCompactionRevision();
@@ -1249,4 +1311,14 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager, MetastorageGr
             throw t;
         }
     }
+
+    private static class MetaStorageInfoAndClusterState {
+        private final MetaStorageInfo metaStorageInfo;
+        private final ClusterState clusterState;
+
+        private MetaStorageInfoAndClusterState(MetaStorageInfo 
metaStorageInfo, ClusterState clusterState) {
+            this.metaStorageInfo = metaStorageInfo;
+            this.clusterState = clusterState;
+        }
+    }
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageService.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageService.java
index c2c5d41e92..577c3b6319 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageService.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageService.java
@@ -25,6 +25,7 @@ import java.util.concurrent.Flow.Publisher;
 import org.apache.ignite.internal.close.ManuallyCloseable;
 import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.command.response.ChecksumInfo;
 import org.apache.ignite.internal.metastorage.dsl.Condition;
 import org.apache.ignite.internal.metastorage.dsl.Iif;
 import org.apache.ignite.internal.metastorage.dsl.Operation;
@@ -252,4 +253,11 @@ public interface MetaStorageService extends 
ManuallyCloseable {
      * Returns a future which will hold current revision of the metastorage 
leader.
      */
     CompletableFuture<Long> currentRevision();
+
+    /**
+     * Returns information about a revision checksum on the leader.
+     *
+     * @param revision Revision of interest.
+     */
+    CompletableFuture<ChecksumInfo> checksum(long revision);
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
index dbd4e3f1a7..5f867370ff 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
@@ -39,6 +39,7 @@ import 
org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.command.CompactionCommand;
 import 
org.apache.ignite.internal.metastorage.command.EvictIdempotentCommandsCacheCommand;
 import org.apache.ignite.internal.metastorage.command.GetAllCommand;
+import org.apache.ignite.internal.metastorage.command.GetChecksumCommand;
 import org.apache.ignite.internal.metastorage.command.GetCommand;
 import 
org.apache.ignite.internal.metastorage.command.GetCurrentRevisionCommand;
 import org.apache.ignite.internal.metastorage.command.InvokeCommand;
@@ -49,6 +50,7 @@ import 
org.apache.ignite.internal.metastorage.command.PutCommand;
 import org.apache.ignite.internal.metastorage.command.RemoveAllCommand;
 import org.apache.ignite.internal.metastorage.command.RemoveCommand;
 import org.apache.ignite.internal.metastorage.command.SyncTimeCommand;
+import org.apache.ignite.internal.metastorage.command.response.ChecksumInfo;
 import org.apache.ignite.internal.metastorage.dsl.Condition;
 import org.apache.ignite.internal.metastorage.dsl.Iif;
 import org.apache.ignite.internal.metastorage.dsl.Operation;
@@ -264,6 +266,15 @@ public class MetaStorageServiceImpl implements 
MetaStorageService {
         return context.raftService().run(cmd);
     }
 
+    @Override
+    public CompletableFuture<ChecksumInfo> checksum(long revision) {
+        GetChecksumCommand cmd = context.commandsFactory().getChecksumCommand()
+                .revision(revision)
+                .build();
+
+        return context.raftService().run(cmd);
+    }
+
     /**
      * Removes obsolete entries from both volatile and persistent idempotent 
command cache.
      *
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetastorageDivergedException.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetastorageDivergedException.java
new file mode 100644
index 0000000000..66fe2720fb
--- /dev/null
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetastorageDivergedException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.impl;
+
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.lang.ErrorGroups.MetaStorage;
+
+/**
+ * Thrown when Metastorage on a node doing Metastorage reentry has been found 
to be diverged with the leader.
+ */
+public class MetastorageDivergedException extends IgniteInternalException {
+    /** Constructor. */
+    public MetastorageDivergedException(String message) {
+        super(MetaStorage.DIVERGED_ERR, message);
+    }
+}
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetastorageDivergencyValidator.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetastorageDivergencyValidator.java
new file mode 100644
index 0000000000..d64485db58
--- /dev/null
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetastorageDivergencyValidator.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.impl;
+
+import org.apache.ignite.internal.metastorage.command.response.ChecksumInfo;
+
+/**
+ * Validates local Metastorage on divergency against the leader when a node 
tries to reenter the Metastorage group (after it missed
+ * the Metastorage repair).
+ */
+public class MetastorageDivergencyValidator {
+    /**
+     * Validates local Metastorage against leader's Metastorage to detect if 
the local one has diverged.
+     *
+     * @param revision Current revision locally.
+     * @param localChecksum Checksum corresponding to the local current 
revision.
+     * @param leaderChecksumInfo Checksum info obtained from the leader.
+     */
+    public void validate(long revision, long localChecksum, ChecksumInfo 
leaderChecksumInfo) {
+        if (leaderChecksumInfo.minRevision() == 0 || 
leaderChecksumInfo.maxRevision() == 0) {
+            throw new MetastorageDivergedException("Metastorage on leader does 
not have any checksums, this should not happen");
+        }
+
+        if (revision >= leaderChecksumInfo.minRevision() && revision <= 
leaderChecksumInfo.maxRevision()) {
+            if (localChecksum != leaderChecksumInfo.checksum()) {
+                throw new MetastorageDivergedException(String.format(
+                        "Metastorage has diverged [revision=%d, 
localChecksum=%d, leaderChecksum=%d",
+                        revision, localChecksum, leaderChecksumInfo.checksum()
+                ));
+            }
+        } else if (revision > leaderChecksumInfo.maxRevision()) {
+            throw new MetastorageDivergedException(String.format(
+                    "Node is ahead of the leader, this should not happen; 
probably means divergence [localRevision=%d, leaderRevision=%d]",
+                    revision, leaderChecksumInfo.maxRevision()
+            ));
+        } else {
+            assert revision < leaderChecksumInfo.minRevision();
+
+            throw new MetastorageDivergedException(String.format(
+                    "Node revision is already removed due to compaction on the 
leader [localRevision=%d, minLeaderRevision=%d]",
+                    revision, leaderChecksumInfo.minRevision()
+            ));
+        }
+    }
+}
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/ChecksumAndRevisions.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/ChecksumAndRevisions.java
new file mode 100644
index 0000000000..c6bb32dab0
--- /dev/null
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/ChecksumAndRevisions.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+/**
+ * Information about a checksum and checksummed revisions.
+ */
+public class ChecksumAndRevisions {
+    private final long checksum;
+    private final long minChecksummedRevision;
+    private final long maxChecksummedRevision;
+
+    /**
+     * Constructor.
+     */
+    public ChecksumAndRevisions(long checksum, long minChecksummedRevision, 
long maxChecksummedRevision) {
+        this.checksum = checksum;
+        this.minChecksummedRevision = minChecksummedRevision;
+        this.maxChecksummedRevision = maxChecksummedRevision;
+    }
+
+    /** Checksum (or 0 if there is no checksum for the requested revision). */
+    public long checksum() {
+        return checksum;
+    }
+
+    /** Min revision that has a checksum (0 if there are no such revisions). */
+    public long minChecksummedRevision() {
+        return minChecksummedRevision;
+    }
+
+    /** Max revision that has a checksum (0 if there are no such revisions). */
+    public long maxChecksummedRevision() {
+        return maxChecksummedRevision;
+    }
+}
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index e278e74850..4fef7a7cdb 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -566,6 +566,15 @@ public interface KeyValueStorage extends ManuallyCloseable 
{
      */
     long checksum(long revision);
 
+    /**
+     * Returns information about a checksum and checksummed revisions. Never 
throws a {@link CompactedException}; if the requested revision
+     * is compacted, just returns 0 as checksum (and the requested revision 
will not fall in
+     * {@link ChecksumAndRevisions#minChecksummedRevision()} - {@link 
ChecksumAndRevisions#maxChecksummedRevision()} interval).
+     *
+     * @param revision Revision for which to obtain a checksum.
+     */
+    ChecksumAndRevisions checksumAndRevisions(long revision);
+
     /**
      * Clears the content of the storage. Should only be called when no one 
else uses this storage.
      */
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index 6e42e4ba7b..2f0c29b7da 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -82,6 +82,7 @@ import 
org.apache.ignite.internal.metastorage.exceptions.CompactedException;
 import org.apache.ignite.internal.metastorage.exceptions.MetaStorageException;
 import org.apache.ignite.internal.metastorage.impl.EntryImpl;
 import org.apache.ignite.internal.metastorage.server.AbstractKeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.ChecksumAndRevisions;
 import org.apache.ignite.internal.metastorage.server.Condition;
 import org.apache.ignite.internal.metastorage.server.If;
 import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
@@ -396,6 +397,16 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
         return bytesToLong(bytes);
     }
 
+    private long checksumByRevisionOrZero(long revision) throws 
RocksDBException {
+        byte[] bytes = revisionToChecksum.get(longToBytes(revision));
+
+        if (bytes == null) {
+            return 0;
+        }
+
+        return bytesToLong(bytes);
+    }
+
     /**
      * Clear the RocksDB instance.
      *
@@ -1302,6 +1313,40 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
         }
     }
 
+    @Override
+    public ChecksumAndRevisions checksumAndRevisions(long revision) {
+        rwLock.readLock().lock();
+
+        try {
+            return new ChecksumAndRevisions(
+                    checksumByRevisionOrZero(revision),
+                    minChecksummedRevisionOrZero(),
+                    rev
+            );
+        } catch (RocksDBException e) {
+            throw new MetaStorageException(INTERNAL_ERR, "Cannot get checksum 
by revision: " + revision, e);
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    private long minChecksummedRevisionOrZero() throws RocksDBException {
+        try (
+                var options = new ReadOptions().setTailing(true);
+                RocksIterator it = revisionToChecksum.newIterator(options)
+        ) {
+            it.seekToFirst();
+
+            if (it.isValid()) {
+                return bytesToLong(it.key());
+            } else {
+                it.status();
+
+                return 0;
+            }
+        }
+    }
+
     @Override
     public void clear() {
         // There's no way to easily remove all data from RocksDB, so we need 
to re-create it from scratch.
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
index a2947cf298..f8c61f571f 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
@@ -30,12 +30,15 @@ import java.util.function.Consumer;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.command.GetAllCommand;
+import org.apache.ignite.internal.metastorage.command.GetChecksumCommand;
 import org.apache.ignite.internal.metastorage.command.GetCommand;
 import 
org.apache.ignite.internal.metastorage.command.GetCurrentRevisionCommand;
 import org.apache.ignite.internal.metastorage.command.GetPrefixCommand;
 import org.apache.ignite.internal.metastorage.command.GetRangeCommand;
 import org.apache.ignite.internal.metastorage.command.PaginationCommand;
 import org.apache.ignite.internal.metastorage.command.response.BatchResponse;
+import org.apache.ignite.internal.metastorage.command.response.ChecksumInfo;
+import org.apache.ignite.internal.metastorage.server.ChecksumAndRevisions;
 import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
 import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
 import org.apache.ignite.internal.raft.Command;
@@ -150,6 +153,14 @@ public class MetaStorageListener implements 
RaftGroupListener, BeforeApplyHandle
                     long revision = storage.revision();
 
                     clo.result(revision);
+                } else if (command instanceof GetChecksumCommand) {
+                    ChecksumAndRevisions checksumInfo = 
storage.checksumAndRevisions(((GetChecksumCommand) command).revision());
+
+                    clo.result(new ChecksumInfo(
+                            checksumInfo.checksum(),
+                            checksumInfo.minChecksummedRevision(),
+                            checksumInfo.maxChecksummedRevision()
+                    ));
                 } else {
                     assert false : "Command was not found [cmd=" + command + 
']';
                 }
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java
index 97896da02f..7ef5a979a4 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.metastorage.impl;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager.configureCmgManagerToStartMetastorage;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -77,6 +78,7 @@ public class MetaStorageDeployWatchesCorrectnessTest extends 
IgniteAbstractTest
         when(cmgManager.metaStorageInfo()).thenReturn(completedFuture(
                 new 
CmgMessagesFactory().metaStorageInfo().metaStorageNodes(Set.of(mcNodeName)).build()
         ));
+        configureCmgManagerToStartMetastorage(cmgManager);
         when(clusterService.nodeName()).thenReturn(mcNodeName);
         when(raftManager.startRaftGroupNodeAndWaitNodeReady(any(), any(), 
any(), any(), any(), any(), any()))
                 .thenReturn(raftGroupService);
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java
index b914085b72..77ad87ff8f 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.metastorage.impl;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager.configureCmgManagerToStartMetastorage;
 import static 
org.apache.ignite.internal.metastorage.server.KeyValueUpdateContext.kvContext;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
@@ -160,6 +161,7 @@ public class MetaStorageManagerRecoveryTest extends 
BaseIgniteAbstractTest {
         when(mock.metaStorageInfo()).thenReturn(completedFuture(
                 new 
CmgMessagesFactory().metaStorageInfo().metaStorageNodes(Set.of(LEADER_NAME)).build()
         ));
+        configureCmgManagerToStartMetastorage(mock);
 
         return mock;
     }
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetastorageDivergencyValidatorTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetastorageDivergencyValidatorTest.java
new file mode 100644
index 0000000000..4498387022
--- /dev/null
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetastorageDivergencyValidatorTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.impl;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.apache.ignite.internal.metastorage.command.response.ChecksumInfo;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+class MetastorageDivergencyValidatorTest {
+    private final MetastorageDivergencyValidator validator = new 
MetastorageDivergencyValidator();
+
+    @Test
+    void doesNothingIfLeaderHasNoChecksumsInfo() {
+        MetastorageDivergedException ex = assertThrows(
+                MetastorageDivergedException.class,
+                () -> validator.validate(1, 42, checksumInfo(0, 0, 0))
+        );
+
+        assertThat(ex.getMessage(), is("Metastorage on leader does not have 
any checksums, this should not happen"));
+    }
+
+    @ParameterizedTest
+    @ValueSource(longs = {1, 5, 10})
+    void validationSucceedsIfChecksumMatches(long revision) {
+        assertDoesNotThrow(() -> validator.validate(revision, 42, 
checksumInfo(42, 1, 10)));
+    }
+
+    @ParameterizedTest
+    @ValueSource(longs = {1, 5, 10})
+    void validationFailsIfChecksumDoesNotMatchForKnownRevision(long revision) {
+        MetastorageDivergedException ex = assertThrows(
+                MetastorageDivergedException.class,
+                () -> validator.validate(revision, 42, checksumInfo(123, 1, 
10))
+        );
+
+        assertThat(ex.getMessage(), is("Metastorage has diverged [revision=" + 
revision + ", localChecksum=42, leaderChecksum=123"));
+    }
+
+    @Test
+    void validationFailsIfNodeIsAheadLeader() {
+        MetastorageDivergedException ex = assertThrows(
+                MetastorageDivergedException.class,
+                () -> validator.validate(11, 42, checksumInfo(0, 1, 10))
+        );
+
+        assertThat(ex.getMessage(), is("Node is ahead of the leader, this 
should not happen; probably means divergence "
+                + "[localRevision=11, leaderRevision=10]"));
+    }
+
+    @Test
+    void validationFailsIfNodeIsBehindCompactionRevision() {
+        MetastorageDivergedException ex = assertThrows(
+                MetastorageDivergedException.class,
+                () -> validator.validate(9, 42, checksumInfo(0, 10, 20))
+        );
+
+        assertThat(
+                ex.getMessage(),
+                is("Node revision is already removed due to compaction on the 
leader [localRevision=9, minLeaderRevision=10]")
+        );
+    }
+
+    private static ChecksumInfo checksumInfo(long checksum, long minRevision, 
long maxRevision) {
+        return new ChecksumInfo(checksum, minRevision, maxRevision);
+    }
+}
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
index 2157630920..b40028ed13 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
@@ -339,4 +339,55 @@ public class RocksDbKeyValueStorageTest extends 
BasicOperationsKeyValueStorageTe
 
         return checksum.getValue();
     }
+
+    @Test
+    public void checksumAndRevisionsForChecksummedRevision() {
+        byte[] key = key(1);
+
+        putToMs(key, keyValue(1, 1));
+        putToMs(key, keyValue(1, 2));
+        putToMs(key, keyValue(1, 3));
+
+        ChecksumAndRevisions checksumAndRevisions = 
storage.checksumAndRevisions(2);
+
+        assertThat(checksumAndRevisions.checksum(), is(-3394571179261091112L));
+        assertThat(checksumAndRevisions.minChecksummedRevision(), is(1L));
+        assertThat(checksumAndRevisions.maxChecksummedRevision(), is(3L));
+    }
+
+    @Test
+    public void checksumAndRevisionsForEmptyStorage() {
+        ChecksumAndRevisions checksumAndRevisions = 
storage.checksumAndRevisions(1);
+
+        assertThat(checksumAndRevisions.checksum(), is(0L));
+        assertThat(checksumAndRevisions.minChecksummedRevision(), is(0L));
+        assertThat(checksumAndRevisions.maxChecksummedRevision(), is(0L));
+    }
+
+    @Test
+    public void checksumAndRevisionsForNotYetCreatedRevision() {
+        putToMs(key(1), keyValue(1, 1));
+
+        ChecksumAndRevisions checksumAndRevisions = 
storage.checksumAndRevisions(2);
+
+        assertThat(checksumAndRevisions.checksum(), is(0L));
+        assertThat(checksumAndRevisions.minChecksummedRevision(), is(1L));
+        assertThat(checksumAndRevisions.maxChecksummedRevision(), is(1L));
+    }
+
+    @Test
+    public void checksumAndRevisionsForCompactedRevision() {
+        byte[] key = key(1);
+
+        putToMs(key, keyValue(1, 1));
+        putToMs(key, keyValue(1, 2));
+
+        storage.compact(1);
+
+        ChecksumAndRevisions checksumAndRevisions = 
storage.checksumAndRevisions(1);
+
+        assertThat(checksumAndRevisions.checksum(), is(0L));
+        assertThat(checksumAndRevisions.minChecksummedRevision(), is(2L));
+        assertThat(checksumAndRevisions.maxChecksummedRevision(), is(2L));
+    }
 }
diff --git 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
index 792d24827a..d42a274477 100644
--- 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
+++ 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
@@ -30,6 +30,8 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.configuration.ConfigurationValue;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.ClusterState;
+import org.apache.ignite.internal.cluster.management.ClusterTag;
 import 
org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
 import org.apache.ignite.internal.hlc.HybridClock;
@@ -187,15 +189,30 @@ public class StandaloneMetaStorageManager extends 
MetaStorageManagerImpl {
     }
 
     private static ClusterManagementGroupManager mockClusterGroupManager() {
-
         ClusterManagementGroupManager cmgManager = 
mock(ClusterManagementGroupManager.class);
+
         when(cmgManager.metaStorageInfo()).thenReturn(completedFuture(
                 new 
CmgMessagesFactory().metaStorageInfo().metaStorageNodes(Set.of(TEST_NODE_NAME)).build()
         ));
 
+        configureCmgManagerToStartMetastorage(cmgManager);
+
         return cmgManager;
     }
 
+    /**
+     * Configures {@link ClusterManagementGroupManager} mock to return cluster 
state needed for {@link MetaStorageManagerImpl} start.
+     *
+     * @param cmgManagerMock Mock to configure.
+     */
+    public static void 
configureCmgManagerToStartMetastorage(ClusterManagementGroupManager 
cmgManagerMock) {
+        ClusterState clusterState = mock(ClusterState.class);
+        ClusterTag clusterTag = ClusterTag.randomClusterTag(new 
CmgMessagesFactory(), "cluster");
+
+        when(clusterState.clusterTag()).thenReturn(clusterTag);
+        
when(cmgManagerMock.clusterState()).thenReturn(completedFuture(clusterState));
+    }
+
     private static RaftManager mockRaftManager() {
         ArgumentCaptor<RaftGroupListener> listenerCaptor = 
ArgumentCaptor.forClass(RaftGroupListener.class);
         RaftManager raftManager = mock(RaftManager.class, LENIENT_SETTINGS);
diff --git 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index 2d2de27071..4078fe6602 100644
--- 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++ 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -780,6 +780,11 @@ public class SimpleInMemoryKeyValueStorage extends 
AbstractKeyValueStorage {
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    public ChecksumAndRevisions checksumAndRevisions(long revision) {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     public void clear() {
         rwLock.writeLock().lock();
diff --git 
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
index 65a2ebf4ab..05e01aa27c 100644
--- 
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
+++ 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.placementdriver;
 
 import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager.configureCmgManagerToStartMetastorage;
 import static 
org.apache.ignite.internal.placementdriver.PlacementDriverManager.PLACEMENTDRIVER_LEASES_KEY;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
@@ -247,6 +248,7 @@ public class MultiActorPlacementDriverTest extends 
BasePlacementDriverTest {
             when(cmgManager.metaStorageInfo()).thenReturn(completedFuture(
                     new 
CmgMessagesFactory().metaStorageInfo().metaStorageNodes(metaStorageNodes).build()
             ));
+            configureCmgManagerToStartMetastorage(cmgManager);
 
             RaftGroupEventsClientListener eventsClientListener = new 
RaftGroupEventsClientListener();
 
diff --git 
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
index 2fafc58b91..31f9cf2f58 100644
--- 
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
+++ 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
@@ -21,6 +21,7 @@ import static 
java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toSet;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX;
 import static org.apache.ignite.internal.lang.ByteArray.fromString;
+import static 
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager.configureCmgManagerToStartMetastorage;
 import static 
org.apache.ignite.internal.partitiondistribution.PartitionDistributionUtils.calculateAssignmentForPartition;
 import static 
org.apache.ignite.internal.placementdriver.PlacementDriverManager.PLACEMENTDRIVER_LEASES_KEY;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
@@ -192,6 +193,7 @@ public class PlacementDriverManagerTest extends 
BasePlacementDriverTest {
         when(cmgManager.metaStorageInfo()).thenReturn(completedFuture(
                 new 
CmgMessagesFactory().metaStorageInfo().metaStorageNodes(metastorageNodes).build()
         ));
+        configureCmgManagerToStartMetastorage(cmgManager);
 
         RaftGroupEventsClientListener eventsClientListener = new 
RaftGroupEventsClientListener();
 
diff --git a/modules/platforms/cpp/ignite/common/error_codes.h 
b/modules/platforms/cpp/ignite/common/error_codes.h
index e8bc214956..32f19740ec 100644
--- a/modules/platforms/cpp/ignite/common/error_codes.h
+++ b/modules/platforms/cpp/ignite/common/error_codes.h
@@ -113,6 +113,7 @@ enum class code : underlying_t {
     OP_EXECUTION = 0x50004,
     OP_EXECUTION_TIMEOUT = 0x50005,
     COMPACTED = 0x50006,
+    DIVERGED = 0x50007,
 
     // Index group. Group code: 6
     INVALID_INDEX_DEFINITION = 0x60001,
diff --git a/modules/platforms/cpp/ignite/odbc/common_types.cpp 
b/modules/platforms/cpp/ignite/odbc/common_types.cpp
index e027b159a0..981b07b33b 100644
--- a/modules/platforms/cpp/ignite/odbc/common_types.cpp
+++ b/modules/platforms/cpp/ignite/odbc/common_types.cpp
@@ -182,6 +182,7 @@ sql_state error_code_to_sql_state(error::code code) {
         case error::code::COMPACTION:
             return sql_state::SHY000_GENERAL_ERROR;
         case error::code::COMPACTED:
+        case error::code::DIVERGED:
             return sql_state::SHY000_GENERAL_ERROR;
 
         // Index group. Group code: 6
diff --git a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs 
b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
index c83d19ea5e..f9b7d3e206 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
@@ -232,6 +232,9 @@ namespace Apache.Ignite
 
             /// <summary> Compacted error. </summary>
             public const int Compacted = (GroupCode << 16) | (6 & 0xFFFF);
+
+            /// <summary> Diverged error. </summary>
+            public const int Diverged = (GroupCode << 16) | (7 & 0xFFFF);
         }
 
         /// <summary> Index errors. </summary>
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
index f41c125ec6..1f1f249c5c 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
@@ -278,7 +278,7 @@ public class ItDistributedConfigurationPropertiesTest 
extends BaseIgniteAbstract
         /**
          * Starts the created components.
          */
-        CompletableFuture<Void> start() {
+        void startUpToCmgManager() {
             assertThat(
                     startAsync(new ComponentContext(),
                             vaultManager,
@@ -288,11 +288,17 @@ public class ItDistributedConfigurationPropertiesTest 
extends BaseIgniteAbstract
                             msLogStorageFactory,
                             raftManager,
                             failureManager,
-                            cmgManager,
-                            metaStorageManager
+                            cmgManager
                     ),
                     willCompleteSuccessfully()
             );
+        }
+
+        CompletableFuture<Void> startComponentsAfterCmgManager() {
+            assertThat(
+                    startAsync(new ComponentContext(), metaStorageManager),
+                    willCompleteSuccessfully()
+            );
 
             return CompletableFuture.runAsync(() ->
                     assertThat(distributedCfgManager.startAsync(new 
ComponentContext()), willCompleteSuccessfully())
@@ -375,11 +381,16 @@ public class ItDistributedConfigurationPropertiesTest 
extends BaseIgniteAbstract
                 raftConfiguration
         );
 
-        CompletableFuture<?>[] startFutures = Stream.of(firstNode, 
secondNode).parallel().map(Node::start)
-                .toArray(CompletableFuture[]::new);
+        Stream.of(firstNode, 
secondNode).parallel().forEach(Node::startUpToCmgManager);
 
         firstNode.cmgManager.initCluster(List.of(firstNode.name()), List.of(), 
"cluster");
 
+        assertThat(firstNode.cmgManager.onJoinReady(), 
willCompleteSuccessfully());
+        assertThat(secondNode.cmgManager.onJoinReady(), 
willCompleteSuccessfully());
+
+        CompletableFuture<?>[] startFutures = Stream.of(firstNode, 
secondNode).parallel().map(Node::startComponentsAfterCmgManager)
+                .toArray(CompletableFuture[]::new);
+
         assertThat(allOf(startFutures), willCompleteSuccessfully());
 
         Stream.of(firstNode, secondNode).parallel().forEach(Node::waitWatches);
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
index 5ff4a1162f..e62df7085c 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
@@ -230,7 +230,7 @@ public class ItDistributedConfigurationStorageTest extends 
BaseIgniteAbstractTes
         /**
          * Starts the created components.
          */
-        void start() {
+        void startUpToCmgManager() {
             assertThat(
                     startAsync(new ComponentContext(),
                             vaultManager,
@@ -240,11 +240,20 @@ public class ItDistributedConfigurationStorageTest 
extends BaseIgniteAbstractTes
                             msLogStorageFactory,
                             raftManager,
                             failureManager,
-                            cmgManager,
-                            metaStorageManager
+                            cmgManager
                     ),
                     willCompleteSuccessfully()
             );
+        }
+
+        /**
+         * Starts the created components.
+         */
+        void startComponentsAfterCmgManager() {
+            assertThat(
+                    startAsync(new ComponentContext(), metaStorageManager),
+                    willCompleteSuccessfully()
+            );
 
             // this is needed to avoid assertion errors
             cfgStorage.registerConfigurationListener(changedEntries -> 
nullCompletedFuture());
@@ -299,10 +308,12 @@ public class ItDistributedConfigurationStorageTest 
extends BaseIgniteAbstractTes
         Map<String, Serializable> data = Map.of("foo", "bar");
 
         try {
-            node.start();
+            node.startUpToCmgManager();
 
             node.cmgManager.initCluster(List.of(node.name()), List.of(), 
"cluster");
 
+            node.startComponentsAfterCmgManager();
+
             node.waitWatches();
 
             assertThat(node.cfgStorage.write(data, 0), willBe(equalTo(true)));
@@ -318,7 +329,8 @@ public class ItDistributedConfigurationStorageTest extends 
BaseIgniteAbstractTes
         var node2 = new Node(testInfo, workDir);
 
         try {
-            node2.start();
+            node2.startUpToCmgManager();
+            node2.startComponentsAfterCmgManager();
 
             node2.waitWatches();
 
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index c5e20183e6..55f8e981dc 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -364,6 +364,8 @@ public class IgniteImpl implements Ignite {
 
     private final LogicalTopologyService logicalTopologyService;
 
+    private final ComponentWorkingDir metastorageWorkDir;
+
     /** Client handler module. */
     private final ClientHandlerModule clientHandlerModule;
 
@@ -665,7 +667,7 @@ public class IgniteImpl implements Ignite {
                 raftGroupEventsClientListener
         );
 
-        ComponentWorkingDir metastorageWorkDir = 
metastoragePath(systemConfiguration, workDir);
+        metastorageWorkDir = metastoragePath(systemConfiguration, workDir);
 
         msLogStorageFactory = SharedLogStorageFactoryUtils.create(
                 "meta-storage log",
@@ -1791,6 +1793,11 @@ public class IgniteImpl implements Ignite {
         return clockService;
     }
 
+    @TestOnly
+    public ComponentWorkingDir metastorageWorkDir() {
+        return metastorageWorkDir;
+    }
+
     /** Returns the node's transaction manager. */
     @TestOnly
     public TxManager txManager() {
diff --git a/modules/system-disaster-recovery/build.gradle 
b/modules/system-disaster-recovery/build.gradle
index 43cdb5be46..a4e70f4790 100644
--- a/modules/system-disaster-recovery/build.gradle
+++ b/modules/system-disaster-recovery/build.gradle
@@ -55,6 +55,7 @@ dependencies {
     integrationTestImplementation testFixtures(project(':ignite-core'))
     integrationTestImplementation testFixtures(project(':ignite-api'))
     integrationTestImplementation testFixtures(project(':ignite-runner'))
+    integrationTestImplementation 
testFixtures(project(':ignite-failure-handler'))
     integrationTestImplementation libs.jetbrains.annotations
 }
 
diff --git 
a/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItCmgDisasterRecoveryTest.java
 
b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItCmgDisasterRecoveryTest.java
index a22c15f97a..0bbdeb2481 100644
--- 
a/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItCmgDisasterRecoveryTest.java
+++ 
b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItCmgDisasterRecoveryTest.java
@@ -177,10 +177,6 @@ class ItCmgDisasterRecoveryTest extends 
ItSystemGroupDisasterRecoveryTest {
         assertTopologyContainsNode(0, topologySnapshot);
     }
 
-    private void assertTopologyContainsNode(int nodeIndex, 
LogicalTopologySnapshot topologySnapshot) {
-        assertTrue(topologySnapshot.nodes().stream().anyMatch(node -> 
node.name().equals(cluster.nodeName(nodeIndex))));
-    }
-
     @Test
     void migratesManyNodesThatSawNoReparationToNewCluster() throws Exception {
         startAndInitCluster(5, new int[]{0, 1, 2}, new int[]{2, 3, 4});
diff --git 
a/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItMetastorageGroupDisasterRecoveryTest.java
 
b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItMetastorageGroupDisasterRecoveryTest.java
index 2226b4123a..d36f122f29 100644
--- 
a/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItMetastorageGroupDisasterRecoveryTest.java
+++ 
b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItMetastorageGroupDisasterRecoveryTest.java
@@ -17,8 +17,10 @@
 
 package org.apache.ignite.internal.disaster.system;
 
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutIn;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -29,22 +31,28 @@ import static org.hamcrest.Matchers.notNullValue;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
 import java.util.stream.IntStream;
+import java.util.stream.Stream;
 import org.apache.ignite.internal.app.IgniteImpl;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.configuration.ComponentWorkingDir;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
+import 
org.apache.ignite.internal.metastorage.impl.MetastorageDivergedException;
 import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
 import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.PeersAndLearners;
 import org.apache.ignite.internal.raft.service.LeaderWithTerm;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.util.IgniteUtils;
 import org.junit.jupiter.api.Test;
 
 class ItMetastorageGroupDisasterRecoveryTest extends 
ItSystemGroupDisasterRecoveryTest {
@@ -186,8 +194,6 @@ class ItMetastorageGroupDisasterRecoveryTest extends 
ItSystemGroupDisasterRecove
         // This makes both CMG and MG majorities go away.
         cluster.stopNode(1);
 
-        IgniteImpl igniteImpl0BeforeRestart = igniteImpl(0);
-
         initiateMgRepairVia(0, 1, 0);
 
         IgniteImpl restartedIgniteImpl0 = waitTillNodeRestartsInternally(0);
@@ -230,7 +236,7 @@ class ItMetastorageGroupDisasterRecoveryTest extends 
ItSystemGroupDisasterRecove
     }
 
     private static RaftGroupService metastorageGroupClient(IgniteImpl ignite)
-            throws NodeStoppingException, ExecutionException, 
InterruptedException, TimeoutException {
+            throws NodeStoppingException {
         PeersAndLearners config = 
PeersAndLearners.fromConsistentIds(Set.of(ignite.name()));
         return 
ignite.raftManager().startRaftGroupService(MetastorageGroupId.INSTANCE, config);
     }
@@ -244,4 +250,95 @@ class ItMetastorageGroupDisasterRecoveryTest extends 
ItSystemGroupDisasterRecove
 
         return leader.consistentId();
     }
+
+    @Test
+    void 
migratesNodesThatSawNoReparationToNewClusterIfMetastorageDidNotDiverge() throws 
Exception {
+        startAndInitCluster(2, new int[]{0}, new int[]{1});
+        waitTillClusterStateIsSavedToVaultOnConductor(0);
+
+        ComponentWorkingDir msWorkDir0 = igniteImpl(0).metastorageWorkDir();
+        ComponentWorkingDir msWorkDir1 = igniteImpl(1).metastorageWorkDir();
+
+        IntStream.of(0, 1).parallel().forEach(cluster::stopNode);
+
+        // Copy Metastorage state from old leader (1) to future leader (0) to 
make sure that 1 is not ahead of 0 and there will be
+        // no Metastorage divergence when we make 0 new leader and migrate 1 
to cluster again.
+        copyMetastorageState(msWorkDir1, msWorkDir0);
+
+        // Repair MG with just node 0 in CMG.
+        cluster.startEmbeddedNode(0);
+        initiateMgRepairVia(0, 1, 0);
+        IgniteImpl restartedIgniteImpl0 = waitTillNodeRestartsInternally(0);
+        waitTillMgHasMajority(restartedIgniteImpl0);
+
+        // Starting the node that did not see the repair.
+        migrate(1, 0);
+
+        LogicalTopologySnapshot topologySnapshot = 
restartedIgniteImpl0.logicalTopologyService().logicalTopologyOnLeader().get(10, 
SECONDS);
+        assertTopologyContainsNode(1, topologySnapshot);
+    }
+
+    private static void copyMetastorageState(ComponentWorkingDir source, 
ComponentWorkingDir dest) throws IOException {
+        replaceDir(source.dbPath(), dest.dbPath());
+        replaceDir(source.raftLogPath(), dest.raftLogPath());
+
+        String pathToSnapshots = "metastorage_group-0/snapshot";
+        replaceDir(source.metaPath().resolve(pathToSnapshots), 
dest.metaPath().resolve(pathToSnapshots));
+    }
+
+    private static void replaceDir(Path sourceDir, Path destDir) throws 
IOException {
+        assertTrue(Files.isDirectory(sourceDir));
+        assertTrue(Files.isDirectory(destDir));
+
+        assertTrue(IgniteUtils.deleteIfExists(destDir));
+        Files.createDirectory(destDir);
+        copyDir(sourceDir, destDir);
+    }
+
+    private static void copyDir(Path src, Path dest) throws IOException {
+        try (Stream<Path> stream = Files.walk(src)) {
+            stream.forEach(source -> copyFile(source, 
dest.resolve(src.relativize(source))));
+        }
+    }
+
+    private static void copyFile(Path source, Path dest) {
+        try {
+            Files.copy(source, dest, REPLACE_EXISTING);
+        } catch (Exception e) {
+            throw new RuntimeException(e.getMessage(), e);
+        }
+    }
+
+    @SuppressWarnings("ThrowableNotThrown")
+    @Test
+    void detectsMetastorageDivergence() throws Exception {
+        startAndInitCluster(2, new int[]{0}, new int[]{1});
+        waitTillClusterStateIsSavedToVaultOnConductor(0);
+
+        // Stopping node 0 to make sure it does not see the subsequent 
Metastorage write accepted by the leader (1).
+        // Later, we'll stop node 1, repair MG on 0, try to migrate 1 to the 
cluster, and, as 1 has a Metastorage put which 0 does not
+        // have, Metastorage divergence will have to be detected.
+        cluster.stopNode(0);
+
+        igniteImpl(1).metaStorageManager().put(new ByteArray("test-key"), new 
byte[]{42});
+
+        // This makes the MG majority go away.
+        cluster.stopNode(1);
+
+        cluster.startEmbeddedNode(0);
+        initiateMgRepairVia(0, 1, 0);
+        IgniteImpl restartedIgniteImpl0 = waitTillNodeRestartsInternally(0);
+        waitTillMgHasMajority(restartedIgniteImpl0);
+
+        // Starting the node that did not see the repair.
+        initiateMigration(1, 0);
+
+        assertThat(waitForRestartOrShutdownFuture(1), 
willCompleteSuccessfully());
+
+        // Attempt to migrate should fail.
+        assertThrowsWithCause(() -> cluster.server(1).api(), 
MetastorageDivergedException.class, "Metastorage has diverged");
+
+        // Subsequent restart should also fail.
+        assertThrowsWithCause(() -> cluster.restartNode(1), 
MetastorageDivergedException.class, "Metastorage has diverged");
+    }
 }
diff --git 
a/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItSystemGroupDisasterRecoveryTest.java
 
b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItSystemGroupDisasterRecoveryTest.java
index fb59fb546b..481f1c58f1 100644
--- 
a/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItSystemGroupDisasterRecoveryTest.java
+++ 
b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItSystemGroupDisasterRecoveryTest.java
@@ -35,6 +35,7 @@ import 
org.apache.ignite.internal.ClusterPerTestIntegrationTest;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.app.IgniteServerImpl;
 import org.apache.ignite.internal.cluster.management.ClusterState;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -129,7 +130,7 @@ abstract class ItSystemGroupDisasterRecoveryTest extends 
ClusterPerTestIntegrati
         initiateMigrationToNewCluster(oldClusterNodeIndex, 
newClusterNodeIndex);
     }
 
-    void initiateMigrationToNewCluster(int nodeMissingRepairIndex, int 
repairedNodeIndex) throws Exception {
+    final void initiateMigrationToNewCluster(int nodeMissingRepairIndex, int 
repairedNodeIndex) throws Exception {
         recoveryClient.initiateMigration(
                 "localhost",
                 cluster.httpPort(nodeMissingRepairIndex),
@@ -137,4 +138,8 @@ abstract class ItSystemGroupDisasterRecoveryTest extends 
ClusterPerTestIntegrati
                 cluster.httpPort(repairedNodeIndex)
         );
     }
+
+    final void assertTopologyContainsNode(int nodeIndex, 
LogicalTopologySnapshot topologySnapshot) {
+        assertTrue(topologySnapshot.nodes().stream().anyMatch(node -> 
node.name().equals(cluster.nodeName(nodeIndex))));
+    }
 }
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 3eb2f64b2e..067ba8ca1b 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -257,6 +257,8 @@ import org.junit.jupiter.api.extension.ExtendWith;
  */
 @ExtendWith({WorkDirectoryExtension.class, ConfigurationExtension.class})
 @Timeout(120)
+// TODO: IGNITE-23538 - enable after fixing the test
+@Disabled("IGNITE-23538")
 public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest {
     private static final IgniteLogger LOG = 
Loggers.forClass(ItRebalanceDistributedTest.class);
 

Reply via email to