This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new cc53fb617f IGNITE-22896 REST API for initiating MG repair (#4547)
cc53fb617f is described below
commit cc53fb617f729c78629a0bd76e6fb05f8db9e6cd
Author: Phillippko <[email protected]>
AuthorDate: Tue Oct 15 23:45:34 2024 +0900
IGNITE-22896 REST API for initiating MG repair (#4547)
---
.../api/recovery/system/ResetClusterRequest.java | 33 ++++++++++++----
.../system/SystemDisasterRecoveryController.java | 9 ++++-
.../SystemDisasterRecoveryControllerTest.java | 39 ++++++++++++++++++-
.../org/apache/ignite/internal/app/IgniteImpl.java | 1 +
.../system/SystemDisasterRecoveryManager.java | 5 ++-
.../system/SystemDisasterRecoveryManagerImpl.java | 44 ++++++++++++++++------
.../SystemDisasterRecoveryManagerImplTest.java | 29 ++++++++++++++
7 files changed, 136 insertions(+), 24 deletions(-)
diff --git a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/system/ResetClusterRequest.java b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/system/ResetClusterRequest.java
index 978f5c3da2..a6172e7bbd 100644
--- a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/system/ResetClusterRequest.java
+++ b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/system/ResetClusterRequest.java
@@ -22,33 +22,52 @@ import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.v3.oas.annotations.media.Schema;
import java.util.List;
-import java.util.Objects;
import org.apache.ignite.internal.tostring.IgniteToStringInclude;
import org.apache.ignite.internal.tostring.S;
+import org.jetbrains.annotations.Nullable;
/** Request to reset cluster. */
@Schema(description = "Reset cluster.")
public class ResetClusterRequest {
- @Schema(description = "Names of the proposed CMG nodes.")
+ @Schema(description = "Names of the proposed CMG nodes. Optional if Metastorage replication factor is specified, then "
+ + "current CMG nodes will be used.")
@IgniteToStringInclude
- private final List<String> cmgNodeNames;
+ private final @Nullable List<String> cmgNodeNames;
+
+ @Schema(description = "Number of nodes in the Raft voting member set for Metastorage.")
+ @IgniteToStringInclude
+ private final @Nullable Integer metastorageReplicationFactor;
/** Constructor. */
@JsonCreator
- public ResetClusterRequest(@JsonProperty("cmgNodeNames") List<String> cmgNodeNames) {
- Objects.requireNonNull(cmgNodeNames);
+ public ResetClusterRequest(
+ @JsonProperty("cmgNodeNames") @Nullable List<String> cmgNodeNames,
+ @JsonProperty("metastorageReplicationFactor") @Nullable Integer metastorageReplicationFactor
+ ) {
+ this.cmgNodeNames = cmgNodeNames == null ? null : List.copyOf(cmgNodeNames);
- this.cmgNodeNames = List.copyOf(cmgNodeNames);
+ this.metastorageReplicationFactor = metastorageReplicationFactor;
}
/** Returns names of the proposed CMG nodes. */
@JsonGetter("cmgNodeNames")
- public List<String> cmgNodeNames() {
+ public @Nullable List<String> cmgNodeNames() {
return cmgNodeNames;
}
+ /** Returns number of nodes in the Raft voting member set for Metastorage. */
+ @JsonGetter("metastorageReplicationFactor")
+ public @Nullable Integer metastorageReplicationFactor() {
+ return metastorageReplicationFactor;
+ }
+
@Override
public String toString() {
return S.toString(this);
}
+
+ /** If Metastorage repair should be done along with cluster reset. */
+ public boolean metastorageRepairRequested() {
+ return metastorageReplicationFactor != null;
+ }
}
diff --git a/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/system/SystemDisasterRecoveryController.java b/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/system/SystemDisasterRecoveryController.java
index c982e7ec75..6d37ce4a21 100644
--- a/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/system/SystemDisasterRecoveryController.java
+++ b/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/system/SystemDisasterRecoveryController.java
@@ -61,7 +61,14 @@ public class SystemDisasterRecoveryController implements SystemDisasterRecoveryA
public CompletableFuture<Void> reset(ResetClusterRequest command) {
LOG.info("Reset command is {}", command);
- return systemDisasterRecoveryManager.resetCluster(command.cmgNodeNames());
+ if (command.metastorageRepairRequested()) {
+ return systemDisasterRecoveryManager.resetClusterRepairingMetastorage(
+ command.cmgNodeNames(),
+ command.metastorageReplicationFactor()
+ );
+ } else {
+ return systemDisasterRecoveryManager.resetCluster(command.cmgNodeNames());
+ }
}
@Override
diff --git a/modules/rest/src/test/java/org/apache/ignite/internal/rest/recovery/system/SystemDisasterRecoveryControllerTest.java b/modules/rest/src/test/java/org/apache/ignite/internal/rest/recovery/system/SystemDisasterRecoveryControllerTest.java
index dcdb5f5a70..5b81f5f9e5 100644
--- a/modules/rest/src/test/java/org/apache/ignite/internal/rest/recovery/system/SystemDisasterRecoveryControllerTest.java
+++ b/modules/rest/src/test/java/org/apache/ignite/internal/rest/recovery/system/SystemDisasterRecoveryControllerTest.java
@@ -85,7 +85,7 @@ class SystemDisasterRecoveryControllerTest extends BaseIgniteAbstractTest {
when(systemDisasterRecoveryManager.resetCluster(List.of("a", "b", "c"))).thenReturn(nullCompletedFuture());
HttpRequest<ResetClusterRequest> post = HttpRequest.POST("/reset",
- new ResetClusterRequest(List.of("a", "b", "c"))
+ new ResetClusterRequest(List.of("a", "b", "c"), null)
);
HttpResponse<Void> response = client.toBlocking().exchange(post);
@@ -94,12 +94,47 @@ class SystemDisasterRecoveryControllerTest extends BaseIgniteAbstractTest {
verify(systemDisasterRecoveryManager).resetCluster(List.of("a", "b", "c"));
}
+ @Test
+ void initiatesCmgRepairRepairingMetastorageWithCmgNodeNamesSpecified() {
+ int replicationFactor = 1;
+ List<String> cmgNodeNames = List.of("a", "b", "c");
+
+ when(systemDisasterRecoveryManager.resetClusterRepairingMetastorage(cmgNodeNames, replicationFactor))
+ .thenReturn(nullCompletedFuture());
+
+ HttpRequest<ResetClusterRequest> post = HttpRequest.POST("/reset",
+ new ResetClusterRequest(cmgNodeNames, replicationFactor)
+ );
+
+ HttpResponse<Void> response = client.toBlocking().exchange(post);
+
+ assertThat(response.getStatus().getCode(), is(OK.code()));
+ verify(systemDisasterRecoveryManager).resetClusterRepairingMetastorage(cmgNodeNames, replicationFactor);
+ }
+
+ @Test
+ void initiatesCmgRepairRepairingMetastorageWithCmgNodeNamesNotSpecified() {
+ int replicationFactor = 1;
+
+ when(systemDisasterRecoveryManager.resetClusterRepairingMetastorage(null, replicationFactor))
+ .thenReturn(nullCompletedFuture());
+
+ HttpRequest<ResetClusterRequest> post = HttpRequest.POST("/reset",
+ new ResetClusterRequest(null, replicationFactor)
+ );
+
+ HttpResponse<Void> response = client.toBlocking().exchange(post);
+
+ assertThat(response.getStatus().getCode(), is(OK.code()));
+ verify(systemDisasterRecoveryManager).resetClusterRepairingMetastorage(null, replicationFactor);
+ }
+
@Test
void resetClusterPassesClusterResetExceptionToClient() {
when(systemDisasterRecoveryManager.resetCluster(any())).thenReturn(failedFuture(new ClusterResetException("Oops")));
HttpRequest<ResetClusterRequest> post = HttpRequest.POST("/reset",
- new ResetClusterRequest(List.of("a", "b", "c"))
+ new ResetClusterRequest(List.of("a", "b", "c"), null)
);
HttpClientResponseException ex = assertThrows(HttpClientResponseException.class, () -> client.toBlocking().exchange(post));
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 1cb06bc363..81833723ed 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -712,6 +712,7 @@ public class IgniteImpl implements Ignite {
vaultMgr,
restarter,
metaStorageMgr,
+ cmgMgr,
clusterIdService
);
diff --git a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManager.java b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManager.java
index 18dcb47b75..c99eb3308b 100644
--- a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManager.java
+++ b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManager.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.cluster.management.ClusterState;
import org.apache.ignite.internal.disaster.system.exception.ClusterResetException;
import org.apache.ignite.internal.disaster.system.exception.MigrateException;
import org.apache.ignite.internal.disaster.system.message.ResetClusterMessage;
+import org.jetbrains.annotations.Nullable;
/**
* Manages disaster recovery of system groups, namely the Cluster Management Group (CMG) and the Metastorage group (MG).
@@ -52,11 +53,11 @@ public interface SystemDisasterRecoveryManager {
/**
* Initiates cluster reset. CMG will be reset and Metastorage will be repaired.
*
- * @param proposedCmgNodeNames Names of the nodes that will be the new CMG nodes.
+ * @param proposedCmgNodeNames Names of the nodes that will be the new CMG nodes. If not specified, current CMG nodes will be used.
* @param metastorageReplicationFactor Number of nodes in the Raft voting member set for Metastorage.
* @return Future completing with the result of the operation ({@link ClusterResetException} in case of error related to reset logic).
*/
- CompletableFuture<Void> resetClusterRepairingMetastorage(List<String> proposedCmgNodeNames, int metastorageReplicationFactor);
+ CompletableFuture<Void> resetClusterRepairingMetastorage(@Nullable List<String> proposedCmgNodeNames, int metastorageReplicationFactor);
/**
* Migrates nodes missed during CMG repair to the new cluster (which is the result of the repair). To do so, sends the
diff --git a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImpl.java b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImpl.java
index 90401cc5e8..a4f9fcf78c 100644
--- a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImpl.java
+++ b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImpl.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.disaster.system;
import static java.util.Objects.requireNonNull;
import static java.util.Objects.requireNonNullElse;
import static java.util.UUID.randomUUID;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
@@ -37,6 +38,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
+import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.cluster.management.ClusterState;
import org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
import org.apache.ignite.internal.cluster.management.network.messages.SuccessResponseMessage;
@@ -84,6 +86,8 @@ public class SystemDisasterRecoveryManagerImpl implements SystemDisasterRecovery
/** This executor spawns a thread per task and should only be used for very rare tasks. */
private final Executor restartExecutor;
+ private final ClusterManagementGroupManager cmgManager;
+
/** Constructor. */
public SystemDisasterRecoveryManagerImpl(
String thisNodeName,
@@ -92,6 +96,7 @@ public class SystemDisasterRecoveryManagerImpl implements SystemDisasterRecovery
VaultManager vaultManager,
ServerRestarter restarter,
MetastorageGroupMaintenance metastorageGroupMaintenance,
+ ClusterManagementGroupManager cmgManager,
ClusterIdSupplier clusterIdSupplier
) {
this.thisNodeName = thisNodeName;
@@ -99,6 +104,7 @@ public class SystemDisasterRecoveryManagerImpl implements SystemDisasterRecovery
this.messagingService = messagingService;
this.restarter = restarter;
this.metastorageGroupMaintenance = metastorageGroupMaintenance;
+ this.cmgManager = cmgManager;
this.clusterIdSupplier = clusterIdSupplier;
storage = new SystemDisasterRecoveryStorage(vaultManager);
@@ -195,19 +201,24 @@ public class SystemDisasterRecoveryManagerImpl implements SystemDisasterRecovery
@Override
public CompletableFuture<Void> resetCluster(List<String> proposedCmgNodeNames) {
+ if (proposedCmgNodeNames == null) {
+ return failedFuture(new ClusterResetException("Proposed CMG node names can't be null."));
+ }
+
return resetClusterInternal(proposedCmgNodeNames, null);
}
@Override
public CompletableFuture<Void> resetClusterRepairingMetastorage(
- List<String> proposedCmgNodeNames,
+ @Nullable List<String> proposedCmgNodeNames,
int metastorageReplicationFactor
) {
- return resetClusterInternal(proposedCmgNodeNames, metastorageReplicationFactor);
+ return proposedCmgNodeNamesOrCurrentIfNull(proposedCmgNodeNames)
+ .thenCompose(cmgNodeNames -> resetClusterInternal(cmgNodeNames, metastorageReplicationFactor));
}
private CompletableFuture<Void> resetClusterInternal(
- List<String> proposedCmgNodeNames,
+ Collection<String> proposedCmgNodeNames,
@Nullable Integer metastorageReplicationFactor
) {
try {
@@ -217,19 +228,22 @@ public class SystemDisasterRecoveryManagerImpl implements SystemDisasterRecovery
}
}
- private CompletableFuture<Void> doResetCluster(List<String> proposedCmgNodeNames, @Nullable Integer metastorageReplicationFactor) {
- ensureReplicationFactorIsPositiveIfGiven(metastorageReplicationFactor);
+ private CompletableFuture<Void> doResetCluster(
+ Collection<String> proposedCmgNodeNames,
+ @Nullable Integer metastorageReplicationFactor
+ ) {
+ Collection<ClusterNode> nodesInTopology = topologyService.allMembers();
ensureNoRepetitions(proposedCmgNodeNames);
ensureContainsThisNodeName(proposedCmgNodeNames);
- Collection<ClusterNode> nodesInTopology = topologyService.allMembers();
ensureAllProposedCmgNodesAreInTopology(proposedCmgNodeNames, nodesInTopology);
+
+ ensureReplicationFactorIsPositiveIfGiven(metastorageReplicationFactor);
ensureReplicationFactorFitsTopologyIfGiven(metastorageReplicationFactor, nodesInTopology);
ensureInitConfigApplied();
ClusterState clusterState = ensureClusterStateIsPresent();
-
ResetClusterMessage message = buildResetClusterMessageForReset(
proposedCmgNodeNames,
clusterState,
@@ -256,19 +270,25 @@ public class SystemDisasterRecoveryManagerImpl implements SystemDisasterRecovery
}, restartExecutor);
}
+ private CompletableFuture<Collection<String>> proposedCmgNodeNamesOrCurrentIfNull(@Nullable List<String> proposedCmgNodeNames) {
+ return proposedCmgNodeNames != null
+ ? completedFuture(proposedCmgNodeNames)
+ : cmgManager.clusterState().thenApply(ClusterState::cmgNodes);
+ }
+
private static void ensureReplicationFactorIsPositiveIfGiven(@Nullable Integer metastorageReplicationFactor) {
if (metastorageReplicationFactor != null && metastorageReplicationFactor <= 0) {
throw new ClusterResetException("Metastorage replication factor must be positive.");
}
}
- private static void ensureNoRepetitions(List<String> proposedCmgNodeNames) {
+ private static void ensureNoRepetitions(Collection<String> proposedCmgNodeNames) {
if (new HashSet<>(proposedCmgNodeNames).size() != proposedCmgNodeNames.size()) {
throw new ClusterResetException("New CMG node names have repetitions: " + proposedCmgNodeNames + ".");
}
}
- private void ensureContainsThisNodeName(List<String> proposedCmgNodeNames) {
+ private void ensureContainsThisNodeName(Collection<String> proposedCmgNodeNames) {
if (!proposedCmgNodeNames.contains(thisNodeName)) {
throw new ClusterResetException("Current node is not contained in the new CMG, so it cannot conduct a cluster reset.");
}
@@ -281,7 +301,7 @@ public class SystemDisasterRecoveryManagerImpl implements SystemDisasterRecovery
}
private static void ensureAllProposedCmgNodesAreInTopology(
- List<String> proposedCmgNodeNames,
+ Collection<String> proposedCmgNodeNames,
Collection<ClusterNode> nodesInTopology
) {
Set<String> namesOfNodesInTopology = nodesInTopology.stream().map(ClusterNode::name).collect(toSet());
@@ -358,7 +378,7 @@ public class SystemDisasterRecoveryManagerImpl implements SystemDisasterRecovery
private static boolean enoughResponsesAreSuccesses(
boolean repairMg,
- List<String> proposedCmgNodeNames,
+ Collection<String> proposedCmgNodeNames,
Map<String, CompletableFuture<NetworkMessage>> responseFutures
) {
if (repairMg) {
@@ -369,7 +389,7 @@ public class SystemDisasterRecoveryManagerImpl implements SystemDisasterRecovery
}
private static boolean isMajorityOfCmgAreSuccesses(
- List<String> proposedCmgNodeNames,
+ Collection<String> proposedCmgNodeNames,
Map<String, CompletableFuture<NetworkMessage>> responseFutures
) {
Set<String> newCmgNodesSet = new HashSet<>(proposedCmgNodeNames);
diff --git a/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java b/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
index bb6689a317..98719f97b5 100644
--- a/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
+++ b/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
@@ -46,6 +46,7 @@ import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
@@ -60,6 +61,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
+import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.cluster.management.ClusterState;
import org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
import org.apache.ignite.internal.cluster.management.network.messages.SuccessResponseMessage;
@@ -173,6 +175,10 @@ class SystemDisasterRecoveryManagerImplTest extends BaseIgniteAbstractTest {
lenient().when(messagingService.respond(any(ClusterNode.class), any(NetworkMessage.class), anyLong()))
.thenReturn(nullCompletedFuture());
+ ClusterManagementGroupManager cmgManager = mock(ClusterManagementGroupManager.class);
+
+ lenient().when(cmgManager.clusterState()).thenReturn(completedFuture(usualClusterState));
+
manager = new SystemDisasterRecoveryManagerImpl(
thisNodeName,
topologyService,
@@ -180,6 +186,7 @@ class SystemDisasterRecoveryManagerImplTest extends BaseIgniteAbstractTest {
vaultManager,
restarter,
metastorageMaintenance,
+ cmgManager,
new ConstantClusterIdSupplier(clusterId)
);
assertThat(manager.startAsync(componentContext), willCompleteSuccessfully());
@@ -237,6 +244,26 @@ class SystemDisasterRecoveryManagerImplTest extends BaseIgniteAbstractTest {
assertThat(ex.getMessage(), is("Some of proposed CMG nodes are not online: [abc]."));
}
+ @Test
+ void resetClusterRepairingCmgUsesCurrentCmgNodesIfNotSpecified() {
+ int replicationFactor = 1;
+
+ ArgumentCaptor<ResetClusterMessage> messageCaptor = ArgumentCaptor.forClass(ResetClusterMessage.class);
+
+ when(topologyService.allMembers()).thenReturn(List.of(thisNode, node2, node3));
+ prepareNodeStateForClusterReset();
+
+ when(messagingService.invoke(any(ClusterNode.class), any(ResetClusterMessage.class), anyLong()))
+ .thenReturn(completedFuture(successResponseMessage));
+
+ CompletableFuture<Void> future = manager.resetClusterRepairingMetastorage(null, replicationFactor);
+ assertThat(future, willCompleteSuccessfully());
+
+ verify(messagingService).invoke(eq(thisNode), messageCaptor.capture(), anyLong());
+
+ assertThat(messageCaptor.getValue().newCmgNodes(), is(usualClusterState.cmgNodes()));
+ }
+
@ParameterizedTest
@EnumSource(ResetCluster.class)
void resetClusterRequiresClusterState(ResetCluster action) {
@@ -620,6 +647,8 @@ class SystemDisasterRecoveryManagerImplTest extends BaseIgniteAbstractTest {
@ParameterizedTest
@ValueSource(ints = {0, -1})
void resetClusterWithMgRequiresPositiveMgReplicationFactor(int metastorageReplicationFactor) {
+ when(topologyService.allMembers()).thenReturn(List.of(thisNode, node2, node3));
+
ClusterResetException ex = assertWillThrow(
manager.resetClusterRepairingMetastorage(List.of(thisNodeName), metastorageReplicationFactor),
ClusterResetException.class