This is an automated email from the ASF dual-hosted git repository.
zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 04800a48d1 [Feature-13763][K8S Task] collect real-time log (#14379)
04800a48d1 is described below
commit 04800a48d19700809ce9b97d2efaf1be34d4e474
Author: Aaron Wang <[email protected]>
AuthorDate: Thu Jul 20 18:38:25 2023 +0800
[Feature-13763][K8S Task] collect real-time log (#14379)
* [Improvement-13763][K8S Task] collect real-time log
* fix codesmell
* get pod watcher until pod is ready
* fix codesmell
* specify container name & loop waiting pod creation
* sleep when pod is not initialized
---------
Co-authored-by: Jay Chung <[email protected]>
---
.../plugin/task/api/AbstractCommandExecutor.java | 4 +-
.../task/api/am/KubernetesApplicationManager.java | 57 +++++++---
.../am/KubernetesApplicationManagerContext.java | 7 +-
.../plugin/task/api/k8s/AbstractK8sTask.java | 2 +-
.../task/api/k8s/AbstractK8sTaskExecutor.java | 12 ---
.../plugin/task/api/k8s/impl/K8sTaskExecutor.java | 116 ++++++++++++++-------
.../plugin/task/api/utils/ProcessUtils.java | 22 ++--
.../plugin/task/api/k8s/K8sTaskExecutorTest.java | 9 +-
8 files changed, 149 insertions(+), 80 deletions(-)
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
index 4c8edc6dac..2501e1e824 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
@@ -201,7 +201,7 @@ public abstract class AbstractCommandExecutor {
// Wait the task log process finished.
taskOutputFuture.get();
} catch (ExecutionException e) {
- logger.info("Handle task log error", e);
+ logger.error("Handle task log error", e);
}
}
@@ -272,7 +272,7 @@ public abstract class AbstractCommandExecutor {
ThreadUtils.sleep(SLEEP_TIME_MILLIS * 5L);
try (
LogWatch watcher =
ProcessUtils.getPodLogWatcher(taskRequest.getK8sTaskExecutionContext(),
- taskRequest.getTaskAppId())) {
+ taskRequest.getTaskAppId(), "")) {
if (watcher == null) {
throw new RuntimeException("The driver pod does not
exist.");
} else {
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java
index 92aa6cb447..ac1ce69f76 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java
@@ -17,9 +17,11 @@
package org.apache.dolphinscheduler.plugin.task.api.am;
+import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.SLEEP_TIME_MILLIS;
import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.UNIQUE_LABEL_NAME;
import org.apache.dolphinscheduler.common.enums.ResourceManagerType;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
@@ -54,6 +56,8 @@ public class KubernetesApplicationManager implements
ApplicationManager {
private static final String FAILED = "Failed";
private static final String UNKNOWN = "Unknown";
+ private static final int MAX_RETRY_TIMES = 10;
+
/**
* cache k8s client for same task
*/
@@ -67,7 +71,7 @@ public class KubernetesApplicationManager implements
ApplicationManager {
boolean isKill;
String labelValue =
kubernetesApplicationManagerContext.getLabelValue();
FilterWatchListDeletable<Pod, PodList, PodResource> watchList =
- getDriverPod(kubernetesApplicationManagerContext);
+ getListenPod(kubernetesApplicationManagerContext);
try {
if (getApplicationStatus(kubernetesApplicationManagerContext,
watchList).isFailure()) {
log.error("Driver pod is in FAILED or UNKNOWN status.");
@@ -97,16 +101,24 @@ public class KubernetesApplicationManager implements
ApplicationManager {
* @param kubernetesApplicationManagerContext
* @return
*/
- private FilterWatchListDeletable<Pod, PodList, PodResource>
getDriverPod(KubernetesApplicationManagerContext
kubernetesApplicationManagerContext) {
+ private FilterWatchListDeletable<Pod, PodList, PodResource>
getListenPod(KubernetesApplicationManagerContext
kubernetesApplicationManagerContext) {
KubernetesClient client =
getClient(kubernetesApplicationManagerContext);
String labelValue =
kubernetesApplicationManagerContext.getLabelValue();
- FilterWatchListDeletable<Pod, PodList, PodResource> watchList =
client.pods()
-
.inNamespace(kubernetesApplicationManagerContext.getK8sTaskExecutionContext().getNamespace())
- .withLabel(UNIQUE_LABEL_NAME, labelValue);
- List<Pod> podList = watchList.list().getItems();
- if (podList.size() != 1) {
- log.warn("Expected driver pod 1, but get {}.", podList.size());
+ List<Pod> podList = null;
+ FilterWatchListDeletable<Pod, PodList, PodResource> watchList = null;
+ int retryTimes = 0;
+ while (CollectionUtils.isEmpty(podList) && retryTimes <
MAX_RETRY_TIMES) {
+ watchList = client.pods()
+
.inNamespace(kubernetesApplicationManagerContext.getK8sTaskExecutionContext().getNamespace())
+ .withLabel(UNIQUE_LABEL_NAME, labelValue);
+ podList = watchList.list().getItems();
+ if (!CollectionUtils.isEmpty(podList)) {
+ break;
+ }
+ ThreadUtils.sleep(SLEEP_TIME_MILLIS);
+ retryTimes += 1;
}
+
return watchList;
}
@@ -153,7 +165,7 @@ public class KubernetesApplicationManager implements
ApplicationManager {
String phase;
try {
if (Objects.isNull(watchList)) {
- watchList = getDriverPod(kubernetesApplicationManagerContext);
+ watchList = getListenPod(kubernetesApplicationManagerContext);
}
List<Pod> driverPod = watchList.list().getItems();
if (!driverPod.isEmpty()) {
@@ -180,16 +192,27 @@ public class KubernetesApplicationManager implements
ApplicationManager {
*/
public LogWatch getPodLogWatcher(KubernetesApplicationManagerContext
kubernetesApplicationManagerContext) {
KubernetesClient client =
getClient(kubernetesApplicationManagerContext);
- FilterWatchListDeletable<Pod, PodList, PodResource> watchList =
- getDriverPod(kubernetesApplicationManagerContext);
- List<Pod> driverPod = watchList.list().getItems();
- if (CollectionUtils.isEmpty(driverPod)) {
- return null;
+ boolean podIsReady = false;
+ Pod pod = null;
+ while (!podIsReady) {
+ FilterWatchListDeletable<Pod, PodList, PodResource> watchList =
+ getListenPod(kubernetesApplicationManagerContext);
+ List<Pod> podList = watchList == null ? null :
watchList.list().getItems();
+ if (CollectionUtils.isEmpty(podList)) {
+ return null;
+ }
+ pod = podList.get(0);
+ String phase = pod.getStatus().getPhase();
+ if (phase.equals(PENDING) || phase.equals(UNKNOWN)) {
+ ThreadUtils.sleep(SLEEP_TIME_MILLIS);
+ } else {
+ podIsReady = true;
+ }
}
- Pod driver = driverPod.get(0);
- return client.pods().inNamespace(driver.getMetadata().getNamespace())
- .withName(driver.getMetadata().getName())
+ return client.pods().inNamespace(pod.getMetadata().getNamespace())
+ .withName(pod.getMetadata().getName())
+
.inContainer(kubernetesApplicationManagerContext.getContainerName())
.watchLog();
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManagerContext.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManagerContext.java
index 19dd223477..2a37d16d35 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManagerContext.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManagerContext.java
@@ -32,8 +32,13 @@ public class KubernetesApplicationManagerContext implements
ApplicationManagerCo
private final K8sTaskExecutionContext k8sTaskExecutionContext;
/**
- * driver pod label value
+ * pod label value
*/
private final String labelValue;
+ /**
+ * container name (optional)
+ */
+ private final String containerName;
+
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java
index 6f08dde14d..0dbff0ba33 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java
@@ -48,7 +48,7 @@ public abstract class AbstractK8sTask extends
AbstractRemoteTask {
setExitStatusCode(response.getExitStatusCode());
setAppIds(response.getAppIds());
} catch (Exception e) {
- log.error("k8s task submit failed with error", e);
+ log.error("k8s task submit failed with error");
exitStatusCode = -1;
throw new TaskException("Execute k8s task error", e);
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTaskExecutor.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTaskExecutor.java
index 6bdf8d39ed..045af3a484 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTaskExecutor.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTaskExecutor.java
@@ -17,8 +17,6 @@
package org.apache.dolphinscheduler.plugin.task.api.k8s;
-import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
-
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
@@ -32,14 +30,12 @@ public abstract class AbstractK8sTaskExecutor {
protected Logger log;
protected TaskExecutionContext taskRequest;
protected K8sUtils k8sUtils;
- protected StringBuilder logStringBuffer;
protected Yaml yaml;
protected AbstractK8sTaskExecutor(Logger log, TaskExecutionContext
taskRequest) {
this.log = log;
this.taskRequest = taskRequest;
this.k8sUtils = new K8sUtils();
- this.logStringBuffer = new StringBuilder();
this.yaml = new Yaml();
}
@@ -53,14 +49,6 @@ public abstract class AbstractK8sTaskExecutor {
}
}
- public void flushLog(TaskResponse taskResponse) {
- if (logStringBuffer.length() != 0 && taskResponse.getExitStatusCode()
== EXIT_CODE_FAILURE) {
- log.error(logStringBuffer.toString());
- } else if (logStringBuffer.length() != 0) {
- log.info(logStringBuffer.toString());
- }
- }
-
public abstract void submitJob2k8s(String k8sParameterStr);
public abstract void stopJobOnK8s(String k8sParameterStr);
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
index f357713d39..aecb8b1aac 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
@@ -30,7 +30,9 @@ import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.MI;
import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.NAME_LABEL;
import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RESTART_POLICY;
import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_INSTANCE_ID;
+import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.UNIQUE_LABEL_NAME;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
@@ -41,16 +43,23 @@ import
org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.plugin.task.api.k8s.AbstractK8sTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.k8s.K8sTaskMainParameters;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
+import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
+import org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils;
import org.apache.commons.lang3.StringUtils;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
@@ -67,6 +76,7 @@ import io.fabric8.kubernetes.api.model.batch.v1.JobStatus;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
+import io.fabric8.kubernetes.client.dsl.LogWatch;
/**
* K8sTaskExecutor used to submit k8s task to K8S
@@ -74,6 +84,9 @@ import io.fabric8.kubernetes.client.WatcherException;
public class K8sTaskExecutor extends AbstractK8sTaskExecutor {
private Job job;
+ protected boolean podLogOutputIsFinished = false;
+ protected Future<?> podLogOutputFuture;
+
public K8sTaskExecutor(Logger logger, TaskExecutionContext taskRequest) {
super(logger, taskRequest);
}
@@ -100,6 +113,8 @@ public class K8sTaskExecutor extends
AbstractK8sTaskExecutor {
Map<String, String> labelMap = k8STaskMainParameters.getLabelMap();
labelMap.put(LAYER_LABEL, LAYER_LABEL_VALUE);
labelMap.put(NAME_LABEL, k8sJobName);
+ Map<String, String> podLabelMap = new HashMap<>();
+ podLabelMap.put(UNIQUE_LABEL_NAME, taskRequest.getTaskAppId());
EnvVar taskInstanceIdVar = new EnvVar(TASK_INSTANCE_ID,
taskInstanceId, null);
List<EnvVar> envVars = new ArrayList<>();
envVars.add(taskInstanceIdVar);
@@ -150,6 +165,9 @@ public class K8sTaskExecutor extends
AbstractK8sTaskExecutor {
.withNewSpec()
.withTtlSecondsAfterFinished(JOB_TTL_SECONDS)
.withNewTemplate()
+ .withNewMetadata()
+ .withLabels(podLabelMap)
+ .endMetadata()
.withNewSpec()
.addNewContainer()
.withName(k8sJobName)
@@ -170,36 +188,36 @@ public class K8sTaskExecutor extends
AbstractK8sTaskExecutor {
return jobBuilder.build();
}
- public void registerBatchJobWatcher(Job job, String taskInstanceId,
TaskResponse taskResponse,
- K8sTaskMainParameters
k8STaskMainParameters) {
+ public void registerBatchJobWatcher(Job job, String taskInstanceId,
TaskResponse taskResponse) {
CountDownLatch countDownLatch = new CountDownLatch(1);
Watcher<Job> watcher = new Watcher<Job>() {
@Override
public void eventReceived(Action action, Job job) {
- log.info("event received : job:{} action:{}",
job.getMetadata().getName(), action);
- if (action != Action.ADDED) {
- int jobStatus = getK8sJobStatus(job);
- log.info("job {} status {}", job.getMetadata().getName(),
jobStatus);
- if (jobStatus == TaskConstants.RUNNING_CODE) {
- return;
+ try (
+ final LogUtils.MDCAutoClosableContext
mdcAutoClosableContext =
+
LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath())) {
+ log.info("event received : job:{} action:{}",
job.getMetadata().getName(), action);
+ if (action != Action.ADDED) {
+ int jobStatus = getK8sJobStatus(job);
+ log.info("job {} status {}",
job.getMetadata().getName(), jobStatus);
+ if (jobStatus == TaskConstants.RUNNING_CODE) {
+ return;
+ }
+ setTaskStatus(jobStatus, taskInstanceId, taskResponse);
+ countDownLatch.countDown();
}
- setTaskStatus(jobStatus, taskInstanceId, taskResponse,
k8STaskMainParameters);
- countDownLatch.countDown();
}
}
@Override
public void onClose(WatcherException e) {
- logStringBuffer.append(String.format("[K8sJobExecutor-%s] fail
in k8s: %s", job.getMetadata().getName(),
- e.getMessage()));
+ log.error("[K8sJobExecutor-{}] fail in k8s: {}",
job.getMetadata().getName(), e.getMessage());
taskResponse.setExitStatusCode(EXIT_CODE_FAILURE);
countDownLatch.countDown();
}
};
- Watch watch = null;
- try {
- watch =
k8sUtils.createBatchJobWatcher(job.getMetadata().getName(), watcher);
+ try (Watch watch =
k8sUtils.createBatchJobWatcher(job.getMetadata().getName(), watcher)) {
boolean timeoutFlag = taskRequest.getTaskTimeoutStrategy() ==
TaskTimeoutStrategy.FAILED
|| taskRequest.getTaskTimeoutStrategy() ==
TaskTimeoutStrategy.WARNFAILED;
if (timeoutFlag) {
@@ -208,7 +226,6 @@ public class K8sTaskExecutor extends
AbstractK8sTaskExecutor {
} else {
countDownLatch.await();
}
- flushLog(taskResponse);
} catch (InterruptedException e) {
log.error("job failed in k8s: {}", e.getMessage(), e);
Thread.currentThread().interrupt();
@@ -216,19 +233,42 @@ public class K8sTaskExecutor extends
AbstractK8sTaskExecutor {
} catch (Exception e) {
log.error("job failed in k8s: {}", e.getMessage(), e);
taskResponse.setExitStatusCode(EXIT_CODE_FAILURE);
- } finally {
- if (watch != null) {
- watch.close();
- }
}
}
+ private void parsePodLogOutput() {
+ ExecutorService collectPodLogExecutorService = ThreadUtils
+
.newSingleDaemonScheduledExecutorService("CollectPodLogOutput-thread-" +
taskRequest.getTaskName());
+
+ String taskInstanceId =
String.valueOf(taskRequest.getTaskInstanceId());
+ String taskName = taskRequest.getTaskName().toLowerCase(Locale.ROOT);
+ String containerName = String.format("%s-%s", taskName,
taskInstanceId);
+ podLogOutputFuture = collectPodLogExecutorService.submit(() -> {
+ try (
+ final LogUtils.MDCAutoClosableContext
mdcAutoClosableContext =
+
LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath());
+ LogWatch watcher =
ProcessUtils.getPodLogWatcher(taskRequest.getK8sTaskExecutionContext(),
+ taskRequest.getTaskAppId(), containerName)) {
+ String line;
+ try (BufferedReader reader = new BufferedReader(new
InputStreamReader(watcher.getOutput()))) {
+ while ((line = reader.readLine()) != null) {
+ log.info("[K8S-pod-log] {}", line);
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ podLogOutputIsFinished = true;
+ }
+ });
+
+ collectPodLogExecutorService.shutdown();
+ }
+
@Override
public TaskResponse run(String k8sParameterStr) throws Exception {
TaskResponse result = new TaskResponse();
int taskInstanceId = taskRequest.getTaskInstanceId();
- K8sTaskMainParameters k8STaskMainParameters =
- JSONUtils.parseObject(k8sParameterStr,
K8sTaskMainParameters.class);
try {
if (null ==
TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) {
result.setExitStatusCode(EXIT_CODE_KILL);
@@ -242,9 +282,20 @@ public class K8sTaskExecutor extends
AbstractK8sTaskExecutor {
String configYaml = k8sTaskExecutionContext.getConfigYaml();
k8sUtils.buildClient(configYaml);
submitJob2k8s(k8sParameterStr);
- registerBatchJobWatcher(job, Integer.toString(taskInstanceId),
result, k8STaskMainParameters);
+ parsePodLogOutput();
+ registerBatchJobWatcher(job, Integer.toString(taskInstanceId),
result);
+
+ if (podLogOutputFuture != null) {
+ try {
+ // Wait kubernetes pod log collection finished
+ podLogOutputFuture.get();
+ } catch (ExecutionException e) {
+ log.error("Handle pod log error", e);
+ }
+ }
} catch (Exception e) {
cancelApplication(k8sParameterStr);
+ Thread.currentThread().interrupt();
result.setExitStatusCode(EXIT_CODE_FAILURE);
throw e;
}
@@ -270,9 +321,9 @@ public class K8sTaskExecutor extends
AbstractK8sTaskExecutor {
stopJobOnK8s(k8sParameterStr);
String namespaceName = k8STaskMainParameters.getNamespaceName();
k8sUtils.createJob(namespaceName, job);
- log.info("[K8sJobExecutor-{}-{}] submitted job successfully",
taskName, taskInstanceId);
+ log.info("[K8sJobExecutor-{}-{}] submitted job successfully",
taskName, taskInstanceId);
} catch (Exception e) {
- log.error("[K8sJobExecutor-{}-{}] fail to submit job", taskName,
taskInstanceId);
+ log.error("[K8sJobExecutor-{}-{}] fail to submit job", taskName,
taskInstanceId);
throw new TaskException("K8sJobExecutor fail to submit job", e);
}
}
@@ -288,7 +339,7 @@ public class K8sTaskExecutor extends
AbstractK8sTaskExecutor {
k8sUtils.deleteJob(jobName, namespaceName);
}
} catch (Exception e) {
- log.error("[K8sJobExecutor-{}] fail to stop job", jobName);
+ log.error("[K8sJobExecutor-{}] fail to stop job", jobName);
throw new TaskException("K8sJobExecutor fail to stop job", e);
}
}
@@ -304,21 +355,16 @@ public class K8sTaskExecutor extends
AbstractK8sTaskExecutor {
}
}
- public void setTaskStatus(int jobStatus, String taskInstanceId,
TaskResponse taskResponse,
- K8sTaskMainParameters k8STaskMainParameters) {
+ public void setTaskStatus(int jobStatus, String taskInstanceId,
TaskResponse taskResponse) {
if (jobStatus == EXIT_CODE_SUCCESS || jobStatus == EXIT_CODE_FAILURE) {
if (null ==
TaskExecutionContextCacheManager.getByTaskInstanceId(Integer.valueOf(taskInstanceId)))
{
- logStringBuffer.append(String.format("[K8sJobExecutor-%s]
killed", job.getMetadata().getName()));
+ log.info("[K8sJobExecutor-{}] killed",
job.getMetadata().getName());
taskResponse.setExitStatusCode(EXIT_CODE_KILL);
} else if (jobStatus == EXIT_CODE_SUCCESS) {
- logStringBuffer
- .append(String.format("[K8sJobExecutor-%s] succeed in
k8s", job.getMetadata().getName()));
+ log.info("[K8sJobExecutor-{}] succeed in k8s",
job.getMetadata().getName());
taskResponse.setExitStatusCode(EXIT_CODE_SUCCESS);
} else {
- String errorMessage =
- k8sUtils.getPodLog(job.getMetadata().getName(),
k8STaskMainParameters.getNamespaceName());
- logStringBuffer.append(String.format("[K8sJobExecutor-%s] fail
in k8s: %s", job.getMetadata().getName(),
- errorMessage));
+ log.error("[K8sJobExecutor-{}] fail in k8s",
job.getMetadata().getName());
taskResponse.setExitStatusCode(EXIT_CODE_FAILURE);
}
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java
index 130159137c..f0c3aa0f9a 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java
@@ -142,12 +142,14 @@ public final class ProcessUtils {
*/
public static void cancelApplication(TaskExecutionContext
taskExecutionContext) {
try {
- if
(Objects.nonNull(taskExecutionContext.getK8sTaskExecutionContext()) &&
-
!TASK_TYPE_SET_K8S.contains(taskExecutionContext.getTaskType())) {
- applicationManagerMap.get(ResourceManagerType.KUBERNETES)
- .killApplication(new
KubernetesApplicationManagerContext(
-
taskExecutionContext.getK8sTaskExecutionContext(),
- taskExecutionContext.getTaskAppId()));
+ if
(Objects.nonNull(taskExecutionContext.getK8sTaskExecutionContext())) {
+ if
(!TASK_TYPE_SET_K8S.contains(taskExecutionContext.getTaskType())) {
+ // Set empty container name for Spark on K8S task
+ applicationManagerMap.get(ResourceManagerType.KUBERNETES)
+ .killApplication(new
KubernetesApplicationManagerContext(
+
taskExecutionContext.getK8sTaskExecutionContext(),
+ taskExecutionContext.getTaskAppId(), ""));
+ }
} else {
String host = taskExecutionContext.getHost();
String executePath = taskExecutionContext.getExecutePath();
@@ -197,7 +199,7 @@ public final class ProcessUtils {
KubernetesApplicationManager applicationManager =
(KubernetesApplicationManager)
applicationManagerMap.get(ResourceManagerType.KUBERNETES);
return applicationManager
- .getApplicationStatus(new
KubernetesApplicationManagerContext(k8sTaskExecutionContext, taskAppId));
+ .getApplicationStatus(new
KubernetesApplicationManagerContext(k8sTaskExecutionContext, taskAppId, ""));
}
/**
@@ -207,12 +209,14 @@ public final class ProcessUtils {
* @param taskAppId
* @return
*/
- public static LogWatch getPodLogWatcher(K8sTaskExecutionContext
k8sTaskExecutionContext, String taskAppId) {
+ public static LogWatch getPodLogWatcher(K8sTaskExecutionContext
k8sTaskExecutionContext, String taskAppId,
+ String containerName) {
KubernetesApplicationManager applicationManager =
(KubernetesApplicationManager)
applicationManagerMap.get(ResourceManagerType.KUBERNETES);
return applicationManager
- .getPodLogWatcher(new
KubernetesApplicationManagerContext(k8sTaskExecutionContext, taskAppId));
+ .getPodLogWatcher(
+ new
KubernetesApplicationManagerContext(k8sTaskExecutionContext, taskAppId,
containerName));
}
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java
index 9931a33ae5..46b226268c 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java
@@ -34,6 +34,8 @@ import java.util.Map;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import io.fabric8.kubernetes.api.model.NodeSelectorRequirement;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
@@ -41,6 +43,8 @@ import io.fabric8.kubernetes.api.model.batch.v1.JobStatus;
public class K8sTaskExecutorTest {
+ private static final Logger logger =
LoggerFactory.getLogger(K8sTaskExecutorTest.class);
+
private K8sTaskExecutor k8sTaskExecutor = null;
private K8sTaskMainParameters k8sTaskMainParameters = null;
private final String image = "ds-dev";
@@ -66,7 +70,7 @@ public class K8sTaskExecutorTest {
requirement.setKey("node-label");
requirement.setOperator("In");
requirement.setValues(Arrays.asList("1234", "123456"));
- k8sTaskExecutor = new K8sTaskExecutor(null, taskRequest);
+ k8sTaskExecutor = new K8sTaskExecutor(logger, taskRequest);
k8sTaskMainParameters = new K8sTaskMainParameters();
k8sTaskMainParameters.setImage(image);
k8sTaskMainParameters.setImagePullPolicy(imagePullPolicy);
@@ -90,9 +94,8 @@ public class K8sTaskExecutorTest {
public void testSetTaskStatusNormal() {
int jobStatus = 0;
TaskResponse taskResponse = new TaskResponse();
- K8sTaskMainParameters k8STaskMainParameters = new
K8sTaskMainParameters();
k8sTaskExecutor.setJob(job);
- k8sTaskExecutor.setTaskStatus(jobStatus,
String.valueOf(taskInstanceId), taskResponse, k8STaskMainParameters);
+ k8sTaskExecutor.setTaskStatus(jobStatus,
String.valueOf(taskInstanceId), taskResponse);
Assertions.assertEquals(0, Integer.compare(EXIT_CODE_KILL,
taskResponse.getExitStatusCode()));
}
@Test