This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 8ae77fd75c IGNITE-23107 Add REST API for migrating running nodes
missed during CMG repair (#4448)
8ae77fd75c is described below
commit 8ae77fd75c74bf4a269954837d92fd500639cd80
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Wed Sep 25 18:56:03 2024 +0400
IGNITE-23107 Add REST API for migrating running nodes missed during CMG
repair (#4448)
---
.../rest/api/recovery/system/MigrateRequest.java | 118 +++++++++++++++++++++
.../recovery/system/SystemDisasterRecoveryApi.java | 18 +++-
.../handler/ClusterResetExceptionHandler.java | 3 +-
...onHandler.java => MigrateExceptionHandler.java} | 11 +-
.../system/SystemDisasterRecoveryController.java | 33 +++++-
.../SystemDisasterRecoveryControllerTest.java | 64 ++++++++++-
6 files changed, 235 insertions(+), 12 deletions(-)
diff --git
a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/system/MigrateRequest.java
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/system/MigrateRequest.java
new file mode 100644
index 0000000000..01fe2d3205
--- /dev/null
+++
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/system/MigrateRequest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.api.recovery.system;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.v3.oas.annotations.media.Schema;
+import java.util.List;
+import java.util.Objects;
+import java.util.UUID;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
+import org.apache.ignite.internal.tostring.S;
+import org.jetbrains.annotations.Nullable;
+
+/** Request to migrate nodes from old cluster to new (repaired). */
+@Schema(description = "Migrate nodes to new cluster.")
+public class MigrateRequest {
+ @Schema(description = "Names of the CMG node names.")
+ @IgniteToStringInclude
+ private final List<String> cmgNodes;
+
+ @Schema(description = "Names of the Metastorage node names.")
+ @IgniteToStringInclude
+ private final List<String> metaStorageNodes;
+
+ @Schema(description = "Ignite version.")
+ private final String version;
+
+ @Schema(description = "ID of the cluster.")
+ private final UUID clusterId;
+
+ @Schema(description = "Name of the cluster.")
+ private final String clusterName;
+
+ @Schema(description = "IDs the cluster had before. If CMG/Metastorage
group were never repaired, this is null.")
+ private final @Nullable List<UUID> formerClusterIds;
+
+ /** Constructor. */
+ @JsonCreator
+ public MigrateRequest(
+ @JsonProperty("cmgNodes") List<String> cmgNodes,
+ @JsonProperty("metaStorageNodes") List<String> metaStorageNodes,
+ @JsonProperty("version") String version,
+ @JsonProperty("clusterId") UUID clusterId,
+ @JsonProperty("clusterName") String clusterName,
+ @JsonProperty("formerClusterIds") @Nullable List<UUID>
formerClusterIds
+ ) {
+ Objects.requireNonNull(cmgNodes);
+ Objects.requireNonNull(metaStorageNodes);
+ Objects.requireNonNull(version);
+ Objects.requireNonNull(clusterId);
+ Objects.requireNonNull(clusterName);
+
+ this.cmgNodes = List.copyOf(cmgNodes);
+ this.metaStorageNodes = List.copyOf(metaStorageNodes);
+ this.version = version;
+ this.clusterId = clusterId;
+ this.clusterName = clusterName;
+ this.formerClusterIds = formerClusterIds == null ? null :
List.copyOf(formerClusterIds);
+ }
+
+ /** Returns names of the CMG node names. */
+ @JsonGetter("cmgNodes")
+ public List<String> cmgNodes() {
+ return cmgNodes;
+ }
+
+ /** Returns names of the Metastorage node names. */
+ @JsonGetter("metaStorageNodes")
+ public List<String> metaStorageNodes() {
+ return metaStorageNodes;
+ }
+
+ /** Returns Ignite version. */
+ @JsonGetter("version")
+ public String version() {
+ return version;
+ }
+
+ /** Returns ID of the cluster. */
+ @JsonGetter("clusterId")
+ public UUID clusterId() {
+ return clusterId;
+ }
+
+ /** Returns name of the cluster. */
+ @JsonGetter("clusterName")
+ public String clusterName() {
+ return clusterName;
+ }
+
+ /** Returns IDs the cluster had before ({@code null} if CMG/Metastorage
group were never repaired. */
+ @JsonGetter("formerClusterIds")
+ public @Nullable List<UUID> formerClusterIds() {
+ return formerClusterIds;
+ }
+
+ @Override
+ public String toString() {
+ return S.toString(this);
+ }
+}
diff --git
a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/system/SystemDisasterRecoveryApi.java
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/system/SystemDisasterRecoveryApi.java
index 6c0081f858..f67ce6e1aa 100644
---
a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/system/SystemDisasterRecoveryApi.java
+++
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/system/SystemDisasterRecoveryApi.java
@@ -46,11 +46,27 @@ public interface SystemDisasterRecoveryApi {
description = "Initiates cluster reset to repair CMG/Metastorage
group/both."
)
@ApiResponse(responseCode = "200", description = "Cluster reset
initiated.")
+ // TODO: IGNITE-23274 - introduce a constant for common error descriptions.
@ApiResponse(responseCode = "500", description = "Internal error.",
content = @Content(mediaType = MediaType.PROBLEM_JSON, schema =
@Schema(implementation = Problem.class)))
@ApiResponse(responseCode = "400", description = "Bad request.",
content = @Content(mediaType = MediaType.PROBLEM_JSON, schema =
@Schema(implementation = Problem.class)))
@Consumes(MediaType.APPLICATION_JSON)
- @Produces(MediaType.PROBLEM_JSON)
+ @Produces({MediaType.APPLICATION_JSON, MediaType.PROBLEM_JSON})
CompletableFuture<Void> reset(@Body ResetClusterRequest command);
+
+ @Post("migrate")
+ @Operation(
+ operationId = "migrate",
+ description = "Migrates nodes from old cluster to new (repaired)
cluster."
+ )
+ @ApiResponse(responseCode = "200", description = "Migration initiated.")
+ // TODO: IGNITE-23274 - introduce a constant for common error descriptions.
+ @ApiResponse(responseCode = "500", description = "Internal error.",
+ content = @Content(mediaType = MediaType.PROBLEM_JSON, schema =
@Schema(implementation = Problem.class)))
+ @ApiResponse(responseCode = "400", description = "Bad request.",
+ content = @Content(mediaType = MediaType.PROBLEM_JSON, schema =
@Schema(implementation = Problem.class)))
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces({MediaType.APPLICATION_JSON, MediaType.PROBLEM_JSON})
+ CompletableFuture<Void> migrate(@Body MigrateRequest command);
}
diff --git
a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/exception/handler/ClusterResetExceptionHandler.java
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/exception/handler/ClusterResetExceptionHandler.java
index 4e360f22dd..cddeec1db1 100644
---
a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/exception/handler/ClusterResetExceptionHandler.java
+++
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/exception/handler/ClusterResetExceptionHandler.java
@@ -23,7 +23,6 @@ import io.micronaut.http.HttpResponse;
import io.micronaut.http.server.exceptions.ExceptionHandler;
import jakarta.inject.Singleton;
import
org.apache.ignite.internal.disaster.system.exception.ClusterResetException;
-import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.rest.api.Problem;
import org.apache.ignite.internal.rest.constants.HttpCode;
import org.apache.ignite.internal.rest.problem.HttpProblemResponse;
@@ -32,7 +31,7 @@ import
org.apache.ignite.internal.rest.problem.HttpProblemResponse;
* Handles {@link ClusterResetException} and represents it as a rest response.
*/
@Singleton
-@Requires(classes = {IgniteInternalException.class, ExceptionHandler.class})
+@Requires(classes = ExceptionHandler.class)
public class ClusterResetExceptionHandler implements
ExceptionHandler<ClusterResetException, HttpResponse<? extends Problem>> {
@Override
public HttpResponse<? extends Problem> handle(HttpRequest request,
ClusterResetException exception) {
diff --git
a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/exception/handler/ClusterResetExceptionHandler.java
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/exception/handler/MigrateExceptionHandler.java
similarity index 77%
copy from
modules/rest-api/src/main/java/org/apache/ignite/internal/rest/exception/handler/ClusterResetExceptionHandler.java
copy to
modules/rest-api/src/main/java/org/apache/ignite/internal/rest/exception/handler/MigrateExceptionHandler.java
index 4e360f22dd..b2e9b43184 100644
---
a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/exception/handler/ClusterResetExceptionHandler.java
+++
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/exception/handler/MigrateExceptionHandler.java
@@ -22,20 +22,19 @@ import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.server.exceptions.ExceptionHandler;
import jakarta.inject.Singleton;
-import
org.apache.ignite.internal.disaster.system.exception.ClusterResetException;
-import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.disaster.system.exception.MigrateException;
import org.apache.ignite.internal.rest.api.Problem;
import org.apache.ignite.internal.rest.constants.HttpCode;
import org.apache.ignite.internal.rest.problem.HttpProblemResponse;
/**
- * Handles {@link ClusterResetException} and represents it as a rest response.
+ * Handles {@link MigrateException} and represents it as a rest response.
*/
@Singleton
-@Requires(classes = {IgniteInternalException.class, ExceptionHandler.class})
-public class ClusterResetExceptionHandler implements
ExceptionHandler<ClusterResetException, HttpResponse<? extends Problem>> {
+@Requires(classes = ExceptionHandler.class)
+public class MigrateExceptionHandler implements
ExceptionHandler<MigrateException, HttpResponse<? extends Problem>> {
@Override
- public HttpResponse<? extends Problem> handle(HttpRequest request,
ClusterResetException exception) {
+ public HttpResponse<? extends Problem> handle(HttpRequest request,
MigrateException exception) {
return HttpProblemResponse.from(
Problem.fromHttpCode(HttpCode.BAD_REQUEST)
.detail(exception.getMessage())
diff --git
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/system/SystemDisasterRecoveryController.java
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/system/SystemDisasterRecoveryController.java
index c23f030d2e..d6d8c59ed1 100644
---
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/system/SystemDisasterRecoveryController.java
+++
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/system/SystemDisasterRecoveryController.java
@@ -19,22 +19,36 @@ package org.apache.ignite.internal.rest.recovery.system;
import io.micronaut.context.annotation.Requires;
import io.micronaut.http.annotation.Controller;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.cluster.management.ClusterState;
+import org.apache.ignite.internal.cluster.management.ClusterTag;
+import
org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
import
org.apache.ignite.internal.disaster.system.SystemDisasterRecoveryManager;
import org.apache.ignite.internal.rest.ResourceHolder;
+import org.apache.ignite.internal.rest.api.recovery.system.MigrateRequest;
import org.apache.ignite.internal.rest.api.recovery.system.ResetClusterRequest;
import
org.apache.ignite.internal.rest.api.recovery.system.SystemDisasterRecoveryApi;
import
org.apache.ignite.internal.rest.exception.handler.ClusterResetExceptionHandler;
import
org.apache.ignite.internal.rest.exception.handler.IgniteInternalExceptionHandler;
+import
org.apache.ignite.internal.rest.exception.handler.MigrateExceptionHandler;
/**
* Controller for system groups disaster recovery.
*/
@Controller("/management/v1/recovery/cluster")
-@Requires(classes = {IgniteInternalExceptionHandler.class,
ClusterResetExceptionHandler.class})
+@Requires(classes = {
+ ClusterResetExceptionHandler.class,
+ MigrateExceptionHandler.class,
+ IgniteInternalExceptionHandler.class
+})
public class SystemDisasterRecoveryController implements
SystemDisasterRecoveryApi, ResourceHolder {
private SystemDisasterRecoveryManager systemDisasterRecoveryManager;
+ private final CmgMessagesFactory cmgMessagesFactory = new
CmgMessagesFactory();
+
public SystemDisasterRecoveryController(SystemDisasterRecoveryManager
systemDisasterRecoveryManager) {
this.systemDisasterRecoveryManager = systemDisasterRecoveryManager;
}
@@ -44,6 +58,23 @@ public class SystemDisasterRecoveryController implements
SystemDisasterRecoveryA
return
systemDisasterRecoveryManager.resetCluster(command.cmgNodeNames());
}
+ @Override
+ public CompletableFuture<Void> migrate(MigrateRequest command) {
+ return
systemDisasterRecoveryManager.migrate(migrateRequestToClusterState(command));
+ }
+
+ private ClusterState migrateRequestToClusterState(MigrateRequest command) {
+ List<UUID> formerClusterIds = command.formerClusterIds();
+
+ return cmgMessagesFactory.clusterState()
+ .cmgNodes(Set.copyOf(command.cmgNodes()))
+ .metaStorageNodes(Set.copyOf(command.metaStorageNodes()))
+ .version(command.version())
+ .clusterTag(ClusterTag.clusterTag(cmgMessagesFactory,
command.clusterName(), command.clusterId()))
+ .formerClusterIds(formerClusterIds == null ? null :
List.copyOf(formerClusterIds))
+ .build();
+ }
+
@Override
public void cleanResources() {
systemDisasterRecoveryManager = null;
diff --git
a/modules/rest/src/test/java/org/apache/ignite/internal/rest/recovery/system/SystemDisasterRecoveryControllerTest.java
b/modules/rest/src/test/java/org/apache/ignite/internal/rest/recovery/system/SystemDisasterRecoveryControllerTest.java
index 9249034347..c444d55153 100644
---
a/modules/rest/src/test/java/org/apache/ignite/internal/rest/recovery/system/SystemDisasterRecoveryControllerTest.java
+++
b/modules/rest/src/test/java/org/apache/ignite/internal/rest/recovery/system/SystemDisasterRecoveryControllerTest.java
@@ -23,7 +23,10 @@ import static
org.apache.ignite.internal.rest.constants.HttpCode.BAD_REQUEST;
import static org.apache.ignite.internal.rest.constants.HttpCode.OK;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
@@ -42,13 +45,17 @@ import
io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import java.util.List;
import java.util.Optional;
+import java.util.UUID;
+import org.apache.ignite.internal.cluster.management.ClusterState;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import
org.apache.ignite.internal.disaster.system.SystemDisasterRecoveryManager;
import
org.apache.ignite.internal.disaster.system.exception.ClusterResetException;
+import org.apache.ignite.internal.disaster.system.exception.MigrateException;
import org.apache.ignite.internal.rest.RestManager;
import org.apache.ignite.internal.rest.RestManagerFactory;
import org.apache.ignite.internal.rest.api.Problem;
+import org.apache.ignite.internal.rest.api.recovery.system.MigrateRequest;
import org.apache.ignite.internal.rest.api.recovery.system.ResetClusterRequest;
import
org.apache.ignite.internal.security.authentication.AuthenticationManager;
import
org.apache.ignite.internal.security.authentication.AuthenticationManagerImpl;
@@ -57,6 +64,7 @@ import
org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
@MicronautTest
@@ -100,7 +108,8 @@ class SystemDisasterRecoveryControllerTest extends
BaseIgniteAbstractTest {
when(systemDisasterRecoveryManager.resetCluster(List.of("a", "b",
"c"))).thenReturn(nullCompletedFuture());
HttpRequest<ResetClusterRequest> post = HttpRequest.POST("/reset",
- new ResetClusterRequest(List.of("a", "b", "c")));
+ new ResetClusterRequest(List.of("a", "b", "c"))
+ );
HttpResponse<Void> response = client.toBlocking().exchange(post);
@@ -113,7 +122,58 @@ class SystemDisasterRecoveryControllerTest extends
BaseIgniteAbstractTest {
when(systemDisasterRecoveryManager.resetCluster(any())).thenReturn(failedFuture(new
ClusterResetException("Oops")));
HttpRequest<ResetClusterRequest> post = HttpRequest.POST("/reset",
- new ResetClusterRequest(List.of("a", "b", "c")));
+ new ResetClusterRequest(List.of("a", "b", "c"))
+ );
+
+ HttpClientResponseException ex =
assertThrows(HttpClientResponseException.class, () ->
client.toBlocking().exchange(post));
+
+ assertThat(ex.getStatus().getCode(), is(BAD_REQUEST.code()));
+
+ Optional<Problem> body = ex.getResponse().getBody(Problem.class);
+ assertThat(body, isPresent());
+ assertThat(body.get().detail(), is("Oops"));
+ }
+
+ @Test
+ void initiatesMigration() {
+ ArgumentCaptor<ClusterState> stateCaptor =
ArgumentCaptor.forClass(ClusterState.class);
+
+
when(systemDisasterRecoveryManager.migrate(any())).thenReturn(nullCompletedFuture());
+
+ HttpRequest<MigrateRequest> post = HttpRequest.POST("/migrate",
migrateRequest());
+
+ HttpResponse<Void> response = client.toBlocking().exchange(post);
+
+ assertThat(response.getStatus().getCode(), is(OK.code()));
+
+ verify(systemDisasterRecoveryManager).migrate(stateCaptor.capture());
+ ClusterState capturedState = stateCaptor.getValue();
+
+ assertThat(capturedState, is(notNullValue()));
+ assertThat(capturedState.cmgNodes(), containsInAnyOrder("a", "b",
"c"));
+ assertThat(capturedState.metaStorageNodes(), contains("d"));
+ assertThat(capturedState.version(), is("3.0.0"));
+ assertThat(capturedState.clusterTag().clusterId(), is(new UUID(1, 2)));
+ assertThat(capturedState.clusterTag().clusterName(), is("cluster"));
+ assertThat(capturedState.formerClusterIds(), is(List.of(new UUID(2,
1))));
+ }
+
+ private static MigrateRequest migrateRequest() {
+ return new MigrateRequest(
+ List.of("a", "b", "c"),
+ List.of("d"),
+ "3.0.0",
+ new UUID(1, 2),
+ "cluster",
+ List.of(new UUID(2, 1))
+ );
+ }
+
+ @Test
+ void migratePassesMigrateExceptionToClient() {
+
when(systemDisasterRecoveryManager.migrate(any())).thenReturn(failedFuture(new
MigrateException("Oops")));
+
+ HttpRequest<MigrateRequest> post = HttpRequest.POST("/migrate",
migrateRequest());
HttpClientResponseException ex =
assertThrows(HttpClientResponseException.class, () ->
client.toBlocking().exchange(post));