EricJoy2048 commented on code in PR #3637:
URL:
https://github.com/apache/incubator-seatunnel/pull/3637#discussion_r1050571583
##########
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java:
##########
@@ -673,40 +709,37 @@ public void testStreamJobRestoreInAllNodeDown() throws ExecutionException, Inter
Thread.sleep(10000);
- node1 = SeaTunnelServerStarter.createHazelcastInstance(
- TestUtils.getClusterName(testClusterName));
+ node1 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
- node2 = SeaTunnelServerStarter.createHazelcastInstance(
- TestUtils.getClusterName(testClusterName));
+ node2 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
- node3 = SeaTunnelServerStarter.createHazelcastInstance(
- TestUtils.getClusterName(testClusterName));
+ node3 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
// waiting all node added to cluster
HazelcastInstanceImpl restoreFinalNode = node1;
Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
- .untilAsserted(() -> Assertions.assertEquals(3, restoreFinalNode.getCluster().getMembers().size()));
+ .untilAsserted(() -> Assertions.assertEquals(3, restoreFinalNode.getCluster().getMembers().size()));
Awaitility.await().atMost(360000, TimeUnit.MILLISECONDS)
- .untilAsserted(() -> {
- // Wait job write all rows in file
- Thread.sleep(2000);
- System.out.println(FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
- Assertions.assertTrue(JobStatus.RUNNING.equals(clientJobProxy.getJobStatus()) &&
- testRowNumber * testParallelism == FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
- });
+ .untilAsserted(() -> {
+ // Wait job write all rows in file
+ Thread.sleep(2000);
+ System.out.println(FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
+ Assertions.assertTrue(JobStatus.RUNNING.equals(clientJobProxy.getJobStatus()) &&
+ testRowNumber * testParallelism <= FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
+ });
// sleep 10s and expect the job don't write more rows.
Thread.sleep(10000);
clientJobProxy.cancelJob();
Awaitility.await().atMost(20000, TimeUnit.MILLISECONDS)
- .untilAsserted(() -> Assertions.assertTrue(
- objectCompletableFuture.isDone() && JobStatus.CANCELED.equals(objectCompletableFuture.get())));
+ .untilAsserted(() -> Assertions.assertTrue(
+ objectCompletableFuture.isDone() && JobStatus.CANCELED.equals(objectCompletableFuture.get())));
- // check the final rows
+ // prove that the task was restarted
Long fileLineNumberFromDir = FileUtils.getFileLineNumberFromDir(testResources.getLeft());
- Assertions.assertEquals(testRowNumber * testParallelism, fileLineNumberFromDir);
+ Assertions.assertTrue(testRowNumber * testParallelism <= fileLineNumberFromDir);
Review Comment:
Why is the check now `testRowNumber * testParallelism` <= `fileLineNumberFromDir`?
I think `testRowNumber * testParallelism` must be equal to
`fileLineNumberFromDir`.
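
To make the difference between the two checks concrete, here is a minimal standalone sketch. The class name `RowCountCheck`, the method names `exactMatch` and `atLeast`, and the sample numbers are hypothetical and not part of this PR; only the two comparisons mirror the removed and added assertions in the diff above.

```java
public class RowCountCheck {
    // Strict check, mirroring the removed assertEquals line:
    // the output directory must hold exactly the expected number of rows.
    public static boolean exactMatch(long testRowNumber, long testParallelism, long fileLineNumberFromDir) {
        return testRowNumber * testParallelism == fileLineNumberFromDir;
    }

    // Relaxed check, mirroring the added assertTrue line:
    // any number of extra rows is accepted as well.
    public static boolean atLeast(long testRowNumber, long testParallelism, long fileLineNumberFromDir) {
        return testRowNumber * testParallelism <= fileLineNumberFromDir;
    }

    public static void main(String[] args) {
        long rows = 1000;          // hypothetical value for testRowNumber
        long parallelism = 2;      // hypothetical value for testParallelism
        long exactCount = 2000;    // every row written once
        long withExtras = 2100;    // hypothetical: some rows written more than once

        System.out.println("exactMatch, exact count: " + exactMatch(rows, parallelism, exactCount));
        System.out.println("atLeast,    exact count: " + atLeast(rows, parallelism, exactCount));
        System.out.println("exactMatch, extra rows:  " + exactMatch(rows, parallelism, withExtras));
        System.out.println("atLeast,    extra rows:  " + atLeast(rows, parallelism, withExtras));
    }
}
```

In this sketch the relaxed check still passes when the directory contains more rows than expected, so as far as I can tell it would no longer catch duplicated output after the restore.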
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.