ruanwenjun commented on code in PR #17605:
URL: 
https://github.com/apache/dolphinscheduler/pull/17605#discussion_r2465019872


##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/CommandEngine.java:
##########
@@ -149,43 +150,67 @@ public void run() {
     }
 
     private CompletableFuture<IWorkflowExecutionRunnable> 
bootstrapCommand(Command command) {
-        return supplyAsync(
-                () -> 
workflowExecutionRunnableFactory.createWorkflowExecuteRunnable(command), 
commandHandleThreadPool);
+        return supplyAsync(() -> {
+            LogUtils.setWorkflowInstanceIdMDC(command.getWorkflowInstanceId());
+            try {
+                IWorkflowExecutionRunnable result =
+                        
workflowExecutionRunnableFactory.createWorkflowExecuteRunnable(command);
+                return result;
+            } finally {
+                LogUtils.removeWorkflowInstanceIdMDC();
+            }
+        }, commandHandleThreadPool);
     }
 
     private CompletableFuture<Void> 
bootstrapWorkflowExecutionRunnable(IWorkflowExecutionRunnable 
workflowExecutionRunnable) {
         final WorkflowInstance workflowInstance =
                 
workflowExecutionRunnable.getWorkflowExecuteContext().getWorkflowInstance();
-        if (workflowInstance.getState() == 
WorkflowExecutionStatus.SERIAL_WAIT) {
-            log.info("The workflow {} state is: {} will not be trigger now",
-                    workflowInstance.getName(),
-                    workflowInstance.getState());
+
+        LogUtils.setWorkflowInstanceIdMDC(workflowInstance.getId());

Review Comment:
   You have already set the MDC in `bootstrapCommand`; is it necessary to set it again here?



##########
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskEngineDelegator.java:
##########
@@ -74,6 +74,18 @@ public void ackPhysicalTaskExecutorLifecycleEventACK(final 
ITaskExecutorLifecycl
         
physicalTaskExecutorEventReporter.receiveTaskExecutorLifecycleEventACK(taskExecutorLifecycleEventAck);
     }
 
+    public int getWorkflowInstanceId(final int taskInstanceId) {
+        final Optional<ITaskExecutor> taskExecutorOptional = 
physicalTaskExecutorRepository.get(taskInstanceId);
+        if (!taskExecutorOptional.isPresent()) {
+            return 0;
+        }
+        final TaskExecutionContext taskExecutionContext = 
taskExecutorOptional.get().getTaskExecutionContext();
+        if (taskExecutionContext == null) {
+            return 0;
+        }
+        return taskExecutionContext.getWorkflowInstanceId();
+    }

Review Comment:
   Why add the `getWorkflowInstanceId` method here? The ExecutorDelegator 
doesn't care about this.



##########
dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/log/TaskExecutorMDCUtils.java:
##########
@@ -25,26 +25,40 @@ public class TaskExecutorMDCUtils {
 
     private static final String TASK_INSTANCE_ID_MDC_KEY = "taskInstanceId";
     private static final String TASK_INSTANCE_LOG_FULL_PATH_MDC_KEY = 
"taskInstanceLogFullPath";
+    private static final String WORKFLOW_INSTANCE_ID_MDC_KEY = 
"workflowInstanceId";
 
     public static MDCAutoClosable logWithMDC(final ITaskExecutor taskExecutor) 
{
-        return logWithMDC(taskExecutor.getId(), 
taskExecutor.getTaskExecutionContext().getLogPath());
+        return logWithMDC(taskExecutor.getId(),
+                taskExecutor.getTaskExecutionContext().getLogPath(),
+                taskExecutor.getTaskExecutionContext() == null ? 0
+                        : 
taskExecutor.getTaskExecutionContext().getWorkflowInstanceId());
     }
 
     public static MDCAutoClosable logWithMDC(final int taskInstanceId) {
-        return logWithMDC(taskInstanceId, null);
+        return logWithMDC(taskInstanceId, null, 0);
     }
 
-    public static MDCAutoClosable logWithMDC(final int taskInstanceId, final 
String logPath) {
+    public static MDCAutoClosable logWithMDC(final int taskInstanceId, final 
int workflowInstanceId) {
+        return logWithMDC(taskInstanceId, null, workflowInstanceId);
+    }
+
+    public static MDCAutoClosable logWithMDC(final int taskInstanceId, final 
String logPath,
+                                             final int workflowInstanceId) {
 
         if (logPath != null) {
             MDC.put(TASK_INSTANCE_LOG_FULL_PATH_MDC_KEY, logPath);
         }
 
+        if (workflowInstanceId > 0) {
+            MDC.put(WORKFLOW_INSTANCE_ID_MDC_KEY, 
String.valueOf(workflowInstanceId));
+        }

Review Comment:
   If `workflowInstanceId` might not exist, please use `Integer` instead of `int` here.



##########
dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/log/TaskExecutorMDCUtils.java:
##########
@@ -25,26 +25,40 @@ public class TaskExecutorMDCUtils {
 
     private static final String TASK_INSTANCE_ID_MDC_KEY = "taskInstanceId";
     private static final String TASK_INSTANCE_LOG_FULL_PATH_MDC_KEY = 
"taskInstanceLogFullPath";
+    private static final String WORKFLOW_INSTANCE_ID_MDC_KEY = 
"workflowInstanceId";
 
     public static MDCAutoClosable logWithMDC(final ITaskExecutor taskExecutor) 
{
-        return logWithMDC(taskExecutor.getId(), 
taskExecutor.getTaskExecutionContext().getLogPath());
+        return logWithMDC(taskExecutor.getId(),
+                taskExecutor.getTaskExecutionContext().getLogPath(),
+                taskExecutor.getTaskExecutionContext() == null ? 0
+                        : 
taskExecutor.getTaskExecutionContext().getWorkflowInstanceId());
     }
 
     public static MDCAutoClosable logWithMDC(final int taskInstanceId) {
-        return logWithMDC(taskInstanceId, null);
+        return logWithMDC(taskInstanceId, null, 0);
     }
 
-    public static MDCAutoClosable logWithMDC(final int taskInstanceId, final 
String logPath) {
+    public static MDCAutoClosable logWithMDC(final int taskInstanceId, final 
int workflowInstanceId) {
+        return logWithMDC(taskInstanceId, null, workflowInstanceId);
+    }
+
+    public static MDCAutoClosable logWithMDC(final int taskInstanceId, final 
String logPath,
+                                             final int workflowInstanceId) {
 
         if (logPath != null) {
             MDC.put(TASK_INSTANCE_LOG_FULL_PATH_MDC_KEY, logPath);
         }
 
+        if (workflowInstanceId > 0) {
+            MDC.put(WORKFLOW_INSTANCE_ID_MDC_KEY, 
String.valueOf(workflowInstanceId));
+        }

Review Comment:
   ```suggestion
               MDC.put(WORKFLOW_INSTANCE_ID_MDC_KEY, 
String.valueOf(workflowInstanceId));
   ```



##########
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/PhysicalTaskExecutorOperatorImpl.java:
##########
@@ -46,10 +47,20 @@ public class PhysicalTaskExecutorOperatorImpl implements 
IPhysicalTaskExecutorOp
 
     @Override
     public TaskExecutorDispatchResponse dispatchTask(final 
TaskExecutorDispatchRequest taskExecutorDispatchRequest) {
-        log.info("Receive TaskExecutorDispatchResponse: {}", 
taskExecutorDispatchRequest);
         final TaskExecutionContext taskExecutionContext = 
taskExecutorDispatchRequest.getTaskExecutionContext();
-        try {
+        final int taskInstanceId = taskExecutionContext.getTaskInstanceId();
+        final int workflowInstanceId = 
taskExecutionContext.getWorkflowInstanceId();
+
+        try (
+                TaskExecutorMDCUtils.MDCAutoClosable _ignore1 =
+                        TaskExecutorMDCUtils.logWithMDC(taskInstanceId, 
workflowInstanceId)) {
+
+            log.info("Receive TaskExecutorDispatchResponse: {}", 
taskExecutorDispatchRequest);
             
physicalTaskEngineDelegator.dispatchLogicTask(taskExecutionContext);
+
+            // Reset MDC, because dispatchLogicTask will clear MDC internally
+            TaskExecutorMDCUtils.logWithMDC(taskInstanceId, 
workflowInstanceId);

Review Comment:
   If so, you should remove the MDC handling from `physicalTaskEngineDelegator`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to