Hisoka-X commented on code in PR #5950:
URL: https://github.com/apache/seatunnel/pull/5950#discussion_r1418212785


##########
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java:
##########
@@ -83,242 +92,323 @@ void beforeClass() throws Exception {
 
     @Test
     public void testGetRunningJobById() {
-        given().get(
-                        HOST
-                                + hazelcastInstance
-                                        .getCluster()
-                                        .getLocalMember()
-                                        .getAddress()
-                                        .getPort()
-                                + RestConstant.RUNNING_JOB_URL
-                                + "/"
-                                + clientJobProxy.getJobId())
-                .then()
-                .statusCode(200)
-                .body("jobName", equalTo("fake_to_file"))
-                .body("jobStatus", equalTo("RUNNING"));
+        Arrays.asList(node2, node1)
+                .forEach(
+                        hazelcastInstance2 -> {

Review Comment:
   ```suggestion
                           instance -> {
   ```



##########
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java:
##########
@@ -83,242 +92,323 @@ void beforeClass() throws Exception {
 
     @Test
     public void testGetRunningJobById() {
-        given().get(
-                        HOST
-                                + hazelcastInstance
-                                        .getCluster()
-                                        .getLocalMember()
-                                        .getAddress()
-                                        .getPort()
-                                + RestConstant.RUNNING_JOB_URL
-                                + "/"
-                                + clientJobProxy.getJobId())
-                .then()
-                .statusCode(200)
-                .body("jobName", equalTo("fake_to_file"))
-                .body("jobStatus", equalTo("RUNNING"));
+        Arrays.asList(node2, node1)
+                .forEach(
+                        hazelcastInstance2 -> {
+                            given().get(
+                                            HOST
+                                                    + hazelcastInstance2
+                                                            .getCluster()
+                                                            .getLocalMember()
+                                                            .getAddress()
+                                                            .getPort()
+                                                    + 
RestConstant.RUNNING_JOB_URL
+                                                    + "/"
+                                                    + 
clientJobProxy.getJobId())
+                                    .then()
+                                    .statusCode(200)
+                                    .body("jobName", equalTo("fake_to_file"))
+                                    .body("jobStatus", equalTo("RUNNING"));
+                        });
     }
 
     @Test
     public void testGetRunningJobs() {
-        given().get(
-                        HOST
-                                + hazelcastInstance
-                                        .getCluster()
-                                        .getLocalMember()
-                                        .getAddress()
-                                        .getPort()
-                                + RestConstant.RUNNING_JOBS_URL)
-                .then()
-                .statusCode(200)
-                .body("[0].jobName", equalTo("fake_to_file"))
-                .body("[0].jobStatus", equalTo("RUNNING"));
+        Arrays.asList(node2, node1)
+                .forEach(
+                        hazelcastInstance2 -> {

Review Comment:
   Ditto: please rename the lambda parameter `hazelcastInstance2` to `instance` here as well.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java:
##########
@@ -92,7 +95,25 @@ public void handle(HttpPostCommand httpPostCommand) {
     private SeaTunnelServer getSeaTunnelServer() {
         Map<String, Object> extensionServices =
                 
this.textCommandService.getNode().getNodeExtension().createExtensionServices();
-        return (SeaTunnelServer) 
extensionServices.get(Constant.SEATUNNEL_SERVICE_NAME);
+        SeaTunnelServer seaTunnelServer =
+                (SeaTunnelServer) 
extensionServices.get(Constant.SEATUNNEL_SERVICE_NAME);
+        if (!seaTunnelServer.isMasterNode()) {
+            for (HazelcastInstance hazelcastInstance : 
Hazelcast.getAllHazelcastInstances()) {
+                SeaTunnelServer seaTunnelServer1 =

Review Comment:
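   Ditto (same naming concern as above): please give `seaTunnelServer1` a descriptive name instead of a numeric suffix.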



##########
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java:
##########
@@ -83,242 +92,323 @@ void beforeClass() throws Exception {
 
     @Test
     public void testGetRunningJobById() {
-        given().get(
-                        HOST
-                                + hazelcastInstance
-                                        .getCluster()
-                                        .getLocalMember()
-                                        .getAddress()
-                                        .getPort()
-                                + RestConstant.RUNNING_JOB_URL
-                                + "/"
-                                + clientJobProxy.getJobId())
-                .then()
-                .statusCode(200)
-                .body("jobName", equalTo("fake_to_file"))
-                .body("jobStatus", equalTo("RUNNING"));
+        Arrays.asList(node2, node1)
+                .forEach(
+                        hazelcastInstance2 -> {
+                            given().get(
+                                            HOST
+                                                    + hazelcastInstance2
+                                                            .getCluster()
+                                                            .getLocalMember()
+                                                            .getAddress()
+                                                            .getPort()
+                                                    + 
RestConstant.RUNNING_JOB_URL
+                                                    + "/"
+                                                    + 
clientJobProxy.getJobId())
+                                    .then()
+                                    .statusCode(200)
+                                    .body("jobName", equalTo("fake_to_file"))
+                                    .body("jobStatus", equalTo("RUNNING"));
+                        });
     }
 
     @Test
     public void testGetRunningJobs() {
-        given().get(
-                        HOST
-                                + hazelcastInstance
-                                        .getCluster()
-                                        .getLocalMember()
-                                        .getAddress()
-                                        .getPort()
-                                + RestConstant.RUNNING_JOBS_URL)
-                .then()
-                .statusCode(200)
-                .body("[0].jobName", equalTo("fake_to_file"))
-                .body("[0].jobStatus", equalTo("RUNNING"));
+        Arrays.asList(node2, node1)
+                .forEach(
+                        hazelcastInstance2 -> {
+                            given().get(
+                                            HOST
+                                                    + hazelcastInstance2
+                                                            .getCluster()
+                                                            .getLocalMember()
+                                                            .getAddress()
+                                                            .getPort()
+                                                    + 
RestConstant.RUNNING_JOBS_URL)
+                                    .then()
+                                    .statusCode(200)
+                                    .body("[0].jobName", 
equalTo("fake_to_file"))
+                                    .body("[0].jobStatus", equalTo("RUNNING"));
+                        });
     }
 
     @Test
     public void testSystemMonitoringInformation() {
-        given().get(
-                        HOST
-                                + hazelcastInstance
-                                        .getCluster()
-                                        .getLocalMember()
-                                        .getAddress()
-                                        .getPort()
-                                + RestConstant.SYSTEM_MONITORING_INFORMATION)
-                .then()
-                .assertThat()
-                .time(lessThan(5000L))
-                .statusCode(200);
+        Arrays.asList(node2, node1)
+                .forEach(
+                        hazelcastInstance2 -> {
+                            given().get(
+                                            HOST
+                                                    + hazelcastInstance2
+                                                            .getCluster()
+                                                            .getLocalMember()
+                                                            .getAddress()
+                                                            .getPort()
+                                                    + 
RestConstant.SYSTEM_MONITORING_INFORMATION)
+                                    .then()
+                                    .assertThat()
+                                    .time(lessThan(5000L))
+                                    .statusCode(200);
+                        });
     }
 
     @Test
     public void testSubmitJob() {
-        Response response = submitJob("BATCH");
-        response.then().statusCode(200).body("jobName", equalTo("test测试"));
-        String jobId = response.getBody().jsonPath().getString("jobId");
-        SeaTunnelServer seaTunnelServer =
-                (SeaTunnelServer)
-                        hazelcastInstance
-                                .node
-                                .getNodeExtension()
-                                .createExtensionServices()
-                                .get(Constant.SEATUNNEL_SERVICE_NAME);
-        Awaitility.await()
-                .atMost(2, TimeUnit.MINUTES)
-                .untilAsserted(
-                        () ->
-                                Assertions.assertEquals(
-                                        JobStatus.RUNNING,
-                                        seaTunnelServer
-                                                .getCoordinatorService()
-                                                
.getJobStatus(Long.parseLong(jobId))));
-        Awaitility.await()
-                .atMost(2, TimeUnit.MINUTES)
-                .untilAsserted(
-                        () ->
-                                Assertions.assertEquals(
-                                        JobStatus.FINISHED,
-                                        seaTunnelServer
-                                                .getCoordinatorService()
-                                                
.getJobStatus(Long.parseLong(jobId))));
+        Arrays.asList(node2, node1)
+                .forEach(
+                        hazelcastInstance2 -> {
+                            Response response = submitJob(hazelcastInstance2, 
"BATCH");
+                            response.then().statusCode(200).body("jobName", 
equalTo("test测试"));
+                            String jobId = 
response.getBody().jsonPath().getString("jobId");
+                            SeaTunnelServer seaTunnelServer = null;
+
+                            for (HazelcastInstance hazelcastInstance :
+                                    Hazelcast.getAllHazelcastInstances()) {
+                                SeaTunnelServer seaTunnelServer1 =
+                                        (SeaTunnelServer)
+                                                ((HazelcastInstanceProxy) 
hazelcastInstance)
+                                                        .getOriginal()
+                                                        .node
+                                                        .getNodeExtension()
+                                                        
.createExtensionServices()
+                                                        
.get(Constant.SEATUNNEL_SERVICE_NAME);
+
+                                if (seaTunnelServer1.isMasterNode()) {
+                                    seaTunnelServer = seaTunnelServer1;
+                                }
+                            }
+
+                            SeaTunnelServer finalSeaTunnelServer = 
seaTunnelServer;
+                            Awaitility.await()
+                                    .atMost(2, TimeUnit.MINUTES)
+                                    .untilAsserted(
+                                            () ->
+                                                    Assertions.assertEquals(
+                                                            JobStatus.FINISHED,
+                                                            
finalSeaTunnelServer
+                                                                    
.getCoordinatorService()
+                                                                    
.getJobStatus(
+                                                                            
Long.parseLong(
+                                                                               
     jobId))));
+                        });
     }
 
     @Test
     public void testStopJob() {
-        String jobId = 
submitJob("STREAMING").getBody().jsonPath().getString("jobId");
-        SeaTunnelServer seaTunnelServer =
-                (SeaTunnelServer)
-                        hazelcastInstance
-                                .node
-                                .getNodeExtension()
-                                .createExtensionServices()
-                                .get(Constant.SEATUNNEL_SERVICE_NAME);
-        Awaitility.await()
-                .atMost(2, TimeUnit.MINUTES)
-                .untilAsserted(
-                        () ->
-                                Assertions.assertEquals(
-                                        JobStatus.RUNNING,
-                                        seaTunnelServer
-                                                .getCoordinatorService()
-                                                
.getJobStatus(Long.parseLong(jobId))));
-
-        String parameters = "{" + "\"jobId\":" + jobId + "," + 
"\"isStopWithSavePoint\":true}";
-
-        given().body(parameters)
-                .post(
-                        HOST
-                                + hazelcastInstance
-                                        .getCluster()
-                                        .getLocalMember()
-                                        .getAddress()
-                                        .getPort()
-                                + RestConstant.STOP_JOB_URL)
-                .then()
-                .statusCode(200)
-                .body("jobId", equalTo(jobId));
 
-        Awaitility.await()
-                .atMost(6, TimeUnit.MINUTES)
-                .untilAsserted(
-                        () ->
-                                Assertions.assertEquals(
-                                        JobStatus.FINISHED,
-                                        seaTunnelServer
-                                                .getCoordinatorService()
-                                                
.getJobStatus(Long.parseLong(jobId))));
+        Arrays.asList(node2, node1)
+                .forEach(
+                        hazelcastInstance2 -> {
+                            String jobId =
+                                    submitJob(hazelcastInstance2, "STREAMING")
+                                            .getBody()
+                                            .jsonPath()
+                                            .getString("jobId");
+                            SeaTunnelServer seaTunnelServer = null;
 
-        String jobId2 = 
submitJob("STREAMING").getBody().jsonPath().getString("jobId");
+                            for (HazelcastInstance hazelcastInstance :
+                                    Hazelcast.getAllHazelcastInstances()) {
+                                SeaTunnelServer seaTunnelServer1 =

Review Comment:
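   Ditto (same naming concern as above): please give `seaTunnelServer1` a descriptive name instead of a numeric suffix.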



##########
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java:
##########
@@ -83,242 +92,323 @@ void beforeClass() throws Exception {
 
     @Test
     public void testGetRunningJobById() {
-        given().get(
-                        HOST
-                                + hazelcastInstance
-                                        .getCluster()
-                                        .getLocalMember()
-                                        .getAddress()
-                                        .getPort()
-                                + RestConstant.RUNNING_JOB_URL
-                                + "/"
-                                + clientJobProxy.getJobId())
-                .then()
-                .statusCode(200)
-                .body("jobName", equalTo("fake_to_file"))
-                .body("jobStatus", equalTo("RUNNING"));
+        Arrays.asList(node2, node1)
+                .forEach(
+                        hazelcastInstance2 -> {
+                            given().get(
+                                            HOST
+                                                    + hazelcastInstance2
+                                                            .getCluster()
+                                                            .getLocalMember()
+                                                            .getAddress()
+                                                            .getPort()
+                                                    + 
RestConstant.RUNNING_JOB_URL
+                                                    + "/"
+                                                    + 
clientJobProxy.getJobId())
+                                    .then()
+                                    .statusCode(200)
+                                    .body("jobName", equalTo("fake_to_file"))
+                                    .body("jobStatus", equalTo("RUNNING"));
+                        });
     }
 
     @Test
     public void testGetRunningJobs() {
-        given().get(
-                        HOST
-                                + hazelcastInstance
-                                        .getCluster()
-                                        .getLocalMember()
-                                        .getAddress()
-                                        .getPort()
-                                + RestConstant.RUNNING_JOBS_URL)
-                .then()
-                .statusCode(200)
-                .body("[0].jobName", equalTo("fake_to_file"))
-                .body("[0].jobStatus", equalTo("RUNNING"));
+        Arrays.asList(node2, node1)
+                .forEach(
+                        hazelcastInstance2 -> {
+                            given().get(
+                                            HOST
+                                                    + hazelcastInstance2
+                                                            .getCluster()
+                                                            .getLocalMember()
+                                                            .getAddress()
+                                                            .getPort()
+                                                    + 
RestConstant.RUNNING_JOBS_URL)
+                                    .then()
+                                    .statusCode(200)
+                                    .body("[0].jobName", 
equalTo("fake_to_file"))
+                                    .body("[0].jobStatus", equalTo("RUNNING"));
+                        });
     }
 
     @Test
     public void testSystemMonitoringInformation() {
-        given().get(
-                        HOST
-                                + hazelcastInstance
-                                        .getCluster()
-                                        .getLocalMember()
-                                        .getAddress()
-                                        .getPort()
-                                + RestConstant.SYSTEM_MONITORING_INFORMATION)
-                .then()
-                .assertThat()
-                .time(lessThan(5000L))
-                .statusCode(200);
+        Arrays.asList(node2, node1)
+                .forEach(
+                        hazelcastInstance2 -> {

Review Comment:
   Ditto: please rename the lambda parameter `hazelcastInstance2` to `instance` here as well.



##########
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java:
##########
@@ -83,242 +92,323 @@ void beforeClass() throws Exception {
 
     @Test
     public void testGetRunningJobById() {
-        given().get(
-                        HOST
-                                + hazelcastInstance
-                                        .getCluster()
-                                        .getLocalMember()
-                                        .getAddress()
-                                        .getPort()
-                                + RestConstant.RUNNING_JOB_URL
-                                + "/"
-                                + clientJobProxy.getJobId())
-                .then()
-                .statusCode(200)
-                .body("jobName", equalTo("fake_to_file"))
-                .body("jobStatus", equalTo("RUNNING"));
+        Arrays.asList(node2, node1)
+                .forEach(
+                        hazelcastInstance2 -> {
+                            given().get(
+                                            HOST
+                                                    + hazelcastInstance2
+                                                            .getCluster()
+                                                            .getLocalMember()
+                                                            .getAddress()
+                                                            .getPort()
+                                                    + 
RestConstant.RUNNING_JOB_URL
+                                                    + "/"
+                                                    + 
clientJobProxy.getJobId())
+                                    .then()
+                                    .statusCode(200)
+                                    .body("jobName", equalTo("fake_to_file"))
+                                    .body("jobStatus", equalTo("RUNNING"));
+                        });
     }
 
     @Test
     public void testGetRunningJobs() {
-        given().get(
-                        HOST
-                                + hazelcastInstance
-                                        .getCluster()
-                                        .getLocalMember()
-                                        .getAddress()
-                                        .getPort()
-                                + RestConstant.RUNNING_JOBS_URL)
-                .then()
-                .statusCode(200)
-                .body("[0].jobName", equalTo("fake_to_file"))
-                .body("[0].jobStatus", equalTo("RUNNING"));
+        Arrays.asList(node2, node1)
+                .forEach(
+                        hazelcastInstance2 -> {
+                            given().get(
+                                            HOST
+                                                    + hazelcastInstance2
+                                                            .getCluster()
+                                                            .getLocalMember()
+                                                            .getAddress()
+                                                            .getPort()
+                                                    + 
RestConstant.RUNNING_JOBS_URL)
+                                    .then()
+                                    .statusCode(200)
+                                    .body("[0].jobName", 
equalTo("fake_to_file"))
+                                    .body("[0].jobStatus", equalTo("RUNNING"));
+                        });
     }
 
     @Test
     public void testSystemMonitoringInformation() {
-        given().get(
-                        HOST
-                                + hazelcastInstance
-                                        .getCluster()
-                                        .getLocalMember()
-                                        .getAddress()
-                                        .getPort()
-                                + RestConstant.SYSTEM_MONITORING_INFORMATION)
-                .then()
-                .assertThat()
-                .time(lessThan(5000L))
-                .statusCode(200);
+        Arrays.asList(node2, node1)
+                .forEach(
+                        hazelcastInstance2 -> {
+                            given().get(
+                                            HOST
+                                                    + hazelcastInstance2
+                                                            .getCluster()
+                                                            .getLocalMember()
+                                                            .getAddress()
+                                                            .getPort()
+                                                    + 
RestConstant.SYSTEM_MONITORING_INFORMATION)
+                                    .then()
+                                    .assertThat()
+                                    .time(lessThan(5000L))
+                                    .statusCode(200);
+                        });
     }
 
     @Test
     public void testSubmitJob() {
-        Response response = submitJob("BATCH");
-        response.then().statusCode(200).body("jobName", equalTo("test测试"));
-        String jobId = response.getBody().jsonPath().getString("jobId");
-        SeaTunnelServer seaTunnelServer =
-                (SeaTunnelServer)
-                        hazelcastInstance
-                                .node
-                                .getNodeExtension()
-                                .createExtensionServices()
-                                .get(Constant.SEATUNNEL_SERVICE_NAME);
-        Awaitility.await()
-                .atMost(2, TimeUnit.MINUTES)
-                .untilAsserted(
-                        () ->
-                                Assertions.assertEquals(
-                                        JobStatus.RUNNING,
-                                        seaTunnelServer
-                                                .getCoordinatorService()
-                                                
.getJobStatus(Long.parseLong(jobId))));
-        Awaitility.await()
-                .atMost(2, TimeUnit.MINUTES)
-                .untilAsserted(
-                        () ->
-                                Assertions.assertEquals(
-                                        JobStatus.FINISHED,
-                                        seaTunnelServer
-                                                .getCoordinatorService()
-                                                
.getJobStatus(Long.parseLong(jobId))));
+        Arrays.asList(node2, node1)
+                .forEach(
+                        hazelcastInstance2 -> {
+                            Response response = submitJob(hazelcastInstance2, 
"BATCH");
+                            response.then().statusCode(200).body("jobName", 
equalTo("test测试"));
+                            String jobId = 
response.getBody().jsonPath().getString("jobId");
+                            SeaTunnelServer seaTunnelServer = null;
+
+                            for (HazelcastInstance hazelcastInstance :
+                                    Hazelcast.getAllHazelcastInstances()) {
+                                SeaTunnelServer seaTunnelServer1 =
+                                        (SeaTunnelServer)
+                                                ((HazelcastInstanceProxy) 
hazelcastInstance)
+                                                        .getOriginal()
+                                                        .node
+                                                        .getNodeExtension()
+                                                        
.createExtensionServices()
+                                                        
.get(Constant.SEATUNNEL_SERVICE_NAME);
+
+                                if (seaTunnelServer1.isMasterNode()) {
+                                    seaTunnelServer = seaTunnelServer1;
+                                }
+                            }
+
+                            SeaTunnelServer finalSeaTunnelServer = 
seaTunnelServer;
+                            Awaitility.await()
+                                    .atMost(2, TimeUnit.MINUTES)
+                                    .untilAsserted(
+                                            () ->
+                                                    Assertions.assertEquals(
+                                                            JobStatus.FINISHED,
+                                                            
finalSeaTunnelServer
+                                                                    
.getCoordinatorService()
+                                                                    
.getJobStatus(
+                                                                            
Long.parseLong(
+                                                                               
     jobId))));
+                        });
     }
 
     @Test
     public void testStopJob() {
-        String jobId = 
submitJob("STREAMING").getBody().jsonPath().getString("jobId");
-        SeaTunnelServer seaTunnelServer =
-                (SeaTunnelServer)
-                        hazelcastInstance
-                                .node
-                                .getNodeExtension()
-                                .createExtensionServices()
-                                .get(Constant.SEATUNNEL_SERVICE_NAME);
-        Awaitility.await()
-                .atMost(2, TimeUnit.MINUTES)
-                .untilAsserted(
-                        () ->
-                                Assertions.assertEquals(
-                                        JobStatus.RUNNING,
-                                        seaTunnelServer
-                                                .getCoordinatorService()
-                                                
.getJobStatus(Long.parseLong(jobId))));
-
-        String parameters = "{" + "\"jobId\":" + jobId + "," + 
"\"isStopWithSavePoint\":true}";
-
-        given().body(parameters)
-                .post(
-                        HOST
-                                + hazelcastInstance
-                                        .getCluster()
-                                        .getLocalMember()
-                                        .getAddress()
-                                        .getPort()
-                                + RestConstant.STOP_JOB_URL)
-                .then()
-                .statusCode(200)
-                .body("jobId", equalTo(jobId));
 
-        Awaitility.await()
-                .atMost(6, TimeUnit.MINUTES)
-                .untilAsserted(
-                        () ->
-                                Assertions.assertEquals(
-                                        JobStatus.FINISHED,
-                                        seaTunnelServer
-                                                .getCoordinatorService()
-                                                
.getJobStatus(Long.parseLong(jobId))));
+        Arrays.asList(node2, node1)
+                .forEach(
+                        hazelcastInstance2 -> {

Review Comment:
   Ditto: please rename the lambda parameter `hazelcastInstance2` to `instance` here as well.



##########
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java:
##########
@@ -83,242 +92,323 @@ void beforeClass() throws Exception {
 
     @Test
     public void testGetRunningJobById() {
-        given().get(
-                        HOST
-                                + hazelcastInstance
-                                        .getCluster()
-                                        .getLocalMember()
-                                        .getAddress()
-                                        .getPort()
-                                + RestConstant.RUNNING_JOB_URL
-                                + "/"
-                                + clientJobProxy.getJobId())
-                .then()
-                .statusCode(200)
-                .body("jobName", equalTo("fake_to_file"))
-                .body("jobStatus", equalTo("RUNNING"));
+        Arrays.asList(node2, node1)
+                .forEach(
+                        hazelcastInstance2 -> {
+                            given().get(
+                                            HOST
+                                                    + hazelcastInstance2
+                                                            .getCluster()
+                                                            .getLocalMember()
+                                                            .getAddress()
+                                                            .getPort()
+                                                    + RestConstant.RUNNING_JOB_URL
+                                                    + "/"
+                                                    + clientJobProxy.getJobId())
+                                    .then()
+                                    .statusCode(200)
+                                    .body("jobName", equalTo("fake_to_file"))
+                                    .body("jobStatus", equalTo("RUNNING"));
+                        });
     }
 
     @Test
     public void testGetRunningJobs() {
-        given().get(
-                        HOST
-                                + hazelcastInstance
-                                        .getCluster()
-                                        .getLocalMember()
-                                        .getAddress()
-                                        .getPort()
-                                + RestConstant.RUNNING_JOBS_URL)
-                .then()
-                .statusCode(200)
-                .body("[0].jobName", equalTo("fake_to_file"))
-                .body("[0].jobStatus", equalTo("RUNNING"));
+        Arrays.asList(node2, node1)
+                .forEach(
+                        hazelcastInstance2 -> {
+                            given().get(
+                                            HOST
+                                                    + hazelcastInstance2
+                                                            .getCluster()
+                                                            .getLocalMember()
+                                                            .getAddress()
+                                                            .getPort()
+                                                    + RestConstant.RUNNING_JOBS_URL)
+                                    .then()
+                                    .statusCode(200)
+                                    .body("[0].jobName", equalTo("fake_to_file"))
+                                    .body("[0].jobStatus", equalTo("RUNNING"));
+                        });
     }
 
     @Test
     public void testSystemMonitoringInformation() {
-        given().get(
-                        HOST
-                                + hazelcastInstance
-                                        .getCluster()
-                                        .getLocalMember()
-                                        .getAddress()
-                                        .getPort()
-                                + RestConstant.SYSTEM_MONITORING_INFORMATION)
-                .then()
-                .assertThat()
-                .time(lessThan(5000L))
-                .statusCode(200);
+        Arrays.asList(node2, node1)
+                .forEach(
+                        hazelcastInstance2 -> {
+                            given().get(
+                                            HOST
+                                                    + hazelcastInstance2
+                                                            .getCluster()
+                                                            .getLocalMember()
+                                                            .getAddress()
+                                                            .getPort()
+                                                    + RestConstant.SYSTEM_MONITORING_INFORMATION)
+                                    .then()
+                                    .assertThat()
+                                    .time(lessThan(5000L))
+                                    .statusCode(200);
+                        });
     }
 
     @Test
     public void testSubmitJob() {
-        Response response = submitJob("BATCH");
-        response.then().statusCode(200).body("jobName", equalTo("test测试"));
-        String jobId = response.getBody().jsonPath().getString("jobId");
-        SeaTunnelServer seaTunnelServer =
-                (SeaTunnelServer)
-                        hazelcastInstance
-                                .node
-                                .getNodeExtension()
-                                .createExtensionServices()
-                                .get(Constant.SEATUNNEL_SERVICE_NAME);
-        Awaitility.await()
-                .atMost(2, TimeUnit.MINUTES)
-                .untilAsserted(
-                        () ->
-                                Assertions.assertEquals(
-                                        JobStatus.RUNNING,
-                                        seaTunnelServer
-                                                .getCoordinatorService()
-                                                .getJobStatus(Long.parseLong(jobId))));
-        Awaitility.await()
-                .atMost(2, TimeUnit.MINUTES)
-                .untilAsserted(
-                        () ->
-                                Assertions.assertEquals(
-                                        JobStatus.FINISHED,
-                                        seaTunnelServer
-                                                .getCoordinatorService()
-                                                .getJobStatus(Long.parseLong(jobId))));
+        Arrays.asList(node2, node1)
+                .forEach(
+                        hazelcastInstance2 -> {
+                            Response response = submitJob(hazelcastInstance2, "BATCH");
+                            response.then().statusCode(200).body("jobName", equalTo("test测试"));
+                            String jobId = response.getBody().jsonPath().getString("jobId");
+                            SeaTunnelServer seaTunnelServer = null;
+
+                            for (HazelcastInstance hazelcastInstance :
+                                    Hazelcast.getAllHazelcastInstances()) {
+                                SeaTunnelServer seaTunnelServer1 =

Review Comment:
   ```suggestion
                                   SeaTunnelServer server =
   ```



##########
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java:
##########
@@ -83,242 +92,323 @@ void beforeClass() throws Exception {
 
     @Test
     public void testGetRunningJobById() {
-        given().get(
-                        HOST
-                                + hazelcastInstance
-                                        .getCluster()
-                                        .getLocalMember()
-                                        .getAddress()
-                                        .getPort()
-                                + RestConstant.RUNNING_JOB_URL
-                                + "/"
-                                + clientJobProxy.getJobId())
-                .then()
-                .statusCode(200)
-                .body("jobName", equalTo("fake_to_file"))
-                .body("jobStatus", equalTo("RUNNING"));
+        Arrays.asList(node2, node1)
+                .forEach(
+                        hazelcastInstance2 -> {
+                            given().get(
+                                            HOST
+                                                    + hazelcastInstance2
+                                                            .getCluster()
+                                                            .getLocalMember()
+                                                            .getAddress()
+                                                            .getPort()
+                                                    + RestConstant.RUNNING_JOB_URL
+                                                    + "/"
+                                                    + clientJobProxy.getJobId())
+                                    .then()
+                                    .statusCode(200)
+                                    .body("jobName", equalTo("fake_to_file"))
+                                    .body("jobStatus", equalTo("RUNNING"));
+                        });
     }
 
     @Test
     public void testGetRunningJobs() {
-        given().get(
-                        HOST
-                                + hazelcastInstance
-                                        .getCluster()
-                                        .getLocalMember()
-                                        .getAddress()
-                                        .getPort()
-                                + RestConstant.RUNNING_JOBS_URL)
-                .then()
-                .statusCode(200)
-                .body("[0].jobName", equalTo("fake_to_file"))
-                .body("[0].jobStatus", equalTo("RUNNING"));
+        Arrays.asList(node2, node1)
+                .forEach(
+                        hazelcastInstance2 -> {
+                            given().get(
+                                            HOST
+                                                    + hazelcastInstance2
+                                                            .getCluster()
+                                                            .getLocalMember()
+                                                            .getAddress()
+                                                            .getPort()
+                                                    + RestConstant.RUNNING_JOBS_URL)
+                                    .then()
+                                    .statusCode(200)
+                                    .body("[0].jobName", equalTo("fake_to_file"))
+                                    .body("[0].jobStatus", equalTo("RUNNING"));
+                        });
     }
 
     @Test
     public void testSystemMonitoringInformation() {
-        given().get(
-                        HOST
-                                + hazelcastInstance
-                                        .getCluster()
-                                        .getLocalMember()
-                                        .getAddress()
-                                        .getPort()
-                                + RestConstant.SYSTEM_MONITORING_INFORMATION)
-                .then()
-                .assertThat()
-                .time(lessThan(5000L))
-                .statusCode(200);
+        Arrays.asList(node2, node1)
+                .forEach(
+                        hazelcastInstance2 -> {
+                            given().get(
+                                            HOST
+                                                    + hazelcastInstance2
+                                                            .getCluster()
+                                                            .getLocalMember()
+                                                            .getAddress()
+                                                            .getPort()
+                                                    + RestConstant.SYSTEM_MONITORING_INFORMATION)
+                                    .then()
+                                    .assertThat()
+                                    .time(lessThan(5000L))
+                                    .statusCode(200);
+                        });
     }
 
     @Test
     public void testSubmitJob() {
-        Response response = submitJob("BATCH");
-        response.then().statusCode(200).body("jobName", equalTo("test测试"));
-        String jobId = response.getBody().jsonPath().getString("jobId");
-        SeaTunnelServer seaTunnelServer =
-                (SeaTunnelServer)
-                        hazelcastInstance
-                                .node
-                                .getNodeExtension()
-                                .createExtensionServices()
-                                .get(Constant.SEATUNNEL_SERVICE_NAME);
-        Awaitility.await()
-                .atMost(2, TimeUnit.MINUTES)
-                .untilAsserted(
-                        () ->
-                                Assertions.assertEquals(
-                                        JobStatus.RUNNING,
-                                        seaTunnelServer
-                                                .getCoordinatorService()
-                                                .getJobStatus(Long.parseLong(jobId))));
-        Awaitility.await()
-                .atMost(2, TimeUnit.MINUTES)
-                .untilAsserted(
-                        () ->
-                                Assertions.assertEquals(
-                                        JobStatus.FINISHED,
-                                        seaTunnelServer
-                                                .getCoordinatorService()
-                                                .getJobStatus(Long.parseLong(jobId))));
+        Arrays.asList(node2, node1)
+                .forEach(
+                        hazelcastInstance2 -> {

Review Comment:
   Ditto: please rename the lambda parameter `hazelcastInstance2` to `instance` here as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to