luoyedeyi commented on code in PR #9425:
URL: https://github.com/apache/dolphinscheduler/pull/9425#discussion_r870929267


##########
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTaskExecutor.java:
##########
@@ -0,0 +1,191 @@
+package org.apache.dolphinscheduler.plugin.task.api.k8s;
+
+import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
+import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL;
+import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS;
+import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RUNNING_CODE;
+
+import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import 
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
+import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
+import org.apache.dolphinscheduler.plugin.task.api.utils.K8sUtils;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import java.util.Locale;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+
+import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import io.fabric8.kubernetes.api.model.batch.v1.JobStatus;
+import io.fabric8.kubernetes.client.Watch;
+import io.fabric8.kubernetes.client.Watcher;
+import io.fabric8.kubernetes.client.WatcherException;
+
+
+public abstract class AbstractK8sTaskExecutor {
+    protected Logger logger;
+    protected TaskExecutionContext taskRequest;
+    protected K8sUtils k8sUtils;
+    protected Job job;
+    protected StringBuffer logStringBuffer;
+
+    public AbstractK8sTaskExecutor(Logger logger, TaskExecutionContext 
taskRequest) {
+        this.logger = logger;
+        this.taskRequest = taskRequest;
+        this.k8sUtils = new K8sUtils();
+        this.logStringBuffer = new StringBuffer();
+    }
+
+    public TaskResponse run(String k8sParameterStr) throws Exception {
+        TaskResponse result = new TaskResponse();
+        int taskInstanceId = taskRequest.getTaskInstanceId();
+        K8sTaskMainParameters k8STaskMainParameters = 
JSONUtils.parseObject(k8sParameterStr, K8sTaskMainParameters.class);
+        try {
+            if (null == 
TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) {
+                result.setExitStatusCode(EXIT_CODE_KILL);
+                return result;
+            }
+            if (StringUtils.isEmpty(k8sParameterStr)) {
+                
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
+                return result;
+            }
+            K8sTaskExecutionContext k8sTaskExecutionContext = 
taskRequest.getK8sTaskExecutionContext();
+            String configYaml = k8sTaskExecutionContext.getConfigYaml();
+            k8sUtils.buildClient(configYaml);
+            submitJob2k8s(taskRequest, k8STaskMainParameters);
+            registerBatchJobWatcher(job, Integer.toString(taskInstanceId), 
result, k8STaskMainParameters);
+        } catch (Exception e) {
+            result.setExitStatusCode(EXIT_CODE_FAILURE);
+            throw e;
+        }
+        return result;
+    }
+
+    public void cancelApplication() {
+        K8sTaskMainParameters k8STaskMainParameters = 
JSONUtils.parseObject(taskRequest.getTaskParams(), K8sTaskMainParameters.class);
+        if (job != null) {
+            stopJobOnK8s(job.getMetadata().getName(), k8STaskMainParameters);
+        }
+    }
+
+    public void registerBatchJobWatcher(Job job, String taskInstanceId, 
TaskResponse taskResponse, K8sTaskMainParameters k8STaskMainParameters) {
+        CountDownLatch countDownLatch = new CountDownLatch(1);

Review Comment:
   The countDownLatch is used to wait until the job has completed in k8s and to 
get the status of the job. Once the job has finished in k8s, the running task 
worker gets the corresponding result from the watcher and then returns the 
correct task status to the master. 
 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to