caishunfeng commented on code in PR #9425:
URL: https://github.com/apache/dolphinscheduler/pull/9425#discussion_r847195686
##########
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTaskExecutor.java:
##########
@@ -0,0 +1,191 @@
+package org.apache.dolphinscheduler.plugin.task.api.k8s;
+
+import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
+import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL;
+import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS;
+import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RUNNING_CODE;
+
+import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
+import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
+import org.apache.dolphinscheduler.plugin.task.api.utils.K8sUtils;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import java.util.Locale;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+
+import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import io.fabric8.kubernetes.api.model.batch.v1.JobStatus;
+import io.fabric8.kubernetes.client.Watch;
+import io.fabric8.kubernetes.client.Watcher;
+import io.fabric8.kubernetes.client.WatcherException;
+
+
+public abstract class AbstractK8sTaskExecutor {
+ protected Logger logger;
+ protected TaskExecutionContext taskRequest;
+ protected K8sUtils k8sUtils;
+ protected Job job;
+ protected StringBuffer logStringBuffer;
+
+ public AbstractK8sTaskExecutor(Logger logger, TaskExecutionContext
taskRequest) {
+ this.logger = logger;
+ this.taskRequest = taskRequest;
+ this.k8sUtils = new K8sUtils();
+ this.logStringBuffer = new StringBuffer();
+ }
+
+ public TaskResponse run(String k8sParameterStr) throws Exception {
+ TaskResponse result = new TaskResponse();
+ int taskInstanceId = taskRequest.getTaskInstanceId();
+ K8sTaskMainParameters k8STaskMainParameters =
JSONUtils.parseObject(k8sParameterStr, K8sTaskMainParameters.class);
+ try {
+ if (null ==
TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) {
+ result.setExitStatusCode(EXIT_CODE_KILL);
+ return result;
+ }
+ if (StringUtils.isEmpty(k8sParameterStr)) {
+
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
+ return result;
+ }
+ K8sTaskExecutionContext k8sTaskExecutionContext =
taskRequest.getK8sTaskExecutionContext();
+ String configYaml = k8sTaskExecutionContext.getConfigYaml();
+ k8sUtils.buildClient(configYaml);
+ submitJob2k8s(taskRequest, k8STaskMainParameters);
+ registerBatchJobWatcher(job, Integer.toString(taskInstanceId),
result, k8STaskMainParameters);
+ } catch (Exception e) {
+ result.setExitStatusCode(EXIT_CODE_FAILURE);
+ throw e;
+ }
+ return result;
+ }
+
+ public void cancelApplication() {
+ K8sTaskMainParameters k8STaskMainParameters =
JSONUtils.parseObject(taskRequest.getTaskParams(), K8sTaskMainParameters.class);
+ if (job != null) {
+ stopJobOnK8s(job.getMetadata().getName(), k8STaskMainParameters);
+ }
+ }
+
+ public void registerBatchJobWatcher(Job job, String taskInstanceId,
TaskResponse taskResponse, K8sTaskMainParameters k8STaskMainParameters) {
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ Watcher watcher = new Watcher<Job>() {
+ @Override
+ public void eventReceived(Action action, Job job) {
+ if (action != Action.ADDED) {
+ int jobStatus = getK8sJobStatus(job);
+ if (jobStatus == EXIT_CODE_SUCCESS || jobStatus ==
EXIT_CODE_FAILURE) {
+ if (null ==
TaskExecutionContextCacheManager.getByTaskInstanceId(Integer.valueOf(taskInstanceId)))
{
+
logStringBuffer.append(String.format("[K8sJobExecutor-%s] killed",
job.getMetadata().getName()));
+ taskResponse.setExitStatusCode(EXIT_CODE_KILL);
+ } else if (jobStatus == EXIT_CODE_SUCCESS) {
+
logStringBuffer.append(String.format("[K8sJobExecutor-%s] succeed in k8s",
job.getMetadata().getName()));
+ taskResponse.setExitStatusCode(EXIT_CODE_SUCCESS);
+ } else {
+ String errorMessage =
k8sUtils.getPodLog(job.getMetadata().getName(),
k8STaskMainParameters.getNamespace());
+
logStringBuffer.append(String.format("[K8sJobExecutor-%s] fail in k8s: %s",
job.getMetadata().getName(), errorMessage));
+ taskResponse.setExitStatusCode(EXIT_CODE_FAILURE);
+ }
+ countDownLatch.countDown();
Review Comment:
Will this count down more than one time?
##########
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ParameterUtils.java:
##########
@@ -0,0 +1,61 @@
+package org.apache.dolphinscheduler.plugin.task.api.utils;
+
+import org.apache.commons.lang.math.NumberUtils;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+public class ParameterUtils {
+ private static String NO_VALUE_KEY = "__NO_VALUE_KEY";
+ private static String DOUBLE_HYPHEN = "--";
+ private static String HYPHEN = "-";
+
+ public static Map fromArgs(String[] args) throws IllegalArgumentException
{
Review Comment:
Please add some note or example for it.
##########
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java:
##########
@@ -398,4 +403,19 @@ private TaskConstants() {
public static final String AWS_SECRET_ACCESS_KEY= "aws.secret.access.key";
public static final String AWS_REGION = "aws.region";
+ /**
+ * use for k8s task
+ */
+ public static final String API_VERSION = "batch/v1";
+ public static final String IMAGE_PULL_POLICY = "Always";
Review Comment:
Is it better to be configurable?
##########
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ParameterUtils.java:
##########
@@ -0,0 +1,61 @@
+package org.apache.dolphinscheduler.plugin.task.api.utils;
+
+import org.apache.commons.lang.math.NumberUtils;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+public class ParameterUtils {
+ private static String NO_VALUE_KEY = "__NO_VALUE_KEY";
+ private static String DOUBLE_HYPHEN = "--";
+ private static String HYPHEN = "-";
+
+ public static Map fromArgs(String[] args) throws IllegalArgumentException
{
+ Map<String, String> map = new HashMap(args.length / 2);
+ int i = 0;
+ while (true) {
+ while (i < args.length) {
+ String key = getKeyFromArgs(args, i);
+ if (key.isEmpty()) {
+ throw new IllegalArgumentException("The input contains an
empty argument");
+ }
+
+ ++i;
+ if (i >= args.length) {
+ map.put(key, NO_VALUE_KEY);
+ } else if (NumberUtils.isNumber(args[i])) {
+ map.put(key, args[i]);
+ ++i;
+ } else if (!args[i].startsWith(DOUBLE_HYPHEN) &&
!args[i].startsWith(HYPHEN)) {
+ map.put(key, args[i]);
+ ++i;
+ } else {
+ map.put(key, NO_VALUE_KEY);
+ }
+ }
+
+ return map;
+ }
+ }
+
+ public static String getKeyFromArgs(String[] args, int index) {
Review Comment:
Please add some note or example for it.
##########
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTaskExecutor.java:
##########
@@ -0,0 +1,191 @@
+package org.apache.dolphinscheduler.plugin.task.api.k8s;
+
+import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
+import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL;
+import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS;
+import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RUNNING_CODE;
+
+import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
+import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
+import org.apache.dolphinscheduler.plugin.task.api.utils.K8sUtils;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import java.util.Locale;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+
+import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import io.fabric8.kubernetes.api.model.batch.v1.JobStatus;
+import io.fabric8.kubernetes.client.Watch;
+import io.fabric8.kubernetes.client.Watcher;
+import io.fabric8.kubernetes.client.WatcherException;
+
+
+public abstract class AbstractK8sTaskExecutor {
+ protected Logger logger;
+ protected TaskExecutionContext taskRequest;
+ protected K8sUtils k8sUtils;
+ protected Job job;
+ protected StringBuffer logStringBuffer;
+
+ public AbstractK8sTaskExecutor(Logger logger, TaskExecutionContext
taskRequest) {
+ this.logger = logger;
+ this.taskRequest = taskRequest;
+ this.k8sUtils = new K8sUtils();
+ this.logStringBuffer = new StringBuffer();
+ }
+
+ public TaskResponse run(String k8sParameterStr) throws Exception {
+ TaskResponse result = new TaskResponse();
+ int taskInstanceId = taskRequest.getTaskInstanceId();
+ K8sTaskMainParameters k8STaskMainParameters =
JSONUtils.parseObject(k8sParameterStr, K8sTaskMainParameters.class);
+ try {
+ if (null ==
TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) {
+ result.setExitStatusCode(EXIT_CODE_KILL);
+ return result;
+ }
+ if (StringUtils.isEmpty(k8sParameterStr)) {
+
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
+ return result;
+ }
+ K8sTaskExecutionContext k8sTaskExecutionContext =
taskRequest.getK8sTaskExecutionContext();
+ String configYaml = k8sTaskExecutionContext.getConfigYaml();
+ k8sUtils.buildClient(configYaml);
+ submitJob2k8s(taskRequest, k8STaskMainParameters);
+ registerBatchJobWatcher(job, Integer.toString(taskInstanceId),
result, k8STaskMainParameters);
+ } catch (Exception e) {
+ result.setExitStatusCode(EXIT_CODE_FAILURE);
+ throw e;
+ }
+ return result;
+ }
+
+ public void cancelApplication() {
+ K8sTaskMainParameters k8STaskMainParameters =
JSONUtils.parseObject(taskRequest.getTaskParams(), K8sTaskMainParameters.class);
+ if (job != null) {
+ stopJobOnK8s(job.getMetadata().getName(), k8STaskMainParameters);
+ }
+ }
+
+ public void registerBatchJobWatcher(Job job, String taskInstanceId,
TaskResponse taskResponse, K8sTaskMainParameters k8STaskMainParameters) {
+ CountDownLatch countDownLatch = new CountDownLatch(1);
Review Comment:
What does `countDownLatch` do? It seems multiple places countdown.
##########
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java:
##########
@@ -398,4 +403,19 @@ private TaskConstants() {
public static final String AWS_SECRET_ACCESS_KEY= "aws.secret.access.key";
public static final String AWS_REGION = "aws.region";
+ /**
+ * use for k8s task
+ */
+ public static final String API_VERSION = "batch/v1";
+ public static final String IMAGE_PULL_POLICY = "Always";
+ public static final String RESTART_POLICY = "Never";
Review Comment:
Is it better to be configurable?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]