luoyedeyi commented on code in PR #9425:
URL: https://github.com/apache/dolphinscheduler/pull/9425#discussion_r870930179
##########
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTaskExecutor.java:
##########
@@ -0,0 +1,191 @@
+package org.apache.dolphinscheduler.plugin.task.api.k8s;
+
+import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
+import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL;
+import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS;
+import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RUNNING_CODE;
+
+import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
+import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
+import org.apache.dolphinscheduler.plugin.task.api.utils.K8sUtils;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import java.util.Locale;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+
+import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import io.fabric8.kubernetes.api.model.batch.v1.JobStatus;
+import io.fabric8.kubernetes.client.Watch;
+import io.fabric8.kubernetes.client.Watcher;
+import io.fabric8.kubernetes.client.WatcherException;
+
+
+public abstract class AbstractK8sTaskExecutor {
+ protected Logger logger;
+ protected TaskExecutionContext taskRequest;
+ protected K8sUtils k8sUtils;
+ protected Job job;
+ protected StringBuffer logStringBuffer;
+
+ public AbstractK8sTaskExecutor(Logger logger, TaskExecutionContext
taskRequest) {
+ this.logger = logger;
+ this.taskRequest = taskRequest;
+ this.k8sUtils = new K8sUtils();
+ this.logStringBuffer = new StringBuffer();
+ }
+
+ public TaskResponse run(String k8sParameterStr) throws Exception {
+ TaskResponse result = new TaskResponse();
+ int taskInstanceId = taskRequest.getTaskInstanceId();
+ K8sTaskMainParameters k8STaskMainParameters =
JSONUtils.parseObject(k8sParameterStr, K8sTaskMainParameters.class);
+ try {
+ if (null ==
TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) {
+ result.setExitStatusCode(EXIT_CODE_KILL);
+ return result;
+ }
+ if (StringUtils.isEmpty(k8sParameterStr)) {
+
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
+ return result;
+ }
+ K8sTaskExecutionContext k8sTaskExecutionContext =
taskRequest.getK8sTaskExecutionContext();
+ String configYaml = k8sTaskExecutionContext.getConfigYaml();
+ k8sUtils.buildClient(configYaml);
+ submitJob2k8s(taskRequest, k8STaskMainParameters);
+ registerBatchJobWatcher(job, Integer.toString(taskInstanceId),
result, k8STaskMainParameters);
+ } catch (Exception e) {
+ result.setExitStatusCode(EXIT_CODE_FAILURE);
+ throw e;
+ }
+ return result;
+ }
+
+ public void cancelApplication() {
+ K8sTaskMainParameters k8STaskMainParameters =
JSONUtils.parseObject(taskRequest.getTaskParams(), K8sTaskMainParameters.class);
+ if (job != null) {
+ stopJobOnK8s(job.getMetadata().getName(), k8STaskMainParameters);
+ }
+ }
+
+ public void registerBatchJobWatcher(Job job, String taskInstanceId,
TaskResponse taskResponse, K8sTaskMainParameters k8STaskMainParameters) {
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ Watcher watcher = new Watcher<Job>() {
+ @Override
+ public void eventReceived(Action action, Job job) {
+ if (action != Action.ADDED) {
+ int jobStatus = getK8sJobStatus(job);
+ if (jobStatus == EXIT_CODE_SUCCESS || jobStatus ==
EXIT_CODE_FAILURE) {
+ if (null ==
TaskExecutionContextCacheManager.getByTaskInstanceId(Integer.valueOf(taskInstanceId)))
{
+
logStringBuffer.append(String.format("[K8sJobExecutor-%s] killed",
job.getMetadata().getName()));
+ taskResponse.setExitStatusCode(EXIT_CODE_KILL);
+ } else if (jobStatus == EXIT_CODE_SUCCESS) {
+
logStringBuffer.append(String.format("[K8sJobExecutor-%s] succeed in k8s",
job.getMetadata().getName()));
+ taskResponse.setExitStatusCode(EXIT_CODE_SUCCESS);
+ } else {
+ String errorMessage =
k8sUtils.getPodLog(job.getMetadata().getName(),
k8STaskMainParameters.getNamespace());
+
logStringBuffer.append(String.format("[K8sJobExecutor-%s] fail in k8s: %s",
job.getMetadata().getName(), errorMessage));
+ taskResponse.setExitStatusCode(EXIT_CODE_FAILURE);
+ }
+ countDownLatch.countDown();
Review Comment:
The countdown only happens when the job succeeds or fails by accident.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]