This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new b9ebbae99c IGNITE-22807 Repair CMG on node start (#4304)
b9ebbae99c is described below

commit b9ebbae99ceb42be1f5b7c698f5ee2807300a446
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Sat Aug 31 09:20:26 2024 +0400

    IGNITE-22807 Repair CMG on node start (#4304)
---
 modules/cluster-management/build.gradle            |   5 +
 .../management/ClusterManagementGroupManager.java  |  65 +++++-
 .../network/messages/CmgInitMessage.java           |   2 +
 .../ClusterManagementGroupManagerTest.java         |   2 +
 .../cluster/management/ClusterIdHolder.java        |   0
 .../internal/cluster/management/MockNode.java      |   2 +
 modules/distribution-zones/build.gradle            |   1 +
 ...niteDistributionZoneManagerNodeRestartTest.java |   4 +-
 modules/metastorage/build.gradle                   |   1 +
 .../ItMetaStorageMultipleNodesAbstractTest.java    |   2 +
 .../metastorage/impl/ItMetaStorageWatchTest.java   |   2 +
 modules/partition-replicator/build.gradle          |   1 +
 .../replicator/ItReplicaLifecycleTest.java         |   2 +
 modules/runner/build.gradle                        |   2 +
 .../ItDistributedConfigurationPropertiesTest.java  |   2 +
 .../ItDistributedConfigurationStorageTest.java     |   2 +
 .../runner/app/ItIgniteNodeRestartTest.java        |   6 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |  29 ++-
 .../ignite/internal/app/IgniteServerImpl.java      |  48 ++++-
 .../java/org/apache/ignite/internal/Cluster.java   |  11 +-
 modules/system-disaster-recovery-api/README.md     |   3 +
 .../build.gradle                                   |   5 +-
 .../system/message/ResetClusterMessage.java        |   0
 .../SystemDisasterRecoveryMessageGroup.java        |   0
 .../system/storage/ClusterResetStorage.java}       |  19 +-
 modules/system-disaster-recovery/build.gradle      |  13 +-
 .../disaster/system/ItCmgDisasterRecoveryTest.java | 226 +++++++++++++++++++++
 .../disaster/system}/ClusterIdService.java         |  46 ++++-
 .../system/SystemDisasterRecoveryManager.java      |  13 +-
 .../system/SystemDisasterRecoveryManagerImpl.java  |  44 ++--
 .../system/SystemDisasterRecoveryStorage.java      |  81 ++++++++
 .../SystemDisasterRecoveryManagerImplTest.java     |  62 +++---
 modules/table/build.gradle                         |   1 +
 .../rebalance/ItRebalanceDistributedTest.java      |   2 +
 settings.gradle                                    |   2 +
 35 files changed, 607 insertions(+), 99 deletions(-)

