This is an automated email from the ASF dual-hosted git repository.
liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 852479609 [CI][Zeta] fix ci error (#4028)
852479609 is described below
commit 8524796090d88b7f9fb2374ede9a4a592412bf1b
Author: Eric <[email protected]>
AuthorDate: Thu Feb 2 10:59:54 2023 +0800
[CI][Zeta] fix ci error (#4028)
* fix ci error
* fix zeta bug
* fix ci error
---
.../engine/e2e/ClusterFaultToleranceIT.java | 112 ++++++---------------
.../e2e/ClusterFaultToleranceTwoPipelineIT.java | 100 ++++++------------
.../src/test/resources/hazelcast.yaml | 4 +-
.../engine/server/dag/physical/PhysicalPlan.java | 1 +
.../imap/storage/file/disruptor/WALDisruptor.java | 2 +-
5 files changed, 65 insertions(+), 154 deletions(-)
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
index 269b69524..b04e1363d 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
@@ -65,15 +65,14 @@ public class ClusterFaultToleranceIT {
@SuppressWarnings("checkstyle:RegexpSingleline")
@Test
- public void testBatchJobRunOkIn3Node() throws ExecutionException,
InterruptedException {
- String testCaseName = "testBatchJobRunOkIn3Node";
- String testClusterName =
"ClusterFaultToleranceIT_testBatchJobRunOkIn3Node";
+ public void testBatchJobRunOkIn2Node() throws ExecutionException,
InterruptedException {
+ String testCaseName = "testBatchJobRunOkIn2Node";
+ String testClusterName =
"ClusterFaultToleranceIT_testBatchJobRunOkIn2Node";
long testRowNumber = 1000;
int testParallelism = 6;
HazelcastInstanceImpl node1 = null;
HazelcastInstanceImpl node2 = null;
- HazelcastInstanceImpl node3 = null;
SeaTunnelClient engineClient = null;
SeaTunnelConfig seaTunnelConfig =
ConfigProvider.locateAndGetSeaTunnelConfig();
@@ -84,12 +83,10 @@ public class ClusterFaultToleranceIT {
node2 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
- node3 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
-
// waiting all node added to cluster
HazelcastInstanceImpl finalNode = node1;
Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
- .untilAsserted(() -> Assertions.assertEquals(3,
finalNode.getCluster().getMembers().size()));
+ .untilAsserted(() -> Assertions.assertEquals(2,
finalNode.getCluster().getMembers().size()));
Common.setDeployMode(DeployMode.CLIENT);
ImmutablePair<String, String> testResources =
@@ -129,10 +126,6 @@ public class ClusterFaultToleranceIT {
if (node2 != null) {
node2.shutdown();
}
-
- if (node3 != null) {
- node3.shutdown();
- }
}
}
@@ -172,14 +165,13 @@ public class ClusterFaultToleranceIT {
@SuppressWarnings("checkstyle:RegexpSingleline")
@Test
- public void testStreamJobRunOkIn3Node() throws ExecutionException,
InterruptedException {
- String testCaseName = "testStreamJobRunOkIn3Node";
- String testClusterName =
"ClusterFaultToleranceIT_testStreamJobRunOkIn3Node";
+ public void testStreamJobRunOkIn2Node() throws ExecutionException,
InterruptedException {
+ String testCaseName = "testStreamJobRunOkIn2Node";
+ String testClusterName =
"ClusterFaultToleranceIT_testStreamJobRunOkIn2Node";
long testRowNumber = 1000;
int testParallelism = 6;
HazelcastInstanceImpl node1 = null;
HazelcastInstanceImpl node2 = null;
- HazelcastInstanceImpl node3 = null;
SeaTunnelClient engineClient = null;
SeaTunnelConfig seaTunnelConfig =
ConfigProvider.locateAndGetSeaTunnelConfig();
@@ -189,12 +181,10 @@ public class ClusterFaultToleranceIT {
node2 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
- node3 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
-
// waiting all node added to cluster
HazelcastInstanceImpl finalNode = node1;
Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
- .untilAsserted(() -> Assertions.assertEquals(3,
finalNode.getCluster().getMembers().size()));
+ .untilAsserted(() -> Assertions.assertEquals(2,
finalNode.getCluster().getMembers().size()));
Common.setDeployMode(DeployMode.CLIENT);
ImmutablePair<String, String> testResources =
@@ -212,7 +202,7 @@ public class ClusterFaultToleranceIT {
CompletableFuture<JobStatus> objectCompletableFuture =
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
- Awaitility.await().atMost(3, TimeUnit.MINUTES)
+ Awaitility.await().atMost(2, TimeUnit.MINUTES)
.untilAsserted(() -> {
Thread.sleep(2000);
System.out.println(FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
@@ -241,23 +231,18 @@ public class ClusterFaultToleranceIT {
if (node2 != null) {
node2.shutdown();
}
-
- if (node3 != null) {
- node3.shutdown();
- }
}
}
@SuppressWarnings("checkstyle:RegexpSingleline")
@Test
- public void testBatchJobRestoreIn3NodeWorkerDown() throws
ExecutionException, InterruptedException {
- String testCaseName = "testBatchJobRestoreIn3NodeWorkerDown";
- String testClusterName =
"ClusterFaultToleranceIT_testBatchJobRestoreIn3NodeWorkerDown";
+ public void testBatchJobRestoreIn2NodeWorkerDown() throws
ExecutionException, InterruptedException {
+ String testCaseName = "testBatchJobRestoreIn2NodeWorkerDown";
+ String testClusterName =
"ClusterFaultToleranceIT_testBatchJobRestoreIn2NodeWorkerDown";
long testRowNumber = 1000;
int testParallelism = 2;
HazelcastInstanceImpl node1 = null;
HazelcastInstanceImpl node2 = null;
- HazelcastInstanceImpl node3 = null;
SeaTunnelClient engineClient = null;
SeaTunnelConfig seaTunnelConfig =
ConfigProvider.locateAndGetSeaTunnelConfig();
@@ -267,12 +252,10 @@ public class ClusterFaultToleranceIT {
node2 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
- node3 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
-
// waiting all node added to cluster
HazelcastInstanceImpl finalNode = node1;
Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
- .untilAsserted(() -> Assertions.assertEquals(3,
finalNode.getCluster().getMembers().size()));
+ .untilAsserted(() -> Assertions.assertEquals(2,
finalNode.getCluster().getMembers().size()));
Common.setDeployMode(DeployMode.CLIENT);
ImmutablePair<String, String> testResources =
@@ -321,23 +304,18 @@ public class ClusterFaultToleranceIT {
if (node2 != null) {
node2.shutdown();
}
-
- if (node3 != null) {
- node3.shutdown();
- }
}
}
@SuppressWarnings("checkstyle:RegexpSingleline")
@Test
- public void testStreamJobRestoreIn3NodeWorkerDown() throws
ExecutionException, InterruptedException {
- String testCaseName = "testStreamJobRestoreIn3NodeWorkerDown";
- String testClusterName =
"ClusterFaultToleranceIT_testStreamJobRestoreIn3NodeWorkerDown";
+ public void testStreamJobRestoreIn2NodeWorkerDown() throws
ExecutionException, InterruptedException {
+ String testCaseName = "testStreamJobRestoreIn2NodeWorkerDown";
+ String testClusterName =
"ClusterFaultToleranceIT_testStreamJobRestoreIn2NodeWorkerDown";
long testRowNumber = 1000;
int testParallelism = 6;
HazelcastInstanceImpl node1 = null;
HazelcastInstanceImpl node2 = null;
- HazelcastInstanceImpl node3 = null;
SeaTunnelClient engineClient = null;
SeaTunnelConfig seaTunnelConfig =
ConfigProvider.locateAndGetSeaTunnelConfig();
@@ -347,12 +325,10 @@ public class ClusterFaultToleranceIT {
node2 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
- node3 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
-
// waiting all node added to cluster
HazelcastInstanceImpl finalNode = node1;
Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
- .untilAsserted(() -> Assertions.assertEquals(3,
finalNode.getCluster().getMembers().size()));
+ .untilAsserted(() -> Assertions.assertEquals(2,
finalNode.getCluster().getMembers().size()));
Common.setDeployMode(DeployMode.CLIENT);
ImmutablePair<String, String> testResources =
@@ -416,23 +392,18 @@ public class ClusterFaultToleranceIT {
if (node2 != null) {
node2.shutdown();
}
-
- if (node3 != null) {
- node3.shutdown();
- }
}
}
@SuppressWarnings("checkstyle:RegexpSingleline")
@Test
- public void testBatchJobRestoreIn3NodeMasterDown() throws
ExecutionException, InterruptedException {
- String testCaseName = "testBatchJobRestoreIn3NodeMasterDown";
- String testClusterName =
"ClusterFaultToleranceIT_testBatchJobRestoreIn3NodeMasterDown";
+ public void testBatchJobRestoreIn2NodeMasterDown() throws
ExecutionException, InterruptedException {
+ String testCaseName = "testBatchJobRestoreIn2NodeMasterDown";
+ String testClusterName =
"ClusterFaultToleranceIT_testBatchJobRestoreIn2NodeMasterDown";
long testRowNumber = 1000;
int testParallelism = 6;
HazelcastInstanceImpl node1 = null;
HazelcastInstanceImpl node2 = null;
- HazelcastInstanceImpl node3 = null;
SeaTunnelClient engineClient = null;
SeaTunnelConfig seaTunnelConfig =
ConfigProvider.locateAndGetSeaTunnelConfig();
@@ -442,12 +413,10 @@ public class ClusterFaultToleranceIT {
node2 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
- node3 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
-
// waiting all node added to cluster
HazelcastInstanceImpl finalNode = node1;
Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
- .untilAsserted(() -> Assertions.assertEquals(3,
finalNode.getCluster().getMembers().size()));
+ .untilAsserted(() -> Assertions.assertEquals(2,
finalNode.getCluster().getMembers().size()));
Common.setDeployMode(DeployMode.CLIENT);
ImmutablePair<String, String> testResources =
@@ -496,23 +465,18 @@ public class ClusterFaultToleranceIT {
if (node2 != null) {
node2.shutdown();
}
-
- if (node3 != null) {
- node3.shutdown();
- }
}
}
@SuppressWarnings("checkstyle:RegexpSingleline")
@Test
- public void testStreamJobRestoreIn3NodeMasterDown() throws
ExecutionException, InterruptedException {
- String testCaseName = "testStreamJobRestoreIn3NodeMasterDown";
- String testClusterName =
"ClusterFaultToleranceIT_testStreamJobRestoreIn3NodeMasterDown";
+ public void testStreamJobRestoreIn2NodeMasterDown() throws
ExecutionException, InterruptedException {
+ String testCaseName = "testStreamJobRestoreIn2NodeMasterDown";
+ String testClusterName =
"ClusterFaultToleranceIT_testStreamJobRestoreIn2NodeMasterDown";
long testRowNumber = 1000;
int testParallelism = 6;
HazelcastInstanceImpl node1 = null;
HazelcastInstanceImpl node2 = null;
- HazelcastInstanceImpl node3 = null;
SeaTunnelClient engineClient = null;
SeaTunnelConfig seaTunnelConfig =
ConfigProvider.locateAndGetSeaTunnelConfig();
@@ -522,12 +486,10 @@ public class ClusterFaultToleranceIT {
node2 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
- node3 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
-
// waiting all node added to cluster
HazelcastInstanceImpl finalNode = node1;
Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
- .untilAsserted(() -> Assertions.assertEquals(3,
finalNode.getCluster().getMembers().size()));
+ .untilAsserted(() -> Assertions.assertEquals(2,
finalNode.getCluster().getMembers().size()));
Common.setDeployMode(DeployMode.CLIENT);
ImmutablePair<String, String> testResources =
@@ -590,10 +552,6 @@ public class ClusterFaultToleranceIT {
if (node2 != null) {
node2.shutdown();
}
-
- if (node3 != null) {
- node3.shutdown();
- }
}
}
@@ -606,7 +564,6 @@ public class ClusterFaultToleranceIT {
int testParallelism = 6;
HazelcastInstanceImpl node1 = null;
HazelcastInstanceImpl node2 = null;
- HazelcastInstanceImpl node3 = null;
SeaTunnelClient engineClient = null;
try {
@@ -636,12 +593,14 @@ public class ClusterFaultToleranceIT {
" properties:\n" +
" type: hdfs\n" +
" namespace: /tmp/seatunnel/imap\n" +
- " clusterName: seatunnel-clsuter\n" +
+ " clusterName: " + testClusterName + "\n" +
" fs.defaultFS: file:///\n" +
"\n" +
" properties:\n" +
- " hazelcast.invocation.max.retry.count: 20\n" +
+ " hazelcast.invocation.max.retry.count: 200\n" +
" hazelcast.tcp.join.port.try.count: 30\n" +
+ " hazelcast.invocation.retry.pause.millis: 2000\n" +
+ " hazelcast.slow.operation.detector.stacktrace.logging.enabled: true\n" +
" hazelcast.logging.type: log4j2\n";
Config hazelcastConfig = Config.loadFromString(yaml);
@@ -652,12 +611,10 @@ public class ClusterFaultToleranceIT {
node2 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
- node3 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
-
// waiting all node added to cluster
HazelcastInstanceImpl finalNode = node1;
Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
- .untilAsserted(() -> Assertions.assertEquals(3,
finalNode.getCluster().getMembers().size()));
+ .untilAsserted(() -> Assertions.assertEquals(2,
finalNode.getCluster().getMembers().size()));
Common.setDeployMode(DeployMode.CLIENT);
ImmutablePair<String, String> testResources =
@@ -688,7 +645,6 @@ public class ClusterFaultToleranceIT {
// shutdown all node
node1.shutdown();
node2.shutdown();
- node3.shutdown();
Thread.sleep(10000);
@@ -696,12 +652,10 @@ public class ClusterFaultToleranceIT {
node2 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
- node3 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
-
// waiting all node added to cluster
HazelcastInstanceImpl restoreFinalNode = node1;
Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
- .untilAsserted(() -> Assertions.assertEquals(3,
restoreFinalNode.getCluster().getMembers().size()));
+ .untilAsserted(() -> Assertions.assertEquals(2,
restoreFinalNode.getCluster().getMembers().size()));
Awaitility.await().atMost(360000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
@@ -736,10 +690,6 @@ public class ClusterFaultToleranceIT {
if (node2 != null) {
node2.shutdown();
}
-
- if (node3 != null) {
- node3.shutdown();
- }
}
}
}
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
index aa3286a4c..56e0c09e2 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
@@ -66,15 +66,14 @@ public class ClusterFaultToleranceTwoPipelineIT {
@SuppressWarnings("checkstyle:RegexpSingleline")
@Test
- public void testTwoPipelineBatchJobRunOkIn3Node() throws
ExecutionException, InterruptedException {
- String testCaseName = "testTwoPipelineBatchJobRunOkIn3Node";
- String testClusterName =
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineBatchJobRunOkIn3Node";
+ public void testTwoPipelineBatchJobRunOkIn2Node() throws
ExecutionException, InterruptedException {
+ String testCaseName = "testTwoPipelineBatchJobRunOkIn2Node";
+ String testClusterName =
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineBatchJobRunOkIn2Node";
long testRowNumber = 1000;
int testParallelism = 6;
HazelcastInstanceImpl node1 = null;
HazelcastInstanceImpl node2 = null;
- HazelcastInstanceImpl node3 = null;
SeaTunnelClient engineClient = null;
SeaTunnelConfig seaTunnelConfig =
ConfigProvider.locateAndGetSeaTunnelConfig();
@@ -85,12 +84,10 @@ public class ClusterFaultToleranceTwoPipelineIT {
node2 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
- node3 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
-
// waiting all node added to cluster
HazelcastInstanceImpl finalNode = node1;
Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
- .untilAsserted(() -> Assertions.assertEquals(3,
finalNode.getCluster().getMembers().size()));
+ .untilAsserted(() -> Assertions.assertEquals(2,
finalNode.getCluster().getMembers().size()));
Common.setDeployMode(DeployMode.CLIENT);
ImmutablePair<String, String> testResources =
@@ -132,10 +129,6 @@ public class ClusterFaultToleranceTwoPipelineIT {
if (node2 != null) {
node2.shutdown();
}
-
- if (node3 != null) {
- node3.shutdown();
- }
}
}
@@ -177,14 +170,13 @@ public class ClusterFaultToleranceTwoPipelineIT {
@SuppressWarnings("checkstyle:RegexpSingleline")
@Test
- public void testTwoPipelineStreamJobRunOkIn3Node() throws
ExecutionException, InterruptedException {
- String testCaseName = "testTwoPipelineStreamJobRunOkIn3Node";
- String testClusterName =
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineStreamJobRunOkIn3Node";
+ public void testTwoPipelineStreamJobRunOkIn2Node() throws
ExecutionException, InterruptedException {
+ String testCaseName = "testTwoPipelineStreamJobRunOkIn2Node";
+ String testClusterName =
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineStreamJobRunOkIn2Node";
long testRowNumber = 1000;
int testParallelism = 6;
HazelcastInstanceImpl node1 = null;
HazelcastInstanceImpl node2 = null;
- HazelcastInstanceImpl node3 = null;
SeaTunnelClient engineClient = null;
SeaTunnelConfig seaTunnelConfig =
ConfigProvider.locateAndGetSeaTunnelConfig();
@@ -194,12 +186,10 @@ public class ClusterFaultToleranceTwoPipelineIT {
node2 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
- node3 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
-
// waiting all node added to cluster
HazelcastInstanceImpl finalNode = node1;
Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
- .untilAsserted(() -> Assertions.assertEquals(3,
finalNode.getCluster().getMembers().size()));
+ .untilAsserted(() -> Assertions.assertEquals(2,
finalNode.getCluster().getMembers().size()));
Common.setDeployMode(DeployMode.CLIENT);
ImmutablePair<String, String> testResources =
@@ -219,7 +209,7 @@ public class ClusterFaultToleranceTwoPipelineIT {
return clientJobProxy.waitForJobComplete();
});
- Awaitility.await().atMost(3, TimeUnit.MINUTES)
+ Awaitility.await().atMost(6, TimeUnit.MINUTES)
.untilAsserted(() -> {
Thread.sleep(2000);
System.out.println(FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
@@ -248,23 +238,18 @@ public class ClusterFaultToleranceTwoPipelineIT {
if (node2 != null) {
node2.shutdown();
}
-
- if (node3 != null) {
- node3.shutdown();
- }
}
}
@SuppressWarnings("checkstyle:RegexpSingleline")
@Test
- public void testTwoPipelineBatchJobRestoreIn3NodeWorkerDown() throws
ExecutionException, InterruptedException {
- String testCaseName =
"testTwoPipelineBatchJobRestoreIn3NodeWorkerDown";
- String testClusterName =
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineBatchJobRestoreIn3NodeWorkerDown";
+ public void testTwoPipelineBatchJobRestoreIn2NodeWorkerDown() throws
ExecutionException, InterruptedException {
+ String testCaseName =
"testTwoPipelineBatchJobRestoreIn2NodeWorkerDown";
+ String testClusterName =
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineBatchJobRestoreIn2NodeWorkerDown";
long testRowNumber = 1000;
int testParallelism = 2;
HazelcastInstanceImpl node1 = null;
HazelcastInstanceImpl node2 = null;
- HazelcastInstanceImpl node3 = null;
SeaTunnelClient engineClient = null;
SeaTunnelConfig seaTunnelConfig =
ConfigProvider.locateAndGetSeaTunnelConfig();
@@ -274,12 +259,10 @@ public class ClusterFaultToleranceTwoPipelineIT {
node2 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
- node3 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
-
// waiting all node added to cluster
HazelcastInstanceImpl finalNode = node1;
Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
- .untilAsserted(() -> Assertions.assertEquals(3,
finalNode.getCluster().getMembers().size()));
+ .untilAsserted(() -> Assertions.assertEquals(2,
finalNode.getCluster().getMembers().size()));
Common.setDeployMode(DeployMode.CLIENT);
ImmutablePair<String, String> testResources =
@@ -330,23 +313,18 @@ public class ClusterFaultToleranceTwoPipelineIT {
if (node2 != null) {
node2.shutdown();
}
-
- if (node3 != null) {
- node3.shutdown();
- }
}
}
@SuppressWarnings("checkstyle:RegexpSingleline")
@Test
- public void testTwoPipelineStreamJobRestoreIn3NodeWorkerDown() throws
ExecutionException, InterruptedException {
- String testCaseName =
"testTwoPipelineStreamJobRestoreIn3NodeWorkerDown";
- String testClusterName =
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineStreamJobRestoreIn3NodeWorkerDown";
+ public void testTwoPipelineStreamJobRestoreIn2NodeWorkerDown() throws
ExecutionException, InterruptedException {
+ String testCaseName =
"testTwoPipelineStreamJobRestoreIn2NodeWorkerDown";
+ String testClusterName =
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineStreamJobRestoreIn2NodeWorkerDown";
long testRowNumber = 1000;
int testParallelism = 6;
HazelcastInstanceImpl node1 = null;
HazelcastInstanceImpl node2 = null;
- HazelcastInstanceImpl node3 = null;
SeaTunnelClient engineClient = null;
SeaTunnelConfig seaTunnelConfig =
ConfigProvider.locateAndGetSeaTunnelConfig();
@@ -356,12 +334,10 @@ public class ClusterFaultToleranceTwoPipelineIT {
node2 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
- node3 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
-
// waiting all node added to cluster
HazelcastInstanceImpl finalNode = node1;
Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
- .untilAsserted(() -> Assertions.assertEquals(3,
finalNode.getCluster().getMembers().size()));
+ .untilAsserted(() -> Assertions.assertEquals(2,
finalNode.getCluster().getMembers().size()));
Common.setDeployMode(DeployMode.CLIENT);
ImmutablePair<String, String> testResources =
@@ -381,7 +357,7 @@ public class ClusterFaultToleranceTwoPipelineIT {
return clientJobProxy.waitForJobComplete();
});
- Awaitility.await().atMost(60000, TimeUnit.MILLISECONDS)
+ Awaitility.await().atMost(360000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
// Wait some tasks commit finished, and we can get rows from the sink target dir
Thread.sleep(2000);
@@ -427,23 +403,18 @@ public class ClusterFaultToleranceTwoPipelineIT {
if (node2 != null) {
node2.shutdown();
}
-
- if (node3 != null) {
- node3.shutdown();
- }
}
}
@SuppressWarnings("checkstyle:RegexpSingleline")
@Test
- public void testTwoPipelineBatchJobRestoreIn3NodeMasterDown() throws
ExecutionException, InterruptedException {
- String testCaseName =
"testTwoPipelineBatchJobRestoreIn3NodeMasterDown";
- String testClusterName =
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineBatchJobRestoreIn3NodeMasterDown";
+ public void testTwoPipelineBatchJobRestoreIn2NodeMasterDown() throws
ExecutionException, InterruptedException {
+ String testCaseName =
"testTwoPipelineBatchJobRestoreIn2NodeMasterDown";
+ String testClusterName =
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineBatchJobRestoreIn2NodeMasterDown";
long testRowNumber = 1000;
int testParallelism = 6;
HazelcastInstanceImpl node1 = null;
HazelcastInstanceImpl node2 = null;
- HazelcastInstanceImpl node3 = null;
SeaTunnelClient engineClient = null;
SeaTunnelConfig seaTunnelConfig =
ConfigProvider.locateAndGetSeaTunnelConfig();
@@ -453,12 +424,10 @@ public class ClusterFaultToleranceTwoPipelineIT {
node2 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
- node3 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
-
// waiting all node added to cluster
HazelcastInstanceImpl finalNode = node1;
Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
- .untilAsserted(() -> Assertions.assertEquals(3,
finalNode.getCluster().getMembers().size()));
+ .untilAsserted(() -> Assertions.assertEquals(2,
finalNode.getCluster().getMembers().size()));
Common.setDeployMode(DeployMode.CLIENT);
ImmutablePair<String, String> testResources =
@@ -476,7 +445,7 @@ public class ClusterFaultToleranceTwoPipelineIT {
CompletableFuture<JobStatus> objectCompletableFuture =
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
- Awaitility.await().atMost(60000, TimeUnit.MILLISECONDS)
+ Awaitility.await().atMost(360000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
// Wait some tasks commit finished
Thread.sleep(2000);
@@ -488,7 +457,7 @@ public class ClusterFaultToleranceTwoPipelineIT {
// shutdown master node
node1.shutdown();
- Awaitility.await().atMost(200000, TimeUnit.MILLISECONDS)
+ Awaitility.await().atMost(360000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> Assertions.assertTrue(
objectCompletableFuture.isDone() &&
JobStatus.FINISHED.equals(objectCompletableFuture.get())));
@@ -507,23 +476,18 @@ public class ClusterFaultToleranceTwoPipelineIT {
if (node2 != null) {
node2.shutdown();
}
-
- if (node3 != null) {
- node3.shutdown();
- }
}
}
@SuppressWarnings("checkstyle:RegexpSingleline")
@Test
- public void testTwoPipelineStreamJobRestoreIn3NodeMasterDown() throws
ExecutionException, InterruptedException {
- String testCaseName =
"testTwoPipelineStreamJobRestoreIn3NodeMasterDown";
- String testClusterName =
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineStreamJobRestoreIn3NodeMasterDown";
+ public void testTwoPipelineStreamJobRestoreIn2NodeMasterDown() throws
ExecutionException, InterruptedException {
+ String testCaseName =
"testTwoPipelineStreamJobRestoreIn2NodeMasterDown";
+ String testClusterName =
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineStreamJobRestoreIn2NodeMasterDown";
long testRowNumber = 1000;
int testParallelism = 6;
HazelcastInstanceImpl node1 = null;
HazelcastInstanceImpl node2 = null;
- HazelcastInstanceImpl node3 = null;
SeaTunnelClient engineClient = null;
SeaTunnelConfig seaTunnelConfig =
ConfigProvider.locateAndGetSeaTunnelConfig();
@@ -533,12 +497,10 @@ public class ClusterFaultToleranceTwoPipelineIT {
node2 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
- node3 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
-
// waiting all node added to cluster
HazelcastInstanceImpl finalNode = node1;
Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
- .untilAsserted(() -> Assertions.assertEquals(3,
finalNode.getCluster().getMembers().size()));
+ .untilAsserted(() -> Assertions.assertEquals(2,
finalNode.getCluster().getMembers().size()));
Common.setDeployMode(DeployMode.CLIENT);
ImmutablePair<String, String> testResources =
@@ -558,7 +520,7 @@ public class ClusterFaultToleranceTwoPipelineIT {
return clientJobProxy.waitForJobComplete();
});
- Awaitility.await().atMost(60000, TimeUnit.MILLISECONDS)
+ Awaitility.await().atMost(360000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
// Wait some tasks commit finished, and we can get rows from the sink target dir
Thread.sleep(2000);
@@ -603,10 +565,6 @@ public class ClusterFaultToleranceTwoPipelineIT {
if (node2 != null) {
node2.shutdown();
}
-
- if (node3 != null) {
- node3.shutdown();
- }
}
}
}
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/hazelcast.yaml
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/hazelcast.yaml
index 3146ffc69..e83cc722e 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/hazelcast.yaml
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/hazelcast.yaml
@@ -28,6 +28,8 @@ hazelcast:
port-count: 100
port: 5801
properties:
- hazelcast.invocation.max.retry.count: 60
+ hazelcast.invocation.max.retry.count: 200
+ hazelcast.invocation.retry.pause.millis: 2000
hazelcast.tcp.join.port.try.count: 30
+ hazelcast.slow.operation.detector.stacktrace.logging.enabled: true
hazelcast.logging.type: log4j2
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index cfbee02ce..b64a79022 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -159,6 +159,7 @@ public class PhysicalPlan {
LOGGER.info(String.format("release the pipeline %s resource", subPlan.getPipelineFullName()));
} else if
(PipelineStatus.FAILED.equals(pipelineState.getPipelineStatus())) {
if (canRestorePipeline(subPlan)) {
+ LOGGER.info(String.format("Can restore pipeline %s",
subPlan.getPipelineFullName()));
subPlan.restorePipeline();
return;
}
diff --git
a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALDisruptor.java
b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALDisruptor.java
index f7edb9278..52b2e82e1 100644
---
a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALDisruptor.java
+++
b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALDisruptor.java
@@ -43,7 +43,7 @@ public class WALDisruptor implements Closeable {
private volatile Disruptor<FileWALEvent> disruptor;
- private static final int DEFAULT_RING_BUFFER_SIZE = 256 * 1024;
+ private static final int DEFAULT_RING_BUFFER_SIZE = 1024;
private static final int DEFAULT_CLOSE_WAIT_TIME_SECONDS = 5;