This is an automated email from the ASF dual-hosted git repository.

zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 04800a48d1 [Feature-13763][K8S Task] collect real-time log (#14379)
04800a48d1 is described below

commit 04800a48d19700809ce9b97d2efaf1be34d4e474
Author: Aaron Wang <[email protected]>
AuthorDate: Thu Jul 20 18:38:25 2023 +0800

    [Feature-13763][K8S Task] collect real-time log (#14379)
    
    * [Improvement-13763][K8S Task] collect real-time log
    
    * fix codesmell
    
    * get pod watcher until pod is ready
    
    * fix codesmell
    
    * specify container name & loop waiting pod creation
    
    * sleep when pod is not initialized
    
    ---------
    
    Co-authored-by: Jay Chung <[email protected]>
---
 .../plugin/task/api/AbstractCommandExecutor.java   |   4 +-
 .../task/api/am/KubernetesApplicationManager.java  |  57 +++++++---
 .../am/KubernetesApplicationManagerContext.java    |   7 +-
 .../plugin/task/api/k8s/AbstractK8sTask.java       |   2 +-
 .../task/api/k8s/AbstractK8sTaskExecutor.java      |  12 ---
 .../plugin/task/api/k8s/impl/K8sTaskExecutor.java  | 116 ++++++++++++++-------
 .../plugin/task/api/utils/ProcessUtils.java        |  22 ++--
 .../plugin/task/api/k8s/K8sTaskExecutorTest.java   |   9 +-
 8 files changed, 149 insertions(+), 80 deletions(-)

diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
index 4c8edc6dac..2501e1e824 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
@@ -201,7 +201,7 @@ public abstract class AbstractCommandExecutor {
                 // Wait the task log process finished.
                 taskOutputFuture.get();
             } catch (ExecutionException e) {
-                logger.info("Handle task log error", e);
+                logger.error("Handle task log error", e);
             }
         }
 
@@ -272,7 +272,7 @@ public abstract class AbstractCommandExecutor {
             ThreadUtils.sleep(SLEEP_TIME_MILLIS * 5L);
             try (
                     LogWatch watcher = 
ProcessUtils.getPodLogWatcher(taskRequest.getK8sTaskExecutionContext(),
-                            taskRequest.getTaskAppId())) {
+                            taskRequest.getTaskAppId(), "")) {
                 if (watcher == null) {
                     throw new RuntimeException("The driver pod does not 
exist.");
                 } else {
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java
index 92aa6cb447..ac1ce69f76 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java
@@ -17,9 +17,11 @@
 
 package org.apache.dolphinscheduler.plugin.task.api.am;
 
+import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.SLEEP_TIME_MILLIS;
 import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.UNIQUE_LABEL_NAME;
 
 import org.apache.dolphinscheduler.common.enums.ResourceManagerType;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
@@ -54,6 +56,8 @@ public class KubernetesApplicationManager implements 
ApplicationManager {
     private static final String FAILED = "Failed";
     private static final String UNKNOWN = "Unknown";
 
+    private static final int MAX_RETRY_TIMES = 10;
+
     /**
      * cache k8s client for same task
      */
@@ -67,7 +71,7 @@ public class KubernetesApplicationManager implements 
ApplicationManager {
         boolean isKill;
         String labelValue = 
kubernetesApplicationManagerContext.getLabelValue();
         FilterWatchListDeletable<Pod, PodList, PodResource> watchList =
-                getDriverPod(kubernetesApplicationManagerContext);
+                getListenPod(kubernetesApplicationManagerContext);
         try {
             if (getApplicationStatus(kubernetesApplicationManagerContext, 
watchList).isFailure()) {
                 log.error("Driver pod is in FAILED or UNKNOWN status.");
@@ -97,16 +101,24 @@ public class KubernetesApplicationManager implements 
ApplicationManager {
      * @param kubernetesApplicationManagerContext
      * @return
      */
-    private FilterWatchListDeletable<Pod, PodList, PodResource> 
getDriverPod(KubernetesApplicationManagerContext 
kubernetesApplicationManagerContext) {
+    private FilterWatchListDeletable<Pod, PodList, PodResource> 
getListenPod(KubernetesApplicationManagerContext 
kubernetesApplicationManagerContext) {
         KubernetesClient client = 
getClient(kubernetesApplicationManagerContext);
         String labelValue = 
kubernetesApplicationManagerContext.getLabelValue();
-        FilterWatchListDeletable<Pod, PodList, PodResource> watchList = 
client.pods()
-                
.inNamespace(kubernetesApplicationManagerContext.getK8sTaskExecutionContext().getNamespace())
-                .withLabel(UNIQUE_LABEL_NAME, labelValue);
-        List<Pod> podList = watchList.list().getItems();
-        if (podList.size() != 1) {
-            log.warn("Expected driver pod 1, but get {}.", podList.size());
+        List<Pod> podList = null;
+        FilterWatchListDeletable<Pod, PodList, PodResource> watchList = null;
+        int retryTimes = 0;
+        while (CollectionUtils.isEmpty(podList) && retryTimes < 
MAX_RETRY_TIMES) {
+            watchList = client.pods()
+                    
.inNamespace(kubernetesApplicationManagerContext.getK8sTaskExecutionContext().getNamespace())
+                    .withLabel(UNIQUE_LABEL_NAME, labelValue);
+            podList = watchList.list().getItems();
+            if (!CollectionUtils.isEmpty(podList)) {
+                break;
+            }
+            ThreadUtils.sleep(SLEEP_TIME_MILLIS);
+            retryTimes += 1;
         }
+
         return watchList;
     }
 
@@ -153,7 +165,7 @@ public class KubernetesApplicationManager implements 
ApplicationManager {
         String phase;
         try {
             if (Objects.isNull(watchList)) {
-                watchList = getDriverPod(kubernetesApplicationManagerContext);
+                watchList = getListenPod(kubernetesApplicationManagerContext);
             }
             List<Pod> driverPod = watchList.list().getItems();
             if (!driverPod.isEmpty()) {
@@ -180,16 +192,27 @@ public class KubernetesApplicationManager implements 
ApplicationManager {
      */
     public LogWatch getPodLogWatcher(KubernetesApplicationManagerContext 
kubernetesApplicationManagerContext) {
         KubernetesClient client = 
getClient(kubernetesApplicationManagerContext);
-        FilterWatchListDeletable<Pod, PodList, PodResource> watchList =
-                getDriverPod(kubernetesApplicationManagerContext);
-        List<Pod> driverPod = watchList.list().getItems();
-        if (CollectionUtils.isEmpty(driverPod)) {
-            return null;
+        boolean podIsReady = false;
+        Pod pod = null;
+        while (!podIsReady) {
+            FilterWatchListDeletable<Pod, PodList, PodResource> watchList =
+                    getListenPod(kubernetesApplicationManagerContext);
+            List<Pod> podList = watchList == null ? null : 
watchList.list().getItems();
+            if (CollectionUtils.isEmpty(podList)) {
+                return null;
+            }
+            pod = podList.get(0);
+            String phase = pod.getStatus().getPhase();
+            if (phase.equals(PENDING) || phase.equals(UNKNOWN)) {
+                ThreadUtils.sleep(SLEEP_TIME_MILLIS);
+            } else {
+                podIsReady = true;
+            }
         }
-        Pod driver = driverPod.get(0);
 
-        return client.pods().inNamespace(driver.getMetadata().getNamespace())
-                .withName(driver.getMetadata().getName())
+        return client.pods().inNamespace(pod.getMetadata().getNamespace())
+                .withName(pod.getMetadata().getName())
+                
.inContainer(kubernetesApplicationManagerContext.getContainerName())
                 .watchLog();
     }
 
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManagerContext.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManagerContext.java
index 19dd223477..2a37d16d35 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManagerContext.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManagerContext.java
@@ -32,8 +32,13 @@ public class KubernetesApplicationManagerContext implements 
ApplicationManagerCo
     private final K8sTaskExecutionContext k8sTaskExecutionContext;
 
     /**
-     * driver pod label value
+     * pod label value
      */
     private final String labelValue;
 
+    /**
+     * container name (optional)
+     */
+    private final String containerName;
+
 }
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java
index 6f08dde14d..0dbff0ba33 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java
@@ -48,7 +48,7 @@ public abstract class AbstractK8sTask extends 
AbstractRemoteTask {
             setExitStatusCode(response.getExitStatusCode());
             setAppIds(response.getAppIds());
         } catch (Exception e) {
-            log.error("k8s task submit failed with error", e);
+            log.error("k8s task submit failed with error");
             exitStatusCode = -1;
             throw new TaskException("Execute k8s task error", e);
         }
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTaskExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTaskExecutor.java
index 6bdf8d39ed..045af3a484 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTaskExecutor.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTaskExecutor.java
@@ -17,8 +17,6 @@
 
 package org.apache.dolphinscheduler.plugin.task.api.k8s;
 
-import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
-
 import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
@@ -32,14 +30,12 @@ public abstract class AbstractK8sTaskExecutor {
     protected Logger log;
     protected TaskExecutionContext taskRequest;
     protected K8sUtils k8sUtils;
-    protected StringBuilder logStringBuffer;
     protected Yaml yaml;
 
     protected AbstractK8sTaskExecutor(Logger log, TaskExecutionContext 
taskRequest) {
         this.log = log;
         this.taskRequest = taskRequest;
         this.k8sUtils = new K8sUtils();
-        this.logStringBuffer = new StringBuilder();
         this.yaml = new Yaml();
     }
 
@@ -53,14 +49,6 @@ public abstract class AbstractK8sTaskExecutor {
         }
     }
 
-    public void flushLog(TaskResponse taskResponse) {
-        if (logStringBuffer.length() != 0 && taskResponse.getExitStatusCode() 
== EXIT_CODE_FAILURE) {
-            log.error(logStringBuffer.toString());
-        } else if (logStringBuffer.length() != 0) {
-            log.info(logStringBuffer.toString());
-        }
-    }
-
     public abstract void submitJob2k8s(String k8sParameterStr);
 
     public abstract void stopJobOnK8s(String k8sParameterStr);
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
index f357713d39..aecb8b1aac 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
@@ -30,7 +30,9 @@ import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.MI;
 import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.NAME_LABEL;
 import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RESTART_POLICY;
 import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_INSTANCE_ID;
+import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.UNIQUE_LABEL_NAME;
 
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
@@ -41,16 +43,23 @@ import 
org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
 import org.apache.dolphinscheduler.plugin.task.api.k8s.AbstractK8sTaskExecutor;
 import org.apache.dolphinscheduler.plugin.task.api.k8s.K8sTaskMainParameters;
 import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
+import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
 import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
+import org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils;
 
 import org.apache.commons.lang3.StringUtils;
 
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
@@ -67,6 +76,7 @@ import io.fabric8.kubernetes.api.model.batch.v1.JobStatus;
 import io.fabric8.kubernetes.client.Watch;
 import io.fabric8.kubernetes.client.Watcher;
 import io.fabric8.kubernetes.client.WatcherException;
+import io.fabric8.kubernetes.client.dsl.LogWatch;
 
 /**
  * K8sTaskExecutor used to submit k8s task to K8S
@@ -74,6 +84,9 @@ import io.fabric8.kubernetes.client.WatcherException;
 public class K8sTaskExecutor extends AbstractK8sTaskExecutor {
 
     private Job job;
+    protected boolean podLogOutputIsFinished = false;
+    protected Future<?> podLogOutputFuture;
+
     public K8sTaskExecutor(Logger logger, TaskExecutionContext taskRequest) {
         super(logger, taskRequest);
     }
@@ -100,6 +113,8 @@ public class K8sTaskExecutor extends 
AbstractK8sTaskExecutor {
         Map<String, String> labelMap = k8STaskMainParameters.getLabelMap();
         labelMap.put(LAYER_LABEL, LAYER_LABEL_VALUE);
         labelMap.put(NAME_LABEL, k8sJobName);
+        Map<String, String> podLabelMap = new HashMap<>();
+        podLabelMap.put(UNIQUE_LABEL_NAME, taskRequest.getTaskAppId());
         EnvVar taskInstanceIdVar = new EnvVar(TASK_INSTANCE_ID, 
taskInstanceId, null);
         List<EnvVar> envVars = new ArrayList<>();
         envVars.add(taskInstanceIdVar);
@@ -150,6 +165,9 @@ public class K8sTaskExecutor extends 
AbstractK8sTaskExecutor {
                 .withNewSpec()
                 .withTtlSecondsAfterFinished(JOB_TTL_SECONDS)
                 .withNewTemplate()
+                .withNewMetadata()
+                .withLabels(podLabelMap)
+                .endMetadata()
                 .withNewSpec()
                 .addNewContainer()
                 .withName(k8sJobName)
@@ -170,36 +188,36 @@ public class K8sTaskExecutor extends 
AbstractK8sTaskExecutor {
         return jobBuilder.build();
     }
 
-    public void registerBatchJobWatcher(Job job, String taskInstanceId, 
TaskResponse taskResponse,
-                                        K8sTaskMainParameters 
k8STaskMainParameters) {
+    public void registerBatchJobWatcher(Job job, String taskInstanceId, 
TaskResponse taskResponse) {
         CountDownLatch countDownLatch = new CountDownLatch(1);
         Watcher<Job> watcher = new Watcher<Job>() {
 
             @Override
             public void eventReceived(Action action, Job job) {
-                log.info("event received : job:{} action:{}", 
job.getMetadata().getName(), action);
-                if (action != Action.ADDED) {
-                    int jobStatus = getK8sJobStatus(job);
-                    log.info("job {} status {}", job.getMetadata().getName(), 
jobStatus);
-                    if (jobStatus == TaskConstants.RUNNING_CODE) {
-                        return;
+                try (
+                        final LogUtils.MDCAutoClosableContext 
mdcAutoClosableContext =
+                                
LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath())) {
+                    log.info("event received : job:{} action:{}", 
job.getMetadata().getName(), action);
+                    if (action != Action.ADDED) {
+                        int jobStatus = getK8sJobStatus(job);
+                        log.info("job {} status {}", 
job.getMetadata().getName(), jobStatus);
+                        if (jobStatus == TaskConstants.RUNNING_CODE) {
+                            return;
+                        }
+                        setTaskStatus(jobStatus, taskInstanceId, taskResponse);
+                        countDownLatch.countDown();
                     }
-                    setTaskStatus(jobStatus, taskInstanceId, taskResponse, 
k8STaskMainParameters);
-                    countDownLatch.countDown();
                 }
             }
 
             @Override
             public void onClose(WatcherException e) {
-                logStringBuffer.append(String.format("[K8sJobExecutor-%s] fail 
in k8s: %s", job.getMetadata().getName(),
-                        e.getMessage()));
+                log.error("[K8sJobExecutor-{}] fail in k8s: {}", 
job.getMetadata().getName(), e.getMessage());
                 taskResponse.setExitStatusCode(EXIT_CODE_FAILURE);
                 countDownLatch.countDown();
             }
         };
-        Watch watch = null;
-        try {
-            watch = 
k8sUtils.createBatchJobWatcher(job.getMetadata().getName(), watcher);
+        try (Watch watch = 
k8sUtils.createBatchJobWatcher(job.getMetadata().getName(), watcher)) {
             boolean timeoutFlag = taskRequest.getTaskTimeoutStrategy() == 
TaskTimeoutStrategy.FAILED
                     || taskRequest.getTaskTimeoutStrategy() == 
TaskTimeoutStrategy.WARNFAILED;
             if (timeoutFlag) {
@@ -208,7 +226,6 @@ public class K8sTaskExecutor extends 
AbstractK8sTaskExecutor {
             } else {
                 countDownLatch.await();
             }
-            flushLog(taskResponse);
         } catch (InterruptedException e) {
             log.error("job failed in k8s: {}", e.getMessage(), e);
             Thread.currentThread().interrupt();
@@ -216,19 +233,42 @@ public class K8sTaskExecutor extends 
AbstractK8sTaskExecutor {
         } catch (Exception e) {
             log.error("job failed in k8s: {}", e.getMessage(), e);
             taskResponse.setExitStatusCode(EXIT_CODE_FAILURE);
-        } finally {
-            if (watch != null) {
-                watch.close();
-            }
         }
     }
 
+    private void parsePodLogOutput() {
+        ExecutorService collectPodLogExecutorService = ThreadUtils
+                
.newSingleDaemonScheduledExecutorService("CollectPodLogOutput-thread-" + 
taskRequest.getTaskName());
+
+        String taskInstanceId = 
String.valueOf(taskRequest.getTaskInstanceId());
+        String taskName = taskRequest.getTaskName().toLowerCase(Locale.ROOT);
+        String containerName = String.format("%s-%s", taskName, 
taskInstanceId);
+        podLogOutputFuture = collectPodLogExecutorService.submit(() -> {
+            try (
+                    final LogUtils.MDCAutoClosableContext 
mdcAutoClosableContext =
+                            
LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath());
+                    LogWatch watcher = 
ProcessUtils.getPodLogWatcher(taskRequest.getK8sTaskExecutionContext(),
+                            taskRequest.getTaskAppId(), containerName)) {
+                String line;
+                try (BufferedReader reader = new BufferedReader(new 
InputStreamReader(watcher.getOutput()))) {
+                    while ((line = reader.readLine()) != null) {
+                        log.info("[K8S-pod-log] {}", line);
+                    }
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            } finally {
+                podLogOutputIsFinished = true;
+            }
+        });
+
+        collectPodLogExecutorService.shutdown();
+    }
+
     @Override
     public TaskResponse run(String k8sParameterStr) throws Exception {
         TaskResponse result = new TaskResponse();
         int taskInstanceId = taskRequest.getTaskInstanceId();
-        K8sTaskMainParameters k8STaskMainParameters =
-                JSONUtils.parseObject(k8sParameterStr, 
K8sTaskMainParameters.class);
         try {
             if (null == 
TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) {
                 result.setExitStatusCode(EXIT_CODE_KILL);
@@ -242,9 +282,20 @@ public class K8sTaskExecutor extends 
AbstractK8sTaskExecutor {
             String configYaml = k8sTaskExecutionContext.getConfigYaml();
             k8sUtils.buildClient(configYaml);
             submitJob2k8s(k8sParameterStr);
-            registerBatchJobWatcher(job, Integer.toString(taskInstanceId), 
result, k8STaskMainParameters);
+            parsePodLogOutput();
+            registerBatchJobWatcher(job, Integer.toString(taskInstanceId), 
result);
+
+            if (podLogOutputFuture != null) {
+                try {
+                    // Wait kubernetes pod log collection finished
+                    podLogOutputFuture.get();
+                } catch (ExecutionException e) {
+                    log.error("Handle pod log error", e);
+                }
+            }
         } catch (Exception e) {
             cancelApplication(k8sParameterStr);
+            Thread.currentThread().interrupt();
             result.setExitStatusCode(EXIT_CODE_FAILURE);
             throw e;
         }
@@ -270,9 +321,9 @@ public class K8sTaskExecutor extends 
AbstractK8sTaskExecutor {
             stopJobOnK8s(k8sParameterStr);
             String namespaceName = k8STaskMainParameters.getNamespaceName();
             k8sUtils.createJob(namespaceName, job);
-            log.info("[K8sJobExecutor-{}-{}]  submitted job successfully", 
taskName, taskInstanceId);
+            log.info("[K8sJobExecutor-{}-{}] submitted job successfully", 
taskName, taskInstanceId);
         } catch (Exception e) {
-            log.error("[K8sJobExecutor-{}-{}]  fail to submit job", taskName, 
taskInstanceId);
+            log.error("[K8sJobExecutor-{}-{}] fail to submit job", taskName, 
taskInstanceId);
             throw new TaskException("K8sJobExecutor fail to submit job", e);
         }
     }
@@ -288,7 +339,7 @@ public class K8sTaskExecutor extends 
AbstractK8sTaskExecutor {
                 k8sUtils.deleteJob(jobName, namespaceName);
             }
         } catch (Exception e) {
-            log.error("[K8sJobExecutor-{}]  fail to stop job", jobName);
+            log.error("[K8sJobExecutor-{}] fail to stop job", jobName);
             throw new TaskException("K8sJobExecutor fail to stop job", e);
         }
     }
@@ -304,21 +355,16 @@ public class K8sTaskExecutor extends 
AbstractK8sTaskExecutor {
         }
     }
 
-    public void setTaskStatus(int jobStatus, String taskInstanceId, 
TaskResponse taskResponse,
-                              K8sTaskMainParameters k8STaskMainParameters) {
+    public void setTaskStatus(int jobStatus, String taskInstanceId, 
TaskResponse taskResponse) {
         if (jobStatus == EXIT_CODE_SUCCESS || jobStatus == EXIT_CODE_FAILURE) {
             if (null == 
TaskExecutionContextCacheManager.getByTaskInstanceId(Integer.valueOf(taskInstanceId)))
 {
-                logStringBuffer.append(String.format("[K8sJobExecutor-%s] 
killed", job.getMetadata().getName()));
+                log.info("[K8sJobExecutor-{}] killed", 
job.getMetadata().getName());
                 taskResponse.setExitStatusCode(EXIT_CODE_KILL);
             } else if (jobStatus == EXIT_CODE_SUCCESS) {
-                logStringBuffer
-                        .append(String.format("[K8sJobExecutor-%s] succeed in 
k8s", job.getMetadata().getName()));
+                log.info("[K8sJobExecutor-{}] succeed in k8s", 
job.getMetadata().getName());
                 taskResponse.setExitStatusCode(EXIT_CODE_SUCCESS);
             } else {
-                String errorMessage =
-                        k8sUtils.getPodLog(job.getMetadata().getName(), 
k8STaskMainParameters.getNamespaceName());
-                logStringBuffer.append(String.format("[K8sJobExecutor-%s] fail 
in k8s: %s", job.getMetadata().getName(),
-                        errorMessage));
+                log.error("[K8sJobExecutor-{}] fail in k8s", 
job.getMetadata().getName());
                 taskResponse.setExitStatusCode(EXIT_CODE_FAILURE);
             }
         }
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java
index 130159137c..f0c3aa0f9a 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java
@@ -142,12 +142,14 @@ public final class ProcessUtils {
      */
     public static void cancelApplication(TaskExecutionContext 
taskExecutionContext) {
         try {
-            if 
(Objects.nonNull(taskExecutionContext.getK8sTaskExecutionContext()) &&
-                    
!TASK_TYPE_SET_K8S.contains(taskExecutionContext.getTaskType())) {
-                applicationManagerMap.get(ResourceManagerType.KUBERNETES)
-                        .killApplication(new 
KubernetesApplicationManagerContext(
-                                
taskExecutionContext.getK8sTaskExecutionContext(),
-                                taskExecutionContext.getTaskAppId()));
+            if 
(Objects.nonNull(taskExecutionContext.getK8sTaskExecutionContext())) {
+                if 
(!TASK_TYPE_SET_K8S.contains(taskExecutionContext.getTaskType())) {
+                    // Set empty container name for Spark on K8S task
+                    applicationManagerMap.get(ResourceManagerType.KUBERNETES)
+                            .killApplication(new 
KubernetesApplicationManagerContext(
+                                    
taskExecutionContext.getK8sTaskExecutionContext(),
+                                    taskExecutionContext.getTaskAppId(), ""));
+                }
             } else {
                 String host = taskExecutionContext.getHost();
                 String executePath = taskExecutionContext.getExecutePath();
@@ -197,7 +199,7 @@ public final class ProcessUtils {
         KubernetesApplicationManager applicationManager =
                 (KubernetesApplicationManager) 
applicationManagerMap.get(ResourceManagerType.KUBERNETES);
         return applicationManager
-                .getApplicationStatus(new 
KubernetesApplicationManagerContext(k8sTaskExecutionContext, taskAppId));
+                .getApplicationStatus(new 
KubernetesApplicationManagerContext(k8sTaskExecutionContext, taskAppId, ""));
     }
 
     /**
@@ -207,12 +209,14 @@ public final class ProcessUtils {
      * @param taskAppId
      * @return
      */
-    public static LogWatch getPodLogWatcher(K8sTaskExecutionContext 
k8sTaskExecutionContext, String taskAppId) {
+    public static LogWatch getPodLogWatcher(K8sTaskExecutionContext 
k8sTaskExecutionContext, String taskAppId,
+                                            String containerName) {
         KubernetesApplicationManager applicationManager =
                 (KubernetesApplicationManager) 
applicationManagerMap.get(ResourceManagerType.KUBERNETES);
 
         return applicationManager
-                .getPodLogWatcher(new 
KubernetesApplicationManagerContext(k8sTaskExecutionContext, taskAppId));
+                .getPodLogWatcher(
+                        new 
KubernetesApplicationManagerContext(k8sTaskExecutionContext, taskAppId, 
containerName));
     }
 
 }
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java
index 9931a33ae5..46b226268c 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java
@@ -34,6 +34,8 @@ import java.util.Map;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import io.fabric8.kubernetes.api.model.NodeSelectorRequirement;
 import io.fabric8.kubernetes.api.model.batch.v1.Job;
@@ -41,6 +43,8 @@ import io.fabric8.kubernetes.api.model.batch.v1.JobStatus;
 
 public class K8sTaskExecutorTest {
 
+    private static final Logger logger = 
LoggerFactory.getLogger(K8sTaskExecutorTest.class);
+
     private K8sTaskExecutor k8sTaskExecutor = null;
     private K8sTaskMainParameters k8sTaskMainParameters = null;
     private final String image = "ds-dev";
@@ -66,7 +70,7 @@ public class K8sTaskExecutorTest {
         requirement.setKey("node-label");
         requirement.setOperator("In");
         requirement.setValues(Arrays.asList("1234", "123456"));
-        k8sTaskExecutor = new K8sTaskExecutor(null, taskRequest);
+        k8sTaskExecutor = new K8sTaskExecutor(logger, taskRequest);
         k8sTaskMainParameters = new K8sTaskMainParameters();
         k8sTaskMainParameters.setImage(image);
         k8sTaskMainParameters.setImagePullPolicy(imagePullPolicy);
@@ -90,9 +94,8 @@ public class K8sTaskExecutorTest {
     public void testSetTaskStatusNormal() {
         int jobStatus = 0;
         TaskResponse taskResponse = new TaskResponse();
-        K8sTaskMainParameters k8STaskMainParameters = new 
K8sTaskMainParameters();
         k8sTaskExecutor.setJob(job);
-        k8sTaskExecutor.setTaskStatus(jobStatus, 
String.valueOf(taskInstanceId), taskResponse, k8STaskMainParameters);
+        k8sTaskExecutor.setTaskStatus(jobStatus, 
String.valueOf(taskInstanceId), taskResponse);
         Assertions.assertEquals(0, Integer.compare(EXIT_CODE_KILL, 
taskResponse.getExitStatusCode()));
     }
     @Test

Reply via email to