This is an automated email from the ASF dual-hosted git repository.

mpochatkin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new d776fae524b IGNITE-26864 Use REST API directly instead of CLI (#6863)
d776fae524b is described below

commit d776fae524baa5732b9f52ffc2ac6abd174162a1
Author: Vadim Pakhnushev <[email protected]>
AuthorDate: Mon Nov 3 12:49:31 2025 +0300

    IGNITE-26864 Use REST API directly instead of CLI (#6863)
---
 .../disaster/system/ItCmgDisasterRecoveryTest.java |   5 +-
 .../ItMetastorageGroupDisasterRecoveryTest.java    |   5 +-
 .../system/ItSystemGroupDisasterRecoveryTest.java  |   8 +-
 .../system/SystemDisasterRecoveryClient.java       | 141 +++++++--------------
 4 files changed, 53 insertions(+), 106 deletions(-)

diff --git 
a/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItCmgDisasterRecoveryTest.java
 
b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItCmgDisasterRecoveryTest.java
index 307e9849ab3..443938a7a55 100644
--- 
a/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItCmgDisasterRecoveryTest.java
+++ 
b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItCmgDisasterRecoveryTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.disaster.system;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static 
org.apache.ignite.internal.disaster.system.SystemDisasterRecoveryClient.initiateClusterReset;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutIn;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -82,8 +83,8 @@ class ItCmgDisasterRecoveryTest extends 
ItSystemGroupDisasterRecoveryTest {
         assertThat(ignite.logicalTopologyService().logicalTopologyOnLeader(), 
willCompleteSuccessfully());
     }
 
-    private void initiateCmgRepairVia(int conductorIndex, int... 
newCmgIndexes) throws InterruptedException {
-        recoveryClient.initiateClusterReset("localhost", 
cluster.httpPort(conductorIndex), null, nodeNames(newCmgIndexes));
+    private void initiateCmgRepairVia(int conductorIndex, int... 
newCmgIndexes) {
+        initiateClusterReset("localhost", cluster.httpPort(conductorIndex), 
null, nodeNames(newCmgIndexes));
     }
 
     @Test
diff --git 
a/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItMetastorageGroupDisasterRecoveryTest.java
 
b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItMetastorageGroupDisasterRecoveryTest.java
index d6a6762f320..a1de9074859 100644
--- 
a/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItMetastorageGroupDisasterRecoveryTest.java
+++ 
b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItMetastorageGroupDisasterRecoveryTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.disaster.system;
 import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static 
org.apache.ignite.internal.disaster.system.SystemDisasterRecoveryClient.initiateClusterReset;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowWithCauseOrSuppressed;
@@ -92,8 +93,8 @@ class ItMetastorageGroupDisasterRecoveryTest extends 
ItSystemGroupDisasterRecove
         assertThat(ignite.metaStorageManager().get(new ByteArray("abc")), 
willCompleteSuccessfully());
     }
 
-    private void initiateMgRepairVia(int conductorIndex, int 
mgReplicationFactor, int... newCmgIndexes) throws InterruptedException {
-        recoveryClient.initiateClusterReset("localhost", 
cluster.httpPort(conductorIndex), mgReplicationFactor, 
nodeNames(newCmgIndexes));
+    private void initiateMgRepairVia(int conductorIndex, int 
mgReplicationFactor, int... newCmgIndexes) {
+        initiateClusterReset("localhost", cluster.httpPort(conductorIndex), 
mgReplicationFactor, nodeNames(newCmgIndexes));
     }
 
     @Test
diff --git 
a/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItSystemGroupDisasterRecoveryTest.java
 
b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItSystemGroupDisasterRecoveryTest.java
index 53ee20cbf1e..ed5ed9368c2 100644
--- 
a/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItSystemGroupDisasterRecoveryTest.java
+++ 
b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItSystemGroupDisasterRecoveryTest.java
@@ -42,8 +42,6 @@ import org.jetbrains.annotations.Nullable;
  * Base for tests of CMG and Metastorage group disaster recovery.
  */
 abstract class ItSystemGroupDisasterRecoveryTest extends 
ClusterPerTestIntegrationTest {
-    final SystemDisasterRecoveryClient recoveryClient = new 
SystemDisasterRecoveryClient();
-
     @Override
     protected int initialNodes() {
         return 0;
@@ -123,15 +121,15 @@ abstract class ItSystemGroupDisasterRecoveryTest extends 
ClusterPerTestIntegrati
         waitTillNodeRestartsInternally(oldClusterNodeIndex);
     }
 
-    final void initiateMigration(int oldClusterNodeIndex, int 
newClusterNodeIndex) throws Exception {
+    final void initiateMigration(int oldClusterNodeIndex, int 
newClusterNodeIndex) {
         // Starting the node that did not see the repair.
         cluster.startEmbeddedNode(oldClusterNodeIndex);
 
         initiateMigrationToNewCluster(oldClusterNodeIndex, 
newClusterNodeIndex);
     }
 
-    final void initiateMigrationToNewCluster(int nodeMissingRepairIndex, int 
repairedNodeIndex) throws Exception {
-        recoveryClient.initiateMigration(
+    final void initiateMigrationToNewCluster(int nodeMissingRepairIndex, int 
repairedNodeIndex) {
+        SystemDisasterRecoveryClient.initiateMigration(
                 "localhost",
                 cluster.httpPort(nodeMissingRepairIndex),
                 "localhost",
diff --git 
a/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryClient.java
 
b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryClient.java
index 7d0f49ef4de..181b9faa687 100644
--- 
a/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryClient.java
+++ 
b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryClient.java
@@ -17,135 +17,82 @@
 
 package org.apache.ignite.internal.disaster.system;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.ignite.internal.util.ArrayUtils.concat;
-
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.util.Arrays;
 import java.util.List;
-import java.util.stream.Stream;
-import org.apache.ignite.internal.cli.Main;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.rest.client.api.ClusterManagementApi;
+import org.apache.ignite.rest.client.api.RecoveryApi;
+import org.apache.ignite.rest.client.invoker.ApiClient;
+import org.apache.ignite.rest.client.invoker.ApiException;
+import org.apache.ignite.rest.client.model.ClusterState;
+import org.apache.ignite.rest.client.model.MigrateRequest;
+import org.apache.ignite.rest.client.model.ResetClusterRequest;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Used to run system disaster recovery CLI commands.
  */
-@SuppressWarnings("UseOfProcessBuilder")
 class SystemDisasterRecoveryClient {
     private static final IgniteLogger LOG = 
Loggers.forClass(SystemDisasterRecoveryClient.class);
 
-    void initiateClusterReset(
+    static void initiateClusterReset(
             String httpHost,
             int httpPort,
             @Nullable Integer metastorageReplicationFactor,
             String... newCmgNodeNames
-    ) throws InterruptedException {
+    ) {
         LOG.info(
-                "Initiating cluster reset via {}:{}, new CMG {}, metastorage 
replication Factor {}",
+                "Initiating cluster reset via {}:{}, new CMG {}, metastorage 
replication factor {}",
                 httpHost,
                 httpPort,
                 List.of(newCmgNodeNames),
                 metastorageReplicationFactor
         );
 
-        String[] args = {Main.class.getName(), "recovery", "cluster", "reset", 
"--url", "http://" + httpHost + ":" + httpPort};
-
-        if (newCmgNodeNames.length > 0) {
-            args = concat(args, "--cluster-management-group", String.join(",", 
newCmgNodeNames));
-        }
-
-        if (metastorageReplicationFactor != null) {
-            args = concat(args, "--metastorage-replication-factor", 
String.valueOf(metastorageReplicationFactor));
-        }
-
-        executeWithSameJavaBinaryAndClasspath(args);
-    }
-
-    private static void executeWithSameJavaBinaryAndClasspath(String... args) 
throws InterruptedException {
-        String javaBinaryPath = 
ProcessHandle.current().info().command().orElseThrow();
-        String javaClassPath = System.getProperty("java.class.path");
-
-        LOG.info("Java binary is {}, classpath is {}", javaBinaryPath, 
javaClassPath);
-
-        String[] fullArgs = Stream.concat(Stream.of(javaBinaryPath, "-cp", 
javaClassPath), Arrays.stream(args))
-                .toArray(String[]::new);
-
-        //noinspection UseOfProcessBuilder
-        ProcessBuilder processBuilder = new ProcessBuilder(fullArgs);
-        executeProcessFrom(processBuilder);
-    }
-
-    private static void executeProcessFrom(ProcessBuilder processBuilder) 
throws InterruptedException {
         try {
-            Process process = processBuilder.start();
-
-            if (!process.waitFor(30, SECONDS)) {
-                throw new RuntimeException("Process did not finish in time, 
stdout so far '" + stdoutString(process, false)
-                        + "', stderr so far '" + stderrString(process, false) 
+ "'");
+            RecoveryApi recoveryApi = new 
RecoveryApi(createApiClient(httpHost, httpPort));
+            ResetClusterRequest resetRequest = new ResetClusterRequest()
+                    .cmgNodeNames(List.of(newCmgNodeNames))
+                    
.metastorageReplicationFactor(metastorageReplicationFactor);
+
+            recoveryApi.resetCluster(resetRequest);
+        } catch (ApiException e) {
+            if (e.getCause() instanceof IOException) {
+                LOG.info("Node has gone, this most probably means that cluster 
repair is initiated and the node restarts.");
+            } else {
+                throw new RuntimeException(e);
             }
-            if (process.exitValue() != 0) {
-                throw new RuntimeException("Return code " + process.exitValue()
-                        + ", stdout: " + stdoutString(process) + ", stderr: " 
+ stderrString(process));
-            }
-
-            LOG.info("stdout is '{}'", stdoutString(process));
-            LOG.info("stderr is '{}'", stderrString(process));
-        } catch (IOException e) {
-            throw new RuntimeException(e);
         }
     }
 
-    private static String stdoutString(Process process) {
-        return stdoutString(process, true);
-    }
-
-    private static String stdoutString(Process process, boolean readFully) {
-        try (InputStream stdout = process.getInputStream()) {
-            return new String(streamContent(stdout, readFully), UTF_8);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private static String stderrString(Process process) {
-        return stderrString(process, true);
-    }
-
-    private static String stderrString(Process process, boolean readFully) {
-        try (InputStream stderr = process.getErrorStream()) {
-            return new String(streamContent(stderr, readFully), UTF_8);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private static byte[] streamContent(InputStream is, boolean readFully) 
throws IOException {
-        if (readFully) {
-            return is.readAllBytes();
-        } else {
-            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    static void initiateMigration(String oldHttpHost, int oldHttpPort, String 
newHttpHost, int newHttpPort) {
+        LOG.info("Initiating migration, old {}:{}, new {}:{}", oldHttpHost, 
oldHttpPort, newHttpHost, newHttpPort);
 
-            while (is.available() > 0) {
-                baos.write(is.read());
+        try {
+            ClusterManagementApi clusterManagementApi = new 
ClusterManagementApi(createApiClient(newHttpHost, newHttpPort));
+            ClusterState clusterState = clusterManagementApi.clusterState();
+
+            RecoveryApi recoveryApi = new 
RecoveryApi(createApiClient(oldHttpHost, oldHttpPort));
+            MigrateRequest migrateRequest = new MigrateRequest()
+                    .cmgNodes(clusterState.getCmgNodes())
+                    .metaStorageNodes(clusterState.getMsNodes())
+                    .version(clusterState.getIgniteVersion())
+                    .clusterId(clusterState.getClusterTag().getClusterId())
+                    .clusterName(clusterState.getClusterTag().getClusterName())
+                    .formerClusterIds(clusterState.getFormerClusterIds());
+
+            recoveryApi.migrate(migrateRequest);
+        } catch (ApiException e) {
+            if (e.getCause() instanceof IOException) {
+                LOG.info("Node has gone, this most probably means that 
migration is initiated and the node restarts.");
+            } else {
+                throw new RuntimeException(e);
             }
-
-            return baos.toByteArray();
         }
     }
 
-    void initiateMigration(String oldHttpHost, int oldHttpPort, String 
newHttpHost, int newHttpPort) throws InterruptedException {
-        LOG.info("Initiating migration, old {}:{}, new {}:{}", oldHttpHost, 
oldHttpPort, newHttpHost, newHttpPort);
-
-        executeWithSameJavaBinaryAndClasspath(
-                Main.class.getName(),
-                "recovery", "cluster", "migrate",
-                "--old-cluster-url", "http://" + oldHttpHost + ":" + 
oldHttpPort,
-                "--new-cluster-url", "http://" + newHttpHost + ":" + 
newHttpPort
-        );
+    private static ApiClient createApiClient(String httpHost, int httpPort) {
+        return new ApiClient().setBasePath("http://" + httpHost + ":" + 
httpPort);
     }
 }

Reply via email to