ruanwenjun commented on code in PR #17605:
URL:
https://github.com/apache/dolphinscheduler/pull/17605#discussion_r2465019872
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/CommandEngine.java:
##########
@@ -149,43 +150,67 @@ public void run() {
}
private CompletableFuture<IWorkflowExecutionRunnable>
bootstrapCommand(Command command) {
- return supplyAsync(
- () ->
workflowExecutionRunnableFactory.createWorkflowExecuteRunnable(command),
commandHandleThreadPool);
+ return supplyAsync(() -> {
+ LogUtils.setWorkflowInstanceIdMDC(command.getWorkflowInstanceId());
+ try {
+ IWorkflowExecutionRunnable result =
+
workflowExecutionRunnableFactory.createWorkflowExecuteRunnable(command);
+ return result;
+ } finally {
+ LogUtils.removeWorkflowInstanceIdMDC();
+ }
+ }, commandHandleThreadPool);
}
private CompletableFuture<Void>
bootstrapWorkflowExecutionRunnable(IWorkflowExecutionRunnable
workflowExecutionRunnable) {
final WorkflowInstance workflowInstance =
workflowExecutionRunnable.getWorkflowExecuteContext().getWorkflowInstance();
- if (workflowInstance.getState() ==
WorkflowExecutionStatus.SERIAL_WAIT) {
- log.info("The workflow {} state is: {} will not be trigger now",
- workflowInstance.getName(),
- workflowInstance.getState());
+
+ LogUtils.setWorkflowInstanceIdMDC(workflowInstance.getId());
Review Comment:
You have set MDC in `bootstrapCommand`, is it needed to set here?
##########
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskEngineDelegator.java:
##########
@@ -74,6 +74,18 @@ public void ackPhysicalTaskExecutorLifecycleEventACK(final
ITaskExecutorLifecycl
physicalTaskExecutorEventReporter.receiveTaskExecutorLifecycleEventACK(taskExecutorLifecycleEventAck);
}
+ public int getWorkflowInstanceId(final int taskInstanceId) {
+ final Optional<ITaskExecutor> taskExecutorOptional =
physicalTaskExecutorRepository.get(taskInstanceId);
+ if (!taskExecutorOptional.isPresent()) {
+ return 0;
+ }
+ final TaskExecutionContext taskExecutionContext =
taskExecutorOptional.get().getTaskExecutionContext();
+ if (taskExecutionContext == null) {
+ return 0;
+ }
+ return taskExecutionContext.getWorkflowInstanceId();
+ }
Review Comment:
Why add `getWorkflowInstanceId` method here, the ExecutorDelegator don't
care this.
##########
dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/log/TaskExecutorMDCUtils.java:
##########
@@ -25,26 +25,40 @@ public class TaskExecutorMDCUtils {
private static final String TASK_INSTANCE_ID_MDC_KEY = "taskInstanceId";
private static final String TASK_INSTANCE_LOG_FULL_PATH_MDC_KEY =
"taskInstanceLogFullPath";
+ private static final String WORKFLOW_INSTANCE_ID_MDC_KEY =
"workflowInstanceId";
public static MDCAutoClosable logWithMDC(final ITaskExecutor taskExecutor)
{
- return logWithMDC(taskExecutor.getId(),
taskExecutor.getTaskExecutionContext().getLogPath());
+ return logWithMDC(taskExecutor.getId(),
+ taskExecutor.getTaskExecutionContext().getLogPath(),
+ taskExecutor.getTaskExecutionContext() == null ? 0
+ :
taskExecutor.getTaskExecutionContext().getWorkflowInstanceId());
}
public static MDCAutoClosable logWithMDC(final int taskInstanceId) {
- return logWithMDC(taskInstanceId, null);
+ return logWithMDC(taskInstanceId, null, 0);
}
- public static MDCAutoClosable logWithMDC(final int taskInstanceId, final
String logPath) {
+ public static MDCAutoClosable logWithMDC(final int taskInstanceId, final
int workflowInstanceId) {
+ return logWithMDC(taskInstanceId, null, workflowInstanceId);
+ }
+
+ public static MDCAutoClosable logWithMDC(final int taskInstanceId, final
String logPath,
+ final int workflowInstanceId) {
if (logPath != null) {
MDC.put(TASK_INSTANCE_LOG_FULL_PATH_MDC_KEY, logPath);
}
+ if (workflowInstanceId > 0) {
+ MDC.put(WORKFLOW_INSTANCE_ID_MDC_KEY,
String.valueOf(workflowInstanceId));
+ }
Review Comment:
If workflowInstanceId might not exist, please use Integer here
##########
dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/log/TaskExecutorMDCUtils.java:
##########
@@ -25,26 +25,40 @@ public class TaskExecutorMDCUtils {
private static final String TASK_INSTANCE_ID_MDC_KEY = "taskInstanceId";
private static final String TASK_INSTANCE_LOG_FULL_PATH_MDC_KEY =
"taskInstanceLogFullPath";
+ private static final String WORKFLOW_INSTANCE_ID_MDC_KEY =
"workflowInstanceId";
public static MDCAutoClosable logWithMDC(final ITaskExecutor taskExecutor)
{
- return logWithMDC(taskExecutor.getId(),
taskExecutor.getTaskExecutionContext().getLogPath());
+ return logWithMDC(taskExecutor.getId(),
+ taskExecutor.getTaskExecutionContext().getLogPath(),
+ taskExecutor.getTaskExecutionContext() == null ? 0
+ :
taskExecutor.getTaskExecutionContext().getWorkflowInstanceId());
}
public static MDCAutoClosable logWithMDC(final int taskInstanceId) {
- return logWithMDC(taskInstanceId, null);
+ return logWithMDC(taskInstanceId, null, 0);
}
- public static MDCAutoClosable logWithMDC(final int taskInstanceId, final
String logPath) {
+ public static MDCAutoClosable logWithMDC(final int taskInstanceId, final
int workflowInstanceId) {
+ return logWithMDC(taskInstanceId, null, workflowInstanceId);
+ }
+
+ public static MDCAutoClosable logWithMDC(final int taskInstanceId, final
String logPath,
+ final int workflowInstanceId) {
if (logPath != null) {
MDC.put(TASK_INSTANCE_LOG_FULL_PATH_MDC_KEY, logPath);
}
+ if (workflowInstanceId > 0) {
+ MDC.put(WORKFLOW_INSTANCE_ID_MDC_KEY,
String.valueOf(workflowInstanceId));
+ }
Review Comment:
```suggestion
MDC.put(WORKFLOW_INSTANCE_ID_MDC_KEY,
String.valueOf(workflowInstanceId));
```
##########
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/PhysicalTaskExecutorOperatorImpl.java:
##########
@@ -46,10 +47,20 @@ public class PhysicalTaskExecutorOperatorImpl implements
IPhysicalTaskExecutorOp
@Override
public TaskExecutorDispatchResponse dispatchTask(final
TaskExecutorDispatchRequest taskExecutorDispatchRequest) {
- log.info("Receive TaskExecutorDispatchResponse: {}",
taskExecutorDispatchRequest);
final TaskExecutionContext taskExecutionContext =
taskExecutorDispatchRequest.getTaskExecutionContext();
- try {
+ final int taskInstanceId = taskExecutionContext.getTaskInstanceId();
+ final int workflowInstanceId =
taskExecutionContext.getWorkflowInstanceId();
+
+ try (
+ TaskExecutorMDCUtils.MDCAutoClosable _ignore1 =
+ TaskExecutorMDCUtils.logWithMDC(taskInstanceId,
workflowInstanceId)) {
+
+ log.info("Receive TaskExecutorDispatchResponse: {}",
taskExecutorDispatchRequest);
physicalTaskEngineDelegator.dispatchLogicTask(taskExecutionContext);
+
+ // Reset MDC, because dispatchLogicTask will clear MDC internally
+ TaskExecutorMDCUtils.logWithMDC(taskInstanceId,
workflowInstanceId);
Review Comment:
If so, you should remove MDC in `physicalTaskEngineDelegator`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]