dybyte commented on code in PR #9749:
URL: https://github.com/apache/seatunnel/pull/9749#discussion_r2371613197
##########
seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java:
##########
@@ -78,9 +81,8 @@ public void testMasterNodeActive() {
try {
server2.getCoordinatorService();
- Assertions.fail("Need throw SeaTunnelEngineException here but
not.");
} catch (Exception e) {
- Assertions.assertTrue(e instanceof SeaTunnelEngineException);
+ Assertions.assertInstanceOf(SeaTunnelEngineException.class, e);
}
Review Comment:
```suggestion
Assertions.assertThrows(
SeaTunnelEngineException.class,
() -> server2.getCoordinatorService()
);
```
##########
seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/event/JobStateEventTest.java:
##########
@@ -98,6 +99,8 @@ public void testJobStateEvent() {
await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> Assertions.assertEquals(2,
accessCounter.get()));
+ TimeUnit.SECONDS.sleep(2);
+ Assertions.assertEquals(2, accessCounter.get());
Review Comment:
I think this is no longer needed.
##########
seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/event/JobStateEventTest.java:
##########
@@ -78,6 +78,7 @@ public void testJobStateEvent() {
JobStatus.FINISHED,
server.getCoordinatorService()
.getJobStatus(jobId_finished)));
+ TimeUnit.SECONDS.sleep(2);
Review Comment:
I think this is no longer needed.
##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCWithBinlogDeleteIT.java:
##########
@@ -203,12 +203,15 @@ public void testRestoreTaskWhenBinlogDelete(TestContainer
container)
query(getSourceQuerySQL(MYSQL_DATABASE,
SOURCE_TABLE)),
query(getSinkQuerySQL(MYSQL_DATABASE,
SINK_TABLE)));
});
- // check no error
- log.info("****************** container logs start ******************");
- String containerLogs = container.getServerLogs();
- log.info(containerLogs);
- Assertions.assertFalse(containerLogs.contains("ERROR"));
- log.info("****************** container logs end ******************");
+ // check job status is not failed
+ await().pollDelay(20, TimeUnit.SECONDS)
+ .atMost(60, TimeUnit.SECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertNotEquals(
+ "FAILED",
container.getJobStatus(String.valueOf(jobId))));
+ // cancel task
+ Assertions.assertEquals(0,
container.cancelJob(String.valueOf(jobId)).getExitCode());
Review Comment:
@hawk9821
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -248,25 +241,25 @@ private void startPendingJobScheduleThread() {
}
private void pendingJobSchedule() throws InterruptedException {
- JobMaster jobMaster = pendingJob.peekBlocking();
- if (Objects.isNull(jobMaster)) {
- // This situation almost never happens because pendingJobSchedule
is single-threaded
- logger.warning("The peek job master is null");
+ PendingJobInfo pendingJobInfo = pendingJobQueue.peekBlocking();
+ if (Objects.isNull(pendingJobInfo)) {
+ logger.warning("The peek job info is null");
Thread.sleep(3000);
return;
}
- Long jobId = jobMaster.getJobId();
+ Long jobId = pendingJobInfo.getJobId();
+ JobMaster jobMaster = pendingJobInfo.getJobMaster();
- if (!pendingJobMasterMap.containsKey(jobId)) {
+ if (!pendingJobQueue.containsJobId(jobId)) {
logger.fine(String.format("Job ID : %s already cancelled", jobId));
- queueRemove(jobMaster);
Review Comment:
I think that since the behavior of the queue and the map has been combined,
this if statement will never be executed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]