This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b4ef06f08e [CI] Fix engine client not close (#6241)
b4ef06f08e is described below
commit b4ef06f08eda258e9bbc19a2d75467600a0f78eb
Author: Eric <[email protected]>
AuthorDate: Thu Jan 18 17:19:58 2024 +0800
[CI] Fix engine client not close (#6241)
---
.../engine/e2e/ClusterFaultToleranceIT.java | 16 +-
.../e2e/ClusterFaultToleranceTwoPipelineIT.java | 12 +-
.../org/apache/seatunnel/engine/e2e/ClusterIT.java | 2 +-
.../seatunnel/engine/e2e/JobExecutionIT.java | 196 +++++++++++----------
.../seatunnel/engine/e2e/SeaTunnelSlotIT.java | 4 +-
.../apache/seatunnel/engine/e2e/TextHeaderIT.java | 2 +-
.../seatunnel/engine/client/SeaTunnelClient.java | 6 +-
7 files changed, 120 insertions(+), 118 deletions(-)
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
index 747dee9478..7724b88c9d 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
@@ -138,7 +138,7 @@ public class ClusterFaultToleranceIT {
log.warn("========================clean test
resource====================");
} finally {
if (engineClient != null) {
- engineClient.shutdown();
+ engineClient.close();
}
if (node1 != null) {
@@ -271,7 +271,7 @@ public class ClusterFaultToleranceIT {
} finally {
if (engineClient != null) {
- engineClient.shutdown();
+ engineClient.close();
}
if (node1 != null) {
@@ -370,7 +370,7 @@ public class ClusterFaultToleranceIT {
} finally {
if (engineClient != null) {
- engineClient.shutdown();
+ engineClient.close();
}
if (node1 != null) {
@@ -488,7 +488,7 @@ public class ClusterFaultToleranceIT {
} finally {
if (engineClient != null) {
- engineClient.shutdown();
+ engineClient.close();
}
if (node1 != null) {
@@ -590,7 +590,7 @@ public class ClusterFaultToleranceIT {
} finally {
if (engineClient != null) {
- engineClient.shutdown();
+ engineClient.close();
}
if (node1 != null) {
@@ -709,7 +709,7 @@ public class ClusterFaultToleranceIT {
} finally {
if (engineClient != null) {
- engineClient.shutdown();
+ engineClient.close();
}
if (node1 != null) {
@@ -916,7 +916,7 @@ public class ClusterFaultToleranceIT {
log.warn(
"==========================================Clean test
resource ========================================");
if (engineClient != null) {
- engineClient.shutdown();
+ engineClient.close();
}
if (node1 != null) {
@@ -1134,7 +1134,7 @@ public class ClusterFaultToleranceIT {
log.info(
"==========================================Clean test
resource ========================================");
if (engineClient != null) {
- engineClient.shutdown();
+ engineClient.close();
}
if (node1 != null) {
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
index d9b8ddcedf..67352f9d91 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
@@ -143,7 +143,7 @@ public class ClusterFaultToleranceTwoPipelineIT {
Assertions.assertEquals(testRowNumber * testParallelism * 2,
fileLineNumberFromDir);
} finally {
if (engineClient != null) {
- engineClient.shutdown();
+ engineClient.close();
}
if (node1 != null) {
@@ -285,7 +285,7 @@ public class ClusterFaultToleranceTwoPipelineIT {
} finally {
if (engineClient != null) {
- engineClient.shutdown();
+ engineClient.close();
}
if (node1 != null) {
@@ -392,7 +392,7 @@ public class ClusterFaultToleranceTwoPipelineIT {
Assertions.assertEquals(testRowNumber * testParallelism * 2,
fileLineNumberFromDir);
} finally {
if (engineClient != null) {
- engineClient.shutdown();
+ engineClient.close();
}
if (node1 != null) {
@@ -528,7 +528,7 @@ public class ClusterFaultToleranceTwoPipelineIT {
} finally {
if (engineClient != null) {
- engineClient.shutdown();
+ engineClient.close();
}
if (node1 != null) {
@@ -638,7 +638,7 @@ public class ClusterFaultToleranceTwoPipelineIT {
} finally {
if (engineClient != null) {
- engineClient.shutdown();
+ engineClient.close();
}
if (node1 != null) {
@@ -764,7 +764,7 @@ public class ClusterFaultToleranceTwoPipelineIT {
} finally {
if (engineClient != null) {
- engineClient.shutdown();
+ engineClient.close();
}
if (node1 != null) {
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterIT.java
index 5ff1b3005c..ced1065731 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterIT.java
@@ -77,7 +77,7 @@ public class ClusterIT {
} finally {
if (engineClient != null) {
- engineClient.shutdown();
+ engineClient.close();
}
if (node1 != null) {
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
index 0dc4d7ba12..f67dd22385 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
@@ -64,11 +64,11 @@ public class JobExecutionIT {
public void testSayHello() {
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
- SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
-
- String msg = "Hello world";
- String s = engineClient.printMessageToMaster(msg);
- Assertions.assertEquals(msg, s);
+ try (SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig))
{
+ String msg = "Hello world";
+ String s = engineClient.printMessageToMaster(msg);
+ Assertions.assertEquals(msg, s);
+ }
}
@Test
@@ -80,22 +80,23 @@ public class JobExecutionIT {
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
- SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
- ClientJobExecutionEnvironment jobExecutionEnv =
- engineClient.createExecutionContext(filePath, jobConfig,
SEATUNNEL_CONFIG);
-
- final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
-
- CompletableFuture<JobStatus> objectCompletableFuture =
-
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
-
- await().atMost(600000, TimeUnit.MILLISECONDS)
- .untilAsserted(
- () ->
- Assertions.assertTrue(
- objectCompletableFuture.isDone()
- && JobStatus.FINISHED.equals(
-
objectCompletableFuture.get())));
+ try (SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig))
{
+ ClientJobExecutionEnvironment jobExecutionEnv =
+ engineClient.createExecutionContext(filePath, jobConfig,
SEATUNNEL_CONFIG);
+
+ final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+
+ CompletableFuture<JobStatus> objectCompletableFuture =
+
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
+
+ await().atMost(600000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertTrue(
+ objectCompletableFuture.isDone()
+ &&
JobStatus.FINISHED.equals(
+
objectCompletableFuture.get())));
+ }
}
@Test
@@ -107,25 +108,26 @@ public class JobExecutionIT {
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
- SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
- ClientJobExecutionEnvironment jobExecutionEnv =
- engineClient.createExecutionContext(filePath, jobConfig,
SEATUNNEL_CONFIG);
-
- final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
- JobStatus jobStatus1 = clientJobProxy.getJobStatus();
- Assertions.assertFalse(jobStatus1.isEndState());
- CompletableFuture<JobStatus> objectCompletableFuture =
-
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
- Thread.sleep(1000);
- clientJobProxy.cancelJob();
-
- await().atMost(20000, TimeUnit.MILLISECONDS)
- .untilAsserted(
- () ->
- Assertions.assertTrue(
- objectCompletableFuture.isDone()
- && JobStatus.CANCELED.equals(
-
objectCompletableFuture.get())));
+ try (SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig))
{
+ ClientJobExecutionEnvironment jobExecutionEnv =
+ engineClient.createExecutionContext(filePath, jobConfig,
SEATUNNEL_CONFIG);
+
+ final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+ JobStatus jobStatus1 = clientJobProxy.getJobStatus();
+ Assertions.assertFalse(jobStatus1.isEndState());
+ CompletableFuture<JobStatus> objectCompletableFuture =
+
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
+ Thread.sleep(1000);
+ clientJobProxy.cancelJob();
+
+ await().atMost(20000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertTrue(
+ objectCompletableFuture.isDone()
+ &&
JobStatus.CANCELED.equals(
+
objectCompletableFuture.get())));
+ }
}
@Test
@@ -136,18 +138,19 @@ public class JobExecutionIT {
jobConfig.setName("fake_to_console_error");
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
- SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
- ClientJobExecutionEnvironment jobExecutionEnv =
- engineClient.createExecutionContext(filePath, jobConfig,
SEATUNNEL_CONFIG);
- final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
- CompletableFuture<JobStatus> completableFuture =
-
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
- await().atMost(600000, TimeUnit.MILLISECONDS)
- .untilAsserted(() ->
Assertions.assertTrue(completableFuture.isDone()));
-
- JobResult result = clientJobProxy.getJobResultCache();
- Assertions.assertEquals(result.getStatus(), JobStatus.FAILED);
-
Assertions.assertTrue(result.getError().startsWith("java.lang.NumberFormatException"));
+ try (SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig))
{
+ ClientJobExecutionEnvironment jobExecutionEnv =
+ engineClient.createExecutionContext(filePath, jobConfig,
SEATUNNEL_CONFIG);
+ final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+ CompletableFuture<JobStatus> completableFuture =
+
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
+ await().atMost(600000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() ->
Assertions.assertTrue(completableFuture.isDone()));
+
+ JobResult result = clientJobProxy.getJobResultCache();
+ Assertions.assertEquals(result.getStatus(), JobStatus.FAILED);
+
Assertions.assertTrue(result.getError().startsWith("java.lang.NumberFormatException"));
+ }
}
@Test
@@ -155,21 +158,22 @@ public class JobExecutionIT {
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
- SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
-
- ClientJobProxy newClientJobProxy =
-
engineClient.createJobClient().getJobProxy(System.currentTimeMillis());
- CompletableFuture<JobStatus> waitForJobCompleteFuture =
-
CompletableFuture.supplyAsync(newClientJobProxy::waitForJobComplete);
-
- await().atMost(20000, TimeUnit.MILLISECONDS)
- .untilAsserted(
- () ->
- Assertions.assertEquals(
- JobStatus.UNKNOWABLE,
waitForJobCompleteFuture.get()));
-
- Assertions.assertEquals(
- "UNKNOWABLE",
engineClient.getJobClient().getJobStatus(System.currentTimeMillis()));
+ try (SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig))
{
+ ClientJobProxy newClientJobProxy =
+
engineClient.createJobClient().getJobProxy(System.currentTimeMillis());
+ CompletableFuture<JobStatus> waitForJobCompleteFuture =
+
CompletableFuture.supplyAsync(newClientJobProxy::waitForJobComplete);
+
+ await().atMost(20000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+ JobStatus.UNKNOWABLE,
waitForJobCompleteFuture.get()));
+
+ Assertions.assertEquals(
+ "UNKNOWABLE",
+
engineClient.getJobClient().getJobStatus(System.currentTimeMillis()));
+ }
}
@Test
@@ -181,18 +185,19 @@ public class JobExecutionIT {
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
- SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
- ClientJobExecutionEnvironment jobExecutionEnv =
- engineClient.createExecutionContext(filePath, jobConfig,
SEATUNNEL_CONFIG);
-
- final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
-
- Assertions.assertEquals(clientJobProxy.waitForJobComplete(),
JobStatus.FINISHED);
- await().atMost(65, TimeUnit.SECONDS)
- .untilAsserted(
- () ->
- Assertions.assertEquals(
- JobStatus.UNKNOWABLE,
clientJobProxy.getJobStatus()));
+ try (SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig))
{
+ ClientJobExecutionEnvironment jobExecutionEnv =
+ engineClient.createExecutionContext(filePath, jobConfig,
SEATUNNEL_CONFIG);
+
+ final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+
+ Assertions.assertEquals(clientJobProxy.waitForJobComplete(),
JobStatus.FINISHED);
+ await().atMost(65, TimeUnit.SECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+ JobStatus.UNKNOWABLE,
clientJobProxy.getJobStatus()));
+ }
}
@AfterEach
@@ -211,21 +216,22 @@ public class JobExecutionIT {
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
- SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
- ClientJobExecutionEnvironment jobExecutionEnv =
- engineClient.createExecutionContext(filePath, jobConfig,
SEATUNNEL_CONFIG);
-
- final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
-
- CompletableFuture<JobStatus> objectCompletableFuture =
-
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
-
- await().atMost(600000, TimeUnit.MILLISECONDS)
- .untilAsserted(
- () ->
- Assertions.assertTrue(
- objectCompletableFuture.isDone()
- && JobStatus.FAILED.equals(
-
objectCompletableFuture.get())));
+ try (SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig))
{
+ ClientJobExecutionEnvironment jobExecutionEnv =
+ engineClient.createExecutionContext(filePath, jobConfig,
SEATUNNEL_CONFIG);
+
+ final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+
+ CompletableFuture<JobStatus> objectCompletableFuture =
+
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
+
+ await().atMost(600000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertTrue(
+ objectCompletableFuture.isDone()
+ && JobStatus.FAILED.equals(
+
objectCompletableFuture.get())));
+ }
}
}
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelSlotIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelSlotIT.java
index 8f7b459c48..33a7be8914 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelSlotIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelSlotIT.java
@@ -82,7 +82,7 @@ public class SeaTunnelSlotIT {
} finally {
if (engineClient != null) {
- engineClient.shutdown();
+ engineClient.close();
}
if (node1 != null) {
@@ -134,7 +134,7 @@ public class SeaTunnelSlotIT {
} finally {
if (engineClient != null) {
- engineClient.shutdown();
+ engineClient.close();
}
if (node1 != null) {
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java
index 09a0cc472a..ca9851eb71 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java
@@ -161,7 +161,7 @@ public class TextHeaderIT {
log.info("========================clean test
resource====================");
} finally {
if (engineClient != null) {
- engineClient.shutdown();
+ engineClient.close();
}
if (node1 != null) {
node1.shutdown();
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
index 3bf13e1d5f..22aa0ffd13 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
@@ -39,7 +39,7 @@ import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
-public class SeaTunnelClient implements SeaTunnelClientInstance {
+public class SeaTunnelClient implements SeaTunnelClientInstance, AutoCloseable
{
private final SeaTunnelHazelcastClient hazelcastClient;
@Getter private final JobClient jobClient;
@@ -87,10 +87,6 @@ public class SeaTunnelClient implements
SeaTunnelClientInstance {
SeaTunnelPrintMessageCodec::decodeResponse);
}
- public void shutdown() {
- hazelcastClient.shutdown();
- }
-
/**
* get job status and the tasks status
*