This is an automated email from the ASF dual-hosted git repository.
zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new f8e82c2a6a [Chore][Master] Supplemental workflow instance trace id in log (#17605)
f8e82c2a6a is described below
commit f8e82c2a6af5feda3643c6c1a897eced3562c91b
Author: [email protected] <[email protected]>
AuthorDate: Thu Oct 30 14:07:11 2025 +0800
[Chore][Master] Supplemental workflow instance trace id in log (#17605)
---
.../master/engine/command/CommandEngine.java | 16 ++-
.../master/rpc/TaskExecutorEventListenerImpl.java | 138 +++++++++++++--------
2 files changed, 100 insertions(+), 54 deletions(-)
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/CommandEngine.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/CommandEngine.java
index be681eb1b1..5e84a24a76 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/CommandEngine.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/CommandEngine.java
@@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
+import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import
org.apache.dolphinscheduler.server.master.config.MasterServerLoadProtection;
import org.apache.dolphinscheduler.server.master.engine.IWorkflowRepository;
@@ -129,10 +130,15 @@ public class CommandEngine extends BaseDaemonThread
implements AutoCloseable {
List<CompletableFuture<Void>> allCompleteFutures = new
ArrayList<>();
for (Command command : commands) {
- CompletableFuture<Void> completableFuture =
bootstrapCommand(command)
+ CompletableFuture<Void> completableFuture = supplyAsync(()
-> {
+
LogUtils.setWorkflowInstanceIdMDC(command.getWorkflowInstanceId());
+ return command;
+ }, commandHandleThreadPool)
+ .thenApply(this::bootstrapCommand)
.thenAccept(this::bootstrapWorkflowExecutionRunnable)
.thenAccept((unused) -> bootstrapSuccess(command))
- .exceptionally(throwable ->
bootstrapError(command, throwable));
+ .exceptionally(throwable ->
bootstrapError(command, throwable))
+ .whenComplete((result, throwable) ->
LogUtils.removeWorkflowInstanceIdMDC());
allCompleteFutures.add(completableFuture);
}
CompletableFuture.allOf(allCompleteFutures.toArray(new
CompletableFuture[0])).join();
@@ -148,14 +154,14 @@ public class CommandEngine extends BaseDaemonThread
implements AutoCloseable {
}
}
- private CompletableFuture<IWorkflowExecutionRunnable>
bootstrapCommand(Command command) {
- return supplyAsync(
- () ->
workflowExecutionRunnableFactory.createWorkflowExecuteRunnable(command),
commandHandleThreadPool);
+ private IWorkflowExecutionRunnable bootstrapCommand(Command command) {
+ return
workflowExecutionRunnableFactory.createWorkflowExecuteRunnable(command);
}
private CompletableFuture<Void>
bootstrapWorkflowExecutionRunnable(IWorkflowExecutionRunnable
workflowExecutionRunnable) {
final WorkflowInstance workflowInstance =
workflowExecutionRunnable.getWorkflowExecuteContext().getWorkflowInstance();
+
if (workflowInstance.getState() ==
WorkflowExecutionStatus.SERIAL_WAIT) {
log.info("The workflow {} state is: {} will not be trigger now",
workflowInstance.getName(),
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskExecutorEventListenerImpl.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskExecutorEventListenerImpl.java
index 0998cde8b6..94df43ede4 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskExecutorEventListenerImpl.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskExecutorEventListenerImpl.java
@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.master.rpc;
import org.apache.dolphinscheduler.extract.master.ITaskExecutorEventListener;
+import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.master.engine.IWorkflowRepository;
import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchedLifecycleEvent;
import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailedLifecycleEvent;
@@ -53,79 +54,118 @@ public class TaskExecutorEventListenerImpl implements
ITaskExecutorEventListener
@Override
public void onTaskExecutorDispatched(final
TaskExecutorDispatchedLifecycleEvent taskExecutorDispatchedLifecycleEvent) {
- final ITaskExecutionRunnable taskExecutionRunnable =
- getTaskExecutionRunnable(taskExecutorDispatchedLifecycleEvent);
- final TaskDispatchedLifecycleEvent taskDispatchedLifecycleEvent =
TaskDispatchedLifecycleEvent.builder()
- .taskExecutionRunnable(taskExecutionRunnable)
-
.executorHost(taskExecutorDispatchedLifecycleEvent.getTaskInstanceHost())
- .build();
-
-
taskExecutionRunnable.getWorkflowEventBus().publish(taskDispatchedLifecycleEvent);
+
LogUtils.setWorkflowInstanceIdMDC(taskExecutorDispatchedLifecycleEvent.getWorkflowInstanceId());
+ try {
+ final ITaskExecutionRunnable taskExecutionRunnable =
+
getTaskExecutionRunnable(taskExecutorDispatchedLifecycleEvent);
+ final TaskDispatchedLifecycleEvent taskDispatchedLifecycleEvent =
TaskDispatchedLifecycleEvent.builder()
+ .taskExecutionRunnable(taskExecutionRunnable)
+
.executorHost(taskExecutorDispatchedLifecycleEvent.getTaskInstanceHost())
+ .build();
+
+
taskExecutionRunnable.getWorkflowEventBus().publish(taskDispatchedLifecycleEvent);
+ } finally {
+ LogUtils.removeWorkflowInstanceIdMDC();
+ }
}
@Override
public void onTaskExecutorRunning(final TaskExecutorStartedLifecycleEvent
taskExecutorStartedLifecycleEvent) {
- final ITaskExecutionRunnable taskExecutionRunnable =
- getTaskExecutionRunnable(taskExecutorStartedLifecycleEvent);
- final TaskRunningLifecycleEvent taskRunningEvent =
TaskRunningLifecycleEvent.builder()
- .taskExecutionRunnable(taskExecutionRunnable)
- .startTime(new
Date(taskExecutorStartedLifecycleEvent.getStartTime()))
- .logPath(taskExecutorStartedLifecycleEvent.getLogPath())
- .build();
-
- taskExecutionRunnable.getWorkflowEventBus().publish(taskRunningEvent);
+
LogUtils.setWorkflowInstanceIdMDC(taskExecutorStartedLifecycleEvent.getWorkflowInstanceId());
+ try {
+ final ITaskExecutionRunnable taskExecutionRunnable =
+
getTaskExecutionRunnable(taskExecutorStartedLifecycleEvent);
+ final TaskRunningLifecycleEvent taskRunningEvent =
TaskRunningLifecycleEvent.builder()
+ .taskExecutionRunnable(taskExecutionRunnable)
+ .startTime(new
Date(taskExecutorStartedLifecycleEvent.getStartTime()))
+ .logPath(taskExecutorStartedLifecycleEvent.getLogPath())
+ .build();
+
+
taskExecutionRunnable.getWorkflowEventBus().publish(taskRunningEvent);
+ } finally {
+ LogUtils.removeWorkflowInstanceIdMDC();
+ }
}
@Override
public void onTaskExecutorRuntimeContextChanged(final
TaskExecutorRuntimeContextChangedLifecycleEvent
taskExecutorRuntimeContextChangedLifecycleEventr) {
- final ITaskExecutionRunnable taskExecutionRunnable =
-
getTaskExecutionRunnable(taskExecutorRuntimeContextChangedLifecycleEventr);
-
- final TaskRuntimeContextChangedEvent taskRuntimeContextChangedEvent =
TaskRuntimeContextChangedEvent.builder()
- .taskExecutionRunnable(taskExecutionRunnable)
-
.runtimeContext(taskExecutorRuntimeContextChangedLifecycleEventr.getAppIds())
- .build();
-
-
taskExecutionRunnable.getWorkflowEventBus().publish(taskRuntimeContextChangedEvent);
+
LogUtils.setWorkflowInstanceIdMDC(taskExecutorRuntimeContextChangedLifecycleEventr.getWorkflowInstanceId());
+ try {
+ final ITaskExecutionRunnable taskExecutionRunnable =
+
getTaskExecutionRunnable(taskExecutorRuntimeContextChangedLifecycleEventr);
+
+ final TaskRuntimeContextChangedEvent
taskRuntimeContextChangedEvent =
+ TaskRuntimeContextChangedEvent.builder()
+ .taskExecutionRunnable(taskExecutionRunnable)
+
.runtimeContext(taskExecutorRuntimeContextChangedLifecycleEventr.getAppIds())
+ .build();
+
+
taskExecutionRunnable.getWorkflowEventBus().publish(taskRuntimeContextChangedEvent);
+ } finally {
+ LogUtils.removeWorkflowInstanceIdMDC();
+ }
}
@Override
public void onTaskExecutorSuccess(final TaskExecutorSuccessLifecycleEvent
taskExecutorSuccessLifecycleEvent) {
- final ITaskExecutionRunnable taskExecutionRunnable =
- getTaskExecutionRunnable(taskExecutorSuccessLifecycleEvent);
- final TaskSuccessLifecycleEvent taskSuccessEvent =
TaskSuccessLifecycleEvent.builder()
- .taskExecutionRunnable(taskExecutionRunnable)
- .endTime(new
Date(taskExecutorSuccessLifecycleEvent.getEndTime()))
- .varPool(taskExecutorSuccessLifecycleEvent.getVarPool())
- .build();
- taskExecutionRunnable.getWorkflowEventBus().publish(taskSuccessEvent);
+
LogUtils.setWorkflowInstanceIdMDC(taskExecutorSuccessLifecycleEvent.getWorkflowInstanceId());
+ try {
+ final ITaskExecutionRunnable taskExecutionRunnable =
+
getTaskExecutionRunnable(taskExecutorSuccessLifecycleEvent);
+ final TaskSuccessLifecycleEvent taskSuccessEvent =
TaskSuccessLifecycleEvent.builder()
+ .taskExecutionRunnable(taskExecutionRunnable)
+ .endTime(new
Date(taskExecutorSuccessLifecycleEvent.getEndTime()))
+ .varPool(taskExecutorSuccessLifecycleEvent.getVarPool())
+ .build();
+
taskExecutionRunnable.getWorkflowEventBus().publish(taskSuccessEvent);
+ } finally {
+ LogUtils.removeWorkflowInstanceIdMDC();
+ }
}
@Override
public void onTaskExecutorFailed(final TaskExecutorFailedLifecycleEvent
taskExecutorFailedLifecycleEvent) {
- final ITaskExecutionRunnable taskExecutionRunnable =
getTaskExecutionRunnable(taskExecutorFailedLifecycleEvent);
- final TaskFailedLifecycleEvent taskFailedEvent =
TaskFailedLifecycleEvent.builder()
- .taskExecutionRunnable(taskExecutionRunnable)
- .endTime(new
Date(taskExecutorFailedLifecycleEvent.getEndTime()))
- .build();
- taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent);
+
LogUtils.setWorkflowInstanceIdMDC(taskExecutorFailedLifecycleEvent.getWorkflowInstanceId());
+ try {
+ final ITaskExecutionRunnable taskExecutionRunnable =
+ getTaskExecutionRunnable(taskExecutorFailedLifecycleEvent);
+ final TaskFailedLifecycleEvent taskFailedEvent =
TaskFailedLifecycleEvent.builder()
+ .taskExecutionRunnable(taskExecutionRunnable)
+ .endTime(new
Date(taskExecutorFailedLifecycleEvent.getEndTime()))
+ .build();
+
taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent);
+ } finally {
+ LogUtils.removeWorkflowInstanceIdMDC();
+ }
}
@Override
public void onTaskExecutorKilled(final TaskExecutorKilledLifecycleEvent
taskExecutorKilledLifecycleEvent) {
- final ITaskExecutionRunnable taskExecutionRunnable =
getTaskExecutionRunnable(taskExecutorKilledLifecycleEvent);
- final TaskKilledLifecycleEvent taskKilledEvent =
TaskKilledLifecycleEvent.builder()
- .taskExecutionRunnable(taskExecutionRunnable)
- .endTime(new
Date(taskExecutorKilledLifecycleEvent.getEndTime()))
- .build();
- taskExecutionRunnable.getWorkflowEventBus().publish(taskKilledEvent);
+
LogUtils.setWorkflowInstanceIdMDC(taskExecutorKilledLifecycleEvent.getWorkflowInstanceId());
+ try {
+ final ITaskExecutionRunnable taskExecutionRunnable =
+ getTaskExecutionRunnable(taskExecutorKilledLifecycleEvent);
+ final TaskKilledLifecycleEvent taskKilledEvent =
TaskKilledLifecycleEvent.builder()
+ .taskExecutionRunnable(taskExecutionRunnable)
+ .endTime(new
Date(taskExecutorKilledLifecycleEvent.getEndTime()))
+ .build();
+
taskExecutionRunnable.getWorkflowEventBus().publish(taskKilledEvent);
+ } finally {
+ LogUtils.removeWorkflowInstanceIdMDC();
+ }
}
@Override
public void onTaskExecutorPaused(final TaskExecutorPausedLifecycleEvent
taskExecutorPausedLifecycleEvent) {
- final ITaskExecutionRunnable taskExecutionRunnable =
getTaskExecutionRunnable(taskExecutorPausedLifecycleEvent);
- final TaskPausedLifecycleEvent taskPausedEvent =
TaskPausedLifecycleEvent.of(taskExecutionRunnable);
- taskExecutionRunnable.getWorkflowEventBus().publish(taskPausedEvent);
+
LogUtils.setWorkflowInstanceIdMDC(taskExecutorPausedLifecycleEvent.getWorkflowInstanceId());
+ try {
+ final ITaskExecutionRunnable taskExecutionRunnable =
+ getTaskExecutionRunnable(taskExecutorPausedLifecycleEvent);
+ final TaskPausedLifecycleEvent taskPausedEvent =
TaskPausedLifecycleEvent.of(taskExecutionRunnable);
+
taskExecutionRunnable.getWorkflowEventBus().publish(taskPausedEvent);
+ } finally {
+ LogUtils.removeWorkflowInstanceIdMDC();
+ }
}
private ITaskExecutionRunnable getTaskExecutionRunnable(final
IReportableTaskExecutorLifecycleEvent reportableTaskExecutorLifecycleEvent) {