This is an automated email from the ASF dual-hosted git repository.

changhaifu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new c7a11ce6ed [Improvement][K8S]Optimize MDC for K8S tasks (#15390)
c7a11ce6ed is described below

commit c7a11ce6ed8b606671b4f1fa06b522a72aed50d4
Author: Gallardot <[email protected]>
AuthorDate: Wed Jan 17 17:56:11 2024 +0800

    [Improvement][K8S]Optimize MDC for K8S tasks (#15390)
    
    Signed-off-by: Gallardot <[email protected]>
    Co-authored-by: fuchanghai <[email protected]>
---
 .../plugin/task/api/k8s/impl/K8sTaskExecutor.java              | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
index 167cae8668..986c9dc8a7 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
@@ -205,6 +205,8 @@ public class K8sTaskExecutor extends 
AbstractK8sTaskExecutor {
             @Override
             public void eventReceived(Action action, Job job) {
                 try {
+                    
LogUtils.setWorkflowAndTaskInstanceIDMDC(taskRequest.getProcessInstanceId(),
+                            taskRequest.getTaskInstanceId());
                     
LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath());
                     log.info("event received : job:{} action:{}", 
job.getMetadata().getName(), action);
                     if (action == Action.DELETED) {
@@ -222,14 +224,18 @@ public class K8sTaskExecutor extends 
AbstractK8sTaskExecutor {
                     }
                 } finally {
                     LogUtils.removeTaskInstanceLogFullPathMDC();
+                    LogUtils.removeWorkflowAndTaskInstanceIdMDC();
                 }
             }
 
             @Override
             public void onClose(WatcherException e) {
+                
LogUtils.setWorkflowAndTaskInstanceIDMDC(taskRequest.getProcessInstanceId(),
+                        taskRequest.getTaskInstanceId());
                 log.error("[K8sJobExecutor-{}] fail in k8s: {}", 
job.getMetadata().getName(), e.getMessage());
                 taskResponse.setExitStatusCode(EXIT_CODE_FAILURE);
                 countDownLatch.countDown();
+                LogUtils.removeWorkflowAndTaskInstanceIdMDC();
             }
         };
         try (Watch watch = 
k8sUtils.createBatchJobWatcher(job.getMetadata().getName(), watcher)) {
@@ -260,10 +266,12 @@ public class K8sTaskExecutor extends 
AbstractK8sTaskExecutor {
         String containerName = String.format("%s-%s", taskName, 
taskInstanceId);
         podLogOutputFuture = collectPodLogExecutorService.submit(() -> {
             TaskOutputParameterParser taskOutputParameterParser = new 
TaskOutputParameterParser();
+            
LogUtils.setWorkflowAndTaskInstanceIDMDC(taskRequest.getProcessInstanceId(),
+                    taskRequest.getTaskInstanceId());
+            LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath());
             try (
                     LogWatch watcher = 
ProcessUtils.getPodLogWatcher(taskRequest.getK8sTaskExecutionContext(),
                             taskRequest.getTaskAppId(), containerName)) {
-                
LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath());
                 String line;
                 try (BufferedReader reader = new BufferedReader(new 
InputStreamReader(watcher.getOutput()))) {
                     while ((line = reader.readLine()) != null) {

Reply via email to