zhangshenghang commented on code in PR #9749:
URL: https://github.com/apache/seatunnel/pull/9749#discussion_r2302791577


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -248,25 +241,25 @@ private void startPendingJobScheduleThread() {
     }
 
     private void pendingJobSchedule() throws InterruptedException {
-        JobMaster jobMaster = pendingJob.peekBlocking();
-        if (Objects.isNull(jobMaster)) {
-            // This situation almost never happens because pendingJobSchedule is single-threaded
-            logger.warning("The peek job master is null");
+        PendingJobInfo pendingJobInfo = pendingJobQueue.peekBlocking();
+        if (Objects.isNull(pendingJobInfo)) {
+            logger.warning("The peek job info is null");
             Thread.sleep(3000);
             return;
         }
 
-        Long jobId = jobMaster.getJobId();
+        Long jobId = pendingJobInfo.getJobId();
+        JobMaster jobMaster = pendingJobInfo.getJobMaster();
 
-        if (!pendingJobMasterMap.containsKey(jobId)) {
+        if (!pendingJobQueue.containsJobId(jobId)) {
             logger.fine(String.format("Job ID : %s already cancelled", jobId));
-            queueRemove(jobMaster);

Review Comment:
   peekBlocking() only returns the head of the queue; it does not remove it. 
The entry for the cancelled job therefore still has to be removed from the queue 
here. @Hisoka-X @hawk9821 
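
To illustrate the concern, here is a minimal sketch of the peek-then-remove pattern. It is not SeaTunnel code: `PendingQueueSketch`, `liveJobIds`, and `nextLiveJob` are hypothetical names, and a plain `LinkedBlockingQueue` with a non-blocking `peek()` stands in for the PR's pending job queue and its `peekBlocking()`.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the pending job queue; names are assumptions.
class PendingQueueSketch {
    private final BlockingQueue<Long> queue = new LinkedBlockingQueue<>();
    private final Set<Long> liveJobIds = ConcurrentHashMap.newKeySet();

    void submit(long jobId) {
        liveJobIds.add(jobId);
        queue.add(jobId);
    }

    // Cancelling only drops the ID from the live set; the queue entry stays.
    void cancel(long jobId) {
        liveJobIds.remove(jobId);
    }

    // Returns the next live job ID without removing it, or null if none is left.
    Long nextLiveJob() {
        Long head;
        while ((head = queue.peek()) != null) {
            if (liveJobIds.contains(head)) {
                return head;
            }
            // peek() left the cancelled entry at the head. Remove it explicitly,
            // otherwise every later call would see the same entry again.
            queue.remove(head);
        }
        return null;
    }

    int size() {
        return queue.size();
    }
}
```

Without the explicit `queue.remove(head)`, the scheduling loop in this sketch would keep peeking the same cancelled entry and never reach the jobs behind it.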



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.