Hisoka-X commented on code in PR #7693:
URL: https://github.com/apache/seatunnel/pull/7693#discussion_r1798940373


##########
seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java:
##########
@@ -434,7 +434,7 @@ public void testGetJobInfo() {
             final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
             CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
             long jobId = clientJobProxy.getJobId();
-
+            Thread.sleep(1000);

Review Comment:
   Please use Awaitility instead of a fixed sleep:
   ```
   await().atMost(10, TimeUnit.SECONDS)
           .untilAsserted(() -> Assertions.assertNotNull(jobClient.getJobInfo(jobId)));
   ```
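Awaitility's `untilAsserted` re-runs the lambda until it stops throwing or the timeout elapses. For readers unfamiliar with it, here is a minimal plain-Java sketch of the same idea. `PollUntil` and its parameters are hypothetical and are not part of SeaTunnel or Awaitility; the poll interval is chosen by the caller here.

```java
import java.time.Duration;

// Hypothetical helper (not part of SeaTunnel or Awaitility) that mimics the idea behind
// await().atMost(...).untilAsserted(...): re-run an assertion until it passes or time runs out.
final class PollUntil {

    @FunctionalInterface
    interface ThrowingAssertion {
        void run() throws Exception;
    }

    private PollUntil() {}

    static void untilAsserted(Duration atMost, Duration pollInterval, ThrowingAssertion assertion) {
        long deadline = System.nanoTime() + atMost.toNanos();
        while (true) {
            try {
                assertion.run();
                return; // condition met: return as soon as it holds, no fixed delay
            } catch (AssertionError | Exception e) {
                if (System.nanoTime() - deadline >= 0) {
                    // Timed out: fail with the last observed error as the cause.
                    throw new AssertionError("Condition not met within " + atMost, e);
                }
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to callers
                throw new AssertionError("Interrupted while waiting", ie);
            }
        }
    }
}
```

Polling returns as soon as the job info becomes visible and tolerates slow CI machines up to the timeout. A fixed `Thread.sleep(1000)` always waits the full second and can still be too short.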



##########
seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobStatusRunner.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.client.job;
+
+import org.apache.seatunnel.engine.core.job.JobStatus;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+@Slf4j
+public class JobStatusRunner implements Runnable {
+    private final JobClient jobClient;
+    private final Long jobId;
+    private final AtomicReference<String> atomicReference;
+
+    public JobStatusRunner(
+            JobClient jobClient, Long jobId, AtomicReference<String> atomicReference) {
+        this.jobClient = jobClient;
+        this.jobId = jobId;
+        this.atomicReference = atomicReference;
+    }
+
+    @Override
+    public void run() {
+        Thread.currentThread().setName("job-status-runner-" + jobId);
+        try {
+            String jobStatus = jobClient.getJobStatus(jobId);
+            String lastJobStatus = atomicReference.get();
+            if (lastJobStatus == null
+                    || lastJobStatus.equals(JobStatus.PENDING.toString())
+                    || !lastJobStatus.equals(jobStatus)) {
+                atomicReference.set(jobStatus);
+                log.info("Job status: {}", jobStatus);
+            }

Review Comment:
   On second thought, could we log only while the status is `PENDING` and once when it
   switches from `PENDING` to another status? I think this will make the logs simpler:
   ```
   Current job status: PENDING, waiting for resource application.
   Current job status: PENDING, waiting for resource application.
   Current job status: PENDING, waiting for resource application.
   
   Current job status: RUNNING, resource application succeeded.
   ```
   
   If we never observe the `PENDING` status, we never print a log.
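A minimal sketch of the suggested logging rule, assuming statuses arrive as strings as they do in `JobStatusRunner`. The class name `PendingStatusLogger` is hypothetical, and the collected message list stands in for `log.info(...)` so the behaviour is easy to check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: log on every poll while the job is PENDING, log exactly once when it
// leaves PENDING, and stay silent otherwise. Assumes onStatus() is called from one polling thread.
final class PendingStatusLogger {
    private static final String PENDING = "PENDING";

    private final AtomicReference<String> lastStatus = new AtomicReference<>();
    // Stand-in for log.info(...): collected so the output can be inspected.
    private final List<String> messages = new ArrayList<>();

    void onStatus(String status) {
        String previous = lastStatus.getAndSet(status);
        if (PENDING.equals(status)) {
            messages.add("Current job status: PENDING, waiting for resource application.");
        } else if (PENDING.equals(previous)) {
            messages.add("Current job status: " + status + ", resource application succeeded.");
        }
        // If PENDING was never observed, nothing is logged.
    }

    List<String> messages() {
        return messages;
    }
}
```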
   



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/PeekBlockingQueue.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.utils;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class PeekBlockingQueue<E> {

Review Comment:
   Could you share the design and explain why we need this queue? How does it differ from
   the existing Java queues?
   
   Also, we need test cases for this queue; this is important.
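One possible answer to "why not a JDK queue": `BlockingQueue.peek()` returns `null` immediately when the queue is empty, and `take()` removes the head, so neither waits for the head while leaving it in place. The sketch below illustrates that behaviour. It is hypothetical and is not the PR's implementation. Only the method names `peekBlocking`, `take` and `size` mirror calls visible in `CoordinatorService`, and the PR's `release()` is omitted because its semantics are not shown.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: a FIFO queue whose peekBlocking() waits until an element is available
// and returns it WITHOUT removing it. Null elements are rejected (ArrayDeque throws NPE).
final class PeekBlockingQueueSketch<E> {
    private final Deque<E> items = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();

    void put(E e) {
        lock.lock();
        try {
            items.addLast(e);
            notEmpty.signalAll(); // wake threads blocked in peekBlocking() or take()
        } finally {
            lock.unlock();
        }
    }

    E peekBlocking() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.await(); // loop guards against spurious wake-ups
            }
            return items.peekFirst(); // head stays in the queue
        } finally {
            lock.unlock();
        }
    }

    E take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.await();
            }
            return items.pollFirst(); // head is removed
        } finally {
            lock.unlock();
        }
    }

    int size() {
        lock.lock();
        try {
            return items.size();
        } finally {
            lock.unlock();
        }
    }
}
```

A test along the lines of the one below (a peeker blocked on an empty queue is released by `put`, and the element is still present afterwards) is the kind of case the review asks for.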



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -195,6 +215,129 @@ public CoordinatorService(
         masterActiveListener = Executors.newSingleThreadScheduledExecutor();
         masterActiveListener.scheduleAtFixedRate(
                 this::checkNewActiveMaster, 0, 100, TimeUnit.MILLISECONDS);
+        scheduleStrategy = engineConfig.getScheduleStrategy();
+        isWaitStrategy = scheduleStrategy.equals(ScheduleStrategy.WAIT);
+        logger.info("Start pending job schedule thread");
+        // start pending job schedule thread
+        startPendingJobScheduleThread();
+    }
+
+    private void startPendingJobScheduleThread() {
+        Runnable pendingJobScheduleTask =
+                () -> {
+                    Thread.currentThread().setName("pending-job-schedule-runner");
+                    while (true) {
+                        try {
+                            pendingJobSchedule();
+                        } catch (InterruptedException e) {
+                            throw new RuntimeException(e);
+                        } finally {
+                            pendingJob.release();
+                        }
+                    }
+                };
+        executorService.submit(pendingJobScheduleTask);
+    }
+
+    private void pendingJobSchedule() throws InterruptedException {
+        JobMaster jobMaster = pendingJob.peekBlocking();
+        if (Objects.isNull(jobMaster)) {
+            // This situation almost never happens because pendingJobSchedule is single-threaded
+            logger.warning("The peek job master is null");
+            return;
+        }
+        logger.info(
+                String.format(
+                        "Start pending job schedule, pendingJob Size : %s", pendingJob.size()));
+
+        Long jobId = jobMaster.getJobId();
+
+        logger.info(
+                String.format(
+                        "Start calculating whether pending task resources are enough: %s", jobId));

Review Comment:
   Please change these two logs to debug level.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -195,6 +215,129 @@ public CoordinatorService(
         masterActiveListener = Executors.newSingleThreadScheduledExecutor();
         masterActiveListener.scheduleAtFixedRate(
                 this::checkNewActiveMaster, 0, 100, TimeUnit.MILLISECONDS);
+        scheduleStrategy = engineConfig.getScheduleStrategy();
+        isWaitStrategy = scheduleStrategy.equals(ScheduleStrategy.WAIT);
+        logger.info("Start pending job schedule thread");
+        // start pending job schedule thread
+        startPendingJobScheduleThread();
+    }
+
+    private void startPendingJobScheduleThread() {
+        Runnable pendingJobScheduleTask =
+                () -> {
+                    Thread.currentThread().setName("pending-job-schedule-runner");
+                    while (true) {
+                        try {
+                            pendingJobSchedule();
+                        } catch (InterruptedException e) {
+                            throw new RuntimeException(e);
+                        } finally {
+                            pendingJob.release();
+                        }
+                    }
+                };
+        executorService.submit(pendingJobScheduleTask);
+    }
+
+    private void pendingJobSchedule() throws InterruptedException {
+        JobMaster jobMaster = pendingJob.peekBlocking();
+        if (Objects.isNull(jobMaster)) {
+            // This situation almost never happens because pendingJobSchedule is single-threaded

Review Comment:
   If it does happen, we should add a sleep to avoid overloading the CPU with a busy loop.
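Because `pendingJobSchedule()` runs inside `while (true)`, returning early on a `null` head re-enters it immediately, so a persistent `null` would spin a core. Below is a minimal sketch of a loop that backs off instead. It is hypothetical: `BackoffScheduleLoop`, `scheduleOne` and `backoffMillis` are illustrative names, and it uses a non-blocking `peek()`, so an empty queue plays the role of the unexpected `null`. It also shows one alternative to rethrowing `InterruptedException` as `RuntimeException`: restore the flag and leave the loop.

```java
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical sketch: a single-consumer scheduling loop with back-off on a null head.
final class BackoffScheduleLoop<T> implements Runnable {
    private final Queue<T> queue; // must be thread-safe if producers run on other threads
    private final Consumer<T> scheduleOne;
    private final long backoffMillis;

    BackoffScheduleLoop(Queue<T> queue, Consumer<T> scheduleOne, long backoffMillis) {
        this.queue = queue;
        this.scheduleOne = scheduleOne;
        this.backoffMillis = backoffMillis;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            T head = queue.peek();
            if (head == null) {
                try {
                    // Without this pause the loop would peek again at once and spin a core.
                    Thread.sleep(backoffMillis);
                } catch (InterruptedException e) {
                    // Restore the flag and exit instead of wrapping in RuntimeException.
                    Thread.currentThread().interrupt();
                    return;
                }
                continue;
            }
            scheduleOne.accept(head);
            queue.poll(); // single consumer: the head is still the element just scheduled
        }
    }
}
```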



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/PendingSourceState.java:
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.execution;
+
+public enum PendingSourceState {

Review Comment:
   Could you add a comment to `PendingSourceState`? What is it used for?
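Below is a sketch of the kind of documentation that could answer this. Only `RESTORE` is visible in the excerpt: `CoordinatorService` calls `SubPlan::restorePipelineState` before `jobMaster.run()` when the state is `RESTORE`. The `SUBMIT` constant, the failover wording and the helper method are assumptions for illustration, and the enum is renamed `PendingSourceStateSketch` to mark it as hypothetical.

```java
/**
 * Hypothetical documentation sketch: records why a JobMaster entered the pending queue,
 * so the scheduler knows what to do once resources become available.
 */
enum PendingSourceStateSketch {
    /** Assumed: a newly submitted job waiting in the pending queue for resources. */
    SUBMIT,
    /**
     * A job being taken over (presumably after a master switch); its pipeline state must be
     * restored via SubPlan#restorePipelineState before JobMaster#run is called.
     */
    RESTORE;

    /** Whether the scheduler must restore pipeline state before running the job. */
    boolean requiresPipelineRestore() {
        return this == RESTORE;
    }
}
```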



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/ResourceUtils.java:
##########
@@ -32,47 +36,73 @@
 
 public class ResourceUtils {
 
+    private static final ILogger LOGGER = Logger.getLogger(ResourceUtils.class);
+
     public static void applyResourceForPipeline(
-            @NonNull ResourceManager resourceManager, @NonNull SubPlan subPlan) {
+            @NonNull JobMaster jobMaster, @NonNull SubPlan subPlan) {
+
         Map<TaskGroupLocation, CompletableFuture<SlotProfile>> futures = new HashMap<>();
         Map<TaskGroupLocation, SlotProfile> slotProfiles = new HashMap<>();
-        // TODO If there is no enough resources for tasks, we need add some wait profile
-        subPlan.getCoordinatorVertexList()
-                .forEach(
-                        coordinator ->
-                                futures.put(
-                                        coordinator.getTaskGroupLocation(),
-                                        applyResourceForTask(
-                                                resourceManager, coordinator, subPlan.getTags())));
+        Map<TaskGroupLocation, CompletableFuture<SlotProfile>> preApplyResourceFutures =
+                jobMaster.getPhysicalPlan().getPreApplyResourceFutures();
 
-        subPlan.getPhysicalVertexList()
-                .forEach(
-                        task ->
-                                futures.put(
-                                        task.getTaskGroupLocation(),
-                                        applyResourceForTask(
-                                                resourceManager, task, subPlan.getTags())));
+        // TODO If there is no enough resources for tasks, we need add some wait profile
+        applyResources(subPlan, futures, preApplyResourceFutures);

Review Comment:
   ```suggestion
           allocateResources(subPlan, futures, preApplyResourceFutures);
   ```
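The excerpt does not show the body of the renamed helper. Below is a hypothetical sketch of what `allocateResources(subPlan, futures, preApplyResourceFutures)` might do: for each task group location, reuse the future reserved during pre-application, and otherwise request a new slot as the removed `forEach` loops did. String keys and values stand in for `TaskGroupLocation` and `SlotProfile`. The `applyFresh` function is an assumed stand-in for `applyResourceForTask`.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical sketch: fill `futures` for every location, preferring pre-applied futures.
final class AllocateResourcesSketch {
    private AllocateResourcesSketch() {}

    static <K, V> void allocateResources(
            List<K> locations,
            Map<K, CompletableFuture<V>> futures,
            Map<K, CompletableFuture<V>> preApplied,
            Function<K, CompletableFuture<V>> applyFresh) {
        for (K location : locations) {
            // Reuse the slot reserved during pre-application if there is one.
            CompletableFuture<V> reserved = preApplied.get(location);
            futures.put(location, reserved != null ? reserved : applyFresh.apply(location));
        }
    }
}
```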



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -195,6 +215,129 @@ public CoordinatorService(
         masterActiveListener = Executors.newSingleThreadScheduledExecutor();
         masterActiveListener.scheduleAtFixedRate(
                 this::checkNewActiveMaster, 0, 100, TimeUnit.MILLISECONDS);
+        scheduleStrategy = engineConfig.getScheduleStrategy();
+        isWaitStrategy = scheduleStrategy.equals(ScheduleStrategy.WAIT);
+        logger.info("Start pending job schedule thread");
+        // start pending job schedule thread
+        startPendingJobScheduleThread();
+    }
+
+    private void startPendingJobScheduleThread() {
+        Runnable pendingJobScheduleTask =
+                () -> {
+                    Thread.currentThread().setName("pending-job-schedule-runner");
+                    while (true) {
+                        try {
+                            pendingJobSchedule();
+                        } catch (InterruptedException e) {
+                            throw new RuntimeException(e);
+                        } finally {
+                            pendingJob.release();
+                        }
+                    }
+                };
+        executorService.submit(pendingJobScheduleTask);
+    }
+
+    private void pendingJobSchedule() throws InterruptedException {
+        JobMaster jobMaster = pendingJob.peekBlocking();
+        if (Objects.isNull(jobMaster)) {
+            // This situation almost never happens because pendingJobSchedule is single-threaded
+            logger.warning("The peek job master is null");
+            return;
+        }
+        logger.info(
+                String.format(
+                        "Start pending job schedule, pendingJob Size : %s", pendingJob.size()));
+
+        Long jobId = jobMaster.getJobId();
+
+        logger.info(
+                String.format(
+                        "Start calculating whether pending task resources are enough: %s", jobId));
+
+        boolean preApplyResources = jobMaster.preApplyResources();
+        if (!preApplyResources) {
+            logger.info(
+                    String.format(
+                            "Current strategy is %s, and resources is not enough, skipping this schedule, JobID: %s",
+                            scheduleStrategy, jobId));
+            if (isWaitStrategy) {
+                try {
+                    Thread.sleep(3000);
+                } catch (InterruptedException e) {
+                    logger.severe(ExceptionUtils.getMessage(e));
+                }
+                return;
+            } else {
+                queueRemove(jobMaster);
+                completeFailJob(jobMaster);
+                return;
+            }
+        }
+
+        logger.info(String.format("Resources enough, start running: %s", jobId));
+
+        queueRemove(jobMaster);
+
+        PendingSourceState pendingSourceState = pendingJobMasterMap.get(jobId)._1;
+
+        MDCExecutorService mdcExecutorService = MDCTracer.tracing(jobId, executorService);
+        mdcExecutorService.submit(
+                () -> {
+                    try {
+                        String jobFullName = jobMaster.getPhysicalPlan().getJobFullName();
+                        JobStatus jobStatus = (JobStatus) runningJobStateIMap.get(jobId);
+                        if (pendingSourceState == PendingSourceState.RESTORE) {
+                            jobMaster
+                                    .getPhysicalPlan()
+                                    .getPipelineList()
+                                    .forEach(SubPlan::restorePipelineState);
+                        }
+                        logger.info(
+                                String.format(
+                                        "The %s %s is in %s state, restore pipeline and take over this job running",
+                                        pendingSourceState, jobFullName, 
jobStatus));
+
+                        pendingJobMasterMap.remove(jobId);
+                        runningJobMasterMap.put(jobId, jobMaster);
+                        jobMaster.run();
+                    } finally {
+                        if (jobMasterCompletedSuccessfully(jobMaster, pendingSourceState)) {
+                            runningJobMasterMap.remove(jobId);
+                        }
+                    }
+                });
+    }
+
+    private void queueRemove(JobMaster jobMaster) throws InterruptedException {
+        JobMaster take = pendingJob.take();
+        if (take != jobMaster) {
+            logger.warning("The job master is not equal to the peek job master");

Review Comment:
   ```suggestion
               logger.severe("The job master is not equal to the peek job master");
   ```



-- 
This is an automated message from the Apache Git Service.