This is an automated email from the ASF dual-hosted git repository.
kezhenxu94 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 047fa2f65e [Feature-13511] Submit Spark task directly on Kubernetes (#13550)
047fa2f65e is described below
commit 047fa2f65e58b86e2f03a89ce5ac2db264ee5e0f
Author: Aaron Wang <[email protected]>
AuthorDate: Tue Feb 21 23:26:21 2023 +0800
[Feature-13511] Submit Spark task directly on Kubernetes (#13550)
---
deploy/kubernetes/dolphinscheduler/values.yaml | 2 +-
docs/docs/en/architecture/configuration.md | 2 +-
docs/docs/zh/architecture/configuration.md | 2 +-
.../common/enums/ResourceManagerType.java | 26 +-
.../dolphinscheduler/common/utils/FileUtils.java | 13 +
.../server/master/config/MasterConfig.java | 4 +-
.../master/runner/task/BaseTaskProcessor.java | 51 ++-
.../master/service/MasterFailoverService.java | 10 +-
.../master/service/WorkerFailoverService.java | 10 +-
.../src/main/resources/application.yaml | 4 +-
.../service/utils/ProcessUtils.java | 10 +-
.../src/main/resources/application.yaml | 4 +-
.../plugin/task/api/AbstractCommandExecutor.java | 20 +-
.../plugin/task/api/K8sTaskExecutionContext.java | 20 +-
.../plugin/task/api/ShellCommandExecutor.java | 10 +
.../plugin/task/api/TaskConstants.java | 7 +-
.../ApplicationManager.java} | 37 +--
.../ApplicationManagerContext.java} | 26 +-
.../task/api/am/KubernetesApplicationManager.java | 198 ++++++++++++
.../KubernetesApplicationManagerContext.java} | 35 +-
.../YarnApplicationManager.java} | 135 ++------
.../YarnApplicationManagerContext.java} | 38 +--
.../plugin/task/api/utils/ProcessUtils.java | 359 +++++----------------
.../dolphinscheduler-task-spark/pom.xml | 4 +
.../plugin/task/spark/SparkConstants.java | 32 +-
.../plugin/task/spark/SparkParameters.java | 15 +-
.../plugin/task/spark/SparkTask.java | 38 ++-
.../plugin/task/spark/SparkTaskTest.java | 20 +-
.../task/components/node/fields/use-namespace.ts | 3 +-
.../task/components/node/fields/use-spark.ts | 2 +
.../projects/task/components/node/format-data.ts | 3 +
.../server/worker/processor/TaskKillProcessor.java | 71 +---
.../worker/runner/WorkerTaskExecuteRunnable.java | 14 +-
33 files changed, 586 insertions(+), 639 deletions(-)
diff --git a/deploy/kubernetes/dolphinscheduler/values.yaml
b/deploy/kubernetes/dolphinscheduler/values.yaml
index 329511a226..20527f9e39 100644
--- a/deploy/kubernetes/dolphinscheduler/values.yaml
+++ b/deploy/kubernetes/dolphinscheduler/values.yaml
@@ -313,7 +313,7 @@ master:
MASTER_MAX_CPU_LOAD_AVG: "-1"
MASTER_RESERVED_MEMORY: "0.3"
MASTER_FAILOVER_INTERVAL: "10m"
- MASTER_KILL_YARN_JOB_WHEN_HANDLE_FAILOVER: "true"
+ MASTER_KILL_APPLICATION_WHEN_HANDLE_FAILOVER: "true"
worker:
## PodManagementPolicy controls how pods are created during initial scale
up, when replacing pods on nodes, or when scaling down.
diff --git a/docs/docs/en/architecture/configuration.md
b/docs/docs/en/architecture/configuration.md
index 80cf3060e8..3821c7fefd 100644
--- a/docs/docs/en/architecture/configuration.md
+++ b/docs/docs/en/architecture/configuration.md
@@ -276,7 +276,7 @@ Location: `master-server/conf/application.yaml`
|master.max-cpu-load-avg|-1|master max CPU load avg, only higher than the
system CPU load average, master server can schedule. default value -1: the
number of CPU cores * 2|
|master.reserved-memory|0.3|master reserved memory, only lower than system
available memory, master server can schedule. default value 0.3, the unit is G|
|master.failover-interval|10|failover interval, the unit is minute|
-|master.kill-yarn-job-when-task-failover|true|whether to kill yarn job when
failover taskInstance|
+|master.kill-application-when-task-failover|true|whether to kill yarn/k8s
application when failover taskInstance|
|master.registry-disconnect-strategy.strategy|stop|Used when the master
disconnect from registry, default value: stop. Optional values include stop,
waiting|
|master.registry-disconnect-strategy.max-waiting-time|100s|Used when the
master disconnect from registry, and the disconnect strategy is waiting, this
config means the master will waiting to reconnect to registry in given times,
and after the waiting times, if the master still cannot connect to registry,
will stop itself, if the value is 0s, the Master will wait infinitely|
|master.worker-group-refresh-interval|10s|The interval to refresh worker group
from db to memory|
diff --git a/docs/docs/zh/architecture/configuration.md
b/docs/docs/zh/architecture/configuration.md
index 105b433b7c..6db0975606 100644
--- a/docs/docs/zh/architecture/configuration.md
+++ b/docs/docs/zh/architecture/configuration.md
@@ -271,7 +271,7 @@ common.properties配置文件目前主要是配置hadoop/s3/yarn/applicationId
|master.max-cpu-load-avg|-1|master最大cpuload均值,只有高于系统cpuload均值时,master服务才能调度任务.
默认值为-1: cpu cores * 2|
|master.reserved-memory|0.3|master预留内存,只有低于系统可用内存时,master服务才能调度任务,单位为G|
|master.failover-interval|10|failover间隔,单位为分钟|
-|master.kill-yarn-job-when-task-failover|true|当任务实例failover时,是否kill掉yarn job|
+|master.kill-application-when-task-failover|true|当任务实例failover时,是否kill掉yarn或k8s
application|
|master.registry-disconnect-strategy.strategy|stop|当Master与注册中心失联之后采取的策略,
默认值是: stop. 可选值包括: stop, waiting|
|master.registry-disconnect-strategy.max-waiting-time|100s|当Master与注册中心失联之后重连时间,
之后当strategy为waiting时,该值生效。 该值表示当Master与注册中心失联时会在给定时间之内进行重连,
在给定时间之内重连失败将会停止自己,在重连时,Master会丢弃目前正在执行的工作流,值为0表示会无限期等待 |
|master.master.worker-group-refresh-interval|10s|定期将workerGroup从数据库中同步到内存的时间间隔|
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/K8sTaskExecutionContext.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ResourceManagerType.java
similarity index 59%
copy from
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/K8sTaskExecutionContext.java
copy to
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ResourceManagerType.java
index 7e39f184fa..265b0765bf 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/K8sTaskExecutionContext.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ResourceManagerType.java
@@ -15,30 +15,12 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.plugin.task.api;
+package org.apache.dolphinscheduler.common.enums;
-import java.io.Serializable;
+public enum ResourceManagerType {
-/**
- * k8s Task ExecutionContext
- */
-
-public class K8sTaskExecutionContext implements Serializable {
-
- private String configYaml;
-
- public String getConfigYaml() {
- return configYaml;
- }
+ YARN,
- public void setConfigYaml(String configYaml) {
- this.configYaml = configYaml;
- }
+ KUBERNETES;
- @Override
- public String toString() {
- return "K8sTaskExecutionContext{"
- + "configYaml='" + configYaml + '\''
- + '}';
- }
}
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
index 36f075e00e..b2776bc7b9 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.common.utils;
import static
org.apache.dolphinscheduler.common.constants.Constants.DATA_BASEDIR_PATH;
import static
org.apache.dolphinscheduler.common.constants.Constants.FOLDER_SEPARATOR;
+import static
org.apache.dolphinscheduler.common.constants.Constants.FORMAT_S_S;
import static
org.apache.dolphinscheduler.common.constants.Constants.RESOURCE_VIEW_SUFFIXES;
import static
org.apache.dolphinscheduler.common.constants.Constants.RESOURCE_VIEW_SUFFIXES_DEFAULT_VALUE;
import static org.apache.dolphinscheduler.common.constants.Constants.UTF_8;
@@ -49,6 +50,8 @@ public class FileUtils {
public static final String APPINFO_PATH = "appInfo.log";
+ public static final String KUBE_CONFIG_FILE = "config";
+
private FileUtils() {
throw new UnsupportedOperationException("Construct FileUtils");
}
@@ -116,6 +119,16 @@ public class FileUtils {
taskInstanceId);
}
+ /**
+ * absolute path of kubernetes configuration file
+ *
+ * @param execPath
+ * @return
+ */
+ public static String getKubeConfigPath(String execPath) {
+ return String.format(FORMAT_S_S, execPath, KUBE_CONFIG_FILE);
+ }
+
/**
* absolute path of appInfo file
*
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
index 3971d1e290..bbd90eb477 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
@@ -88,7 +88,7 @@ public class MasterConfig implements Validator {
private double maxCpuLoadAvg = -1;
private double reservedMemory = 0.3;
private Duration failoverInterval = Duration.ofMinutes(10);
- private boolean killYarnJobWhenTaskFailover = true;
+ private boolean killApplicationWhenTaskFailover = true;
private ConnectStrategyProperties registryDisconnectStrategy = new
ConnectStrategyProperties();
private Duration workerGroupRefreshInterval = Duration.ofSeconds(10L);
@@ -163,7 +163,7 @@ public class MasterConfig implements Validator {
log.info("Master config: maxCpuLoadAvg -> {} ", maxCpuLoadAvg);
log.info("Master config: reservedMemory -> {} ", reservedMemory);
log.info("Master config: failoverInterval -> {} ", failoverInterval);
- log.info("Master config: killYarnJobWhenTaskFailover -> {} ",
killYarnJobWhenTaskFailover);
+ log.info("Master config: killApplicationWhenTaskFailover -> {} ",
killApplicationWhenTaskFailover);
log.info("Master config: registryDisconnectStrategy -> {} ",
registryDisconnectStrategy);
log.info("Master config: masterAddress -> {} ", masterAddress);
log.info("Master config: masterRegistryPath -> {} ",
masterRegistryPath);
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
index 757d479b9e..559163f363 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
@@ -25,8 +25,8 @@ import static
org.apache.dolphinscheduler.common.constants.Constants.PASSWORD;
import static
org.apache.dolphinscheduler.common.constants.Constants.SINGLE_SLASH;
import static org.apache.dolphinscheduler.common.constants.Constants.USER;
import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.CLUSTER;
+import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.NAMESPACE_NAME;
import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DATA_QUALITY;
-import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SET_K8S;
import static
org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_NAME;
import static
org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_TABLE;
import static
org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_TYPE;
@@ -72,6 +72,7 @@ import
org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceP
import
org.apache.dolphinscheduler.plugin.task.api.parameters.resource.UdfFuncParameters;
import org.apache.dolphinscheduler.plugin.task.api.utils.JdbcUrlParser;
import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
+import org.apache.dolphinscheduler.plugin.task.spark.SparkParameters;
import
org.apache.dolphinscheduler.server.master.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -328,11 +329,8 @@ public abstract class BaseTaskProcessor implements
ITaskProcessor {
dataQualityTaskExecutionContext = new
DataQualityTaskExecutionContext();
setDataQualityTaskRelation(dataQualityTaskExecutionContext,
taskInstance, tenant.getTenantCode());
}
- K8sTaskExecutionContext k8sTaskExecutionContext = null;
- if (TASK_TYPE_SET_K8S.contains(taskInstance.getTaskType())) {
- k8sTaskExecutionContext = new K8sTaskExecutionContext();
- setK8sTaskRelation(k8sTaskExecutionContext, taskInstance);
- }
+
+ K8sTaskExecutionContext k8sTaskExecutionContext =
setK8sTaskRelation(taskInstance);
Map<String, Property> businessParamsMap =
curingParamsService.preBuildBusinessParams(processInstance);
@@ -635,18 +633,39 @@ public abstract class BaseTaskProcessor implements
ITaskProcessor {
}
/**
- * set k8s task relation
- * @param k8sTaskExecutionContext k8sTaskExecutionContext
+ * get k8s task execution context based on task type and deploy mode
+ *
* @param taskInstance taskInstance
*/
- private void setK8sTaskRelation(K8sTaskExecutionContext
k8sTaskExecutionContext, TaskInstance taskInstance) {
- K8sTaskParameters k8sTaskParameters =
- JSONUtils.parseObject(taskInstance.getTaskParams(),
K8sTaskParameters.class);
- Map<String, String> namespace =
JSONUtils.toMap(k8sTaskParameters.getNamespace());
- String clusterName = namespace.get(CLUSTER);
- String configYaml = processService.findConfigYamlByName(clusterName);
- if (configYaml != null) {
- k8sTaskExecutionContext.setConfigYaml(configYaml);
+ private K8sTaskExecutionContext setK8sTaskRelation(TaskInstance
taskInstance) {
+ K8sTaskExecutionContext k8sTaskExecutionContext = null;
+ String namespace = "";
+ switch (taskInstance.getTaskType()) {
+ case "K8S":
+ case "KUBEFLOW":
+ K8sTaskParameters k8sTaskParameters =
+ JSONUtils.parseObject(taskInstance.getTaskParams(),
K8sTaskParameters.class);
+ namespace = k8sTaskParameters.getNamespace();
+ break;
+ case "SPARK":
+ SparkParameters sparkParameters =
+ JSONUtils.parseObject(taskInstance.getTaskParams(),
SparkParameters.class);
+ if (StringUtils.isNotEmpty(sparkParameters.getNamespace())) {
+ namespace = sparkParameters.getNamespace();
+ }
+ break;
+ default:
+ break;
+ }
+
+ if (StringUtils.isNotEmpty(namespace)) {
+ String clusterName = JSONUtils.toMap(namespace).get(CLUSTER);
+ String configYaml =
processService.findConfigYamlByName(clusterName);
+ if (configYaml != null) {
+ k8sTaskExecutionContext =
+ new K8sTaskExecutionContext(configYaml,
JSONUtils.toMap(namespace).get(NAMESPACE_NAME));
+ }
}
+ return k8sTaskExecutionContext;
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
index 3979401425..441556cb79 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
@@ -227,7 +227,7 @@ public class MasterFailoverService {
/**
* failover task instance
* <p>
- * 1. kill yarn job if run on worker and there are yarn jobs in tasks.
+ * 1. kill yarn/k8s job if run on worker and there are yarn/k8s jobs in
tasks.
* 2. change task state from running to need failover.
* 3. try to notify local master
*
@@ -248,10 +248,10 @@ public class MasterFailoverService {
.buildProcessDefinitionRelatedInfo(processInstance.getProcessDefinition())
.create();
- if (masterConfig.isKillYarnJobWhenTaskFailover()) {
- // only kill yarn job if exists , the local thread has exited
- log.info("TaskInstance failover begin kill the task related
yarn job");
- ProcessUtils.killYarnJob(logClient, taskExecutionContext);
+ if (masterConfig.isKillApplicationWhenTaskFailover()) {
+ // only kill yarn/k8s job if exists , the local thread has
exited
+ log.info("TaskInstance failover begin kill the task related
yarn or k8s job");
+ ProcessUtils.killApplication(logClient, taskExecutionContext);
}
// kill worker task, When the master failover and worker failover
happened in the same time,
// the task may not be failover if we don't set
NEED_FAULT_TOLERANCE.
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
index 928580fb82..a04ce18a93 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
@@ -154,7 +154,7 @@ public class WorkerFailoverService {
/**
* failover task instance
* <p>
- * 1. kill yarn job if run on worker and there are yarn jobs in tasks.
+ * 1. kill yarn/k8s job if run on worker and there are yarn/k8s jobs in
tasks.
* 2. change task state from running to need failover.
* 3. try to notify local master
*
@@ -175,10 +175,10 @@ public class WorkerFailoverService {
.buildProcessDefinitionRelatedInfo(processInstance.getProcessDefinition())
.create();
- if (masterConfig.isKillYarnJobWhenTaskFailover()) {
- // only kill yarn job if exists , the local thread has exited
- log.info("TaskInstance failover begin kill the task related
yarn job");
- ProcessUtils.killYarnJob(logClient, taskExecutionContext);
+ if (masterConfig.isKillApplicationWhenTaskFailover()) {
+ // only kill yarn/k8s job if exists , the local thread has
exited
+ log.info("TaskInstance failover begin kill the task related
yarn or k8s job");
+ ProcessUtils.killApplication(logClient, taskExecutionContext);
}
} else {
log.info("The failover taskInstance is a master task");
diff --git a/dolphinscheduler-master/src/main/resources/application.yaml
b/dolphinscheduler-master/src/main/resources/application.yaml
index 7f9c948e77..e333b566b6 100644
--- a/dolphinscheduler-master/src/main/resources/application.yaml
+++ b/dolphinscheduler-master/src/main/resources/application.yaml
@@ -108,8 +108,8 @@ master:
reserved-memory: 0.3
# failover interval, the unit is minute
failover-interval: 10m
- # kill yarn jon when failover taskInstance, default true
- kill-yarn-job-when-task-failover: true
+ # kill yarn / k8s application when failover taskInstance, default true
+ kill-application-when-task-failover: true
registry-disconnect-strategy:
# The disconnect strategy: stop, waiting
strategy: waiting
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ProcessUtils.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ProcessUtils.java
index 78f5cb44a6..f926d3ee31 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ProcessUtils.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ProcessUtils.java
@@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.log.LogClient;
@@ -161,8 +162,8 @@ public class ProcessUtils {
* @param taskExecutionContext taskExecutionContext
* @return yarn application ids
*/
- public static @Nullable List<String> killYarnJob(@NonNull LogClient
logClient,
- @NonNull
TaskExecutionContext taskExecutionContext) {
+ public static @Nullable List<String> killApplication(@NonNull LogClient
logClient,
+ @NonNull
TaskExecutionContext taskExecutionContext) {
if (taskExecutionContext.getLogPath() == null) {
return Collections.emptyList();
}
@@ -172,6 +173,7 @@ public class ProcessUtils {
List<String> appIds = logClient.getAppIds(host.getIp(),
host.getPort(), taskExecutionContext.getLogPath(),
taskExecutionContext.getAppInfoPath());
if (CollectionUtils.isNotEmpty(appIds)) {
+
taskExecutionContext.setAppIds(String.join(TaskConstants.COMMA, appIds));
if
(StringUtils.isEmpty(taskExecutionContext.getExecutePath())) {
taskExecutionContext
.setExecutePath(FileUtils.getProcessExecDir(
@@ -183,9 +185,7 @@ public class ProcessUtils {
taskExecutionContext.getTaskInstanceId()));
}
FileUtils.createWorkDirIfAbsent(taskExecutionContext.getExecutePath());
-
org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils.cancelApplication(appIds,
log,
- taskExecutionContext.getTenantCode(),
- taskExecutionContext.getExecutePath());
+
org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils.cancelApplication(taskExecutionContext);
return appIds;
} else {
log.info("The current appId is empty, don't need to kill the
yarn job, taskInstanceId: {}",
diff --git
a/dolphinscheduler-standalone-server/src/main/resources/application.yaml
b/dolphinscheduler-standalone-server/src/main/resources/application.yaml
index 5a995a6dcc..afacc8f7d7 100644
--- a/dolphinscheduler-standalone-server/src/main/resources/application.yaml
+++ b/dolphinscheduler-standalone-server/src/main/resources/application.yaml
@@ -146,8 +146,8 @@ master:
reserved-memory: 0.03
# failover interval
failover-interval: 10m
- # kill yarn jon when failover taskInstance, default true
- kill-yarn-job-when-task-failover: true
+ # kill yarn/k8s application when failover taskInstance, default true
+ kill-application-when-task-failover: true
worker-group-refresh-interval: 10s
worker:
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
index faceab2b3d..e2e0ce9338 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
@@ -23,6 +23,7 @@ import static
org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils.get
import org.apache.dolphinscheduler.common.log.remote.RemoteLogUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import
org.apache.dolphinscheduler.plugin.task.api.utils.AbstractCommandExecutorConstants;
import org.apache.dolphinscheduler.plugin.task.api.utils.OSUtils;
@@ -38,6 +39,7 @@ import java.io.InputStreamReader;
import java.lang.reflect.Field;
import java.util.LinkedList;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
@@ -229,8 +231,11 @@ public abstract class AbstractCommandExecutor {
// waiting for the run to finish
boolean status = process.waitFor(remainTime, TimeUnit.SECONDS);
+ TaskExecutionStatus kubernetesStatus =
+
ProcessUtils.getApplicationStatus(taskRequest.getK8sTaskExecutionContext(),
taskRequest.getTaskAppId());
+
// if SHELL task exit
- if (status) {
+ if (status && kubernetesStatus.isSuccess()) {
// SHELL task state
result.setExitStatusCode(process.exitValue());
@@ -334,7 +339,11 @@ public abstract class AbstractCommandExecutor {
LinkedBlockingQueue<String> markerLog = new LinkedBlockingQueue<>(1);
markerLog.add(ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER.toString());
-
+ String logs = appendPodLogIfNeeded();
+ if (StringUtils.isNotEmpty(logs)) {
+ logBuffer.add("Dump logs from driver pod:");
+ logBuffer.add(logs);
+ }
if (!logBuffer.isEmpty()) {
// log handle
logHandler.accept(logBuffer);
@@ -348,6 +357,13 @@ public abstract class AbstractCommandExecutor {
}
}
+ private String appendPodLogIfNeeded() {
+ if (Objects.isNull(taskRequest.getK8sTaskExecutionContext())) {
+ return "";
+ }
+ return
ProcessUtils.getPodLog(taskRequest.getK8sTaskExecutionContext(),
taskRequest.getTaskAppId());
+ }
+
/**
* get the standard output of the process
*
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/K8sTaskExecutionContext.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/K8sTaskExecutionContext.java
index 7e39f184fa..aa5bf62fe2 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/K8sTaskExecutionContext.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/K8sTaskExecutionContext.java
@@ -19,26 +19,34 @@ package org.apache.dolphinscheduler.plugin.task.api;
import java.io.Serializable;
+import lombok.Value;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
/**
* k8s Task ExecutionContext
*/
-
+@Value
public class K8sTaskExecutionContext implements Serializable {
private String configYaml;
- public String getConfigYaml() {
- return configYaml;
- }
+ private String namespace;
- public void setConfigYaml(String configYaml) {
+ @JsonCreator(mode = JsonCreator.Mode.PROPERTIES)
+ public K8sTaskExecutionContext(
+ @JsonProperty("configYaml") String
configYaml,
+ @JsonProperty("namespace") String
namespace) {
this.configYaml = configYaml;
+ this.namespace = namespace;
}
@Override
public String toString() {
return "K8sTaskExecutionContext{"
- + "configYaml='" + configYaml + '\''
+ + "namespace=" + namespace
+ + ", configYaml='" + configYaml + '\''
+ '}';
}
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java
index 18799227b1..1a30864a94 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java
@@ -29,6 +29,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
+import java.util.Objects;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
@@ -115,6 +116,15 @@ public class ShellCommandExecutor extends
AbstractCommandExecutor {
if (StringUtils.isNotBlank(taskRequest.getEnvironmentConfig())) {
sb.append(taskRequest.getEnvironmentConfig()).append(System.lineSeparator());
}
+ if (Objects.nonNull(taskRequest.getK8sTaskExecutionContext())) {
+ String configYaml =
taskRequest.getK8sTaskExecutionContext().getConfigYaml();
+ Path kubeConfigPath =
Paths.get(org.apache.dolphinscheduler.common.utils.FileUtils
+ .getKubeConfigPath(taskRequest.getExecutePath()));
+ FileUtils.createFileWith755(kubeConfigPath);
+ Files.write(kubeConfigPath, configYaml.getBytes(),
StandardOpenOption.APPEND);
+ sb.append("export KUBECONFIG=" +
kubeConfigPath).append(System.lineSeparator());
+ logger.info("Create kubernetes configuration file: {}.",
kubeConfigPath);
+ }
}
sb.append(execCommand);
String commandContent = sb.toString();
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
index a0928d0eac..2187102a7d 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
@@ -430,7 +430,7 @@ public class TaskConstants {
public static final String TASK_TYPE_DATA_QUALITY = "DATA_QUALITY";
- public static final String TASK_TYPE_K8S = "K8S";
+ public static final String DEPLOY_MODE_KUBERNETES = "Kubernetes";
public static final Set<String> TASK_TYPE_SET_K8S = Sets.newHashSet("K8S",
"KUBEFLOW");
@@ -480,6 +480,11 @@ public class TaskConstants {
public static final String CLUSTER = "cluster";
public static final Pattern COMMAND_SPLIT_REGEX =
Pattern.compile("[^\\s\"'`]+|\"([^\"]+)\"|'([^']+)'|`([^`]+)`");
+ /**
+ * spark / flink on k8s label name
+ */
+ public static final String UNIQUE_LABEL_NAME = "dolphinscheduler-label";
+
/**
* conda config used by jupyter task plugin
*/
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/K8sTaskExecutionContext.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/ApplicationManager.java
similarity index 59%
copy from
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/K8sTaskExecutionContext.java
copy to
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/ApplicationManager.java
index 7e39f184fa..0d7761ae6d 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/K8sTaskExecutionContext.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/ApplicationManager.java
@@ -15,30 +15,25 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.plugin.task.api;
+package org.apache.dolphinscheduler.plugin.task.api.am;
-import java.io.Serializable;
+import org.apache.dolphinscheduler.common.enums.ResourceManagerType;
-/**
- * k8s Task ExecutionContext
- */
-
-public class K8sTaskExecutionContext implements Serializable {
-
- private String configYaml;
+public interface ApplicationManager {
- public String getConfigYaml() {
- return configYaml;
- }
+ /**
+ * kill application by application manager context
+ *
+ * @param applicationManagerContext
+ * @return
+ */
+ boolean killApplication(ApplicationManagerContext
applicationManagerContext);
- public void setConfigYaml(String configYaml) {
- this.configYaml = configYaml;
- }
+ /**
+ * get resource manager type
+ *
+ * @return ResourceManagerType yarn / kubernetes
+ */
+ ResourceManagerType getResourceManagerType();
- @Override
- public String toString() {
- return "K8sTaskExecutionContext{"
- + "configYaml='" + configYaml + '\''
- + '}';
- }
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/K8sTaskExecutionContext.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/ApplicationManagerContext.java
similarity index 59%
copy from
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/K8sTaskExecutionContext.java
copy to
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/ApplicationManagerContext.java
index 7e39f184fa..b6fad893de 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/K8sTaskExecutionContext.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/ApplicationManagerContext.java
@@ -15,30 +15,12 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.plugin.task.api;
+package org.apache.dolphinscheduler.plugin.task.api.am;
-import java.io.Serializable;
+/**
+ * Marker for the data an {@link ApplicationManager} needs to kill an application.
+ * Each implementation casts it to its own concrete context type.
+ */
+public interface ApplicationManagerContext {
-/**
- * k8s Task ExecutionContext
- */
-
-public class K8sTaskExecutionContext implements Serializable {
-
- private String configYaml;
-
- public String getConfigYaml() {
- return configYaml;
- }
-
- public void setConfigYaml(String configYaml) {
- this.configYaml = configYaml;
- }
-
- @Override
- public String toString() {
- return "K8sTaskExecutionContext{"
- + "configYaml='" + configYaml + '\''
- + '}';
- }
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java
new file mode 100644
index 0000000000..551bd4465d
--- /dev/null
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.am;
+
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.UNIQUE_LABEL_NAME;
+
+import org.apache.dolphinscheduler.common.enums.ResourceManagerType;
+import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+import lombok.extern.slf4j.Slf4j;
+
+import com.google.auto.service.AutoService;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodList;
+import io.fabric8.kubernetes.client.Config;
+import io.fabric8.kubernetes.client.DefaultKubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
+
+@Slf4j
+@AutoService(ApplicationManager.class)
+public class KubernetesApplicationManager implements ApplicationManager {
+
+    private static final String PENDING = "Pending";
+    private static final String RUNNING = "Running";
+    private static final String FINISH = "Succeeded";
+    private static final String FAILED = "Failed";
+    private static final String UNKNOWN = "Unknown";
+
+    /**
+     * cache k8s client for same task
+     */
+    private final Map<String, KubernetesClient> cacheClientMap = new ConcurrentHashMap<>();
+
+    @Override
+    public boolean killApplication(ApplicationManagerContext applicationManagerContext) throws TaskException {
+        KubernetesApplicationManagerContext kubernetesApplicationManagerContext =
+                (KubernetesApplicationManagerContext) applicationManagerContext;
+
+        boolean isKill;
+        String labelValue = kubernetesApplicationManagerContext.getLabelValue();
+        try {
+            // resolve the driver pod inside the try so lookup failures are wrapped and the cached client is released
+            FilterWatchListDeletable<Pod, PodList> watchList = getDriverPod(kubernetesApplicationManagerContext);
+            if (getApplicationStatus(kubernetesApplicationManagerContext, watchList).isFailure()) {
+                log.error("Driver pod is in FAILED or UNKNOWN status.");
+                isKill = false;
+            } else {
+                watchList.delete();
+                isKill = true;
+            }
+        } catch (Exception e) {
+            throw new TaskException("Failed to kill Kubernetes application with label " + labelValue, e);
+        } finally {
+            // remove client cache after killing application
+            removeCache(labelValue);
+        }
+
+        return isKill;
+    }
+
+    @Override
+    public ResourceManagerType getResourceManagerType() {
+        return ResourceManagerType.KUBERNETES;
+    }
+
+    /**
+     * get driver pod
+     *
+     * @param kubernetesApplicationManagerContext context holding the k8s execution context and the driver pod label value
+     * @return selection of pods in the task namespace carrying the driver label (a warning is logged unless exactly one matches)
+     */
+    private FilterWatchListDeletable<Pod, PodList> getDriverPod(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) {
+        KubernetesClient client = getClient(kubernetesApplicationManagerContext);
+        String labelValue = kubernetesApplicationManagerContext.getLabelValue();
+        FilterWatchListDeletable<Pod, PodList> watchList =
+                client.pods()
+                        .inNamespace(kubernetesApplicationManagerContext.getK8sTaskExecutionContext().getNamespace())
+                        .withLabel(UNIQUE_LABEL_NAME, labelValue);
+        List<Pod> podList = watchList.list().getItems();
+        if (podList.size() != 1) {
+            log.warn("Expected driver pod 1, but get {}.", podList.size());
+        }
+        return watchList;
+    }
+
+    /**
+     * create client or get from cache map
+     *
+     * @param kubernetesApplicationManagerContext context whose label value is the cache key and whose kubeconfig builds the client
+     * @return the cached client, or a newly created one
+     */
+    private KubernetesClient getClient(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) {
+        K8sTaskExecutionContext k8sTaskExecutionContext =
+                kubernetesApplicationManagerContext.getK8sTaskExecutionContext();
+        return cacheClientMap.computeIfAbsent(kubernetesApplicationManagerContext.getLabelValue(),
+                key -> new DefaultKubernetesClient(Config.fromKubeconfig(k8sTaskExecutionContext.getConfigYaml())));
+    }
+
+    /**
+     * Remove the cached client for {@code cacheKey} and close it; does nothing when no client is cached.
+     */
+    public void removeCache(String cacheKey) {
+        try (KubernetesClient ignored = cacheClientMap.remove(cacheKey)) {
+        }
+    }
+
+    /**
+     * get application execution status
+     *
+     * @param kubernetesApplicationManagerContext context identifying the driver pod
+     * @return TaskExecutionStatus SUCCESS / FAILURE
+     * @throws TaskException if the driver pod cannot be queried
+     */
+    public TaskExecutionStatus getApplicationStatus(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) throws TaskException {
+        return getApplicationStatus(kubernetesApplicationManagerContext, null);
+    }
+
+    /**
+     * get application (driver pod) status
+     *
+     * @param kubernetesApplicationManagerContext context identifying the driver pod
+     * @param watchList pre-resolved driver pod selection, or null to look it up
+     * @return FAILURE if the driver pod phase is Failed or Unknown, otherwise SUCCESS (also when no driver pod exists)
+     * @throws TaskException if the driver pod cannot be queried
+     */
+    private TaskExecutionStatus getApplicationStatus(KubernetesApplicationManagerContext kubernetesApplicationManagerContext,
+                                                     FilterWatchListDeletable<Pod, PodList> watchList) throws TaskException {
+        String phase;
+        try {
+            if (Objects.isNull(watchList)) {
+                watchList = getDriverPod(kubernetesApplicationManagerContext);
+            }
+            List<Pod> driverPod = watchList.list().getItems();
+            if (!driverPod.isEmpty()) {
+                // cluster mode
+                Pod driver = driverPod.get(0);
+                phase = driver.getStatus().getPhase();
+            } else {
+                // client mode
+                phase = FINISH;
+            }
+        } catch (Exception e) {
+            throw new TaskException("Failed to get Kubernetes application status", e);
+        }
+
+        // constant-first equals so a null phase is not dereferenced
+        return FAILED.equals(phase) || UNKNOWN.equals(phase) ? TaskExecutionStatus.FAILURE
+                : TaskExecutionStatus.SUCCESS;
+    }
+
+    /**
+     * collect pod's log
+     *
+     * @param kubernetesApplicationManagerContext context identifying the driver pod
+     * @return the driver pod log, or an empty string if there is no driver pod or collection fails
+     */
+    public String collectPodLog(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) {
+        try {
+            KubernetesClient client = getClient(kubernetesApplicationManagerContext);
+            FilterWatchListDeletable<Pod, PodList> watchList = getDriverPod(kubernetesApplicationManagerContext);
+            List<Pod> driverPod = watchList.list().getItems();
+            if (!driverPod.isEmpty()) {
+                Pod driver = driverPod.get(0);
+                String driverPodName = driver.getMetadata().getName();
+                String logs = client.pods()
+                        .inNamespace(kubernetesApplicationManagerContext.getK8sTaskExecutionContext().getNamespace())
+                        .withName(driverPodName).getLog();
+
+                // delete the driver pod (killApplication skips pods in Failed/Unknown phase)
+                killApplication(kubernetesApplicationManagerContext);
+                return logs;
+            }
+        } catch (Exception e) {
+            log.error("Collect pod log failed:", e);
+        } finally {
+            // release the cached client even when no driver pod was found or an error occurred
+            removeCache(kubernetesApplicationManagerContext.getLabelValue());
+        }
+        return "";
+    }
+
+}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/K8sTaskExecutionContext.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManagerContext.java
similarity index 59%
copy from
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/K8sTaskExecutionContext.java
copy to
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManagerContext.java
index 7e39f184fa..19dd223477 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/K8sTaskExecutionContext.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManagerContext.java
@@ -15,30 +15,25 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.plugin.task.api;
+package org.apache.dolphinscheduler.plugin.task.api.am;
-import java.io.Serializable;
+import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
-/**
- * k8s Task ExecutionContext
- */
-
-public class K8sTaskExecutionContext implements Serializable {
+import lombok.RequiredArgsConstructor;
+import lombok.Value;
- private String configYaml;
+@Value
+@RequiredArgsConstructor
+public class KubernetesApplicationManagerContext implements
ApplicationManagerContext {
- public String getConfigYaml() {
- return configYaml;
- }
+ /**
+ * kubernetes execution context; supplies the kubeconfig yaml and namespace used to reach the driver pod
+ */
+ private final K8sTaskExecutionContext k8sTaskExecutionContext;
- public void setConfigYaml(String configYaml) {
- this.configYaml = configYaml;
- }
+ /**
+ * driver pod label value, matched against UNIQUE_LABEL_NAME; also used as the client cache key
+ */
+ private final String labelValue;
- @Override
- public String toString() {
- return "K8sTaskExecutionContext{"
- + "configYaml='" + configYaml + '\''
- + '}';
- }
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/YarnApplicationManager.java
similarity index 72%
copy from
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java
copy to
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/YarnApplicationManager.java
index abaa688c9a..d7f3ffcf10 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/YarnApplicationManager.java
@@ -15,51 +15,31 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.plugin.task.api.utils;
+package org.apache.dolphinscheduler.plugin.task.api.am;
import org.apache.dolphinscheduler.common.constants.Constants;
-import org.apache.dolphinscheduler.common.exception.BaseException;
+import org.apache.dolphinscheduler.common.enums.ResourceManagerType;
import org.apache.dolphinscheduler.common.utils.HttpUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.KerberosHttpClient;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
-import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.SystemUtils;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
-import org.slf4j.Logger;
-
import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.auto.service.AutoService;
@Slf4j
-public final class ProcessUtils {
-
- private ProcessUtils() {
- throw new IllegalStateException("Utility class");
- }
-
- /**
- * Initialization regularization, solve the problem of pre-compilation
performance,
- * avoid the thread safety problem of multi-thread operation
- */
- private static final Pattern MACPATTERN =
Pattern.compile("-[+|-]-\\s(\\d+)");
-
- /**
- * Expression of PID recognition in Windows scene
- */
- private static final Pattern WINDOWSATTERN = Pattern.compile("(\\d+)");
+@AutoService(ApplicationManager.class)
+public class YarnApplicationManager implements ApplicationManager {
private static final String RM_HA_IDS =
PropertyUtils.getString(Constants.YARN_RESOURCEMANAGER_HA_RM_IDS);
private static final String APP_ADDRESS =
PropertyUtils.getString(Constants.YARN_APPLICATION_STATUS_ADDRESS);
@@ -68,74 +48,13 @@ public final class ProcessUtils {
private static final int HADOOP_RESOURCE_MANAGER_HTTP_ADDRESS_PORT_VALUE =
PropertyUtils.getInt(Constants.HADOOP_RESOURCE_MANAGER_HTTPADDRESS_PORT, 8088);
- /**
- * kill tasks according to different task types.
- */
- public static boolean kill(@NonNull TaskExecutionContext request) {
- try {
- log.info("Begin kill task instance, processId: {}",
request.getProcessId());
- int processId = request.getProcessId();
- if (processId == 0) {
- log.error("Task instance kill failed, processId is not exist");
- return false;
- }
-
- String cmd = String.format("kill -9 %s", getPidsStr(processId));
- cmd = OSUtils.getSudoCmd(request.getTenantCode(), cmd);
- log.info("process id:{}, cmd:{}", processId, cmd);
-
- OSUtils.exeCmd(cmd);
- log.info("Success kill task instance, processId: {}",
request.getProcessId());
- return true;
- } catch (Exception e) {
- log.error("Kill task instance error, processId: {}",
request.getProcessId(), e);
- return false;
- }
- }
-
- /**
- * get pids str.
- *
- * @param processId process id
- * @return pids pid String
- * @throws Exception exception
- */
- public static String getPidsStr(int processId) throws Exception {
- StringBuilder sb = new StringBuilder();
- Matcher mat = null;
- // pstree pid get sub pids
- if (SystemUtils.IS_OS_MAC) {
- String pids = OSUtils.exeCmd(String.format("%s -sp %d",
TaskConstants.PSTREE, processId));
- if (null != pids) {
- mat = MACPATTERN.matcher(pids);
- }
- } else {
- String pids = OSUtils.exeCmd(String.format("%s -p %d",
TaskConstants.PSTREE, processId));
- mat = WINDOWSATTERN.matcher(pids);
- }
-
- if (null != mat) {
- while (mat.find()) {
- sb.append(mat.group(1)).append(" ");
- }
- }
-
- return sb.toString().trim();
- }
-
- /**
- * kill yarn application.
- *
- * @param appIds app id list
- * @param logger logger
- * @param tenantCode tenant code
- * @param executePath execute path
- */
- public static void cancelApplication(List<String> appIds, Logger logger,
String tenantCode, String executePath) {
- if (appIds == null || appIds.isEmpty()) {
- return;
- }
-
+ @Override
+ public boolean killApplication(ApplicationManagerContext
applicationManagerContext) throws TaskException {
+ YarnApplicationManagerContext yarnApplicationManagerContext =
+ (YarnApplicationManagerContext) applicationManagerContext;
+ String executePath = yarnApplicationManagerContext.getExecutePath();
+ String tenantCode = yarnApplicationManagerContext.getTenantCode();
+ List<String> appIds = yarnApplicationManagerContext.getAppIds();
for (String appId : appIds) {
try {
TaskExecutionStatus applicationStatus =
getApplicationStatus(appId);
@@ -143,12 +62,19 @@ public final class ProcessUtils {
if (!applicationStatus.isFinished()) {
String commandFile = String.format("%s/%s.kill",
executePath, appId);
String cmd = getKerberosInitCommand() + "yarn application
-kill " + appId;
- execYarnKillCommand(logger, tenantCode, appId,
commandFile, cmd);
+ execYarnKillCommand(tenantCode, appId, commandFile, cmd);
}
} catch (Exception e) {
log.error("Get yarn application app id [{}}] status failed",
appId, e);
+ throw new TaskException("Failed to kill yarn application " + appId, e);
}
}
+ return true;
+ }
+
+ @Override
+ public ResourceManagerType getResourceManagerType() {
+ return ResourceManagerType.YARN;
}
/**
@@ -157,7 +83,7 @@ public final class ProcessUtils {
* @param applicationId application id
* @return the return may be null or there may be other parse exceptions
*/
- public static TaskExecutionStatus getApplicationStatus(String
applicationId) throws BaseException {
+ public TaskExecutionStatus getApplicationStatus(String applicationId)
throws TaskException {
if (StringUtils.isEmpty(applicationId)) {
return null;
}
@@ -210,17 +136,17 @@ public final class ProcessUtils {
* @param applicationId application id
* @return url of application
*/
- private static String getApplicationUrl(String applicationId) throws
BaseException {
+ private String getApplicationUrl(String applicationId) throws
TaskException {
String appUrl = StringUtils.isEmpty(RM_HA_IDS) ? APP_ADDRESS :
getAppAddress(APP_ADDRESS, RM_HA_IDS);
if (StringUtils.isBlank(appUrl)) {
- throw new BaseException("yarn application url generation failed");
+ throw new TaskException("yarn application url generation failed");
}
log.debug("yarn application url:{}, applicationId:{}", appUrl,
applicationId);
return String.format(appUrl,
HADOOP_RESOURCE_MANAGER_HTTP_ADDRESS_PORT_VALUE, applicationId);
}
- private static String getJobHistoryUrl(String applicationId) {
+ private String getJobHistoryUrl(String applicationId) {
// eg:application_1587475402360_712719 -> job_1587475402360_712719
String jobId = applicationId.replace("application", "job");
return String.format(JOB_HISTORY_ADDRESS, jobId);
@@ -229,14 +155,13 @@ public final class ProcessUtils {
/**
* build kill command for yarn application
*
- * @param logger logger
* @param tenantCode tenant code
* @param appId app id
* @param commandFile command file
* @param cmd cmd
*/
- private static void execYarnKillCommand(Logger logger, String tenantCode,
String appId, String commandFile,
- String cmd) {
+ private void execYarnKillCommand(String tenantCode, String appId, String
commandFile,
+ String cmd) {
try {
StringBuilder sb = new StringBuilder();
sb.append("#!/bin/sh\n");
@@ -262,7 +187,7 @@ public final class ProcessUtils {
}
}
- private static TaskExecutionStatus getExecutionStatus(String result) {
+ private TaskExecutionStatus getExecutionStatus(String result) {
switch (result) {
case Constants.ACCEPTED:
return TaskExecutionStatus.SUBMITTED_SUCCESS;
@@ -289,7 +214,7 @@ public final class ProcessUtils {
* @param rmHa resource manager ha
* @return app address
*/
- private static String getAppAddress(String appAddress, String rmHa) {
+ private String getAppAddress(String appAddress, String rmHa) {
String[] split1 = appAddress.split(Constants.DOUBLE_SLASH);
@@ -319,7 +244,7 @@ public final class ProcessUtils {
/**
* get kerberos init command
*/
- private static String getKerberosInitCommand() {
+ private String getKerberosInitCommand() {
log.info("get kerberos init command");
StringBuilder kerberosCommandBuilder = new StringBuilder();
boolean hadoopKerberosState =
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/K8sTaskExecutionContext.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/YarnApplicationManagerContext.java
similarity index 59%
copy from
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/K8sTaskExecutionContext.java
copy to
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/YarnApplicationManagerContext.java
index 7e39f184fa..44dc047f13 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/K8sTaskExecutionContext.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/YarnApplicationManagerContext.java
@@ -15,30 +15,30 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.plugin.task.api;
+package org.apache.dolphinscheduler.plugin.task.api.am;
-import java.io.Serializable;
+import java.util.List;
-/**
- * k8s Task ExecutionContext
- */
+import lombok.RequiredArgsConstructor;
+import lombok.Value;
-public class K8sTaskExecutionContext implements Serializable {
+@Value
+@RequiredArgsConstructor
+public class YarnApplicationManagerContext implements
ApplicationManagerContext {
- private String configYaml;
+ /**
+ * task execute path; the kill script for each application is written here as <appId>.kill
+ */
+ private final String executePath;
- public String getConfigYaml() {
- return configYaml;
- }
+ /**
+ * tenant code passed to the yarn kill command (presumably used to run it via sudo — confirm)
+ */
+ private final String tenantCode;
- public void setConfigYaml(String configYaml) {
- this.configYaml = configYaml;
- }
+ /**
+ * ids of the yarn applications to kill
+ */
+ private final List<String> appIds;
- @Override
- public String toString() {
- return "K8sTaskExecutionContext{"
- + "configYaml='" + configYaml + '\''
- + '}';
- }
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java
index abaa688c9a..9ec8480589 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java
@@ -17,32 +17,39 @@
package org.apache.dolphinscheduler.plugin.task.api.utils;
-import org.apache.dolphinscheduler.common.constants.Constants;
-import org.apache.dolphinscheduler.common.exception.BaseException;
-import org.apache.dolphinscheduler.common.utils.HttpUtils;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.KerberosHttpClient;
+import static
org.apache.dolphinscheduler.common.constants.Constants.APPID_COLLECT;
+import static
org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_COLLECT_WAY;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.COMMA;
+import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SET_K8S;
+
+import org.apache.dolphinscheduler.common.enums.ResourceManagerType;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
+import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.am.ApplicationManager;
+import
org.apache.dolphinscheduler.plugin.task.api.am.KubernetesApplicationManager;
+import
org.apache.dolphinscheduler.plugin.task.api.am.KubernetesApplicationManagerContext;
+import
org.apache.dolphinscheduler.plugin.task.api.am.YarnApplicationManagerContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
-import java.io.File;
-import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.ServiceLoader;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
-import org.slf4j.Logger;
-
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
@Slf4j
public final class ProcessUtils {
@@ -50,6 +57,14 @@ public final class ProcessUtils {
throw new IllegalStateException("Utility class");
}
+ private static final Map<ResourceManagerType, ApplicationManager>
applicationManagerMap = new HashMap<>();
+ // filled once from every ServiceLoader-discovered (@AutoService) ApplicationManager, keyed by its ResourceManagerType
+ static {
+ ServiceLoader.load(ApplicationManager.class)
+ .forEach(applicationManager ->
applicationManagerMap.put(applicationManager.getResourceManagerType(),
+ applicationManager));
+ }
+
/**
* Initialization regularization, solve the problem of pre-compilation
performance,
* avoid the thread safety problem of multi-thread operation
@@ -61,13 +76,6 @@ public final class ProcessUtils {
*/
private static final Pattern WINDOWSATTERN = Pattern.compile("(\\d+)");
- private static final String RM_HA_IDS =
PropertyUtils.getString(Constants.YARN_RESOURCEMANAGER_HA_RM_IDS);
- private static final String APP_ADDRESS =
PropertyUtils.getString(Constants.YARN_APPLICATION_STATUS_ADDRESS);
- private static final String JOB_HISTORY_ADDRESS =
- PropertyUtils.getString(Constants.YARN_JOB_HISTORY_STATUS_ADDRESS);
- private static final int HADOOP_RESOURCE_MANAGER_HTTP_ADDRESS_PORT_VALUE =
-
PropertyUtils.getInt(Constants.HADOOP_RESOURCE_MANAGER_HTTPADDRESS_PORT, 8088);
-
/**
* kill tasks according to different task types.
*/
@@ -124,278 +132,83 @@ public final class ProcessUtils {
}
/**
- * kill yarn application.
+ * Cancel (kill) the k8s or yarn application of a task. Failures are logged, not thrown.
*
- * @param appIds app id list
- * @param logger logger
- * @param tenantCode tenant code
- * @param executePath execute path
+ * @param taskExecutionContext task whose application is cancelled; for yarn, its appIds are
+ * read from the task log and stored back when not already set
*/
- public static void cancelApplication(List<String> appIds, Logger logger,
String tenantCode, String executePath) {
- if (appIds == null || appIds.isEmpty()) {
- return;
- }
-
- for (String appId : appIds) {
- try {
- TaskExecutionStatus applicationStatus =
getApplicationStatus(appId);
-
- if (!applicationStatus.isFinished()) {
- String commandFile = String.format("%s/%s.kill",
executePath, appId);
- String cmd = getKerberosInitCommand() + "yarn application
-kill " + appId;
- execYarnKillCommand(logger, tenantCode, appId,
commandFile, cmd);
+ public static void cancelApplication(TaskExecutionContext
taskExecutionContext) {
+ try {
+ if
(Objects.nonNull(taskExecutionContext.getK8sTaskExecutionContext()) &&
+
!TASK_TYPE_SET_K8S.contains(taskExecutionContext.getTaskType())) {
+ applicationManagerMap.get(ResourceManagerType.KUBERNETES)
+ .killApplication(new
KubernetesApplicationManagerContext(
+
taskExecutionContext.getK8sTaskExecutionContext(),
+ taskExecutionContext.getTaskAppId()));
+ } else {
+ String host = taskExecutionContext.getHost();
+ String executePath = taskExecutionContext.getExecutePath();
+ String tenantCode = taskExecutionContext.getTenantCode();
+ List<String> appIds;
+ if (StringUtils.isNotEmpty(taskExecutionContext.getAppIds())) {
+ // is failover
+ appIds =
Arrays.asList(taskExecutionContext.getAppIds().split(COMMA));
+ } else {
+ String logPath = taskExecutionContext.getLogPath();
+ String appInfoPath = taskExecutionContext.getAppInfoPath();
+ if (logPath == null || appInfoPath == null || executePath
== null || tenantCode == null) {
+ log.error(
+ "Kill yarn job error, the input params is
illegal, host: {}, logPath: {}, appInfoPath: {}, executePath: {}, tenantCode:
{}",
+ host, logPath, appInfoPath, executePath,
tenantCode);
+ throw new TaskException("Cancel application failed!");
+ }
+ log.info("Get appIds from worker {}, taskLogPath: {}",
host, logPath);
+ appIds = LogUtils.getAppIds(logPath, appInfoPath,
+ PropertyUtils.getString(APPID_COLLECT,
DEFAULT_COLLECT_WAY));
+
taskExecutionContext.setAppIds(String.join(TaskConstants.COMMA, appIds));
}
- } catch (Exception e) {
- log.error("Get yarn application app id [{}}] status failed",
appId, e);
- }
- }
- }
-
- /**
- * get the state of an application
- *
- * @param applicationId application id
- * @return the return may be null or there may be other parse exceptions
- */
- public static TaskExecutionStatus getApplicationStatus(String
applicationId) throws BaseException {
- if (StringUtils.isEmpty(applicationId)) {
- return null;
- }
-
- String result;
- String applicationUrl = getApplicationUrl(applicationId);
- log.debug("generate yarn application url, applicationUrl={}",
applicationUrl);
-
- String responseContent = Boolean.TRUE
-
.equals(PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE,
false))
- ? KerberosHttpClient.get(applicationUrl)
- : HttpUtils.get(applicationUrl);
- if (responseContent != null) {
- ObjectNode jsonObject = JSONUtils.parseObject(responseContent);
- if (!jsonObject.has("app")) {
- return TaskExecutionStatus.FAILURE;
- }
- result = jsonObject.path("app").path("finalStatus").asText();
-
- } else {
- // may be in job history
- String jobHistoryUrl = getJobHistoryUrl(applicationId);
- log.debug("generate yarn job history application url,
jobHistoryUrl={}", jobHistoryUrl);
- responseContent = Boolean.TRUE
-
.equals(PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE,
false))
- ? KerberosHttpClient.get(jobHistoryUrl)
- : HttpUtils.get(jobHistoryUrl);
-
- if (null != responseContent) {
- ObjectNode jsonObject = JSONUtils.parseObject(responseContent);
- if (!jsonObject.has("job")) {
- return TaskExecutionStatus.FAILURE;
+ if (CollectionUtils.isEmpty(appIds)) {
+ log.info("The appId is empty");
}
- result = jsonObject.path("job").path("state").asText();
- } else {
- return TaskExecutionStatus.FAILURE;
+ ApplicationManager applicationManager =
applicationManagerMap.get(ResourceManagerType.YARN);
+ applicationManager.killApplication(new
YarnApplicationManagerContext(executePath, tenantCode, appIds));
}
- }
-
- return getExecutionStatus(result);
- }
-
- /**
- * get application url
- * if rmHaIds contains xx, it signs not use resourcemanager
- * otherwise:
- * if rmHaIds is empty, single resourcemanager enabled
- * if rmHaIds not empty: resourcemanager HA enabled
- *
- * @param applicationId application id
- * @return url of application
- */
- private static String getApplicationUrl(String applicationId) throws
BaseException {
-
- String appUrl = StringUtils.isEmpty(RM_HA_IDS) ? APP_ADDRESS :
getAppAddress(APP_ADDRESS, RM_HA_IDS);
- if (StringUtils.isBlank(appUrl)) {
- throw new BaseException("yarn application url generation failed");
- }
- log.debug("yarn application url:{}, applicationId:{}", appUrl,
applicationId);
- return String.format(appUrl,
HADOOP_RESOURCE_MANAGER_HTTP_ADDRESS_PORT_VALUE, applicationId);
- }
-
- private static String getJobHistoryUrl(String applicationId) {
- // eg:application_1587475402360_712719 -> job_1587475402360_712719
- String jobId = applicationId.replace("application", "job");
- return String.format(JOB_HISTORY_ADDRESS, jobId);
- }
-
- /**
- * build kill command for yarn application
- *
- * @param logger logger
- * @param tenantCode tenant code
- * @param appId app id
- * @param commandFile command file
- * @param cmd cmd
- */
- private static void execYarnKillCommand(Logger logger, String tenantCode,
String appId, String commandFile,
- String cmd) {
- try {
- StringBuilder sb = new StringBuilder();
- sb.append("#!/bin/sh\n");
- sb.append("BASEDIR=$(cd `dirname $0`; pwd)\n");
- sb.append("cd $BASEDIR\n");
-
- sb.append("\n\n");
- sb.append(cmd);
-
- File f = new File(commandFile);
-
- if (!f.exists()) {
- org.apache.commons.io.FileUtils.writeStringToFile(new
File(commandFile), sb.toString(),
- StandardCharsets.UTF_8);
- }
-
- String runCmd = String.format("%s %s", Constants.SH, commandFile);
- runCmd =
org.apache.dolphinscheduler.common.utils.OSUtils.getSudoCmd(tenantCode, runCmd);
- log.info("kill cmd:{}", runCmd);
- org.apache.dolphinscheduler.common.utils.OSUtils.exeCmd(runCmd);
} catch (Exception e) {
- log.error(String.format("Kill yarn application app id [%s] failed:
[%s]", appId, e.getMessage()));
- }
- }
-
- private static TaskExecutionStatus getExecutionStatus(String result) {
- switch (result) {
- case Constants.ACCEPTED:
- return TaskExecutionStatus.SUBMITTED_SUCCESS;
- case Constants.SUCCEEDED:
- case Constants.ENDED:
- return TaskExecutionStatus.SUCCESS;
- case Constants.NEW:
- case Constants.NEW_SAVING:
- case Constants.SUBMITTED:
- case Constants.FAILED:
- return TaskExecutionStatus.FAILURE;
- case Constants.KILLED:
- return TaskExecutionStatus.KILL;
- case Constants.RUNNING:
- default:
- return TaskExecutionStatus.RUNNING_EXECUTION;
+ log.error("Cancel application failed", e);
}
}
/**
- * getAppAddress
+ * get k8s application status
*
- * @param appAddress app address
- * @param rmHa resource manager ha
- * @return app address
+ * @param k8sTaskExecutionContext
+ * @param taskAppId
+ * @return
*/
- private static String getAppAddress(String appAddress, String rmHa) {
-
- String[] split1 = appAddress.split(Constants.DOUBLE_SLASH);
-
- if (split1.length != 2) {
- return null;
- }
-
- String start = split1[0] + Constants.DOUBLE_SLASH;
- String[] split2 = split1[1].split(Constants.COLON);
-
- if (split2.length != 2) {
- return null;
- }
-
- String end = Constants.COLON + split2[1];
-
- // get active ResourceManager
- String activeRM = YarnHAAdminUtils.getActiveRMName(start, rmHa);
-
- if (StringUtils.isEmpty(activeRM)) {
- return null;
- }
-
- return start + activeRM + end;
+ public static TaskExecutionStatus
getApplicationStatus(K8sTaskExecutionContext k8sTaskExecutionContext,
+ String taskAppId) {
+ if (Objects.isNull(k8sTaskExecutionContext)) {
+ return TaskExecutionStatus.SUCCESS;
+ }
+ KubernetesApplicationManager applicationManager =
+ (KubernetesApplicationManager)
applicationManagerMap.get(ResourceManagerType.KUBERNETES);
+ return applicationManager
+ .getApplicationStatus(new
KubernetesApplicationManagerContext(k8sTaskExecutionContext, taskAppId));
}
/**
- * get kerberos init command
- */
- private static String getKerberosInitCommand() {
- log.info("get kerberos init command");
- StringBuilder kerberosCommandBuilder = new StringBuilder();
- boolean hadoopKerberosState =
-
PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE,
false);
- if (hadoopKerberosState) {
- kerberosCommandBuilder.append("export KRB5_CONFIG=")
-
.append(PropertyUtils.getString(Constants.JAVA_SECURITY_KRB5_CONF_PATH))
- .append("\n\n")
- .append(String.format("kinit -k -t %s %s || true",
-
PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_PATH),
-
PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_USERNAME)))
- .append("\n\n");
- log.info("kerberos init command: {}", kerberosCommandBuilder);
- }
- return kerberosCommandBuilder.toString();
- }
-
- /**
- * yarn ha admin utils
+ * get driver pod logs
+ *
+ * @param k8sTaskExecutionContext
+ * @param taskAppId
+ * @return
*/
- private static final class YarnHAAdminUtils {
-
- /**
- * get active resourcemanager node
- *
- * @param protocol http protocol
- * @param rmIds yarn ha ids
- * @return yarn active node
- */
- public static String getActiveRMName(String protocol, String rmIds) {
-
- String[] rmIdArr = rmIds.split(Constants.COMMA);
+ public static String getPodLog(K8sTaskExecutionContext
k8sTaskExecutionContext, String taskAppId) {
+ KubernetesApplicationManager applicationManager =
+ (KubernetesApplicationManager)
applicationManagerMap.get(ResourceManagerType.KUBERNETES);
- String yarnUrl = protocol + "%s:" +
HADOOP_RESOURCE_MANAGER_HTTP_ADDRESS_PORT_VALUE + "/ws/v1/cluster/info";
-
- try {
-
- /**
- * send http get request to rm
- */
-
- for (String rmId : rmIdArr) {
- String state = getRMState(String.format(yarnUrl, rmId));
- if (Constants.HADOOP_RM_STATE_ACTIVE.equals(state)) {
- return rmId;
- }
- }
-
- } catch (Exception e) {
- log.error("yarn ha application url generation failed,
message:{}", e.getMessage());
- }
- return null;
- }
-
- /**
- * get ResourceManager state
- */
- public static String getRMState(String url) {
-
- String retStr = Boolean.TRUE
-
.equals(PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE,
false))
- ? KerberosHttpClient.get(url)
- : HttpUtils.get(url);
-
- if (StringUtils.isEmpty(retStr)) {
- return null;
- }
- // to json
- ObjectNode jsonObject = JSONUtils.parseObject(retStr);
-
- // get ResourceManager state
- if (!jsonObject.has("clusterInfo")) {
- return null;
- }
- return jsonObject.get("clusterInfo").path("haState").asText();
- }
+ return applicationManager
+ .collectPodLog(new
KubernetesApplicationManagerContext(k8sTaskExecutionContext, taskAppId));
}
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/pom.xml
b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/pom.xml
index 6d0d6d79b1..046a89d082 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/pom.xml
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/pom.xml
@@ -37,5 +37,9 @@
<artifactId>dolphinscheduler-task-api</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>io.fabric8</groupId>
+ <artifactId>kubernetes-client</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkConstants.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkConstants.java
index 0ca069ca34..7ad2e27b07 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkConstants.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkConstants.java
@@ -43,14 +43,14 @@ public class SparkConstants {
public static final String DEPLOY_MODE_LOCAL = "local";
/**
- * --driver-cores NUM
+ * --conf spark.driver.cores NUM
*/
- public static final String DRIVER_CORES = "--driver-cores";
+ public static final String DRIVER_CORES = "--conf spark.driver.cores=%d";
/**
- * --driver-memory MEM
+ * --conf spark.driver.memory MEM
*/
- public static final String DRIVER_MEMORY = "--driver-memory";
+ public static final String DRIVER_MEMORY = "--conf spark.driver.memory=%s";
/**
* master
@@ -59,20 +59,32 @@ public class SparkConstants {
public static final String SPARK_ON_YARN = "yarn";
+ public static final String SPARK_ON_K8S_MASTER_PREFIX = "k8s://";
+
+ /**
+ * add label for driver pod
+ */
+ public static final String DRIVER_LABEL_CONF = "--conf
spark.kubernetes.driver.label.%s=%s";
+
+ /**
+ * spark kubernetes namespace
+ */
+ public static final String SPARK_KUBERNETES_NAMESPACE = "--conf
spark.kubernetes.namespace=%s";
+
/**
- * --num-executors NUM
+ * --conf spark.executor.instances NUM
*/
- public static final String NUM_EXECUTORS = "--num-executors";
+ public static final String NUM_EXECUTORS = "--conf
spark.executor.instances=%d";
/**
- * --executor-cores NUM
+ * --conf spark.executor.cores NUM
*/
- public static final String EXECUTOR_CORES = "--executor-cores";
+ public static final String EXECUTOR_CORES = "--conf
spark.executor.cores=%d";
/**
- * --executor-memory MEM
+ * --conf spark.executor.memory MEM
*/
- public static final String EXECUTOR_MEMORY = "--executor-memory";
+ public static final String EXECUTOR_MEMORY = "--conf
spark.executor.memory=%s";
/**
* -f <filename> SQL from files
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParameters.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParameters.java
index 2532c978b9..aa7cd27c85 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParameters.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParameters.java
@@ -39,7 +39,7 @@ public class SparkParameters extends AbstractParameters {
private String mainClass;
/**
- * deploy mode
+ * deploy mode local / cluster / client
*/
private String deployMode;
@@ -100,6 +100,19 @@ public class SparkParameters extends AbstractParameters {
*/
private String rawScript;
+ /**
+ * kubernetes cluster namespace
+ */
+ private String namespace;
+
+ public String getNamespace() {
+ return namespace;
+ }
+
+ public void setNamespace(String namespace) {
+ this.namespace = namespace;
+ }
+
/**
* resource list
*/
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
index 80d5be232f..16533397da 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
@@ -17,7 +17,12 @@
package org.apache.dolphinscheduler.plugin.task.spark;
+import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.NAMESPACE_NAME;
import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
+import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.UNIQUE_LABEL_NAME;
+import static
org.apache.dolphinscheduler.plugin.task.spark.SparkConstants.DRIVER_LABEL_CONF;
+import static
org.apache.dolphinscheduler.plugin.task.spark.SparkConstants.SPARK_KUBERNETES_NAMESPACE;
+import static
org.apache.dolphinscheduler.plugin.task.spark.SparkConstants.SPARK_ON_K8S_MASTER_PREFIX;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
@@ -45,6 +50,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import io.fabric8.kubernetes.client.Config;
+
public class SparkTask extends AbstractYarnTask {
/**
@@ -132,8 +139,15 @@ public class SparkTask extends AbstractYarnTask {
String deployMode =
StringUtils.isNotEmpty(sparkParameters.getDeployMode()) ?
sparkParameters.getDeployMode()
: SparkConstants.DEPLOY_MODE_LOCAL;
+
+ boolean onNativeKubernetes =
StringUtils.isNotEmpty(sparkParameters.getNamespace());
+
+ String masterUrl = onNativeKubernetes ? SPARK_ON_K8S_MASTER_PREFIX +
+
Config.fromKubeconfig(taskExecutionContext.getK8sTaskExecutionContext().getConfigYaml()).getMasterUrl()
+ : SparkConstants.SPARK_ON_YARN;
+
if (!SparkConstants.DEPLOY_MODE_LOCAL.equals(deployMode)) {
- args.add(SparkConstants.SPARK_ON_YARN);
+ args.add(masterUrl);
args.add(SparkConstants.DEPLOY_MODE);
}
args.add(deployMode);
@@ -168,6 +182,13 @@ public class SparkTask extends AbstractYarnTask {
args.add(others);
}
+ // add driver label for spark on native kubernetes
+ if (onNativeKubernetes) {
+ args.add(String.format(DRIVER_LABEL_CONF, UNIQUE_LABEL_NAME,
taskExecutionContext.getTaskAppId()));
+ args.add(String.format(SPARK_KUBERNETES_NAMESPACE,
+
JSONUtils.toMap(sparkParameters.getNamespace()).get(NAMESPACE_NAME)));
+ }
+
ResourceInfo mainJar = sparkParameters.getMainJar();
if (programType != ProgramType.SQL) {
args.add(mainJar.getRes());
@@ -189,32 +210,27 @@ public class SparkTask extends AbstractYarnTask {
private void populateSparkResourceDefinitions(List<String> args) {
int driverCores = sparkParameters.getDriverCores();
if (driverCores > 0) {
- args.add(SparkConstants.DRIVER_CORES);
- args.add(String.format("%d", driverCores));
+ args.add(String.format(SparkConstants.DRIVER_CORES, driverCores));
}
String driverMemory = sparkParameters.getDriverMemory();
if (StringUtils.isNotEmpty(driverMemory)) {
- args.add(SparkConstants.DRIVER_MEMORY);
- args.add(driverMemory);
+ args.add(String.format(SparkConstants.DRIVER_MEMORY,
driverMemory));
}
int numExecutors = sparkParameters.getNumExecutors();
if (numExecutors > 0) {
- args.add(SparkConstants.NUM_EXECUTORS);
- args.add(String.format("%d", numExecutors));
+ args.add(String.format(SparkConstants.NUM_EXECUTORS,
numExecutors));
}
int executorCores = sparkParameters.getExecutorCores();
if (executorCores > 0) {
- args.add(SparkConstants.EXECUTOR_CORES);
- args.add(String.format("%d", executorCores));
+ args.add(String.format(SparkConstants.EXECUTOR_CORES,
executorCores));
}
String executorMemory = sparkParameters.getExecutorMemory();
if (StringUtils.isNotEmpty(executorMemory)) {
- args.add(SparkConstants.EXECUTOR_MEMORY);
- args.add(executorMemory);
+ args.add(String.format(SparkConstants.EXECUTOR_MEMORY,
executorMemory));
}
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java
index 53a05b9eb7..3c2c41e787 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java
@@ -46,11 +46,11 @@ public class SparkTaskTest {
"${SPARK_HOME}/bin/spark-sql " +
"--master yarn " +
"--deploy-mode client " +
- "--driver-cores 1 " +
- "--driver-memory 512M " +
- "--num-executors 2 " +
- "--executor-cores 2 " +
- "--executor-memory 1G " +
+ "--conf spark.driver.cores=1 " +
+ "--conf spark.driver.memory=512M " +
+ "--conf spark.executor.instances=2 " +
+ "--conf spark.executor.cores=2 " +
+ "--conf spark.executor.memory=1G " +
"--name sparksql " +
"-f /tmp/5536_node.sql");
}
@@ -67,11 +67,11 @@ public class SparkTaskTest {
"--master yarn " +
"--deploy-mode client " +
"--class
org.apache.dolphinscheduler.plugin.task.spark.SparkTaskTest " +
- "--driver-cores 1 " +
- "--driver-memory 512M " +
- "--num-executors 2 " +
- "--executor-cores 2 " +
- "--executor-memory 1G " +
+ "--conf spark.driver.cores=1 " +
+ "--conf spark.driver.memory=512M " +
+ "--conf spark.executor.instances=2 " +
+ "--conf spark.executor.cores=2 " +
+ "--conf spark.executor.memory=1G " +
"--name spark " +
"lib/dolphinscheduler-task-spark.jar");
}
diff --git
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-namespace.ts
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-namespace.ts
index fc4a8925f1..2c0e52aaad 100644
---
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-namespace.ts
+++
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-namespace.ts
@@ -57,7 +57,8 @@ export function useNamespace(): IJsonItem {
name: t('project.node.namespace_cluster'),
props: {
loading,
- 'render-label': renderLabel
+ 'render-label': renderLabel,
+ 'clearable': true
},
options: [
{
diff --git
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-spark.ts
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-spark.ts
index de100d770e..7c68a25920 100644
---
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-spark.ts
+++
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-spark.ts
@@ -25,6 +25,7 @@ import {
useExecutorMemory,
useExecutorCores,
useMainJar,
+ useNamespace,
useResources
} from '.'
import type { IJsonItem } from '../types'
@@ -90,6 +91,7 @@ export function useSpark(model: { [field: string]: any }):
IJsonItem[] {
}
},
useDeployMode(24, ref(true), showCluster),
+ useNamespace(),
{
type: 'input',
field: 'appName',
diff --git
a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
index 7d227db0e7..75e69079df 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
@@ -60,6 +60,9 @@ export function formatParams(data: INodeData): {
taskParams.appName = data.appName
taskParams.mainArgs = data.mainArgs
taskParams.others = data.others
+ if (data.namespace) {
+ taskParams.namespace = data.namespace
+ }
}
if (data.taskType === 'SPARK') {
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
index 3c99ea492e..9afc4708c7 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -17,14 +17,11 @@
package org.apache.dolphinscheduler.server.worker.processor;
-import static
org.apache.dolphinscheduler.common.constants.Constants.APPID_COLLECT;
-import static
org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_COLLECT_WAY;
-
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
-import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
@@ -35,17 +32,11 @@ import
org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
-import org.apache.dolphinscheduler.remote.utils.Host;
-import org.apache.dolphinscheduler.remote.utils.Pair;
import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import
org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnable;
-import org.apache.commons.collections4.CollectionUtils;
-
import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
import lombok.extern.slf4j.Slf4j;
@@ -55,7 +46,6 @@ import org.springframework.stereotype.Component;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
-import io.micrometer.core.lang.NonNull;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
@@ -113,11 +103,10 @@ public class TaskKillProcessor implements
NettyRequestProcessor {
// if processId > 0, it should call cancelApplication to cancel
remote application too.
this.cancelApplication(taskInstanceId);
- Pair<Boolean, List<String>> result = doKill(taskExecutionContext);
+ boolean result = doKill(taskExecutionContext);
taskExecutionContext.setCurrentExecutionStatus(
- result.getLeft() ? TaskExecutionStatus.SUCCESS :
TaskExecutionStatus.FAILURE);
- taskExecutionContext.setAppIds(String.join(TaskConstants.COMMA,
result.getRight()));
+ result ? TaskExecutionStatus.SUCCESS :
TaskExecutionStatus.FAILURE);
sendTaskKillResponseCommand(channel, taskExecutionContext);
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
@@ -155,17 +144,17 @@ public class TaskKillProcessor implements
NettyRequestProcessor {
*
* @return kill result
*/
- private Pair<Boolean, List<String>> doKill(TaskExecutionContext
taskExecutionContext) {
+ private boolean doKill(TaskExecutionContext taskExecutionContext) {
// kill system process
boolean processFlag =
killProcess(taskExecutionContext.getTenantCode(),
taskExecutionContext.getProcessId());
- // find log and kill yarn job
- Pair<Boolean, List<String>> yarnResult =
killYarnJob(Host.of(taskExecutionContext.getHost()),
- taskExecutionContext.getLogPath(),
- taskExecutionContext.getAppInfoPath(),
- taskExecutionContext.getExecutePath(),
- taskExecutionContext.getTenantCode());
- return Pair.of(processFlag && yarnResult.getLeft(),
yarnResult.getRight());
+ // kill yarn or k8s application
+ try {
+ ProcessUtils.cancelApplication(taskExecutionContext);
+ } catch (TaskException e) {
+ return false;
+ }
+ return processFlag;
}
/**
@@ -216,42 +205,4 @@ public class TaskKillProcessor implements
NettyRequestProcessor {
return processFlag;
}
- /**
- * kill yarn job
- *
- * @param host host
- * @param logPath logPath
- * @param executePath executePath
- * @param tenantCode tenantCode
- * @return Pair<Boolean, List < String>> yarn kill result
- */
- private Pair<Boolean, List<String>> killYarnJob(@NonNull Host host,
- String logPath,
- String appInfoPath,
- String executePath,
- String tenantCode) {
- if (logPath == null || appInfoPath == null || executePath == null ||
tenantCode == null) {
- log.error(
- "Kill yarn job error, the input params is illegal, host:
{}, logPath: {}, appInfoPath: {}, executePath: {}, tenantCode: {}",
- host, logPath, appInfoPath, executePath, tenantCode);
- return Pair.of(false, Collections.emptyList());
- }
- try {
- log.info("Get appIds from worker {}:{} taskLogPath: {}",
host.getIp(), host.getPort(), logPath);
- List<String> appIds = LogUtils.getAppIds(logPath, appInfoPath,
- PropertyUtils.getString(APPID_COLLECT,
DEFAULT_COLLECT_WAY));
- if (CollectionUtils.isEmpty(appIds)) {
- log.info("The appId is empty");
- return Pair.of(true, Collections.emptyList());
- }
-
- ProcessUtils.cancelApplication(appIds, log, tenantCode,
executePath);
- return Pair.of(true, appIds);
- } catch (Exception e) {
- log.error("Kill yarn job error, host: {}, logPath: {},
executePath: {}, tenantCode: {}", host, logPath,
- executePath, tenantCode, e);
- }
- return Pair.of(false, Collections.emptyList());
- }
-
}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java
index 103e253800..2bc30ba346 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java
@@ -17,8 +17,6 @@
package org.apache.dolphinscheduler.server.worker.runner;
-import static
org.apache.dolphinscheduler.common.constants.Constants.APPID_COLLECT;
-import static
org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_COLLECT_WAY;
import static
org.apache.dolphinscheduler.common.constants.Constants.DRY_RUN_FLAG_YES;
import static
org.apache.dolphinscheduler.common.constants.Constants.SINGLE_SLASH;
@@ -26,7 +24,6 @@ import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.log.remote.RemoteLogUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.CommonUtils;
import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
@@ -52,12 +49,9 @@ import
org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
import
org.apache.dolphinscheduler.server.worker.utils.TaskExecutionCheckerUtils;
import org.apache.dolphinscheduler.server.worker.utils.TaskFilesTransferUtils;
-import org.apache.commons.collections4.CollectionUtils;
-
import java.io.File;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
-import java.util.List;
import javax.annotation.Nullable;
@@ -143,13 +137,7 @@ public abstract class WorkerTaskExecuteRunnable implements
Runnable {
if (task != null) {
try {
task.cancel();
- List<String> appIds =
- LogUtils.getAppIds(taskExecutionContext.getLogPath(),
taskExecutionContext.getExecutePath(),
- PropertyUtils.getString(APPID_COLLECT,
DEFAULT_COLLECT_WAY));
- if (CollectionUtils.isNotEmpty(appIds)) {
- ProcessUtils.cancelApplication(appIds, log,
taskExecutionContext.getTenantCode(),
- taskExecutionContext.getExecutePath());
- }
+ ProcessUtils.cancelApplication(taskExecutionContext);
} catch (Exception e) {
log.error(
"Task execute failed and cancel the application
failed, this will not affect the taskInstance status, but you need to check
manual",