Hisoka-X commented on code in PR #7693:
URL: https://github.com/apache/seatunnel/pull/7693#discussion_r1794577718


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -195,6 +217,89 @@ public CoordinatorService(
         masterActiveListener = Executors.newSingleThreadScheduledExecutor();
         masterActiveListener.scheduleAtFixedRate(
                 this::checkNewActiveMaster, 0, 100, TimeUnit.MILLISECONDS);
+        isJobPending =
+                engineConfig.getScheduleStrategy().equals(ScheduleStrategy.WAIT) && !dynamicSlot;

Review Comment:
   Repeatedly checking `dynamicSlot` has become a burden. When validating the configuration, if users set both `dynamicSlot = true` and `ScheduleStrategy.WAIT`, it may be better to warn them that `ScheduleStrategy.WAIT` cannot take effect and then fall back to `REJECT`.
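Here is a minimal sketch of that validation. It assumes a simplified two-value enum and a hypothetical `ScheduleStrategyResolver.resolve` helper that is called once while the engine config is loaded. Neither is existing SeaTunnel API. The helper mirrors the PR's `isJobPending` condition: it detects the conflicting combination a single time, logs a warning, and falls back to `REJECT` as the effective strategy.

```java
import java.util.logging.Logger;

final class ScheduleStrategyResolver {
    private static final Logger LOG = Logger.getLogger(ScheduleStrategyResolver.class.getName());

    // Simplified stand-in for the engine's ScheduleStrategy (assumption: only these two values).
    enum ScheduleStrategy {
        REJECT,
        WAIT
    }

    private ScheduleStrategyResolver() {}

    /**
     * Hypothetical helper, called once while validating the engine config. Mirrors the PR's
     * isJobPending condition: WAIT only takes effect without dynamic slots, so the conflicting
     * combination is reported once and downgraded to REJECT instead of being re-checked later.
     */
    static ScheduleStrategy resolve(ScheduleStrategy configured, boolean dynamicSlot) {
        if (configured == ScheduleStrategy.WAIT && dynamicSlot) {
            LOG.warning(
                    "ScheduleStrategy.WAIT cannot take effect when dynamic slots are enabled; "
                            + "falling back to ScheduleStrategy.REJECT");
            return ScheduleStrategy.REJECT;
        }
        return configured;
    }
}
```

Once the strategy is normalized this way, `isJobPending` could be derived from the resolved strategy alone (`strategy == ScheduleStrategy.WAIT`). The rest of the coordinator would then no longer need to consult `dynamicSlot`.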



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java:
##########
@@ -343,6 +346,77 @@ public void initStateFuture() {
                         }));
     }
 
+    /**
+     * Apply for resources
+     *
+     * @return true if apply resources successfully, otherwise false
+     */
+    public boolean preApplyResources() {
+        Map<TaskGroupLocation, CompletableFuture<SlotProfile>> preApplyResourceFutures =
+                new HashMap<>();
+        for (SubPlan subPlan : physicalPlan.getPipelineList()) {
+            Map<TaskGroupLocation, CompletableFuture<SlotProfile>> coordinatorFutures =
+                    new HashMap<>();
+            subPlan.getCoordinatorVertexList()
+                    .forEach(
+                            coordinator ->
+                                    coordinatorFutures.put(
+                                            coordinator.getTaskGroupLocation(),
+                                            ResourceUtils.applyResourceForTask(
+                                                    resourceManager,
+                                                    coordinator,
+                                                    subPlan.getTags())));
+
+            Map<TaskGroupLocation, CompletableFuture<SlotProfile>> taskFutures = new HashMap<>();
+            subPlan.getPhysicalVertexList()
+                    .forEach(
+                            task ->
+                                    taskFutures.put(
+                                            task.getTaskGroupLocation(),
+                                            ResourceUtils.applyResourceForTask(
+                                                    resourceManager, task, subPlan.getTags())));
+
+            preApplyResourceFutures.putAll(coordinatorFutures);
+            preApplyResourceFutures.putAll(taskFutures);
+        }
+
+        boolean enoughResource =
+                preApplyResourceFutures.values().stream()
+                                .filter(
+                                        value -> {
+                                            try {
+                                                return value != null && value.join() != null;
+                                            } catch (CompletionException e) {
+                                                return false;
+                                            }
+                                        })
+                                .count()
+                        == preApplyResourceFutures.size();
+
+        if (enoughResource) {
+            // Adequate resources, pass on resources to the plan
+            physicalPlan.setPreApplyResourceFutures(preApplyResourceFutures);
+        } else {
+            // Release the resource that has been applied
+            resourceManager

Review Comment:
   Please add a retry when releasing resources. For reference, see https://github.com/apache/seatunnel/blob/749b2fe364bcca33ce4cf4e69c4863c0853c278b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java#L502-L523
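Here is a generic sketch of a bounded retry around the release call. The `releaseAction` supplier, the `maxAttempts` and `sleepMillis` parameters, and the class name are hypothetical placeholders. The linked `JobMaster` code may rely on SeaTunnel's own retry utility and exception filtering, which this sketch does not reproduce.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;
import java.util.logging.Logger;

final class ResourceReleaseRetry {
    private static final Logger LOG = Logger.getLogger(ResourceReleaseRetry.class.getName());

    private ResourceReleaseRetry() {}

    /**
     * Hypothetical helper: invokes the release action up to maxAttempts times, sleeping
     * sleepMillis between failed attempts. Returns true as soon as one release completes
     * normally, false if every attempt failed or the thread was interrupted while waiting.
     */
    static boolean releaseWithRetry(
            Supplier<CompletableFuture<Void>> releaseAction, int maxAttempts, long sleepMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                releaseAction.get().join();
                return true;
            } catch (CompletionException e) {
                LOG.warning(
                        String.format(
                                "Release attempt %d/%d failed: %s",
                                attempt, maxAttempts, e.getCause()));
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException ie) {
                    // Preserve the interrupt status and stop retrying.
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;
    }
}
```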



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -519,7 +668,19 @@ && getJobHistoryService().getJobMetrics(jobId) != null) {
                     }
                     if (!jobSubmitFuture.isCompletedExceptionally()) {
                         try {
-                            jobMaster.run();
+                            if (isJobPending) {
+                                if (pendingJob.isEmpty() && jobMaster.preApplyResources()) {
+                                    pendingJobMasterMap.remove(jobId);
+                                    runningJobMasterMap.put(jobId, jobMaster);
+                                    jobMaster.run();
+                                } else {
+                                    pendingJob.add(new Tuple2<>(jobId, jobMaster));
+                                    jobMaster.getPhysicalPlan().updateJobState(JobStatus.PENDING);
+                                    logger.info("Resources not enough, enter the pending queue");

Review Comment:
   Please add the job ID and job name to this log message.
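Here is a small sketch of such a message. It uses `java.util.logging` and a hypothetical `PendingJobLogging` helper. In the PR, the job name would presumably come from `jobMaster.getPhysicalPlan().getJobFullName()`, which this file already uses when restoring jobs, but that wiring is an assumption.

```java
import java.util.logging.Logger;

final class PendingJobLogging {
    private static final Logger LOG = Logger.getLogger(PendingJobLogging.class.getName());

    private PendingJobLogging() {}

    // Hypothetical helper: builds a log line that identifies the job by both name and ID.
    static String pendingQueueMessage(long jobId, String jobName) {
        return String.format(
                "Resources not enough for job %s (id: %d), entering the pending queue",
                jobName, jobId);
    }

    static void logEnterPending(long jobId, String jobName) {
        LOG.info(pendingQueueMessage(jobId, jobName));
    }
}
```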



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java:
##########
@@ -141,6 +141,61 @@ private List<ResourceProfile> stillNeedRequestResource() {
         return needRequestResource;
     }
 
+    /**
+     * Check if the worker has enough resources to provide the required resources
+     *
+     * @return true if the worker has enough resources to provide the required resources
+     */
+    public List<WorkerProfile> isWorkerResourceEnough(List<WorkerProfile> workerProfiles) {

Review Comment:
   Is this method used anywhere? It looks unused.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -173,10 +187,18 @@ public class CoordinatorService {
 
     private PassiveCompletableFuture restoreAllJobFromMasterNodeSwitchFuture;
 
+    private ConcurrentLinkedQueue<Tuple2<Long, JobMaster>> pendingJob =

Review Comment:
   Can we store the job ID in `JobMaster`? Then we could call `JobMaster::getJobId` whenever we need the job ID.
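Here is a minimal sketch of the idea. A hypothetical `SketchJobMaster` stands in for `JobMaster`, whose real constructor is not part of this diff. The queue holds the masters directly, and the ID is read back through a getter instead of being carried in a `Tuple2<Long, JobMaster>`.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for JobMaster that stores its own job ID.
final class SketchJobMaster {
    private final long jobId;

    SketchJobMaster(long jobId) {
        this.jobId = jobId;
    }

    long getJobId() {
        return jobId;
    }
}

final class PendingJobQueue {
    // Holds masters directly instead of Tuple2<Long, JobMaster>.
    private final ConcurrentLinkedQueue<SketchJobMaster> pendingJob = new ConcurrentLinkedQueue<>();

    void add(SketchJobMaster jobMaster) {
        pendingJob.add(jobMaster);
    }

    // Returns the ID of the job at the head of the queue, or null if the queue is empty.
    Long headJobId() {
        SketchJobMaster head = pendingJob.peek();
        return head == null ? null : head.getJobId();
    }
}
```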



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -536,6 +697,16 @@ && getJobHistoryService().getJobMetrics(jobId) != null) {
         return new PassiveCompletableFuture<>(jobSubmitFuture);
     }
 
+    private static void completeFailJob(JobMaster jobMaster) {
+        // If the pending queue is not enabled and resources are insufficient, stop the task from
+        // running
+        JobResult jobResult =
+                new JobResult(
+                        JobStatus.FAILING,

Review Comment:
   ```suggestion
                           JobStatus.FAILED,
   ```



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -195,6 +217,89 @@ public CoordinatorService(
         masterActiveListener = Executors.newSingleThreadScheduledExecutor();
         masterActiveListener.scheduleAtFixedRate(
                 this::checkNewActiveMaster, 0, 100, TimeUnit.MILLISECONDS);
+        isJobPending =
+                engineConfig.getScheduleStrategy().equals(ScheduleStrategy.WAIT) && !dynamicSlot;
+        if (isJobPending) {
+            logger.info("Start pending job schedule thread");
+            // start pending job schedule thread
+            startPendingJobScheduleThread();
+        }
+    }
+
+    private void startPendingJobScheduleThread() {
+        Runnable pendingJobScheduleTask =
+                () -> {
+                    Thread.currentThread().setName("pending-job-schedule-runner");
+                    while (true) {
+                        if (pendingJob.isEmpty()) {
+                            try {
+                                Thread.sleep(3000);
+                            } catch (InterruptedException e) {
+                                logger.severe(ExceptionUtils.getMessage(e));
+                            }
+                        } else {
+                            pendingJobSchedule();
+                        }
+                    }
+                };
+        executorService.submit(pendingJobScheduleTask);
+    }
+
+    private void pendingJobSchedule() {
+        pendingJob.stream()
+                .findFirst()
+                .ifPresent(
+                        pendingJobEntry -> {
+                            Long jobId = pendingJobEntry._1;
+                            JobMaster jobMaster = pendingJobEntry._2;
+
+                            logger.info(
+                                    "Start calculating whether pending task resources are enough: "
+                                            + jobId);
+
+                            if (jobMaster.preApplyResources()) {
+                                logger.info("Resources enough, start running: " + jobId);
+                                pendingJob.poll();
+                                PendingSourceState pendingSourceState =
+                                        pendingJobMasterMap.get(jobId)._1;
+
+                                CompletableFuture.runAsync(
+                                        () -> {
+                                            try {
+                                                if (pendingSourceState
+                                                        == PendingSourceState.RESTORE) {
+                                                    jobMaster
+                                                            .getPhysicalPlan()
+                                                            .getPipelineList()
+                                                            .forEach(SubPlan::restorePipelineState);
+                                                }
+                                                pendingJobMasterMap.remove(jobId);
+                                                runningJobMasterMap.put(jobId, jobMaster);
+                                                jobMaster.run();
+                                            } finally {
+                                                if (jobMasterCompletedSuccessfully(
+                                                        jobMaster, pendingSourceState)) {
+                                                    runningJobMasterMap.remove(jobId);
+                                                }
+                                            }
+                                        },
+                                        executorService);
+                            } else {
+                                logger.info("Resources not enough, waiting next time");

Review Comment:
   Please add more details to this log, such as the job ID and the number of slots still needed.
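Here is a sketch of a more informative message. It assumes the per-task-group futures built in `preApplyResources` are kept around so the shortfall can be counted, with each future requesting one slot. `PendingResourceReport`, `missingSlotCount`, and `notEnoughMessage` are hypothetical names. A request counts as missing if it failed, was cancelled, or completed without a slot.

```java
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class PendingResourceReport {
    private PendingResourceReport() {}

    // Counts slot requests that failed, were cancelled, or completed with no slot (null).
    static <K, V> long missingSlotCount(Map<K, CompletableFuture<V>> slotFutures) {
        return slotFutures.values().stream()
                .filter(
                        future -> {
                            try {
                                return future == null || future.join() == null;
                            } catch (CompletionException | CancellationException e) {
                                return true;
                            }
                        })
                .count();
    }

    // Hypothetical log line naming the job and the size of the shortfall.
    static String notEnoughMessage(long jobId, String jobName, long missing, int requested) {
        return String.format(
                "Resources not enough for job %s (id: %d): %d of %d slots could not be allocated, waiting for the next round",
                jobName, jobId, missing, requested);
    }
}
```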



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -566,7 +737,10 @@ public PassiveCompletableFuture<Void> savePoint(long jobId) {
     public PassiveCompletableFuture<JobResult> waitForJobComplete(long jobId) {
         // must wait for all job restore complete
         restoreAllJobFromMasterNodeSwitchFuture.join();
-        JobMaster runningJobMaster = runningJobMasterMap.get(jobId);
+        JobMaster runningJobMaster =
+                Optional.ofNullable(pendingJobMasterMap.get(jobId))
+                        .map(t -> t._2)
+                        .orElse(runningJobMasterMap.get(jobId));

Review Comment:
   Please refactor this into a shared method, because this lookup appears in more than one place.
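Here is a sketch of the extracted lookup. It uses plain maps and a hypothetical `Pending` holder in place of the PR's `Tuple2<PendingSourceState, JobMaster>` values. The class and method names are assumptions. Switching from `orElse` to `orElseGet` also keeps the running-map lookup lazy, because `orElse` evaluates its argument even when the pending map already contains the job.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

final class JobMasterLookup<M> {
    // Stand-in for the Tuple2<PendingSourceState, JobMaster> values of pendingJobMasterMap.
    static final class Pending<T> {
        final String sourceState;
        final T jobMaster;

        Pending(String sourceState, T jobMaster) {
            this.sourceState = sourceState;
            this.jobMaster = jobMaster;
        }
    }

    final Map<Long, Pending<M>> pendingJobMasterMap = new ConcurrentHashMap<>();
    final Map<Long, M> runningJobMasterMap = new ConcurrentHashMap<>();

    /**
     * Hypothetical single lookup used everywhere: pending jobs first, then running jobs.
     * Returns null if the job is in neither map. orElseGet keeps the second lookup lazy.
     */
    M getJobMaster(long jobId) {
        return Optional.ofNullable(pendingJobMasterMap.get(jobId))
                .map(pending -> pending.jobMaster)
                .orElseGet(() -> runningJobMasterMap.get(jobId));
    }
}
```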



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -369,30 +477,59 @@ private void restoreJobFromMasterActiveSwitch(@NonNull Long jobId, @NonNull JobI
         }
 
         String jobFullName = jobMaster.getPhysicalPlan().getJobFullName();
-        runningJobMasterMap.put(jobId, jobMaster);
-
+        if (isJobPending) {
+            pendingJobMasterMap.put(jobId, new Tuple2<>(PendingSourceState.RESTORE, jobMaster));
+        } else {
+            runningJobMasterMap.put(jobId, jobMaster);
+        }
         logger.info(
                 String.format(
                         "The restore %s is in %s state, restore pipeline and take over this job running",
                         jobFullName, jobStatus));
-        CompletableFuture.runAsync(
-                () -> {
-                    try {
-                        jobMaster
-                                .getPhysicalPlan()
-                                .getPipelineList()
-                                .forEach(SubPlan::restorePipelineState);
-                        jobMaster.run();
-                    } finally {
-                        // voidCompletableFuture will be cancelled when zeta master node
-                        // shutdown to simulate master failure,
-                        // don't update runningJobMasterMap is this case.
-                        if (!jobMaster.getJobMasterCompleteFuture().isCompletedExceptionally()) {
-                            runningJobMasterMap.remove(jobId);
+        // FIFO strategy,If there is a waiting task, the subsequent task will keep waiting
+        boolean canRunJob, preApplyResources;
+        if (isJobPending) {
+            preApplyResources = jobMaster.preApplyResources();
+            // The pending queue is empty and there are sufficient resources to run directly
+            canRunJob = (pendingJob.size() == 0 && preApplyResources);
+        } else {
+            // If it is a dynamic slot or the pending queue has not been started, run it directly
+            canRunJob = true;
+            preApplyResources = true;
+        }
+        if (canRunJob) {
+            CompletableFuture.runAsync(
+                    () -> {
+                        try {
+                            if (isJobPending) {
+                                pendingJobMasterMap.remove(jobId);
+                                runningJobMasterMap.put(jobId, jobMaster);
+                            }
+                            jobMaster
+                                    .getPhysicalPlan()
+                                    .getPipelineList()
+                                    .forEach(SubPlan::restorePipelineState);
+                            if (!dynamicSlot && !preApplyResources) {

Review Comment:
   I can't find any case where `dynamicSlot` and `preApplyResources` are both false while `canRunJob` is true.
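The reviewer's observation can be checked exhaustively. This sketch copies only the boolean logic of the restore path in this hunk and tries every input combination. `preApplySucceeds` stands in for `jobMaster.preApplyResources()`, and `pendingQueueEmpty` stands in for `pendingJob.size() == 0`.

```java
final class CanRunJobTruthTable {
    private CanRunJobTruthTable() {}

    /**
     * Returns true if any input combination reaches the "!dynamicSlot && !preApplyResources"
     * branch while canRunJob is true, mirroring the restore-path logic in this hunk.
     */
    static boolean guardedBranchReachable() {
        boolean[] values = {false, true};
        for (boolean waitStrategy : values) {
            for (boolean dynamicSlot : values) {
                for (boolean pendingQueueEmpty : values) {
                    for (boolean preApplySucceeds : values) {
                        boolean isJobPending = waitStrategy && !dynamicSlot;
                        boolean canRunJob;
                        boolean preApplyResources;
                        if (isJobPending) {
                            preApplyResources = preApplySucceeds;
                            canRunJob = pendingQueueEmpty && preApplyResources;
                        } else {
                            canRunJob = true;
                            preApplyResources = true;
                        }
                        if (canRunJob && !dynamicSlot && !preApplyResources) {
                            return true;
                        }
                    }
                }
            }
        }
        return false;
    }
}
```

Under those assumptions, the method returns `false`. Inside the `isJobPending` branch, `canRunJob` implies `preApplyResources`. Outside it, `preApplyResources` is hard-coded to `true`. The guarded branch therefore looks unreachable.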



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java:
##########
@@ -343,6 +346,77 @@ public void initStateFuture() {
                         }));
     }
 
+    /**
+     * Apply for resources
+     *
+     * @return true if apply resources successfully, otherwise false
+     */
+    public boolean preApplyResources() {
+        Map<TaskGroupLocation, CompletableFuture<SlotProfile>> preApplyResourceFutures =
+                new HashMap<>();
+        for (SubPlan subPlan : physicalPlan.getPipelineList()) {
+            Map<TaskGroupLocation, CompletableFuture<SlotProfile>> coordinatorFutures =
+                    new HashMap<>();
+            subPlan.getCoordinatorVertexList()
+                    .forEach(
+                            coordinator ->
+                                    coordinatorFutures.put(
+                                            coordinator.getTaskGroupLocation(),
+                                            ResourceUtils.applyResourceForTask(
+                                                    resourceManager,
+                                                    coordinator,
+                                                    subPlan.getTags())));
+
+            Map<TaskGroupLocation, CompletableFuture<SlotProfile>> taskFutures = new HashMap<>();
+            subPlan.getPhysicalVertexList()
+                    .forEach(
+                            task ->
+                                    taskFutures.put(
+                                            task.getTaskGroupLocation(),
+                                            ResourceUtils.applyResourceForTask(
+                                                    resourceManager, task, subPlan.getTags())));
+
+            preApplyResourceFutures.putAll(coordinatorFutures);
+            preApplyResourceFutures.putAll(taskFutures);
+        }
+
+        boolean enoughResource =
+                preApplyResourceFutures.values().stream()
+                                .filter(
+                                        value -> {
+                                            try {
+                                                return value != null && value.join() != null;
+                                            } catch (CompletionException e) {

Review Comment:
   Please add a log message when the exception is caught.
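Here is a sketch of the same all-or-nothing check, with failures logged instead of silently swallowed. `SlotAllocationCheck.allSlotsAllocated` is a hypothetical name, and `java.util.logging` stands in for the logger that `JobMaster` actually uses. Like the original `filter(...).count()`, the sketch still waits on every future rather than stopping at the first failure.

```java
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.logging.Level;
import java.util.logging.Logger;

final class SlotAllocationCheck {
    private static final Logger LOG = Logger.getLogger(SlotAllocationCheck.class.getName());

    private SlotAllocationCheck() {}

    /**
     * Waits on every slot request and returns true only if all of them produced a slot.
     * Failed or cancelled requests are logged with their key instead of being ignored.
     */
    static <K, V> boolean allSlotsAllocated(Map<K, CompletableFuture<V>> slotFutures) {
        long allocated =
                slotFutures.entrySet().stream()
                        .filter(
                                entry -> {
                                    CompletableFuture<V> future = entry.getValue();
                                    try {
                                        return future != null && future.join() != null;
                                    } catch (CompletionException | CancellationException e) {
                                        LOG.log(
                                                Level.WARNING,
                                                "Failed to apply resource for " + entry.getKey(),
                                                e);
                                        return false;
                                    }
                                })
                        .count();
        return allocated == slotFutures.size();
    }
}
```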



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -195,6 +217,89 @@ public CoordinatorService(
         masterActiveListener = Executors.newSingleThreadScheduledExecutor();
         masterActiveListener.scheduleAtFixedRate(
                 this::checkNewActiveMaster, 0, 100, TimeUnit.MILLISECONDS);
+        isJobPending =
+                engineConfig.getScheduleStrategy().equals(ScheduleStrategy.WAIT) && !dynamicSlot;
+        if (isJobPending) {
+            logger.info("Start pending job schedule thread");
+            // start pending job schedule thread
+            startPendingJobScheduleThread();
+        }
+    }
+
+    private void startPendingJobScheduleThread() {
+        Runnable pendingJobScheduleTask =
+                () -> {
+                    Thread.currentThread().setName("pending-job-schedule-runner");
+                    while (true) {
+                        if (pendingJob.isEmpty()) {
+                            try {
+                                Thread.sleep(3000);
+                            } catch (InterruptedException e) {
+                                logger.severe(ExceptionUtils.getMessage(e));
+                            }
+                        } else {
+                            pendingJobSchedule();
+                        }
+                    }
+                };
+        executorService.submit(pendingJobScheduleTask);
+    }
+
+    private void pendingJobSchedule() {
+        pendingJob.stream()
+                .findFirst()
+                .ifPresent(
+                        pendingJobEntry -> {
+                            Long jobId = pendingJobEntry._1;
+                            JobMaster jobMaster = pendingJobEntry._2;
+
+                            logger.info(
+                                    "Start calculating whether pending task resources are enough: "
+                                            + jobId);
+
+                            if (jobMaster.preApplyResources()) {
+                                logger.info("Resources enough, start running: " + jobId);
+                                pendingJob.poll();

Review Comment:
   We need to add a check to ensure that `pendingJob.stream().findFirst()` and `pendingJob.poll()` return the same `JobMaster`.
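One possible shape for that check is a hypothetical `pollIfHead` helper. It dequeues only when the current head is the very entry that was examined, compared by reference, and otherwise leaves the queue untouched. The helper assumes that the pending-job schedule thread is the only consumer of `pendingJob`, as the code shown here suggests. Under that assumption, a mismatch after `peek()` indicates a bug rather than a normal race.

```java
import java.util.Queue;

final class PendingQueueOps {
    private PendingQueueOps() {}

    /**
     * Removes the head of the queue only if it is the same object that was examined.
     * Returns false, leaving the queue unchanged, when the head is a different entry.
     */
    static <T> boolean pollIfHead(Queue<T> queue, T examined) {
        if (examined == null || queue.peek() != examined) {
            return false;
        }
        T polled = queue.poll();
        if (polled != examined) {
            // Only possible if another consumer removed the head between peek() and poll().
            throw new IllegalStateException("Pending queue head changed between peek() and poll()");
        }
        return true;
    }
}
```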



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -195,6 +217,89 @@ public CoordinatorService(
         masterActiveListener = Executors.newSingleThreadScheduledExecutor();
         masterActiveListener.scheduleAtFixedRate(
                 this::checkNewActiveMaster, 0, 100, TimeUnit.MILLISECONDS);
+        isJobPending =
+                engineConfig.getScheduleStrategy().equals(ScheduleStrategy.WAIT) && !dynamicSlot;
+        if (isJobPending) {
+            logger.info("Start pending job schedule thread");
+            // start pending job schedule thread
+            startPendingJobScheduleThread();
+        }
+    }
+
+    private void startPendingJobScheduleThread() {
+        Runnable pendingJobScheduleTask =
+                () -> {
+                    Thread.currentThread().setName("pending-job-schedule-runner");
+                    while (true) {
+                        if (pendingJob.isEmpty()) {
+                            try {
+                                Thread.sleep(3000);
+                            } catch (InterruptedException e) {
+                                logger.severe(ExceptionUtils.getMessage(e));
+                            }
+                        } else {
+                            pendingJobSchedule();
+                        }
+                    }
+                };
+        executorService.submit(pendingJobScheduleTask);
+    }
+
+    private void pendingJobSchedule() {
+        pendingJob.stream()
+                .findFirst()
+                .ifPresent(
+                        pendingJobEntry -> {
+                            Long jobId = pendingJobEntry._1;
+                            JobMaster jobMaster = pendingJobEntry._2;
+
+                            logger.info(
+                                    "Start calculating whether pending task resources are enough: "
+                                            + jobId);
+
+                            if (jobMaster.preApplyResources()) {
+                                logger.info("Resources enough, start running: " + jobId);
+                                pendingJob.poll();
+                                PendingSourceState pendingSourceState =
+                                        pendingJobMasterMap.get(jobId)._1;
+
+                                CompletableFuture.runAsync(
+                                        () -> {
+                                            try {
+                                                if (pendingSourceState
+                                                        == PendingSourceState.RESTORE) {
+                                                    jobMaster
+                                                            .getPhysicalPlan()
+                                                            .getPipelineList()
+                                                            .forEach(SubPlan::restorePipelineState);
+                                                }
+                                                pendingJobMasterMap.remove(jobId);
+                                                runningJobMasterMap.put(jobId, jobMaster);
+                                                jobMaster.run();
+                                            } finally {
+                                                if (jobMasterCompletedSuccessfully(
+                                                        jobMaster, pendingSourceState)) {
+                                                    runningJobMasterMap.remove(jobId);
+                                                }
+                                            }
+                                        },
+                                        executorService);
+                            } else {
+                                logger.info("Resources not enough, waiting next time");
+                                try {
+                                    Thread.sleep(3000);
+                                } catch (InterruptedException e) {
+                                    logger.severe(ExceptionUtils.getMessage(e));
+                                }
+                            }
+                        });
+    }
+
+    private boolean jobMasterCompletedSuccessfully(JobMaster jobMaster, PendingSourceState state) {
+        return (!jobMaster.getJobMasterCompleteFuture().isCompletedExceptionally()
+                        && state == PendingSourceState.RESTORE)
+                || (!jobMaster.getJobMasterCompleteFuture().isCancelled()
+                        && state == PendingSourceState.SUBMIT);

Review Comment:
   Why did you change
   ```java
   !jobMaster.getJobMasterCompleteFuture().isCompletedExceptionally()
   ```
   to this?
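For context on this question, the two predicates behave differently in `java.util.concurrent.CompletableFuture`. A cancelled future also reports `isCompletedExceptionally() == true`, while a future completed with an ordinary exception is not cancelled. The snippet below uses only the standard JDK API to show both cases. The class name is illustrative.

```java
import java.util.concurrent.CompletableFuture;

final class FutureStateDemo {
    private FutureStateDemo() {}

    // Returns {isCompletedExceptionally, isCancelled} for a future cancelled via cancel(true).
    static boolean[] cancelledFlags() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.cancel(true);
        return new boolean[] {future.isCompletedExceptionally(), future.isCancelled()};
    }

    // Returns {isCompletedExceptionally, isCancelled} for a future failed via completeExceptionally.
    static boolean[] failedFlags() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.completeExceptionally(new IllegalStateException("job failed"));
        return new boolean[] {future.isCompletedExceptionally(), future.isCancelled()};
    }
}
```

The job master future may fail without being cancelled. In that case, the new `SUBMIT` condition (`!isCancelled()`) would remove the job from `runningJobMasterMap`, whereas the original `!isCompletedExceptionally()` check would keep it there.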



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -369,30 +477,59 @@ private void restoreJobFromMasterActiveSwitch(@NonNull Long jobId, @NonNull JobI
         }
 
         String jobFullName = jobMaster.getPhysicalPlan().getJobFullName();
-        runningJobMasterMap.put(jobId, jobMaster);
-
+        if (isJobPending) {
+            pendingJobMasterMap.put(jobId, new Tuple2<>(PendingSourceState.RESTORE, jobMaster));
+        } else {
+            runningJobMasterMap.put(jobId, jobMaster);
+        }
         logger.info(
                 String.format(
                         "The restore %s is in %s state, restore pipeline and take over this job running",
                         jobFullName, jobStatus));
-        CompletableFuture.runAsync(
-                () -> {
-                    try {
-                        jobMaster
-                                .getPhysicalPlan()
-                                .getPipelineList()
-                                .forEach(SubPlan::restorePipelineState);
-                        jobMaster.run();
-                    } finally {
-                        // voidCompletableFuture will be cancelled when zeta master node
-                        // shutdown to simulate master failure,
-                        // don't update runningJobMasterMap is this case.
-                        if (!jobMaster.getJobMasterCompleteFuture().isCompletedExceptionally()) {
-                            runningJobMasterMap.remove(jobId);
+        // FIFO strategy,If there is a waiting task, the subsequent task will keep waiting
+        boolean canRunJob, preApplyResources;
+        if (isJobPending) {
+            preApplyResources = jobMaster.preApplyResources();

Review Comment:
   Can we move resource application for every `ScheduleStrategy` into the same place? Right now, different strategies apply for resources in different places: when `isJobPending` is true, we apply for resources here, but when `isJobPending` is false, we apply for them in https://github.com/apache/seatunnel/pull/7693/files#diff-873557476f41c5c8e00cf79e501304f0c0709083d1d77662cacb11c9c08c201aR107
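Here is a sketch of a single entry point. It rests on several assumptions:

- A hypothetical `ScheduleDecider.decide` method would be called by both the submit and the restore paths.
- `tryPreApply` stands in for `jobMaster.preApplyResources()`.
- A nested enum replaces the real `ScheduleStrategy`.
- The strategy has already been normalized, so `WAIT` never coexists with dynamic slots.

Resources are applied in exactly one place, and each caller only acts on the returned decision.

```java
import java.util.function.BooleanSupplier;

final class ScheduleDecider {
    // Simplified stand-in for the engine's ScheduleStrategy (assumed already normalized).
    enum Strategy {
        REJECT,
        WAIT
    }

    // What the caller should do with the job after the decision.
    enum Decision {
        RUN,
        PENDING,
        FAIL
    }

    private ScheduleDecider() {}

    /**
     * Hypothetical single place where resources are applied for every strategy.
     * With WAIT, a job never overtakes jobs already in the pending queue (FIFO), and
     * tryPreApply is not called in that case.
     */
    static Decision decide(Strategy strategy, boolean pendingQueueEmpty, BooleanSupplier tryPreApply) {
        if (strategy == Strategy.WAIT && !pendingQueueEmpty) {
            return Decision.PENDING;
        }
        if (tryPreApply.getAsBoolean()) {
            return Decision.RUN;
        }
        return strategy == Strategy.WAIT ? Decision.PENDING : Decision.FAIL;
    }
}
```

With `WAIT`, a new job is queued behind any job already waiting, which preserves the FIFO behavior described in this PR. With `REJECT`, insufficient resources map to `FAIL`, matching the case `completeFailJob` is meant to handle.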



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
