This is an automated email from the ASF dual-hosted git repository.
mpochatkin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new d776fae524b IGNITE-26864 Use REST API directly instead of CLI (#6863)
d776fae524b is described below
commit d776fae524baa5732b9f52ffc2ac6abd174162a1
Author: Vadim Pakhnushev <[email protected]>
AuthorDate: Mon Nov 3 12:49:31 2025 +0300
IGNITE-26864 Use REST API directly instead of CLI (#6863)
---
.../disaster/system/ItCmgDisasterRecoveryTest.java | 5 +-
.../ItMetastorageGroupDisasterRecoveryTest.java | 5 +-
.../system/ItSystemGroupDisasterRecoveryTest.java | 8 +-
.../system/SystemDisasterRecoveryClient.java | 141 +++++++--------------
4 files changed, 53 insertions(+), 106 deletions(-)
diff --git a/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItCmgDisasterRecoveryTest.java b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItCmgDisasterRecoveryTest.java
index 307e9849ab3..443938a7a55 100644
--- a/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItCmgDisasterRecoveryTest.java
+++ b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItCmgDisasterRecoveryTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.disaster.system;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static
org.apache.ignite.internal.disaster.system.SystemDisasterRecoveryClient.initiateClusterReset;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutIn;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -82,8 +83,8 @@ class ItCmgDisasterRecoveryTest extends
ItSystemGroupDisasterRecoveryTest {
assertThat(ignite.logicalTopologyService().logicalTopologyOnLeader(),
willCompleteSuccessfully());
}
- private void initiateCmgRepairVia(int conductorIndex, int... newCmgIndexes) throws InterruptedException {
- recoveryClient.initiateClusterReset("localhost", cluster.httpPort(conductorIndex), null, nodeNames(newCmgIndexes));
+ private void initiateCmgRepairVia(int conductorIndex, int... newCmgIndexes) {
+ initiateClusterReset("localhost", cluster.httpPort(conductorIndex), null, nodeNames(newCmgIndexes));
}
@Test
diff --git a/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItMetastorageGroupDisasterRecoveryTest.java b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItMetastorageGroupDisasterRecoveryTest.java
index d6a6762f320..a1de9074859 100644
--- a/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItMetastorageGroupDisasterRecoveryTest.java
+++ b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItMetastorageGroupDisasterRecoveryTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.disaster.system;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static
org.apache.ignite.internal.disaster.system.SystemDisasterRecoveryClient.initiateClusterReset;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowWithCauseOrSuppressed;
@@ -92,8 +93,8 @@ class ItMetastorageGroupDisasterRecoveryTest extends
ItSystemGroupDisasterRecove
assertThat(ignite.metaStorageManager().get(new ByteArray("abc")),
willCompleteSuccessfully());
}
- private void initiateMgRepairVia(int conductorIndex, int mgReplicationFactor, int... newCmgIndexes) throws InterruptedException {
- recoveryClient.initiateClusterReset("localhost", cluster.httpPort(conductorIndex), mgReplicationFactor, nodeNames(newCmgIndexes));
+ private void initiateMgRepairVia(int conductorIndex, int mgReplicationFactor, int... newCmgIndexes) {
+ initiateClusterReset("localhost", cluster.httpPort(conductorIndex), mgReplicationFactor, nodeNames(newCmgIndexes));
}
@Test
diff --git a/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItSystemGroupDisasterRecoveryTest.java b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItSystemGroupDisasterRecoveryTest.java
index 53ee20cbf1e..ed5ed9368c2 100644
--- a/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItSystemGroupDisasterRecoveryTest.java
+++ b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItSystemGroupDisasterRecoveryTest.java
@@ -42,8 +42,6 @@ import org.jetbrains.annotations.Nullable;
* Base for tests of CMG and Metastorage group disaster recovery.
*/
abstract class ItSystemGroupDisasterRecoveryTest extends
ClusterPerTestIntegrationTest {
- final SystemDisasterRecoveryClient recoveryClient = new
SystemDisasterRecoveryClient();
-
@Override
protected int initialNodes() {
return 0;
@@ -123,15 +121,15 @@ abstract class ItSystemGroupDisasterRecoveryTest extends
ClusterPerTestIntegrati
waitTillNodeRestartsInternally(oldClusterNodeIndex);
}
- final void initiateMigration(int oldClusterNodeIndex, int
newClusterNodeIndex) throws Exception {
+ final void initiateMigration(int oldClusterNodeIndex, int
newClusterNodeIndex) {
// Starting the node that did not see the repair.
cluster.startEmbeddedNode(oldClusterNodeIndex);
initiateMigrationToNewCluster(oldClusterNodeIndex,
newClusterNodeIndex);
}
- final void initiateMigrationToNewCluster(int nodeMissingRepairIndex, int
repairedNodeIndex) throws Exception {
- recoveryClient.initiateMigration(
+ final void initiateMigrationToNewCluster(int nodeMissingRepairIndex, int
repairedNodeIndex) {
+ SystemDisasterRecoveryClient.initiateMigration(
"localhost",
cluster.httpPort(nodeMissingRepairIndex),
"localhost",
diff --git a/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryClient.java b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryClient.java
index 7d0f49ef4de..181b9faa687 100644
--- a/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryClient.java
+++ b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryClient.java
@@ -17,135 +17,82 @@
package org.apache.ignite.internal.disaster.system;
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.ignite.internal.util.ArrayUtils.concat;
-
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.util.Arrays;
import java.util.List;
-import java.util.stream.Stream;
-import org.apache.ignite.internal.cli.Main;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.rest.client.api.ClusterManagementApi;
+import org.apache.ignite.rest.client.api.RecoveryApi;
+import org.apache.ignite.rest.client.invoker.ApiClient;
+import org.apache.ignite.rest.client.invoker.ApiException;
+import org.apache.ignite.rest.client.model.ClusterState;
+import org.apache.ignite.rest.client.model.MigrateRequest;
+import org.apache.ignite.rest.client.model.ResetClusterRequest;
import org.jetbrains.annotations.Nullable;
/**
* Used to run system disaster recovery CLI commands.
*/
-@SuppressWarnings("UseOfProcessBuilder")
class SystemDisasterRecoveryClient {
private static final IgniteLogger LOG =
Loggers.forClass(SystemDisasterRecoveryClient.class);
- void initiateClusterReset(
+ static void initiateClusterReset(
String httpHost,
int httpPort,
@Nullable Integer metastorageReplicationFactor,
String... newCmgNodeNames
- ) throws InterruptedException {
+ ) {
LOG.info(
- "Initiating cluster reset via {}:{}, new CMG {}, metastorage replication Factor {}",
+ "Initiating cluster reset via {}:{}, new CMG {}, metastorage replication factor {}",
httpHost,
httpPort,
List.of(newCmgNodeNames),
metastorageReplicationFactor
);
- String[] args = {Main.class.getName(), "recovery", "cluster", "reset",
"--url", "http://" + httpHost + ":" + httpPort};
-
- if (newCmgNodeNames.length > 0) {
- args = concat(args, "--cluster-management-group", String.join(",",
newCmgNodeNames));
- }
-
- if (metastorageReplicationFactor != null) {
- args = concat(args, "--metastorage-replication-factor",
String.valueOf(metastorageReplicationFactor));
- }
-
- executeWithSameJavaBinaryAndClasspath(args);
- }
-
- private static void executeWithSameJavaBinaryAndClasspath(String... args)
throws InterruptedException {
- String javaBinaryPath =
ProcessHandle.current().info().command().orElseThrow();
- String javaClassPath = System.getProperty("java.class.path");
-
- LOG.info("Java binary is {}, classpath is {}", javaBinaryPath,
javaClassPath);
-
- String[] fullArgs = Stream.concat(Stream.of(javaBinaryPath, "-cp",
javaClassPath), Arrays.stream(args))
- .toArray(String[]::new);
-
- //noinspection UseOfProcessBuilder
- ProcessBuilder processBuilder = new ProcessBuilder(fullArgs);
- executeProcessFrom(processBuilder);
- }
-
- private static void executeProcessFrom(ProcessBuilder processBuilder)
throws InterruptedException {
try {
- Process process = processBuilder.start();
-
- if (!process.waitFor(30, SECONDS)) {
- throw new RuntimeException("Process did not finish in time, stdout so far '" + stdoutString(process, false)
- + "', stderr so far '" + stderrString(process, false) + "'");
+ RecoveryApi recoveryApi = new RecoveryApi(createApiClient(httpHost, httpPort));
+ ResetClusterRequest resetRequest = new ResetClusterRequest()
+ .cmgNodeNames(List.of(newCmgNodeNames))
+ .metastorageReplicationFactor(metastorageReplicationFactor);
+
+ recoveryApi.resetCluster(resetRequest);
+ } catch (ApiException e) {
+ if (e.getCause() instanceof IOException) {
+ LOG.info("Node has gone, this most probably means that cluster repair is initiated and the node restarts.");
+ } else {
+ throw new RuntimeException(e);
}
- if (process.exitValue() != 0) {
- throw new RuntimeException("Return code " + process.exitValue()
- + ", stdout: " + stdoutString(process) + ", stderr: " + stderrString(process));
- }
-
- LOG.info("stdout is '{}'", stdoutString(process));
- LOG.info("stderr is '{}'", stderrString(process));
- } catch (IOException e) {
- throw new RuntimeException(e);
}
}
- private static String stdoutString(Process process) {
- return stdoutString(process, true);
- }
-
- private static String stdoutString(Process process, boolean readFully) {
- try (InputStream stdout = process.getInputStream()) {
- return new String(streamContent(stdout, readFully), UTF_8);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- private static String stderrString(Process process) {
- return stderrString(process, true);
- }
-
- private static String stderrString(Process process, boolean readFully) {
- try (InputStream stderr = process.getErrorStream()) {
- return new String(streamContent(stderr, readFully), UTF_8);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- private static byte[] streamContent(InputStream is, boolean readFully)
throws IOException {
- if (readFully) {
- return is.readAllBytes();
- } else {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ static void initiateMigration(String oldHttpHost, int oldHttpPort, String
newHttpHost, int newHttpPort) {
+ LOG.info("Initiating migration, old {}:{}, new {}:{}", oldHttpHost,
oldHttpPort, newHttpHost, newHttpPort);
- while (is.available() > 0) {
- baos.write(is.read());
+ try {
+ ClusterManagementApi clusterManagementApi = new
ClusterManagementApi(createApiClient(newHttpHost, newHttpPort));
+ ClusterState clusterState = clusterManagementApi.clusterState();
+
+ RecoveryApi recoveryApi = new
RecoveryApi(createApiClient(oldHttpHost, oldHttpPort));
+ MigrateRequest migrateRequest = new MigrateRequest()
+ .cmgNodes(clusterState.getCmgNodes())
+ .metaStorageNodes(clusterState.getMsNodes())
+ .version(clusterState.getIgniteVersion())
+ .clusterId(clusterState.getClusterTag().getClusterId())
+ .clusterName(clusterState.getClusterTag().getClusterName())
+ .formerClusterIds(clusterState.getFormerClusterIds());
+
+ recoveryApi.migrate(migrateRequest);
+ } catch (ApiException e) {
+ if (e.getCause() instanceof IOException) {
+ LOG.info("Node has gone, this most probably means that migration is initiated and the node restarts.");
+ } else {
+ throw new RuntimeException(e);
}
-
- return baos.toByteArray();
}
}
- void initiateMigration(String oldHttpHost, int oldHttpPort, String
newHttpHost, int newHttpPort) throws InterruptedException {
- LOG.info("Initiating migration, old {}:{}, new {}:{}", oldHttpHost,
oldHttpPort, newHttpHost, newHttpPort);
-
- executeWithSameJavaBinaryAndClasspath(
- Main.class.getName(),
- "recovery", "cluster", "migrate",
- "--old-cluster-url", "http://" + oldHttpHost + ":" +
oldHttpPort,
- "--new-cluster-url", "http://" + newHttpHost + ":" +
newHttpPort
- );
+ private static ApiClient createApiClient(String httpHost, int httpPort) {
+ return new ApiClient().setBasePath("http://" + httpHost + ":" +
httpPort);
}
}