Hisoka-X commented on code in PR #9749:
URL: https://github.com/apache/seatunnel/pull/9749#discussion_r2297181771
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -671,16 +665,20 @@ && getJobHistoryService().getJobMetrics(jobId)
jobSubmitFuture.completeExceptionally(new
JobException(errorMsg));
}
if (!jobSubmitFuture.isCompletedExceptionally()) {
- pendingJob.put(jobMaster);
+ PendingJobInfo pendingJobInfo =
+ new PendingJobInfo(PendingSourceState.SUBMIT,
jobMaster);
jobMaster.getPhysicalPlan().updateJobState(JobStatus.PENDING);
+ pendingJobQueue.put(pendingJobInfo);
logger.info(
String.format(
"The submit job enter the pending
queue , jobId: %s , jobName: %s",
jobId,
jobMaster.getJobImmutableInformation().getJobName()));
+
} else {
runningJobInfoIMap.remove(jobId);
runningJobMasterMap.remove(jobId);
+ pendingJobQueue.removeByJobId(jobId);
Review Comment:
Does this one seem like useless because new jobMaster not be put into
pendingJobQueue?
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -248,25 +241,25 @@ private void startPendingJobScheduleThread() {
}
private void pendingJobSchedule() throws InterruptedException {
- JobMaster jobMaster = pendingJob.peekBlocking();
- if (Objects.isNull(jobMaster)) {
- // This situation almost never happens because pendingJobSchedule
is single-threaded
- logger.warning("The peek job master is null");
+ PendingJobInfo pendingJobInfo = pendingJobQueue.peekBlocking();
+ if (Objects.isNull(pendingJobInfo)) {
+ logger.warning("The peek job info is null");
Thread.sleep(3000);
return;
}
- Long jobId = jobMaster.getJobId();
+ Long jobId = pendingJobInfo.getJobId();
+ JobMaster jobMaster = pendingJobInfo.getJobMaster();
- if (!pendingJobMasterMap.containsKey(jobId)) {
+ if (!pendingJobQueue.containsJobId(jobId)) {
logger.fine(String.format("Job ID : %s already cancelled", jobId));
- queueRemove(jobMaster);
Review Comment:
why not remove it from queue?
##########
seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelEngineClusterRoleTest.java:
##########
@@ -296,8 +296,8 @@ public void pendingJobCancel() {
.atMost(60000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
- Assertions.assertNotEquals(
- clientJobProxy.getJobStatus(),
JobStatus.CANCELED));
+ Assertions.assertEquals(
Review Comment:
ditto
##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCWithBinlogDeleteIT.java:
##########
@@ -203,12 +203,15 @@ public void testRestoreTaskWhenBinlogDelete(TestContainer
container)
query(getSourceQuerySQL(MYSQL_DATABASE,
SOURCE_TABLE)),
query(getSinkQuerySQL(MYSQL_DATABASE,
SINK_TABLE)));
});
- // check no error
- log.info("****************** container logs start ******************");
- String containerLogs = container.getServerLogs();
- log.info(containerLogs);
- Assertions.assertFalse(containerLogs.contains("ERROR"));
- log.info("****************** container logs end ******************");
+ // check job status is not failed
+ await().pollDelay(20, TimeUnit.SECONDS)
+ .atMost(60, TimeUnit.SECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertNotEquals(
+ "FAILED",
container.getJobStatus(String.valueOf(jobId))));
+ // cancel task
+ Assertions.assertEquals(0,
container.cancelJob(String.valueOf(jobId)).getExitCode());
Review Comment:
Any reason for change this?
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -248,25 +241,25 @@ private void startPendingJobScheduleThread() {
}
private void pendingJobSchedule() throws InterruptedException {
- JobMaster jobMaster = pendingJob.peekBlocking();
- if (Objects.isNull(jobMaster)) {
- // This situation almost never happens because pendingJobSchedule
is single-threaded
Review Comment:
Please revert the comment.
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/PeekBlockingQueue.java:
##########
@@ -45,10 +48,19 @@ public class PeekBlockingQueue<E> {
private final Lock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
+ private final Map<Long, E> jobIdMap = new ConcurrentHashMap<>();
+ private final Function<E, Long> jobIdExtractor;
Review Comment:
```suggestion
private final Function<E, Long> idExtractor;
```
Change jobid to id, so that we can use this tool in other place.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]