This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a09d3bbcfd [Fix][Rest-API] Submit or stop job from an inactive master 
node  (#6217)
a09d3bbcfd is described below

commit a09d3bbcfdeb2c13fa3fdb05f14b0b85f0b01f29
Author: Guangdong Liu <[email protected]>
AuthorDate: Wed Jan 17 22:54:02 2024 +0800

    [Fix][Rest-API] Submit or stop job from an inactive master node  (#6217)
---
 .../engine/e2e/ClusterSeaTunnelContainer.java      | 464 +++++++++++++++++++++
 .../org/apache/seatunnel/engine/e2e/RestApiIT.java | 330 ---------------
 .../src/test/resources/cluster/hazelcast.yaml      |  44 ++
 .../server/rest/RestHttpGetCommandProcessor.java   |  67 +--
 .../server/rest/RestHttpPostCommandProcessor.java  |  72 ++--
 5 files changed, 589 insertions(+), 388 deletions(-)

diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java
new file mode 100644
index 0000000000..a349a9f812
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java
@@ -0,0 +1,464 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.e2e;
+
+import org.apache.seatunnel.e2e.common.util.ContainerUtil;
+import org.apache.seatunnel.engine.server.rest.RestConstant;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerLoggerFactory;
+import org.testcontainers.utility.MountableFile;
+
+import io.restassured.response.Response;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static io.restassured.RestAssured.given;
+import static 
org.apache.seatunnel.e2e.common.util.ContainerUtil.PROJECT_ROOT_PATH;
+import static org.hamcrest.Matchers.equalTo;
+
+public class ClusterSeaTunnelContainer extends SeaTunnelContainer {
+    private static final String JDK_DOCKER_IMAGE = "openjdk:8";
+    private static final String SERVER_SHELL = "seatunnel-cluster.sh";
+
+    private GenericContainer<?> secondServer;
+
+    private final Network NETWORK = Network.newNetwork();
+
+    private static final String jobName = "test测试";
+    private static final String paramJobName = "param_test测试";
+
+    private static final String http = "http://";;
+
+    private static final String colon = ":";
+
+    private static final String confFile = "/fakesource_to_console.conf";
+
+    private static final Path binPath = Paths.get(SEATUNNEL_HOME, "bin", 
SERVER_SHELL);
+    private static final Path config = Paths.get(SEATUNNEL_HOME, "config");
+    private static final Path hadoopJar =
+            Paths.get(SEATUNNEL_HOME, "lib/seatunnel-hadoop3-3.1.4-uber.jar");
+
+    @Override
+    @BeforeEach
+    public void startUp() throws Exception {
+
+        server = createServer("server");
+        secondServer = createServer("secondServer");
+
+        // check cluster
+        Awaitility.await()
+                .atMost(2, TimeUnit.MINUTES)
+                .untilAsserted(
+                        () -> {
+                            Response response =
+                                    given().get(
+                                                    http
+                                                            + server.getHost()
+                                                            + colon
+                                                            + 
server.getFirstMappedPort()
+                                                            + 
"/hazelcast/rest/cluster");
+                            response.then().statusCode(200);
+                            Assertions.assertEquals(
+                                    2, 
response.jsonPath().getList("members").size());
+                        });
+    }
+
+    @Override
+    @AfterEach
+    public void tearDown() throws Exception {
+        super.tearDown();
+        if (secondServer != null) {
+            secondServer.close();
+        }
+    }
+
+    @Test
+    public void testSubmitJob() {
+        AtomicInteger i = new AtomicInteger();
+
+        Arrays.asList(server, secondServer)
+                .forEach(
+                        container -> {
+                            Response response =
+                                    i.get() == 0
+                                            ? submitJob(container, "BATCH", 
jobName, paramJobName)
+                                            : submitJob(container, "BATCH", 
jobName, null);
+                            if (i.get() == 0) {
+                                response.then()
+                                        .statusCode(200)
+                                        .body("jobName", 
equalTo(paramJobName));
+                            } else {
+                                
response.then().statusCode(200).body("jobName", equalTo(jobName));
+                            }
+                            String jobId = 
response.getBody().jsonPath().getString("jobId");
+
+                            Awaitility.await()
+                                    .atMost(2, TimeUnit.MINUTES)
+                                    .untilAsserted(
+                                            () -> {
+                                                given().get(
+                                                                http
+                                                                        + 
container.getHost()
+                                                                        + colon
+                                                                        + 
container
+                                                                               
 .getFirstMappedPort()
+                                                                        + 
RestConstant
+                                                                               
 .FINISHED_JOBS_INFO
+                                                                        + 
"/FINISHED")
+                                                        .then()
+                                                        .statusCode(200)
+                                                        .body(
+                                                                "[" + i.get() 
+ "].jobName",
+                                                                equalTo(
+                                                                        
i.get() == 0
+                                                                               
 ? paramJobName
+                                                                               
 : jobName))
+                                                        .body(
+                                                                "[" + i.get() 
+ "].errorMsg",
+                                                                equalTo(null))
+                                                        .body(
+                                                                "[" + i.get() 
+ "].jobDag.jobId",
+                                                                
equalTo(Long.parseLong(jobId)))
+                                                        .body(
+                                                                "["
+                                                                        + 
i.get()
+                                                                        + 
"].metrics.SourceReceivedCount",
+                                                                equalTo("100"))
+                                                        .body(
+                                                                "["
+                                                                        + 
i.get()
+                                                                        + 
"].metrics.SinkWriteCount",
+                                                                equalTo("100"))
+                                                        .body(
+                                                                "[" + i.get() 
+ "].jobStatus",
+                                                                
equalTo("FINISHED"));
+
+                                                // test for without status 
parameter.
+                                                given().get(
+                                                                http
+                                                                        + 
container.getHost()
+                                                                        + colon
+                                                                        + 
container
+                                                                               
 .getFirstMappedPort()
+                                                                        + 
RestConstant
+                                                                               
 .FINISHED_JOBS_INFO)
+                                                        .then()
+                                                        .statusCode(200)
+                                                        .body(
+                                                                "[" + i.get() 
+ "].jobName",
+                                                                equalTo(
+                                                                        
i.get() == 0
+                                                                               
 ? paramJobName
+                                                                               
 : jobName))
+                                                        .body(
+                                                                "[" + i.get() 
+ "].errorMsg",
+                                                                equalTo(null))
+                                                        .body(
+                                                                "[" + i.get() 
+ "].jobDag.jobId",
+                                                                
equalTo(Long.parseLong(jobId)))
+                                                        .body(
+                                                                "["
+                                                                        + 
i.get()
+                                                                        + 
"].metrics.SourceReceivedCount",
+                                                                equalTo("100"))
+                                                        .body(
+                                                                "["
+                                                                        + 
i.get()
+                                                                        + 
"].metrics.SinkWriteCount",
+                                                                equalTo("100"))
+                                                        .body(
+                                                                "[" + i.get() 
+ "].jobStatus",
+                                                                
equalTo("FINISHED"));
+                                            });
+
+                            i.getAndIncrement();
+                        });
+    }
+
+    @Test
+    public void testStartWithSavePointWithoutJobId() {
+        Arrays.asList(server, secondServer)
+                .forEach(
+                        container -> {
+                            Response response =
+                                    submitJob("BATCH", container, true, 
jobName, paramJobName);
+                            response.then()
+                                    .statusCode(400)
+                                    .body(
+                                            "message",
+                                            equalTo(
+                                                    "Please provide jobId when 
start with save point."));
+                        });
+    }
+
+    @Test
+    public void testStopJob() {
+
+        Arrays.asList(server, secondServer)
+                .forEach(
+                        container -> {
+                            String jobId =
+                                    submitJob(container, "STREAMING", jobName, 
paramJobName)
+                                            .getBody()
+                                            .jsonPath()
+                                            .getString("jobId");
+
+                            Awaitility.await()
+                                    .atMost(2, TimeUnit.MINUTES)
+                                    .untilAsserted(
+                                            () ->
+                                                    given().get(
+                                                                    http
+                                                                            + 
container.getHost()
+                                                                            + 
colon
+                                                                            + 
container
+                                                                               
     .getFirstMappedPort()
+                                                                            + 
RestConstant
+                                                                               
     .RUNNING_JOB_URL
+                                                                            + 
"/"
+                                                                            + 
jobId)
+                                                            .then()
+                                                            .statusCode(200)
+                                                            .body("jobStatus", 
equalTo("RUNNING")));
+
+                            String parameters =
+                                    "{"
+                                            + "\"jobId\":"
+                                            + jobId
+                                            + ","
+                                            + "\"isStopWithSavePoint\":true}";
+
+                            given().body(parameters)
+                                    .post(
+                                            http
+                                                    + container.getHost()
+                                                    + colon
+                                                    + 
container.getFirstMappedPort()
+                                                    + 
RestConstant.STOP_JOB_URL)
+                                    .then()
+                                    .statusCode(200)
+                                    .body("jobId", equalTo(jobId));
+
+                            Awaitility.await()
+                                    .atMost(6, TimeUnit.MINUTES)
+                                    .untilAsserted(
+                                            () ->
+                                                    given().get(
+                                                                    http
+                                                                            + 
container.getHost()
+                                                                            + 
colon
+                                                                            + 
container
+                                                                               
     .getFirstMappedPort()
+                                                                            + 
RestConstant
+                                                                               
     .FINISHED_JOBS_INFO
+                                                                            + 
"/SAVEPOINT_DONE")
+                                                            .then()
+                                                            .statusCode(200)
+                                                            .body("[0].jobId", 
equalTo(jobId)));
+
+                            String jobId2 =
+                                    submitJob(container, "STREAMING", jobName, 
paramJobName)
+                                            .getBody()
+                                            .jsonPath()
+                                            .getString("jobId");
+
+                            Awaitility.await()
+                                    .atMost(2, TimeUnit.MINUTES)
+                                    .untilAsserted(
+                                            () ->
+                                                    given().get(
+                                                                    http
+                                                                            + 
container.getHost()
+                                                                            + 
colon
+                                                                            + 
container
+                                                                               
     .getFirstMappedPort()
+                                                                            + 
RestConstant
+                                                                               
     .RUNNING_JOB_URL
+                                                                            + 
"/"
+                                                                            + 
jobId2)
+                                                            .then()
+                                                            .statusCode(200)
+                                                            .body("jobStatus", 
equalTo("RUNNING")));
+                            parameters =
+                                    "{"
+                                            + "\"jobId\":"
+                                            + jobId2
+                                            + ","
+                                            + "\"isStopWithSavePoint\":false}";
+
+                            given().body(parameters)
+                                    .post(
+                                            http
+                                                    + container.getHost()
+                                                    + colon
+                                                    + 
container.getFirstMappedPort()
+                                                    + 
RestConstant.STOP_JOB_URL)
+                                    .then()
+                                    .statusCode(200)
+                                    .body("jobId", equalTo(jobId2));
+
+                            Awaitility.await()
+                                    .atMost(2, TimeUnit.MINUTES)
+                                    .untilAsserted(
+                                            () ->
+                                                    given().get(
+                                                                    http
+                                                                            + 
container.getHost()
+                                                                            + 
colon
+                                                                            + 
container
+                                                                               
     .getFirstMappedPort()
+                                                                            + 
RestConstant
+                                                                               
     .FINISHED_JOBS_INFO
+                                                                            + 
"/CANCELED")
+                                                            .then()
+                                                            .statusCode(200)
+                                                            .body("[0].jobId", 
equalTo(jobId2)));
+                        });
+    }
+
+    private Response submitJob(
+            GenericContainer<?> container, String jobMode, String jobName, 
String paramJobName) {
+        return submitJob(jobMode, container, false, jobName, paramJobName);
+    }
+
+    private Response submitJob(
+            String jobMode,
+            GenericContainer<?> container,
+            boolean isStartWithSavePoint,
+            String jobName,
+            String paramJobName) {
+        String requestBody =
+                "{\n"
+                        + "    \"env\": {\n"
+                        + "        \"job.name\": \""
+                        + jobName
+                        + "\",\n"
+                        + "        \"job.mode\": \""
+                        + jobMode
+                        + "\"\n"
+                        + "    },\n"
+                        + "    \"source\": [\n"
+                        + "        {\n"
+                        + "            \"plugin_name\": \"FakeSource\",\n"
+                        + "            \"result_table_name\": \"fake\",\n"
+                        + "            \"row.num\": 100,\n"
+                        + "            \"schema\": {\n"
+                        + "                \"fields\": {\n"
+                        + "                    \"name\": \"string\",\n"
+                        + "                    \"age\": \"int\",\n"
+                        + "                    \"card\": \"int\"\n"
+                        + "                }\n"
+                        + "            }\n"
+                        + "        }\n"
+                        + "    ],\n"
+                        + "    \"transform\": [\n"
+                        + "    ],\n"
+                        + "    \"sink\": [\n"
+                        + "        {\n"
+                        + "            \"plugin_name\": \"Console\",\n"
+                        + "            \"source_table_name\": [\"fake\"]\n"
+                        + "        }\n"
+                        + "    ]\n"
+                        + "}";
+        String parameters = null;
+        if (paramJobName != null) {
+            parameters = "jobName=" + paramJobName;
+        }
+        if (isStartWithSavePoint) {
+            parameters = parameters + "&isStartWithSavePoint=true";
+        }
+        Response response =
+                given().body(requestBody)
+                        .header("Content-Type", "application/json; 
charset=utf-8")
+                        .post(
+                                parameters == null
+                                        ? http
+                                                + container.getHost()
+                                                + colon
+                                                + 
container.getFirstMappedPort()
+                                                + RestConstant.SUBMIT_JOB_URL
+                                        : http
+                                                + container.getHost()
+                                                + colon
+                                                + 
container.getFirstMappedPort()
+                                                + RestConstant.SUBMIT_JOB_URL
+                                                + "?"
+                                                + parameters);
+        return response;
+    }
+
+    private GenericContainer<?> createServer(String networkAlias)
+            throws IOException, InterruptedException {
+        GenericContainer<?> server =
+                new GenericContainer<>(getDockerImage())
+                        .withNetwork(NETWORK)
+                        .withEnv("TZ", "UTC")
+                        
.withCommand(ContainerUtil.adaptPathForWin(binPath.toString()))
+                        .withNetworkAliases(networkAlias)
+                        .withExposedPorts()
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(
+                                        DockerLoggerFactory.getLogger(
+                                                "seatunnel-engine:" + 
JDK_DOCKER_IMAGE)))
+                        .waitingFor(Wait.forListeningPort());
+        copySeaTunnelStarterToContainer(server);
+        server.setExposedPorts(Collections.singletonList(5801));
+        server.withCopyFileToContainer(
+                MountableFile.forHostPath(
+                        PROJECT_ROOT_PATH
+                                + 
"/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/"),
+                config.toString());
+        server.withCopyFileToContainer(
+                MountableFile.forHostPath(
+                        PROJECT_ROOT_PATH
+                                + 
"/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster/"),
+                config.toString());
+        server.withCopyFileToContainer(
+                MountableFile.forHostPath(
+                        PROJECT_ROOT_PATH
+                                + 
"/seatunnel-shade/seatunnel-hadoop3-3.1.4-uber/target/seatunnel-hadoop3-3.1.4-uber.jar"),
+                hadoopJar.toString());
+        server.start();
+        // execute extra commands
+        executeExtraCommands(server);
+        ContainerUtil.copyConnectorJarToContainer(
+                server,
+                confFile,
+                getConnectorModulePath(),
+                getConnectorNamePrefix(),
+                getConnectorType(),
+                SEATUNNEL_HOME);
+
+        return server;
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
index e64243be79..2d2fc5d96e 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
@@ -20,12 +20,10 @@ package org.apache.seatunnel.engine.e2e;
 import org.apache.seatunnel.engine.client.SeaTunnelClient;
 import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
 import org.apache.seatunnel.engine.client.job.ClientJobProxy;
-import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.config.ConfigProvider;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
 import org.apache.seatunnel.engine.core.job.JobStatus;
-import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
 import org.apache.seatunnel.engine.server.rest.RestConstant;
 
@@ -36,16 +34,11 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import com.hazelcast.client.config.ClientConfig;
-import com.hazelcast.core.Hazelcast;
-import com.hazelcast.core.HazelcastInstance;
 import com.hazelcast.instance.impl.HazelcastInstanceImpl;
-import com.hazelcast.instance.impl.HazelcastInstanceProxy;
-import io.restassured.response.Response;
 import lombok.extern.slf4j.Slf4j;
 
 import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import static io.restassured.RestAssured.given;
 import static org.hamcrest.Matchers.equalTo;
@@ -64,9 +57,6 @@ public class RestApiIT {
 
     private static SeaTunnelClient engineClient;
 
-    private static final String jobName = "test测试";
-    private static final String paramJobName = "param_test测试";
-
     @BeforeEach
     void beforeClass() throws Exception {
         String testClusterName = TestUtils.getClusterName("RestApiIT");
@@ -155,248 +145,6 @@ public class RestApiIT {
                         });
     }
 
-    @Test
-    public void testSubmitJob() {
-        AtomicInteger i = new AtomicInteger();
-
-        Arrays.asList(node2, node1)
-                .forEach(
-                        instance -> {
-                            Response response =
-                                    i.get() == 0
-                                            ? submitJob(instance, "BATCH", 
jobName, paramJobName)
-                                            : submitJob(instance, "BATCH", 
jobName, null);
-                            if (i.get() == 0) {
-                                response.then()
-                                        .statusCode(200)
-                                        .body("jobName", 
equalTo(paramJobName));
-                            } else {
-                                
response.then().statusCode(200).body("jobName", equalTo(jobName));
-                            }
-                            String jobId = 
response.getBody().jsonPath().getString("jobId");
-                            SeaTunnelServer seaTunnelServer = null;
-
-                            for (HazelcastInstance hazelcastInstance :
-                                    Hazelcast.getAllHazelcastInstances()) {
-                                SeaTunnelServer server =
-                                        (SeaTunnelServer)
-                                                ((HazelcastInstanceProxy) 
hazelcastInstance)
-                                                        .getOriginal()
-                                                        .node
-                                                        .getNodeExtension()
-                                                        
.createExtensionServices()
-                                                        
.get(Constant.SEATUNNEL_SERVICE_NAME);
-
-                                if (server.isMasterNode()) {
-                                    seaTunnelServer = server;
-                                }
-                            }
-
-                            SeaTunnelServer finalSeaTunnelServer = 
seaTunnelServer;
-                            Awaitility.await()
-                                    .atMost(2, TimeUnit.MINUTES)
-                                    .untilAsserted(
-                                            () ->
-                                                    Assertions.assertEquals(
-                                                            JobStatus.FINISHED,
-                                                            
finalSeaTunnelServer
-                                                                    
.getCoordinatorService()
-                                                                    
.getJobStatus(
-                                                                            
Long.parseLong(
-                                                                               
     jobId))));
-
-                            given().get(
-                                            HOST
-                                                    + instance.getCluster()
-                                                            .getLocalMember()
-                                                            .getAddress()
-                                                            .getPort()
-                                                    + 
RestConstant.FINISHED_JOBS_INFO
-                                                    + "/FINISHED")
-                                    .then()
-                                    .statusCode(200)
-                                    .body(
-                                            "[" + i.get() + "].jobName",
-                                            equalTo(i.get() == 0 ? 
paramJobName : jobName))
-                                    .body("[" + i.get() + "].errorMsg", 
equalTo(null))
-                                    .body(
-                                            "[" + i.get() + "].jobDag.jobId",
-                                            equalTo(Long.parseLong(jobId)))
-                                    .body(
-                                            "[" + i.get() + 
"].metrics.SourceReceivedCount",
-                                            equalTo("100"))
-                                    .body(
-                                            "[" + i.get() + 
"].metrics.SinkWriteCount",
-                                            equalTo("100"))
-                                    .body("[" + i.get() + "].jobStatus", 
equalTo("FINISHED"));
-
-                            // test for without status parameter.
-                            given().get(
-                                            HOST
-                                                    + instance.getCluster()
-                                                            .getLocalMember()
-                                                            .getAddress()
-                                                            .getPort()
-                                                    + 
RestConstant.FINISHED_JOBS_INFO)
-                                    .then()
-                                    .statusCode(200)
-                                    .body(
-                                            "[" + i.get() + "].jobName",
-                                            equalTo(i.get() == 0 ? 
paramJobName : jobName))
-                                    .body("[" + i.get() + "].errorMsg", 
equalTo(null))
-                                    .body(
-                                            "[" + i.get() + "].jobDag.jobId",
-                                            equalTo(Long.parseLong(jobId)))
-                                    .body(
-                                            "[" + i.get() + 
"].metrics.SourceReceivedCount",
-                                            equalTo("100"))
-                                    .body(
-                                            "[" + i.get() + 
"].metrics.SinkWriteCount",
-                                            equalTo("100"))
-                                    .body("[" + i.get() + "].jobStatus", 
equalTo("FINISHED"));
-                            i.getAndIncrement();
-                        });
-    }
-
-    @Test
-    public void testStopJob() {
-
-        Arrays.asList(node2, node1)
-                .forEach(
-                        instance -> {
-                            String jobId =
-                                    submitJob(instance, "STREAMING", jobName, 
paramJobName)
-                                            .getBody()
-                                            .jsonPath()
-                                            .getString("jobId");
-                            SeaTunnelServer seaTunnelServer = null;
-
-                            for (HazelcastInstance hazelcastInstance :
-                                    Hazelcast.getAllHazelcastInstances()) {
-                                SeaTunnelServer server =
-                                        (SeaTunnelServer)
-                                                ((HazelcastInstanceProxy) 
hazelcastInstance)
-                                                        .getOriginal()
-                                                        .node
-                                                        .getNodeExtension()
-                                                        
.createExtensionServices()
-                                                        
.get(Constant.SEATUNNEL_SERVICE_NAME);
-
-                                if (server.isMasterNode()) {
-                                    seaTunnelServer = server;
-                                }
-                            }
-
-                            SeaTunnelServer finalSeaTunnelServer = 
seaTunnelServer;
-                            Awaitility.await()
-                                    .atMost(2, TimeUnit.MINUTES)
-                                    .untilAsserted(
-                                            () ->
-                                                    Assertions.assertEquals(
-                                                            JobStatus.RUNNING,
-                                                            
finalSeaTunnelServer
-                                                                    
.getCoordinatorService()
-                                                                    
.getJobStatus(
-                                                                            
Long.parseLong(
-                                                                               
     jobId))));
-
-                            String parameters =
-                                    "{"
-                                            + "\"jobId\":"
-                                            + jobId
-                                            + ","
-                                            + "\"isStopWithSavePoint\":true}";
-
-                            given().body(parameters)
-                                    .post(
-                                            HOST
-                                                    + instance.getCluster()
-                                                            .getLocalMember()
-                                                            .getAddress()
-                                                            .getPort()
-                                                    + 
RestConstant.STOP_JOB_URL)
-                                    .then()
-                                    .statusCode(200)
-                                    .body("jobId", equalTo(jobId));
-
-                            Awaitility.await()
-                                    .atMost(6, TimeUnit.MINUTES)
-                                    .untilAsserted(
-                                            () ->
-                                                    Assertions.assertEquals(
-                                                            
JobStatus.SAVEPOINT_DONE,
-                                                            
finalSeaTunnelServer
-                                                                    
.getCoordinatorService()
-                                                                    
.getJobStatus(
-                                                                            
Long.parseLong(
-                                                                               
     jobId))));
-
-                            String jobId2 =
-                                    submitJob(instance, "STREAMING", jobName, 
paramJobName)
-                                            .getBody()
-                                            .jsonPath()
-                                            .getString("jobId");
-
-                            Awaitility.await()
-                                    .atMost(2, TimeUnit.MINUTES)
-                                    .untilAsserted(
-                                            () ->
-                                                    Assertions.assertEquals(
-                                                            JobStatus.RUNNING,
-                                                            
finalSeaTunnelServer
-                                                                    
.getCoordinatorService()
-                                                                    
.getJobStatus(
-                                                                            
Long.parseLong(
-                                                                               
     jobId2))));
-                            parameters =
-                                    "{"
-                                            + "\"jobId\":"
-                                            + jobId2
-                                            + ","
-                                            + "\"isStopWithSavePoint\":false}";
-
-                            given().body(parameters)
-                                    .post(
-                                            HOST
-                                                    + instance.getCluster()
-                                                            .getLocalMember()
-                                                            .getAddress()
-                                                            .getPort()
-                                                    + 
RestConstant.STOP_JOB_URL)
-                                    .then()
-                                    .statusCode(200)
-                                    .body("jobId", equalTo(jobId2));
-
-                            Awaitility.await()
-                                    .atMost(2, TimeUnit.MINUTES)
-                                    .untilAsserted(
-                                            () ->
-                                                    Assertions.assertEquals(
-                                                            JobStatus.CANCELED,
-                                                            
finalSeaTunnelServer
-                                                                    
.getCoordinatorService()
-                                                                    
.getJobStatus(
-                                                                            
Long.parseLong(
-                                                                               
     jobId2))));
-                        });
-    }
-
-    @Test
-    public void testStartWithSavePointWithoutJobId() {
-        Arrays.asList(node2, node1)
-                .forEach(
-                        instance -> {
-                            Response response =
-                                    submitJob("BATCH", instance, true, 
jobName, paramJobName);
-                            response.then()
-                                    .statusCode(400)
-                                    .body(
-                                            "message",
-                                            equalTo(
-                                                    "Please provide jobId when 
start with save point."));
-                        });
-    }
-
     @Test
     public void testEncryptConfig() {
         Arrays.asList(node2, node1)
@@ -469,82 +217,4 @@ public class RestApiIT {
             node2.shutdown();
         }
     }
-
-    private Response submitJob(
-            HazelcastInstanceImpl hazelcastInstance,
-            String jobMode,
-            String jobName,
-            String paramJobName) {
-        return submitJob(jobMode, hazelcastInstance, false, jobName, 
paramJobName);
-    }
-
-    private Response submitJob(
-            String jobMode,
-            HazelcastInstanceImpl hazelcastInstance,
-            boolean isStartWithSavePoint,
-            String jobName,
-            String paramJobName) {
-        String requestBody =
-                "{\n"
-                        + "    \"env\": {\n"
-                        + "        \"job.name\": \""
-                        + jobName
-                        + "\",\n"
-                        + "        \"job.mode\": \""
-                        + jobMode
-                        + "\"\n"
-                        + "    },\n"
-                        + "    \"source\": [\n"
-                        + "        {\n"
-                        + "            \"plugin_name\": \"FakeSource\",\n"
-                        + "            \"result_table_name\": \"fake\",\n"
-                        + "            \"row.num\": 100,\n"
-                        + "            \"schema\": {\n"
-                        + "                \"fields\": {\n"
-                        + "                    \"name\": \"string\",\n"
-                        + "                    \"age\": \"int\",\n"
-                        + "                    \"card\": \"int\"\n"
-                        + "                }\n"
-                        + "            }\n"
-                        + "        }\n"
-                        + "    ],\n"
-                        + "    \"transform\": [\n"
-                        + "    ],\n"
-                        + "    \"sink\": [\n"
-                        + "        {\n"
-                        + "            \"plugin_name\": \"Console\",\n"
-                        + "            \"source_table_name\": [\"fake\"]\n"
-                        + "        }\n"
-                        + "    ]\n"
-                        + "}";
-        String parameters = null;
-        if (paramJobName != null) {
-            parameters = "jobName=" + paramJobName;
-        }
-        if (isStartWithSavePoint) {
-            parameters = parameters + "&isStartWithSavePoint=true";
-        }
-        Response response =
-                given().body(requestBody)
-                        .header("Content-Type", "application/json; 
charset=utf-8")
-                        .post(
-                                parameters == null
-                                        ? HOST
-                                                + hazelcastInstance
-                                                        .getCluster()
-                                                        .getLocalMember()
-                                                        .getAddress()
-                                                        .getPort()
-                                                + RestConstant.SUBMIT_JOB_URL
-                                        : HOST
-                                                + hazelcastInstance
-                                                        .getCluster()
-                                                        .getLocalMember()
-                                                        .getAddress()
-                                                        .getPort()
-                                                + RestConstant.SUBMIT_JOB_URL
-                                                + "?"
-                                                + parameters);
-        return response;
-    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster/hazelcast.yaml
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster/hazelcast.yaml
new file mode 100644
index 0000000000..45879e2fd6
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster/hazelcast.yaml
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+hazelcast:
+  cluster-name: seatunnel
+  network:
+    rest-api:
+      enabled: true
+      endpoint-groups:
+        CLUSTER_WRITE:
+          enabled: true
+        DATA:
+          enabled: true
+    join:
+      tcp-ip:
+        enabled: true
+        member-list:
+          - secondServer
+          - server
+    port:
+      auto-increment: true
+      port-count: 100
+      port: 5801
+  properties:
+    hazelcast.invocation.max.retry.count: 100
+    hazelcast.invocation.retry.pause.millis: 1000
+    hazelcast.tcp.join.port.try.count: 30
+    hazelcast.slow.operation.detector.stacktrace.logging.enabled: true
+    hazelcast.logging.type: log4j2
+    hazelcast.operation.generic.thread.count: 200
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
index 370dd71953..6d9619abbf 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
@@ -34,14 +34,13 @@ import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.log.Log4j2HttpGetCommandProcessor;
 import org.apache.seatunnel.engine.server.master.JobHistoryService.JobState;
 import 
org.apache.seatunnel.engine.server.operation.GetClusterHealthMetricsOperation;
+import org.apache.seatunnel.engine.server.operation.GetJobMetricsOperation;
+import org.apache.seatunnel.engine.server.operation.GetJobStatusOperation;
 import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
 
 import com.hazelcast.cluster.Address;
 import com.hazelcast.cluster.Cluster;
 import com.hazelcast.cluster.Member;
-import com.hazelcast.core.Hazelcast;
-import com.hazelcast.core.HazelcastInstance;
-import com.hazelcast.instance.impl.HazelcastInstanceProxy;
 import com.hazelcast.internal.ascii.TextCommandService;
 import com.hazelcast.internal.ascii.rest.HttpCommandProcessor;
 import com.hazelcast.internal.ascii.rest.HttpGetCommand;
@@ -224,11 +223,24 @@ public class RestHttpGetCommandProcessor extends 
HttpCommandProcessor<HttpGetCom
                         .map(
                                 jobState -> {
                                     Long jobId = jobState.getJobId();
-                                    String jobMetrics =
-                                            getSeaTunnelServer()
-                                                    .getCoordinatorService()
-                                                    .getJobMetrics(jobId)
-                                                    .toJsonString();
+                                    SeaTunnelServer seaTunnelServer = 
getSeaTunnelServer();
+                                    String jobMetrics;
+                                    if (seaTunnelServer == null) {
+                                        jobMetrics =
+                                                (String)
+                                                        
NodeEngineUtil.sendOperationToMasterNode(
+                                                                        
getNode().nodeEngine,
+                                                                        new 
GetJobMetricsOperation(
+                                                                               
 jobId))
+                                                                .join();
+                                    } else {
+                                        jobMetrics =
+                                                seaTunnelServer
+                                                        
.getCoordinatorService()
+                                                        .getJobMetrics(jobId)
+                                                        .toJsonString();
+                                    }
+
                                     JobDAGInfo jobDAGInfo = 
finishedJobDAGInfo.get(jobId);
 
                                     return convertToJson(
@@ -293,20 +305,7 @@ public class RestHttpGetCommandProcessor extends 
HttpCommandProcessor<HttpGetCom
         SeaTunnelServer seaTunnelServer =
                 (SeaTunnelServer) 
extensionServices.get(Constant.SEATUNNEL_SERVICE_NAME);
         if (!seaTunnelServer.isMasterNode()) {
-            for (HazelcastInstance hazelcastInstance : 
Hazelcast.getAllHazelcastInstances()) {
-                SeaTunnelServer seaTunnelServer1 =
-                        (SeaTunnelServer)
-                                ((HazelcastInstanceProxy) hazelcastInstance)
-                                        .getOriginal()
-                                        .node
-                                        .getNodeExtension()
-                                        .createExtensionServices()
-                                        .get(Constant.SEATUNNEL_SERVICE_NAME);
-
-                if (seaTunnelServer1.isMasterNode()) {
-                    return seaTunnelServer1;
-                }
-            }
+            return null;
         }
         return seaTunnelServer;
     }
@@ -334,9 +333,27 @@ public class RestHttpGetCommandProcessor extends 
HttpCommandProcessor<HttpGetCom
                         classLoader,
                         jobImmutableInformation.getLogicalDag());
 
-        String jobMetrics =
-                
getSeaTunnelServer().getCoordinatorService().getJobMetrics(jobId).toJsonString();
-        JobStatus jobStatus = 
getSeaTunnelServer().getCoordinatorService().getJobStatus(jobId);
+        SeaTunnelServer seaTunnelServer = getSeaTunnelServer();
+        String jobMetrics;
+        JobStatus jobStatus;
+        if (seaTunnelServer == null) {
+            jobMetrics =
+                    (String)
+                            NodeEngineUtil.sendOperationToMasterNode(
+                                            getNode().nodeEngine, new 
GetJobMetricsOperation(jobId))
+                                    .join();
+            jobStatus =
+                    JobStatus.values()[
+                            (int)
+                                    NodeEngineUtil.sendOperationToMasterNode(
+                                                    getNode().nodeEngine,
+                                                    new 
GetJobStatusOperation(jobId))
+                                            .join()];
+        } else {
+            jobMetrics =
+                    
seaTunnelServer.getCoordinatorService().getJobMetrics(jobId).toJsonString();
+            jobStatus = 
seaTunnelServer.getCoordinatorService().getJobStatus(jobId);
+        }
 
         jobInfoJson
                 .add(RestConstant.JOB_ID, String.valueOf(jobId))
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
index 465942c6f4..6f09e0aadf 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
@@ -32,13 +32,14 @@ import 
org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.server.CoordinatorService;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.log.Log4j2HttpPostCommandProcessor;
+import org.apache.seatunnel.engine.server.operation.CancelJobOperation;
+import org.apache.seatunnel.engine.server.operation.SavePointJobOperation;
+import org.apache.seatunnel.engine.server.operation.SubmitJobOperation;
+import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
 import org.apache.seatunnel.engine.server.utils.RestUtil;
 
 import org.apache.commons.lang.StringUtils;
 
-import com.hazelcast.core.Hazelcast;
-import com.hazelcast.core.HazelcastInstance;
-import com.hazelcast.instance.impl.HazelcastInstanceProxy;
 import com.hazelcast.internal.ascii.TextCommandService;
 import com.hazelcast.internal.ascii.rest.HttpCommandProcessor;
 import com.hazelcast.internal.ascii.rest.HttpPostCommand;
@@ -102,20 +103,7 @@ public class RestHttpPostCommandProcessor extends 
HttpCommandProcessor<HttpPostC
         SeaTunnelServer seaTunnelServer =
                 (SeaTunnelServer) 
extensionServices.get(Constant.SEATUNNEL_SERVICE_NAME);
         if (!seaTunnelServer.isMasterNode()) {
-            for (HazelcastInstance hazelcastInstance : 
Hazelcast.getAllHazelcastInstances()) {
-                seaTunnelServer =
-                        (SeaTunnelServer)
-                                ((HazelcastInstanceProxy) hazelcastInstance)
-                                        .getOriginal()
-                                        .node
-                                        .getNodeExtension()
-                                        .createExtensionServices()
-                                        .get(Constant.SEATUNNEL_SERVICE_NAME);
-
-                if (seaTunnelServer.isMasterNode()) {
-                    return seaTunnelServer;
-                }
-            }
+            return null;
         }
         return seaTunnelServer;
     }
@@ -146,14 +134,21 @@ public class RestHttpPostCommandProcessor extends 
HttpCommandProcessor<HttpPostC
                                 ? 
Long.parseLong(requestParams.get(RestConstant.JOB_ID))
                                 : null);
         JobImmutableInformation jobImmutableInformation = 
restJobExecutionEnvironment.build();
-
+        Long jobId = jobImmutableInformation.getJobId();
         SeaTunnelServer seaTunnelServer = getSeaTunnelServer();
-        Long jobId =
-                submitJob(
-                        seaTunnelServer,
-                        jobImmutableInformation,
-                        jobConfig,
-                        restJobExecutionEnvironment);
+        if (seaTunnelServer == null) {
+
+            NodeEngineUtil.sendOperationToMasterNode(
+                            getNode().nodeEngine,
+                            new SubmitJobOperation(
+                                    jobImmutableInformation.getJobId(),
+                                    
getNode().nodeEngine.toData(jobImmutableInformation)))
+                    .join();
+
+        } else {
+
+            submitJob(seaTunnelServer, jobImmutableInformation, jobConfig);
+        }
 
         this.prepareResponse(
                 httpPostCommand,
@@ -174,12 +169,26 @@ public class RestHttpPostCommandProcessor extends 
HttpCommandProcessor<HttpPostC
                     
Boolean.parseBoolean(map.get(RestConstant.IS_STOP_WITH_SAVE_POINT).toString());
         }
 
-        CoordinatorService coordinatorService = 
getSeaTunnelServer().getCoordinatorService();
+        SeaTunnelServer seaTunnelServer = getSeaTunnelServer();
+        if (seaTunnelServer == null) {
+            if (isStopWithSavePoint) {
+                NodeEngineUtil.sendOperationToMasterNode(
+                                getNode().nodeEngine, new 
SavePointJobOperation(jobId))
+                        .join();
+            } else {
+                NodeEngineUtil.sendOperationToMasterNode(
+                                getNode().nodeEngine, new 
CancelJobOperation(jobId))
+                        .join();
+            }
 
-        if (isStopWithSavePoint) {
-            coordinatorService.savePoint(jobId);
         } else {
-            coordinatorService.cancelJob(jobId);
+            CoordinatorService coordinatorService = 
getSeaTunnelServer().getCoordinatorService();
+
+            if (isStopWithSavePoint) {
+                coordinatorService.savePoint(jobId);
+            } else {
+                coordinatorService.cancelJob(jobId);
+            }
         }
 
         this.prepareResponse(
@@ -215,11 +224,10 @@ public class RestHttpPostCommandProcessor extends 
HttpCommandProcessor<HttpPostC
         return requestBodyJsonNode;
     }
 
-    private Long submitJob(
+    private void submitJob(
             SeaTunnelServer seaTunnelServer,
             JobImmutableInformation jobImmutableInformation,
-            JobConfig jobConfig,
-            RestJobExecutionEnvironment restJobExecutionEnvironment) {
+            JobConfig jobConfig) {
         CoordinatorService coordinatorService = 
seaTunnelServer.getCoordinatorService();
         Data data =
                 textCommandService
@@ -231,7 +239,5 @@ public class RestHttpPostCommandProcessor extends 
HttpCommandProcessor<HttpPostC
                 coordinatorService.submitJob(
                         Long.parseLong(jobConfig.getJobContext().getJobId()), 
data);
         voidPassiveCompletableFuture.join();
-
-        return restJobExecutionEnvironment.getJobId();
     }
 }

Reply via email to