This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 9ddaa63de [Feature][SeaTunnel Engine] Add test to test stream job
recover when all cluster node done and restart (#3503)
9ddaa63de is described below
commit 9ddaa63def2a33fe4772e7ed80f78193f5cb2883
Author: TaoZex <[email protected]>
AuthorDate: Wed Dec 7 17:31:22 2022 +0800
[Feature][SeaTunnel Engine] Add test to test stream job recover when all
cluster node done and restart (#3503)
* [Feature][SeaTunnel Engine] Add test to test stream job recover when all
cluster node done and restart
---
.../engine/e2e/ClusterFaultToleranceIT.java | 118 +++++++++++++++++++++
1 file changed, 118 insertions(+)
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
index ad8b09175..5ca2683d0 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
@@ -38,6 +38,7 @@ import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.testcontainers.shaded.org.apache.commons.lang3.tuple.ImmutablePair;
@@ -608,4 +609,121 @@ public class ClusterFaultToleranceIT {
}
}
}
+
+ // NOTE(review): End-to-end fault-tolerance test. A STREAMING job is started
+ // on a 3-node cluster, all three nodes are shut down, a fresh 3-node cluster
+ // is created with the same cluster name, and the job is expected to resume
+ // and end up with exactly testRowNumber * testParallelism rows in the sink
+ // directory (no loss, no duplicates) before being cancelled.
+ // Currently @Disabled pending IMap-based checkpoint storage support.
+ @SuppressWarnings("checkstyle:RegexpSingleline")
+ @Test
+ @Disabled("Wait for open Imap storage")
+ public void testStreamJobRestoreInAllNodeDown() throws ExecutionException,
InterruptedException {
+ String testCaseName = "testStreamJobRestoreInAllNodeDown";
+ String testClusterName =
"ClusterFaultToleranceIT_testStreamJobRestoreInAllNodeDown";
+ long testRowNumber = 1000;
+ int testParallelism = 6;
+ HazelcastInstanceImpl node1 = null;
+ HazelcastInstanceImpl node2 = null;
+ HazelcastInstanceImpl node3 = null;
+ SeaTunnelClient engineClient = null;
+
+ try {
+ node1 = SeaTunnelServerStarter.createHazelcastInstance(
+ TestUtils.getClusterName(testClusterName));
+
+ node2 = SeaTunnelServerStarter.createHazelcastInstance(
+ TestUtils.getClusterName(testClusterName));
+
+ node3 = SeaTunnelServerStarter.createHazelcastInstance(
+ TestUtils.getClusterName(testClusterName));
+
+ // Wait until all three nodes have joined the cluster before submitting the job.
+ HazelcastInstanceImpl finalNode = node1;
+ Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> Assertions.assertEquals(3,
finalNode.getCluster().getMembers().size()));
+
+ Common.setDeployMode(DeployMode.CLIENT);
+ ImmutablePair<String, String> testResources =
+ createTestResources(testCaseName, JobMode.STREAMING,
testRowNumber, testParallelism);
+ JobConfig jobConfig = new JobConfig();
+ jobConfig.setName(testCaseName);
+
+ ClientConfig clientConfig =
ConfigProvider.locateAndGetClientConfig();
+ clientConfig.setClusterName(
+ TestUtils.getClusterName(testClusterName));
+ engineClient = new SeaTunnelClient(clientConfig);
+ JobExecutionEnvironment jobExecutionEnv =
+
engineClient.createExecutionContext(testResources.getRight(), jobConfig);
+ ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+
+ CompletableFuture<JobStatus> objectCompletableFuture =
CompletableFuture.supplyAsync(() -> {
+ return clientJobProxy.waitForJobComplete();
+ });
+
+ Awaitility.await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> {
+ // Wait until some tasks have committed, so rows
start appearing in the sink target dir.
+ Thread.sleep(2000);
+
System.out.println(FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
+
Assertions.assertTrue(JobStatus.RUNNING.equals(clientJobProxy.getJobStatus()) &&
+
FileUtils.getFileLineNumberFromDir(testResources.getLeft()) > 1);
+ });
+
+ Thread.sleep(5000);
+ // Shut down every node to simulate a full-cluster outage.
+ node1.shutdown();
+ node2.shutdown();
+ node3.shutdown();
+
+ Thread.sleep(10000);
+
+ node1 = SeaTunnelServerStarter.createHazelcastInstance(
+ TestUtils.getClusterName(testClusterName));
+
+ node2 = SeaTunnelServerStarter.createHazelcastInstance(
+ TestUtils.getClusterName(testClusterName));
+
+ node3 = SeaTunnelServerStarter.createHazelcastInstance(
+ TestUtils.getClusterName(testClusterName));
+
+ // Wait until all three restarted nodes have rejoined the cluster.
+ HazelcastInstanceImpl restoreFinalNode = node1;
+ Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> Assertions.assertEquals(3,
restoreFinalNode.getCluster().getMembers().size()));
+
+ Awaitility.await().atMost(360000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> {
+ // Wait until the restored job has written all expected rows to the sink files.
+ Thread.sleep(2000);
+
System.out.println(FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
+
Assertions.assertTrue(JobStatus.RUNNING.equals(clientJobProxy.getJobStatus()) &&
+ testRowNumber * testParallelism ==
FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
+ });
+
+ // Sleep 10s and expect the job does not write any more rows (no duplicate output).
+ Thread.sleep(10000);
+ clientJobProxy.cancelJob();
+
+ Awaitility.await().atMost(20000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> Assertions.assertTrue(
+ objectCompletableFuture.isDone() &&
JobStatus.CANCELED.equals(objectCompletableFuture.get())));
+
+ // Verify the final row count matches exactly (no loss, no duplication).
+ Long fileLineNumberFromDir =
FileUtils.getFileLineNumberFromDir(testResources.getLeft());
+ Assertions.assertEquals(testRowNumber * testParallelism,
fileLineNumberFromDir);
+
+ } finally {
+ if (engineClient != null) {
+ engineClient.shutdown();
+ }
+
+ if (node1 != null) {
+ node1.shutdown();
+ }
+
+ if (node2 != null) {
+ node2.shutdown();
+ }
+
+ if (node3 != null) {
+ node3.shutdown();
+ }
+ }
+ }
}