caishunfeng commented on code in PR #10667:
URL: https://github.com/apache/dolphinscheduler/pull/10667#discussion_r909999242


##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java:
##########
@@ -164,38 +172,53 @@ private void scheduleWorkflow() throws 
InterruptedException {
 
         List<ProcessInstance> processInstances = 
command2ProcessInstance(commands);
         if (CollectionUtils.isEmpty(processInstances)) {
+            // indicate that the command transform to processInstance error, 
sleep for 1s
+            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
             return;
         }
         MasterServerMetrics.incMasterConsumeCommand(commands.size());
 
         for (ProcessInstance processInstance : processInstances) {
-            try {
-                LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
-                logger.info("Master schedule service starting workflow 
instance");
-                final WorkflowExecuteRunnable workflowExecuteRunnable = new 
WorkflowExecuteRunnable(
-                    processInstance
-                    , processService
-                    , nettyExecutorManager
-                    , processAlertManager
-                    , masterConfig
-                    , stateWheelExecuteThread
-                    , curingGlobalParamsService);
-
-                
this.processInstanceExecCacheManager.cache(processInstance.getId(), 
workflowExecuteRunnable);
-                if (processInstance.getTimeout() > 0) {
-                    
stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
-                }
-                ProcessInstanceMetrics.incProcessInstanceSubmit();
-                workflowExecuteThreadPool.submit(workflowExecuteRunnable);
-                logger.info("Master schedule service started workflow 
instance");
-
-            } catch (Exception ex) {
-                
processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
-                
stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
-                logger.info("Master submit workflow to thread pool failed, 
will remove workflow runnable from cache manager", ex);
-            } finally {
-                LoggerUtils.removeWorkflowInstanceIdMDC();
+            submitProcessInstance(processInstance);
+        }
+    }
+
+    private void submitProcessInstance(@NonNull ProcessInstance 
processInstance) {
+        try {
+            LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
+            logger.info("Master schedule service starting workflow instance");
+            final WorkflowExecuteRunnable workflowExecuteRunnable = new 
WorkflowExecuteRunnable(
+                processInstance
+                , processService
+                , nettyExecutorManager
+                , processAlertManager
+                , masterConfig
+                , stateWheelExecuteThread
+                , curingGlobalParamsService);
+
+            
this.processInstanceExecCacheManager.cache(processInstance.getId(), 
workflowExecuteRunnable);
+            if (processInstance.getTimeout() > 0) {
+                
stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
             }
+            ProcessInstanceMetrics.incProcessInstanceSubmit();
+            CompletableFuture<WorkflowSubmitStatue> workflowSubmitFuture = 
CompletableFuture.supplyAsync(
+                workflowExecuteRunnable::call, workflowExecuteThreadPool);
+            workflowSubmitFuture.thenAccept(workflowSubmitStatue -> {
+                if (WorkflowSubmitStatue.FAILED == workflowSubmitStatue) {
+                    // submit failed
+                    
processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
+                    
stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
+                    submitFailedProcessInstances.add(processInstance);
+                }
+            });
+            logger.info("Master schedule service started workflow instance");

Review Comment:
   ```suggestion
               logger.info("Master schedule service started workflow instance, 
id:{}, name:{} ", processInstance.getId(), processInstance.getName());
   ```



##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java:
##########
@@ -164,38 +172,53 @@ private void scheduleWorkflow() throws 
InterruptedException {
 
         List<ProcessInstance> processInstances = 
command2ProcessInstance(commands);
         if (CollectionUtils.isEmpty(processInstances)) {
+            // indicate that the command transform to processInstance error, 
sleep for 1s
+            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
             return;
         }
         MasterServerMetrics.incMasterConsumeCommand(commands.size());
 
         for (ProcessInstance processInstance : processInstances) {
-            try {
-                LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
-                logger.info("Master schedule service starting workflow 
instance");
-                final WorkflowExecuteRunnable workflowExecuteRunnable = new 
WorkflowExecuteRunnable(
-                    processInstance
-                    , processService
-                    , nettyExecutorManager
-                    , processAlertManager
-                    , masterConfig
-                    , stateWheelExecuteThread
-                    , curingGlobalParamsService);
-
-                
this.processInstanceExecCacheManager.cache(processInstance.getId(), 
workflowExecuteRunnable);
-                if (processInstance.getTimeout() > 0) {
-                    
stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
-                }
-                ProcessInstanceMetrics.incProcessInstanceSubmit();
-                workflowExecuteThreadPool.submit(workflowExecuteRunnable);
-                logger.info("Master schedule service started workflow 
instance");
-
-            } catch (Exception ex) {
-                
processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
-                
stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
-                logger.info("Master submit workflow to thread pool failed, 
will remove workflow runnable from cache manager", ex);
-            } finally {
-                LoggerUtils.removeWorkflowInstanceIdMDC();
+            submitProcessInstance(processInstance);
+        }
+    }
+
+    private void submitProcessInstance(@NonNull ProcessInstance 
processInstance) {
+        try {
+            LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
+            logger.info("Master schedule service starting workflow instance");

Review Comment:
   add processInstance id or name



##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java:
##########
@@ -164,38 +172,53 @@ private void scheduleWorkflow() throws 
InterruptedException {
 
         List<ProcessInstance> processInstances = 
command2ProcessInstance(commands);
         if (CollectionUtils.isEmpty(processInstances)) {
+            // indicate that the command transform to processInstance error, 
sleep for 1s
+            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
             return;
         }
         MasterServerMetrics.incMasterConsumeCommand(commands.size());
 
         for (ProcessInstance processInstance : processInstances) {
-            try {
-                LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
-                logger.info("Master schedule service starting workflow 
instance");
-                final WorkflowExecuteRunnable workflowExecuteRunnable = new 
WorkflowExecuteRunnable(
-                    processInstance
-                    , processService
-                    , nettyExecutorManager
-                    , processAlertManager
-                    , masterConfig
-                    , stateWheelExecuteThread
-                    , curingGlobalParamsService);
-
-                
this.processInstanceExecCacheManager.cache(processInstance.getId(), 
workflowExecuteRunnable);
-                if (processInstance.getTimeout() > 0) {
-                    
stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
-                }
-                ProcessInstanceMetrics.incProcessInstanceSubmit();
-                workflowExecuteThreadPool.submit(workflowExecuteRunnable);
-                logger.info("Master schedule service started workflow 
instance");
-
-            } catch (Exception ex) {
-                
processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
-                
stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
-                logger.info("Master submit workflow to thread pool failed, 
will remove workflow runnable from cache manager", ex);
-            } finally {
-                LoggerUtils.removeWorkflowInstanceIdMDC();
+            submitProcessInstance(processInstance);
+        }
+    }
+
+    private void submitProcessInstance(@NonNull ProcessInstance 
processInstance) {
+        try {
+            LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
+            logger.info("Master schedule service starting workflow instance");
+            final WorkflowExecuteRunnable workflowExecuteRunnable = new 
WorkflowExecuteRunnable(
+                processInstance
+                , processService
+                , nettyExecutorManager
+                , processAlertManager
+                , masterConfig
+                , stateWheelExecuteThread
+                , curingGlobalParamsService);
+
+            
this.processInstanceExecCacheManager.cache(processInstance.getId(), 
workflowExecuteRunnable);
+            if (processInstance.getTimeout() > 0) {
+                
stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
             }
+            ProcessInstanceMetrics.incProcessInstanceSubmit();
+            CompletableFuture<WorkflowSubmitStatue> workflowSubmitFuture = 
CompletableFuture.supplyAsync(
+                workflowExecuteRunnable::call, workflowExecuteThreadPool);
+            workflowSubmitFuture.thenAccept(workflowSubmitStatue -> {
+                if (WorkflowSubmitStatue.FAILED == workflowSubmitStatue) {
+                    // submit failed
+                    
processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
+                    
stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
+                    submitFailedProcessInstances.add(processInstance);
+                }
+            });
+            logger.info("Master schedule service started workflow instance");
+
+        } catch (Exception ex) {
+            
processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
+            
stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
+            logger.info("Master submit workflow to thread pool failed, will 
remove workflow runnable from cache manager", ex);

Review Comment:
   add processInstance id or name



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to