diff --git a/modules/cluster-management/build.gradle b/modules/cluster-management/build.gradle
index 5e12611208..b702a46b15 100644
--- a/modules/cluster-management/build.gradle
+++ b/modules/cluster-management/build.gradle
@@ -41,6 +41,7 @@ dependencies {
     implementation project(':ignite-storage-api')
     implementation project(':ignite-security')
     implementation project(':ignite-configuration-root')
+    implementation project(':ignite-system-disaster-recovery-api')
     implementation libs.jetbrains.annotations
     implementation libs.fastutil.core
     implementation libs.auto.service.annotations
@@ -51,6 +52,7 @@ dependencies {
     testImplementation testFixtures(project(':ignite-core'))
     testImplementation testFixtures(project(':ignite-configuration'))
     testImplementation testFixtures(project(':ignite-network'))
+    testImplementation project(':ignite-system-disaster-recovery')
 
     testImplementation libs.hamcrest.core
     testImplementation libs.mockito.junit
@@ -62,6 +64,8 @@ dependencies {
     testFixturesImplementation project(':ignite-storage-api')
     testFixturesImplementation project(':ignite-vault')
     testFixturesImplementation project(':ignite-security')
+    testFixturesImplementation project(':ignite-system-disaster-recovery-api')
+    testFixturesImplementation project(':ignite-system-disaster-recovery')
     testFixturesImplementation testFixtures(project(':ignite-core'))
     testFixturesImplementation testFixtures(project(':ignite-configuration'))
     testFixturesImplementation testFixtures(project(':ignite-network'))
@@ -77,6 +81,7 @@ dependencies {
     integrationTestImplementation project(':ignite-raft-api')
     integrationTestImplementation project(':ignite-catalog')
     integrationTestImplementation project(':ignite-runner')
+    integrationTestImplementation project(':ignite-system-disaster-recovery')
     integrationTestImplementation testFixtures(project)
     integrationTestImplementation testFixtures(project(':ignite-core'))
     integrationTestImplementation 
testFixtures(project(':ignite-configuration'))
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index cc355e6e1e..9d8affae0b 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -62,6 +62,8 @@ import 
org.apache.ignite.internal.cluster.management.raft.commands.JoinReadyComm
 import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
 import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.disaster.system.message.ResetClusterMessage;
+import org.apache.ignite.internal.disaster.system.storage.ClusterResetStorage;
 import org.apache.ignite.internal.event.AbstractEventProducer;
 import org.apache.ignite.internal.event.EventParameters;
 import org.apache.ignite.internal.failure.FailureContext;
@@ -152,6 +154,8 @@ public class ClusterManagementGroupManager extends 
AbstractEventProducer<Cluster
 
     private final ClusterIdStore clusterIdStore;
 
+    private final ClusterResetStorage clusterResetStorage;
+
     /** Future that resolves into the initial cluster configuration in HOCON 
format. */
     private final CompletableFuture<String> initialClusterConfigurationFuture 
= new CompletableFuture<>();
 
@@ -162,6 +166,7 @@ public class ClusterManagementGroupManager extends 
AbstractEventProducer<Cluster
     /** Constructor. */
     public ClusterManagementGroupManager(
             VaultManager vault,
+            ClusterResetStorage clusterResetStorage,
             ClusterService clusterService,
             ClusterInitializer clusterInitializer,
             RaftManager raftManager,
@@ -170,9 +175,10 @@ public class ClusterManagementGroupManager extends 
AbstractEventProducer<Cluster
             ValidationManager validationManager,
             NodeAttributes nodeAttributes,
             FailureProcessor failureProcessor,
-            ClusterIdHolder clusterIdChanger,
+            ClusterIdStore clusterIdStore,
             RaftGroupOptionsConfigurer raftGroupOptionsConfigurer
     ) {
+        this.clusterResetStorage = clusterResetStorage;
         this.clusterService = clusterService;
         this.clusterInitializer = clusterInitializer;
         this.raftManager = raftManager;
@@ -182,7 +188,7 @@ public class ClusterManagementGroupManager extends 
AbstractEventProducer<Cluster
         this.localStateStorage = new LocalStateStorage(vault);
         this.nodeAttributes = nodeAttributes;
         this.failureProcessor = failureProcessor;
-        this.clusterIdStore = clusterIdChanger;
+        this.clusterIdStore = clusterIdStore;
         this.raftGroupOptionsConfigurer = raftGroupOptionsConfigurer;
 
         scheduledExecutor = Executors.newSingleThreadScheduledExecutor(
@@ -223,6 +229,7 @@ public class ClusterManagementGroupManager extends 
AbstractEventProducer<Cluster
     @TestOnly
     public ClusterManagementGroupManager(
             VaultManager vault,
+            ClusterResetStorage clusterResetStorage,
             ClusterService clusterService,
             ClusterInitializer clusterInitializer,
             RaftManager raftManager,
@@ -230,11 +237,12 @@ public class ClusterManagementGroupManager extends 
AbstractEventProducer<Cluster
             LogicalTopology logicalTopology,
             NodeAttributes nodeAttributes,
             FailureProcessor failureProcessor,
-            ClusterIdHolder clusterIdChanger,
+            ClusterIdStore clusterIdStore,
             RaftGroupOptionsConfigurer raftGroupOptionsConfigurer
     ) {
         this(
                 vault,
+                clusterResetStorage,
                 clusterService,
                 clusterInitializer,
                 raftManager,
@@ -243,7 +251,7 @@ public class ClusterManagementGroupManager extends 
AbstractEventProducer<Cluster
                 new ValidationManager(new 
ClusterStateStorageManager(clusterStateStorage), logicalTopology),
                 nodeAttributes,
                 failureProcessor,
-                clusterIdChanger,
+                clusterIdStore,
                 raftGroupOptionsConfigurer
         );
     }
@@ -341,6 +349,11 @@ public class ClusterManagementGroupManager extends 
AbstractEventProducer<Cluster
 
     @Override
     public CompletableFuture<Void> startAsync(ComponentContext 
componentContext) {
+        ResetClusterMessage resetClusterMessage = 
clusterResetStorage.readResetClusterMessage();
+        if (resetClusterMessage != null) {
+            return doClusterReset(resetClusterMessage);
+        }
+
         synchronized (raftServiceLock) {
             raftService = recoverLocalState();
         }
@@ -350,6 +363,48 @@ public class ClusterManagementGroupManager extends 
AbstractEventProducer<Cluster
         return nullCompletedFuture();
     }
 
+    private CompletableFuture<Void> doClusterReset(ResetClusterMessage 
resetClusterMessage) {
+        LOG.info("Found a ResetClusterMessage in storage, going to do cluster 
reset [message={}]", resetClusterMessage);
+
+        return destroyCmgWithEvents()
+                .thenCompose(unused -> {
+                    if 
(resetClusterMessage.cmgNodes().contains(clusterService.nodeName())) {
+                        return doReinit(resetClusterMessage);
+                    } else {
+                        // Let's just wait for new CMG nodes to establish a 
majority and send us an invitation to join.
+                        cmgMessageHandler.onRecoveryComplete();
+                        return nullCompletedFuture();
+                    }
+                })
+                .thenRun(clusterResetStorage::removeResetClusterMessage);
+    }
+
+    private CompletableFuture<Void> doReinit(ResetClusterMessage 
resetClusterMessage) {
+        CompletableFuture<CmgRaftService> serviceFuture;
+
+        synchronized (raftServiceLock) {
+            // Disaster recovery means that the Repair Conductor has ensured 
the cluster was initialized,
+            // so we can just pass null as initialClusterConfig.
+            serviceFuture = 
startCmgRaftServiceWithEvents(resetClusterMessage.cmgNodes(), null);
+            raftService = serviceFuture;
+        }
+
+        cmgMessageHandler.onRecoveryComplete();
+
+        return serviceFuture
+                .thenCompose(service -> doInit(service, 
cmgInitMessageFromResetClusterMessage(resetClusterMessage)))
+                .thenApply(unused -> null);
+    }
+
+    private CmgInitMessage 
cmgInitMessageFromResetClusterMessage(ResetClusterMessage resetClusterMessage) {
+        return msgFactory.cmgInitMessage()
+                .cmgNodes(resetClusterMessage.cmgNodes())
+                .metaStorageNodes(resetClusterMessage.metaStorageNodes())
+                .clusterName(resetClusterMessage.clusterName())
+                .clusterId(resetClusterMessage.clusterId())
+                .build();
+    }
+
     /**
      * Returns the cluster state future or the future that will be resolved to 
null if the cluster is not initialized yet.
      */
@@ -697,7 +752,7 @@ public class ClusterManagementGroupManager extends 
AbstractEventProducer<Cluster
      * Starts the CMG Raft service using the provided node names as its peers.
      */
     private CompletableFuture<CmgRaftService> startCmgRaftService(Set<String> 
nodeNames) {
-        String thisNodeConsistentId = 
clusterService.topologyService().localMember().name();
+        String thisNodeConsistentId = clusterService.nodeName();
 
         // If we are not in the CMG, we must be a learner. List of learners 
will be updated by a leader accordingly,
         // but just to start a RAFT service we must include ourselves in the 
initial learners list, that's why we
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/network/messages/CmgInitMessage.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/network/messages/CmgInitMessage.java
index dcc31c5f2c..8cbbf0ec3a 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/network/messages/CmgInitMessage.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/network/messages/CmgInitMessage.java
@@ -21,6 +21,7 @@ import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.network.annotations.Transferable;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Message for initializing the Cluster Management Group.
@@ -50,5 +51,6 @@ public interface CmgInitMessage extends NetworkMessage {
     /**
      * Cluster configuration that should be applied after init.
      */
+    @Nullable
     String initialClusterConfiguration();
 }
diff --git a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManagerTest.java b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManagerTest.java
index 3ccbd04152..8fa1a581ad 100644
--- a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManagerTest.java
+++ b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManagerTest.java
@@ -36,6 +36,7 @@ import 
org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
 import 
org.apache.ignite.internal.cluster.management.raft.commands.InitCmgStateCommand;
 import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.disaster.system.SystemDisasterRecoveryStorage;
 import org.apache.ignite.internal.failure.FailureProcessor;
 import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.manager.ComponentContext;
@@ -103,6 +104,7 @@ class ClusterManagementGroupManagerTest extends 
BaseIgniteAbstractTest {
 
         cmgManager = new ClusterManagementGroupManager(
                 vaultManager,
+                new SystemDisasterRecoveryStorage(vaultManager),
                 clusterService,
                 clusterInitializer,
                 raftManager,
diff --git 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterIdHolder.java
 
b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/ClusterIdHolder.java
similarity index 100%
rename from 
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterIdHolder.java
rename to 
modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/ClusterIdHolder.java
diff --git 
a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
 
b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
index 07a25cb53a..3a9ffd847a 100644
--- 
a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
+++ 
b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
@@ -38,6 +38,7 @@ import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
 import org.apache.ignite.internal.configuration.RaftGroupOptionsConfigHelper;
 import 
org.apache.ignite.internal.configuration.validation.TestConfigurationValidator;
+import 
org.apache.ignite.internal.disaster.system.SystemDisasterRecoveryStorage;
 import org.apache.ignite.internal.failure.FailureProcessor;
 import org.apache.ignite.internal.failure.NoOpFailureProcessor;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
@@ -126,6 +127,7 @@ public class MockNode {
 
         this.clusterManager = new ClusterManagementGroupManager(
                 vaultManager,
+                new SystemDisasterRecoveryStorage(vaultManager),
                 clusterService,
                 new ClusterInitializer(clusterService, hocon -> hocon, new 
TestConfigurationValidator()),
                 raftManager,
diff --git a/modules/distribution-zones/build.gradle 
b/modules/distribution-zones/build.gradle
index bc6d2c4cc9..7c12418d6e 100644
--- a/modules/distribution-zones/build.gradle
+++ b/modules/distribution-zones/build.gradle
@@ -94,6 +94,7 @@ dependencies {
     integrationTestImplementation project(':ignite-security')
     integrationTestImplementation project(':ignite-system-view-api')
     integrationTestImplementation project(':ignite-failure-handler')
+    integrationTestImplementation project(':ignite-system-disaster-recovery')
     integrationTestImplementation testFixtures(project(':ignite-catalog'))
     integrationTestImplementation testFixtures(project(':ignite-core'))
     integrationTestImplementation testFixtures(project(':ignite-runner'))
diff --git 
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
 
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
index 2c169badfb..aa3d1f953c 100644
--- 
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
+++ 
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
@@ -84,7 +84,6 @@ import org.apache.ignite.internal.catalog.CatalogManagerImpl;
 import org.apache.ignite.internal.catalog.CatalogTestUtils;
 import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
 import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
-import org.apache.ignite.internal.cluster.management.ClusterIdService;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import 
org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
 import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
@@ -99,6 +98,7 @@ import 
org.apache.ignite.internal.configuration.storage.DistributedConfiguration
 import 
org.apache.ignite.internal.configuration.storage.LocalFileConfigurationStorage;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.validation.ConfigurationValidatorImpl;
+import org.apache.ignite.internal.disaster.system.ClusterIdService;
 import 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.Augmentation;
 import 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.ZoneState;
 import org.apache.ignite.internal.failure.FailureProcessor;
@@ -187,7 +187,7 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest 
extends BaseIgniteRe
 
         var clusterStateStorage = new TestClusterStateStorage();
 
-        var clusterIdService = new ClusterIdService(clusterStateStorage);
+        var clusterIdService = new ClusterIdService(vault);
 
         ConfigurationModules modules = loadConfigurationModules(log, 
Thread.currentThread().getContextClassLoader());
 
diff --git a/modules/metastorage/build.gradle b/modules/metastorage/build.gradle
index 8e8b77728e..cfca86b3cc 100644
--- a/modules/metastorage/build.gradle
+++ b/modules/metastorage/build.gradle
@@ -62,6 +62,7 @@ dependencies {
     integrationTestImplementation project(":ignite-security")
     integrationTestImplementation project(':ignite-metrics')
     integrationTestImplementation project(':ignite-runner')
+    integrationTestImplementation project(':ignite-system-disaster-recovery')
     integrationTestImplementation testFixtures(project(':ignite-core'))
     integrationTestImplementation testFixtures(project(':ignite-network'))
     integrationTestImplementation testFixtures(project(':ignite-raft'))
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
index eb6c1d7b02..9c109bbe2c 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
@@ -60,6 +60,7 @@ import 
org.apache.ignite.internal.configuration.RaftGroupOptionsConfigHelper;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import 
org.apache.ignite.internal.configuration.validation.TestConfigurationValidator;
+import 
org.apache.ignite.internal.disaster.system.SystemDisasterRecoveryStorage;
 import org.apache.ignite.internal.failure.FailureProcessor;
 import org.apache.ignite.internal.failure.NoOpFailureProcessor;
 import org.apache.ignite.internal.hlc.HybridClock;
@@ -197,6 +198,7 @@ public abstract class 
ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr
 
             this.cmgManager = new ClusterManagementGroupManager(
                     vaultManager,
+                    new SystemDisasterRecoveryStorage(vaultManager),
                     clusterService,
                     clusterInitializer,
                     raftManager,
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
index af288c02ad..20e887d07c 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
@@ -61,6 +61,7 @@ import 
org.apache.ignite.internal.configuration.RaftGroupOptionsConfigHelper;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import 
org.apache.ignite.internal.configuration.validation.TestConfigurationValidator;
+import 
org.apache.ignite.internal.disaster.system.SystemDisasterRecoveryStorage;
 import org.apache.ignite.internal.failure.FailureProcessor;
 import org.apache.ignite.internal.failure.NoOpFailureProcessor;
 import org.apache.ignite.internal.hlc.HybridClock;
@@ -186,6 +187,7 @@ public class ItMetaStorageWatchTest extends 
IgniteAbstractTest {
 
             this.cmgManager = new ClusterManagementGroupManager(
                     vaultManager,
+                    new SystemDisasterRecoveryStorage(vaultManager),
                     clusterService,
                     clusterInitializer,
                     raftManager,
diff --git a/modules/partition-replicator/build.gradle 
b/modules/partition-replicator/build.gradle
index a9345575c4..71a1d37c80 100644
--- a/modules/partition-replicator/build.gradle
+++ b/modules/partition-replicator/build.gradle
@@ -76,6 +76,7 @@ dependencies {
     integrationTestImplementation project(':ignite-placement-driver')
     integrationTestImplementation project(':ignite-affinity')
     integrationTestImplementation project(':ignite-runner')
+    integrationTestImplementation project(':ignite-system-disaster-recovery')
 }
 
 description = 'ignite-partition-replicator'
diff --git 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
index fea2d42cd3..5aea3a6229 100644
--- 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
+++ 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
@@ -100,6 +100,7 @@ import 
org.apache.ignite.internal.configuration.storage.LocalFileConfigurationSt
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import 
org.apache.ignite.internal.configuration.validation.TestConfigurationValidator;
+import 
org.apache.ignite.internal.disaster.system.SystemDisasterRecoveryStorage;
 import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
 import org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil;
 import org.apache.ignite.internal.failure.FailureProcessor;
@@ -969,6 +970,7 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
 
             cmgManager = new ClusterManagementGroupManager(
                     vaultManager,
+                    new SystemDisasterRecoveryStorage(vaultManager),
                     clusterService,
                     clusterInitializer,
                     raftManager,
diff --git a/modules/runner/build.gradle b/modules/runner/build.gradle
index e95aa67397..8c64ae163c 100644
--- a/modules/runner/build.gradle
+++ b/modules/runner/build.gradle
@@ -90,6 +90,7 @@ dependencies {
     implementation project(':ignite-partition-replicator')
     implementation project(':ignite-catalog-compaction')
     implementation project(':ignite-system-disaster-recovery')
+    implementation project(':ignite-system-disaster-recovery-api')
     implementation libs.jetbrains.annotations
     implementation libs.micronaut.inject
     implementation libs.micronaut.validation
@@ -167,6 +168,7 @@ dependencies {
     integrationTestImplementation project(':ignite-low-watermark')
     integrationTestImplementation project(':ignite-partition-replicator')
     integrationTestImplementation project(':ignite-configuration-root')
+    integrationTestImplementation project(':ignite-system-disaster-recovery')
     integrationTestImplementation testFixtures(project(":ignite-api"))
     integrationTestImplementation testFixtures(project(':ignite-core'))
     integrationTestImplementation 
testFixtures(project(':ignite-configuration'))
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
index af03f15f6e..239574d66e 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
@@ -53,6 +53,7 @@ import 
org.apache.ignite.internal.configuration.storage.DistributedConfiguration
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import 
org.apache.ignite.internal.configuration.validation.TestConfigurationValidator;
+import 
org.apache.ignite.internal.disaster.system.SystemDisasterRecoveryStorage;
 import org.apache.ignite.internal.failure.FailureProcessor;
 import org.apache.ignite.internal.failure.NoOpFailureProcessor;
 import org.apache.ignite.internal.hlc.HybridClock;
@@ -203,6 +204,7 @@ public class ItDistributedConfigurationPropertiesTest 
extends BaseIgniteAbstract
 
             cmgManager = new ClusterManagementGroupManager(
                     vaultManager,
+                    new SystemDisasterRecoveryStorage(vaultManager),
                     clusterService,
                     clusterInitializer,
                     raftManager,
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
index 5c9698a0c7..f8cfddef2d 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
@@ -45,6 +45,7 @@ import 
org.apache.ignite.internal.configuration.RaftGroupOptionsConfigHelper;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import 
org.apache.ignite.internal.configuration.validation.TestConfigurationValidator;
+import 
org.apache.ignite.internal.disaster.system.SystemDisasterRecoveryStorage;
 import org.apache.ignite.internal.failure.FailureProcessor;
 import org.apache.ignite.internal.failure.NoOpFailureProcessor;
 import org.apache.ignite.internal.hlc.HybridClock;
@@ -175,6 +176,7 @@ public class ItDistributedConfigurationStorageTest extends 
BaseIgniteAbstractTes
                     
RaftGroupOptionsConfigHelper.configureProperties(cmgLogStorageFactory, 
cmgWorkDir.metaPath());
             cmgManager = new ClusterManagementGroupManager(
                     vaultManager,
+                    new SystemDisasterRecoveryStorage(vaultManager),
                     clusterService,
                     clusterInitializer,
                     raftManager,
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 896d7a1a23..86ea473c4c 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -97,7 +97,6 @@ import 
org.apache.ignite.internal.catalog.commands.TableHashPrimaryKey;
 import 
org.apache.ignite.internal.catalog.configuration.SchemaSynchronizationConfiguration;
 import 
org.apache.ignite.internal.catalog.configuration.SchemaSynchronizationExtensionConfiguration;
 import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
-import org.apache.ignite.internal.cluster.management.ClusterIdService;
 import org.apache.ignite.internal.cluster.management.ClusterInitializer;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import org.apache.ignite.internal.cluster.management.NodeAttributesCollector;
@@ -122,6 +121,8 @@ import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExten
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import 
org.apache.ignite.internal.configuration.validation.ConfigurationValidatorImpl;
 import 
org.apache.ignite.internal.configuration.validation.TestConfigurationValidator;
+import org.apache.ignite.internal.disaster.system.ClusterIdService;
+import 
org.apache.ignite.internal.disaster.system.SystemDisasterRecoveryStorage;
 import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
 import org.apache.ignite.internal.failure.NoOpFailureProcessor;
 import org.apache.ignite.internal.hlc.ClockService;
@@ -334,7 +335,7 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
 
         var clusterStateStorage = new 
RocksDbClusterStateStorage(dir.resolve("cmg"), name);
 
-        var clusterIdService = new ClusterIdService(clusterStateStorage);
+        var clusterIdService = new ClusterIdService(vault);
 
         ConfigurationModules modules = loadConfigurationModules(log, 
Thread.currentThread().getContextClassLoader());
 
@@ -428,6 +429,7 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
 
         var cmgManager = new ClusterManagementGroupManager(
                 vault,
+                new SystemDisasterRecoveryStorage(vault),
                 clusterSvc,
                 clusterInitializer,
                 raftMgr,
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 2ec53343ea..a095b99611 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -69,7 +69,6 @@ import 
org.apache.ignite.internal.catalog.configuration.SchemaSynchronizationCon
 import 
org.apache.ignite.internal.catalog.configuration.SchemaSynchronizationExtensionConfiguration;
 import org.apache.ignite.internal.catalog.sql.IgniteCatalogSqlImpl;
 import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
-import org.apache.ignite.internal.cluster.management.ClusterIdService;
 import org.apache.ignite.internal.cluster.management.ClusterInitializer;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import org.apache.ignite.internal.cluster.management.ClusterState;
@@ -120,8 +119,11 @@ import 
org.apache.ignite.internal.deployunit.DeploymentManagerImpl;
 import org.apache.ignite.internal.deployunit.IgniteDeployment;
 import 
org.apache.ignite.internal.deployunit.configuration.DeploymentExtensionConfiguration;
 import org.apache.ignite.internal.deployunit.metastore.DeploymentUnitStoreImpl;
+import org.apache.ignite.internal.disaster.system.ClusterIdService;
 import org.apache.ignite.internal.disaster.system.ServerRestarter;
+import 
org.apache.ignite.internal.disaster.system.SystemDisasterRecoveryManager;
 import 
org.apache.ignite.internal.disaster.system.SystemDisasterRecoveryManagerImpl;
+import 
org.apache.ignite.internal.disaster.system.SystemDisasterRecoveryStorage;
 import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
 import org.apache.ignite.internal.eventlog.config.schema.EventLogConfiguration;
 import 
org.apache.ignite.internal.eventlog.config.schema.EventLogExtensionConfiguration;
@@ -520,7 +522,7 @@ public class IgniteImpl implements Ignite {
         // TODO: IGNITE-16841 - use common RocksDB instance to store cluster 
state as well.
         clusterStateStorage = new 
RocksDbClusterStateStorage(cmgWorkDir.dbPath(), name);
 
-        clusterIdService = new ClusterIdService(clusterStateStorage);
+        clusterIdService = new ClusterIdService(vaultMgr);
 
         criticalWorkerRegistry = new CriticalWorkerWatchdog(
                 criticalWorkersConfiguration,
@@ -553,8 +555,7 @@ public class IgniteImpl implements Ignite {
                 clusterSvc.topologyService(),
                 clusterSvc.messagingService(),
                 vaultMgr,
-                restarter,
-                clusterStateStorage
+                restarter
         );
 
         clock = new HybridClockImpl();
@@ -629,6 +630,7 @@ public class IgniteImpl implements Ignite {
 
         cmgMgr = new ClusterManagementGroupManager(
                 vaultMgr,
+                new SystemDisasterRecoveryStorage(vaultMgr),
                 clusterSvc,
                 clusterInitializer,
                 raftMgr,
@@ -1248,6 +1250,8 @@ public class IgniteImpl implements Ignite {
         ComponentContext componentContext = new ComponentContext(joinExecutor);
 
         return cmgMgr.joinFuture()
+                .thenComposeAsync(unused -> cmgMgr.clusterState(), 
joinExecutor)
+                
.thenAcceptAsync(systemDisasterRecoveryManager::saveClusterState, joinExecutor)
                 // Disable REST component during initialization.
                 .thenAcceptAsync(unused -> restComponent.disable(), 
joinExecutor)
                 .thenComposeAsync(unused -> {
@@ -1434,6 +1438,16 @@ public class IgniteImpl implements Ignite {
         return disasterRecoveryManager;
     }
 
+    @TestOnly
+    public VaultManager vault() {
+        return vaultMgr;
+    }
+
+    @TestOnly
+    public ClusterStateStorage clusterStateStorage() {
+        return clusterStateStorage;
+    }
+
     @TestOnly
     public QueryProcessor queryEngine() {
         return qryEngine;
@@ -1617,7 +1631,7 @@ public class IgniteImpl implements Ignite {
                                 }, startupExecutor);
                     }
                 }, startupExecutor)
-                
.thenRunAsync(systemDisasterRecoveryManager::markNodeInitialized, 
startupExecutor);
+                
.thenRunAsync(systemDisasterRecoveryManager::markInitConfigApplied, 
startupExecutor);
     }
 
     /**
@@ -1765,4 +1779,9 @@ public class IgniteImpl implements Ignite {
         return replicaMgr;
     }
 
+    /** Returns disaster recovery manager for system groups. */
+    @TestOnly
+    public SystemDisasterRecoveryManager systemDisasterRecoveryManager() {
+        return systemDisasterRecoveryManager;
+    }
 }
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteServerImpl.java
 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteServerImpl.java
index c9a0a2982a..f80032706f 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteServerImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteServerImpl.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ForkJoinPool;
+import java.util.function.Supplier;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteServer;
 import org.apache.ignite.InitParameters;
@@ -49,6 +50,7 @@ import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.NodeNotStartedException;
 import org.apache.ignite.lang.NodeStartException;
 import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
 
 /**
  * Implementation of embedded node.
@@ -108,7 +110,8 @@ public class IgniteServerImpl implements IgniteServer {
      *
      * <p>Guarded by {@link #restartOrShutdownMutex}.
      */
-    private CompletableFuture<Void> restartOrShutdownFuture = 
nullCompletedFuture();
+    @Nullable
+    private CompletableFuture<Void> restartOrShutdownFuture;
 
     /**
      * Gets set to {@code true} when the node is shut down. This disallows 
restarts.
@@ -248,15 +251,24 @@ public class IgniteServerImpl implements IgniteServer {
                 throw new NodeNotStartedException();
             }
 
-            throwIfNotJoined();
-
-            result = restartOrShutdownFuture.thenCompose(unused -> 
doRestartAsync(instance));
-            restartOrShutdownFuture = result;
+            result = chainRestartOrShutdownAction(() -> 
doRestartAsync(instance));
         }
 
         return result;
     }
 
+    /**
+     * Runs the action after the currently chained restart/shutdown future (if any) completes, stores the resulting future as the new chained one and returns it. This MUST be called under synchronization on {@link 
#restartOrShutdownMutex}.
+     */
+    private CompletableFuture<Void> 
chainRestartOrShutdownAction(Supplier<CompletableFuture<Void>> action) {
+        CompletableFuture<Void> result = (restartOrShutdownFuture == null ? 
nullCompletedFuture() : restartOrShutdownFuture)
+                .thenCompose(unused -> action.get());
+
+        restartOrShutdownFuture = result;
+
+        return result;
+    }
+
     private CompletableFuture<Void> doRestartAsync(IgniteImpl instance) {
         // TODO: IGNITE-23006 - limit the wait to acquire the write lock with 
a timeout.
         return attachmentLock.detachedAsync(() -> {
@@ -277,8 +289,7 @@ public class IgniteServerImpl implements IgniteServer {
         CompletableFuture<Void> result;
 
         synchronized (restartOrShutdownMutex) {
-            result = restartOrShutdownFuture.thenCompose(unused -> 
doShutdownAsync());
-            restartOrShutdownFuture = result;
+            result = chainRestartOrShutdownAction(this::doShutdownAsync);
 
             shutDown = true;
         }
@@ -391,4 +402,27 @@ public class IgniteServerImpl implements IgniteServer {
             throw ExceptionUtils.sneakyThrow(e);
         }
     }
+
+    /**
+     * Returns the underlying IgniteImpl even if the join was not completed; throws {@link NodeNotStartedException} if no instance is currently set.
+     */
+    @TestOnly
+    public IgniteImpl igniteImpl() {
+        IgniteImpl instance = ignite;
+        if (instance == null) {
+            throw new NodeNotStartedException();
+        }
+
+        return instance;
+    }
+
+    /**
+     * Returns the future of the most recently chained restart or shutdown (it completes when that action is complete), or {@code null} if none has been initiated yet.
+     */
+    @TestOnly
+    public @Nullable CompletableFuture<Void> restartOrShutdownFuture() {
+        synchronized (restartOrShutdownMutex) {
+            return restartOrShutdownFuture;
+        }
+    }
 }
diff --git 
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java 
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
index 7be5bc0e18..15f32cfa58 100644
--- 
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
+++ 
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
@@ -259,7 +259,7 @@ public class Cluster {
      * @return Started server and its registration future.
      */
     public ServerRegistration startEmbeddedNode(int nodeIndex, String 
nodeBootstrapConfigTemplate) {
-        String nodeName = testNodeName(testInfo, nodeIndex);
+        String nodeName = nodeName(nodeIndex);
 
         String config = IgniteStringFormatter.format(
                 nodeBootstrapConfigTemplate,
@@ -288,6 +288,15 @@ public class Cluster {
         return new ServerRegistration(node, registrationFuture);
     }
 
+    /**
+     * Returns node name by index.
+     *
+     * @param nodeIndex Index of the node of interest.
+     */
+    public String nodeName(int nodeIndex) {
+        return testNodeName(testInfo, nodeIndex);
+    }
+
     private static <T> void setListAtIndex(List<T> list, int i, T element) {
         while (list.size() < i) {
             list.add(null);
diff --git a/modules/system-disaster-recovery-api/README.md 
b/modules/system-disaster-recovery-api/README.md
new file mode 100644
index 0000000000..2e8dc4b209
--- /dev/null
+++ b/modules/system-disaster-recovery-api/README.md
@@ -0,0 +1,3 @@
+# System disaster recovery API module
+
+This module provides an API for recovering from disasters affecting the system Raft groups (the CMG and the Metastorage group).
diff --git a/modules/system-disaster-recovery/build.gradle 
b/modules/system-disaster-recovery-api/build.gradle
similarity index 85%
copy from modules/system-disaster-recovery/build.gradle
copy to modules/system-disaster-recovery-api/build.gradle
index 333b8e7938..4c93ec2153 100644
--- a/modules/system-disaster-recovery/build.gradle
+++ b/modules/system-disaster-recovery-api/build.gradle
@@ -26,14 +26,11 @@ dependencies {
     implementation project(':ignite-core')
     implementation project(':ignite-network-api')
     implementation project(':ignite-vault')
-    implementation project(':ignite-cluster-management')
     implementation libs.jetbrains.annotations
 
     testImplementation libs.hamcrest.core
     testImplementation libs.mockito.core
     testImplementation libs.mockito.junit
-    testImplementation testFixtures(project(":ignite-core"))
-    testImplementation testFixtures(project(":ignite-cluster-management"))
 }
 
-description = 'ignite-system-disaster-recovery'
+description = 'ignite-system-disaster-recovery-api'
diff --git 
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/message/ResetClusterMessage.java
 
b/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/ResetClusterMessage.java
similarity index 100%
rename from 
modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/message/ResetClusterMessage.java
rename to 
modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/ResetClusterMessage.java
diff --git 
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/message/SystemDisasterRecoveryMessageGroup.java
 
b/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/SystemDisasterRecoveryMessageGroup.java
similarity index 100%
copy from 
modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/message/SystemDisasterRecoveryMessageGroup.java
copy to 
modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/SystemDisasterRecoveryMessageGroup.java
diff --git 
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/message/SystemDisasterRecoveryMessageGroup.java
 
b/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/storage/ClusterResetStorage.java
similarity index 61%
rename from 
modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/message/SystemDisasterRecoveryMessageGroup.java
rename to 
modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/storage/ClusterResetStorage.java
index c6f0b0fa07..82ca25f3d8 100644
--- 
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/message/SystemDisasterRecoveryMessageGroup.java
+++ 
b/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/storage/ClusterResetStorage.java
@@ -15,17 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.disaster.system.message;
+package org.apache.ignite.internal.disaster.system.storage;
 
-import org.apache.ignite.internal.network.annotations.MessageGroup;
+import org.apache.ignite.internal.disaster.system.message.ResetClusterMessage;
+import org.jetbrains.annotations.Nullable;
 
 /**
- * Message Group for disaster recovery of system groups.
+ * Storage used by the cluster reset tools.
  */
-@MessageGroup(groupType = 15, groupName = "SystemDisasterRecoveryMessages")
-public class SystemDisasterRecoveryMessageGroup {
+public interface ClusterResetStorage {
     /**
-     * Message type for {@link ResetClusterMessage}.
+     * Reads {@link ResetClusterMessage}; returns {@code null} if it's not 
saved.
      */
-    public static final short RESET_CLUSTER = 1;
+    @Nullable ResetClusterMessage readResetClusterMessage();
+
+    /**
+     * Removes saved {@link ResetClusterMessage}.
+     */
+    void removeResetClusterMessage();
 }
diff --git a/modules/system-disaster-recovery/build.gradle 
b/modules/system-disaster-recovery/build.gradle
index 333b8e7938..e25c4e31fc 100644
--- a/modules/system-disaster-recovery/build.gradle
+++ b/modules/system-disaster-recovery/build.gradle
@@ -19,9 +19,10 @@ apply from: "$rootDir/buildscripts/java-core.gradle"
 apply from: "$rootDir/buildscripts/publishing.gradle"
 apply from: "$rootDir/buildscripts/java-junit5.gradle"
 apply from: "$rootDir/buildscripts/java-test-fixtures.gradle"
+apply from: "$rootDir/buildscripts/java-integration-test.gradle"
 
 dependencies {
-    annotationProcessor project(":ignite-network-annotation-processor")
+    api project(':ignite-system-disaster-recovery-api')
 
     implementation project(':ignite-core')
     implementation project(':ignite-network-api')
@@ -34,6 +35,16 @@ dependencies {
     testImplementation libs.mockito.junit
     testImplementation testFixtures(project(":ignite-core"))
     testImplementation testFixtures(project(":ignite-cluster-management"))
+    testImplementation libs.jetbrains.annotations
+
+    integrationTestImplementation 
project(':ignite-system-disaster-recovery-api')
+    integrationTestImplementation project(':ignite-cluster-management')
+    integrationTestImplementation project(':ignite-vault')
+    integrationTestImplementation project(':ignite-network-api')
+    integrationTestImplementation testFixtures(project(':ignite-core'))
+    integrationTestImplementation testFixtures(project(':ignite-api'))
+    integrationTestImplementation testFixtures(project(':ignite-runner'))
+    integrationTestImplementation libs.jetbrains.annotations
 }
 
 description = 'ignite-system-disaster-recovery'
diff --git 
a/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItCmgDisasterRecoveryTest.java
 
b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItCmgDisasterRecoveryTest.java
new file mode 100644
index 0000000000..3cdf5bd9ee
--- /dev/null
+++ 
b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItCmgDisasterRecoveryTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.disaster.system;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutIn;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.app.IgniteServerImpl;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.Test;
+
+class ItCmgDisasterRecoveryTest extends ClusterPerTestIntegrationTest {
+    @Override
+    protected int initialNodes() {
+        return 0;
+    }
+
+    @Test
+    void repairWhenCmgWas1Node() throws Exception {
+        cluster.startAndInit(3, paramsBuilder -> {
+            paramsBuilder.cmgNodeNames(nodeNames(0));
+            paramsBuilder.metaStorageNodeNames(nodeNames(1));
+            // Node with index 2 will host neither of the voting sets (CMG or Metastorage).
+        });
+        waitTillClusterStateIsSavedToVaultOnConductor(1);
+
+        // Node 0 is the only CMG voting member, so stopping it makes the CMG lose its majority.
+        cluster.stopNode(0);
+
+        IgniteImpl igniteImpl1BeforeRestart = igniteImpl(1);
+
+        assertThatCmgHasNoMajority(igniteImpl1BeforeRestart);
+
+        initiateCmgRepairVia(igniteImpl1BeforeRestart, 1);
+
+        IgniteImpl restartedIgniteImpl1 = waitTillNodeRestartsInternally(1);
+        waitTillCmgHasMajority(restartedIgniteImpl1);
+
+        assertResetClusterMessageIsNotPresentAt(restartedIgniteImpl1);
+        
assertResetClusterMessageIsNotPresentAt(waitTillNodeRestartsInternally(2));
+    }
+
+    private void waitTillClusterStateIsSavedToVaultOnConductor(int nodeIndex) 
throws InterruptedException {
+        assertTrue(waitForCondition(
+                () -> new 
SystemDisasterRecoveryStorage(igniteImpl(nodeIndex).vault()).readClusterState() 
!= null,
+                SECONDS.toMillis(10)
+        ));
+    }
+
+    private static void assertThatCmgHasNoMajority(IgniteImpl 
igniteImpl1BeforeRestart) {
+        
assertThat(igniteImpl1BeforeRestart.logicalTopologyService().logicalTopologyOnLeader(),
 willTimeoutIn(1, SECONDS));
+    }
+
+    private static void waitTillCmgHasMajority(IgniteImpl 
restartedIgniteImpl1) {
+        
assertThat(restartedIgniteImpl1.logicalTopologyService().logicalTopologyOnLeader(),
 willCompleteSuccessfully());
+    }
+
+    private void initiateCmgRepairVia(IgniteImpl conductor, int... 
newCmgIndexes) {
+        // TODO: IGNITE-22812 - initiate repair via CLI.
+
+        CompletableFuture<Void> initiationFuture = 
conductor.systemDisasterRecoveryManager()
+                .resetCluster(List.of(nodeNames(newCmgIndexes)));
+        assertThat(initiationFuture, willCompleteSuccessfully());
+    }
+
+    private String[] nodeNames(int... nodeIndexes) {
+        return IntStream.of(nodeIndexes)
+                .mapToObj(cluster::nodeName)
+                .toArray(String[]::new);
+    }
+
+    private IgniteImpl waitTillNodeRestartsInternally(int nodeIndex) throws 
InterruptedException {
+        // restartOrShutdownFuture() becomes non-null once a restart or shutdown 
is initiated; here we know it is a restart (caused by the CMG repair).
+
+        assertTrue(waitForCondition(() -> restartOrShutdownFuture(nodeIndex) 
!= null, SECONDS.toMillis(20)));
+        assertThat(restartOrShutdownFuture(nodeIndex), 
willCompleteSuccessfully());
+
+        return unwrapIgniteImpl(cluster.server(nodeIndex).api());
+    }
+
+    @Nullable
+    private CompletableFuture<Void> restartOrShutdownFuture(int nodeIndex) {
+        return ((IgniteServerImpl) 
cluster.server(nodeIndex)).restartOrShutdownFuture();
+    }
+
+    private static void assertResetClusterMessageIsNotPresentAt(IgniteImpl 
ignite) {
+        assertThat(new 
SystemDisasterRecoveryStorage(ignite.vault()).readResetClusterMessage(), 
is(nullValue()));
+    }
+
+    @Test
+    void repairWhenCmgWas3Nodes() throws Exception {
+        cluster.startAndInit(6, paramsBuilder -> {
+            paramsBuilder.cmgNodeNames(nodeNames(0, 1, 2));
+            paramsBuilder.metaStorageNodeNames(nodeNames(2, 3, 4));
+            // Node with index 5 will host neither of the voting sets (CMG or Metastorage).
+        });
+        waitTillClusterStateIsSavedToVaultOnConductor(2);
+
+        // Stop 2 of the 3 CMG voting members, so the CMG loses its majority.
+        IntStream.of(0, 1).parallel().forEach(cluster::stopNode);
+
+        IgniteImpl igniteImpl2BeforeRestart = igniteImpl(2);
+
+        assertThatCmgHasNoMajority(igniteImpl2BeforeRestart);
+
+        initiateCmgRepairVia(igniteImpl2BeforeRestart, 2, 3, 4);
+
+        IgniteImpl restartedIgniteImpl2 = waitTillNodeRestartsInternally(2);
+        waitTillCmgHasMajority(restartedIgniteImpl2);
+
+        // TODO: IGNITE-23096 - remove after the hang is fixed.
+        waitTillNodesRestartInProcess(3, 4, 5);
+    }
+
+    private void waitTillNodesRestartInProcess(int... nodeIndexes) throws 
InterruptedException {
+        for (int i : nodeIndexes) {
+            waitTillNodeRestartsInternally(i);
+        }
+    }
+
+    @Test
+    void repairedClusterCanJoinBlankNodes() throws Exception {
+        cluster.startAndInit(2, paramsBuilder -> {
+            paramsBuilder.cmgNodeNames(nodeNames(0));
+            paramsBuilder.metaStorageNodeNames(nodeNames(1));
+        });
+        waitTillClusterStateIsSavedToVaultOnConductor(1);
+
+        cluster.stopNode(0);
+
+        initiateCmgRepairVia(igniteImpl(1), 1);
+
+        // Wait for the internal restart so that the blank node is able to 
connect to at least one node. Without this wait the new node
+        // would still be able to connect, but only on Scalecube's 
initial sync retry, and we don't want to wait for that
+        // in our test.
+        waitTillNodeRestartsInternally(1);
+
+        IgniteImpl node2 = unwrapIgniteImpl(cluster.startNode(2));
+
+        assertTrue(waitForCondition(
+                () -> 
node2.logicalTopologyService().localLogicalTopology().nodes().stream()
+                        .anyMatch(n -> node2.name().equals(n.name())),
+                SECONDS.toMillis(10)
+        ));
+    }
+
+    @Test
+    void repairIsPossibleWhenAllNodesWaitForCmgMajorityOnJoin() throws 
Exception {
+        cluster.startAndInit(3, paramsBuilder -> {
+            paramsBuilder.cmgNodeNames(nodeNames(0));
+            paramsBuilder.metaStorageNodeNames(nodeNames(1));
+            // Node with index 2 will host neither of the voting sets (CMG or Metastorage).
+        });
+        waitTillClusterStateIsSavedToVaultOnConductor(1);
+
+        cluster.stopNode(0);
+
+        IntStream.of(1, 2).parallel().forEach(this::restartPartially);
+
+        initiateCmgRepairVia(((IgniteServerImpl) 
cluster.server(1)).igniteImpl(), 1);
+
+        IgniteImpl restartedIgniteImpl1 = waitTillNodeRestartsInternally(1);
+        waitTillCmgHasMajority(restartedIgniteImpl1);
+
+        // TODO: IGNITE-23096 - remove after the hang is fixed.
+        waitTillNodeRestartsInternally(2);
+    }
+
+    private void restartPartially(int index) {
+        cluster.stopNode(index);
+        cluster.startEmbeddedNode(index);
+    }
+
+    @Test
+    void nodesThatSawNoReparationHaveSeparatePhysicalTopology() throws 
Exception {
+        cluster.startAndInit(2, paramsBuilder -> {
+            paramsBuilder.cmgNodeNames(nodeNames(0));
+            paramsBuilder.metaStorageNodeNames(nodeNames(1));
+        });
+        waitTillClusterStateIsSavedToVaultOnConductor(1);
+
+        // Node 0 is the only CMG voting member, so stopping it makes the CMG lose its majority.
+        cluster.stopNode(0);
+
+        initiateCmgRepairVia(igniteImpl(1), 1);
+
+        IgniteImpl restartedIgniteImpl1 = waitTillNodeRestartsInternally(1);
+        waitTillCmgHasMajority(restartedIgniteImpl1);
+
+        // Start the former CMG node, which did not see the repair; it must not connect to the repaired cluster.
+        cluster.startEmbeddedNode(0);
+
+        assertFalse(
+                waitForCondition(() -> 
restartedIgniteImpl1.clusterNodes().size() > 1, SECONDS.toMillis(3)),
+                "Nodes from different clusters were able to establish a 
connection"
+        );
+    }
+}
diff --git 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterIdService.java
 
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ClusterIdService.java
similarity index 53%
rename from 
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterIdService.java
rename to 
modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ClusterIdService.java
index 9a001d5d8c..d8138db464 100644
--- 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterIdService.java
+++ 
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ClusterIdService.java
@@ -15,35 +15,46 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.cluster.management;
+package org.apache.ignite.internal.disaster.system;
 
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
-import 
org.apache.ignite.internal.cluster.management.raft.ClusterStateStorageManager;
+import org.apache.ignite.internal.cluster.management.ClusterIdStore;
+import org.apache.ignite.internal.cluster.management.ClusterState;
+import org.apache.ignite.internal.disaster.system.message.ResetClusterMessage;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.network.ClusterIdSupplier;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Used to handle volatile information about cluster ID used to restrict which 
nodes can connect this one and vice versa.
  */
-public class ClusterIdService extends ClusterIdHolder implements 
IgniteComponent {
-    private final ClusterStateStorage clusterStateStorage;
+public class ClusterIdService implements ClusterIdSupplier, ClusterIdStore, 
IgniteComponent {
+    private final SystemDisasterRecoveryStorage storage;
 
-    public ClusterIdService(ClusterStateStorage clusterStateStorage) {
-        this.clusterStateStorage = clusterStateStorage;
+    private volatile @Nullable UUID clusterId;
+    private volatile @Nullable UUID clusterIdOverride;
+
+    public ClusterIdService(VaultManager vault) {
+        storage = new SystemDisasterRecoveryStorage(vault);
     }
 
     @Override
     public CompletableFuture<Void> startAsync(ComponentContext 
componentContext) {
-        var clusterStateManager = new 
ClusterStateStorageManager(clusterStateStorage);
-
-        ClusterState clusterState = clusterStateManager.getClusterState();
+        ClusterState clusterState = storage.readClusterState();
         if (clusterState != null) {
             clusterId(clusterState.clusterTag().clusterId());
         }
 
+        ResetClusterMessage resetClusterMessage = 
storage.readResetClusterMessage();
+        if (resetClusterMessage != null) {
+            this.clusterIdOverride = resetClusterMessage.clusterId();
+        }
+
         return nullCompletedFuture();
     }
 
@@ -51,4 +62,19 @@ public class ClusterIdService extends ClusterIdHolder 
implements IgniteComponent
     public CompletableFuture<Void> stopAsync(ComponentContext 
componentContext) {
         return nullCompletedFuture();
     }
+
+    @Override
+    public UUID clusterId() {
+        UUID override = clusterIdOverride;
+        if (override != null) {
+            return override;
+        }
+
+        return clusterId;
+    }
+
+    @Override
+    public void clusterId(UUID newClusterId) {
+        clusterId = newClusterId;
+    }
 }
diff --git 
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManager.java
 
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManager.java
index 0cb06071fb..ea2779a244 100644
--- 
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManager.java
+++ 
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManager.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.disaster.system;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.cluster.management.ClusterState;
 import org.apache.ignite.internal.disaster.system.message.ResetClusterMessage;
 
 /**
@@ -26,9 +27,17 @@ import 
org.apache.ignite.internal.disaster.system.message.ResetClusterMessage;
  */
 public interface SystemDisasterRecoveryManager {
     /**
-     * Marks this node as initialized.
+     * Saves cluster state to make sure it can be used to initiate CMG/MG 
repair.
+     *
+     * @param clusterState State to save.
+     */
+    void saveClusterState(ClusterState clusterState);
+
+    /**
+     * Marks this node as having seen the initial configuration applied. After this happens, the initial configuration
+     * stored in the CMG is no longer needed and can be disposed of.
      */
-    void markNodeInitialized();
+    void markInitConfigApplied();
 
     /**
      * Initiates cluster reset.
diff --git 
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImpl.java
 
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImpl.java
index 8db5db4a08..90299e3272 100644
--- 
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImpl.java
+++ 
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImpl.java
@@ -22,7 +22,6 @@ import static java.util.UUID.randomUUID;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toSet;
-import static org.apache.ignite.internal.util.ByteUtils.toBytes;
 import static org.apache.ignite.internal.util.CompletableFutures.allOf;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
@@ -40,19 +39,15 @@ import java.util.concurrent.Executor;
 import org.apache.ignite.internal.cluster.management.ClusterState;
 import 
org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
 import 
org.apache.ignite.internal.cluster.management.network.messages.SuccessResponseMessage;
-import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
-import 
org.apache.ignite.internal.cluster.management.raft.ClusterStateStorageManager;
 import org.apache.ignite.internal.disaster.system.message.ResetClusterMessage;
 import 
org.apache.ignite.internal.disaster.system.message.SystemDisasterRecoveryMessageGroup;
 import 
org.apache.ignite.internal.disaster.system.message.SystemDisasterRecoveryMessagesFactory;
-import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.network.TopologyService;
 import org.apache.ignite.internal.util.CompletableFutures;
-import org.apache.ignite.internal.vault.VaultEntry;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.network.ClusterNode;
 
@@ -60,20 +55,16 @@ import org.apache.ignite.network.ClusterNode;
  * Implementation of {@link SystemDisasterRecoveryManager}.
  */
 public class SystemDisasterRecoveryManagerImpl implements 
SystemDisasterRecoveryManager, IgniteComponent {
-    private static final String NODE_INITIALIZED_VAULT_KEY = 
"systemRecovery.nodeInitialized";
-    private static final String RESET_CLUSTER_MESSAGE_VAULT_KEY = 
"systemRecovery.resetClusterMessage";
-
     private final String thisNodeName;
     private final TopologyService topologyService;
     private final MessagingService messagingService;
-    private final VaultManager vaultManager;
     private final ServerRestarter restarter;
 
-    private final ClusterStateStorageManager clusterStateStorageManager;
-
     private final SystemDisasterRecoveryMessagesFactory messagesFactory = new 
SystemDisasterRecoveryMessagesFactory();
     private static final CmgMessagesFactory cmgMessagesFactory = new 
CmgMessagesFactory();
 
+    private final SystemDisasterRecoveryStorage storage;
+
     /** This executor spawns a thread per task and should only be used for 
very rare tasks. */
     private final Executor restartExecutor;
 
@@ -83,17 +74,14 @@ public class SystemDisasterRecoveryManagerImpl implements 
SystemDisasterRecovery
             TopologyService topologyService,
             MessagingService messagingService,
             VaultManager vaultManager,
-            ServerRestarter restarter,
-            ClusterStateStorage clusterStateStorage
+            ServerRestarter restarter
     ) {
         this.thisNodeName = thisNodeName;
         this.topologyService = topologyService;
         this.messagingService = messagingService;
-        this.vaultManager = vaultManager;
         this.restarter = restarter;
 
-        clusterStateStorageManager = new 
ClusterStateStorageManager(clusterStateStorage);
-
+        storage = new SystemDisasterRecoveryStorage(vaultManager);
         restartExecutor = new ThreadPerTaskExecutor(thisNodeName + 
"-restart-");
     }
 
@@ -111,7 +99,7 @@ public class SystemDisasterRecoveryManagerImpl implements 
SystemDisasterRecovery
 
     private void handleResetClusterMessage(ResetClusterMessage message, 
ClusterNode sender, long correlationId) {
         restartExecutor.execute(() -> {
-            vaultManager.put(new ByteArray(RESET_CLUSTER_MESSAGE_VAULT_KEY), 
toBytes(message));
+            storage.saveResetClusterMessage(message);
 
             messagingService.respond(sender, successResponseMessage(), 
correlationId)
                     .thenRunAsync(() -> {
@@ -122,7 +110,7 @@ public class SystemDisasterRecoveryManagerImpl implements 
SystemDisasterRecovery
         });
     }
 
-    private SuccessResponseMessage successResponseMessage() {
+    private static SuccessResponseMessage successResponseMessage() {
         return cmgMessagesFactory.successResponseMessage().build();
     }
 
@@ -132,8 +120,13 @@ public class SystemDisasterRecoveryManagerImpl implements 
SystemDisasterRecovery
     }
 
     @Override
-    public void markNodeInitialized() {
-        vaultManager.put(new ByteArray(NODE_INITIALIZED_VAULT_KEY), new 
byte[]{1});
+    public void saveClusterState(ClusterState clusterState) {
+        storage.saveClusterState(clusterState);
+    }
+
+    @Override
+    public void markInitConfigApplied() {
+        storage.markInitConfigApplied();
     }
 
     @Override
@@ -152,7 +145,7 @@ public class SystemDisasterRecoveryManagerImpl implements 
SystemDisasterRecovery
         Collection<ClusterNode> nodesInTopology = topologyService.allMembers();
         ensureAllProposedCmgNodesAreInTopology(proposedCmgConsistentIds, 
nodesInTopology);
 
-        ensureNodeIsInitialized();
+        ensureInitConfigApplied();
         ClusterState clusterState = ensureClusterStateIsPresent();
 
         ResetClusterMessage message = buildResetClusterMessage(
@@ -188,10 +181,9 @@ public class SystemDisasterRecoveryManagerImpl implements 
SystemDisasterRecovery
         return responseFutures;
     }
 
-    private void ensureNodeIsInitialized() {
-        VaultEntry initializedEntry = vaultManager.get(new 
ByteArray(NODE_INITIALIZED_VAULT_KEY));
-        if (initializedEntry == null) {
-            throw new ClusterResetException("Node is not initialized and 
cannot serve as a cluster reset conductor.");
+    private void ensureInitConfigApplied() {
+        if (!storage.isInitConfigApplied()) {
+            throw new ClusterResetException("Initial configuration is not 
applied and cannot serve as a cluster reset conductor.");
         }
     }
 
@@ -222,7 +214,7 @@ public class SystemDisasterRecoveryManagerImpl implements 
SystemDisasterRecovery
     }
 
     private ClusterState ensureClusterStateIsPresent() {
-        ClusterState clusterState = 
clusterStateStorageManager.getClusterState();
+        ClusterState clusterState = storage.readClusterState();
         if (clusterState == null) {
             throw new ClusterResetException("Node does not have cluster 
state.");
         }
diff --git 
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryStorage.java
 
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryStorage.java
new file mode 100644
index 0000000000..200a19b99b
--- /dev/null
+++ 
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryStorage.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.disaster.system;
+
+import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
+
+import org.apache.ignite.internal.cluster.management.ClusterState;
+import org.apache.ignite.internal.disaster.system.message.ResetClusterMessage;
+import org.apache.ignite.internal.disaster.system.storage.ClusterResetStorage;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Storage used by tools for disaster recovery of system groups (CMG and Metastorage).
+ *
+ * <p>All data is kept in the Vault; objects are serialized with {@link ByteUtils#toBytes} before being put there.
+ */
+public class SystemDisasterRecoveryStorage implements ClusterResetStorage {
+    private static final ByteArray INIT_CONFIG_APPLIED_VAULT_KEY = new ByteArray("systemRecovery.initConfigApplied");
+    private static final ByteArray CLUSTER_STATE_VAULT_KEY = new ByteArray("systemRecovery.clusterState");
+    private static final ByteArray RESET_CLUSTER_MESSAGE_VAULT_KEY = new ByteArray("systemRecovery.resetClusterMessage");
+
+    private final VaultManager vault;
+
+    /**
+     * Constructor.
+     *
+     * @param vault Vault in which the recovery data is stored.
+     */
+    public SystemDisasterRecoveryStorage(VaultManager vault) {
+        this.vault = vault;
+    }
+
+    @Override
+    public @Nullable ResetClusterMessage readResetClusterMessage() {
+        return readFromVault(RESET_CLUSTER_MESSAGE_VAULT_KEY);
+    }
+
+    @Override
+    public void removeResetClusterMessage() {
+        vault.remove(RESET_CLUSTER_MESSAGE_VAULT_KEY);
+    }
+
+    /**
+     * Reads the cluster state saved with {@link #saveClusterState(ClusterState)}.
+     *
+     * @return Saved cluster state or {@code null} if it has not been saved.
+     */
+    @Nullable ClusterState readClusterState() {
+        return readFromVault(CLUSTER_STATE_VAULT_KEY);
+    }
+
+    /**
+     * Reads the Vault entry with the given key and deserializes its value with {@link ByteUtils#fromBytes}.
+     *
+     * @param key Vault key.
+     * @return Deserialized value or {@code null} if there is no entry for the key.
+     */
+    private <T> @Nullable T readFromVault(ByteArray key) {
+        VaultEntry entry = vault.get(key);
+        return entry != null ? ByteUtils.fromBytes(entry.value()) : null;
+    }
+
+    /**
+     * Saves the cluster state to make sure it can be used to initiate CMG/MG repair.
+     *
+     * @param clusterState State to save.
+     */
+    void saveClusterState(ClusterState clusterState) {
+        vault.put(CLUSTER_STATE_VAULT_KEY, ByteUtils.toBytes(clusterState));
+    }
+
+    /**
+     * Tells whether the marker written by {@link #markInitConfigApplied()} is present.
+     *
+     * @return {@code true} if the initial configuration has been marked as applied.
+     */
+    boolean isInitConfigApplied() {
+        return vault.get(INIT_CONFIG_APPLIED_VAULT_KEY) != null;
+    }
+
+    /**
+     * Marks the initial configuration as applied. Only the presence of the entry is checked, so an empty value is stored.
+     */
+    void markInitConfigApplied() {
+        vault.put(INIT_CONFIG_APPLIED_VAULT_KEY, BYTE_EMPTY_ARRAY);
+    }
+
+    /**
+     * Saves the given {@link ResetClusterMessage} so that it can later be read with {@link #readResetClusterMessage()}.
+     *
+     * @param message Message to save.
+     */
+    void saveResetClusterMessage(ResetClusterMessage message) {
+        vault.put(RESET_CLUSTER_MESSAGE_VAULT_KEY, ByteUtils.toBytes(message));
+    }
+}
diff --git 
a/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
 
b/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
index 013ec2b1cc..84cbd7c7d6 100644
--- 
a/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
+++ 
b/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
@@ -25,11 +25,14 @@ import static 
org.apache.ignite.internal.cluster.management.ClusterTag.randomClu
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.asserts.CompletableFutureAssert.assertWillThrow;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
 import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
@@ -56,9 +59,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.internal.cluster.management.ClusterState;
 import 
org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
 import 
org.apache.ignite.internal.cluster.management.network.messages.SuccessResponseMessage;
-import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
-import 
org.apache.ignite.internal.cluster.management.raft.ClusterStateStorageManager;
-import 
org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
 import org.apache.ignite.internal.disaster.system.message.ResetClusterMessage;
 import 
org.apache.ignite.internal.disaster.system.message.SystemDisasterRecoveryMessageGroup;
 import 
org.apache.ignite.internal.disaster.system.message.SystemDisasterRecoveryMessagesFactory;
@@ -96,8 +96,9 @@ import org.mockito.junit.jupiter.MockitoExtension;
 class SystemDisasterRecoveryManagerImplTest extends BaseIgniteAbstractTest {
     private static final String CLUSTER_NAME = "cluster";
 
-    private static final String NODE_INITIALIZED_VAULT_KEY = 
"systemRecovery.nodeInitialized";
-    private static final String RESET_CLUSTER_MESSAGE_VAULT_KEY = 
"systemRecovery.resetClusterMessage";
+    private static final ByteArray INIT_CONFIG_APPLIED_VAULT_KEY = new 
ByteArray("systemRecovery.initConfigApplied");
+    private static final ByteArray CLUSTER_STATE_VAULT_KEY = new 
ByteArray("systemRecovery.clusterState");
+    private static final ByteArray RESET_CLUSTER_MESSAGE_VAULT_KEY = new 
ByteArray("systemRecovery.resetClusterMessage");
 
     @WorkDirectory
     private Path workDir;
@@ -115,9 +116,6 @@ class SystemDisasterRecoveryManagerImplTest extends 
BaseIgniteAbstractTest {
     @Mock
     private ServerRestarter restarter;
 
-    private final ClusterStateStorage clusterStateStorage = new 
TestClusterStateStorage();
-    private final ClusterStateStorageManager clusterStateStorageManager = new 
ClusterStateStorageManager(clusterStateStorage);
-
     private SystemDisasterRecoveryManagerImpl manager;
 
     private final ComponentContext componentContext = new ComponentContext();
@@ -154,8 +152,7 @@ class SystemDisasterRecoveryManagerImplTest extends 
BaseIgniteAbstractTest {
                 topologyService,
                 messagingService,
                 vaultManager,
-                restarter,
-                clusterStateStorage
+                restarter
         );
         assertThat(manager.startAsync(componentContext), 
willCompleteSuccessfully());
     }
@@ -167,12 +164,23 @@ class SystemDisasterRecoveryManagerImplTest extends 
BaseIgniteAbstractTest {
     }
 
     @Test
-    void marksNodeInitialized() {
-        manager.markNodeInitialized();
+    void marksInitConfigApplied() {
+        manager.markInitConfigApplied();
+
+        VaultEntry entry = vaultManager.get(INIT_CONFIG_APPLIED_VAULT_KEY);
+        assertThat(entry, is(notNullValue()));
+        assertThat(entry.value(), is(notNullValue()));
+    }
+
+    @Test
+    void savesClusterState() {
+        manager.saveClusterState(usualClusterState);
 
-        VaultEntry entry = vaultManager.get(new 
ByteArray(NODE_INITIALIZED_VAULT_KEY));
+        VaultEntry entry = vaultManager.get(CLUSTER_STATE_VAULT_KEY);
         assertThat(entry, is(notNullValue()));
-        assertThat(entry.value(), is(new byte[]{1}));
+
+        ClusterState savedState = fromBytes(entry.value());
+        assertThat(savedState, is(equalTo(usualClusterState)));
     }
 
     @Test
@@ -210,7 +218,7 @@ class SystemDisasterRecoveryManagerImplTest extends 
BaseIgniteAbstractTest {
     @Test
     void resetClusterRequiresClusterState() {
         when(topologyService.allMembers()).thenReturn(List.of(thisNode));
-        marksNodeInitialized();
+        markinitConfigApplied();
 
         ClusterResetException ex = assertWillThrow(
                 manager.resetCluster(List.of(thisNodeName)),
@@ -221,7 +229,7 @@ class SystemDisasterRecoveryManagerImplTest extends 
BaseIgniteAbstractTest {
     }
 
     @Test
-    void resetClusterRequiresNodeToBeInitialized() {
+    void resetClusterRequiresInitConfigToBeApplied() {
         when(topologyService.allMembers()).thenReturn(List.of(thisNode));
         putClusterState();
 
@@ -230,15 +238,15 @@ class SystemDisasterRecoveryManagerImplTest extends 
BaseIgniteAbstractTest {
                 ClusterResetException.class,
                 10, SECONDS
         );
-        assertThat(ex.getMessage(), is("Node is not initialized and cannot 
serve as a cluster reset conductor."));
+        assertThat(ex.getMessage(), is("Initial configuration is not applied 
and cannot serve as a cluster reset conductor."));
     }
 
     private void putClusterState() {
-        clusterStateStorageManager.putClusterState(usualClusterState);
+        vaultManager.put(CLUSTER_STATE_VAULT_KEY, toBytes(usualClusterState));
     }
 
-    private void makeNodeInitialized() {
-        vaultManager.put(new ByteArray(NODE_INITIALIZED_VAULT_KEY), new 
byte[]{1});
+    private void markinitConfigApplied() {
+        vaultManager.put(INIT_CONFIG_APPLIED_VAULT_KEY, BYTE_EMPTY_ARRAY);
     }
 
     @Test
@@ -271,7 +279,7 @@ class SystemDisasterRecoveryManagerImplTest extends 
BaseIgniteAbstractTest {
     }
 
     private void prepareNodeStateForClusterReset() {
-        makeNodeInitialized();
+        markinitConfigApplied();
         putClusterState();
     }
 
@@ -364,7 +372,7 @@ class SystemDisasterRecoveryManagerImplTest extends 
BaseIgniteAbstractTest {
         handler.onReceived(resetClusterMessageOn2Nodes(conductor.name()), 
conductor, 0L);
 
         waitTillResetClusterMessageGetsSavedToVault();
-        VaultEntry entry = vaultManager.get(new 
ByteArray(RESET_CLUSTER_MESSAGE_VAULT_KEY));
+        VaultEntry entry = vaultManager.get(RESET_CLUSTER_MESSAGE_VAULT_KEY);
         assertThat(entry, is(notNullValue()));
 
         ResetClusterMessage savedMessage = fromBytes(entry.value());
@@ -373,7 +381,7 @@ class SystemDisasterRecoveryManagerImplTest extends 
BaseIgniteAbstractTest {
     }
 
     private void waitTillResetClusterMessageGetsSavedToVault() throws 
InterruptedException {
-        assertTrue(waitForCondition(() -> vaultManager.get(new 
ByteArray(RESET_CLUSTER_MESSAGE_VAULT_KEY)) != null, 10_000));
+        assertTrue(waitForCondition(() -> 
vaultManager.get(RESET_CLUSTER_MESSAGE_VAULT_KEY) != null, 10_000));
     }
 
     private NetworkMessageHandler extractMessageHandler() {
@@ -416,7 +424,7 @@ class SystemDisasterRecoveryManagerImplTest extends 
BaseIgniteAbstractTest {
 
         InOrder inOrder = inOrder(messagingService, vaultManager);
 
-        inOrder.verify(vaultManager, timeout(SECONDS.toMillis(10))).put(eq(new 
ByteArray(RESET_CLUSTER_MESSAGE_VAULT_KEY)), any());
+        inOrder.verify(vaultManager, 
timeout(SECONDS.toMillis(10))).put(eq(RESET_CLUSTER_MESSAGE_VAULT_KEY), any());
         inOrder.verify(messagingService, 
timeout(SECONDS.toMillis(10))).respond(eq(conductor), messageCaptor.capture(), 
eq(123L));
 
         assertThat(messageCaptor.getValue(), 
instanceOf(SuccessResponseMessage.class));
@@ -433,9 +441,9 @@ class SystemDisasterRecoveryManagerImplTest extends 
BaseIgniteAbstractTest {
 
         InOrder inOrder = inOrder(messagingService, vaultManager, restarter);
 
-        inOrder.verify(vaultManager).put(eq(new 
ByteArray(RESET_CLUSTER_MESSAGE_VAULT_KEY)), any());
-        inOrder.verify(messagingService).respond(eq(conductor), 
messageCaptor.capture(), eq(123L));
-        inOrder.verify(restarter).initiateRestart();
+        inOrder.verify(vaultManager, 
timeout(SECONDS.toMillis(10))).put(eq(RESET_CLUSTER_MESSAGE_VAULT_KEY), any());
+        inOrder.verify(messagingService, 
timeout(SECONDS.toMillis(10))).respond(eq(conductor), messageCaptor.capture(), 
eq(123L));
+        inOrder.verify(restarter, 
timeout(SECONDS.toMillis(10))).initiateRestart();
 
         assertThat(messageCaptor.getValue(), 
instanceOf(SuccessResponseMessage.class));
     }
diff --git a/modules/table/build.gradle b/modules/table/build.gradle
index 3b98808ca9..fb4e317081 100644
--- a/modules/table/build.gradle
+++ b/modules/table/build.gradle
@@ -151,6 +151,7 @@ dependencies {
     integrationTestImplementation project(':ignite-system-view')
     integrationTestImplementation project(':ignite-partition-replicator')
     integrationTestImplementation project(':ignite-configuration-root')
+    integrationTestImplementation project(':ignite-system-disaster-recovery')
     integrationTestImplementation(testFixtures(project))
     integrationTestImplementation(testFixtures(project(':ignite-core')))
     integrationTestImplementation(testFixtures(project(':ignite-table')))
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 9d26e72700..5f547dac2d 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -127,6 +127,7 @@ import 
org.apache.ignite.internal.configuration.storage.LocalFileConfigurationSt
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import 
org.apache.ignite.internal.configuration.validation.TestConfigurationValidator;
+import 
org.apache.ignite.internal.disaster.system.SystemDisasterRecoveryStorage;
 import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
 import org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil;
 import org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil;
@@ -1194,6 +1195,7 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
 
             cmgManager = new ClusterManagementGroupManager(
                     vaultManager,
+                    new SystemDisasterRecoveryStorage(vaultManager),
                     clusterService,
                     clusterInitializer,
                     raftManager,
diff --git a/settings.gradle b/settings.gradle
index b6a2e84ec4..1f1ffaf2c2 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -90,6 +90,7 @@ include(":ignite-low-watermark")
 include(":ignite-partition-replicator")
 include(':ignite-configuration-root')
 include(':ignite-system-disaster-recovery')
+include(':ignite-system-disaster-recovery-api')
 
 project(":ignite-examples").projectDir = file('examples')
 project(":ignite-dev-utilities").projectDir = file('dev-utilities')
@@ -165,6 +166,7 @@ project(":ignite-low-watermark").projectDir = 
file('modules/low-watermark')
 project(":ignite-partition-replicator").projectDir = 
file('modules/partition-replicator')
 project(":ignite-configuration-root").projectDir = 
file('modules/configuration-root')
 project(":ignite-system-disaster-recovery").projectDir = 
file('modules/system-disaster-recovery')
+project(":ignite-system-disaster-recovery-api").projectDir = 
file('modules/system-disaster-recovery-api')
 
 ext.isCiServer = System.getenv().containsKey("IGNITE_CI")
 


Reply via email to