This is an automated email from the ASF dual-hosted git repository.

liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 852479609 [CI][Zeta] fix ci error (#4028)
852479609 is described below

commit 8524796090d88b7f9fb2374ede9a4a592412bf1b
Author: Eric <[email protected]>
AuthorDate: Thu Feb 2 10:59:54 2023 +0800

    [CI][Zeta] fix ci error (#4028)
    
    * fix ci error
    
    * fix zeta bug
    
    * fix ci error
---
 .../engine/e2e/ClusterFaultToleranceIT.java        | 112 ++++++---------------
 .../e2e/ClusterFaultToleranceTwoPipelineIT.java    | 100 ++++++------------
 .../src/test/resources/hazelcast.yaml              |   4 +-
 .../engine/server/dag/physical/PhysicalPlan.java   |   1 +
 .../imap/storage/file/disruptor/WALDisruptor.java  |   2 +-
 5 files changed, 65 insertions(+), 154 deletions(-)

diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
index 269b69524..b04e1363d 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
@@ -65,15 +65,14 @@ public class ClusterFaultToleranceIT {
 
     @SuppressWarnings("checkstyle:RegexpSingleline")
     @Test
-    public void testBatchJobRunOkIn3Node() throws ExecutionException, 
InterruptedException {
-        String testCaseName = "testBatchJobRunOkIn3Node";
-        String testClusterName = 
"ClusterFaultToleranceIT_testBatchJobRunOkIn3Node";
+    public void testBatchJobRunOkIn2Node() throws ExecutionException, 
InterruptedException {
+        String testCaseName = "testBatchJobRunOkIn2Node";
+        String testClusterName = 
"ClusterFaultToleranceIT_testBatchJobRunOkIn2Node";
         long testRowNumber = 1000;
         int testParallelism = 6;
 
         HazelcastInstanceImpl node1 = null;
         HazelcastInstanceImpl node2 = null;
-        HazelcastInstanceImpl node3 = null;
         SeaTunnelClient engineClient = null;
 
         SeaTunnelConfig seaTunnelConfig = 
ConfigProvider.locateAndGetSeaTunnelConfig();
@@ -84,12 +83,10 @@ public class ClusterFaultToleranceIT {
 
             node2 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
 
-            node3 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
-
             // waiting all node added to cluster
             HazelcastInstanceImpl finalNode = node1;
             Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
-                .untilAsserted(() -> Assertions.assertEquals(3, 
finalNode.getCluster().getMembers().size()));
+                .untilAsserted(() -> Assertions.assertEquals(2, 
finalNode.getCluster().getMembers().size()));
 
             Common.setDeployMode(DeployMode.CLIENT);
             ImmutablePair<String, String> testResources =
@@ -129,10 +126,6 @@ public class ClusterFaultToleranceIT {
             if (node2 != null) {
                 node2.shutdown();
             }
-
-            if (node3 != null) {
-                node3.shutdown();
-            }
         }
     }
 
@@ -172,14 +165,13 @@ public class ClusterFaultToleranceIT {
 
     @SuppressWarnings("checkstyle:RegexpSingleline")
     @Test
-    public void testStreamJobRunOkIn3Node() throws ExecutionException, 
InterruptedException {
-        String testCaseName = "testStreamJobRunOkIn3Node";
-        String testClusterName = 
"ClusterFaultToleranceIT_testStreamJobRunOkIn3Node";
+    public void testStreamJobRunOkIn2Node() throws ExecutionException, 
InterruptedException {
+        String testCaseName = "testStreamJobRunOkIn2Node";
+        String testClusterName = 
"ClusterFaultToleranceIT_testStreamJobRunOkIn2Node";
         long testRowNumber = 1000;
         int testParallelism = 6;
         HazelcastInstanceImpl node1 = null;
         HazelcastInstanceImpl node2 = null;
-        HazelcastInstanceImpl node3 = null;
         SeaTunnelClient engineClient = null;
 
         SeaTunnelConfig seaTunnelConfig = 
ConfigProvider.locateAndGetSeaTunnelConfig();
@@ -189,12 +181,10 @@ public class ClusterFaultToleranceIT {
 
             node2 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
 
-            node3 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
-
             // waiting all node added to cluster
             HazelcastInstanceImpl finalNode = node1;
             Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
-                .untilAsserted(() -> Assertions.assertEquals(3, 
finalNode.getCluster().getMembers().size()));
+                .untilAsserted(() -> Assertions.assertEquals(2, 
finalNode.getCluster().getMembers().size()));
 
             Common.setDeployMode(DeployMode.CLIENT);
             ImmutablePair<String, String> testResources =
@@ -212,7 +202,7 @@ public class ClusterFaultToleranceIT {
 
             CompletableFuture<JobStatus> objectCompletableFuture = 
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
 
-            Awaitility.await().atMost(3, TimeUnit.MINUTES)
+            Awaitility.await().atMost(2, TimeUnit.MINUTES)
                 .untilAsserted(() -> {
                     Thread.sleep(2000);
                     
System.out.println(FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
@@ -241,23 +231,18 @@ public class ClusterFaultToleranceIT {
             if (node2 != null) {
                 node2.shutdown();
             }
-
-            if (node3 != null) {
-                node3.shutdown();
-            }
         }
     }
 
     @SuppressWarnings("checkstyle:RegexpSingleline")
     @Test
-    public void testBatchJobRestoreIn3NodeWorkerDown() throws 
ExecutionException, InterruptedException {
-        String testCaseName = "testBatchJobRestoreIn3NodeWorkerDown";
-        String testClusterName = 
"ClusterFaultToleranceIT_testBatchJobRestoreIn3NodeWorkerDown";
+    public void testBatchJobRestoreIn2NodeWorkerDown() throws 
ExecutionException, InterruptedException {
+        String testCaseName = "testBatchJobRestoreIn2NodeWorkerDown";
+        String testClusterName = 
"ClusterFaultToleranceIT_testBatchJobRestoreIn2NodeWorkerDown";
         long testRowNumber = 1000;
         int testParallelism = 2;
         HazelcastInstanceImpl node1 = null;
         HazelcastInstanceImpl node2 = null;
-        HazelcastInstanceImpl node3 = null;
         SeaTunnelClient engineClient = null;
 
         SeaTunnelConfig seaTunnelConfig = 
ConfigProvider.locateAndGetSeaTunnelConfig();
@@ -267,12 +252,10 @@ public class ClusterFaultToleranceIT {
 
             node2 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
 
-            node3 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
-
             // waiting all node added to cluster
             HazelcastInstanceImpl finalNode = node1;
             Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
-                .untilAsserted(() -> Assertions.assertEquals(3, 
finalNode.getCluster().getMembers().size()));
+                .untilAsserted(() -> Assertions.assertEquals(2, 
finalNode.getCluster().getMembers().size()));
 
             Common.setDeployMode(DeployMode.CLIENT);
             ImmutablePair<String, String> testResources =
@@ -321,23 +304,18 @@ public class ClusterFaultToleranceIT {
             if (node2 != null) {
                 node2.shutdown();
             }
-
-            if (node3 != null) {
-                node3.shutdown();
-            }
         }
     }
 
     @SuppressWarnings("checkstyle:RegexpSingleline")
     @Test
-    public void testStreamJobRestoreIn3NodeWorkerDown() throws 
ExecutionException, InterruptedException {
-        String testCaseName = "testStreamJobRestoreIn3NodeWorkerDown";
-        String testClusterName = 
"ClusterFaultToleranceIT_testStreamJobRestoreIn3NodeWorkerDown";
+    public void testStreamJobRestoreIn2NodeWorkerDown() throws 
ExecutionException, InterruptedException {
+        String testCaseName = "testStreamJobRestoreIn2NodeWorkerDown";
+        String testClusterName = 
"ClusterFaultToleranceIT_testStreamJobRestoreIn2NodeWorkerDown";
         long testRowNumber = 1000;
         int testParallelism = 6;
         HazelcastInstanceImpl node1 = null;
         HazelcastInstanceImpl node2 = null;
-        HazelcastInstanceImpl node3 = null;
         SeaTunnelClient engineClient = null;
 
         SeaTunnelConfig seaTunnelConfig = 
ConfigProvider.locateAndGetSeaTunnelConfig();
@@ -347,12 +325,10 @@ public class ClusterFaultToleranceIT {
 
             node2 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
 
-            node3 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
-
             // waiting all node added to cluster
             HazelcastInstanceImpl finalNode = node1;
             Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
-                .untilAsserted(() -> Assertions.assertEquals(3, 
finalNode.getCluster().getMembers().size()));
+                .untilAsserted(() -> Assertions.assertEquals(2, 
finalNode.getCluster().getMembers().size()));
 
             Common.setDeployMode(DeployMode.CLIENT);
             ImmutablePair<String, String> testResources =
@@ -416,23 +392,18 @@ public class ClusterFaultToleranceIT {
             if (node2 != null) {
                 node2.shutdown();
             }
-
-            if (node3 != null) {
-                node3.shutdown();
-            }
         }
     }
 
     @SuppressWarnings("checkstyle:RegexpSingleline")
     @Test
-    public void testBatchJobRestoreIn3NodeMasterDown() throws 
ExecutionException, InterruptedException {
-        String testCaseName = "testBatchJobRestoreIn3NodeMasterDown";
-        String testClusterName = 
"ClusterFaultToleranceIT_testBatchJobRestoreIn3NodeMasterDown";
+    public void testBatchJobRestoreIn2NodeMasterDown() throws 
ExecutionException, InterruptedException {
+        String testCaseName = "testBatchJobRestoreIn2NodeMasterDown";
+        String testClusterName = 
"ClusterFaultToleranceIT_testBatchJobRestoreIn2NodeMasterDown";
         long testRowNumber = 1000;
         int testParallelism = 6;
         HazelcastInstanceImpl node1 = null;
         HazelcastInstanceImpl node2 = null;
-        HazelcastInstanceImpl node3 = null;
         SeaTunnelClient engineClient = null;
 
         SeaTunnelConfig seaTunnelConfig = 
ConfigProvider.locateAndGetSeaTunnelConfig();
@@ -442,12 +413,10 @@ public class ClusterFaultToleranceIT {
 
             node2 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
 
-            node3 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
-
             // waiting all node added to cluster
             HazelcastInstanceImpl finalNode = node1;
             Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
-                .untilAsserted(() -> Assertions.assertEquals(3, 
finalNode.getCluster().getMembers().size()));
+                .untilAsserted(() -> Assertions.assertEquals(2, 
finalNode.getCluster().getMembers().size()));
 
             Common.setDeployMode(DeployMode.CLIENT);
             ImmutablePair<String, String> testResources =
@@ -496,23 +465,18 @@ public class ClusterFaultToleranceIT {
             if (node2 != null) {
                 node2.shutdown();
             }
-
-            if (node3 != null) {
-                node3.shutdown();
-            }
         }
     }
 
     @SuppressWarnings("checkstyle:RegexpSingleline")
     @Test
-    public void testStreamJobRestoreIn3NodeMasterDown() throws 
ExecutionException, InterruptedException {
-        String testCaseName = "testStreamJobRestoreIn3NodeMasterDown";
-        String testClusterName = 
"ClusterFaultToleranceIT_testStreamJobRestoreIn3NodeMasterDown";
+    public void testStreamJobRestoreIn2NodeMasterDown() throws 
ExecutionException, InterruptedException {
+        String testCaseName = "testStreamJobRestoreIn2NodeMasterDown";
+        String testClusterName = 
"ClusterFaultToleranceIT_testStreamJobRestoreIn2NodeMasterDown";
         long testRowNumber = 1000;
         int testParallelism = 6;
         HazelcastInstanceImpl node1 = null;
         HazelcastInstanceImpl node2 = null;
-        HazelcastInstanceImpl node3 = null;
         SeaTunnelClient engineClient = null;
 
         SeaTunnelConfig seaTunnelConfig = 
ConfigProvider.locateAndGetSeaTunnelConfig();
@@ -522,12 +486,10 @@ public class ClusterFaultToleranceIT {
 
             node2 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
 
-            node3 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
-
             // waiting all node added to cluster
             HazelcastInstanceImpl finalNode = node1;
             Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
-                .untilAsserted(() -> Assertions.assertEquals(3, 
finalNode.getCluster().getMembers().size()));
+                .untilAsserted(() -> Assertions.assertEquals(2, 
finalNode.getCluster().getMembers().size()));
 
             Common.setDeployMode(DeployMode.CLIENT);
             ImmutablePair<String, String> testResources =
@@ -590,10 +552,6 @@ public class ClusterFaultToleranceIT {
             if (node2 != null) {
                 node2.shutdown();
             }
-
-            if (node3 != null) {
-                node3.shutdown();
-            }
         }
     }
 
@@ -606,7 +564,6 @@ public class ClusterFaultToleranceIT {
         int testParallelism = 6;
         HazelcastInstanceImpl node1 = null;
         HazelcastInstanceImpl node2 = null;
-        HazelcastInstanceImpl node3 = null;
         SeaTunnelClient engineClient = null;
 
         try {
@@ -636,12 +593,14 @@ public class ClusterFaultToleranceIT {
                 "        properties:\n" +
                 "          type: hdfs\n" +
                 "          namespace: /tmp/seatunnel/imap\n" +
-                "          clusterName: seatunnel-clsuter\n" +
+                "          clusterName: " + testClusterName + "\n" +
                 "          fs.defaultFS: file:///\n" +
                 "\n" +
                 "  properties:\n" +
-                "    hazelcast.invocation.max.retry.count: 20\n" +
+                "    hazelcast.invocation.max.retry.count: 200\n" +
                 "    hazelcast.tcp.join.port.try.count: 30\n" +
+                "    hazelcast.invocation.retry.pause.millis: 2000\n" +
+                "    hazelcast.slow.operation.detector.stacktrace.logging.enabled: true\n" +
                 "    hazelcast.logging.type: log4j2\n";
 
             Config hazelcastConfig = Config.loadFromString(yaml);
@@ -652,12 +611,10 @@ public class ClusterFaultToleranceIT {
 
             node2 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
 
-            node3 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
-
             // waiting all node added to cluster
             HazelcastInstanceImpl finalNode = node1;
             Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
-                .untilAsserted(() -> Assertions.assertEquals(3, 
finalNode.getCluster().getMembers().size()));
+                .untilAsserted(() -> Assertions.assertEquals(2, 
finalNode.getCluster().getMembers().size()));
 
             Common.setDeployMode(DeployMode.CLIENT);
             ImmutablePair<String, String> testResources =
@@ -688,7 +645,6 @@ public class ClusterFaultToleranceIT {
             // shutdown all node
             node1.shutdown();
             node2.shutdown();
-            node3.shutdown();
 
             Thread.sleep(10000);
 
@@ -696,12 +652,10 @@ public class ClusterFaultToleranceIT {
 
             node2 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
 
-            node3 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
-
             // waiting all node added to cluster
             HazelcastInstanceImpl restoreFinalNode = node1;
             Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
-                .untilAsserted(() -> Assertions.assertEquals(3, 
restoreFinalNode.getCluster().getMembers().size()));
+                .untilAsserted(() -> Assertions.assertEquals(2, 
restoreFinalNode.getCluster().getMembers().size()));
 
             Awaitility.await().atMost(360000, TimeUnit.MILLISECONDS)
                 .untilAsserted(() -> {
@@ -736,10 +690,6 @@ public class ClusterFaultToleranceIT {
             if (node2 != null) {
                 node2.shutdown();
             }
-
-            if (node3 != null) {
-                node3.shutdown();
-            }
         }
     }
 }
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
index aa3286a4c..56e0c09e2 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
@@ -66,15 +66,14 @@ public class ClusterFaultToleranceTwoPipelineIT {
 
     @SuppressWarnings("checkstyle:RegexpSingleline")
     @Test
-    public void testTwoPipelineBatchJobRunOkIn3Node() throws 
ExecutionException, InterruptedException {
-        String testCaseName = "testTwoPipelineBatchJobRunOkIn3Node";
-        String testClusterName = 
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineBatchJobRunOkIn3Node";
+    public void testTwoPipelineBatchJobRunOkIn2Node() throws 
ExecutionException, InterruptedException {
+        String testCaseName = "testTwoPipelineBatchJobRunOkIn2Node";
+        String testClusterName = 
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineBatchJobRunOkIn2Node";
         long testRowNumber = 1000;
         int testParallelism = 6;
 
         HazelcastInstanceImpl node1 = null;
         HazelcastInstanceImpl node2 = null;
-        HazelcastInstanceImpl node3 = null;
         SeaTunnelClient engineClient = null;
 
         SeaTunnelConfig seaTunnelConfig = 
ConfigProvider.locateAndGetSeaTunnelConfig();
@@ -85,12 +84,10 @@ public class ClusterFaultToleranceTwoPipelineIT {
 
             node2 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
 
-            node3 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
-
             // waiting all node added to cluster
             HazelcastInstanceImpl finalNode = node1;
             Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
-                .untilAsserted(() -> Assertions.assertEquals(3, 
finalNode.getCluster().getMembers().size()));
+                .untilAsserted(() -> Assertions.assertEquals(2, 
finalNode.getCluster().getMembers().size()));
 
             Common.setDeployMode(DeployMode.CLIENT);
             ImmutablePair<String, String> testResources =
@@ -132,10 +129,6 @@ public class ClusterFaultToleranceTwoPipelineIT {
             if (node2 != null) {
                 node2.shutdown();
             }
-
-            if (node3 != null) {
-                node3.shutdown();
-            }
         }
     }
 
@@ -177,14 +170,13 @@ public class ClusterFaultToleranceTwoPipelineIT {
 
     @SuppressWarnings("checkstyle:RegexpSingleline")
     @Test
-    public void testTwoPipelineStreamJobRunOkIn3Node() throws 
ExecutionException, InterruptedException {
-        String testCaseName = "testTwoPipelineStreamJobRunOkIn3Node";
-        String testClusterName = 
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineStreamJobRunOkIn3Node";
+    public void testTwoPipelineStreamJobRunOkIn2Node() throws 
ExecutionException, InterruptedException {
+        String testCaseName = "testTwoPipelineStreamJobRunOkIn2Node";
+        String testClusterName = 
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineStreamJobRunOkIn2Node";
         long testRowNumber = 1000;
         int testParallelism = 6;
         HazelcastInstanceImpl node1 = null;
         HazelcastInstanceImpl node2 = null;
-        HazelcastInstanceImpl node3 = null;
         SeaTunnelClient engineClient = null;
 
         SeaTunnelConfig seaTunnelConfig = 
ConfigProvider.locateAndGetSeaTunnelConfig();
@@ -194,12 +186,10 @@ public class ClusterFaultToleranceTwoPipelineIT {
 
             node2 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
 
-            node3 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
-
             // waiting all node added to cluster
             HazelcastInstanceImpl finalNode = node1;
             Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
-                .untilAsserted(() -> Assertions.assertEquals(3, 
finalNode.getCluster().getMembers().size()));
+                .untilAsserted(() -> Assertions.assertEquals(2, 
finalNode.getCluster().getMembers().size()));
 
             Common.setDeployMode(DeployMode.CLIENT);
             ImmutablePair<String, String> testResources =
@@ -219,7 +209,7 @@ public class ClusterFaultToleranceTwoPipelineIT {
                 return clientJobProxy.waitForJobComplete();
             });
 
-            Awaitility.await().atMost(3, TimeUnit.MINUTES)
+            Awaitility.await().atMost(6, TimeUnit.MINUTES)
                 .untilAsserted(() -> {
                     Thread.sleep(2000);
                     
System.out.println(FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
@@ -248,23 +238,18 @@ public class ClusterFaultToleranceTwoPipelineIT {
             if (node2 != null) {
                 node2.shutdown();
             }
-
-            if (node3 != null) {
-                node3.shutdown();
-            }
         }
     }
 
     @SuppressWarnings("checkstyle:RegexpSingleline")
     @Test
-    public void testTwoPipelineBatchJobRestoreIn3NodeWorkerDown() throws 
ExecutionException, InterruptedException {
-        String testCaseName = 
"testTwoPipelineBatchJobRestoreIn3NodeWorkerDown";
-        String testClusterName = 
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineBatchJobRestoreIn3NodeWorkerDown";
+    public void testTwoPipelineBatchJobRestoreIn2NodeWorkerDown() throws 
ExecutionException, InterruptedException {
+        String testCaseName = 
"testTwoPipelineBatchJobRestoreIn2NodeWorkerDown";
+        String testClusterName = 
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineBatchJobRestoreIn2NodeWorkerDown";
         long testRowNumber = 1000;
         int testParallelism = 2;
         HazelcastInstanceImpl node1 = null;
         HazelcastInstanceImpl node2 = null;
-        HazelcastInstanceImpl node3 = null;
         SeaTunnelClient engineClient = null;
 
         SeaTunnelConfig seaTunnelConfig = 
ConfigProvider.locateAndGetSeaTunnelConfig();
@@ -274,12 +259,10 @@ public class ClusterFaultToleranceTwoPipelineIT {
 
             node2 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
 
-            node3 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
-
             // waiting all node added to cluster
             HazelcastInstanceImpl finalNode = node1;
             Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
-                .untilAsserted(() -> Assertions.assertEquals(3, 
finalNode.getCluster().getMembers().size()));
+                .untilAsserted(() -> Assertions.assertEquals(2, 
finalNode.getCluster().getMembers().size()));
 
             Common.setDeployMode(DeployMode.CLIENT);
             ImmutablePair<String, String> testResources =
@@ -330,23 +313,18 @@ public class ClusterFaultToleranceTwoPipelineIT {
             if (node2 != null) {
                 node2.shutdown();
             }
-
-            if (node3 != null) {
-                node3.shutdown();
-            }
         }
     }
 
     @SuppressWarnings("checkstyle:RegexpSingleline")
     @Test
-    public void testTwoPipelineStreamJobRestoreIn3NodeWorkerDown() throws 
ExecutionException, InterruptedException {
-        String testCaseName = 
"testTwoPipelineStreamJobRestoreIn3NodeWorkerDown";
-        String testClusterName = 
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineStreamJobRestoreIn3NodeWorkerDown";
+    public void testTwoPipelineStreamJobRestoreIn2NodeWorkerDown() throws 
ExecutionException, InterruptedException {
+        String testCaseName = 
"testTwoPipelineStreamJobRestoreIn2NodeWorkerDown";
+        String testClusterName = 
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineStreamJobRestoreIn2NodeWorkerDown";
         long testRowNumber = 1000;
         int testParallelism = 6;
         HazelcastInstanceImpl node1 = null;
         HazelcastInstanceImpl node2 = null;
-        HazelcastInstanceImpl node3 = null;
         SeaTunnelClient engineClient = null;
 
         SeaTunnelConfig seaTunnelConfig = 
ConfigProvider.locateAndGetSeaTunnelConfig();
@@ -356,12 +334,10 @@ public class ClusterFaultToleranceTwoPipelineIT {
 
             node2 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
 
-            node3 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
-
             // waiting all node added to cluster
             HazelcastInstanceImpl finalNode = node1;
             Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
-                .untilAsserted(() -> Assertions.assertEquals(3, 
finalNode.getCluster().getMembers().size()));
+                .untilAsserted(() -> Assertions.assertEquals(2, 
finalNode.getCluster().getMembers().size()));
 
             Common.setDeployMode(DeployMode.CLIENT);
             ImmutablePair<String, String> testResources =
@@ -381,7 +357,7 @@ public class ClusterFaultToleranceTwoPipelineIT {
                 return clientJobProxy.waitForJobComplete();
             });
 
-            Awaitility.await().atMost(60000, TimeUnit.MILLISECONDS)
+            Awaitility.await().atMost(360000, TimeUnit.MILLISECONDS)
                 .untilAsserted(() -> {
                     // Wait some tasks commit finished, and we can get rows from the sink target dir
                     Thread.sleep(2000);
@@ -427,23 +403,18 @@ public class ClusterFaultToleranceTwoPipelineIT {
             if (node2 != null) {
                 node2.shutdown();
             }
-
-            if (node3 != null) {
-                node3.shutdown();
-            }
         }
     }
 
     @SuppressWarnings("checkstyle:RegexpSingleline")
     @Test
-    public void testTwoPipelineBatchJobRestoreIn3NodeMasterDown() throws 
ExecutionException, InterruptedException {
-        String testCaseName = 
"testTwoPipelineBatchJobRestoreIn3NodeMasterDown";
-        String testClusterName = 
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineBatchJobRestoreIn3NodeMasterDown";
+    public void testTwoPipelineBatchJobRestoreIn2NodeMasterDown() throws 
ExecutionException, InterruptedException {
+        String testCaseName = 
"testTwoPipelineBatchJobRestoreIn2NodeMasterDown";
+        String testClusterName = 
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineBatchJobRestoreIn2NodeMasterDown";
         long testRowNumber = 1000;
         int testParallelism = 6;
         HazelcastInstanceImpl node1 = null;
         HazelcastInstanceImpl node2 = null;
-        HazelcastInstanceImpl node3 = null;
         SeaTunnelClient engineClient = null;
 
         SeaTunnelConfig seaTunnelConfig = 
ConfigProvider.locateAndGetSeaTunnelConfig();
@@ -453,12 +424,10 @@ public class ClusterFaultToleranceTwoPipelineIT {
 
             node2 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
 
-            node3 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
-
             // waiting all node added to cluster
             HazelcastInstanceImpl finalNode = node1;
             Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
-                .untilAsserted(() -> Assertions.assertEquals(3, 
finalNode.getCluster().getMembers().size()));
+                .untilAsserted(() -> Assertions.assertEquals(2, 
finalNode.getCluster().getMembers().size()));
 
             Common.setDeployMode(DeployMode.CLIENT);
             ImmutablePair<String, String> testResources =
@@ -476,7 +445,7 @@ public class ClusterFaultToleranceTwoPipelineIT {
 
             CompletableFuture<JobStatus> objectCompletableFuture = 
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
 
-            Awaitility.await().atMost(60000, TimeUnit.MILLISECONDS)
+            Awaitility.await().atMost(360000, TimeUnit.MILLISECONDS)
                 .untilAsserted(() -> {
                     // Wait some tasks commit finished
                     Thread.sleep(2000);
@@ -488,7 +457,7 @@ public class ClusterFaultToleranceTwoPipelineIT {
             // shutdown master node
             node1.shutdown();
 
-            Awaitility.await().atMost(200000, TimeUnit.MILLISECONDS)
+            Awaitility.await().atMost(360000, TimeUnit.MILLISECONDS)
                 .untilAsserted(() -> Assertions.assertTrue(
                     objectCompletableFuture.isDone() && 
JobStatus.FINISHED.equals(objectCompletableFuture.get())));
 
@@ -507,23 +476,18 @@ public class ClusterFaultToleranceTwoPipelineIT {
             if (node2 != null) {
                 node2.shutdown();
             }
-
-            if (node3 != null) {
-                node3.shutdown();
-            }
         }
     }
 
     @SuppressWarnings("checkstyle:RegexpSingleline")
     @Test
-    public void testTwoPipelineStreamJobRestoreIn3NodeMasterDown() throws 
ExecutionException, InterruptedException {
-        String testCaseName = 
"testTwoPipelineStreamJobRestoreIn3NodeMasterDown";
-        String testClusterName = 
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineStreamJobRestoreIn3NodeMasterDown";
+    public void testTwoPipelineStreamJobRestoreIn2NodeMasterDown() throws 
ExecutionException, InterruptedException {
+        String testCaseName = 
"testTwoPipelineStreamJobRestoreIn2NodeMasterDown";
+        String testClusterName = 
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineStreamJobRestoreIn2NodeMasterDown";
         long testRowNumber = 1000;
         int testParallelism = 6;
         HazelcastInstanceImpl node1 = null;
         HazelcastInstanceImpl node2 = null;
-        HazelcastInstanceImpl node3 = null;
         SeaTunnelClient engineClient = null;
 
         SeaTunnelConfig seaTunnelConfig = 
ConfigProvider.locateAndGetSeaTunnelConfig();
@@ -533,12 +497,10 @@ public class ClusterFaultToleranceTwoPipelineIT {
 
             node2 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
 
-            node3 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
-
             // waiting all node added to cluster
             HazelcastInstanceImpl finalNode = node1;
             Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
-                .untilAsserted(() -> Assertions.assertEquals(3, 
finalNode.getCluster().getMembers().size()));
+                .untilAsserted(() -> Assertions.assertEquals(2, 
finalNode.getCluster().getMembers().size()));
 
             Common.setDeployMode(DeployMode.CLIENT);
             ImmutablePair<String, String> testResources =
@@ -558,7 +520,7 @@ public class ClusterFaultToleranceTwoPipelineIT {
                 return clientJobProxy.waitForJobComplete();
             });
 
-            Awaitility.await().atMost(60000, TimeUnit.MILLISECONDS)
+            Awaitility.await().atMost(360000, TimeUnit.MILLISECONDS)
                 .untilAsserted(() -> {
                     // Wait some tasks commit finished, and we can get rows from the sink target dir
                     Thread.sleep(2000);
@@ -603,10 +565,6 @@ public class ClusterFaultToleranceTwoPipelineIT {
             if (node2 != null) {
                 node2.shutdown();
             }
-
-            if (node3 != null) {
-                node3.shutdown();
-            }
         }
     }
 }
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/hazelcast.yaml b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/hazelcast.yaml
index 3146ffc69..e83cc722e 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/hazelcast.yaml
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/hazelcast.yaml
@@ -28,6 +28,8 @@ hazelcast:
       port-count: 100
       port: 5801
   properties:
-    hazelcast.invocation.max.retry.count: 60
+    hazelcast.invocation.max.retry.count: 200
+    hazelcast.invocation.retry.pause.millis: 2000
     hazelcast.tcp.join.port.try.count: 30
+    hazelcast.slow.operation.detector.stacktrace.logging.enabled: true
     hazelcast.logging.type: log4j2
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index cfbee02ce..b64a79022 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -159,6 +159,7 @@ public class PhysicalPlan {
                     LOGGER.info(String.format("release the pipeline %s resource", subPlan.getPipelineFullName()));
                 } else if (PipelineStatus.FAILED.equals(pipelineState.getPipelineStatus())) {
                     if (canRestorePipeline(subPlan)) {
+                        LOGGER.info(String.format("Can restore pipeline %s", subPlan.getPipelineFullName()));
                         subPlan.restorePipeline();
                         return;
                     }
diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALDisruptor.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALDisruptor.java
index f7edb9278..52b2e82e1 100644
--- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALDisruptor.java
+++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALDisruptor.java
@@ -43,7 +43,7 @@ public class WALDisruptor implements Closeable {
 
     private volatile Disruptor<FileWALEvent> disruptor;
 
-    private static final int DEFAULT_RING_BUFFER_SIZE = 256 * 1024;
+    private static final int DEFAULT_RING_BUFFER_SIZE = 1024;
 
     private static final int DEFAULT_CLOSE_WAIT_TIME_SECONDS = 5;
 


Reply via email to