This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 6c65c1fb57 IGNITE-22876 Implement cluster reset initiation (#4272)
6c65c1fb57 is described below
commit 6c65c1fb573e09ee25f16e467e976342ce1cec6a
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Wed Aug 28 19:09:16 2024 +0400
IGNITE-22876 Implement cluster reset initiation (#4272)
---
.../internal/cluster/management/ClusterState.java | 8 +
.../raft/ClusterStateStorageManager.java | 2 +-
.../ignite/internal/util/CompletableFutures.java | 2 +-
modules/low-watermark/build.gradle | 1 -
.../metastorage/impl/MetaStorageManagerImpl.java | 2 +-
modules/runner/build.gradle | 1 +
.../org/apache/ignite/internal/app/IgniteImpl.java | 18 +-
.../ignite/internal/app/IgniteServerImpl.java | 2 +-
modules/system-disaster-recovery/README.md | 3 +
.../build.gradle | 18 +-
.../disaster/system/ClusterResetException.java | 30 ++
.../internal/disaster/system/ServerRestarter.java | 29 ++
.../system/SystemDisasterRecoveryManager.java | 40 ++
.../system/SystemDisasterRecoveryManagerImpl.java | 288 +++++++++++++
.../system/message/ResetClusterMessage.java | 61 +++
.../SystemDisasterRecoveryMessageGroup.java | 31 ++
.../SystemDisasterRecoveryManagerImplTest.java | 456 +++++++++++++++++++++
settings.gradle | 2 +
18 files changed, 974 insertions(+), 20 deletions(-)
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterState.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterState.java
index fba541c0c5..66277cfbd1 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterState.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterState.java
@@ -18,7 +18,9 @@
package org.apache.ignite.internal.cluster.management;
import java.io.Serializable;
+import java.util.List;
import java.util.Set;
+import java.util.UUID;
import
org.apache.ignite.internal.cluster.management.network.messages.CmgMessageGroup;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.annotations.Transferable;
@@ -68,4 +70,10 @@ public interface ClusterState extends NetworkMessage,
Serializable {
*/
@Nullable
String initialClusterConfiguration();
+
+ /**
+ * Returns IDs this cluster had before ({@code null} if this is the first
incarnation).
+ */
+ @Nullable
+ List<UUID> formerClusterIds();
}
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ClusterStateStorageManager.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ClusterStateStorageManager.java
index bbe8697912..a483d48d3c 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ClusterStateStorageManager.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ClusterStateStorageManager.java
@@ -62,7 +62,7 @@ public class ClusterStateStorageManager {
*
* @param state Cluster state.
*/
- void putClusterState(ClusterState state) {
+ public void putClusterState(ClusterState state) {
storage.put(CMG_STATE_KEY, toBytes(state));
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/CompletableFutures.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/CompletableFutures.java
index 8d2fca1ff0..5839e3dd30 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/CompletableFutures.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/CompletableFutures.java
@@ -116,7 +116,7 @@ public class CompletableFutures {
*
* @param futures List of futures.
*/
- public static CompletableFuture<Void>
allOf(Collection<CompletableFuture<?>> futures) {
+ public static CompletableFuture<Void> allOf(Collection<? extends
CompletableFuture<?>> futures) {
return futures.isEmpty() ? nullCompletedFuture() :
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new));
}
diff --git a/modules/low-watermark/build.gradle
b/modules/low-watermark/build.gradle
index 99dd52976b..4612e604ee 100644
--- a/modules/low-watermark/build.gradle
+++ b/modules/low-watermark/build.gradle
@@ -28,7 +28,6 @@ dependencies {
implementation project(':ignite-core')
implementation project(':ignite-failure-handler')
implementation project(':ignite-network-api')
- implementation project(':ignite-network-api')
implementation project(':ignite-vault')
implementation project(':ignite-configuration-api')
implementation project(':ignite-replicator')
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index 93c99c0aba..f1db841f2d 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -211,7 +211,7 @@ public class MetaStorageManagerImpl implements
MetaStorageManager {
electionListeners.add(listener);
}
- private CompletableFuture<Long> recover(MetaStorageServiceImpl service) {
+ private CompletableFuture<Long> recover(MetaStorageService service) {
if (!busyLock.enterBusy()) {
return failedFuture(new NodeStoppingException());
}
diff --git a/modules/runner/build.gradle b/modules/runner/build.gradle
index 8b8a018c84..e95aa67397 100644
--- a/modules/runner/build.gradle
+++ b/modules/runner/build.gradle
@@ -89,6 +89,7 @@ dependencies {
implementation project(':ignite-low-watermark')
implementation project(':ignite-partition-replicator')
implementation project(':ignite-catalog-compaction')
+ implementation project(':ignite-system-disaster-recovery')
implementation libs.jetbrains.annotations
implementation libs.micronaut.inject
implementation libs.micronaut.validation
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 44c8c98fd7..2ec53343ea 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -120,6 +120,8 @@ import
org.apache.ignite.internal.deployunit.DeploymentManagerImpl;
import org.apache.ignite.internal.deployunit.IgniteDeployment;
import
org.apache.ignite.internal.deployunit.configuration.DeploymentExtensionConfiguration;
import org.apache.ignite.internal.deployunit.metastore.DeploymentUnitStoreImpl;
+import org.apache.ignite.internal.disaster.system.ServerRestarter;
+import
org.apache.ignite.internal.disaster.system.SystemDisasterRecoveryManagerImpl;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite.internal.eventlog.config.schema.EventLogConfiguration;
import
org.apache.ignite.internal.eventlog.config.schema.EventLogExtensionConfiguration;
@@ -417,6 +419,8 @@ public class IgniteImpl implements Ignite {
/** Remote triggered resources registry. */
private final RemotelyTriggeredResourceRegistry resourcesRegistry;
+ private final SystemDisasterRecoveryManagerImpl
systemDisasterRecoveryManager;
+
private final IgniteTables publicTables;
private final IgniteTransactions publicTransactions;
private final IgniteSql publicSql;
@@ -446,6 +450,7 @@ public class IgniteImpl implements Ignite {
*/
IgniteImpl(
IgniteServer node,
+ ServerRestarter restarter,
Path configPath,
Path workDir,
@Nullable ClassLoader serviceProviderClassLoader,
@@ -543,6 +548,15 @@ public class IgniteImpl implements Ignite {
failureProcessor
);
+ systemDisasterRecoveryManager = new SystemDisasterRecoveryManagerImpl(
+ name,
+ clusterSvc.topologyService(),
+ clusterSvc.messagingService(),
+ vaultMgr,
+ restarter,
+ clusterStateStorage
+ );
+
clock = new HybridClockImpl();
clockWaiter = new ClockWaiter(name, clock);
@@ -1192,6 +1206,7 @@ public class IgniteImpl implements Ignite {
nettyBootstrapFactory,
nettyWorkersRegistrar,
clusterSvc,
+ systemDisasterRecoveryManager,
restComponent,
partitionsLogStorageFactory,
msLogStorageFactory,
@@ -1601,7 +1616,8 @@ public class IgniteImpl implements Ignite {
clusterCfgMgr.configurationRegistry().initializeConfigurationWith(hoconSource);
}, startupExecutor);
}
- }, startupExecutor);
+ }, startupExecutor)
+
.thenRunAsync(systemDisasterRecoveryManager::markNodeInitialized,
startupExecutor);
}
/**
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteServerImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteServerImpl.java
index 005ec204bd..c9a0a2982a 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteServerImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteServerImpl.java
@@ -329,7 +329,7 @@ public class IgniteServerImpl implements IgniteServer {
}
private CompletableFuture<Void> doStartAsync() {
- IgniteImpl instance = new IgniteImpl(this, configPath, workDir,
classLoader, asyncContinuationExecutor);
+ IgniteImpl instance = new IgniteImpl(this, this::restartAsync,
configPath, workDir, classLoader, asyncContinuationExecutor);
ackBanner();
diff --git a/modules/system-disaster-recovery/README.md
b/modules/system-disaster-recovery/README.md
new file mode 100644
index 0000000000..f62882b30e
--- /dev/null
+++ b/modules/system-disaster-recovery/README.md
@@ -0,0 +1,3 @@
+# System disaster recovery module
+
+This module provides the capability to recover from disasters related to the
system Raft groups (CMG and Metastorage group).
diff --git a/modules/low-watermark/build.gradle
b/modules/system-disaster-recovery/build.gradle
similarity index 66%
copy from modules/low-watermark/build.gradle
copy to modules/system-disaster-recovery/build.gradle
index 99dd52976b..333b8e7938 100644
--- a/modules/low-watermark/build.gradle
+++ b/modules/system-disaster-recovery/build.gradle
@@ -21,29 +21,19 @@ apply from: "$rootDir/buildscripts/java-junit5.gradle"
apply from: "$rootDir/buildscripts/java-test-fixtures.gradle"
dependencies {
- annotationProcessor project(':ignite-configuration-annotation-processor')
- annotationProcessor project(':ignite-network-annotation-processor')
- annotationProcessor libs.auto.service
+ annotationProcessor project(":ignite-network-annotation-processor")
implementation project(':ignite-core')
- implementation project(':ignite-failure-handler')
- implementation project(':ignite-network-api')
implementation project(':ignite-network-api')
implementation project(':ignite-vault')
- implementation project(':ignite-configuration-api')
- implementation project(':ignite-replicator')
- implementation project(':ignite-schema')
+ implementation project(':ignite-cluster-management')
implementation libs.jetbrains.annotations
testImplementation libs.hamcrest.core
testImplementation libs.mockito.core
testImplementation libs.mockito.junit
- testImplementation project(":ignite-schema")
testImplementation testFixtures(project(":ignite-core"))
- testImplementation testFixtures(project(":ignite-configuration"))
-
- testFixturesImplementation project(':ignite-core')
- testFixturesImplementation libs.jetbrains.annotations
+ testImplementation testFixtures(project(":ignite-cluster-management"))
}
-description = 'ignite-low-watermark'
+description = 'ignite-system-disaster-recovery'
diff --git
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ClusterResetException.java
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ClusterResetException.java
new file mode 100644
index 0000000000..3f6009e203
--- /dev/null
+++
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ClusterResetException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.disaster.system;
+
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.lang.ErrorGroups.Common;
+
+/**
+ * Thrown when a cluster reset cannot be initiated.
+ */
+public class ClusterResetException extends IgniteInternalException {
+ public ClusterResetException(String msg) {
+ super(Common.INTERNAL_ERR, msg);
+ }
+}
diff --git
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ServerRestarter.java
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ServerRestarter.java
new file mode 100644
index 0000000000..644f9006a7
--- /dev/null
+++
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ServerRestarter.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.disaster.system;
+
+/**
+ * Knows how to initiate an Ignite node restart.
+ */
+@FunctionalInterface
/**
 * Knows how to initiate an Ignite node restart.
 *
 * <p>System disaster recovery uses it to restart the node after the node has accepted (or, as the conductor,
 * successfully distributed) a cluster reset.
 */
@FunctionalInterface
public interface ServerRestarter {
    /**
     * Initiates server restart.
     *
     * <p>NOTE(review): the production binding is {@code IgniteServerImpl::restartAsync}, whose returned future is
     * discarded by this {@code void} method. Callers therefore presumably cannot assume the restart has completed
     * (or even succeeded) when this method returns. Confirm against {@code restartAsync}.
     */
    void initiateRestart();
}
diff --git
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManager.java
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManager.java
new file mode 100644
index 0000000000..0cb06071fb
--- /dev/null
+++
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManager.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.disaster.system;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.disaster.system.message.ResetClusterMessage;
+
+/**
+ * Manages disaster recovery of system groups, namely the Cluster Management
Group (CMG) and the Metastorage group (MG).
+ */
/**
 * Manages disaster recovery of system groups, namely the Cluster Management Group (CMG) and the Metastorage group (MG).
 */
public interface SystemDisasterRecoveryManager {
    /**
     * Marks this node as initialized.
     *
     * <p>Only a node that has been marked as initialized may act as a cluster reset conductor
     * (see {@link #resetCluster(List)}).
     */
    void markNodeInitialized();

    /**
     * Initiates cluster reset.
     *
     * <p>The new cluster description (new CMG, new cluster ID, etc.) is distributed to the nodes of the cluster
     * in a {@link ResetClusterMessage}; this node acts as the conductor of the reset.
     *
     * @param proposedCmgConsistentIds Names of the nodes that will be the new CMG nodes.
     * @return Future completing with the result of the operation. It completes exceptionally with a
     *     {@link ClusterResetException} (possibly wrapped in a {@link java.util.concurrent.CompletionException})
     *     in case of an error related to the reset logic.
     */
    CompletableFuture<Void> resetCluster(List<String> proposedCmgConsistentIds);
}
diff --git
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImpl.java
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImpl.java
new file mode 100644
index 0000000000..8db5db4a08
--- /dev/null
+++
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImpl.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.disaster.system;
+
+import static java.util.Objects.requireNonNullElse;
+import static java.util.UUID.randomUUID;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static org.apache.ignite.internal.util.CompletableFutures.allOf;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import org.apache.ignite.internal.cluster.management.ClusterState;
+import
org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
+import
org.apache.ignite.internal.cluster.management.network.messages.SuccessResponseMessage;
+import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
+import
org.apache.ignite.internal.cluster.management.raft.ClusterStateStorageManager;
+import org.apache.ignite.internal.disaster.system.message.ResetClusterMessage;
+import
org.apache.ignite.internal.disaster.system.message.SystemDisasterRecoveryMessageGroup;
+import
org.apache.ignite.internal.disaster.system.message.SystemDisasterRecoveryMessagesFactory;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.TopologyService;
+import org.apache.ignite.internal.util.CompletableFutures;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * Implementation of {@link SystemDisasterRecoveryManager}.
+ */
+public class SystemDisasterRecoveryManagerImpl implements
SystemDisasterRecoveryManager, IgniteComponent {
+ private static final String NODE_INITIALIZED_VAULT_KEY =
"systemRecovery.nodeInitialized";
+ private static final String RESET_CLUSTER_MESSAGE_VAULT_KEY =
"systemRecovery.resetClusterMessage";
+
+ private final String thisNodeName;
+ private final TopologyService topologyService;
+ private final MessagingService messagingService;
+ private final VaultManager vaultManager;
+ private final ServerRestarter restarter;
+
+ private final ClusterStateStorageManager clusterStateStorageManager;
+
+ private final SystemDisasterRecoveryMessagesFactory messagesFactory = new
SystemDisasterRecoveryMessagesFactory();
+ private static final CmgMessagesFactory cmgMessagesFactory = new
CmgMessagesFactory();
+
+ /** This executor spawns a thread per task and should only be used for
very rare tasks. */
+ private final Executor restartExecutor;
+
+ /** Constructor. */
+ public SystemDisasterRecoveryManagerImpl(
+ String thisNodeName,
+ TopologyService topologyService,
+ MessagingService messagingService,
+ VaultManager vaultManager,
+ ServerRestarter restarter,
+ ClusterStateStorage clusterStateStorage
+ ) {
+ this.thisNodeName = thisNodeName;
+ this.topologyService = topologyService;
+ this.messagingService = messagingService;
+ this.vaultManager = vaultManager;
+ this.restarter = restarter;
+
+ clusterStateStorageManager = new
ClusterStateStorageManager(clusterStateStorage);
+
+ restartExecutor = new ThreadPerTaskExecutor(thisNodeName +
"-restart-");
+ }
+
+ @Override
+ public CompletableFuture<Void> startAsync(ComponentContext
componentContext) {
+
messagingService.addMessageHandler(SystemDisasterRecoveryMessageGroup.class,
(message, sender, correlationId) -> {
+ if (message instanceof ResetClusterMessage) {
+ assert correlationId != null;
+ handleResetClusterMessage((ResetClusterMessage) message,
sender, correlationId);
+ }
+ });
+
+ return nullCompletedFuture();
+ }
+
+ private void handleResetClusterMessage(ResetClusterMessage message,
ClusterNode sender, long correlationId) {
+ restartExecutor.execute(() -> {
+ vaultManager.put(new ByteArray(RESET_CLUSTER_MESSAGE_VAULT_KEY),
toBytes(message));
+
+ messagingService.respond(sender, successResponseMessage(),
correlationId)
+ .thenRunAsync(() -> {
+ if (!thisNodeName.equals(message.conductor())) {
+ restarter.initiateRestart();
+ }
+ }, restartExecutor);
+ });
+ }
+
+ private SuccessResponseMessage successResponseMessage() {
+ return cmgMessagesFactory.successResponseMessage().build();
+ }
+
+ @Override
+ public CompletableFuture<Void> stopAsync(ComponentContext
componentContext) {
+ return nullCompletedFuture();
+ }
+
+ @Override
+ public void markNodeInitialized() {
+ vaultManager.put(new ByteArray(NODE_INITIALIZED_VAULT_KEY), new
byte[]{1});
+ }
+
+ @Override
+ public CompletableFuture<Void> resetCluster(List<String>
proposedCmgConsistentIds) {
+ try {
+ return doResetCluster(proposedCmgConsistentIds);
+ } catch (ClusterResetException e) {
+ return failedFuture(e);
+ }
+ }
+
+ private CompletableFuture<Void> doResetCluster(List<String>
proposedCmgConsistentIds) {
+ ensureNoRepetitions(proposedCmgConsistentIds);
+ ensureContainsThisNodeName(proposedCmgConsistentIds);
+
+ Collection<ClusterNode> nodesInTopology = topologyService.allMembers();
+ ensureAllProposedCmgNodesAreInTopology(proposedCmgConsistentIds,
nodesInTopology);
+
+ ensureNodeIsInitialized();
+ ClusterState clusterState = ensureClusterStateIsPresent();
+
+ ResetClusterMessage message = buildResetClusterMessage(
+ proposedCmgConsistentIds,
+ clusterState
+ );
+
+ Map<String, CompletableFuture<NetworkMessage>> responseFutures =
sendResetClusterMessageTo(
+ nodesInTopology,
+ message
+ );
+
+ return allOf(responseFutures.values())
+ .handleAsync((res, ex) -> {
+ // We ignore upstream exceptions on purpose.
+
+ if (isMajorityOfCmgAreSuccesses(proposedCmgConsistentIds,
responseFutures)) {
+ restarter.initiateRestart();
+
+ return null;
+ } else {
+ throw new ClusterResetException("Did not get
successful responses from new CMG majority, failing cluster reset.");
+ }
+ }, restartExecutor);
+ }
+
+ private Map<String, CompletableFuture<NetworkMessage>>
sendResetClusterMessageTo(Collection<ClusterNode> nodesInTopology,
+ ResetClusterMessage message) {
+ Map<String, CompletableFuture<NetworkMessage>> responseFutures = new
HashMap<>();
+ for (ClusterNode node : nodesInTopology) {
+ responseFutures.put(node.name(), messagingService.invoke(node,
message, 10_000));
+ }
+ return responseFutures;
+ }
+
+ private void ensureNodeIsInitialized() {
+ VaultEntry initializedEntry = vaultManager.get(new
ByteArray(NODE_INITIALIZED_VAULT_KEY));
+ if (initializedEntry == null) {
+ throw new ClusterResetException("Node is not initialized and
cannot serve as a cluster reset conductor.");
+ }
+ }
+
+ private static void ensureNoRepetitions(List<String>
proposedCmgConsistentIds) {
+ if (new HashSet<>(proposedCmgConsistentIds).size() !=
proposedCmgConsistentIds.size()) {
+ throw new ClusterResetException("New CMG node consistentIds have
repetitions: " + proposedCmgConsistentIds + ".");
+ }
+ }
+
+ private void ensureContainsThisNodeName(List<String>
proposedCmgConsistentIds) {
+ if (!proposedCmgConsistentIds.contains(thisNodeName)) {
+ throw new ClusterResetException("Current node is not contained in
the new CMG, so it cannot conduct a cluster reset.");
+ }
+ }
+
+ private static void ensureAllProposedCmgNodesAreInTopology(
+ List<String> proposedCmgConsistentIds,
+ Collection<ClusterNode> nodesInTopology
+ ) {
+ Set<String> consistentIdsOfNodesInTopology =
nodesInTopology.stream().map(ClusterNode::name).collect(toSet());
+
+ Set<String> notInTopology = new HashSet<>(proposedCmgConsistentIds);
+ notInTopology.removeAll(consistentIdsOfNodesInTopology);
+
+ if (!notInTopology.isEmpty()) {
+ throw new ClusterResetException("Some of proposed CMG nodes are
not online: " + notInTopology + ".");
+ }
+ }
+
+ private ClusterState ensureClusterStateIsPresent() {
+ ClusterState clusterState =
clusterStateStorageManager.getClusterState();
+ if (clusterState == null) {
+ throw new ClusterResetException("Node does not have cluster
state.");
+ }
+ return clusterState;
+ }
+
+ private ResetClusterMessage buildResetClusterMessage(List<String>
proposedCmgConsistentIds, ClusterState clusterState) {
+ List<UUID> formerClusterIds = new
ArrayList<>(requireNonNullElse(clusterState.formerClusterIds(), new
ArrayList<>()));
+ formerClusterIds.add(clusterState.clusterTag().clusterId());
+
+ return messagesFactory.resetClusterMessage()
+ .conductor(thisNodeName)
+ .cmgNodes(new HashSet<>(proposedCmgConsistentIds))
+ .metaStorageNodes(clusterState.metaStorageNodes())
+ .clusterName(clusterState.clusterTag().clusterName())
+ .clusterId(randomUUID())
+ .formerClusterIds(formerClusterIds)
+ .build();
+ }
+
+ private static boolean isMajorityOfCmgAreSuccesses(
+ List<String> proposedCmgConsistentIds,
+ Map<String, CompletableFuture<NetworkMessage>> responseFutures
+ ) {
+ Set<String> newCmgNodesSet = new HashSet<>(proposedCmgConsistentIds);
+ List<CompletableFuture<NetworkMessage>> futuresFromNewCmg =
responseFutures.entrySet().stream()
+ .filter(entry -> newCmgNodesSet.contains(entry.getKey()))
+ .map(Entry::getValue)
+ .collect(toList());
+
+ assert futuresFromNewCmg.size() == proposedCmgConsistentIds.size()
+ : futuresFromNewCmg.size() + " futures, but " +
proposedCmgConsistentIds.size() + " nodes";
+
+ long successes = futuresFromNewCmg.stream()
+ .filter(CompletableFutures::isCompletedSuccessfully)
+ .count();
+
+ return successes >= (futuresFromNewCmg.size() + 1) / 2;
+ }
+
+ /**
+ * An executor that spawns a thread per task. This is very inefficient for
scenarios with many tasks,
+ * but we use it here for very rare tasks; it has an advantage that it
doesn't need to be shut down
+ * (node restart is performed in it, so it might cause troubles to use
normal pools created in the Ignite node).
+ */
+ private static class ThreadPerTaskExecutor implements Executor {
+ private final String threadNamePrefix;
+
+ private ThreadPerTaskExecutor(String threadNamePrefix) {
+ this.threadNamePrefix = threadNamePrefix;
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ Thread thread = new Thread(command);
+
+ thread.setName(threadNamePrefix + thread.getId());
+ thread.setDaemon(true);
+
+ thread.start();
+ }
+ }
+}
diff --git
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/message/ResetClusterMessage.java
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/message/ResetClusterMessage.java
new file mode 100644
index 0000000000..7acc6bff0f
--- /dev/null
+++
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/message/ResetClusterMessage.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.disaster.system.message;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.annotations.Transferable;
+
+/**
+ * Message for resetting the cluster.
+ */
+@Transferable(SystemDisasterRecoveryMessageGroup.RESET_CLUSTER)
/**
 * Message for resetting the cluster.
 *
 * <p>Sent by the reset conductor to every node of the physical topology. It is {@link Serializable} because
 * recipients persist it in their Vault (via {@code ByteUtils.toBytes}, presumably Java serialization — confirm).
 *
 * <p>NOTE(review): this is a {@code @Transferable} message; the declaration order of the accessors below may affect
 * the generated serializer, so do not reorder them without checking the network annotation processor.
 */
@Transferable(SystemDisasterRecoveryMessageGroup.RESET_CLUSTER)
public interface ResetClusterMessage extends NetworkMessage, Serializable {
    /**
     * Consistent IDs of nodes that will host the new CMG.
     */
    Set<String> cmgNodes();

    /**
     * Consistent IDs of nodes that host the Meta Storage (taken unchanged from the conductor's cluster state).
     */
    Set<String> metaStorageNodes();

    /**
     * Name of the cluster that will be a part of the generated cluster tag (the current cluster name is kept).
     */
    String clusterName();

    /**
     * New ID of the cluster (freshly generated by the conductor).
     */
    UUID clusterId();

    /**
     * IDs that the cluster had before (including the current incarnation by which this message is sent).
     * The conductor appends the current cluster ID as the last element.
     */
    List<UUID> formerClusterIds();

    /**
     * Consistent ID of the node that conducts the cluster reset.
     * Recipients other than the conductor restart themselves after acknowledging the message.
     */
    String conductor();
}
diff --git
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/message/SystemDisasterRecoveryMessageGroup.java
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/message/SystemDisasterRecoveryMessageGroup.java
new file mode 100644
index 0000000000..c6f0b0fa07
--- /dev/null
+++
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/message/SystemDisasterRecoveryMessageGroup.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.disaster.system.message;
+
+import org.apache.ignite.internal.network.annotations.MessageGroup;
+
+/**
+ * Message Group for disaster recovery of system groups.
+ */
+@MessageGroup(groupType = 15, groupName = "SystemDisasterRecoveryMessages")
+public class SystemDisasterRecoveryMessageGroup {
+ /**
+ * Message type for {@link ResetClusterMessage}.
+ */
+ public static final short RESET_CLUSTER = 1;
+}
diff --git a/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java b/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
new file mode 100644
index 0000000000..27c5813fc8
--- /dev/null
+++ b/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.disaster.system;
+
+import static java.util.UUID.randomUUID;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static
org.apache.ignite.internal.cluster.management.ClusterTag.randomClusterTag;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static
org.apache.ignite.internal.testframework.asserts.CompletableFutureAssert.assertWillThrow;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.cluster.management.ClusterState;
+import
org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
+import
org.apache.ignite.internal.cluster.management.network.messages.SuccessResponseMessage;
+import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
+import
org.apache.ignite.internal.cluster.management.raft.ClusterStateStorageManager;
+import
org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
+import org.apache.ignite.internal.disaster.system.message.ResetClusterMessage;
+import
org.apache.ignite.internal.disaster.system.message.SystemDisasterRecoveryMessageGroup;
+import
org.apache.ignite.internal.disaster.system.message.SystemDisasterRecoveryMessagesFactory;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.network.ClusterNodeImpl;
+import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.NetworkMessageHandler;
+import org.apache.ignite.internal.network.TopologyService;
+import org.apache.ignite.internal.properties.IgniteProductVersion;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Tests for {@link SystemDisasterRecoveryManagerImpl}.
+ *
+ * <p>Covers validation of cluster reset requests, sending of {@link ResetClusterMessage}s by the node conducting the reset,
+ * restart initiation, and handling of incoming {@link ResetClusterMessage}s.
+ */
+@ExtendWith(MockitoExtension.class)
+@ExtendWith(WorkDirectoryExtension.class)
+class SystemDisasterRecoveryManagerImplTest extends BaseIgniteAbstractTest {
+    private static final String CLUSTER_NAME = "cluster";
+
+    /** Vault key that the manager uses to mark the node as initialized. */
+    private static final String NODE_INITIALIZED_VAULT_KEY = "systemRecovery.nodeInitialized";
+
+    /** Vault key under which the manager saves a received {@link ResetClusterMessage}. */
+    private static final String RESET_CLUSTER_MESSAGE_VAULT_KEY = "systemRecovery.resetClusterMessage";
+
+    /** Upper bound for waiting on the (possibly asynchronous) handling of incoming messages. */
+    private static final long ASYNC_WAIT_MILLIS = SECONDS.toMillis(10);
+
+    private static final String thisNodeName = "node1";
+
+    @WorkDirectory
+    private Path workDir;
+
+    @Mock
+    private TopologyService topologyService;
+
+    @Mock
+    private MessagingService messagingService;
+
+    /** Real persistent vault, spied so that tests can verify its writes relative to other interactions. */
+    private VaultManager vaultManager;
+
+    @Mock
+    private ServerRestarter restarter;
+
+    private final ClusterStateStorage clusterStateStorage = new TestClusterStateStorage();
+    private final ClusterStateStorageManager clusterStateStorageManager = new ClusterStateStorageManager(clusterStateStorage);
+
+    private SystemDisasterRecoveryManagerImpl manager;
+
+    private final ComponentContext componentContext = new ComponentContext();
+
+    private final ClusterNode thisNode = new ClusterNodeImpl(randomUUID().toString(), thisNodeName, new NetworkAddress("host", 1001));
+
+    private final ClusterNode node2 = new ClusterNodeImpl(randomUUID().toString(), "node2", new NetworkAddress("host", 1002));
+    private final ClusterNode node3 = new ClusterNodeImpl(randomUUID().toString(), "node3", new NetworkAddress("host", 1003));
+    private final ClusterNode node4 = new ClusterNodeImpl(randomUUID().toString(), "node4", new NetworkAddress("host", 1004));
+    private final ClusterNode node5 = new ClusterNodeImpl(randomUUID().toString(), "node5", new NetworkAddress("host", 1005));
+
+    private final CmgMessagesFactory cmgMessagesFactory = new CmgMessagesFactory();
+    private final SystemDisasterRecoveryMessagesFactory messagesFactory = new SystemDisasterRecoveryMessagesFactory();
+
+    private final ClusterState usualClusterState = cmgMessagesFactory.clusterState()
+            .cmgNodes(Set.of(thisNodeName))
+            .metaStorageNodes(Set.of(thisNodeName))
+            .version(IgniteProductVersion.CURRENT_VERSION.toString())
+            .clusterTag(randomClusterTag(cmgMessagesFactory, CLUSTER_NAME))
+            .build();
+
+    private final SuccessResponseMessage successResponseMessage = cmgMessagesFactory.successResponseMessage().build();
+
+    @BeforeEach
+    void init() {
+        vaultManager = spy(new VaultManager(new PersistentVaultService(workDir.resolve("vault"))));
+        assertThat(vaultManager.startAsync(componentContext), willCompleteSuccessfully());
+
+        lenient().when(messagingService.respond(any(ClusterNode.class), any(NetworkMessage.class), anyLong()))
+                .thenReturn(nullCompletedFuture());
+
+        manager = new SystemDisasterRecoveryManagerImpl(
+                thisNodeName,
+                topologyService,
+                messagingService,
+                vaultManager,
+                restarter,
+                clusterStateStorage
+        );
+        assertThat(manager.startAsync(componentContext), willCompleteSuccessfully());
+    }
+
+    @AfterEach
+    void cleanup() {
+        // Stop in reverse order of start: the manager writes to the vault, so the vault must outlive it.
+        assertThat(manager.stopAsync(), willCompleteSuccessfully());
+        assertThat(vaultManager.stopAsync(), willCompleteSuccessfully());
+    }
+
+    @Test
+    void marksNodeInitialized() {
+        manager.markNodeInitialized();
+
+        VaultEntry entry = vaultManager.get(new ByteArray(NODE_INITIALIZED_VAULT_KEY));
+        assertThat(entry, is(notNullValue()));
+        assertThat(entry.value(), is(new byte[]{1}));
+    }
+
+    @Test
+    void resetClusterRejectsDuplicateNodeNames() {
+        ClusterResetException ex = assertWillThrow(
+                manager.resetCluster(List.of(thisNodeName, thisNodeName)),
+                ClusterResetException.class,
+                10, SECONDS
+        );
+        assertThat(ex.getMessage(), is("New CMG node consistentIds have repetitions: [node1, node1]."));
+    }
+
+    @Test
+    void resetClusterRequiresThisNodeToBeNewCmg() {
+        ClusterResetException ex = assertWillThrow(
+                manager.resetCluster(List.of("abc")),
+                ClusterResetException.class,
+                10, SECONDS
+        );
+        assertThat(ex.getMessage(), is("Current node is not contained in the new CMG, so it cannot conduct a cluster reset."));
+    }
+
+    @Test
+    void resetClusterRequiresNewCmgNodesToBeOnline() {
+        when(topologyService.allMembers()).thenReturn(List.of(thisNode));
+
+        ClusterResetException ex = assertWillThrow(
+                manager.resetCluster(List.of(thisNodeName, "abc")),
+                ClusterResetException.class,
+                10, SECONDS
+        );
+        assertThat(ex.getMessage(), is("Some of proposed CMG nodes are not online: [abc]."));
+    }
+
+    @Test
+    void resetClusterRequiresClusterState() {
+        when(topologyService.allMembers()).thenReturn(List.of(thisNode));
+        // Use the setup helper, not the marksNodeInitialized() test method: tests must not call each other.
+        makeNodeInitialized();
+
+        ClusterResetException ex = assertWillThrow(
+                manager.resetCluster(List.of(thisNodeName)),
+                ClusterResetException.class,
+                10, SECONDS
+        );
+        assertThat(ex.getMessage(), is("Node does not have cluster state."));
+    }
+
+    @Test
+    void resetClusterRequiresNodeToBeInitialized() {
+        when(topologyService.allMembers()).thenReturn(List.of(thisNode));
+        putClusterState();
+
+        ClusterResetException ex = assertWillThrow(
+                manager.resetCluster(List.of(thisNodeName)),
+                ClusterResetException.class,
+                10, SECONDS
+        );
+        assertThat(ex.getMessage(), is("Node is not initialized and cannot serve as a cluster reset conductor."));
+    }
+
+    /** Stores {@link #usualClusterState} in the cluster state storage used by the manager. */
+    private void putClusterState() {
+        clusterStateStorageManager.putClusterState(usualClusterState);
+    }
+
+    /** Writes the 'node initialized' flag directly to the vault, bypassing the manager. */
+    private void makeNodeInitialized() {
+        vaultManager.put(new ByteArray(NODE_INITIALIZED_VAULT_KEY), new byte[]{1});
+    }
+
+    @Test
+    void resetClusterSendsMessages() {
+        ArgumentCaptor<ResetClusterMessage> messageCaptor = ArgumentCaptor.forClass(ResetClusterMessage.class);
+
+        when(topologyService.allMembers()).thenReturn(List.of(thisNode, node2, node3));
+        prepareNodeStateForClusterReset();
+
+        when(messagingService.invoke(any(ClusterNode.class), any(), anyLong()))
+                .thenReturn(completedFuture(successResponseMessage));
+
+        CompletableFuture<Void> future = manager.resetCluster(List.of(thisNodeName, node2.name()));
+        assertThat(future, willCompleteSuccessfully());
+
+        // The captor is reused: getValue() returns the value captured by the latest verification.
+        verify(messagingService).invoke(eq(thisNode), messageCaptor.capture(), anyLong());
+        ResetClusterMessage messageToSelf = messageCaptor.getValue();
+        assertThatResetClusterMessageIsAsExpected(messageToSelf);
+
+        verify(messagingService).invoke(eq(node2), messageCaptor.capture(), anyLong());
+        ResetClusterMessage messageToOtherNewCmgNode = messageCaptor.getValue();
+        assertThatResetClusterMessageIsAsExpected(messageToOtherNewCmgNode);
+
+        verify(messagingService).invoke(eq(node3), messageCaptor.capture(), anyLong());
+        ResetClusterMessage messageToOtherNonCmgNode = messageCaptor.getValue();
+        assertThatResetClusterMessageIsAsExpected(messageToOtherNonCmgNode);
+
+        assertThat(messageToSelf.clusterId(), is(messageToOtherNewCmgNode.clusterId()));
+        assertThat(messageToSelf.clusterId(), is(messageToOtherNonCmgNode.clusterId()));
+    }
+
+    /** Puts the node into a state in which it is allowed to conduct a cluster reset. */
+    private void prepareNodeStateForClusterReset() {
+        makeNodeInitialized();
+        putClusterState();
+    }
+
+    private void assertThatResetClusterMessageIsAsExpected(ResetClusterMessage message) {
+        assertThatResetClusterMessageContentIsAsExpected(message, thisNodeName);
+    }
+
+    private void assertThatResetClusterMessageContentIsAsExpected(@Nullable ResetClusterMessage message, String expectedConductor) {
+        assertThat(message, is(notNullValue()));
+        assertThat(message.cmgNodes(), containsInAnyOrder(thisNodeName, node2.name()));
+        assertThat(message.metaStorageNodes(), is(usualClusterState.metaStorageNodes()));
+        assertThat(message.clusterName(), is(CLUSTER_NAME));
+        assertThat(message.clusterId(), is(not(usualClusterState.clusterTag().clusterId())));
+        assertThat(message.formerClusterIds(), contains(usualClusterState.clusterTag().clusterId()));
+        assertThat(message.conductor(), is(expectedConductor));
+    }
+
+    @Test
+    void resetClusterInitiatesRestartOnSuccess() {
+        when(topologyService.allMembers()).thenReturn(List.of(thisNode, node2, node3));
+        prepareNodeStateForClusterReset();
+
+        when(messagingService.invoke(any(ClusterNode.class), any(), anyLong()))
+                .thenReturn(completedFuture(successResponseMessage));
+
+        CompletableFuture<Void> future = manager.resetCluster(List.of(thisNodeName, node2.name(), node3.name()));
+        assertThat(future, willCompleteSuccessfully());
+
+        verify(restarter).initiateRestart();
+    }
+
+    @Test
+    void resetClusterInitiatesRestartWhenMajorityOfCmgNodesRespondsWithOk() {
+        when(topologyService.allMembers()).thenReturn(List.of(thisNode, node2, node3, node4, node5));
+        prepareNodeStateForClusterReset();
+
+        respondSuccessfullyFrom(thisNode, node2);
+        respondWithExceptionFrom(node3, node4, node5);
+
+        CompletableFuture<Void> future = manager.resetCluster(List.of(thisNodeName, node2.name(), node3.name()));
+        assertThat(future, willCompleteSuccessfully());
+
+        verify(restarter).initiateRestart();
+    }
+
+    private void respondSuccessfullyFrom(ClusterNode... nodes) {
+        for (ClusterNode node : nodes) {
+            respondSuccessfullyFrom(node);
+        }
+    }
+
+    private void respondSuccessfullyFrom(ClusterNode node) {
+        when(messagingService.invoke(eq(node), any(), anyLong()))
+                .thenReturn(completedFuture(successResponseMessage));
+    }
+
+    private void respondWithExceptionFrom(ClusterNode... nodes) {
+        for (ClusterNode node : nodes) {
+            respondWithExceptionFrom(node);
+        }
+    }
+
+    private void respondWithExceptionFrom(ClusterNode node) {
+        when(messagingService.invoke(eq(node), any(), anyLong()))
+                .thenReturn(failedFuture(new TimeoutException()));
+    }
+
+    @Test
+    @DisplayName("resetCluster() fails and does not restart when majority of new CMG nodes do not respond")
+    void resetClusterFailsWhenNewCmgMajorityDoesNotRespond() {
+        when(topologyService.allMembers()).thenReturn(List.of(thisNode, node2, node3, node4, node5));
+        prepareNodeStateForClusterReset();
+
+        respondSuccessfullyFrom(thisNode, node4, node5);
+        respondWithExceptionFrom(node2, node3);
+
+        CompletableFuture<Void> future = manager.resetCluster(List.of(thisNodeName, node2.name(), node3.name()));
+        ClusterResetException ex = assertWillThrow(future, ClusterResetException.class, 10, SECONDS);
+        assertThat(ex.getMessage(), is("Did not get successful responses from new CMG majority, failing cluster reset."));
+
+        verify(restarter, never()).initiateRestart();
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void savesToVaultWhenGetsMessage(boolean fromSelf) throws Exception {
+        NetworkMessageHandler handler = extractMessageHandler();
+
+        ClusterNode conductor = fromSelf ? thisNode : node2;
+        handler.onReceived(resetClusterMessageOn2Nodes(conductor.name()), conductor, 0L);
+
+        assertTrue(waitForCondition(
+                () -> vaultManager.get(new ByteArray(RESET_CLUSTER_MESSAGE_VAULT_KEY)) != null,
+                ASYNC_WAIT_MILLIS
+        ));
+        VaultEntry entry = vaultManager.get(new ByteArray(RESET_CLUSTER_MESSAGE_VAULT_KEY));
+        assertThat(entry, is(notNullValue()));
+
+        ResetClusterMessage savedMessage = fromBytes(entry.value());
+
+        assertThatResetClusterMessageContentIsAsExpected(savedMessage, conductor.name());
+    }
+
+    /**
+     * Restarts the manager while capturing the handler it registers for {@link SystemDisasterRecoveryMessageGroup}.
+     *
+     * @return Handler registered by the manager.
+     */
+    private NetworkMessageHandler extractMessageHandler() {
+        assertThat(manager.stopAsync(), willCompleteSuccessfully());
+
+        var handlerRef = new AtomicReference<NetworkMessageHandler>();
+
+        doAnswer(invocation -> {
+            handlerRef.set(invocation.getArgument(1));
+            return null;
+        }).when(messagingService).addMessageHandler(eq(SystemDisasterRecoveryMessageGroup.class), any());
+
+        assertThat(manager.startAsync(componentContext), willCompleteSuccessfully());
+
+        NetworkMessageHandler handler = handlerRef.get();
+        assertThat("Handler was not installed", handler, is(notNullValue()));
+
+        return handler;
+    }
+
+    /** Builds a reset message whose new CMG consists of this node and {@link #node2}. */
+    private ResetClusterMessage resetClusterMessageOn2Nodes(String conductorName) {
+        return messagesFactory.resetClusterMessage()
+                .cmgNodes(Set.of(thisNodeName, node2.name()))
+                .metaStorageNodes(usualClusterState.metaStorageNodes())
+                .clusterName(CLUSTER_NAME)
+                .clusterId(randomUUID())
+                .formerClusterIds(List.of(usualClusterState.clusterTag().clusterId()))
+                .conductor(conductorName)
+                .build();
+    }
+
+    @Test
+    void respondsWhenGetsMessageFromSelf() {
+        ArgumentCaptor<NetworkMessage> messageCaptor = ArgumentCaptor.forClass(NetworkMessage.class);
+
+        NetworkMessageHandler handler = extractMessageHandler();
+        ClusterNode conductor = thisNode;
+
+        handler.onReceived(resetClusterMessageOn2Nodes(conductor.name()), conductor, 123L);
+
+        InOrder inOrder = inOrder(messagingService, vaultManager);
+
+        inOrder.verify(vaultManager, timeout(ASYNC_WAIT_MILLIS)).put(eq(new ByteArray(RESET_CLUSTER_MESSAGE_VAULT_KEY)), any());
+        inOrder.verify(messagingService, timeout(ASYNC_WAIT_MILLIS)).respond(eq(conductor), messageCaptor.capture(), eq(123L));
+
+        assertThat(messageCaptor.getValue(), instanceOf(SuccessResponseMessage.class));
+    }
+
+    @Test
+    void respondsWhenGetsMessageFromOtherNode() {
+        ArgumentCaptor<NetworkMessage> messageCaptor = ArgumentCaptor.forClass(NetworkMessage.class);
+
+        NetworkMessageHandler handler = extractMessageHandler();
+        ClusterNode conductor = node2;
+
+        handler.onReceived(resetClusterMessageOn2Nodes(conductor.name()), conductor, 123L);
+
+        InOrder inOrder = inOrder(messagingService, vaultManager, restarter);
+
+        // Handling may complete asynchronously (the sibling tests wait for it too), so every verification waits.
+        inOrder.verify(vaultManager, timeout(ASYNC_WAIT_MILLIS)).put(eq(new ByteArray(RESET_CLUSTER_MESSAGE_VAULT_KEY)), any());
+        inOrder.verify(messagingService, timeout(ASYNC_WAIT_MILLIS)).respond(eq(conductor), messageCaptor.capture(), eq(123L));
+        inOrder.verify(restarter, timeout(ASYNC_WAIT_MILLIS)).initiateRestart();
+
+        assertThat(messageCaptor.getValue(), instanceOf(SuccessResponseMessage.class));
+    }
+
+    @Test
+    void initiatesRestartWhenGetsMessageFromOtherNode() {
+        NetworkMessageHandler handler = extractMessageHandler();
+
+        handler.onReceived(resetClusterMessageOn2Nodes(node2.name()), node2, 0L);
+
+        verify(restarter, timeout(ASYNC_WAIT_MILLIS)).initiateRestart();
+    }
+
+    @Test
+    void doesNotInitiateRestartWhenGetsMessageFromSelf() {
+        NetworkMessageHandler handler = extractMessageHandler();
+
+        handler.onReceived(resetClusterMessageOn2Nodes(thisNodeName), thisNode, 0L);
+
+        // Wait until the message has been handled (the response is sent after saving to the vault); otherwise the
+        // never() check below could pass just because handling has not happened yet.
+        verify(messagingService, timeout(ASYNC_WAIT_MILLIS)).respond(eq(thisNode), any(NetworkMessage.class), eq(0L));
+        verify(restarter, never()).initiateRestart();
+    }
+}
diff --git a/settings.gradle b/settings.gradle
index c6e4ac1b10..b6a2e84ec4 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -89,6 +89,7 @@ include(":ignite-eventlog")
include(":ignite-low-watermark")
include(":ignite-partition-replicator")
include(':ignite-configuration-root')
+include(':ignite-system-disaster-recovery')
project(":ignite-examples").projectDir = file('examples')
project(":ignite-dev-utilities").projectDir = file('dev-utilities')
@@ -163,6 +164,7 @@ project(":ignite-eventlog").projectDir = file('modules/eventlog')
project(":ignite-low-watermark").projectDir = file('modules/low-watermark')
project(":ignite-partition-replicator").projectDir =
file('modules/partition-replicator')
project(":ignite-configuration-root").projectDir =
file('modules/configuration-root')
+project(":ignite-system-disaster-recovery").projectDir =
file('modules/system-disaster-recovery')
ext.isCiServer = System.getenv().containsKey("IGNITE_CI")