This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b4ef06f08e [CI] Fix engine client not close  (#6241)
b4ef06f08e is described below

commit b4ef06f08eda258e9bbc19a2d75467600a0f78eb
Author: Eric <[email protected]>
AuthorDate: Thu Jan 18 17:19:58 2024 +0800

    [CI] Fix engine client not close  (#6241)
---
 .../engine/e2e/ClusterFaultToleranceIT.java        |  16 +-
 .../e2e/ClusterFaultToleranceTwoPipelineIT.java    |  12 +-
 .../org/apache/seatunnel/engine/e2e/ClusterIT.java |   2 +-
 .../seatunnel/engine/e2e/JobExecutionIT.java       | 196 +++++++++++----------
 .../seatunnel/engine/e2e/SeaTunnelSlotIT.java      |   4 +-
 .../apache/seatunnel/engine/e2e/TextHeaderIT.java  |   2 +-
 .../seatunnel/engine/client/SeaTunnelClient.java   |   6 +-
 7 files changed, 120 insertions(+), 118 deletions(-)

diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
index 747dee9478..7724b88c9d 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
@@ -138,7 +138,7 @@ public class ClusterFaultToleranceIT {
             log.warn("========================clean test resource====================");
         } finally {
             if (engineClient != null) {
-                engineClient.shutdown();
+                engineClient.close();
             }
 
             if (node1 != null) {
@@ -271,7 +271,7 @@ public class ClusterFaultToleranceIT {
 
         } finally {
             if (engineClient != null) {
-                engineClient.shutdown();
+                engineClient.close();
             }
 
             if (node1 != null) {
@@ -370,7 +370,7 @@ public class ClusterFaultToleranceIT {
 
         } finally {
             if (engineClient != null) {
-                engineClient.shutdown();
+                engineClient.close();
             }
 
             if (node1 != null) {
@@ -488,7 +488,7 @@ public class ClusterFaultToleranceIT {
 
         } finally {
             if (engineClient != null) {
-                engineClient.shutdown();
+                engineClient.close();
             }
 
             if (node1 != null) {
@@ -590,7 +590,7 @@ public class ClusterFaultToleranceIT {
 
         } finally {
             if (engineClient != null) {
-                engineClient.shutdown();
+                engineClient.close();
             }
 
             if (node1 != null) {
@@ -709,7 +709,7 @@ public class ClusterFaultToleranceIT {
 
         } finally {
             if (engineClient != null) {
-                engineClient.shutdown();
+                engineClient.close();
             }
 
             if (node1 != null) {
@@ -916,7 +916,7 @@ public class ClusterFaultToleranceIT {
             log.warn(
                     "==========================================Clean test 
resource ========================================");
             if (engineClient != null) {
-                engineClient.shutdown();
+                engineClient.close();
             }
 
             if (node1 != null) {
@@ -1134,7 +1134,7 @@ public class ClusterFaultToleranceIT {
             log.info(
                     "==========================================Clean test 
resource ========================================");
             if (engineClient != null) {
-                engineClient.shutdown();
+                engineClient.close();
             }
 
             if (node1 != null) {
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
index d9b8ddcedf..67352f9d91 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
@@ -143,7 +143,7 @@ public class ClusterFaultToleranceTwoPipelineIT {
             Assertions.assertEquals(testRowNumber * testParallelism * 2, 
fileLineNumberFromDir);
         } finally {
             if (engineClient != null) {
-                engineClient.shutdown();
+                engineClient.close();
             }
 
             if (node1 != null) {
@@ -285,7 +285,7 @@ public class ClusterFaultToleranceTwoPipelineIT {
 
         } finally {
             if (engineClient != null) {
-                engineClient.shutdown();
+                engineClient.close();
             }
 
             if (node1 != null) {
@@ -392,7 +392,7 @@ public class ClusterFaultToleranceTwoPipelineIT {
             Assertions.assertEquals(testRowNumber * testParallelism * 2, 
fileLineNumberFromDir);
         } finally {
             if (engineClient != null) {
-                engineClient.shutdown();
+                engineClient.close();
             }
 
             if (node1 != null) {
@@ -528,7 +528,7 @@ public class ClusterFaultToleranceTwoPipelineIT {
 
         } finally {
             if (engineClient != null) {
-                engineClient.shutdown();
+                engineClient.close();
             }
 
             if (node1 != null) {
@@ -638,7 +638,7 @@ public class ClusterFaultToleranceTwoPipelineIT {
 
         } finally {
             if (engineClient != null) {
-                engineClient.shutdown();
+                engineClient.close();
             }
 
             if (node1 != null) {
@@ -764,7 +764,7 @@ public class ClusterFaultToleranceTwoPipelineIT {
 
         } finally {
             if (engineClient != null) {
-                engineClient.shutdown();
+                engineClient.close();
             }
 
             if (node1 != null) {
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterIT.java
index 5ff1b3005c..ced1065731 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterIT.java
@@ -77,7 +77,7 @@ public class ClusterIT {
 
         } finally {
             if (engineClient != null) {
-                engineClient.shutdown();
+                engineClient.close();
             }
 
             if (node1 != null) {
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
index 0dc4d7ba12..f67dd22385 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
@@ -64,11 +64,11 @@ public class JobExecutionIT {
     public void testSayHello() {
         ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
         
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
-        SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
-
-        String msg = "Hello world";
-        String s = engineClient.printMessageToMaster(msg);
-        Assertions.assertEquals(msg, s);
+        try (SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig)) 
{
+            String msg = "Hello world";
+            String s = engineClient.printMessageToMaster(msg);
+            Assertions.assertEquals(msg, s);
+        }
     }
 
     @Test
@@ -80,22 +80,23 @@ public class JobExecutionIT {
 
         ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
         
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
-        SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
-        ClientJobExecutionEnvironment jobExecutionEnv =
-                engineClient.createExecutionContext(filePath, jobConfig, 
SEATUNNEL_CONFIG);
-
-        final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
-
-        CompletableFuture<JobStatus> objectCompletableFuture =
-                
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
-
-        await().atMost(600000, TimeUnit.MILLISECONDS)
-                .untilAsserted(
-                        () ->
-                                Assertions.assertTrue(
-                                        objectCompletableFuture.isDone()
-                                                && JobStatus.FINISHED.equals(
-                                                        
objectCompletableFuture.get())));
+        try (SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig)) 
{
+            ClientJobExecutionEnvironment jobExecutionEnv =
+                    engineClient.createExecutionContext(filePath, jobConfig, 
SEATUNNEL_CONFIG);
+
+            final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+
+            CompletableFuture<JobStatus> objectCompletableFuture =
+                    
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
+
+            await().atMost(600000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(
+                            () ->
+                                    Assertions.assertTrue(
+                                            objectCompletableFuture.isDone()
+                                                    && 
JobStatus.FINISHED.equals(
+                                                            
objectCompletableFuture.get())));
+        }
     }
 
     @Test
@@ -107,25 +108,26 @@ public class JobExecutionIT {
 
         ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
         
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
-        SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
-        ClientJobExecutionEnvironment jobExecutionEnv =
-                engineClient.createExecutionContext(filePath, jobConfig, 
SEATUNNEL_CONFIG);
-
-        final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
-        JobStatus jobStatus1 = clientJobProxy.getJobStatus();
-        Assertions.assertFalse(jobStatus1.isEndState());
-        CompletableFuture<JobStatus> objectCompletableFuture =
-                
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
-        Thread.sleep(1000);
-        clientJobProxy.cancelJob();
-
-        await().atMost(20000, TimeUnit.MILLISECONDS)
-                .untilAsserted(
-                        () ->
-                                Assertions.assertTrue(
-                                        objectCompletableFuture.isDone()
-                                                && JobStatus.CANCELED.equals(
-                                                        
objectCompletableFuture.get())));
+        try (SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig)) 
{
+            ClientJobExecutionEnvironment jobExecutionEnv =
+                    engineClient.createExecutionContext(filePath, jobConfig, 
SEATUNNEL_CONFIG);
+
+            final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+            JobStatus jobStatus1 = clientJobProxy.getJobStatus();
+            Assertions.assertFalse(jobStatus1.isEndState());
+            CompletableFuture<JobStatus> objectCompletableFuture =
+                    
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
+            Thread.sleep(1000);
+            clientJobProxy.cancelJob();
+
+            await().atMost(20000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(
+                            () ->
+                                    Assertions.assertTrue(
+                                            objectCompletableFuture.isDone()
+                                                    && 
JobStatus.CANCELED.equals(
+                                                            
objectCompletableFuture.get())));
+        }
     }
 
     @Test
@@ -136,18 +138,19 @@ public class JobExecutionIT {
         jobConfig.setName("fake_to_console_error");
         ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
         
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
-        SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
-        ClientJobExecutionEnvironment jobExecutionEnv =
-                engineClient.createExecutionContext(filePath, jobConfig, 
SEATUNNEL_CONFIG);
-        final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
-        CompletableFuture<JobStatus> completableFuture =
-                
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
-        await().atMost(600000, TimeUnit.MILLISECONDS)
-                .untilAsserted(() -> 
Assertions.assertTrue(completableFuture.isDone()));
-
-        JobResult result = clientJobProxy.getJobResultCache();
-        Assertions.assertEquals(result.getStatus(), JobStatus.FAILED);
-        
Assertions.assertTrue(result.getError().startsWith("java.lang.NumberFormatException"));
+        try (SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig)) 
{
+            ClientJobExecutionEnvironment jobExecutionEnv =
+                    engineClient.createExecutionContext(filePath, jobConfig, 
SEATUNNEL_CONFIG);
+            final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+            CompletableFuture<JobStatus> completableFuture =
+                    
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
+            await().atMost(600000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(() -> 
Assertions.assertTrue(completableFuture.isDone()));
+
+            JobResult result = clientJobProxy.getJobResultCache();
+            Assertions.assertEquals(result.getStatus(), JobStatus.FAILED);
+            
Assertions.assertTrue(result.getError().startsWith("java.lang.NumberFormatException"));
+        }
     }
 
     @Test
@@ -155,21 +158,22 @@ public class JobExecutionIT {
 
         ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
         
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
-        SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
-
-        ClientJobProxy newClientJobProxy =
-                
engineClient.createJobClient().getJobProxy(System.currentTimeMillis());
-        CompletableFuture<JobStatus> waitForJobCompleteFuture =
-                
CompletableFuture.supplyAsync(newClientJobProxy::waitForJobComplete);
-
-        await().atMost(20000, TimeUnit.MILLISECONDS)
-                .untilAsserted(
-                        () ->
-                                Assertions.assertEquals(
-                                        JobStatus.UNKNOWABLE, 
waitForJobCompleteFuture.get()));
-
-        Assertions.assertEquals(
-                "UNKNOWABLE", 
engineClient.getJobClient().getJobStatus(System.currentTimeMillis()));
+        try (SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig)) 
{
+            ClientJobProxy newClientJobProxy =
+                    
engineClient.createJobClient().getJobProxy(System.currentTimeMillis());
+            CompletableFuture<JobStatus> waitForJobCompleteFuture =
+                    
CompletableFuture.supplyAsync(newClientJobProxy::waitForJobComplete);
+
+            await().atMost(20000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(
+                            () ->
+                                    Assertions.assertEquals(
+                                            JobStatus.UNKNOWABLE, 
waitForJobCompleteFuture.get()));
+
+            Assertions.assertEquals(
+                    "UNKNOWABLE",
+                    
engineClient.getJobClient().getJobStatus(System.currentTimeMillis()));
+        }
     }
 
     @Test
@@ -181,18 +185,19 @@ public class JobExecutionIT {
 
         ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
         
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
-        SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
-        ClientJobExecutionEnvironment jobExecutionEnv =
-                engineClient.createExecutionContext(filePath, jobConfig, 
SEATUNNEL_CONFIG);
-
-        final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
-
-        Assertions.assertEquals(clientJobProxy.waitForJobComplete(), 
JobStatus.FINISHED);
-        await().atMost(65, TimeUnit.SECONDS)
-                .untilAsserted(
-                        () ->
-                                Assertions.assertEquals(
-                                        JobStatus.UNKNOWABLE, 
clientJobProxy.getJobStatus()));
+        try (SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig)) 
{
+            ClientJobExecutionEnvironment jobExecutionEnv =
+                    engineClient.createExecutionContext(filePath, jobConfig, 
SEATUNNEL_CONFIG);
+
+            final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+
+            Assertions.assertEquals(clientJobProxy.waitForJobComplete(), 
JobStatus.FINISHED);
+            await().atMost(65, TimeUnit.SECONDS)
+                    .untilAsserted(
+                            () ->
+                                    Assertions.assertEquals(
+                                            JobStatus.UNKNOWABLE, 
clientJobProxy.getJobStatus()));
+        }
     }
 
     @AfterEach
@@ -211,21 +216,22 @@ public class JobExecutionIT {
 
         ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
         
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
-        SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
-        ClientJobExecutionEnvironment jobExecutionEnv =
-                engineClient.createExecutionContext(filePath, jobConfig, 
SEATUNNEL_CONFIG);
-
-        final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
-
-        CompletableFuture<JobStatus> objectCompletableFuture =
-                
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
-
-        await().atMost(600000, TimeUnit.MILLISECONDS)
-                .untilAsserted(
-                        () ->
-                                Assertions.assertTrue(
-                                        objectCompletableFuture.isDone()
-                                                && JobStatus.FAILED.equals(
-                                                        
objectCompletableFuture.get())));
+        try (SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig)) 
{
+            ClientJobExecutionEnvironment jobExecutionEnv =
+                    engineClient.createExecutionContext(filePath, jobConfig, 
SEATUNNEL_CONFIG);
+
+            final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+
+            CompletableFuture<JobStatus> objectCompletableFuture =
+                    
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
+
+            await().atMost(600000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(
+                            () ->
+                                    Assertions.assertTrue(
+                                            objectCompletableFuture.isDone()
+                                                    && JobStatus.FAILED.equals(
+                                                            
objectCompletableFuture.get())));
+        }
     }
 }
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelSlotIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelSlotIT.java
index 8f7b459c48..33a7be8914 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelSlotIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelSlotIT.java
@@ -82,7 +82,7 @@ public class SeaTunnelSlotIT {
 
         } finally {
             if (engineClient != null) {
-                engineClient.shutdown();
+                engineClient.close();
             }
 
             if (node1 != null) {
@@ -134,7 +134,7 @@ public class SeaTunnelSlotIT {
 
         } finally {
             if (engineClient != null) {
-                engineClient.shutdown();
+                engineClient.close();
             }
 
             if (node1 != null) {
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java
index 09a0cc472a..ca9851eb71 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java
@@ -161,7 +161,7 @@ public class TextHeaderIT {
             log.info("========================clean test 
resource====================");
         } finally {
             if (engineClient != null) {
-                engineClient.shutdown();
+                engineClient.close();
             }
             if (node1 != null) {
                 node1.shutdown();
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
index 3bf13e1d5f..22aa0ffd13 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
@@ -39,7 +39,7 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
 
-public class SeaTunnelClient implements SeaTunnelClientInstance {
+public class SeaTunnelClient implements SeaTunnelClientInstance, AutoCloseable {
     private final SeaTunnelHazelcastClient hazelcastClient;
     @Getter private final JobClient jobClient;
 
@@ -87,10 +87,6 @@ public class SeaTunnelClient implements SeaTunnelClientInstance {
                 SeaTunnelPrintMessageCodec::decodeResponse);
     }
 
-    public void shutdown() {
-        hazelcastClient.shutdown();
-    }
-
     /**
      * get job status and the tasks status
      *

Reply via email to