ruanwenjun commented on code in PR #10667:
URL: https://github.com/apache/dolphinscheduler/pull/10667#discussion_r910122841


##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java:
##########
@@ -164,38 +172,53 @@ private void scheduleWorkflow() throws 
InterruptedException {
 
         List<ProcessInstance> processInstances = 
command2ProcessInstance(commands);
         if (CollectionUtils.isEmpty(processInstances)) {
+            // indicate that the command transform to processInstance error, 
sleep for 1s
+            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
             return;
         }
         MasterServerMetrics.incMasterConsumeCommand(commands.size());
 
         for (ProcessInstance processInstance : processInstances) {
-            try {
-                LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
-                logger.info("Master schedule service starting workflow 
instance");
-                final WorkflowExecuteRunnable workflowExecuteRunnable = new 
WorkflowExecuteRunnable(
-                    processInstance
-                    , processService
-                    , nettyExecutorManager
-                    , processAlertManager
-                    , masterConfig
-                    , stateWheelExecuteThread
-                    , curingGlobalParamsService);
-
-                
this.processInstanceExecCacheManager.cache(processInstance.getId(), 
workflowExecuteRunnable);
-                if (processInstance.getTimeout() > 0) {
-                    
stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
-                }
-                ProcessInstanceMetrics.incProcessInstanceSubmit();
-                workflowExecuteThreadPool.submit(workflowExecuteRunnable);
-                logger.info("Master schedule service started workflow 
instance");
-
-            } catch (Exception ex) {
-                
processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
-                
stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
-                logger.info("Master submit workflow to thread pool failed, 
will remove workflow runnable from cache manager", ex);
-            } finally {
-                LoggerUtils.removeWorkflowInstanceIdMDC();
+            submitProcessInstance(processInstance);
+        }
+    }
+
+    private void submitProcessInstance(@NonNull ProcessInstance 
processInstance) {
+        try {
+            LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
+            logger.info("Master schedule service starting workflow instance");
+            final WorkflowExecuteRunnable workflowExecuteRunnable = new 
WorkflowExecuteRunnable(
+                processInstance
+                , processService
+                , nettyExecutorManager
+                , processAlertManager
+                , masterConfig
+                , stateWheelExecuteThread
+                , curingGlobalParamsService);
+
+            
this.processInstanceExecCacheManager.cache(processInstance.getId(), 
workflowExecuteRunnable);
+            if (processInstance.getTimeout() > 0) {
+                
stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
             }
+            ProcessInstanceMetrics.incProcessInstanceSubmit();
+            CompletableFuture<WorkflowSubmitStatue> workflowSubmitFuture = 
CompletableFuture.supplyAsync(
+                workflowExecuteRunnable::call, workflowExecuteThreadPool);
+            workflowSubmitFuture.thenAccept(workflowSubmitStatue -> {
+                if (WorkflowSubmitStatue.FAILED == workflowSubmitStatue) {
+                    // submit failed
+                    
processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
+                    
stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
+                    submitFailedProcessInstances.add(processInstance);
+                }
+            });
+            logger.info("Master schedule service started workflow instance");

Review Comment:
   We already use MDC to set the processInstanceId by 
`LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());` so the log 
will bring this id, we don't need to set this in every log. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to