This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 9ddaa63de [Feature][SeaTunnel Engine] Add test to test stream job 
recover when all cluster node done and restart (#3503)
9ddaa63de is described below

commit 9ddaa63def2a33fe4772e7ed80f78193f5cb2883
Author: TaoZex <[email protected]>
AuthorDate: Wed Dec 7 17:31:22 2022 +0800

    [Feature][SeaTunnel Engine] Add test to test stream job recover when all 
cluster node done and restart (#3503)
    
    * [Feature][SeaTunnel Engine] Add test to test stream job recover when all 
cluster node done and restart
---
 .../engine/e2e/ClusterFaultToleranceIT.java        | 118 +++++++++++++++++++++
 1 file changed, 118 insertions(+)

diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
index ad8b09175..5ca2683d0 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
@@ -38,6 +38,7 @@ import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.testcontainers.shaded.org.apache.commons.lang3.tuple.ImmutablePair;
 
@@ -608,4 +609,121 @@ public class ClusterFaultToleranceIT {
             }
         }
     }
+
+    @SuppressWarnings("checkstyle:RegexpSingleline")
+    @Test
+    @Disabled("Wait for open Imap storage")
+    public void testStreamJobRestoreInAllNodeDown() throws ExecutionException, 
InterruptedException {
+        String testCaseName = "testStreamJobRestoreInAllNodeDown";
+        String testClusterName = 
"ClusterFaultToleranceIT_testStreamJobRestoreInAllNodeDown";
+        long testRowNumber = 1000;
+        int testParallelism = 6;
+        HazelcastInstanceImpl node1 = null;
+        HazelcastInstanceImpl node2 = null;
+        HazelcastInstanceImpl node3 = null;
+        SeaTunnelClient engineClient = null;
+
+        try {
+            node1 = SeaTunnelServerStarter.createHazelcastInstance(
+                    TestUtils.getClusterName(testClusterName));
+
+            node2 = SeaTunnelServerStarter.createHazelcastInstance(
+                    TestUtils.getClusterName(testClusterName));
+
+            node3 = SeaTunnelServerStarter.createHazelcastInstance(
+                    TestUtils.getClusterName(testClusterName));
+
+            // waiting all node added to cluster
+            HazelcastInstanceImpl finalNode = node1;
+            Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(() -> Assertions.assertEquals(3, 
finalNode.getCluster().getMembers().size()));
+
+            Common.setDeployMode(DeployMode.CLIENT);
+            ImmutablePair<String, String> testResources =
+                    createTestResources(testCaseName, JobMode.STREAMING, 
testRowNumber, testParallelism);
+            JobConfig jobConfig = new JobConfig();
+            jobConfig.setName(testCaseName);
+
+            ClientConfig clientConfig = 
ConfigProvider.locateAndGetClientConfig();
+            clientConfig.setClusterName(
+                    TestUtils.getClusterName(testClusterName));
+            engineClient = new SeaTunnelClient(clientConfig);
+            JobExecutionEnvironment jobExecutionEnv =
+                    
engineClient.createExecutionContext(testResources.getRight(), jobConfig);
+            ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+
+            CompletableFuture<JobStatus> objectCompletableFuture = 
CompletableFuture.supplyAsync(() -> {
+                return clientJobProxy.waitForJobComplete();
+            });
+
+            Awaitility.await().atMost(60000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(() -> {
+                        // Wait some tasks commit finished, and we can get 
rows from the sink target dir
+                        Thread.sleep(2000);
+                        
System.out.println(FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
+                        
Assertions.assertTrue(JobStatus.RUNNING.equals(clientJobProxy.getJobStatus()) &&
+                                
FileUtils.getFileLineNumberFromDir(testResources.getLeft()) > 1);
+                    });
+
+            Thread.sleep(5000);
+            // shutdown all node
+            node1.shutdown();
+            node2.shutdown();
+            node3.shutdown();
+
+            Thread.sleep(10000);
+
+            node1 = SeaTunnelServerStarter.createHazelcastInstance(
+                    TestUtils.getClusterName(testClusterName));
+
+            node2 = SeaTunnelServerStarter.createHazelcastInstance(
+                    TestUtils.getClusterName(testClusterName));
+
+            node3 = SeaTunnelServerStarter.createHazelcastInstance(
+                    TestUtils.getClusterName(testClusterName));
+
+            // waiting all node added to cluster
+            HazelcastInstanceImpl restoreFinalNode = node1;
+            Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(() -> Assertions.assertEquals(3, 
restoreFinalNode.getCluster().getMembers().size()));
+
+            Awaitility.await().atMost(360000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(() -> {
+                        // Wait job write all rows in file
+                        Thread.sleep(2000);
+                        
System.out.println(FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
+                        
Assertions.assertTrue(JobStatus.RUNNING.equals(clientJobProxy.getJobStatus()) &&
+                                testRowNumber * testParallelism == 
FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
+                    });
+
+            // sleep 10s and expect the job don't write more rows.
+            Thread.sleep(10000);
+            clientJobProxy.cancelJob();
+
+            Awaitility.await().atMost(20000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(() -> Assertions.assertTrue(
+                            objectCompletableFuture.isDone() && 
JobStatus.CANCELED.equals(objectCompletableFuture.get())));
+
+            // check the final rows
+            Long fileLineNumberFromDir = 
FileUtils.getFileLineNumberFromDir(testResources.getLeft());
+            Assertions.assertEquals(testRowNumber * testParallelism, 
fileLineNumberFromDir);
+
+        } finally {
+            if (engineClient != null) {
+                engineClient.shutdown();
+            }
+
+            if (node1 != null) {
+                node1.shutdown();
+            }
+
+            if (node2 != null) {
+                node2.shutdown();
+            }
+
+            if (node3 != null) {
+                node3.shutdown();
+            }
+        }
+    }
 }

Reply via email to