This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 12ed49133d [Feature][ZETA REST API] Can submit job with rest api to Zeta master node automatically (#5950)
12ed49133d is described below

commit 12ed49133dc8d4a50b60eb5c9f4c66dcfb09c254
Author: Guangdong Liu <[email protected]>
AuthorDate: Wed Dec 13 12:30:58 2023 +0800

    [Feature][ZETA REST API] Can submit job with rest api to Zeta master node automatically (#5950)
---
 .../org/apache/seatunnel/engine/e2e/RestApiIT.java | 486 ++++++++++++---------
 .../server/rest/RestHttpGetCommandProcessor.java   |  23 +-
 .../server/rest/RestHttpPostCommandProcessor.java  |  63 ++-
 3 files changed, 357 insertions(+), 215 deletions(-)

diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
index fedfa4ce25..2f53e9a475 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
@@ -36,10 +36,14 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import com.hazelcast.client.config.ClientConfig;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
 import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import com.hazelcast.instance.impl.HazelcastInstanceProxy;
 import io.restassured.response.Response;
 import lombok.extern.slf4j.Slf4j;
 
+import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
 
 import static io.restassured.RestAssured.given;
@@ -53,14 +57,19 @@ public class RestApiIT {
 
     private static ClientJobProxy clientJobProxy;
 
-    private static HazelcastInstanceImpl hazelcastInstance;
+    private static HazelcastInstanceImpl node1;
+
+    private static HazelcastInstanceImpl node2;
 
     @BeforeEach
     void beforeClass() throws Exception {
         String testClusterName = TestUtils.getClusterName("RestApiIT");
         SeaTunnelConfig seaTunnelConfig = 
ConfigProvider.locateAndGetSeaTunnelConfig();
         seaTunnelConfig.getHazelcastConfig().setClusterName(testClusterName);
-        hazelcastInstance = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+        node1 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+
+        node2 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+
         String filePath = 
TestUtils.getResource("stream_fakesource_to_file.conf");
         JobConfig jobConfig = new JobConfig();
         jobConfig.setName("fake_to_file");
@@ -83,242 +92,317 @@ public class RestApiIT {
 
     @Test
     public void testGetRunningJobById() {
-        given().get(
-                        HOST
-                                + hazelcastInstance
-                                        .getCluster()
-                                        .getLocalMember()
-                                        .getAddress()
-                                        .getPort()
-                                + RestConstant.RUNNING_JOB_URL
-                                + "/"
-                                + clientJobProxy.getJobId())
-                .then()
-                .statusCode(200)
-                .body("jobName", equalTo("fake_to_file"))
-                .body("jobStatus", equalTo("RUNNING"));
+        Arrays.asList(node2, node1)
+                .forEach(
+                        instance -> {
+                            given().get(
+                                            HOST
+                                                    + instance.getCluster()
+                                                            .getLocalMember()
+                                                            .getAddress()
+                                                            .getPort()
+                                                    + 
RestConstant.RUNNING_JOB_URL
+                                                    + "/"
+                                                    + 
clientJobProxy.getJobId())
+                                    .then()
+                                    .statusCode(200)
+                                    .body("jobName", equalTo("fake_to_file"))
+                                    .body("jobStatus", equalTo("RUNNING"));
+                        });
     }
 
     @Test
     public void testGetRunningJobs() {
-        given().get(
-                        HOST
-                                + hazelcastInstance
-                                        .getCluster()
-                                        .getLocalMember()
-                                        .getAddress()
-                                        .getPort()
-                                + RestConstant.RUNNING_JOBS_URL)
-                .then()
-                .statusCode(200)
-                .body("[0].jobName", equalTo("fake_to_file"))
-                .body("[0].jobStatus", equalTo("RUNNING"));
+        Arrays.asList(node2, node1)
+                .forEach(
+                        instance -> {
+                            given().get(
+                                            HOST
+                                                    + instance.getCluster()
+                                                            .getLocalMember()
+                                                            .getAddress()
+                                                            .getPort()
+                                                    + 
RestConstant.RUNNING_JOBS_URL)
+                                    .then()
+                                    .statusCode(200)
+                                    .body("[0].jobName", 
equalTo("fake_to_file"))
+                                    .body("[0].jobStatus", equalTo("RUNNING"));
+                        });
     }
 
     @Test
     public void testSystemMonitoringInformation() {
-        given().get(
-                        HOST
-                                + hazelcastInstance
-                                        .getCluster()
-                                        .getLocalMember()
-                                        .getAddress()
-                                        .getPort()
-                                + RestConstant.SYSTEM_MONITORING_INFORMATION)
-                .then()
-                .assertThat()
-                .time(lessThan(5000L))
-                .statusCode(200);
+        Arrays.asList(node2, node1)
+                .forEach(
+                        instance -> {
+                            given().get(
+                                            HOST
+                                                    + instance.getCluster()
+                                                            .getLocalMember()
+                                                            .getAddress()
+                                                            .getPort()
+                                                    + 
RestConstant.SYSTEM_MONITORING_INFORMATION)
+                                    .then()
+                                    .assertThat()
+                                    .time(lessThan(5000L))
+                                    .statusCode(200);
+                        });
     }
 
     @Test
     public void testSubmitJob() {
-        Response response = submitJob("BATCH");
-        response.then().statusCode(200).body("jobName", equalTo("test测试"));
-        String jobId = response.getBody().jsonPath().getString("jobId");
-        SeaTunnelServer seaTunnelServer =
-                (SeaTunnelServer)
-                        hazelcastInstance
-                                .node
-                                .getNodeExtension()
-                                .createExtensionServices()
-                                .get(Constant.SEATUNNEL_SERVICE_NAME);
-        Awaitility.await()
-                .atMost(2, TimeUnit.MINUTES)
-                .untilAsserted(
-                        () ->
-                                Assertions.assertEquals(
-                                        JobStatus.RUNNING,
-                                        seaTunnelServer
-                                                .getCoordinatorService()
-                                                
.getJobStatus(Long.parseLong(jobId))));
-        Awaitility.await()
-                .atMost(2, TimeUnit.MINUTES)
-                .untilAsserted(
-                        () ->
-                                Assertions.assertEquals(
-                                        JobStatus.FINISHED,
-                                        seaTunnelServer
-                                                .getCoordinatorService()
-                                                
.getJobStatus(Long.parseLong(jobId))));
+        Arrays.asList(node2, node1)
+                .forEach(
+                        instance -> {
+                            Response response = submitJob(instance, "BATCH");
+                            response.then().statusCode(200).body("jobName", 
equalTo("test测试"));
+                            String jobId = 
response.getBody().jsonPath().getString("jobId");
+                            SeaTunnelServer seaTunnelServer = null;
+
+                            for (HazelcastInstance hazelcastInstance :
+                                    Hazelcast.getAllHazelcastInstances()) {
+                                SeaTunnelServer server =
+                                        (SeaTunnelServer)
+                                                ((HazelcastInstanceProxy) 
hazelcastInstance)
+                                                        .getOriginal()
+                                                        .node
+                                                        .getNodeExtension()
+                                                        
.createExtensionServices()
+                                                        
.get(Constant.SEATUNNEL_SERVICE_NAME);
+
+                                if (server.isMasterNode()) {
+                                    seaTunnelServer = server;
+                                }
+                            }
+
+                            SeaTunnelServer finalSeaTunnelServer = 
seaTunnelServer;
+                            Awaitility.await()
+                                    .atMost(2, TimeUnit.MINUTES)
+                                    .untilAsserted(
+                                            () ->
+                                                    Assertions.assertEquals(
+                                                            JobStatus.FINISHED,
+                                                            
finalSeaTunnelServer
+                                                                    
.getCoordinatorService()
+                                                                    
.getJobStatus(
+                                                                            
Long.parseLong(
+                                                                               
     jobId))));
+                        });
     }
 
     @Test
     public void testStopJob() {
-        String jobId = 
submitJob("STREAMING").getBody().jsonPath().getString("jobId");
-        SeaTunnelServer seaTunnelServer =
-                (SeaTunnelServer)
-                        hazelcastInstance
-                                .node
-                                .getNodeExtension()
-                                .createExtensionServices()
-                                .get(Constant.SEATUNNEL_SERVICE_NAME);
-        Awaitility.await()
-                .atMost(2, TimeUnit.MINUTES)
-                .untilAsserted(
-                        () ->
-                                Assertions.assertEquals(
-                                        JobStatus.RUNNING,
-                                        seaTunnelServer
-                                                .getCoordinatorService()
-                                                
.getJobStatus(Long.parseLong(jobId))));
-
-        String parameters = "{" + "\"jobId\":" + jobId + "," + 
"\"isStopWithSavePoint\":true}";
-
-        given().body(parameters)
-                .post(
-                        HOST
-                                + hazelcastInstance
-                                        .getCluster()
-                                        .getLocalMember()
-                                        .getAddress()
-                                        .getPort()
-                                + RestConstant.STOP_JOB_URL)
-                .then()
-                .statusCode(200)
-                .body("jobId", equalTo(jobId));
 
-        Awaitility.await()
-                .atMost(6, TimeUnit.MINUTES)
-                .untilAsserted(
-                        () ->
-                                Assertions.assertEquals(
-                                        JobStatus.SAVEPOINT_DONE,
-                                        seaTunnelServer
-                                                .getCoordinatorService()
-                                                
.getJobStatus(Long.parseLong(jobId))));
+        Arrays.asList(node2, node1)
+                .forEach(
+                        instance -> {
+                            String jobId =
+                                    submitJob(instance, "STREAMING")
+                                            .getBody()
+                                            .jsonPath()
+                                            .getString("jobId");
+                            SeaTunnelServer seaTunnelServer = null;
 
-        String jobId2 = 
submitJob("STREAMING").getBody().jsonPath().getString("jobId");
+                            for (HazelcastInstance hazelcastInstance :
+                                    Hazelcast.getAllHazelcastInstances()) {
+                                SeaTunnelServer server =
+                                        (SeaTunnelServer)
+                                                ((HazelcastInstanceProxy) 
hazelcastInstance)
+                                                        .getOriginal()
+                                                        .node
+                                                        .getNodeExtension()
+                                                        
.createExtensionServices()
+                                                        
.get(Constant.SEATUNNEL_SERVICE_NAME);
 
-        Awaitility.await()
-                .atMost(2, TimeUnit.MINUTES)
-                .untilAsserted(
-                        () ->
-                                Assertions.assertEquals(
-                                        JobStatus.RUNNING,
-                                        seaTunnelServer
-                                                .getCoordinatorService()
-                                                
.getJobStatus(Long.parseLong(jobId2))));
-        parameters = "{" + "\"jobId\":" + jobId2 + "," + 
"\"isStopWithSavePoint\":false}";
-
-        given().body(parameters)
-                .post(
-                        HOST
-                                + hazelcastInstance
-                                        .getCluster()
-                                        .getLocalMember()
-                                        .getAddress()
-                                        .getPort()
-                                + RestConstant.STOP_JOB_URL)
-                .then()
-                .statusCode(200)
-                .body("jobId", equalTo(jobId2));
+                                if (server.isMasterNode()) {
+                                    seaTunnelServer = server;
+                                }
+                            }
 
-        Awaitility.await()
-                .atMost(2, TimeUnit.MINUTES)
-                .untilAsserted(
-                        () ->
-                                Assertions.assertEquals(
-                                        JobStatus.CANCELED,
-                                        seaTunnelServer
-                                                .getCoordinatorService()
-                                                
.getJobStatus(Long.parseLong(jobId2))));
+                            SeaTunnelServer finalSeaTunnelServer = 
seaTunnelServer;
+                            Awaitility.await()
+                                    .atMost(2, TimeUnit.MINUTES)
+                                    .untilAsserted(
+                                            () ->
+                                                    Assertions.assertEquals(
+                                                            JobStatus.RUNNING,
+                                                            
finalSeaTunnelServer
+                                                                    
.getCoordinatorService()
+                                                                    
.getJobStatus(
+                                                                            
Long.parseLong(
+                                                                               
     jobId))));
+
+                            String parameters =
+                                    "{"
+                                            + "\"jobId\":"
+                                            + jobId
+                                            + ","
+                                            + "\"isStopWithSavePoint\":true}";
+
+                            given().body(parameters)
+                                    .post(
+                                            HOST
+                                                    + instance.getCluster()
+                                                            .getLocalMember()
+                                                            .getAddress()
+                                                            .getPort()
+                                                    + 
RestConstant.STOP_JOB_URL)
+                                    .then()
+                                    .statusCode(200)
+                                    .body("jobId", equalTo(jobId));
+
+                            Awaitility.await()
+                                    .atMost(6, TimeUnit.MINUTES)
+                                    .untilAsserted(
+                                            () ->
+                                                    Assertions.assertEquals(
+                                                            
JobStatus.SAVEPOINT_DONE,
+                                                            
finalSeaTunnelServer
+                                                                    
.getCoordinatorService()
+                                                                    
.getJobStatus(
+                                                                            
Long.parseLong(
+                                                                               
     jobId))));
+
+                            String jobId2 =
+                                    submitJob(instance, "STREAMING")
+                                            .getBody()
+                                            .jsonPath()
+                                            .getString("jobId");
+
+                            Awaitility.await()
+                                    .atMost(2, TimeUnit.MINUTES)
+                                    .untilAsserted(
+                                            () ->
+                                                    Assertions.assertEquals(
+                                                            JobStatus.RUNNING,
+                                                            
finalSeaTunnelServer
+                                                                    
.getCoordinatorService()
+                                                                    
.getJobStatus(
+                                                                            
Long.parseLong(
+                                                                               
     jobId2))));
+                            parameters =
+                                    "{"
+                                            + "\"jobId\":"
+                                            + jobId2
+                                            + ","
+                                            + "\"isStopWithSavePoint\":false}";
+
+                            given().body(parameters)
+                                    .post(
+                                            HOST
+                                                    + instance.getCluster()
+                                                            .getLocalMember()
+                                                            .getAddress()
+                                                            .getPort()
+                                                    + 
RestConstant.STOP_JOB_URL)
+                                    .then()
+                                    .statusCode(200)
+                                    .body("jobId", equalTo(jobId2));
+
+                            Awaitility.await()
+                                    .atMost(2, TimeUnit.MINUTES)
+                                    .untilAsserted(
+                                            () ->
+                                                    Assertions.assertEquals(
+                                                            JobStatus.CANCELED,
+                                                            
finalSeaTunnelServer
+                                                                    
.getCoordinatorService()
+                                                                    
.getJobStatus(
+                                                                            
Long.parseLong(
+                                                                               
     jobId2))));
+                        });
     }
 
     @Test
     public void testStartWithSavePointWithoutJobId() {
-        Response response = submitJob("BATCH", true);
-        response.then()
-                .statusCode(400)
-                .body("message", equalTo("Please provide jobId when start with 
save point."));
+        Arrays.asList(node2, node1)
+                .forEach(
+                        instance -> {
+                            Response response = submitJob("BATCH", instance, 
true);
+                            response.then()
+                                    .statusCode(400)
+                                    .body(
+                                            "message",
+                                            equalTo(
+                                                    "Please provide jobId when 
start with save point."));
+                        });
     }
 
     @Test
     public void testEncryptConfig() {
-        String config =
-                "{\n"
-                        + "    \"env\": {\n"
-                        + "        \"execution.parallelism\": 1,\n"
-                        + "        \"shade.identifier\":\"base64\"\n"
-                        + "    },\n"
-                        + "    \"source\": [\n"
-                        + "        {\n"
-                        + "            \"plugin_name\": \"MySQL-CDC\",\n"
-                        + "            \"schema\" : {\n"
-                        + "                \"fields\": {\n"
-                        + "                    \"name\": \"string\",\n"
-                        + "                    \"age\": \"int\"\n"
-                        + "                }\n"
-                        + "            },\n"
-                        + "            \"result_table_name\": \"fake\",\n"
-                        + "            \"parallelism\": 1,\n"
-                        + "            \"hostname\": \"127.0.0.1\",\n"
-                        + "            \"username\": \"seatunnel\",\n"
-                        + "            \"password\": \"seatunnel_password\",\n"
-                        + "            \"table-name\": \"inventory_vwyw0n\"\n"
-                        + "        }\n"
-                        + "    ],\n"
-                        + "    \"transform\": [\n"
-                        + "    ],\n"
-                        + "    \"sink\": [\n"
-                        + "        {\n"
-                        + "            \"plugin_name\": \"Clickhouse\",\n"
-                        + "            \"host\": \"localhost:8123\",\n"
-                        + "            \"database\": \"default\",\n"
-                        + "            \"table\": \"fake_all\",\n"
-                        + "            \"username\": \"seatunnel\",\n"
-                        + "            \"password\": \"seatunnel_password\"\n"
-                        + "        }\n"
-                        + "    ]\n"
-                        + "}";
-        given().body(config)
-                .post(
-                        HOST
-                                + hazelcastInstance
-                                        .getCluster()
-                                        .getLocalMember()
-                                        .getAddress()
-                                        .getPort()
-                                + RestConstant.ENCRYPT_CONFIG)
-                .then()
-                .statusCode(200)
-                .body("source[0].result_table_name", equalTo("fake"))
-                .body("source[0].username", equalTo("c2VhdHVubmVs"))
-                .body("source[0].password", 
equalTo("c2VhdHVubmVsX3Bhc3N3b3Jk"));
+        Arrays.asList(node2, node1)
+                .forEach(
+                        instance -> {
+                            String config =
+                                    "{\n"
+                                            + "    \"env\": {\n"
+                                            + "        
\"execution.parallelism\": 1,\n"
+                                            + "        
\"shade.identifier\":\"base64\"\n"
+                                            + "    },\n"
+                                            + "    \"source\": [\n"
+                                            + "        {\n"
+                                            + "            \"plugin_name\": 
\"MySQL-CDC\",\n"
+                                            + "            \"schema\" : {\n"
+                                            + "                \"fields\": {\n"
+                                            + "                    \"name\": 
\"string\",\n"
+                                            + "                    \"age\": 
\"int\"\n"
+                                            + "                }\n"
+                                            + "            },\n"
+                                            + "            
\"result_table_name\": \"fake\",\n"
+                                            + "            \"parallelism\": 
1,\n"
+                                            + "            \"hostname\": 
\"127.0.0.1\",\n"
+                                            + "            \"username\": 
\"seatunnel\",\n"
+                                            + "            \"password\": 
\"seatunnel_password\",\n"
+                                            + "            \"table-name\": 
\"inventory_vwyw0n\"\n"
+                                            + "        }\n"
+                                            + "    ],\n"
+                                            + "    \"transform\": [\n"
+                                            + "    ],\n"
+                                            + "    \"sink\": [\n"
+                                            + "        {\n"
+                                            + "            \"plugin_name\": 
\"Clickhouse\",\n"
+                                            + "            \"host\": 
\"localhost:8123\",\n"
+                                            + "            \"database\": 
\"default\",\n"
+                                            + "            \"table\": 
\"fake_all\",\n"
+                                            + "            \"username\": 
\"seatunnel\",\n"
+                                            + "            \"password\": 
\"seatunnel_password\"\n"
+                                            + "        }\n"
+                                            + "    ]\n"
+                                            + "}";
+                            given().body(config)
+                                    .post(
+                                            HOST
+                                                    + instance.getCluster()
+                                                            .getLocalMember()
+                                                            .getAddress()
+                                                            .getPort()
+                                                    + 
RestConstant.ENCRYPT_CONFIG)
+                                    .then()
+                                    .statusCode(200)
+                                    .body("source[0].result_table_name", 
equalTo("fake"))
+                                    .body("source[0].username", 
equalTo("c2VhdHVubmVs"))
+                                    .body(
+                                            "source[0].password",
+                                            
equalTo("c2VhdHVubmVsX3Bhc3N3b3Jk"));
+                        });
     }
 
     @AfterEach
     void afterClass() {
-        if (hazelcastInstance != null) {
-            hazelcastInstance.shutdown();
+        if (node1 != null) {
+            node1.shutdown();
+        }
+        if (node2 != null) {
+            node2.shutdown();
         }
     }
 
-    private Response submitJob(String jobMode) {
-        return submitJob(jobMode, false);
+    private Response submitJob(HazelcastInstanceImpl hazelcastInstance, String 
jobMode) {
+        return submitJob(jobMode, hazelcastInstance, false);
     }
 
-    private Response submitJob(String jobMode, boolean isStartWithSavePoint) {
+    private Response submitJob(
+            String jobMode, HazelcastInstanceImpl hazelcastInstance, boolean 
isStartWithSavePoint) {
         String requestBody =
                 "{\n"
                         + "    \"env\": {\n"
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
index 484482322c..6ee5b46e83 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
@@ -35,6 +35,9 @@ import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
 import com.hazelcast.cluster.Address;
 import com.hazelcast.cluster.Cluster;
 import com.hazelcast.cluster.Member;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.instance.impl.HazelcastInstanceProxy;
 import com.hazelcast.internal.ascii.TextCommandService;
 import com.hazelcast.internal.ascii.rest.HttpCommandProcessor;
 import com.hazelcast.internal.ascii.rest.HttpGetCommand;
@@ -211,7 +214,25 @@ public class RestHttpGetCommandProcessor extends 
HttpCommandProcessor<HttpGetCom
     private SeaTunnelServer getSeaTunnelServer() {
         Map<String, Object> extensionServices =
                 
this.textCommandService.getNode().getNodeExtension().createExtensionServices();
-        return (SeaTunnelServer) 
extensionServices.get(Constant.SEATUNNEL_SERVICE_NAME);
+        SeaTunnelServer seaTunnelServer =
+                (SeaTunnelServer) 
extensionServices.get(Constant.SEATUNNEL_SERVICE_NAME);
+        if (!seaTunnelServer.isMasterNode()) { // local node is not the master: look for one that is
+            for (HazelcastInstance hazelcastInstance : 
Hazelcast.getAllHazelcastInstances()) { // NOTE(review): presumably only lists instances running in this JVM - confirm
+                SeaTunnelServer seaTunnelServer1 =
+                        (SeaTunnelServer)
+                                ((HazelcastInstanceProxy) hazelcastInstance) // NOTE(review): assumes every instance is a HazelcastInstanceProxy - confirm
+                                        .getOriginal()
+                                        .node
+                                        .getNodeExtension()
+                                        .createExtensionServices()
+                                        .get(Constant.SEATUNNEL_SERVICE_NAME);
+
+                if (seaTunnelServer1.isMasterNode()) { // first instance reporting itself as master wins
+                    return seaTunnelServer1;
+                }
+            }
+        }
+        return seaTunnelServer; // local server: either it is the master, or the loop found none
     }
 
     private JsonObject convertToJson(JobInfo jobInfo, long jobId) {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
index 02158bbf34..9aa1e8c2e7 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
@@ -32,6 +32,9 @@ import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.log.Log4j2HttpPostCommandProcessor;
 import org.apache.seatunnel.engine.server.utils.RestUtil;
 
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.instance.impl.HazelcastInstanceProxy;
 import com.hazelcast.internal.ascii.TextCommandService;
 import com.hazelcast.internal.ascii.rest.HttpCommandProcessor;
 import com.hazelcast.internal.ascii.rest.HttpPostCommand;
@@ -92,7 +95,25 @@ public class RestHttpPostCommandProcessor extends 
HttpCommandProcessor<HttpPostC
     private SeaTunnelServer getSeaTunnelServer() {
         Map<String, Object> extensionServices =
                 
this.textCommandService.getNode().getNodeExtension().createExtensionServices();
-        return (SeaTunnelServer) 
extensionServices.get(Constant.SEATUNNEL_SERVICE_NAME);
+        SeaTunnelServer seaTunnelServer =
+                (SeaTunnelServer) 
extensionServices.get(Constant.SEATUNNEL_SERVICE_NAME);
+        if (!seaTunnelServer.isMasterNode()) { // local node is not the master: look for one that is
+            for (HazelcastInstance hazelcastInstance : 
Hazelcast.getAllHazelcastInstances()) { // NOTE(review): presumably only lists instances running in this JVM - confirm
+                SeaTunnelServer masterCandidate = // separate variable so the local server is not overwritten
+                        (SeaTunnelServer)
+                                ((HazelcastInstanceProxy) hazelcastInstance) // NOTE(review): assumes every instance is a HazelcastInstanceProxy - confirm
+                                        .getOriginal()
+                                        .node
+                                        .getNodeExtension()
+                                        .createExtensionServices()
+                                        .get(Constant.SEATUNNEL_SERVICE_NAME);
+
+                if (masterCandidate.isMasterNode()) { // first instance reporting itself as master wins
+                    return masterCandidate;
+                }
+            }
+        }
+        return seaTunnelServer; // local server: either it is the master, or the loop found none (same fallback as the GET processor)
     }
 
     private void handleSubmitJob(HttpPostCommand httpPostCommand, String uri)
@@ -115,19 +136,15 @@ public class RestHttpPostCommandProcessor extends 
HttpCommandProcessor<HttpPostC
                                 ? 
Long.parseLong(requestParams.get(RestConstant.JOB_ID))
                                 : null);
         JobImmutableInformation jobImmutableInformation = 
restJobExecutionEnvironment.build();
-        CoordinatorService coordinatorService = 
getSeaTunnelServer().getCoordinatorService();
-        Data data =
-                textCommandService
-                        .getNode()
-                        .nodeEngine
-                        .getSerializationService()
-                        .toData(jobImmutableInformation);
-        PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
-                coordinatorService.submitJob(
-                        Long.parseLong(jobConfig.getJobContext().getJobId()), 
data);
-        voidPassiveCompletableFuture.join();
 
-        Long jobId = restJobExecutionEnvironment.getJobId();
+        SeaTunnelServer seaTunnelServer = getSeaTunnelServer();
+        Long jobId =
+                submitJob(
+                        seaTunnelServer,
+                        jobImmutableInformation,
+                        jobConfig,
+                        restJobExecutionEnvironment);
+
         this.prepareResponse(
                 httpPostCommand,
                 new JsonObject()
@@ -187,4 +204,24 @@ public class RestHttpPostCommandProcessor extends 
HttpCommandProcessor<HttpPostC
         }
         return requestBodyJsonNode;
     }
+
+    private Long submitJob( // submits the job to the given server's coordinator service and waits for the submit future
+            SeaTunnelServer seaTunnelServer,
+            JobImmutableInformation jobImmutableInformation,
+            JobConfig jobConfig,
+            RestJobExecutionEnvironment restJobExecutionEnvironment) {
+        CoordinatorService coordinatorService = 
seaTunnelServer.getCoordinatorService();
+        Data data = // serialized with this node's serialization service, not the target server's
+                textCommandService
+                        .getNode()
+                        .nodeEngine
+                        .getSerializationService()
+                        .toData(jobImmutableInformation);
+        PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
+                coordinatorService.submitJob(
+                        Long.parseLong(jobConfig.getJobContext().getJobId()), 
data);
+        voidPassiveCompletableFuture.join(); // block until the submit future completes
+
+        return restJobExecutionEnvironment.getJobId(); // NOTE(review): submission used jobConfig's job id; presumably the same value - confirm
+    }
 }


Reply via email to