ruanwenjun commented on code in PR #10667:
URL: https://github.com/apache/dolphinscheduler/pull/10667#discussion_r910122841
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java:
##########
@@ -164,38 +172,53 @@ private void scheduleWorkflow() throws
InterruptedException {
List<ProcessInstance> processInstances =
command2ProcessInstance(commands);
if (CollectionUtils.isEmpty(processInstances)) {
+ // indicate that the command transform to processInstance error,
sleep for 1s
+ Thread.sleep(Constants.SLEEP_TIME_MILLIS);
return;
}
MasterServerMetrics.incMasterConsumeCommand(commands.size());
for (ProcessInstance processInstance : processInstances) {
- try {
- LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
- logger.info("Master schedule service starting workflow
instance");
- final WorkflowExecuteRunnable workflowExecuteRunnable = new
WorkflowExecuteRunnable(
- processInstance
- , processService
- , nettyExecutorManager
- , processAlertManager
- , masterConfig
- , stateWheelExecuteThread
- , curingGlobalParamsService);
-
-
this.processInstanceExecCacheManager.cache(processInstance.getId(),
workflowExecuteRunnable);
- if (processInstance.getTimeout() > 0) {
-
stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
- }
- ProcessInstanceMetrics.incProcessInstanceSubmit();
- workflowExecuteThreadPool.submit(workflowExecuteRunnable);
- logger.info("Master schedule service started workflow
instance");
-
- } catch (Exception ex) {
-
processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
-
stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
- logger.info("Master submit workflow to thread pool failed,
will remove workflow runnable from cache manager", ex);
- } finally {
- LoggerUtils.removeWorkflowInstanceIdMDC();
+ submitProcessInstance(processInstance);
+ }
+ }
+
+ private void submitProcessInstance(@NonNull ProcessInstance
processInstance) {
+ try {
+ LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
+ logger.info("Master schedule service starting workflow instance");
+ final WorkflowExecuteRunnable workflowExecuteRunnable = new
WorkflowExecuteRunnable(
+ processInstance
+ , processService
+ , nettyExecutorManager
+ , processAlertManager
+ , masterConfig
+ , stateWheelExecuteThread
+ , curingGlobalParamsService);
+
+
this.processInstanceExecCacheManager.cache(processInstance.getId(),
workflowExecuteRunnable);
+ if (processInstance.getTimeout() > 0) {
+
stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
}
+ ProcessInstanceMetrics.incProcessInstanceSubmit();
+ CompletableFuture<WorkflowSubmitStatue> workflowSubmitFuture =
CompletableFuture.supplyAsync(
+ workflowExecuteRunnable::call, workflowExecuteThreadPool);
+ workflowSubmitFuture.thenAccept(workflowSubmitStatue -> {
+ if (WorkflowSubmitStatue.FAILED == workflowSubmitStatue) {
+ // submit failed
+
processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
+
stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
+ submitFailedProcessInstances.add(processInstance);
+ }
+ });
+ logger.info("Master schedule service started workflow instance");
Review Comment:
We already use [MDC](https://logback.qos.ch/manual/mdc.html) to set the
processInstanceId via
`LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());`, so every log
line will already carry this id. We don't need to add it to each log message
explicitly.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]