This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new b9ebbae99c IGNITE-22807 Repair CMG on node start (#4304)
b9ebbae99c is described below
commit b9ebbae99ceb42be1f5b7c698f5ee2807300a446
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Sat Aug 31 09:20:26 2024 +0400
IGNITE-22807 Repair CMG on node start (#4304)
---
modules/cluster-management/build.gradle | 5 +
.../management/ClusterManagementGroupManager.java | 65 +++++-
.../network/messages/CmgInitMessage.java | 2 +
.../ClusterManagementGroupManagerTest.java | 2 +
.../cluster/management/ClusterIdHolder.java | 0
.../internal/cluster/management/MockNode.java | 2 +
modules/distribution-zones/build.gradle | 1 +
...niteDistributionZoneManagerNodeRestartTest.java | 4 +-
modules/metastorage/build.gradle | 1 +
.../ItMetaStorageMultipleNodesAbstractTest.java | 2 +
.../metastorage/impl/ItMetaStorageWatchTest.java | 2 +
modules/partition-replicator/build.gradle | 1 +
.../replicator/ItReplicaLifecycleTest.java | 2 +
modules/runner/build.gradle | 2 +
.../ItDistributedConfigurationPropertiesTest.java | 2 +
.../ItDistributedConfigurationStorageTest.java | 2 +
.../runner/app/ItIgniteNodeRestartTest.java | 6 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 29 ++-
.../ignite/internal/app/IgniteServerImpl.java | 48 ++++-
.../java/org/apache/ignite/internal/Cluster.java | 11 +-
modules/system-disaster-recovery-api/README.md | 3 +
.../build.gradle | 5 +-
.../system/message/ResetClusterMessage.java | 0
.../SystemDisasterRecoveryMessageGroup.java | 0
.../system/storage/ClusterResetStorage.java} | 19 +-
modules/system-disaster-recovery/build.gradle | 13 +-
.../disaster/system/ItCmgDisasterRecoveryTest.java | 226 +++++++++++++++++++++
.../disaster/system}/ClusterIdService.java | 46 ++++-
.../system/SystemDisasterRecoveryManager.java | 13 +-
.../system/SystemDisasterRecoveryManagerImpl.java | 44 ++--
.../system/SystemDisasterRecoveryStorage.java | 81 ++++++++
.../SystemDisasterRecoveryManagerImplTest.java | 62 +++---
modules/table/build.gradle | 1 +
.../rebalance/ItRebalanceDistributedTest.java | 2 +
settings.gradle | 2 +
35 files changed, 607 insertions(+), 99 deletions(-)
diff --git a/modules/cluster-management/build.gradle
b/modules/cluster-management/build.gradle
index 5e12611208..b702a46b15 100644
--- a/modules/cluster-management/build.gradle
+++ b/modules/cluster-management/build.gradle
@@ -41,6 +41,7 @@ dependencies {
implementation project(':ignite-storage-api')
implementation project(':ignite-security')
implementation project(':ignite-configuration-root')
+ implementation project(':ignite-system-disaster-recovery-api')
implementation libs.jetbrains.annotations
implementation libs.fastutil.core
implementation libs.auto.service.annotations
@@ -51,6 +52,7 @@ dependencies {
testImplementation testFixtures(project(':ignite-core'))
testImplementation testFixtures(project(':ignite-configuration'))
testImplementation testFixtures(project(':ignite-network'))
+ testImplementation project(':ignite-system-disaster-recovery')
testImplementation libs.hamcrest.core
testImplementation libs.mockito.junit
@@ -62,6 +64,8 @@ dependencies {
testFixturesImplementation project(':ignite-storage-api')
testFixturesImplementation project(':ignite-vault')
testFixturesImplementation project(':ignite-security')
+ testFixturesImplementation project(':ignite-system-disaster-recovery-api')
+ testFixturesImplementation project(':ignite-system-disaster-recovery')
testFixturesImplementation testFixtures(project(':ignite-core'))
testFixturesImplementation testFixtures(project(':ignite-configuration'))
testFixturesImplementation testFixtures(project(':ignite-network'))
@@ -77,6 +81,7 @@ dependencies {
integrationTestImplementation project(':ignite-raft-api')
integrationTestImplementation project(':ignite-catalog')
integrationTestImplementation project(':ignite-runner')
+ integrationTestImplementation project(':ignite-system-disaster-recovery')
integrationTestImplementation testFixtures(project)
integrationTestImplementation testFixtures(project(':ignite-core'))
integrationTestImplementation
testFixtures(project(':ignite-configuration'))
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index cc355e6e1e..9d8affae0b 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -62,6 +62,8 @@ import
org.apache.ignite.internal.cluster.management.raft.commands.JoinReadyComm
import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
import
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.disaster.system.message.ResetClusterMessage;
+import org.apache.ignite.internal.disaster.system.storage.ClusterResetStorage;
import org.apache.ignite.internal.event.AbstractEventProducer;
import org.apache.ignite.internal.event.EventParameters;
import org.apache.ignite.internal.failure.FailureContext;
@@ -152,6 +154,8 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
private final ClusterIdStore clusterIdStore;
+ private final ClusterResetStorage clusterResetStorage;
+
/** Future that resolves into the initial cluster configuration in HOCON format. */
private final CompletableFuture<String> initialClusterConfigurationFuture
= new CompletableFuture<>();
@@ -162,6 +166,7 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
/** Constructor. */
public ClusterManagementGroupManager(
VaultManager vault,
+ ClusterResetStorage clusterResetStorage,
ClusterService clusterService,
ClusterInitializer clusterInitializer,
RaftManager raftManager,
@@ -170,9 +175,10 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
ValidationManager validationManager,
NodeAttributes nodeAttributes,
FailureProcessor failureProcessor,
- ClusterIdHolder clusterIdChanger,
+ ClusterIdStore clusterIdStore,
RaftGroupOptionsConfigurer raftGroupOptionsConfigurer
) {
+ this.clusterResetStorage = clusterResetStorage;
this.clusterService = clusterService;
this.clusterInitializer = clusterInitializer;
this.raftManager = raftManager;
@@ -182,7 +188,7 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
this.localStateStorage = new LocalStateStorage(vault);
this.nodeAttributes = nodeAttributes;
this.failureProcessor = failureProcessor;
- this.clusterIdStore = clusterIdChanger;
+ this.clusterIdStore = clusterIdStore;
this.raftGroupOptionsConfigurer = raftGroupOptionsConfigurer;
scheduledExecutor = Executors.newSingleThreadScheduledExecutor(
@@ -223,6 +229,7 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
@TestOnly
public ClusterManagementGroupManager(
VaultManager vault,
+ ClusterResetStorage clusterResetStorage,
ClusterService clusterService,
ClusterInitializer clusterInitializer,
RaftManager raftManager,
@@ -230,11 +237,12 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
LogicalTopology logicalTopology,
NodeAttributes nodeAttributes,
FailureProcessor failureProcessor,
- ClusterIdHolder clusterIdChanger,
+ ClusterIdStore clusterIdStore,
RaftGroupOptionsConfigurer raftGroupOptionsConfigurer
) {
this(
vault,
+ clusterResetStorage,
clusterService,
clusterInitializer,
raftManager,
@@ -243,7 +251,7 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
new ValidationManager(new
ClusterStateStorageManager(clusterStateStorage), logicalTopology),
nodeAttributes,
failureProcessor,
- clusterIdChanger,
+ clusterIdStore,
raftGroupOptionsConfigurer
);
}
@@ -341,6 +349,11 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
@Override
public CompletableFuture<Void> startAsync(ComponentContext
componentContext) {
+ ResetClusterMessage resetClusterMessage =
clusterResetStorage.readResetClusterMessage();
+ if (resetClusterMessage != null) {
+ return doClusterReset(resetClusterMessage);
+ }
+
synchronized (raftServiceLock) {
raftService = recoverLocalState();
}
@@ -350,6 +363,48 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
return nullCompletedFuture();
}
+ private CompletableFuture<Void> doClusterReset(ResetClusterMessage
resetClusterMessage) {
+ LOG.info("Found a ResetClusterMessage in storage, going to do cluster
reset [message={}]", resetClusterMessage);
+
+ return destroyCmgWithEvents()
+ .thenCompose(unused -> {
+ if
(resetClusterMessage.cmgNodes().contains(clusterService.nodeName())) {
+ return doReinit(resetClusterMessage);
+ } else {
+ // Let's just wait for new CMG nodes to establish a majority and send us an invitation to join.
+ cmgMessageHandler.onRecoveryComplete();
+ return nullCompletedFuture();
+ }
+ })
+ .thenRun(clusterResetStorage::removeResetClusterMessage);
+ }
+
+ private CompletableFuture<Void> doReinit(ResetClusterMessage
resetClusterMessage) {
+ CompletableFuture<CmgRaftService> serviceFuture;
+
+ synchronized (raftServiceLock) {
+ // Disaster recovery means that the Repair Conductor has ensured the cluster was initialized,
+ // so we can just pass null as initialClusterConfig.
+ serviceFuture =
startCmgRaftServiceWithEvents(resetClusterMessage.cmgNodes(), null);
+ raftService = serviceFuture;
+ }
+
+ cmgMessageHandler.onRecoveryComplete();
+
+ return serviceFuture
+ .thenCompose(service -> doInit(service,
cmgInitMessageFromResetClusterMessage(resetClusterMessage)))
+ .thenApply(unused -> null);
+ }
+
+ private CmgInitMessage
cmgInitMessageFromResetClusterMessage(ResetClusterMessage resetClusterMessage) {
+ return msgFactory.cmgInitMessage()
+ .cmgNodes(resetClusterMessage.cmgNodes())
+ .metaStorageNodes(resetClusterMessage.metaStorageNodes())
+ .clusterName(resetClusterMessage.clusterName())
+ .clusterId(resetClusterMessage.clusterId())
+ .build();
+ }
+
/**
* Returns the cluster state future or the future that will be resolved to null if the cluster is not initialized yet.
*/
@@ -697,7 +752,7 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
* Starts the CMG Raft service using the provided node names as its peers.
*/
private CompletableFuture<CmgRaftService> startCmgRaftService(Set<String>
nodeNames) {
- String thisNodeConsistentId =
clusterService.topologyService().localMember().name();
+ String thisNodeConsistentId = clusterService.nodeName();
// If we are not in the CMG, we must be a learner. List of learners will be updated by a leader accordingly,
// but just to start a RAFT service we must include ourselves in the initial learners list, that's why we
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/network/messages/CmgInitMessage.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/network/messages/CmgInitMessage.java
index dcc31c5f2c..8cbbf0ec3a 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/network/messages/CmgInitMessage.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/network/messages/CmgInitMessage.java
@@ -21,6 +21,7 @@ import java.util.Set;
import java.util.UUID;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.annotations.Transferable;
+import org.jetbrains.annotations.Nullable;
/**
* Message for initializing the Cluster Management Group.
@@ -50,5 +51,6 @@ public interface CmgInitMessage extends NetworkMessage {
/**
* Cluster configuration that should be applied after init.
*/
+ @Nullable
String initialClusterConfiguration();
}
diff --git
a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManagerTest.java
b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManagerTest.java
index 3ccbd04152..8fa1a581ad 100644
---
a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManagerTest.java
+++
b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManagerTest.java
@@ -36,6 +36,7 @@ import
org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
import
org.apache.ignite.internal.cluster.management.raft.commands.InitCmgStateCommand;
import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import
org.apache.ignite.internal.disaster.system.SystemDisasterRecoveryStorage;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.manager.ComponentContext;
@@ -103,6 +104,7 @@ class ClusterManagementGroupManagerTest extends
BaseIgniteAbstractTest {
cmgManager = new ClusterManagementGroupManager(
vaultManager,
+ new SystemDisasterRecoveryStorage(vaultManager),
clusterService,
clusterInitializer,
raftManager,
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterIdHolder.java
b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/ClusterIdHolder.java
similarity index 100%
rename from
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterIdHolder.java
rename to
modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/ClusterIdHolder.java
diff --git
a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
index 07a25cb53a..3a9ffd847a 100644
---
a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
+++
b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
@@ -38,6 +38,7 @@ import
org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import org.apache.ignite.internal.configuration.RaftGroupOptionsConfigHelper;
import
org.apache.ignite.internal.configuration.validation.TestConfigurationValidator;
+import
org.apache.ignite.internal.disaster.system.SystemDisasterRecoveryStorage;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.failure.NoOpFailureProcessor;
import org.apache.ignite.internal.hlc.HybridClockImpl;
@@ -126,6 +127,7 @@ public class MockNode {
this.clusterManager = new ClusterManagementGroupManager(
vaultManager,
+ new SystemDisasterRecoveryStorage(vaultManager),
clusterService,
new ClusterInitializer(clusterService, hocon -> hocon, new
TestConfigurationValidator()),
raftManager,
diff --git a/modules/distribution-zones/build.gradle
b/modules/distribution-zones/build.gradle
index bc6d2c4cc9..7c12418d6e 100644
--- a/modules/distribution-zones/build.gradle
+++ b/modules/distribution-zones/build.gradle
@@ -94,6 +94,7 @@ dependencies {
integrationTestImplementation project(':ignite-security')
integrationTestImplementation project(':ignite-system-view-api')
integrationTestImplementation project(':ignite-failure-handler')
+ integrationTestImplementation project(':ignite-system-disaster-recovery')
integrationTestImplementation testFixtures(project(':ignite-catalog'))
integrationTestImplementation testFixtures(project(':ignite-core'))
integrationTestImplementation testFixtures(project(':ignite-runner'))
diff --git
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
index 2c169badfb..aa3d1f953c 100644
---
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
+++
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
@@ -84,7 +84,6 @@ import org.apache.ignite.internal.catalog.CatalogManagerImpl;
import org.apache.ignite.internal.catalog.CatalogTestUtils;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
-import org.apache.ignite.internal.cluster.management.ClusterIdService;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import
org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
import
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
@@ -99,6 +98,7 @@ import
org.apache.ignite.internal.configuration.storage.DistributedConfiguration
import
org.apache.ignite.internal.configuration.storage.LocalFileConfigurationStorage;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.validation.ConfigurationValidatorImpl;
+import org.apache.ignite.internal.disaster.system.ClusterIdService;
import
org.apache.ignite.internal.distributionzones.DistributionZoneManager.Augmentation;
import
org.apache.ignite.internal.distributionzones.DistributionZoneManager.ZoneState;
import org.apache.ignite.internal.failure.FailureProcessor;
@@ -187,7 +187,7 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest
extends BaseIgniteRe
var clusterStateStorage = new TestClusterStateStorage();
- var clusterIdService = new ClusterIdService(clusterStateStorage);
+ var clusterIdService = new ClusterIdService(vault);
ConfigurationModules modules = loadConfigurationModules(log,
Thread.currentThread().getContextClassLoader());
diff --git a/modules/metastorage/build.gradle b/modules/metastorage/build.gradle
index 8e8b77728e..cfca86b3cc 100644
--- a/modules/metastorage/build.gradle
+++ b/modules/metastorage/build.gradle
@@ -62,6 +62,7 @@ dependencies {
integrationTestImplementation project(":ignite-security")
integrationTestImplementation project(':ignite-metrics')
integrationTestImplementation project(':ignite-runner')
+ integrationTestImplementation project(':ignite-system-disaster-recovery')
integrationTestImplementation testFixtures(project(':ignite-core'))
integrationTestImplementation testFixtures(project(':ignite-network'))
integrationTestImplementation testFixtures(project(':ignite-raft'))
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
index eb6c1d7b02..9c109bbe2c 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
@@ -60,6 +60,7 @@ import
org.apache.ignite.internal.configuration.RaftGroupOptionsConfigHelper;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import
org.apache.ignite.internal.configuration.validation.TestConfigurationValidator;
+import
org.apache.ignite.internal.disaster.system.SystemDisasterRecoveryStorage;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.failure.NoOpFailureProcessor;
import org.apache.ignite.internal.hlc.HybridClock;
@@ -197,6 +198,7 @@ public abstract class
ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr
this.cmgManager = new ClusterManagementGroupManager(
vaultManager,
+ new SystemDisasterRecoveryStorage(vaultManager),
clusterService,
clusterInitializer,
raftManager,
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
index af288c02ad..20e887d07c 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
@@ -61,6 +61,7 @@ import
org.apache.ignite.internal.configuration.RaftGroupOptionsConfigHelper;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import
org.apache.ignite.internal.configuration.validation.TestConfigurationValidator;
+import
org.apache.ignite.internal.disaster.system.SystemDisasterRecoveryStorage;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.failure.NoOpFailureProcessor;
import org.apache.ignite.internal.hlc.HybridClock;
@@ -186,6 +187,7 @@ public class ItMetaStorageWatchTest extends
IgniteAbstractTest {
this.cmgManager = new ClusterManagementGroupManager(
vaultManager,
+ new SystemDisasterRecoveryStorage(vaultManager),
clusterService,
clusterInitializer,
raftManager,
diff --git a/modules/partition-replicator/build.gradle
b/modules/partition-replicator/build.gradle
index a9345575c4..71a1d37c80 100644
--- a/modules/partition-replicator/build.gradle
+++ b/modules/partition-replicator/build.gradle
@@ -76,6 +76,7 @@ dependencies {
integrationTestImplementation project(':ignite-placement-driver')
integrationTestImplementation project(':ignite-affinity')
integrationTestImplementation project(':ignite-runner')
+ integrationTestImplementation project(':ignite-system-disaster-recovery')
}
description = 'ignite-partition-replicator'
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
index fea2d42cd3..5aea3a6229 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
@@ -100,6 +100,7 @@ import
org.apache.ignite.internal.configuration.storage.LocalFileConfigurationSt
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import
org.apache.ignite.internal.configuration.validation.TestConfigurationValidator;
+import
org.apache.ignite.internal.disaster.system.SystemDisasterRecoveryStorage;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil;
import org.apache.ignite.internal.failure.FailureProcessor;
@@ -969,6 +970,7 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
cmgManager = new ClusterManagementGroupManager(
vaultManager,
+ new SystemDisasterRecoveryStorage(vaultManager),
clusterService,
clusterInitializer,
raftManager,
diff --git a/modules/runner/build.gradle b/modules/runner/build.gradle
index e95aa67397..8c64ae163c 100644
--- a/modules/runner/build.gradle
+++ b/modules/runner/build.gradle
@@ -90,6 +90,7 @@ dependencies {
implementation project(':ignite-partition-replicator')
implementation project(':ignite-catalog-compaction')
implementation project(':ignite-system-disaster-recovery')
+ implementation project(':ignite-system-disaster-recovery-api')
implementation libs.jetbrains.annotations
implementation libs.micronaut.inject
implementation libs.micronaut.validation
@@ -167,6 +168,7 @@ dependencies {
integrationTestImplementation project(':ignite-low-watermark')
integrationTestImplementation project(':ignite-partition-replicator')
integrationTestImplementation project(':ignite-configuration-root')
+ integrationTestImplementation project(':ignite-system-disaster-recovery')
integrationTestImplementation testFixtures(project(":ignite-api"))
integrationTestImplementation testFixtures(project(':ignite-core'))
integrationTestImplementation
testFixtures(project(':ignite-configuration'))
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
index af03f15f6e..239574d66e 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
@@ -53,6 +53,7 @@ import
org.apache.ignite.internal.configuration.storage.DistributedConfiguration
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import
org.apache.ignite.internal.configuration.validation.TestConfigurationValidator;
+import
org.apache.ignite.internal.disaster.system.SystemDisasterRecoveryStorage;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.failure.NoOpFailureProcessor;
import org.apache.ignite.internal.hlc.HybridClock;
@@ -203,6 +204,7 @@ public class ItDistributedConfigurationPropertiesTest
extends BaseIgniteAbstract
cmgManager = new ClusterManagementGroupManager(
vaultManager,
+ new SystemDisasterRecoveryStorage(vaultManager),
clusterService,
clusterInitializer,
raftManager,
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
index 5c9698a0c7..f8cfddef2d 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
@@ -45,6 +45,7 @@ import
org.apache.ignite.internal.configuration.RaftGroupOptionsConfigHelper;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import
org.apache.ignite.internal.configuration.validation.TestConfigurationValidator;
+import
org.apache.ignite.internal.disaster.system.SystemDisasterRecoveryStorage;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.failure.NoOpFailureProcessor;
import org.apache.ignite.internal.hlc.HybridClock;
@@ -175,6 +176,7 @@ public class ItDistributedConfigurationStorageTest extends
BaseIgniteAbstractTes
RaftGroupOptionsConfigHelper.configureProperties(cmgLogStorageFactory,
cmgWorkDir.metaPath());
cmgManager = new ClusterManagementGroupManager(
vaultManager,
+ new SystemDisasterRecoveryStorage(vaultManager),
clusterService,
clusterInitializer,
raftManager,
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 896d7a1a23..86ea473c4c 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -97,7 +97,6 @@ import
org.apache.ignite.internal.catalog.commands.TableHashPrimaryKey;
import
org.apache.ignite.internal.catalog.configuration.SchemaSynchronizationConfiguration;
import
org.apache.ignite.internal.catalog.configuration.SchemaSynchronizationExtensionConfiguration;
import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
-import org.apache.ignite.internal.cluster.management.ClusterIdService;
import org.apache.ignite.internal.cluster.management.ClusterInitializer;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.cluster.management.NodeAttributesCollector;
@@ -122,6 +121,8 @@ import
org.apache.ignite.internal.configuration.testframework.ConfigurationExten
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import
org.apache.ignite.internal.configuration.validation.ConfigurationValidatorImpl;
import
org.apache.ignite.internal.configuration.validation.TestConfigurationValidator;
+import org.apache.ignite.internal.disaster.system.ClusterIdService;
+import
org.apache.ignite.internal.disaster.system.SystemDisasterRecoveryStorage;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite.internal.failure.NoOpFailureProcessor;
import org.apache.ignite.internal.hlc.ClockService;
@@ -334,7 +335,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
var clusterStateStorage = new
RocksDbClusterStateStorage(dir.resolve("cmg"), name);
- var clusterIdService = new ClusterIdService(clusterStateStorage);
+ var clusterIdService = new ClusterIdService(vault);
ConfigurationModules modules = loadConfigurationModules(log,
Thread.currentThread().getContextClassLoader());
@@ -428,6 +429,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
var cmgManager = new ClusterManagementGroupManager(
vault,
+ new SystemDisasterRecoveryStorage(vault),
clusterSvc,
clusterInitializer,
raftMgr,
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 2ec53343ea..a095b99611 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -69,7 +69,6 @@ import
org.apache.ignite.internal.catalog.configuration.SchemaSynchronizationCon
import
org.apache.ignite.internal.catalog.configuration.SchemaSynchronizationExtensionConfiguration;
import org.apache.ignite.internal.catalog.sql.IgniteCatalogSqlImpl;
import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
-import org.apache.ignite.internal.cluster.management.ClusterIdService;
import org.apache.ignite.internal.cluster.management.ClusterInitializer;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.cluster.management.ClusterState;
@@ -120,8 +119,11 @@ import
org.apache.ignite.internal.deployunit.DeploymentManagerImpl;
import org.apache.ignite.internal.deployunit.IgniteDeployment;
import
org.apache.ignite.internal.deployunit.configuration.DeploymentExtensionConfiguration;
import org.apache.ignite.internal.deployunit.metastore.DeploymentUnitStoreImpl;
+import org.apache.ignite.internal.disaster.system.ClusterIdService;
import org.apache.ignite.internal.disaster.system.ServerRestarter;
+import
org.apache.ignite.internal.disaster.system.SystemDisasterRecoveryManager;
import
org.apache.ignite.internal.disaster.system.SystemDisasterRecoveryManagerImpl;
+import
org.apache.ignite.internal.disaster.system.SystemDisasterRecoveryStorage;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite.internal.eventlog.config.schema.EventLogConfiguration;
import
org.apache.ignite.internal.eventlog.config.schema.EventLogExtensionConfiguration;
@@ -520,7 +522,7 @@ public class IgniteImpl implements Ignite {
// TODO: IGNITE-16841 - use common RocksDB instance to store cluster
state as well.
clusterStateStorage = new
RocksDbClusterStateStorage(cmgWorkDir.dbPath(), name);
- clusterIdService = new ClusterIdService(clusterStateStorage);
+ clusterIdService = new ClusterIdService(vaultMgr);
criticalWorkerRegistry = new CriticalWorkerWatchdog(
criticalWorkersConfiguration,
@@ -553,8 +555,7 @@ public class IgniteImpl implements Ignite {
clusterSvc.topologyService(),
clusterSvc.messagingService(),
vaultMgr,
- restarter,
- clusterStateStorage
+ restarter
);
clock = new HybridClockImpl();
@@ -629,6 +630,7 @@ public class IgniteImpl implements Ignite {
cmgMgr = new ClusterManagementGroupManager(
vaultMgr,
+ new SystemDisasterRecoveryStorage(vaultMgr),
clusterSvc,
clusterInitializer,
raftMgr,
@@ -1248,6 +1250,8 @@ public class IgniteImpl implements Ignite {
ComponentContext componentContext = new ComponentContext(joinExecutor);
return cmgMgr.joinFuture()
+ .thenComposeAsync(unused -> cmgMgr.clusterState(),
joinExecutor)
+
.thenAcceptAsync(systemDisasterRecoveryManager::saveClusterState, joinExecutor)
// Disable REST component during initialization.
.thenAcceptAsync(unused -> restComponent.disable(),
joinExecutor)
.thenComposeAsync(unused -> {
@@ -1434,6 +1438,16 @@ public class IgniteImpl implements Ignite {
return disasterRecoveryManager;
}
+ @TestOnly
+ public VaultManager vault() {
+ return vaultMgr;
+ }
+
+ @TestOnly
+ public ClusterStateStorage clusterStateStorage() {
+ return clusterStateStorage;
+ }
+
@TestOnly
public QueryProcessor queryEngine() {
return qryEngine;
@@ -1617,7 +1631,7 @@ public class IgniteImpl implements Ignite {
}, startupExecutor);
}
}, startupExecutor)
-
.thenRunAsync(systemDisasterRecoveryManager::markNodeInitialized,
startupExecutor);
+
.thenRunAsync(systemDisasterRecoveryManager::markInitConfigApplied,
startupExecutor);
}
/**
@@ -1765,4 +1779,9 @@ public class IgniteImpl implements Ignite {
return replicaMgr;
}
+ /** Returns disaster recovery manager for system groups. */
+ @TestOnly
+ public SystemDisasterRecoveryManager systemDisasterRecoveryManager() {
+ return systemDisasterRecoveryManager;
+ }
}
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteServerImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteServerImpl.java
index c9a0a2982a..f80032706f 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteServerImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteServerImpl.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
+import java.util.function.Supplier;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteServer;
import org.apache.ignite.InitParameters;
@@ -49,6 +50,7 @@ import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.NodeNotStartedException;
import org.apache.ignite.lang.NodeStartException;
import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
/**
* Implementation of embedded node.
@@ -108,7 +110,8 @@ public class IgniteServerImpl implements IgniteServer {
*
* <p>Guarded by {@link #restartOrShutdownMutex}.
*/
- private CompletableFuture<Void> restartOrShutdownFuture =
nullCompletedFuture();
+ @Nullable
+ private CompletableFuture<Void> restartOrShutdownFuture;
/**
* Gets set to {@code true} when the node is shut down. This disallows
restarts.
@@ -248,15 +251,24 @@ public class IgniteServerImpl implements IgniteServer {
throw new NodeNotStartedException();
}
- throwIfNotJoined();
-
- result = restartOrShutdownFuture.thenCompose(unused ->
doRestartAsync(instance));
- restartOrShutdownFuture = result;
+ result = chainRestartOrShutdownAction(() ->
doRestartAsync(instance));
}
return result;
}
+ /**
+ * This MUST be called under synchronization on {@link
#restartOrShutdownMutex}.
+ */
+ private CompletableFuture<Void>
chainRestartOrShutdownAction(Supplier<CompletableFuture<Void>> action) {
+ CompletableFuture<Void> result = (restartOrShutdownFuture == null ?
nullCompletedFuture() : restartOrShutdownFuture)
+ .thenCompose(unused -> action.get());
+
+ restartOrShutdownFuture = result;
+
+ return result;
+ }
+
private CompletableFuture<Void> doRestartAsync(IgniteImpl instance) {
// TODO: IGNITE-23006 - limit the wait to acquire the write lock with
a timeout.
return attachmentLock.detachedAsync(() -> {
@@ -277,8 +289,7 @@ public class IgniteServerImpl implements IgniteServer {
CompletableFuture<Void> result;
synchronized (restartOrShutdownMutex) {
- result = restartOrShutdownFuture.thenCompose(unused ->
doShutdownAsync());
- restartOrShutdownFuture = result;
+ result = chainRestartOrShutdownAction(this::doShutdownAsync);
shutDown = true;
}
@@ -391,4 +402,27 @@ public class IgniteServerImpl implements IgniteServer {
throw ExceptionUtils.sneakyThrow(e);
}
}
+
+ /**
+ * Returns the underlying IgniteImpl even if the join was not completed.
+ */
+ @TestOnly
+ public IgniteImpl igniteImpl() {
+ IgniteImpl instance = ignite;
+ if (instance == null) {
+ throw new NodeNotStartedException();
+ }
+
+ return instance;
+ }
+
+ /**
+ * Returns future that gets completed when restart or shutdown is complete.
+ */
+ @TestOnly
+ public @Nullable CompletableFuture<Void> restartOrShutdownFuture() {
+ synchronized (restartOrShutdownMutex) {
+ return restartOrShutdownFuture;
+ }
+ }
}
diff --git
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
index 7be5bc0e18..15f32cfa58 100644
---
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
+++
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
@@ -259,7 +259,7 @@ public class Cluster {
* @return Started server and its registration future.
*/
public ServerRegistration startEmbeddedNode(int nodeIndex, String
nodeBootstrapConfigTemplate) {
- String nodeName = testNodeName(testInfo, nodeIndex);
+ String nodeName = nodeName(nodeIndex);
String config = IgniteStringFormatter.format(
nodeBootstrapConfigTemplate,
@@ -288,6 +288,15 @@ public class Cluster {
return new ServerRegistration(node, registrationFuture);
}
+ /**
+ * Returns node name by index.
+ *
+ * @param nodeIndex Index of the node of interest.
+ */
+ public String nodeName(int nodeIndex) {
+ return testNodeName(testInfo, nodeIndex);
+ }
+
private static <T> void setListAtIndex(List<T> list, int i, T element) {
while (list.size() < i) {
list.add(null);
diff --git a/modules/system-disaster-recovery-api/README.md
b/modules/system-disaster-recovery-api/README.md
new file mode 100644
index 0000000000..2e8dc4b209
--- /dev/null
+++ b/modules/system-disaster-recovery-api/README.md
@@ -0,0 +1,3 @@
+# System disaster recovery API module
+
+This module provides an API to recover from disasters related to the system Raft
groups (CMG and Metastorage group).
diff --git a/modules/system-disaster-recovery/build.gradle
b/modules/system-disaster-recovery-api/build.gradle
similarity index 85%
copy from modules/system-disaster-recovery/build.gradle
copy to modules/system-disaster-recovery-api/build.gradle
index 333b8e7938..4c93ec2153 100644
--- a/modules/system-disaster-recovery/build.gradle
+++ b/modules/system-disaster-recovery-api/build.gradle
@@ -26,14 +26,11 @@ dependencies {
implementation project(':ignite-core')
implementation project(':ignite-network-api')
implementation project(':ignite-vault')
- implementation project(':ignite-cluster-management')
implementation libs.jetbrains.annotations
testImplementation libs.hamcrest.core
testImplementation libs.mockito.core
testImplementation libs.mockito.junit
- testImplementation testFixtures(project(":ignite-core"))
- testImplementation testFixtures(project(":ignite-cluster-management"))
}
-description = 'ignite-system-disaster-recovery'
+description = 'ignite-system-disaster-recovery-api'
diff --git
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/message/ResetClusterMessage.java
b/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/ResetClusterMessage.java
similarity index 100%
rename from
modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/message/ResetClusterMessage.java
rename to
modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/ResetClusterMessage.java
diff --git
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/message/SystemDisasterRecoveryMessageGroup.java
b/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/SystemDisasterRecoveryMessageGroup.java
similarity index 100%
copy from
modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/message/SystemDisasterRecoveryMessageGroup.java
copy to
modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/SystemDisasterRecoveryMessageGroup.java
diff --git
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/message/SystemDisasterRecoveryMessageGroup.java
b/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/storage/ClusterResetStorage.java
similarity index 61%
rename from
modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/message/SystemDisasterRecoveryMessageGroup.java
rename to
modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/storage/ClusterResetStorage.java
index c6f0b0fa07..82ca25f3d8 100644
---
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/message/SystemDisasterRecoveryMessageGroup.java
+++
b/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/storage/ClusterResetStorage.java
@@ -15,17 +15,22 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.disaster.system.message;
+package org.apache.ignite.internal.disaster.system.storage;
-import org.apache.ignite.internal.network.annotations.MessageGroup;
+import org.apache.ignite.internal.disaster.system.message.ResetClusterMessage;
+import org.jetbrains.annotations.Nullable;
/**
- * Message Group for disaster recovery of system groups.
+ * Storage used by the cluster reset tools.
*/
-@MessageGroup(groupType = 15, groupName = "SystemDisasterRecoveryMessages")
-public class SystemDisasterRecoveryMessageGroup {
+public interface ClusterResetStorage {
/**
- * Message type for {@link ResetClusterMessage}.
+ * Reads {@link ResetClusterMessage}; returns {@code null} if it's not
saved.
*/
- public static final short RESET_CLUSTER = 1;
+ @Nullable ResetClusterMessage readResetClusterMessage();
+
+ /**
+ * Removes saved {@link ResetClusterMessage}.
+ */
+ void removeResetClusterMessage();
}
diff --git a/modules/system-disaster-recovery/build.gradle
b/modules/system-disaster-recovery/build.gradle
index 333b8e7938..e25c4e31fc 100644
--- a/modules/system-disaster-recovery/build.gradle
+++ b/modules/system-disaster-recovery/build.gradle
@@ -19,9 +19,10 @@ apply from: "$rootDir/buildscripts/java-core.gradle"
apply from: "$rootDir/buildscripts/publishing.gradle"
apply from: "$rootDir/buildscripts/java-junit5.gradle"
apply from: "$rootDir/buildscripts/java-test-fixtures.gradle"
+apply from: "$rootDir/buildscripts/java-integration-test.gradle"
dependencies {
- annotationProcessor project(":ignite-network-annotation-processor")
+ api project(':ignite-system-disaster-recovery-api')
implementation project(':ignite-core')
implementation project(':ignite-network-api')
@@ -34,6 +35,16 @@ dependencies {
testImplementation libs.mockito.junit
testImplementation testFixtures(project(":ignite-core"))
testImplementation testFixtures(project(":ignite-cluster-management"))
+ testImplementation libs.jetbrains.annotations
+
+ integrationTestImplementation
project(':ignite-system-disaster-recovery-api')
+ integrationTestImplementation project(':ignite-cluster-management')
+ integrationTestImplementation project(':ignite-vault')
+ integrationTestImplementation project(':ignite-network-api')
+ integrationTestImplementation testFixtures(project(':ignite-core'))
+ integrationTestImplementation testFixtures(project(':ignite-api'))
+ integrationTestImplementation testFixtures(project(':ignite-runner'))
+ integrationTestImplementation libs.jetbrains.annotations
}
description = 'ignite-system-disaster-recovery'
diff --git
a/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItCmgDisasterRecoveryTest.java
b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItCmgDisasterRecoveryTest.java
new file mode 100644
index 0000000000..3cdf5bd9ee
--- /dev/null
+++
b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItCmgDisasterRecoveryTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.disaster.system;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutIn;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.app.IgniteServerImpl;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.Test;
+
+class ItCmgDisasterRecoveryTest extends ClusterPerTestIntegrationTest {
+ @Override
+ protected int initialNodes() {
+ return 0;
+ }
+
+ @Test
+ void repairWhenCmgWas1Node() throws Exception {
+ cluster.startAndInit(3, paramsBuilder -> {
+ paramsBuilder.cmgNodeNames(nodeNames(0));
+ paramsBuilder.metaStorageNodeNames(nodeNames(1));
+ // Node with index 2 will host neither of voting sets.
+ });
+ waitTillClusterStateIsSavedToVaultOnConductor(1);
+
+ // This makes the CMG majority go away.
+ cluster.stopNode(0);
+
+ IgniteImpl igniteImpl1BeforeRestart = igniteImpl(1);
+
+ assertThatCmgHasNoMajority(igniteImpl1BeforeRestart);
+
+ initiateCmgRepairVia(igniteImpl1BeforeRestart, 1);
+
+ IgniteImpl restartedIgniteImpl1 = waitTillNodeRestartsInternally(1);
+ waitTillCmgHasMajority(restartedIgniteImpl1);
+
+ assertResetClusterMessageIsNotPresentAt(restartedIgniteImpl1);
+
assertResetClusterMessageIsNotPresentAt(waitTillNodeRestartsInternally(2));
+ }
+
+ private void waitTillClusterStateIsSavedToVaultOnConductor(int nodeIndex)
throws InterruptedException {
+ assertTrue(waitForCondition(
+ () -> new
SystemDisasterRecoveryStorage(igniteImpl(nodeIndex).vault()).readClusterState()
!= null,
+ SECONDS.toMillis(10)
+ ));
+ }
+
+ private static void assertThatCmgHasNoMajority(IgniteImpl
igniteImpl1BeforeRestart) {
+
assertThat(igniteImpl1BeforeRestart.logicalTopologyService().logicalTopologyOnLeader(),
willTimeoutIn(1, SECONDS));
+ }
+
+ private static void waitTillCmgHasMajority(IgniteImpl
restartedIgniteImpl1) {
+
assertThat(restartedIgniteImpl1.logicalTopologyService().logicalTopologyOnLeader(),
willCompleteSuccessfully());
+ }
+
+ private void initiateCmgRepairVia(IgniteImpl conductor, int...
newCmgIndexes) {
+ // TODO: IGNITE-22812 - initiate repair via CLI.
+
+ CompletableFuture<Void> initiationFuture =
conductor.systemDisasterRecoveryManager()
+ .resetCluster(List.of(nodeNames(newCmgIndexes)));
+ assertThat(initiationFuture, willCompleteSuccessfully());
+ }
+
+ private String[] nodeNames(int... nodeIndexes) {
+ return IntStream.of(nodeIndexes)
+ .mapToObj(cluster::nodeName)
+ .toArray(String[]::new);
+ }
+
+ private IgniteImpl waitTillNodeRestartsInternally(int nodeIndex) throws
InterruptedException {
+ // restartOrShutdownFuture() becomes non-null when restart or shutdown
is initiated; we know it's restart.
+
+ assertTrue(waitForCondition(() -> restartOrShutdownFuture(nodeIndex)
!= null, SECONDS.toMillis(20)));
+ assertThat(restartOrShutdownFuture(nodeIndex),
willCompleteSuccessfully());
+
+ return unwrapIgniteImpl(cluster.server(nodeIndex).api());
+ }
+
+ @Nullable
+ private CompletableFuture<Void> restartOrShutdownFuture(int nodeIndex) {
+ return ((IgniteServerImpl)
cluster.server(nodeIndex)).restartOrShutdownFuture();
+ }
+
+ private static void assertResetClusterMessageIsNotPresentAt(IgniteImpl
ignite) {
+ assertThat(new
SystemDisasterRecoveryStorage(ignite.vault()).readResetClusterMessage(),
is(nullValue()));
+ }
+
+ @Test
+ void repairWhenCmgWas3Nodes() throws Exception {
+ cluster.startAndInit(6, paramsBuilder -> {
+ paramsBuilder.cmgNodeNames(nodeNames(0, 1, 2));
+ paramsBuilder.metaStorageNodeNames(nodeNames(2, 3, 4));
+ // Node with index 5 will host neither of voting sets.
+ });
+ waitTillClusterStateIsSavedToVaultOnConductor(2);
+
+ // Stop the majority of CMG.
+ IntStream.of(0, 1).parallel().forEach(cluster::stopNode);
+
+ IgniteImpl igniteImpl2BeforeRestart = igniteImpl(2);
+
+ assertThatCmgHasNoMajority(igniteImpl2BeforeRestart);
+
+ initiateCmgRepairVia(igniteImpl2BeforeRestart, 2, 3, 4);
+
+ IgniteImpl restartedIgniteImpl2 = waitTillNodeRestartsInternally(2);
+ waitTillCmgHasMajority(restartedIgniteImpl2);
+
+ // TODO: IGNITE-23096 - remove after the hang is fixed.
+ waitTillNodesRestartInProcess(3, 4, 5);
+ }
+
+ private void waitTillNodesRestartInProcess(int... nodeIndexes) throws
InterruptedException {
+ for (int i : nodeIndexes) {
+ waitTillNodeRestartsInternally(i);
+ }
+ }
+
+ @Test
+ void repairedClusterCanJoinBlankNodes() throws Exception {
+ cluster.startAndInit(2, paramsBuilder -> {
+ paramsBuilder.cmgNodeNames(nodeNames(0));
+ paramsBuilder.metaStorageNodeNames(nodeNames(1));
+ });
+ waitTillClusterStateIsSavedToVaultOnConductor(1);
+
+ cluster.stopNode(0);
+
+ initiateCmgRepairVia(igniteImpl(1), 1);
+
+ // Doing this wait to make sure that the blank node will be able to
connect to at least one node. If we don't do this, the new node
+ // will still be able to connect, but this will happen on Scalecube's
initial sync retry, and we don't want to wait for it
+ // in our test.
+ waitTillNodeRestartsInternally(1);
+
+ IgniteImpl node2 = unwrapIgniteImpl(cluster.startNode(2));
+
+ assertTrue(waitForCondition(
+ () ->
node2.logicalTopologyService().localLogicalTopology().nodes().stream()
+ .anyMatch(n -> node2.name().equals(n.name())),
+ SECONDS.toMillis(10)
+ ));
+ }
+
+ @Test
+ void repairIsPossibleWhenAllNodesWaitForCmgMajorityOnJoin() throws
Exception {
+ cluster.startAndInit(3, paramsBuilder -> {
+ paramsBuilder.cmgNodeNames(nodeNames(0));
+ paramsBuilder.metaStorageNodeNames(nodeNames(1));
+ // Node with index 2 will host neither of voting sets.
+ });
+ waitTillClusterStateIsSavedToVaultOnConductor(1);
+
+ cluster.stopNode(0);
+
+ IntStream.of(1, 2).parallel().forEach(this::restartPartially);
+
+ initiateCmgRepairVia(((IgniteServerImpl)
cluster.server(1)).igniteImpl(), 1);
+
+ IgniteImpl restartedIgniteImpl1 = waitTillNodeRestartsInternally(1);
+ waitTillCmgHasMajority(restartedIgniteImpl1);
+
+ // TODO: IGNITE-23096 - remove after the hang is fixed.
+ waitTillNodeRestartsInternally(2);
+ }
+
+ private void restartPartially(int index) {
+ cluster.stopNode(index);
+ cluster.startEmbeddedNode(index);
+ }
+
+ @Test
+ void nodesThatSawNoReparationHaveSeparatePhysicalTopology() throws
Exception {
+ cluster.startAndInit(2, paramsBuilder -> {
+ paramsBuilder.cmgNodeNames(nodeNames(0));
+ paramsBuilder.metaStorageNodeNames(nodeNames(1));
+ });
+ waitTillClusterStateIsSavedToVaultOnConductor(1);
+
+ // This makes the CMG majority go away.
+ cluster.stopNode(0);
+
+ initiateCmgRepairVia(igniteImpl(1), 1);
+
+ IgniteImpl restartedIgniteImpl1 = waitTillNodeRestartsInternally(1);
+ waitTillCmgHasMajority(restartedIgniteImpl1);
+
+ // Starting the node that did not see the repair.
+ cluster.startEmbeddedNode(0);
+
+ assertFalse(
+ waitForCondition(() ->
restartedIgniteImpl1.clusterNodes().size() > 1, SECONDS.toMillis(3)),
+ "Nodes from different clusters were able to establish a
connection"
+ );
+ }
+}
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterIdService.java
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ClusterIdService.java
similarity index 53%
rename from
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterIdService.java
rename to
modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ClusterIdService.java
index 9a001d5d8c..d8138db464 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterIdService.java
+++
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ClusterIdService.java
@@ -15,35 +15,46 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.cluster.management;
+package org.apache.ignite.internal.disaster.system;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
-import
org.apache.ignite.internal.cluster.management.raft.ClusterStateStorageManager;
+import org.apache.ignite.internal.cluster.management.ClusterIdStore;
+import org.apache.ignite.internal.cluster.management.ClusterState;
+import org.apache.ignite.internal.disaster.system.message.ResetClusterMessage;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.network.ClusterIdSupplier;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.jetbrains.annotations.Nullable;
/**
* Used to handle volatile information about cluster ID used to restrict which
nodes can connect this one and vice versa.
*/
-public class ClusterIdService extends ClusterIdHolder implements
IgniteComponent {
- private final ClusterStateStorage clusterStateStorage;
+public class ClusterIdService implements ClusterIdSupplier, ClusterIdStore,
IgniteComponent {
+ private final SystemDisasterRecoveryStorage storage;
- public ClusterIdService(ClusterStateStorage clusterStateStorage) {
- this.clusterStateStorage = clusterStateStorage;
+ private volatile @Nullable UUID clusterId;
+ private volatile @Nullable UUID clusterIdOverride;
+
+ public ClusterIdService(VaultManager vault) {
+ storage = new SystemDisasterRecoveryStorage(vault);
}
@Override
public CompletableFuture<Void> startAsync(ComponentContext
componentContext) {
- var clusterStateManager = new
ClusterStateStorageManager(clusterStateStorage);
-
- ClusterState clusterState = clusterStateManager.getClusterState();
+ ClusterState clusterState = storage.readClusterState();
if (clusterState != null) {
clusterId(clusterState.clusterTag().clusterId());
}
+ ResetClusterMessage resetClusterMessage =
storage.readResetClusterMessage();
+ if (resetClusterMessage != null) {
+ this.clusterIdOverride = resetClusterMessage.clusterId();
+ }
+
return nullCompletedFuture();
}
@@ -51,4 +62,19 @@ public class ClusterIdService extends ClusterIdHolder
implements IgniteComponent
public CompletableFuture<Void> stopAsync(ComponentContext
componentContext) {
return nullCompletedFuture();
}
+
+ @Override
+ public UUID clusterId() {
+ UUID override = clusterIdOverride;
+ if (override != null) {
+ return override;
+ }
+
+ return clusterId;
+ }
+
+ @Override
+ public void clusterId(UUID newClusterId) {
+ clusterId = newClusterId;
+ }
}
diff --git
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManager.java
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManager.java
index 0cb06071fb..ea2779a244 100644
---
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManager.java
+++
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManager.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.disaster.system;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.cluster.management.ClusterState;
import org.apache.ignite.internal.disaster.system.message.ResetClusterMessage;
/**
@@ -26,9 +27,17 @@ import
org.apache.ignite.internal.disaster.system.message.ResetClusterMessage;
*/
public interface SystemDisasterRecoveryManager {
/**
- * Marks this node as initialized.
+ * Saves cluster state to make sure it can be used to initiate CMG/MG
repair.
+ *
+ * @param clusterState State to save.
+ */
+ void saveClusterState(ClusterState clusterState);
+
+ /**
+ * Marks this node as a node that saw initial configuration application.
After this happens, the initial configuration
+ * in CMG is not needed anymore and can be disposed of.
*/
- void markNodeInitialized();
+ void markInitConfigApplied();
/**
* Initiates cluster reset.
diff --git
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImpl.java
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImpl.java
index 8db5db4a08..90299e3272 100644
---
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImpl.java
+++
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImpl.java
@@ -22,7 +22,6 @@ import static java.util.UUID.randomUUID;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
-import static org.apache.ignite.internal.util.ByteUtils.toBytes;
import static org.apache.ignite.internal.util.CompletableFutures.allOf;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
@@ -40,19 +39,15 @@ import java.util.concurrent.Executor;
import org.apache.ignite.internal.cluster.management.ClusterState;
import
org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
import
org.apache.ignite.internal.cluster.management.network.messages.SuccessResponseMessage;
-import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
-import
org.apache.ignite.internal.cluster.management.raft.ClusterStateStorageManager;
import org.apache.ignite.internal.disaster.system.message.ResetClusterMessage;
import
org.apache.ignite.internal.disaster.system.message.SystemDisasterRecoveryMessageGroup;
import
org.apache.ignite.internal.disaster.system.message.SystemDisasterRecoveryMessagesFactory;
-import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.TopologyService;
import org.apache.ignite.internal.util.CompletableFutures;
-import org.apache.ignite.internal.vault.VaultEntry;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.network.ClusterNode;
@@ -60,20 +55,16 @@ import org.apache.ignite.network.ClusterNode;
* Implementation of {@link SystemDisasterRecoveryManager}.
*/
public class SystemDisasterRecoveryManagerImpl implements
SystemDisasterRecoveryManager, IgniteComponent {
- private static final String NODE_INITIALIZED_VAULT_KEY =
"systemRecovery.nodeInitialized";
- private static final String RESET_CLUSTER_MESSAGE_VAULT_KEY =
"systemRecovery.resetClusterMessage";
-
private final String thisNodeName;
private final TopologyService topologyService;
private final MessagingService messagingService;
- private final VaultManager vaultManager;
private final ServerRestarter restarter;
- private final ClusterStateStorageManager clusterStateStorageManager;
-
private final SystemDisasterRecoveryMessagesFactory messagesFactory = new
SystemDisasterRecoveryMessagesFactory();
private static final CmgMessagesFactory cmgMessagesFactory = new
CmgMessagesFactory();
+ private final SystemDisasterRecoveryStorage storage;
+
/** This executor spawns a thread per task and should only be used for
very rare tasks. */
private final Executor restartExecutor;
@@ -83,17 +74,14 @@ public class SystemDisasterRecoveryManagerImpl implements
SystemDisasterRecovery
TopologyService topologyService,
MessagingService messagingService,
VaultManager vaultManager,
- ServerRestarter restarter,
- ClusterStateStorage clusterStateStorage
+ ServerRestarter restarter
) {
this.thisNodeName = thisNodeName;
this.topologyService = topologyService;
this.messagingService = messagingService;
- this.vaultManager = vaultManager;
this.restarter = restarter;
- clusterStateStorageManager = new
ClusterStateStorageManager(clusterStateStorage);
-
+ storage = new SystemDisasterRecoveryStorage(vaultManager);
restartExecutor = new ThreadPerTaskExecutor(thisNodeName +
"-restart-");
}
@@ -111,7 +99,7 @@ public class SystemDisasterRecoveryManagerImpl implements
SystemDisasterRecovery
private void handleResetClusterMessage(ResetClusterMessage message,
ClusterNode sender, long correlationId) {
restartExecutor.execute(() -> {
- vaultManager.put(new ByteArray(RESET_CLUSTER_MESSAGE_VAULT_KEY),
toBytes(message));
+ storage.saveResetClusterMessage(message);
messagingService.respond(sender, successResponseMessage(),
correlationId)
.thenRunAsync(() -> {
@@ -122,7 +110,7 @@ public class SystemDisasterRecoveryManagerImpl implements
SystemDisasterRecovery
});
}
- private SuccessResponseMessage successResponseMessage() {
+ private static SuccessResponseMessage successResponseMessage() {
return cmgMessagesFactory.successResponseMessage().build();
}
@@ -132,8 +120,13 @@ public class SystemDisasterRecoveryManagerImpl implements
SystemDisasterRecovery
}
@Override
- public void markNodeInitialized() {
- vaultManager.put(new ByteArray(NODE_INITIALIZED_VAULT_KEY), new
byte[]{1});
+ public void saveClusterState(ClusterState clusterState) {
+ storage.saveClusterState(clusterState);
+ }
+
+ @Override
+ public void markInitConfigApplied() {
+ storage.markInitConfigApplied();
}
@Override
@@ -152,7 +145,7 @@ public class SystemDisasterRecoveryManagerImpl implements
SystemDisasterRecovery
Collection<ClusterNode> nodesInTopology = topologyService.allMembers();
ensureAllProposedCmgNodesAreInTopology(proposedCmgConsistentIds,
nodesInTopology);
- ensureNodeIsInitialized();
+ ensureInitConfigApplied();
ClusterState clusterState = ensureClusterStateIsPresent();
ResetClusterMessage message = buildResetClusterMessage(
@@ -188,10 +181,9 @@ public class SystemDisasterRecoveryManagerImpl implements
SystemDisasterRecovery
return responseFutures;
}
- private void ensureNodeIsInitialized() {
- VaultEntry initializedEntry = vaultManager.get(new
ByteArray(NODE_INITIALIZED_VAULT_KEY));
- if (initializedEntry == null) {
- throw new ClusterResetException("Node is not initialized and
cannot serve as a cluster reset conductor.");
+ private void ensureInitConfigApplied() {
+ if (!storage.isInitConfigApplied()) {
+ throw new ClusterResetException("Initial configuration is not
applied and cannot serve as a cluster reset conductor.");
}
}
@@ -222,7 +214,7 @@ public class SystemDisasterRecoveryManagerImpl implements
SystemDisasterRecovery
}
private ClusterState ensureClusterStateIsPresent() {
- ClusterState clusterState =
clusterStateStorageManager.getClusterState();
+ ClusterState clusterState = storage.readClusterState();
if (clusterState == null) {
throw new ClusterResetException("Node does not have cluster
state.");
}
diff --git
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryStorage.java
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryStorage.java
new file mode 100644
index 0000000000..200a19b99b
--- /dev/null
+++
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryStorage.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.disaster.system;
+
+import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
+
+import org.apache.ignite.internal.cluster.management.ClusterState;
+import org.apache.ignite.internal.disaster.system.message.ResetClusterMessage;
+import org.apache.ignite.internal.disaster.system.storage.ClusterResetStorage;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Storage used by tools for disaster recovery of system groups.
+ */
+public class SystemDisasterRecoveryStorage implements ClusterResetStorage {
+    private static final ByteArray INIT_CONFIG_APPLIED_VAULT_KEY = new ByteArray("systemRecovery.initConfigApplied");
+    private static final ByteArray CLUSTER_STATE_VAULT_KEY = new ByteArray("systemRecovery.clusterState");
+    private static final ByteArray RESET_CLUSTER_MESSAGE_VAULT_KEY = new ByteArray("systemRecovery.resetClusterMessage");
+
+    private final VaultManager vault;
+
+    /**
+     * Creates a storage that keeps all of its entries in the given vault.
+     *
+     * @param vault Vault used to read and write the entries.
+     */
+    public SystemDisasterRecoveryStorage(VaultManager vault) {
+        this.vault = vault;
+    }
+
+    /**
+     * Deserializes the value stored in the vault under the given key.
+     *
+     * @param key Vault key to look up.
+     * @return Deserialized value, or {@code null} if the vault has no entry for the key.
+     */
+    private <T> @Nullable T readFromVault(ByteArray key) {
+        VaultEntry stored = vault.get(key);
+
+        if (stored == null) {
+            return null;
+        }
+
+        return ByteUtils.fromBytes(stored.value());
+    }
+
+    /** Returns the reset cluster message saved in the vault, or {@code null} if there is none. */
+    @Override
+    public @Nullable ResetClusterMessage readResetClusterMessage() {
+        return readFromVault(RESET_CLUSTER_MESSAGE_VAULT_KEY);
+    }
+
+    /** Writes the serialized reset cluster message to the vault. */
+    void saveResetClusterMessage(ResetClusterMessage message) {
+        vault.put(RESET_CLUSTER_MESSAGE_VAULT_KEY, ByteUtils.toBytes(message));
+    }
+
+    /** Removes the reset cluster message entry from the vault. */
+    @Override
+    public void removeResetClusterMessage() {
+        vault.remove(RESET_CLUSTER_MESSAGE_VAULT_KEY);
+    }
+
+    /** Returns the cluster state saved in the vault, or {@code null} if there is none. */
+    @Nullable ClusterState readClusterState() {
+        return readFromVault(CLUSTER_STATE_VAULT_KEY);
+    }
+
+    /** Writes the serialized cluster state to the vault. */
+    void saveClusterState(ClusterState clusterState) {
+        vault.put(CLUSTER_STATE_VAULT_KEY, ByteUtils.toBytes(clusterState));
+    }
+
+    /** Returns {@code true} if the vault contains the 'init config applied' marker entry. */
+    boolean isInitConfigApplied() {
+        return vault.get(INIT_CONFIG_APPLIED_VAULT_KEY) != null;
+    }
+
+    /** Puts the 'init config applied' marker into the vault; only the presence of the entry matters, so the value is empty. */
+    void markInitConfigApplied() {
+        vault.put(INIT_CONFIG_APPLIED_VAULT_KEY, BYTE_EMPTY_ARRAY);
+    }
+}
diff --git
a/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
b/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
index 013ec2b1cc..84cbd7c7d6 100644
---
a/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
+++
b/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
@@ -25,11 +25,14 @@ import static
org.apache.ignite.internal.cluster.management.ClusterTag.randomClu
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.asserts.CompletableFutureAssert.assertWillThrow;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
@@ -56,9 +59,6 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.internal.cluster.management.ClusterState;
import
org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
import
org.apache.ignite.internal.cluster.management.network.messages.SuccessResponseMessage;
-import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
-import
org.apache.ignite.internal.cluster.management.raft.ClusterStateStorageManager;
-import
org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
import org.apache.ignite.internal.disaster.system.message.ResetClusterMessage;
import
org.apache.ignite.internal.disaster.system.message.SystemDisasterRecoveryMessageGroup;
import
org.apache.ignite.internal.disaster.system.message.SystemDisasterRecoveryMessagesFactory;
@@ -96,8 +96,9 @@ import org.mockito.junit.jupiter.MockitoExtension;
class SystemDisasterRecoveryManagerImplTest extends BaseIgniteAbstractTest {
private static final String CLUSTER_NAME = "cluster";
- private static final String NODE_INITIALIZED_VAULT_KEY =
"systemRecovery.nodeInitialized";
- private static final String RESET_CLUSTER_MESSAGE_VAULT_KEY =
"systemRecovery.resetClusterMessage";
+ private static final ByteArray INIT_CONFIG_APPLIED_VAULT_KEY = new
ByteArray("systemRecovery.initConfigApplied");
+ private static final ByteArray CLUSTER_STATE_VAULT_KEY = new
ByteArray("systemRecovery.clusterState");
+ private static final ByteArray RESET_CLUSTER_MESSAGE_VAULT_KEY = new
ByteArray("systemRecovery.resetClusterMessage");
@WorkDirectory
private Path workDir;
@@ -115,9 +116,6 @@ class SystemDisasterRecoveryManagerImplTest extends
BaseIgniteAbstractTest {
@Mock
private ServerRestarter restarter;
- private final ClusterStateStorage clusterStateStorage = new
TestClusterStateStorage();
- private final ClusterStateStorageManager clusterStateStorageManager = new
ClusterStateStorageManager(clusterStateStorage);
-
private SystemDisasterRecoveryManagerImpl manager;
private final ComponentContext componentContext = new ComponentContext();
@@ -154,8 +152,7 @@ class SystemDisasterRecoveryManagerImplTest extends
BaseIgniteAbstractTest {
topologyService,
messagingService,
vaultManager,
- restarter,
- clusterStateStorage
+ restarter
);
assertThat(manager.startAsync(componentContext),
willCompleteSuccessfully());
}
@@ -167,12 +164,23 @@ class SystemDisasterRecoveryManagerImplTest extends
BaseIgniteAbstractTest {
}
@Test
- void marksNodeInitialized() {
- manager.markNodeInitialized();
+ void marksInitConfigApplied() {
+ manager.markInitConfigApplied();
+
+ VaultEntry entry = vaultManager.get(INIT_CONFIG_APPLIED_VAULT_KEY);
+ assertThat(entry, is(notNullValue()));
+ assertThat(entry.value(), is(notNullValue()));
+ }
+
+ @Test
+ void savesClusterState() {
+ manager.saveClusterState(usualClusterState);
- VaultEntry entry = vaultManager.get(new
ByteArray(NODE_INITIALIZED_VAULT_KEY));
+ VaultEntry entry = vaultManager.get(CLUSTER_STATE_VAULT_KEY);
assertThat(entry, is(notNullValue()));
- assertThat(entry.value(), is(new byte[]{1}));
+
+ ClusterState savedState = fromBytes(entry.value());
+ assertThat(savedState, is(equalTo(usualClusterState)));
}
@Test
@@ -210,7 +218,7 @@ class SystemDisasterRecoveryManagerImplTest extends
BaseIgniteAbstractTest {
@Test
void resetClusterRequiresClusterState() {
when(topologyService.allMembers()).thenReturn(List.of(thisNode));
- marksNodeInitialized();
+ markinitConfigApplied();
ClusterResetException ex = assertWillThrow(
manager.resetCluster(List.of(thisNodeName)),
@@ -221,7 +229,7 @@ class SystemDisasterRecoveryManagerImplTest extends
BaseIgniteAbstractTest {
}
@Test
- void resetClusterRequiresNodeToBeInitialized() {
+ void resetClusterRequiresInitConfigToBeApplied() {
when(topologyService.allMembers()).thenReturn(List.of(thisNode));
putClusterState();
@@ -230,15 +238,15 @@ class SystemDisasterRecoveryManagerImplTest extends
BaseIgniteAbstractTest {
ClusterResetException.class,
10, SECONDS
);
- assertThat(ex.getMessage(), is("Node is not initialized and cannot
serve as a cluster reset conductor."));
+ assertThat(ex.getMessage(), is("Initial configuration is not applied
and cannot serve as a cluster reset conductor."));
}
private void putClusterState() {
- clusterStateStorageManager.putClusterState(usualClusterState);
+ vaultManager.put(CLUSTER_STATE_VAULT_KEY, toBytes(usualClusterState));
}
- private void makeNodeInitialized() {
- vaultManager.put(new ByteArray(NODE_INITIALIZED_VAULT_KEY), new
byte[]{1});
+ private void markinitConfigApplied() {
+ vaultManager.put(INIT_CONFIG_APPLIED_VAULT_KEY, BYTE_EMPTY_ARRAY);
}
@Test
@@ -271,7 +279,7 @@ class SystemDisasterRecoveryManagerImplTest extends
BaseIgniteAbstractTest {
}
private void prepareNodeStateForClusterReset() {
- makeNodeInitialized();
+ markinitConfigApplied();
putClusterState();
}
@@ -364,7 +372,7 @@ class SystemDisasterRecoveryManagerImplTest extends
BaseIgniteAbstractTest {
handler.onReceived(resetClusterMessageOn2Nodes(conductor.name()),
conductor, 0L);
waitTillResetClusterMessageGetsSavedToVault();
- VaultEntry entry = vaultManager.get(new
ByteArray(RESET_CLUSTER_MESSAGE_VAULT_KEY));
+ VaultEntry entry = vaultManager.get(RESET_CLUSTER_MESSAGE_VAULT_KEY);
assertThat(entry, is(notNullValue()));
ResetClusterMessage savedMessage = fromBytes(entry.value());
@@ -373,7 +381,7 @@ class SystemDisasterRecoveryManagerImplTest extends
BaseIgniteAbstractTest {
}
private void waitTillResetClusterMessageGetsSavedToVault() throws
InterruptedException {
- assertTrue(waitForCondition(() -> vaultManager.get(new
ByteArray(RESET_CLUSTER_MESSAGE_VAULT_KEY)) != null, 10_000));
+ assertTrue(waitForCondition(() ->
vaultManager.get(RESET_CLUSTER_MESSAGE_VAULT_KEY) != null, 10_000));
}
private NetworkMessageHandler extractMessageHandler() {
@@ -416,7 +424,7 @@ class SystemDisasterRecoveryManagerImplTest extends
BaseIgniteAbstractTest {
InOrder inOrder = inOrder(messagingService, vaultManager);
- inOrder.verify(vaultManager, timeout(SECONDS.toMillis(10))).put(eq(new
ByteArray(RESET_CLUSTER_MESSAGE_VAULT_KEY)), any());
+ inOrder.verify(vaultManager,
timeout(SECONDS.toMillis(10))).put(eq(RESET_CLUSTER_MESSAGE_VAULT_KEY), any());
inOrder.verify(messagingService,
timeout(SECONDS.toMillis(10))).respond(eq(conductor), messageCaptor.capture(),
eq(123L));
assertThat(messageCaptor.getValue(),
instanceOf(SuccessResponseMessage.class));
@@ -433,9 +441,9 @@ class SystemDisasterRecoveryManagerImplTest extends
BaseIgniteAbstractTest {
InOrder inOrder = inOrder(messagingService, vaultManager, restarter);
- inOrder.verify(vaultManager).put(eq(new
ByteArray(RESET_CLUSTER_MESSAGE_VAULT_KEY)), any());
- inOrder.verify(messagingService).respond(eq(conductor),
messageCaptor.capture(), eq(123L));
- inOrder.verify(restarter).initiateRestart();
+ inOrder.verify(vaultManager,
timeout(SECONDS.toMillis(10))).put(eq(RESET_CLUSTER_MESSAGE_VAULT_KEY), any());
+ inOrder.verify(messagingService,
timeout(SECONDS.toMillis(10))).respond(eq(conductor), messageCaptor.capture(),
eq(123L));
+ inOrder.verify(restarter,
timeout(SECONDS.toMillis(10))).initiateRestart();
assertThat(messageCaptor.getValue(),
instanceOf(SuccessResponseMessage.class));
}
diff --git a/modules/table/build.gradle b/modules/table/build.gradle
index 3b98808ca9..fb4e317081 100644
--- a/modules/table/build.gradle
+++ b/modules/table/build.gradle
@@ -151,6 +151,7 @@ dependencies {
integrationTestImplementation project(':ignite-system-view')
integrationTestImplementation project(':ignite-partition-replicator')
integrationTestImplementation project(':ignite-configuration-root')
+ integrationTestImplementation project(':ignite-system-disaster-recovery')
integrationTestImplementation(testFixtures(project))
integrationTestImplementation(testFixtures(project(':ignite-core')))
integrationTestImplementation(testFixtures(project(':ignite-table')))
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 9d26e72700..5f547dac2d 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -127,6 +127,7 @@ import
org.apache.ignite.internal.configuration.storage.LocalFileConfigurationSt
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import
org.apache.ignite.internal.configuration.validation.TestConfigurationValidator;
+import
org.apache.ignite.internal.disaster.system.SystemDisasterRecoveryStorage;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil;
import org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil;
@@ -1194,6 +1195,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
cmgManager = new ClusterManagementGroupManager(
vaultManager,
+ new SystemDisasterRecoveryStorage(vaultManager),
clusterService,
clusterInitializer,
raftManager,
diff --git a/settings.gradle b/settings.gradle
index b6a2e84ec4..1f1ffaf2c2 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -90,6 +90,7 @@ include(":ignite-low-watermark")
include(":ignite-partition-replicator")
include(':ignite-configuration-root')
include(':ignite-system-disaster-recovery')
+include(':ignite-system-disaster-recovery-api')
project(":ignite-examples").projectDir = file('examples')
project(":ignite-dev-utilities").projectDir = file('dev-utilities')
@@ -165,6 +166,7 @@ project(":ignite-low-watermark").projectDir =
file('modules/low-watermark')
project(":ignite-partition-replicator").projectDir =
file('modules/partition-replicator')
project(":ignite-configuration-root").projectDir =
file('modules/configuration-root')
project(":ignite-system-disaster-recovery").projectDir =
file('modules/system-disaster-recovery')
+project(":ignite-system-disaster-recovery-api").projectDir =
file('modules/system-disaster-recovery-api')
ext.isCiServer = System.getenv().containsKey("IGNITE_CI")