This is an automated email from the ASF dual-hosted git repository.

zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new f8e82c2a6a [Chore][Master] Supplemental workflow instance trace id in log (#17605)
f8e82c2a6a is described below

commit f8e82c2a6af5feda3643c6c1a897eced3562c91b
Author: [email protected] <[email protected]>
AuthorDate: Thu Oct 30 14:07:11 2025 +0800

    [Chore][Master] Supplemental workflow instance trace id in log (#17605)
---
 .../master/engine/command/CommandEngine.java       |  16 ++-
 .../master/rpc/TaskExecutorEventListenerImpl.java  | 138 +++++++++++++--------
 2 files changed, 100 insertions(+), 54 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/CommandEngine.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/CommandEngine.java
index be681eb1b1..5e84a24a76 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/CommandEngine.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/CommandEngine.java
@@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.dao.entity.Command;
 import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
 import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
 import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
+import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import 
org.apache.dolphinscheduler.server.master.config.MasterServerLoadProtection;
 import org.apache.dolphinscheduler.server.master.engine.IWorkflowRepository;
@@ -129,10 +130,15 @@ public class CommandEngine extends BaseDaemonThread implements AutoCloseable {
 
                 List<CompletableFuture<Void>> allCompleteFutures = new 
ArrayList<>();
                 for (Command command : commands) {
-                    CompletableFuture<Void> completableFuture = 
bootstrapCommand(command)
+                    CompletableFuture<Void> completableFuture = supplyAsync(() 
-> {
+                        
LogUtils.setWorkflowInstanceIdMDC(command.getWorkflowInstanceId());
+                        return command;
+                    }, commandHandleThreadPool)
+                            .thenApply(this::bootstrapCommand)
                             
.thenAccept(this::bootstrapWorkflowExecutionRunnable)
                             .thenAccept((unused) -> bootstrapSuccess(command))
-                            .exceptionally(throwable -> 
bootstrapError(command, throwable));
+                            .exceptionally(throwable -> 
bootstrapError(command, throwable))
+                            .whenComplete((result, throwable) -> 
LogUtils.removeWorkflowInstanceIdMDC());
                     allCompleteFutures.add(completableFuture);
                 }
                 CompletableFuture.allOf(allCompleteFutures.toArray(new 
CompletableFuture[0])).join();
@@ -148,14 +154,14 @@ public class CommandEngine extends BaseDaemonThread implements AutoCloseable {
         }
     }
 
-    private CompletableFuture<IWorkflowExecutionRunnable> 
bootstrapCommand(Command command) {
-        return supplyAsync(
-                () -> 
workflowExecutionRunnableFactory.createWorkflowExecuteRunnable(command), 
commandHandleThreadPool);
+    private IWorkflowExecutionRunnable bootstrapCommand(Command command) {
+        return 
workflowExecutionRunnableFactory.createWorkflowExecuteRunnable(command);
     }
 
     private CompletableFuture<Void> 
bootstrapWorkflowExecutionRunnable(IWorkflowExecutionRunnable 
workflowExecutionRunnable) {
         final WorkflowInstance workflowInstance =
                 
workflowExecutionRunnable.getWorkflowExecuteContext().getWorkflowInstance();
+
         if (workflowInstance.getState() == 
WorkflowExecutionStatus.SERIAL_WAIT) {
             log.info("The workflow {} state is: {} will not be trigger now",
                     workflowInstance.getName(),
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskExecutorEventListenerImpl.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskExecutorEventListenerImpl.java
index 0998cde8b6..94df43ede4 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskExecutorEventListenerImpl.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskExecutorEventListenerImpl.java
@@ -18,6 +18,7 @@
 package org.apache.dolphinscheduler.server.master.rpc;
 
 import org.apache.dolphinscheduler.extract.master.ITaskExecutorEventListener;
+import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
 import org.apache.dolphinscheduler.server.master.engine.IWorkflowRepository;
 import 
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchedLifecycleEvent;
 import 
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailedLifecycleEvent;
@@ -53,79 +54,118 @@ public class TaskExecutorEventListenerImpl implements ITaskExecutorEventListener
 
     @Override
     public void onTaskExecutorDispatched(final 
TaskExecutorDispatchedLifecycleEvent taskExecutorDispatchedLifecycleEvent) {
-        final ITaskExecutionRunnable taskExecutionRunnable =
-                getTaskExecutionRunnable(taskExecutorDispatchedLifecycleEvent);
-        final TaskDispatchedLifecycleEvent taskDispatchedLifecycleEvent = 
TaskDispatchedLifecycleEvent.builder()
-                .taskExecutionRunnable(taskExecutionRunnable)
-                
.executorHost(taskExecutorDispatchedLifecycleEvent.getTaskInstanceHost())
-                .build();
-
-        
taskExecutionRunnable.getWorkflowEventBus().publish(taskDispatchedLifecycleEvent);
+        
LogUtils.setWorkflowInstanceIdMDC(taskExecutorDispatchedLifecycleEvent.getWorkflowInstanceId());
+        try {
+            final ITaskExecutionRunnable taskExecutionRunnable =
+                    
getTaskExecutionRunnable(taskExecutorDispatchedLifecycleEvent);
+            final TaskDispatchedLifecycleEvent taskDispatchedLifecycleEvent = 
TaskDispatchedLifecycleEvent.builder()
+                    .taskExecutionRunnable(taskExecutionRunnable)
+                    
.executorHost(taskExecutorDispatchedLifecycleEvent.getTaskInstanceHost())
+                    .build();
+
+            
taskExecutionRunnable.getWorkflowEventBus().publish(taskDispatchedLifecycleEvent);
+        } finally {
+            LogUtils.removeWorkflowInstanceIdMDC();
+        }
     }
 
     @Override
     public void onTaskExecutorRunning(final TaskExecutorStartedLifecycleEvent 
taskExecutorStartedLifecycleEvent) {
-        final ITaskExecutionRunnable taskExecutionRunnable =
-                getTaskExecutionRunnable(taskExecutorStartedLifecycleEvent);
-        final TaskRunningLifecycleEvent taskRunningEvent = 
TaskRunningLifecycleEvent.builder()
-                .taskExecutionRunnable(taskExecutionRunnable)
-                .startTime(new 
Date(taskExecutorStartedLifecycleEvent.getStartTime()))
-                .logPath(taskExecutorStartedLifecycleEvent.getLogPath())
-                .build();
-
-        taskExecutionRunnable.getWorkflowEventBus().publish(taskRunningEvent);
+        
LogUtils.setWorkflowInstanceIdMDC(taskExecutorStartedLifecycleEvent.getWorkflowInstanceId());
+        try {
+            final ITaskExecutionRunnable taskExecutionRunnable =
+                    
getTaskExecutionRunnable(taskExecutorStartedLifecycleEvent);
+            final TaskRunningLifecycleEvent taskRunningEvent = 
TaskRunningLifecycleEvent.builder()
+                    .taskExecutionRunnable(taskExecutionRunnable)
+                    .startTime(new 
Date(taskExecutorStartedLifecycleEvent.getStartTime()))
+                    .logPath(taskExecutorStartedLifecycleEvent.getLogPath())
+                    .build();
+
+            
taskExecutionRunnable.getWorkflowEventBus().publish(taskRunningEvent);
+        } finally {
+            LogUtils.removeWorkflowInstanceIdMDC();
+        }
     }
 
     @Override
     public void onTaskExecutorRuntimeContextChanged(final 
TaskExecutorRuntimeContextChangedLifecycleEvent 
taskExecutorRuntimeContextChangedLifecycleEventr) {
-        final ITaskExecutionRunnable taskExecutionRunnable =
-                
getTaskExecutionRunnable(taskExecutorRuntimeContextChangedLifecycleEventr);
-
-        final TaskRuntimeContextChangedEvent taskRuntimeContextChangedEvent = 
TaskRuntimeContextChangedEvent.builder()
-                .taskExecutionRunnable(taskExecutionRunnable)
-                
.runtimeContext(taskExecutorRuntimeContextChangedLifecycleEventr.getAppIds())
-                .build();
-
-        
taskExecutionRunnable.getWorkflowEventBus().publish(taskRuntimeContextChangedEvent);
+        
LogUtils.setWorkflowInstanceIdMDC(taskExecutorRuntimeContextChangedLifecycleEventr.getWorkflowInstanceId());
+        try {
+            final ITaskExecutionRunnable taskExecutionRunnable =
+                    
getTaskExecutionRunnable(taskExecutorRuntimeContextChangedLifecycleEventr);
+
+            final TaskRuntimeContextChangedEvent 
taskRuntimeContextChangedEvent =
+                    TaskRuntimeContextChangedEvent.builder()
+                            .taskExecutionRunnable(taskExecutionRunnable)
+                            
.runtimeContext(taskExecutorRuntimeContextChangedLifecycleEventr.getAppIds())
+                            .build();
+
+            
taskExecutionRunnable.getWorkflowEventBus().publish(taskRuntimeContextChangedEvent);
+        } finally {
+            LogUtils.removeWorkflowInstanceIdMDC();
+        }
     }
 
     @Override
     public void onTaskExecutorSuccess(final TaskExecutorSuccessLifecycleEvent 
taskExecutorSuccessLifecycleEvent) {
-        final ITaskExecutionRunnable taskExecutionRunnable =
-                getTaskExecutionRunnable(taskExecutorSuccessLifecycleEvent);
-        final TaskSuccessLifecycleEvent taskSuccessEvent = 
TaskSuccessLifecycleEvent.builder()
-                .taskExecutionRunnable(taskExecutionRunnable)
-                .endTime(new 
Date(taskExecutorSuccessLifecycleEvent.getEndTime()))
-                .varPool(taskExecutorSuccessLifecycleEvent.getVarPool())
-                .build();
-        taskExecutionRunnable.getWorkflowEventBus().publish(taskSuccessEvent);
+        
LogUtils.setWorkflowInstanceIdMDC(taskExecutorSuccessLifecycleEvent.getWorkflowInstanceId());
+        try {
+            final ITaskExecutionRunnable taskExecutionRunnable =
+                    
getTaskExecutionRunnable(taskExecutorSuccessLifecycleEvent);
+            final TaskSuccessLifecycleEvent taskSuccessEvent = 
TaskSuccessLifecycleEvent.builder()
+                    .taskExecutionRunnable(taskExecutionRunnable)
+                    .endTime(new 
Date(taskExecutorSuccessLifecycleEvent.getEndTime()))
+                    .varPool(taskExecutorSuccessLifecycleEvent.getVarPool())
+                    .build();
+            
taskExecutionRunnable.getWorkflowEventBus().publish(taskSuccessEvent);
+        } finally {
+            LogUtils.removeWorkflowInstanceIdMDC();
+        }
     }
 
     @Override
     public void onTaskExecutorFailed(final TaskExecutorFailedLifecycleEvent 
taskExecutorFailedLifecycleEvent) {
-        final ITaskExecutionRunnable taskExecutionRunnable = 
getTaskExecutionRunnable(taskExecutorFailedLifecycleEvent);
-        final TaskFailedLifecycleEvent taskFailedEvent = 
TaskFailedLifecycleEvent.builder()
-                .taskExecutionRunnable(taskExecutionRunnable)
-                .endTime(new 
Date(taskExecutorFailedLifecycleEvent.getEndTime()))
-                .build();
-        taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent);
+        
LogUtils.setWorkflowInstanceIdMDC(taskExecutorFailedLifecycleEvent.getWorkflowInstanceId());
+        try {
+            final ITaskExecutionRunnable taskExecutionRunnable =
+                    getTaskExecutionRunnable(taskExecutorFailedLifecycleEvent);
+            final TaskFailedLifecycleEvent taskFailedEvent = 
TaskFailedLifecycleEvent.builder()
+                    .taskExecutionRunnable(taskExecutionRunnable)
+                    .endTime(new 
Date(taskExecutorFailedLifecycleEvent.getEndTime()))
+                    .build();
+            
taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent);
+        } finally {
+            LogUtils.removeWorkflowInstanceIdMDC();
+        }
     }
 
     @Override
     public void onTaskExecutorKilled(final TaskExecutorKilledLifecycleEvent 
taskExecutorKilledLifecycleEvent) {
-        final ITaskExecutionRunnable taskExecutionRunnable = 
getTaskExecutionRunnable(taskExecutorKilledLifecycleEvent);
-        final TaskKilledLifecycleEvent taskKilledEvent = 
TaskKilledLifecycleEvent.builder()
-                .taskExecutionRunnable(taskExecutionRunnable)
-                .endTime(new 
Date(taskExecutorKilledLifecycleEvent.getEndTime()))
-                .build();
-        taskExecutionRunnable.getWorkflowEventBus().publish(taskKilledEvent);
+        
LogUtils.setWorkflowInstanceIdMDC(taskExecutorKilledLifecycleEvent.getWorkflowInstanceId());
+        try {
+            final ITaskExecutionRunnable taskExecutionRunnable =
+                    getTaskExecutionRunnable(taskExecutorKilledLifecycleEvent);
+            final TaskKilledLifecycleEvent taskKilledEvent = 
TaskKilledLifecycleEvent.builder()
+                    .taskExecutionRunnable(taskExecutionRunnable)
+                    .endTime(new 
Date(taskExecutorKilledLifecycleEvent.getEndTime()))
+                    .build();
+            
taskExecutionRunnable.getWorkflowEventBus().publish(taskKilledEvent);
+        } finally {
+            LogUtils.removeWorkflowInstanceIdMDC();
+        }
     }
 
     @Override
     public void onTaskExecutorPaused(final TaskExecutorPausedLifecycleEvent 
taskExecutorPausedLifecycleEvent) {
-        final ITaskExecutionRunnable taskExecutionRunnable = 
getTaskExecutionRunnable(taskExecutorPausedLifecycleEvent);
-        final TaskPausedLifecycleEvent taskPausedEvent = 
TaskPausedLifecycleEvent.of(taskExecutionRunnable);
-        taskExecutionRunnable.getWorkflowEventBus().publish(taskPausedEvent);
+        
LogUtils.setWorkflowInstanceIdMDC(taskExecutorPausedLifecycleEvent.getWorkflowInstanceId());
+        try {
+            final ITaskExecutionRunnable taskExecutionRunnable =
+                    getTaskExecutionRunnable(taskExecutorPausedLifecycleEvent);
+            final TaskPausedLifecycleEvent taskPausedEvent = 
TaskPausedLifecycleEvent.of(taskExecutionRunnable);
+            
taskExecutionRunnable.getWorkflowEventBus().publish(taskPausedEvent);
+        } finally {
+            LogUtils.removeWorkflowInstanceIdMDC();
+        }
     }
 
     private ITaskExecutionRunnable getTaskExecutionRunnable(final 
IReportableTaskExecutorLifecycleEvent reportableTaskExecutorLifecycleEvent) {

Reply via email to