This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new faad7e2992 IGNITE-23579 Improve diagnostics of CMG/MG disaster
recovery (#4658)
faad7e2992 is described below
commit faad7e2992cab6c54495f21f1dc6e991983bbe15
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Wed Oct 30 19:51:23 2024 +0400
IGNITE-23579 Improve diagnostics of CMG/MG disaster recovery (#4658)
---
.../cluster/ItSystemDisasterRecoveryCliTest.java | 12 +++----
.../management/ClusterManagementGroupManager.java | 10 +++---
.../ignite/internal/app/IgniteServerImpl.java | 37 ++++++++++++++++++----
.../java/org/apache/ignite/internal/Cluster.java | 8 +++++
.../disaster/system/ItCmgDisasterRecoveryTest.java | 6 ++--
.../ItMetastorageGroupDisasterRecoveryTest.java | 5 +--
.../system/ItSystemGroupDisasterRecoveryTest.java | 16 +++++-----
7 files changed, 63 insertions(+), 31 deletions(-)
diff --git
a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/recovery/cluster/ItSystemDisasterRecoveryCliTest.java
b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/recovery/cluster/ItSystemDisasterRecoveryCliTest.java
index 1250042d18..143641a8a2 100644
---
a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/recovery/cluster/ItSystemDisasterRecoveryCliTest.java
+++
b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/recovery/cluster/ItSystemDisasterRecoveryCliTest.java
@@ -30,17 +30,17 @@ import org.jetbrains.annotations.Nullable;
abstract class ItSystemDisasterRecoveryCliTest extends CliIntegrationTest {
static void waitTillNodeRestartsInternally(int nodeIndex) throws
InterruptedException {
- // restartOrShutdownFuture() becomes non-null when restart or shutdown
is initiated; we know it's restart.
+ // restartFuture() becomes non-null when restart is initiated.
assertTrue(
- waitForCondition(() -> restartOrShutdownFuture(nodeIndex) !=
null, SECONDS.toMillis(20)),
- "Node did not attempt to be restarted (or shut down) in time"
+ waitForCondition(() -> restartFuture(nodeIndex) != null,
SECONDS.toMillis(20)),
+ "Node did not attempt to be restarted in time"
);
- assertThat(restartOrShutdownFuture(nodeIndex),
willCompleteSuccessfully());
+ assertThat(restartFuture(nodeIndex), willCompleteSuccessfully());
}
@Nullable
- private static CompletableFuture<Void> restartOrShutdownFuture(int
nodeIndex) {
- return ((IgniteServerImpl)
CLUSTER.server(nodeIndex)).restartOrShutdownFuture();
+ private static CompletableFuture<Void> restartFuture(int nodeIndex) {
+ return ((IgniteServerImpl) CLUSTER.server(nodeIndex)).restartFuture();
}
}
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index d2b2eb023c..2a19f38675 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -437,7 +437,7 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
// Since we recovered state we do not supply a new
initialClusterConfig.
return startCmgRaftServiceWithEvents(localState.cmgNodeNames(), null)
- .thenCompose(service -> joinCluster(service,
localState.clusterTag()));
+ .thenCompose(service -> validateAgainstCluster(service,
localState.clusterTag()));
}
/**
@@ -526,7 +526,7 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
localStateStorage.saveLocalState(localState);
- return joinCluster(service, state.clusterTag());
+ return validateAgainstCluster(service, state.clusterTag());
});
}
@@ -721,12 +721,12 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
}
}
- private CompletableFuture<CmgRaftService> joinCluster(CmgRaftService
service, ClusterTag clusterTag) {
+ private CompletableFuture<CmgRaftService>
validateAgainstCluster(CmgRaftService service, ClusterTag clusterTag) {
return service.startJoinCluster(clusterTag, nodeAttributes)
.thenApply(v -> service)
.whenComplete((v, e) -> {
if (e == null) {
- LOG.info("Successfully joined the cluster [name={}]",
clusterTag.clusterName());
+ LOG.info("Successfully validated against the cluster
[name={}]", clusterTag.clusterName());
joinFuture.complete(null);
} else {
@@ -826,7 +826,7 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
localStateStorage.saveLocalState(localState);
- return joinCluster(service, state.clusterTag());
+ return validateAgainstCluster(service, state.clusterTag());
});
}
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteServerImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteServerImpl.java
index 058282e298..ddd85812df 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteServerImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteServerImpl.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.app;
import static java.lang.System.lineSeparator;
+import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.function.Function.identity;
@@ -119,6 +120,14 @@ public class IgniteServerImpl implements IgniteServer {
@Nullable
private CompletableFuture<Void> restartOrShutdownFuture;
+ /**
+ * Future of the last internal restart ({@code null} if no internal
restarts were made).
+ *
+ * <p>Guarded by {@link #restartOrShutdownMutex}.
+ */
+ @Nullable
+ private CompletableFuture<Void> restartFuture;
+
/**
* Gets set to {@code true} when the node shutdown is initiated. This
disallows restarts.
*/
@@ -256,6 +265,8 @@ public class IgniteServerImpl implements IgniteServer {
}
result = chainRestartOrShutdownAction(() ->
doRestartAsync(instance));
+
+ restartFuture = result;
}
return result;
@@ -266,10 +277,11 @@ public class IgniteServerImpl implements IgniteServer {
*/
private CompletableFuture<Void>
chainRestartOrShutdownAction(Supplier<CompletableFuture<Void>> action) {
CompletableFuture<Void> result = (restartOrShutdownFuture == null ?
nullCompletedFuture() : restartOrShutdownFuture)
+ // Suppress exceptions to make sure previous errors will not
affect this step.
+ .handle((res, ex) -> null)
.thenCompose(unused -> action.get());
- // Suppress exceptions to make sure following futures can be executed.
- restartOrShutdownFuture = result.handle((res, ex) -> null);
+ restartOrShutdownFuture = result;
return result;
}
@@ -278,6 +290,7 @@ public class IgniteServerImpl implements IgniteServer {
// TODO: IGNITE-23006 - limit the wait to acquire the write lock with
a timeout.
return attachmentLock.detachedAsync(() -> {
synchronized (igniteChangeMutex) {
+ LOG.info("Setting Ignite ref to null as restart is initiated
[name={}]", nodeName);
this.ignite = null;
}
this.joinFuture = null;
@@ -290,14 +303,19 @@ public class IgniteServerImpl implements IgniteServer {
@Override
public CompletableFuture<Void> shutdownAsync() {
- shutDown = true;
-
// We don't use attachmentLock here so that users see
NodeStoppingException immediately (instead of pausing their operations
// forever which would happen if the lock was used).
CompletableFuture<Void> result;
synchronized (restartOrShutdownMutex) {
+ if (shutDown) {
+ // Someone has already invoked shutdown, so
#restartOrShutdownFuture is about shutdown, let's simply return it.
+ return requireNonNull(restartOrShutdownFuture);
+ }
+
+ shutDown = true;
+
result = chainRestartOrShutdownAction(this::doShutdownAsync);
}
@@ -324,6 +342,7 @@ public class IgniteServerImpl implements IgniteServer {
try {
return instance.stopAsync().thenRun(() -> {
synchronized (igniteChangeMutex) {
+ LOG.info("Setting Ignite ref to null as shutdown is
initiated [name={}]", nodeName);
ignite = null;
}
joinFuture = null;
@@ -378,9 +397,13 @@ public class IgniteServerImpl implements IgniteServer {
synchronized (igniteChangeMutex) {
if (shutDown) {
+ LOG.info("A new Ignite instance has started, but a
shutdown is requested, so not setting it, stopping it instead "
+ + "[name={}]", nodeName);
+
return instance.stopAsync();
}
+ LOG.info("Setting Ignite ref to new instance as it has started
[name={}]", nodeName);
ignite = instance;
}
@@ -465,12 +488,12 @@ public class IgniteServerImpl implements IgniteServer {
}
/**
- * Returns future that gets completed when restart or shutdown is complete.
+ * Returns future that gets completed when restart is complete ({@code
null} if no restart was attempted).
*/
@TestOnly
- public @Nullable CompletableFuture<Void> restartOrShutdownFuture() {
+ public @Nullable CompletableFuture<Void> restartFuture() {
synchronized (restartOrShutdownMutex) {
- return restartOrShutdownFuture;
+ return restartFuture;
}
}
}
diff --git
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
index 881185456d..37cd0dc4fb 100644
---
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
+++
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
@@ -511,10 +511,18 @@ public class Cluster {
List<IgniteServer> serversToStop = new ArrayList<>(igniteServers);
+ List<String> serverNames = serversToStop.stream()
+ .filter(Objects::nonNull)
+ .map(IgniteServer::name)
+ .collect(toList());
+ LOG.info("Shutting the cluster down [nodes={}]", serverNames);
+
Collections.fill(igniteServers, null);
Collections.fill(nodes, null);
serversToStop.parallelStream().filter(Objects::nonNull).forEach(IgniteServer::shutdown);
+
+ LOG.info("Shut the cluster down");
}
/**
diff --git
a/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItCmgDisasterRecoveryTest.java
b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItCmgDisasterRecoveryTest.java
index 0bbdeb2481..e989ded57e 100644
---
a/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItCmgDisasterRecoveryTest.java
+++
b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItCmgDisasterRecoveryTest.java
@@ -145,7 +145,7 @@ class ItCmgDisasterRecoveryTest extends
ItSystemGroupDisasterRecoveryTest {
}
@Test
- void nodesThatSawNoReparationHaveSeparatePhysicalTopologies() throws
Exception {
+ void nodesThatSawNoRepairHaveSeparatePhysicalTopologies() throws Exception
{
startAndInitCluster(2, new int[]{0}, new int[]{1});
waitTillClusterStateIsSavedToVaultOnConductor(1);
@@ -167,7 +167,7 @@ class ItCmgDisasterRecoveryTest extends
ItSystemGroupDisasterRecoveryTest {
}
@Test
- void migratesNodesThatSawNoReparationToNewCluster() throws Exception {
+ void migratesNodesThatSawNoRepairToNewCluster() throws Exception {
startAndInitCluster(2, new int[]{0}, new int[]{1});
waitTillClusterStateIsSavedToVaultOnConductor(1);
@@ -178,7 +178,7 @@ class ItCmgDisasterRecoveryTest extends
ItSystemGroupDisasterRecoveryTest {
}
@Test
- void migratesManyNodesThatSawNoReparationToNewCluster() throws Exception {
+ void migratesManyNodesThatSawNoRepairToNewCluster() throws Exception {
startAndInitCluster(5, new int[]{0, 1, 2}, new int[]{2, 3, 4});
waitTillClusterStateIsSavedToVaultOnConductor(2);
diff --git
a/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItMetastorageGroupDisasterRecoveryTest.java
b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItMetastorageGroupDisasterRecoveryTest.java
index d36f122f29..3b965544c4 100644
---
a/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItMetastorageGroupDisasterRecoveryTest.java
+++
b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItMetastorageGroupDisasterRecoveryTest.java
@@ -22,6 +22,7 @@ import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowWithCauseOrSuppressed;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutIn;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
@@ -224,7 +225,7 @@ class ItMetastorageGroupDisasterRecoveryTest extends
ItSystemGroupDisasterRecove
((MetaStorageManagerImpl)
restartedIgniteImpl0.metaStorageManager()).disableLearnersAddition();
initiateMigration(1, 0);
- CompletableFuture<Void> ignite1RestartFuture =
waitForRestartOrShutdownFuture(1);
+ CompletableFuture<Void> ignite1RestartFuture = waitForRestartFuture(1);
// It should not be able to start: it should abstain from becoming a
leader and node 1 (the new leader) does not add it as
// a learner.
@@ -333,7 +334,7 @@ class ItMetastorageGroupDisasterRecoveryTest extends
ItSystemGroupDisasterRecove
// Starting the node that did not see the repair.
initiateMigration(1, 0);
- assertThat(waitForRestartOrShutdownFuture(1),
willCompleteSuccessfully());
+ assertThat(waitForRestartFuture(1),
willThrowWithCauseOrSuppressed(MetastorageDivergedException.class, "Metastorage
has diverged"));
// Attempt to migrate should fail.
assertThrowsWithCause(() -> cluster.server(1).api(),
MetastorageDivergedException.class, "Metastorage has diverged");
diff --git
a/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItSystemGroupDisasterRecoveryTest.java
b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItSystemGroupDisasterRecoveryTest.java
index 481f1c58f1..9b6694487b 100644
---
a/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItSystemGroupDisasterRecoveryTest.java
+++
b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItSystemGroupDisasterRecoveryTest.java
@@ -74,28 +74,28 @@ abstract class ItSystemGroupDisasterRecoveryTest extends
ClusterPerTestIntegrati
}
final IgniteImpl waitTillNodeRestartsInternally(int nodeIndex) throws
InterruptedException {
- // restartOrShutdownFuture() becomes non-null when restart or shutdown
is initiated; we know it's restart.
+ // restartFuture() becomes non-null when restart is initiated.
- assertThat(waitForRestartOrShutdownFuture(nodeIndex),
willCompleteSuccessfully());
+ assertThat(waitForRestartFuture(nodeIndex),
willCompleteSuccessfully());
return unwrapIgniteImpl(cluster.server(nodeIndex).api());
}
- final CompletableFuture<Void> waitForRestartOrShutdownFuture(int
nodeIndex) throws InterruptedException {
+ final CompletableFuture<Void> waitForRestartFuture(int nodeIndex) throws
InterruptedException {
assertTrue(
- waitForCondition(() -> restartOrShutdownFuture(nodeIndex) !=
null, SECONDS.toMillis(20)),
- "Node did not attempt to be restarted (or shut down) in time"
+ waitForCondition(() -> restartFuture(nodeIndex) != null,
SECONDS.toMillis(20)),
+ "Node did not attempt to be restarted in time"
);
- CompletableFuture<Void> future = restartOrShutdownFuture(nodeIndex);
+ CompletableFuture<Void> future = restartFuture(nodeIndex);
assertNotNull(future);
return future;
}
@Nullable
- private CompletableFuture<Void> restartOrShutdownFuture(int nodeIndex) {
- return ((IgniteServerImpl)
cluster.server(nodeIndex)).restartOrShutdownFuture();
+ private CompletableFuture<Void> restartFuture(int nodeIndex) {
+ return ((IgniteServerImpl) cluster.server(nodeIndex)).restartFuture();
}
static ClusterState clusterState(IgniteImpl ignite) throws
InterruptedException, ExecutionException, TimeoutException {