This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 6c65c1fb57 IGNITE-22876 Implement cluster reset initiation (#4272)
6c65c1fb57 is described below

commit 6c65c1fb573e09ee25f16e467e976342ce1cec6a
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Wed Aug 28 19:09:16 2024 +0400

    IGNITE-22876 Implement cluster reset initiation (#4272)
---
 .../internal/cluster/management/ClusterState.java  |   8 +
 .../raft/ClusterStateStorageManager.java           |   2 +-
 .../ignite/internal/util/CompletableFutures.java   |   2 +-
 modules/low-watermark/build.gradle                 |   1 -
 .../metastorage/impl/MetaStorageManagerImpl.java   |   2 +-
 modules/runner/build.gradle                        |   1 +
 .../org/apache/ignite/internal/app/IgniteImpl.java |  18 +-
 .../ignite/internal/app/IgniteServerImpl.java      |   2 +-
 modules/system-disaster-recovery/README.md         |   3 +
 .../build.gradle                                   |  18 +-
 .../disaster/system/ClusterResetException.java     |  30 ++
 .../internal/disaster/system/ServerRestarter.java  |  29 ++
 .../system/SystemDisasterRecoveryManager.java      |  40 ++
 .../system/SystemDisasterRecoveryManagerImpl.java  | 288 +++++++++++++
 .../system/message/ResetClusterMessage.java        |  61 +++
 .../SystemDisasterRecoveryMessageGroup.java        |  31 ++
 .../SystemDisasterRecoveryManagerImplTest.java     | 456 +++++++++++++++++++++
 settings.gradle                                    |   2 +
 18 files changed, 974 insertions(+), 20 deletions(-)

diff --git 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterState.java
 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterState.java
index fba541c0c5..66277cfbd1 100644
--- 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterState.java
+++ 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterState.java
@@ -18,7 +18,9 @@
 package org.apache.ignite.internal.cluster.management;
 
 import java.io.Serializable;
+import java.util.List;
 import java.util.Set;
+import java.util.UUID;
 import 
org.apache.ignite.internal.cluster.management.network.messages.CmgMessageGroup;
 import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.network.annotations.Transferable;
@@ -68,4 +70,10 @@ public interface ClusterState extends NetworkMessage, 
Serializable {
      */
     @Nullable
     String initialClusterConfiguration();
+
+    /**
+     * Returns IDs this cluster had before ({@code null} if this is the first 
incarnation).
+     */
+    @Nullable
+    List<UUID> formerClusterIds();
 }
diff --git 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ClusterStateStorageManager.java
 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ClusterStateStorageManager.java
index bbe8697912..a483d48d3c 100644
--- 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ClusterStateStorageManager.java
+++ 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ClusterStateStorageManager.java
@@ -62,7 +62,7 @@ public class ClusterStateStorageManager {
      *
      * @param state Cluster state.
      */
-    void putClusterState(ClusterState state) {
+    public void putClusterState(ClusterState state) {
         storage.put(CMG_STATE_KEY, toBytes(state));
     }
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/CompletableFutures.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/CompletableFutures.java
index 8d2fca1ff0..5839e3dd30 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/CompletableFutures.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/CompletableFutures.java
@@ -116,7 +116,7 @@ public class CompletableFutures {
      *
      * @param futures List of futures.
      */
-    public static CompletableFuture<Void> 
allOf(Collection<CompletableFuture<?>> futures) {
+    public static CompletableFuture<Void> allOf(Collection<? extends 
CompletableFuture<?>> futures) {
         return futures.isEmpty() ? nullCompletedFuture() : 
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new));
     }
 
diff --git a/modules/low-watermark/build.gradle 
b/modules/low-watermark/build.gradle
index 99dd52976b..4612e604ee 100644
--- a/modules/low-watermark/build.gradle
+++ b/modules/low-watermark/build.gradle
@@ -28,7 +28,6 @@ dependencies {
     implementation project(':ignite-core')
     implementation project(':ignite-failure-handler')
     implementation project(':ignite-network-api')
-    implementation project(':ignite-network-api')
     implementation project(':ignite-vault')
     implementation project(':ignite-configuration-api')
     implementation project(':ignite-replicator')
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index 93c99c0aba..f1db841f2d 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -211,7 +211,7 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
         electionListeners.add(listener);
     }
 
-    private CompletableFuture<Long> recover(MetaStorageServiceImpl service) {
+    private CompletableFuture<Long> recover(MetaStorageService service) {
         if (!busyLock.enterBusy()) {
             return failedFuture(new NodeStoppingException());
         }
diff --git a/modules/runner/build.gradle b/modules/runner/build.gradle
index 8b8a018c84..e95aa67397 100644
--- a/modules/runner/build.gradle
+++ b/modules/runner/build.gradle
@@ -89,6 +89,7 @@ dependencies {
     implementation project(':ignite-low-watermark')
     implementation project(':ignite-partition-replicator')
     implementation project(':ignite-catalog-compaction')
+    implementation project(':ignite-system-disaster-recovery')
     implementation libs.jetbrains.annotations
     implementation libs.micronaut.inject
     implementation libs.micronaut.validation
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 44c8c98fd7..2ec53343ea 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -120,6 +120,8 @@ import 
org.apache.ignite.internal.deployunit.DeploymentManagerImpl;
 import org.apache.ignite.internal.deployunit.IgniteDeployment;
 import 
org.apache.ignite.internal.deployunit.configuration.DeploymentExtensionConfiguration;
 import org.apache.ignite.internal.deployunit.metastore.DeploymentUnitStoreImpl;
+import org.apache.ignite.internal.disaster.system.ServerRestarter;
+import 
org.apache.ignite.internal.disaster.system.SystemDisasterRecoveryManagerImpl;
 import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
 import org.apache.ignite.internal.eventlog.config.schema.EventLogConfiguration;
 import 
org.apache.ignite.internal.eventlog.config.schema.EventLogExtensionConfiguration;
@@ -417,6 +419,8 @@ public class IgniteImpl implements Ignite {
     /** Remote triggered resources registry. */
     private final RemotelyTriggeredResourceRegistry resourcesRegistry;
 
+    private final SystemDisasterRecoveryManagerImpl 
systemDisasterRecoveryManager;
+
     private final IgniteTables publicTables;
     private final IgniteTransactions publicTransactions;
     private final IgniteSql publicSql;
@@ -446,6 +450,7 @@ public class IgniteImpl implements Ignite {
      */
     IgniteImpl(
             IgniteServer node,
+            ServerRestarter restarter,
             Path configPath,
             Path workDir,
             @Nullable ClassLoader serviceProviderClassLoader,
@@ -543,6 +548,15 @@ public class IgniteImpl implements Ignite {
                 failureProcessor
         );
 
+        systemDisasterRecoveryManager = new SystemDisasterRecoveryManagerImpl(
+                name,
+                clusterSvc.topologyService(),
+                clusterSvc.messagingService(),
+                vaultMgr,
+                restarter,
+                clusterStateStorage
+        );
+
         clock = new HybridClockImpl();
 
         clockWaiter = new ClockWaiter(name, clock);
@@ -1192,6 +1206,7 @@ public class IgniteImpl implements Ignite {
                     nettyBootstrapFactory,
                     nettyWorkersRegistrar,
                     clusterSvc,
+                    systemDisasterRecoveryManager,
                     restComponent,
                     partitionsLogStorageFactory,
                     msLogStorageFactory,
@@ -1601,7 +1616,8 @@ public class IgniteImpl implements Ignite {
                                     
clusterCfgMgr.configurationRegistry().initializeConfigurationWith(hoconSource);
                                 }, startupExecutor);
                     }
-                }, startupExecutor);
+                }, startupExecutor)
+                
.thenRunAsync(systemDisasterRecoveryManager::markNodeInitialized, 
startupExecutor);
     }
 
     /**
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteServerImpl.java
 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteServerImpl.java
index 005ec204bd..c9a0a2982a 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteServerImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteServerImpl.java
@@ -329,7 +329,7 @@ public class IgniteServerImpl implements IgniteServer {
     }
 
     private CompletableFuture<Void> doStartAsync() {
-        IgniteImpl instance = new IgniteImpl(this, configPath, workDir, 
classLoader, asyncContinuationExecutor);
+        IgniteImpl instance = new IgniteImpl(this, this::restartAsync, 
configPath, workDir, classLoader, asyncContinuationExecutor);
 
         ackBanner();
 
diff --git a/modules/system-disaster-recovery/README.md 
b/modules/system-disaster-recovery/README.md
new file mode 100644
index 0000000000..f62882b30e
--- /dev/null
+++ b/modules/system-disaster-recovery/README.md
@@ -0,0 +1,3 @@
+# System disaster recovery module
+
+This module provides the capability to recover from disasters related to the 
system Raft groups (the CMG and the Metastorage group).
diff --git a/modules/low-watermark/build.gradle 
b/modules/system-disaster-recovery/build.gradle
similarity index 66%
copy from modules/low-watermark/build.gradle
copy to modules/system-disaster-recovery/build.gradle
index 99dd52976b..333b8e7938 100644
--- a/modules/low-watermark/build.gradle
+++ b/modules/system-disaster-recovery/build.gradle
@@ -21,29 +21,19 @@ apply from: "$rootDir/buildscripts/java-junit5.gradle"
 apply from: "$rootDir/buildscripts/java-test-fixtures.gradle"
 
 dependencies {
-    annotationProcessor project(':ignite-configuration-annotation-processor')
-    annotationProcessor project(':ignite-network-annotation-processor')
-    annotationProcessor libs.auto.service
+    annotationProcessor project(":ignite-network-annotation-processor")
 
     implementation project(':ignite-core')
-    implementation project(':ignite-failure-handler')
-    implementation project(':ignite-network-api')
     implementation project(':ignite-network-api')
     implementation project(':ignite-vault')
-    implementation project(':ignite-configuration-api')
-    implementation project(':ignite-replicator')
-    implementation project(':ignite-schema')
+    implementation project(':ignite-cluster-management')
     implementation libs.jetbrains.annotations
 
     testImplementation libs.hamcrest.core
     testImplementation libs.mockito.core
     testImplementation libs.mockito.junit
-    testImplementation project(":ignite-schema")
     testImplementation testFixtures(project(":ignite-core"))
-    testImplementation testFixtures(project(":ignite-configuration"))
-
-    testFixturesImplementation project(':ignite-core')
-    testFixturesImplementation libs.jetbrains.annotations
+    testImplementation testFixtures(project(":ignite-cluster-management"))
 }
 
-description = 'ignite-low-watermark'
+description = 'ignite-system-disaster-recovery'
diff --git 
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ClusterResetException.java
 
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ClusterResetException.java
new file mode 100644
index 0000000000..3f6009e203
--- /dev/null
+++ 
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ClusterResetException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.disaster.system;
+
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.lang.ErrorGroups.Common;
+
+/**
+ * Thrown when a cluster reset cannot be initiated.
+ */
+public class ClusterResetException extends IgniteInternalException {
+    public ClusterResetException(String msg) {
+        super(Common.INTERNAL_ERR, msg);
+    }
+}
diff --git 
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ServerRestarter.java
 
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ServerRestarter.java
new file mode 100644
index 0000000000..644f9006a7
--- /dev/null
+++ 
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ServerRestarter.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.disaster.system;
+
+/**
+ * Knows how to initiate an Ignite node restart.
+ */
+@FunctionalInterface
+public interface ServerRestarter {
+    /**
+     * Initiates server restart.
+     */
+    void initiateRestart();
+}
diff --git 
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManager.java
 
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManager.java
new file mode 100644
index 0000000000..0cb06071fb
--- /dev/null
+++ 
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManager.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.disaster.system;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.disaster.system.message.ResetClusterMessage;
+
+/**
+ * Manages disaster recovery of system groups, namely the Cluster Management 
Group (CMG) and the Metastorage group (MG).
+ */
+public interface SystemDisasterRecoveryManager {
+    /**
+     * Marks this node as initialized.
+     *
+     * <p>{@link SystemDisasterRecoveryManagerImpl} persists this mark in the Vault and refuses to conduct a cluster reset
+     * on a node that does not have it.
+     */
+    void markNodeInitialized();
+
+    /**
+     * Initiates cluster reset.
+     *
+     * <p>{@link SystemDisasterRecoveryManagerImpl} does this by sending a {@link ResetClusterMessage} to the nodes of the
+     * current topology.
+     *
+     * @param proposedCmgConsistentIds Names of the nodes that will be the new CMG nodes.
+     * @return Future completing with the result of the operation (it completes exceptionally because of a
+     *     {@link ClusterResetException} in case of error related to reset logic).
+     */
+    CompletableFuture<Void> resetCluster(List<String> 
proposedCmgConsistentIds);
+}
diff --git 
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImpl.java
 
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImpl.java
new file mode 100644
index 0000000000..8db5db4a08
--- /dev/null
+++ 
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImpl.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.disaster.system;
+
+import static java.util.Objects.requireNonNullElse;
+import static java.util.UUID.randomUUID;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static org.apache.ignite.internal.util.CompletableFutures.allOf;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import org.apache.ignite.internal.cluster.management.ClusterState;
+import 
org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
+import 
org.apache.ignite.internal.cluster.management.network.messages.SuccessResponseMessage;
+import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
+import 
org.apache.ignite.internal.cluster.management.raft.ClusterStateStorageManager;
+import org.apache.ignite.internal.disaster.system.message.ResetClusterMessage;
+import 
org.apache.ignite.internal.disaster.system.message.SystemDisasterRecoveryMessageGroup;
+import 
org.apache.ignite.internal.disaster.system.message.SystemDisasterRecoveryMessagesFactory;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.TopologyService;
+import org.apache.ignite.internal.util.CompletableFutures;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * Implementation of {@link SystemDisasterRecoveryManager}.
+ */
+public class SystemDisasterRecoveryManagerImpl implements 
SystemDisasterRecoveryManager, IgniteComponent {
+    private static final String NODE_INITIALIZED_VAULT_KEY = 
"systemRecovery.nodeInitialized";
+    private static final String RESET_CLUSTER_MESSAGE_VAULT_KEY = 
"systemRecovery.resetClusterMessage";
+
+    private final String thisNodeName;
+    private final TopologyService topologyService;
+    private final MessagingService messagingService;
+    private final VaultManager vaultManager;
+    private final ServerRestarter restarter;
+
+    private final ClusterStateStorageManager clusterStateStorageManager;
+
+    private final SystemDisasterRecoveryMessagesFactory messagesFactory = new 
SystemDisasterRecoveryMessagesFactory();
+    private static final CmgMessagesFactory cmgMessagesFactory = new 
CmgMessagesFactory();
+
+    /** This executor spawns a thread per task and should only be used for 
very rare tasks. */
+    private final Executor restartExecutor;
+
+    /** Constructor. */
+    public SystemDisasterRecoveryManagerImpl(
+            String thisNodeName,
+            TopologyService topologyService,
+            MessagingService messagingService,
+            VaultManager vaultManager,
+            ServerRestarter restarter,
+            ClusterStateStorage clusterStateStorage
+    ) {
+        this.thisNodeName = thisNodeName;
+        this.topologyService = topologyService;
+        this.messagingService = messagingService;
+        this.vaultManager = vaultManager;
+        this.restarter = restarter;
+
+        clusterStateStorageManager = new 
ClusterStateStorageManager(clusterStateStorage);
+
+        restartExecutor = new ThreadPerTaskExecutor(thisNodeName + 
"-restart-");
+    }
+
+    @Override
+    public CompletableFuture<Void> startAsync(ComponentContext 
componentContext) {
+        
messagingService.addMessageHandler(SystemDisasterRecoveryMessageGroup.class, 
(message, sender, correlationId) -> {
+            if (message instanceof ResetClusterMessage) {
+                assert correlationId != null;
+                handleResetClusterMessage((ResetClusterMessage) message, 
sender, correlationId);
+            }
+        });
+
+        return nullCompletedFuture();
+    }
+
+    private void handleResetClusterMessage(ResetClusterMessage message, 
ClusterNode sender, long correlationId) {
+        restartExecutor.execute(() -> {
+            vaultManager.put(new ByteArray(RESET_CLUSTER_MESSAGE_VAULT_KEY), 
toBytes(message));
+
+            messagingService.respond(sender, successResponseMessage(), 
correlationId)
+                    .thenRunAsync(() -> {
+                        if (!thisNodeName.equals(message.conductor())) {
+                            restarter.initiateRestart();
+                        }
+                    }, restartExecutor);
+        });
+    }
+
+    private SuccessResponseMessage successResponseMessage() {
+        return cmgMessagesFactory.successResponseMessage().build();
+    }
+
+    @Override
+    public CompletableFuture<Void> stopAsync(ComponentContext 
componentContext) {
+        return nullCompletedFuture();
+    }
+
+    @Override
+    public void markNodeInitialized() {
+        vaultManager.put(new ByteArray(NODE_INITIALIZED_VAULT_KEY), new 
byte[]{1});
+    }
+
+    @Override
+    public CompletableFuture<Void> resetCluster(List<String> 
proposedCmgConsistentIds) {
+        try {
+            return doResetCluster(proposedCmgConsistentIds);
+        } catch (ClusterResetException e) {
+            return failedFuture(e);
+        }
+    }
+
+    private CompletableFuture<Void> doResetCluster(List<String> 
proposedCmgConsistentIds) {
+        ensureNoRepetitions(proposedCmgConsistentIds);
+        ensureContainsThisNodeName(proposedCmgConsistentIds);
+
+        Collection<ClusterNode> nodesInTopology = topologyService.allMembers();
+        ensureAllProposedCmgNodesAreInTopology(proposedCmgConsistentIds, 
nodesInTopology);
+
+        ensureNodeIsInitialized();
+        ClusterState clusterState = ensureClusterStateIsPresent();
+
+        ResetClusterMessage message = buildResetClusterMessage(
+                proposedCmgConsistentIds,
+                clusterState
+        );
+
+        Map<String, CompletableFuture<NetworkMessage>> responseFutures = 
sendResetClusterMessageTo(
+                nodesInTopology,
+                message
+        );
+
+        return allOf(responseFutures.values())
+                .handleAsync((res, ex) -> {
+                    // We ignore upstream exceptions on purpose.
+
+                    if (isMajorityOfCmgAreSuccesses(proposedCmgConsistentIds, 
responseFutures)) {
+                        restarter.initiateRestart();
+
+                        return null;
+                    } else {
+                        throw new ClusterResetException("Did not get 
successful responses from new CMG majority, failing cluster reset.");
+                    }
+                }, restartExecutor);
+    }
+
+    private Map<String, CompletableFuture<NetworkMessage>> 
sendResetClusterMessageTo(Collection<ClusterNode> nodesInTopology,
+            ResetClusterMessage message) {
+        Map<String, CompletableFuture<NetworkMessage>> responseFutures = new 
HashMap<>();
+        for (ClusterNode node : nodesInTopology) {
+            responseFutures.put(node.name(), messagingService.invoke(node, 
message, 10_000));
+        }
+        return responseFutures;
+    }
+
+    private void ensureNodeIsInitialized() {
+        VaultEntry initializedEntry = vaultManager.get(new 
ByteArray(NODE_INITIALIZED_VAULT_KEY));
+        if (initializedEntry == null) {
+            throw new ClusterResetException("Node is not initialized and 
cannot serve as a cluster reset conductor.");
+        }
+    }
+
+    private static void ensureNoRepetitions(List<String> 
proposedCmgConsistentIds) {
+        if (new HashSet<>(proposedCmgConsistentIds).size() != 
proposedCmgConsistentIds.size()) {
+            throw new ClusterResetException("New CMG node consistentIds have 
repetitions: " + proposedCmgConsistentIds + ".");
+        }
+    }
+
+    private void ensureContainsThisNodeName(List<String> 
proposedCmgConsistentIds) {
+        if (!proposedCmgConsistentIds.contains(thisNodeName)) {
+            throw new ClusterResetException("Current node is not contained in 
the new CMG, so it cannot conduct a cluster reset.");
+        }
+    }
+
+    private static void ensureAllProposedCmgNodesAreInTopology(
+            List<String> proposedCmgConsistentIds,
+            Collection<ClusterNode> nodesInTopology
+    ) {
+        Set<String> consistentIdsOfNodesInTopology = 
nodesInTopology.stream().map(ClusterNode::name).collect(toSet());
+
+        Set<String> notInTopology = new HashSet<>(proposedCmgConsistentIds);
+        notInTopology.removeAll(consistentIdsOfNodesInTopology);
+
+        if (!notInTopology.isEmpty()) {
+            throw new ClusterResetException("Some of proposed CMG nodes are 
not online: " + notInTopology + ".");
+        }
+    }
+
+    private ClusterState ensureClusterStateIsPresent() {
+        ClusterState clusterState = 
clusterStateStorageManager.getClusterState();
+        if (clusterState == null) {
+            throw new ClusterResetException("Node does not have cluster 
state.");
+        }
+        return clusterState;
+    }
+
+    private ResetClusterMessage buildResetClusterMessage(List<String> 
proposedCmgConsistentIds, ClusterState clusterState) {
+        List<UUID> formerClusterIds = new 
ArrayList<>(requireNonNullElse(clusterState.formerClusterIds(), new 
ArrayList<>()));
+        formerClusterIds.add(clusterState.clusterTag().clusterId());
+
+        return messagesFactory.resetClusterMessage()
+                .conductor(thisNodeName)
+                .cmgNodes(new HashSet<>(proposedCmgConsistentIds))
+                .metaStorageNodes(clusterState.metaStorageNodes())
+                .clusterName(clusterState.clusterTag().clusterName())
+                .clusterId(randomUUID())
+                .formerClusterIds(formerClusterIds)
+                .build();
+    }
+
+    /**
+     * Tells whether a strict majority of the proposed CMG nodes responded successfully.
+     *
+     * <p>Only the futures of nodes listed in {@code proposedCmgConsistentIds} are taken into account; responses from
+     * other nodes do not influence the result.
+     *
+     * @param proposedCmgConsistentIds Names of the nodes proposed as the new CMG.
+     * @param responseFutures Response futures keyed by node name.
+     * @return {@code true} if more than a half of the proposed CMG nodes responded successfully.
+     */
+    private static boolean isMajorityOfCmgAreSuccesses(
+            List<String> proposedCmgConsistentIds,
+            Map<String, CompletableFuture<NetworkMessage>> responseFutures
+    ) {
+        Set<String> newCmgNodesSet = new HashSet<>(proposedCmgConsistentIds);
+        List<CompletableFuture<NetworkMessage>> futuresFromNewCmg = responseFutures.entrySet().stream()
+                .filter(entry -> newCmgNodesSet.contains(entry.getKey()))
+                .map(Entry::getValue)
+                .collect(toList());
+
+        assert futuresFromNewCmg.size() == proposedCmgConsistentIds.size()
+                : futuresFromNewCmg.size() + " futures, but " + proposedCmgConsistentIds.size() + " nodes";
+
+        long successes = futuresFromNewCmg.stream()
+                .filter(CompletableFutures::isCompletedSuccessfully)
+                .count();
+
+        // A majority is strictly more than a half. The former (size + 1) / 2 threshold accepted exactly a half
+        // for even-sized groups (1 of 2, 2 of 4); for odd sizes both formulas give the same value.
+        return successes >= futuresFromNewCmg.size() / 2 + 1;
+    }
+
+    /**
+     * An executor that spawns a thread per task. This is very inefficient for 
scenarios with many tasks,
+     * but we use it here for very rare tasks; it has an advantage that it 
doesn't need to be shut down
+     * (node restart is performed in it, so it might cause troubles to use 
normal pools created in the Ignite node).
+     */
+    private static class ThreadPerTaskExecutor implements Executor {
+        private final String threadNamePrefix;
+
+        private ThreadPerTaskExecutor(String threadNamePrefix) {
+            this.threadNamePrefix = threadNamePrefix;
+        }
+
+        @Override
+        public void execute(Runnable command) {
+            Thread thread = new Thread(command);
+
+            thread.setName(threadNamePrefix + thread.getId());
+            thread.setDaemon(true);
+
+            thread.start();
+        }
+    }
+}
diff --git 
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/message/ResetClusterMessage.java
 
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/message/ResetClusterMessage.java
new file mode 100644
index 0000000000..7acc6bff0f
--- /dev/null
+++ 
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/message/ResetClusterMessage.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.disaster.system.message;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.annotations.Transferable;
+
+/**
+ * Message that tells a node to reset the cluster; it carries the parameters of the cluster reset.
+ */
+@Transferable(SystemDisasterRecoveryMessageGroup.RESET_CLUSTER)
+public interface ResetClusterMessage extends NetworkMessage, Serializable {
+    /**
+     * Returns consistent IDs of the nodes that will host the new CMG.
+     */
+    Set<String> cmgNodes();
+
+    /**
+     * Returns consistent IDs of the nodes that host the Meta Storage.
+     */
+    Set<String> metaStorageNodes();
+
+    /**
+     * Returns the name of the cluster; it will be a part of the generated cluster tag.
+     */
+    String clusterName();
+
+    /**
+     * Returns the new ID of the cluster.
+     */
+    UUID clusterId();
+
+    /**
+     * Returns IDs that the cluster had before (including the current incarnation by which this message is sent).
+     */
+    List<UUID> formerClusterIds();
+
+    /**
+     * Returns the consistent ID of the node that conducts the cluster reset.
+     */
+    String conductor();
+}
diff --git 
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/message/SystemDisasterRecoveryMessageGroup.java
 
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/message/SystemDisasterRecoveryMessageGroup.java
new file mode 100644
index 0000000000..c6f0b0fa07
--- /dev/null
+++ 
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/message/SystemDisasterRecoveryMessageGroup.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.disaster.system.message;
+
+import org.apache.ignite.internal.network.annotations.MessageGroup;
+
+/**
+ * Message group that contains the messages used for disaster recovery of system groups.
+ */
+@MessageGroup(groupType = 15, groupName = "SystemDisasterRecoveryMessages")
+public class SystemDisasterRecoveryMessageGroup {
+    /**
+     * Type of {@link ResetClusterMessage} within this message group.
+     */
+    public static final short RESET_CLUSTER = 1;
+}
diff --git 
a/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
 
b/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
new file mode 100644
index 0000000000..27c5813fc8
--- /dev/null
+++ 
b/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.disaster.system;
+
+import static java.util.UUID.randomUUID;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static 
org.apache.ignite.internal.cluster.management.ClusterTag.randomClusterTag;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static 
org.apache.ignite.internal.testframework.asserts.CompletableFutureAssert.assertWillThrow;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.cluster.management.ClusterState;
+import 
org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
+import 
org.apache.ignite.internal.cluster.management.network.messages.SuccessResponseMessage;
+import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
+import 
org.apache.ignite.internal.cluster.management.raft.ClusterStateStorageManager;
+import 
org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
+import org.apache.ignite.internal.disaster.system.message.ResetClusterMessage;
+import 
org.apache.ignite.internal.disaster.system.message.SystemDisasterRecoveryMessageGroup;
+import 
org.apache.ignite.internal.disaster.system.message.SystemDisasterRecoveryMessagesFactory;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.network.ClusterNodeImpl;
+import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.NetworkMessageHandler;
+import org.apache.ignite.internal.network.TopologyService;
+import org.apache.ignite.internal.properties.IgniteProductVersion;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+@ExtendWith(WorkDirectoryExtension.class)
+class SystemDisasterRecoveryManagerImplTest extends BaseIgniteAbstractTest {
+    private static final String CLUSTER_NAME = "cluster";
+
+    private static final String NODE_INITIALIZED_VAULT_KEY = 
"systemRecovery.nodeInitialized";
+    private static final String RESET_CLUSTER_MESSAGE_VAULT_KEY = 
"systemRecovery.resetClusterMessage";
+
+    @WorkDirectory
+    private Path workDir;
+
+    private static final String thisNodeName = "node1";
+
+    @Mock
+    private TopologyService topologyService;
+
+    @Mock
+    private MessagingService messagingService;
+
+    private VaultManager vaultManager;
+
+    @Mock
+    private ServerRestarter restarter;
+
+    private final ClusterStateStorage clusterStateStorage = new 
TestClusterStateStorage();
+    private final ClusterStateStorageManager clusterStateStorageManager = new 
ClusterStateStorageManager(clusterStateStorage);
+
+    private SystemDisasterRecoveryManagerImpl manager;
+
+    private final ComponentContext componentContext = new ComponentContext();
+
+    private final ClusterNode thisNode = new 
ClusterNodeImpl(randomUUID().toString(), thisNodeName, new 
NetworkAddress("host", 1001));
+
+    private final ClusterNode node2 = new 
ClusterNodeImpl(randomUUID().toString(), "node2", new NetworkAddress("host", 
1002));
+    private final ClusterNode node3 = new 
ClusterNodeImpl(randomUUID().toString(), "node3", new NetworkAddress("host", 
1003));
+    private final ClusterNode node4 = new 
ClusterNodeImpl(randomUUID().toString(), "node4", new NetworkAddress("host", 
1004));
+    private final ClusterNode node5 = new 
ClusterNodeImpl(randomUUID().toString(), "node5", new NetworkAddress("host", 
1005));
+
+    private final CmgMessagesFactory cmgMessagesFactory = new 
CmgMessagesFactory();
+    private final SystemDisasterRecoveryMessagesFactory messagesFactory = new 
SystemDisasterRecoveryMessagesFactory();
+
+    private final ClusterState usualClusterState = 
cmgMessagesFactory.clusterState()
+            .cmgNodes(Set.of(thisNodeName))
+            .metaStorageNodes(Set.of(thisNodeName))
+            .version(IgniteProductVersion.CURRENT_VERSION.toString())
+            .clusterTag(randomClusterTag(cmgMessagesFactory, CLUSTER_NAME))
+            .build();
+
+    private final SuccessResponseMessage successResponseMessage = 
cmgMessagesFactory.successResponseMessage().build();
+
+    @BeforeEach
+    void init() {
+        // By default every respond() call succeeds; lenient because not every test triggers a response.
+        lenient().when(messagingService.respond(any(ClusterNode.class), any(NetworkMessage.class), anyLong()))
+                .thenReturn(nullCompletedFuture());
+
+        Path vaultPath = workDir.resolve("vault");
+        vaultManager = spy(new VaultManager(new PersistentVaultService(vaultPath)));
+        assertThat(vaultManager.startAsync(componentContext), willCompleteSuccessfully());
+
+        manager = new SystemDisasterRecoveryManagerImpl(
+                thisNodeName,
+                topologyService,
+                messagingService,
+                vaultManager,
+                restarter,
+                clusterStateStorage
+        );
+        assertThat(manager.startAsync(componentContext), willCompleteSuccessfully());
+    }
+
+    @AfterEach
+    void cleanup() {
+        // Stop in the reverse order of start: the manager is built on top of the Vault, so the Vault must outlive it.
+        assertThat(manager.stopAsync(), willCompleteSuccessfully());
+        assertThat(vaultManager.stopAsync(), willCompleteSuccessfully());
+    }
+
+    @Test
+    void marksNodeInitialized() {
+        manager.markNodeInitialized();
+
+        ByteArray flagKey = new ByteArray(NODE_INITIALIZED_VAULT_KEY);
+        VaultEntry storedFlag = vaultManager.get(flagKey);
+
+        assertThat(storedFlag, is(notNullValue()));
+        assertThat(storedFlag.value(), is(new byte[]{1}));
+    }
+
+    @Test
+    void resetClusterRejectsDuplicateNodeNames() {
+        // The same consistent ID is proposed twice.
+        CompletableFuture<Void> resetFuture = manager.resetCluster(List.of(thisNodeName, thisNodeName));
+
+        ClusterResetException ex = assertWillThrow(resetFuture, ClusterResetException.class, 10, SECONDS);
+        assertThat(ex.getMessage(), is("New CMG node consistentIds have repetitions: [node1, node1]."));
+    }
+
+    @Test
+    void resetClusterRequiresThisNodeToBeNewCmg() {
+        // The proposed CMG does not include the node on which the reset is requested.
+        CompletableFuture<Void> resetFuture = manager.resetCluster(List.of("abc"));
+
+        ClusterResetException ex = assertWillThrow(resetFuture, ClusterResetException.class, 10, SECONDS);
+        assertThat(ex.getMessage(), is("Current node is not contained in the new CMG, so it cannot conduct a cluster reset."));
+    }
+
+    @Test
+    void resetClusterRequiresNewCmgNodesToBeOnline() {
+        // Only this node is in the topology, so the proposed node "abc" is not online.
+        when(topologyService.allMembers()).thenReturn(List.of(thisNode));
+
+        CompletableFuture<Void> resetFuture = manager.resetCluster(List.of(thisNodeName, "abc"));
+
+        ClusterResetException ex = assertWillThrow(resetFuture, ClusterResetException.class, 10, SECONDS);
+        assertThat(ex.getMessage(), is("Some of proposed CMG nodes are not online: [abc]."));
+    }
+
+    @Test
+    void resetClusterRequiresClusterState() {
+        when(topologyService.allMembers()).thenReturn(List.of(thisNode));
+        // The node is marked as initialized (via the fixture helper), but no cluster state is stored.
+        makeNodeInitialized();
+
+        ClusterResetException ex = assertWillThrow(
+                manager.resetCluster(List.of(thisNodeName)),
+                ClusterResetException.class,
+                10, SECONDS
+        );
+        assertThat(ex.getMessage(), is("Node does not have cluster state."));
+    }
+
+    @Test
+    void resetClusterRequiresNodeToBeInitialized() {
+        // Cluster state is present, but the 'node initialized' flag is never written.
+        putClusterState();
+        when(topologyService.allMembers()).thenReturn(List.of(thisNode));
+
+        CompletableFuture<Void> resetFuture = manager.resetCluster(List.of(thisNodeName));
+
+        ClusterResetException ex = assertWillThrow(resetFuture, ClusterResetException.class, 10, SECONDS);
+        assertThat(ex.getMessage(), is("Node is not initialized and cannot serve as a cluster reset conductor."));
+    }
+
+    /** Puts {@link #usualClusterState} into the cluster state storage that the manager under test was created with. */
+    private void putClusterState() {
+        clusterStateStorageManager.putClusterState(usualClusterState);
+    }
+
+    /** Writes the 'node initialized' flag directly to the Vault, without going through the manager. */
+    private void makeNodeInitialized() {
+        ByteArray flagKey = new ByteArray(NODE_INITIALIZED_VAULT_KEY);
+        byte[] flagValue = {1};
+
+        vaultManager.put(flagKey, flagValue);
+    }
+
+    @Test
+    void resetClusterSendsMessages() {
+        when(topologyService.allMembers()).thenReturn(List.of(thisNode, node2, node3));
+        prepareNodeStateForClusterReset();
+
+        when(messagingService.invoke(any(ClusterNode.class), any(), anyLong()))
+                .thenReturn(completedFuture(successResponseMessage));
+
+        assertThat(manager.resetCluster(List.of(thisNodeName, node2.name())), willCompleteSuccessfully());
+
+        ArgumentCaptor<ResetClusterMessage> captor = ArgumentCaptor.forClass(ResetClusterMessage.class);
+
+        // The conductor itself, the other new CMG node and the node outside of the new CMG must each get a message.
+        verify(messagingService).invoke(eq(thisNode), captor.capture(), anyLong());
+        ResetClusterMessage sentToSelf = captor.getValue();
+        assertThatResetClusterMessageIsAsExpected(sentToSelf);
+
+        verify(messagingService).invoke(eq(node2), captor.capture(), anyLong());
+        ResetClusterMessage sentToOtherCmgNode = captor.getValue();
+        assertThatResetClusterMessageIsAsExpected(sentToOtherCmgNode);
+
+        verify(messagingService).invoke(eq(node3), captor.capture(), anyLong());
+        ResetClusterMessage sentToNonCmgNode = captor.getValue();
+        assertThatResetClusterMessageIsAsExpected(sentToNonCmgNode);
+
+        // All recipients must see the same new cluster ID.
+        assertThat(sentToSelf.clusterId(), is(sentToOtherCmgNode.clusterId()));
+        assertThat(sentToSelf.clusterId(), is(sentToNonCmgNode.clusterId()));
+    }
+
+    /** Stores the cluster state and writes the 'node initialized' flag, as the successful reset tests require. */
+    private void prepareNodeStateForClusterReset() {
+        putClusterState();
+        makeNodeInitialized();
+    }
+
+    /** Checks the content of a reset message that is expected to be conducted by this node. */
+    private void assertThatResetClusterMessageIsAsExpected(ResetClusterMessage message) {
+        assertThatResetClusterMessageContentIsAsExpected(message, thisNodeName);
+    }
+
+    /** Checks the content of a reset message for a 2-node CMG (this node and node2) and the given conductor. */
+    private void assertThatResetClusterMessageContentIsAsExpected(@Nullable ResetClusterMessage message, String expectedConductor) {
+        assertThat(message, is(notNullValue()));
+
+        var oldClusterId = usualClusterState.clusterTag().clusterId();
+
+        assertThat(message.cmgNodes(), containsInAnyOrder(thisNodeName, node2.name()));
+        assertThat(message.metaStorageNodes(), is(usualClusterState.metaStorageNodes()));
+        assertThat(message.clusterName(), is(CLUSTER_NAME));
+        assertThat(message.clusterId(), is(not(oldClusterId)));
+        assertThat(message.formerClusterIds(), contains(oldClusterId));
+        assertThat(message.conductor(), is(expectedConductor));
+    }
+
+    @Test
+    void resetClusterInitiatesRestartOnSuccess() {
+        prepareNodeStateForClusterReset();
+        when(topologyService.allMembers()).thenReturn(List.of(thisNode, node2, node3));
+
+        // Every node responds successfully.
+        when(messagingService.invoke(any(ClusterNode.class), any(), anyLong()))
+                .thenReturn(completedFuture(successResponseMessage));
+
+        List<String> newCmgNodeNames = List.of(thisNodeName, node2.name(), node3.name());
+        assertThat(manager.resetCluster(newCmgNodeNames), willCompleteSuccessfully());
+
+        verify(restarter).initiateRestart();
+    }
+
+    @Test
+    void resetClusterInitiatesRestartWhenMajorityOfCmgNodesRespondsWithOk() {
+        prepareNodeStateForClusterReset();
+        when(topologyService.allMembers()).thenReturn(List.of(thisNode, node2, node3, node4, node5));
+
+        // 2 of the 3 proposed CMG nodes (this node and node2) respond successfully; node3 and the rest fail.
+        respondWithExceptionFrom(node3, node4, node5);
+        respondSuccessfullyFrom(thisNode, node2);
+
+        List<String> newCmgNodeNames = List.of(thisNodeName, node2.name(), node3.name());
+        assertThat(manager.resetCluster(newCmgNodeNames), willCompleteSuccessfully());
+
+        verify(restarter).initiateRestart();
+    }
+
+    /** Stubs {@code invoke()} so that each of the given nodes answers with a success response. */
+    private void respondSuccessfullyFrom(ClusterNode... nodes) {
+        for (ClusterNode responder : nodes) {
+            respondSuccessfullyFrom(responder);
+        }
+    }
+
+    /** Stubs {@code invoke()} so that the given node answers with a success response. */
+    private void respondSuccessfullyFrom(ClusterNode node) {
+        CompletableFuture<NetworkMessage> okResponse = completedFuture(successResponseMessage);
+
+        when(messagingService.invoke(eq(node), any(), anyLong())).thenReturn(okResponse);
+    }
+
+    /** Stubs {@code invoke()} so that invocations of each of the given nodes fail with a {@link TimeoutException}. */
+    private void respondWithExceptionFrom(ClusterNode... nodes) {
+        for (ClusterNode failingNode : nodes) {
+            respondWithExceptionFrom(failingNode);
+        }
+    }
+
+    /** Stubs {@code invoke()} so that invocations of the given node fail with a {@link TimeoutException}. */
+    private void respondWithExceptionFrom(ClusterNode node) {
+        CompletableFuture<NetworkMessage> failure = failedFuture(new TimeoutException());
+
+        when(messagingService.invoke(eq(node), any(), anyLong())).thenReturn(failure);
+    }
+
+    @Test
+    @DisplayName("resetCluster() fails and does not restart when majority of new CMG nodes do not respond")
+    void resetClusterFailsWhenNewCmgMajorityDoesNotRespond() {
+        prepareNodeStateForClusterReset();
+        when(topologyService.allMembers()).thenReturn(List.of(thisNode, node2, node3, node4, node5));
+
+        // Of the 3 proposed CMG nodes only this node responds successfully; successes of node4/node5 do not count.
+        respondWithExceptionFrom(node2, node3);
+        respondSuccessfullyFrom(thisNode, node4, node5);
+
+        List<String> newCmgNodeNames = List.of(thisNodeName, node2.name(), node3.name());
+        CompletableFuture<Void> resetFuture = manager.resetCluster(newCmgNodeNames);
+
+        ClusterResetException ex = assertWillThrow(resetFuture, ClusterResetException.class, 10, SECONDS);
+        assertThat(ex.getMessage(), is("Did not get successful responses from new CMG majority, failing cluster reset."));
+
+        verify(restarter, never()).initiateRestart();
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void savesToVaultWhenGetsMessage(boolean fromSelf) throws Exception {
+        NetworkMessageHandler handler = extractMessageHandler();
+
+        ClusterNode sender;
+        if (fromSelf) {
+            sender = thisNode;
+        } else {
+            sender = node2;
+        }
+
+        handler.onReceived(resetClusterMessageOn2Nodes(sender.name()), sender, 0L);
+
+        // The message is handled asynchronously, so wait until it appears in the Vault.
+        ByteArray vaultKey = new ByteArray(RESET_CLUSTER_MESSAGE_VAULT_KEY);
+        assertTrue(waitForCondition(() -> vaultManager.get(vaultKey) != null, 10_000));
+
+        VaultEntry savedEntry = vaultManager.get(vaultKey);
+        assertThat(savedEntry, is(notNullValue()));
+
+        ResetClusterMessage savedMessage = fromBytes(savedEntry.value());
+
+        assertThatResetClusterMessageContentIsAsExpected(savedMessage, sender.name());
+    }
+
+    /**
+     * Restarts the manager with {@code addMessageHandler()} intercepted and returns the handler that the manager
+     * installed for {@link SystemDisasterRecoveryMessageGroup} during the start.
+     */
+    private NetworkMessageHandler extractMessageHandler() {
+        assertThat(manager.stopAsync(), willCompleteSuccessfully());
+
+        AtomicReference<NetworkMessageHandler> installedHandler = new AtomicReference<>();
+
+        doAnswer(invocation -> {
+            NetworkMessageHandler registered = invocation.getArgument(1);
+            installedHandler.set(registered);
+            return null;
+        }).when(messagingService).addMessageHandler(eq(SystemDisasterRecoveryMessageGroup.class), any());
+
+        assertThat(manager.startAsync(componentContext), willCompleteSuccessfully());
+
+        NetworkMessageHandler handler = installedHandler.get();
+        assertThat("Handler was not installed", handler, is(notNullValue()));
+
+        return handler;
+    }
+
+    /** Builds a reset message with a 2-node CMG (this node and node2), a random new cluster ID and the given conductor. */
+    private ResetClusterMessage resetClusterMessageOn2Nodes(String conductorName) {
+        Set<String> newCmgNodes = Set.of(thisNodeName, node2.name());
+        var formerClusterIds = List.of(usualClusterState.clusterTag().clusterId());
+
+        return messagesFactory.resetClusterMessage()
+                .cmgNodes(newCmgNodes)
+                .metaStorageNodes(usualClusterState.metaStorageNodes())
+                .clusterName(CLUSTER_NAME)
+                .clusterId(randomUUID())
+                .formerClusterIds(formerClusterIds)
+                .conductor(conductorName)
+                .build();
+    }
+
+    @Test
+    void respondsWhenGetsMessageFromSelf() {
+        NetworkMessageHandler handler = extractMessageHandler();
+
+        handler.onReceived(resetClusterMessageOn2Nodes(thisNode.name()), thisNode, 123L);
+
+        long waitMillis = SECONDS.toMillis(10);
+        ArgumentCaptor<NetworkMessage> responseCaptor = ArgumentCaptor.forClass(NetworkMessage.class);
+
+        // The message must be saved to the Vault before the response is sent.
+        InOrder inOrder = inOrder(messagingService, vaultManager);
+
+        inOrder.verify(vaultManager, timeout(waitMillis)).put(eq(new ByteArray(RESET_CLUSTER_MESSAGE_VAULT_KEY)), any());
+        inOrder.verify(messagingService, timeout(waitMillis)).respond(eq(thisNode), responseCaptor.capture(), eq(123L));
+
+        assertThat(responseCaptor.getValue(), instanceOf(SuccessResponseMessage.class));
+    }
+
+    @Test
+    void respondsWhenGetsMessageFromOtherNode() {
+        ArgumentCaptor<NetworkMessage> messageCaptor = ArgumentCaptor.forClass(NetworkMessage.class);
+
+        NetworkMessageHandler handler = extractMessageHandler();
+        ClusterNode conductor = node2;
+
+        handler.onReceived(resetClusterMessageOn2Nodes(conductor.name()), conductor, 123L);
+
+        InOrder inOrder = inOrder(messagingService, vaultManager, restarter);
+
+        // The message is handled asynchronously, so every step is awaited; the order must be: save, respond, restart.
+        inOrder.verify(vaultManager, timeout(SECONDS.toMillis(10))).put(eq(new ByteArray(RESET_CLUSTER_MESSAGE_VAULT_KEY)), any());
+        inOrder.verify(messagingService, timeout(SECONDS.toMillis(10))).respond(eq(conductor), messageCaptor.capture(), eq(123L));
+        inOrder.verify(restarter, timeout(SECONDS.toMillis(10))).initiateRestart();
+
+        assertThat(messageCaptor.getValue(), instanceOf(SuccessResponseMessage.class));
+    }
+
+    @Test
+    void initiatesRestartWhenGetsMessageFromOtherNode() {
+        ResetClusterMessage messageFromNode2 = resetClusterMessageOn2Nodes(node2.name());
+
+        extractMessageHandler().onReceived(messageFromNode2, node2, 0L);
+
+        // The message is handled asynchronously, hence the timeout.
+        long waitMillis = SECONDS.toMillis(10);
+        verify(restarter, timeout(waitMillis)).initiateRestart();
+    }
+
+    @Test
+    void doesNotInitiateRestartWhenGetsMessageFromSelf() {
+        NetworkMessageHandler handler = extractMessageHandler();
+
+        handler.onReceived(resetClusterMessageOn2Nodes(thisNodeName), thisNode, 0L);
+
+        // The message is handled asynchronously: wait until it has been answered, otherwise never() below
+        // could pass merely because the handler has not run yet.
+        // NOTE(review): assumes the handler responds for correlation ID 0 just as it does for 123 - confirm.
+        verify(messagingService, timeout(SECONDS.toMillis(10))).respond(eq(thisNode), any(), eq(0L));
+
+        verify(restarter, never()).initiateRestart();
+    }
+}
diff --git a/settings.gradle b/settings.gradle
index c6e4ac1b10..b6a2e84ec4 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -89,6 +89,7 @@ include(":ignite-eventlog")
 include(":ignite-low-watermark")
 include(":ignite-partition-replicator")
 include(':ignite-configuration-root')
+include(':ignite-system-disaster-recovery')
 
 project(":ignite-examples").projectDir = file('examples')
 project(":ignite-dev-utilities").projectDir = file('dev-utilities')
@@ -163,6 +164,7 @@ project(":ignite-eventlog").projectDir = 
file('modules/eventlog')
 project(":ignite-low-watermark").projectDir = file('modules/low-watermark')
 project(":ignite-partition-replicator").projectDir = 
file('modules/partition-replicator')
 project(":ignite-configuration-root").projectDir = 
file('modules/configuration-root')
+project(":ignite-system-disaster-recovery").projectDir = 
file('modules/system-disaster-recovery')
 
 ext.isCiServer = System.getenv().containsKey("IGNITE_CI")
 

Reply via email to