Hisoka-X commented on code in PR #9749:
URL: https://github.com/apache/seatunnel/pull/9749#discussion_r2347356393
##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCWithBinlogDeleteIT.java:
##########
@@ -203,12 +203,23 @@ public void testRestoreTaskWhenBinlogDelete(TestContainer
container)
query(getSourceQuerySQL(MYSQL_DATABASE,
SOURCE_TABLE)),
query(getSinkQuerySQL(MYSQL_DATABASE,
SINK_TABLE)));
});
+ // check job status is not failed
+ await().pollDelay(20, TimeUnit.SECONDS)
+ .atMost(60, TimeUnit.SECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertNotEquals(
+ "FAILED",
container.getJobStatus(String.valueOf(jobId))));
Review Comment:
Why not check that it is still running?
##########
seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java:
##########
@@ -196,12 +203,33 @@ void testCleanupRunningJobStateIMap() {
"batch_fake_to_console.conf",
"test_cleanup_running_job_state_imap");
CoordinatorService coordinatorService =
jobInformation.coordinatorService;
- IMap<Object, Object> runningJobStateIMap =
-
coordinatorService.getJobMaster(jobInformation.jobId).getRunningJobStateIMap();
- Assertions.assertTrue(!runningJobStateIMap.isEmpty());
await().atMost(10000, TimeUnit.MILLISECONDS)
- .untilAsserted(() ->
Assertions.assertTrue(runningJobStateIMap.isEmpty()));
Review Comment:
We should check that `runningJobStateIMap` is empty.
##########
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java:
##########
@@ -615,21 +606,25 @@ public void
testTwoPipelineBatchJobRestoreIn2NodeMasterDown() throws Exception {
// shutdown master node
node1.shutdown();
+ log.info(
+ "=============================shutdown
node1===================================");
+
Awaitility.await()
- .atMost(600000, TimeUnit.MILLISECONDS)
+ .atMost(300000, TimeUnit.MILLISECONDS)
+ .pollInterval(2000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
- // Wait some tasks commit finished
- Thread.sleep(2000);
log.warn(
"\n================================="
+
FileUtils.getFileLineNumberFromDir(
testResources.getLeft())
+
"=================================\n");
- Assertions.assertTrue(
- objectCompletableFuture.isDone()
- && JobStatus.FINISHED.equals(
-
objectCompletableFuture.get()));
+ JobStatus jobStatus =
clientJobProxy.getJobStatus();
+
System.out.println("++++++++++++++++++++++++++++++++++++++++");
Review Comment:
Should this `System.out.println` be removed?
##########
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java:
##########
@@ -119,20 +119,24 @@ public void cancelJobTest() throws Exception {
engineClient.createExecutionContext(filePath, jobConfig,
SEATUNNEL_CONFIG);
final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
- JobStatus jobStatus1 = clientJobProxy.getJobStatus();
- Assertions.assertFalse(jobStatus1.isEndState());
+ await().atMost(30000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertFalse(
+
clientJobProxy.getJobStatus().isEndState()));
+
Review Comment:
Why do we need to add `await` here?
##########
seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelEngineClusterRoleTest.java:
##########
@@ -453,7 +455,8 @@ public void testWorkerIsFirstMemberThenGetJobDetailStatus()
{
.listJobStatus(true)
.contains("RUNNING")));
jobClient.cancelJob(jobId);
- await().atMost(30000, TimeUnit.MILLISECONDS)
+ await().pollDelay(10000, TimeUnit.MILLISECONDS)
+ .atMost(60000, TimeUnit.MILLISECONDS)
Review Comment:
Why increase the wait time?
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -317,21 +311,19 @@ private void pendingJobSchedule() throws
InterruptedException {
"The %s %s is in %s state, restore
pipeline and take over this job running",
pendingSourceState, jobFullName,
jobStatus));
- pendingJobMasterMap.remove(jobId);
- runningJobMasterMap.put(jobId, jobMaster);
- jobMaster.run();
+ runningJobMasterMap.put(jobId, finalJobMaster);
+ finalJobMaster.run();
} finally {
- if (jobMasterCompletedSuccessfully(jobMaster,
pendingSourceState)) {
+ if (jobMasterCompletedSuccessfully(finalJobMaster,
pendingSourceState)) {
runningJobMasterMap.remove(jobId);
}
}
});
}
- private void queueRemove(JobMaster jobMaster) throws InterruptedException {
- JobMaster take = pendingJob.take();
- if (take != jobMaster) {
- logger.severe("The job master is not equal to the peek job
master");
+ private void queueRemove(JobMaster jobMaster) {
+ if (pendingJobQueue.contains(jobMaster.getJobId())) {
+ pendingJobQueue.removeById(jobMaster.getJobId());
}
Review Comment:
```suggestion
pendingJobQueue.removeById(jobMaster.getJobId());
```
##########
seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java:
##########
@@ -78,9 +81,8 @@ public void testMasterNodeActive() {
try {
server2.getCoordinatorService();
- Assertions.fail("Need throw SeaTunnelEngineException here but
not.");
Review Comment:
Why was this removed?
##########
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java:
##########
@@ -119,20 +119,24 @@ public void cancelJobTest() throws Exception {
engineClient.createExecutionContext(filePath, jobConfig,
SEATUNNEL_CONFIG);
final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
- JobStatus jobStatus1 = clientJobProxy.getJobStatus();
- Assertions.assertFalse(jobStatus1.isEndState());
+ await().atMost(30000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertFalse(
+
clientJobProxy.getJobStatus().isEndState()));
+
CompletableFuture<JobStatus> objectCompletableFuture =
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
Thread.sleep(1000);
clientJobProxy.cancelJob();
- await().atMost(20000, TimeUnit.MILLISECONDS)
+ await().atMost(40000, TimeUnit.MILLISECONDS)
Review Comment:
Why increase the timeout?
##########
seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/ConnectorPackageClientTest.java:
##########
@@ -285,8 +283,13 @@ public void cancelJobTest() throws Exception {
ClientJobExecutionEnvironment jobExecutionEnv =
seaTunnelClient.createExecutionContext(filePath,
jobConfig, SEATUNNEL_CONFIG);
final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
- JobStatus jobStatus1 = clientJobProxy.getJobStatus();
- Assertions.assertFalse(jobStatus1.isEndState());
+ await().atMost(5000, TimeUnit.MILLISECONDS)
Review Comment:
ditto
##########
seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java:
##########
@@ -93,8 +95,8 @@ public void testMasterNodeActive() {
CoordinatorService coordinatorService =
server2.getCoordinatorService();
Assertions.assertTrue(coordinatorService.isCoordinatorActive());
- } catch (SeaTunnelEngineException e) {
- Assertions.assertTrue(false);
+ } catch (Exception e) {
+
Assertions.assertInstanceOf(SeaTunnelEngineException.class, e);
Review Comment:
Before this change, it should not throw an exception. Why would it throw
an exception now?
##########
seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceWithCancelPendingJobTest.java:
##########
@@ -171,16 +170,28 @@ private JobMaster newJobInstanceWithRunningState(long
jobId, boolean restore)
.submitJob(jobId, data,
jobImmutableInformation.isStartWithSavePoint());
voidPassiveCompletableFuture.join();
- JobMaster jobMaster =
server.getCoordinatorService().getJobMaster(jobId);
+ AtomicReference<JobMaster> jobMasterReference = new
AtomicReference<>();
+
+ await().atMost(30000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
Review Comment:
Why do we need to wait to get the JobMaster?
##########
seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java:
##########
@@ -310,16 +311,28 @@ private JobMaster newJobInstanceWithRunningState(long
jobId, boolean restore)
.submitJob(jobId, data,
jobImmutableInformation.isStartWithSavePoint());
voidPassiveCompletableFuture.join();
- JobMaster jobMaster =
server.getCoordinatorService().getJobMaster(jobId);
+ AtomicReference<JobMaster> jobMasterReference = new
AtomicReference<>();
Review Comment:
ditto
##########
seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java:
##########
@@ -339,9 +384,13 @@ private JobInformation submitJob(String testClassName,
String jobConfigFile, Str
Data data =
coordinatorServiceTest.getSerializationService().toData(jobImmutableInformation);
- coordinatorService
- .submitJob(jobId, data,
jobImmutableInformation.isStartWithSavePoint())
- .join();
+ try {
+ coordinatorService
+ .submitJob(jobId, data,
jobImmutableInformation.isStartWithSavePoint())
+ .join();
+ } catch (Throwable e) {
+ log.error("submit job failed", e);
Review Comment:
why?
##########
seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointErrorRestoreEndTest.java:
##########
@@ -41,14 +42,25 @@ public void testCheckpointRestoreToFailEnd() {
long jobId = System.currentTimeMillis();
startJob(jobId, STREAM_CONF_WITH_ERROR_PATH, false);
- JobMaster jobMaster =
server.getCoordinatorService().getJobMaster(jobId);
- Assertions.assertEquals(1,
jobMaster.getPhysicalPlan().getPipelineList().size());
+ AtomicReference<JobMaster> jobMasterReference = new
AtomicReference<>();
Review Comment:
ditto
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]