This is an automated email from the ASF dual-hosted git repository.
zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 3eb4c0ed1d [Improvement-14054][Worker] Kill multiple yarn apps at the same time (#14055)
3eb4c0ed1d is described below
commit 3eb4c0ed1d0ab15280744bbd893e52ce05140052
Author: Rick Cheng <[email protected]>
AuthorDate: Sat May 6 19:13:12 2023 +0800
[Improvement-14054][Worker] Kill multiple yarn apps at the same time (#14055)
* [Improvement-14054][Worker] Kill multiple yarn apps at the same time
* Update dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/YarnApplicationManager.java
Co-authored-by: Aaron Wang <[email protected]>
---
.../plugin/task/api/am/YarnApplicationManager.java | 262 ++-------------------
.../worker/processor/WorkerTaskKillProcessor.java | 5 +-
2 files changed, 26 insertions(+), 241 deletions(-)
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/YarnApplicationManager.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/YarnApplicationManager.java
index d7f3ffcf10..9ab19d3e24 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/YarnApplicationManager.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/YarnApplicationManager.java
@@ -19,14 +19,8 @@ package org.apache.dolphinscheduler.plugin.task.api.am;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.ResourceManagerType;
-import org.apache.dolphinscheduler.common.utils.HttpUtils;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.KerberosHttpClient;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
-import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
-
-import org.apache.commons.lang3.StringUtils;
import java.io.File;
import java.nio.charset.StandardCharsets;
@@ -34,20 +28,12 @@ import java.util.List;
import lombok.extern.slf4j.Slf4j;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.auto.service.AutoService;
@Slf4j
@AutoService(ApplicationManager.class)
public class YarnApplicationManager implements ApplicationManager {
- private static final String RM_HA_IDS =
PropertyUtils.getString(Constants.YARN_RESOURCEMANAGER_HA_RM_IDS);
- private static final String APP_ADDRESS =
PropertyUtils.getString(Constants.YARN_APPLICATION_STATUS_ADDRESS);
- private static final String JOB_HISTORY_ADDRESS =
- PropertyUtils.getString(Constants.YARN_JOB_HISTORY_STATUS_ADDRESS);
- private static final int HADOOP_RESOURCE_MANAGER_HTTP_ADDRESS_PORT_VALUE =
-
PropertyUtils.getInt(Constants.HADOOP_RESOURCE_MANAGER_HTTPADDRESS_PORT, 8088);
-
@Override
public boolean killApplication(ApplicationManagerContext
applicationManagerContext) throws TaskException {
YarnApplicationManagerContext yarnApplicationManagerContext =
@@ -55,20 +41,16 @@ public class YarnApplicationManager implements
ApplicationManager {
String executePath = yarnApplicationManagerContext.getExecutePath();
String tenantCode = yarnApplicationManagerContext.getTenantCode();
List<String> appIds = yarnApplicationManagerContext.getAppIds();
- for (String appId : appIds) {
- try {
- TaskExecutionStatus applicationStatus =
getApplicationStatus(appId);
- if (!applicationStatus.isFinished()) {
- String commandFile = String.format("%s/%s.kill",
executePath, appId);
- String cmd = getKerberosInitCommand() + "yarn application
-kill " + appId;
- execYarnKillCommand(tenantCode, appId, commandFile, cmd);
- }
- } catch (Exception e) {
- log.error("Get yarn application app id [{}}] status failed",
appId, e);
- throw new TaskException(e.getMessage());
- }
+ try {
+ String commandFile = String.format("%s/%s.kill", executePath,
String.join(Constants.UNDERLINE, appIds));
+ String cmd = getKerberosInitCommand() + "yarn application -kill "
+ String.join(Constants.SPACE, appIds);
+ execYarnKillCommand(tenantCode, commandFile, cmd);
+ } catch (Exception e) {
+ log.error("Kill yarn application [{}] failed", appIds, e);
+ throw new TaskException(e.getMessage());
}
+
return true;
}
@@ -77,168 +59,34 @@ public class YarnApplicationManager implements
ApplicationManager {
return ResourceManagerType.YARN;
}
- /**
- * get the state of an application
- *
- * @param applicationId application id
- * @return the return may be null or there may be other parse exceptions
- */
- public TaskExecutionStatus getApplicationStatus(String applicationId)
throws TaskException {
- if (StringUtils.isEmpty(applicationId)) {
- return null;
- }
-
- String result;
- String applicationUrl = getApplicationUrl(applicationId);
- log.debug("generate yarn application url, applicationUrl={}",
applicationUrl);
-
- String responseContent = Boolean.TRUE
-
.equals(PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE,
false))
- ? KerberosHttpClient.get(applicationUrl)
- : HttpUtils.get(applicationUrl);
- if (responseContent != null) {
- ObjectNode jsonObject = JSONUtils.parseObject(responseContent);
- if (!jsonObject.has("app")) {
- return TaskExecutionStatus.FAILURE;
- }
- result = jsonObject.path("app").path("finalStatus").asText();
-
- } else {
- // may be in job history
- String jobHistoryUrl = getJobHistoryUrl(applicationId);
- log.debug("generate yarn job history application url,
jobHistoryUrl={}", jobHistoryUrl);
- responseContent = Boolean.TRUE
-
.equals(PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE,
false))
- ? KerberosHttpClient.get(jobHistoryUrl)
- : HttpUtils.get(jobHistoryUrl);
-
- if (null != responseContent) {
- ObjectNode jsonObject = JSONUtils.parseObject(responseContent);
- if (!jsonObject.has("job")) {
- return TaskExecutionStatus.FAILURE;
- }
- result = jsonObject.path("job").path("state").asText();
- } else {
- return TaskExecutionStatus.FAILURE;
- }
- }
-
- return getExecutionStatus(result);
- }
-
- /**
- * get application url
- * if rmHaIds contains xx, it signs not use resourcemanager
- * otherwise:
- * if rmHaIds is empty, single resourcemanager enabled
- * if rmHaIds not empty: resourcemanager HA enabled
- *
- * @param applicationId application id
- * @return url of application
- */
- private String getApplicationUrl(String applicationId) throws
TaskException {
-
- String appUrl = StringUtils.isEmpty(RM_HA_IDS) ? APP_ADDRESS :
getAppAddress(APP_ADDRESS, RM_HA_IDS);
- if (StringUtils.isBlank(appUrl)) {
- throw new TaskException("yarn application url generation failed");
- }
- log.debug("yarn application url:{}, applicationId:{}", appUrl,
applicationId);
- return String.format(appUrl,
HADOOP_RESOURCE_MANAGER_HTTP_ADDRESS_PORT_VALUE, applicationId);
- }
-
- private String getJobHistoryUrl(String applicationId) {
- // eg:application_1587475402360_712719 -> job_1587475402360_712719
- String jobId = applicationId.replace("application", "job");
- return String.format(JOB_HISTORY_ADDRESS, jobId);
- }
-
/**
* build kill command for yarn application
*
* @param tenantCode tenant code
- * @param appId app id
* @param commandFile command file
* @param cmd cmd
*/
- private void execYarnKillCommand(String tenantCode, String appId, String
commandFile,
- String cmd) {
- try {
- StringBuilder sb = new StringBuilder();
- sb.append("#!/bin/sh\n");
- sb.append("BASEDIR=$(cd `dirname $0`; pwd)\n");
- sb.append("cd $BASEDIR\n");
+ private void execYarnKillCommand(String tenantCode, String commandFile,
+ String cmd) throws Exception {
+ StringBuilder sb = new StringBuilder();
+ sb.append("#!/bin/sh\n");
+ sb.append("BASEDIR=$(cd `dirname $0`; pwd)\n");
+ sb.append("cd $BASEDIR\n");
- sb.append("\n\n");
- sb.append(cmd);
+ sb.append("\n\n");
+ sb.append(cmd);
- File f = new File(commandFile);
+ File f = new File(commandFile);
- if (!f.exists()) {
- org.apache.commons.io.FileUtils.writeStringToFile(new
File(commandFile), sb.toString(),
- StandardCharsets.UTF_8);
- }
-
- String runCmd = String.format("%s %s", Constants.SH, commandFile);
- runCmd =
org.apache.dolphinscheduler.common.utils.OSUtils.getSudoCmd(tenantCode, runCmd);
- log.info("kill cmd:{}", runCmd);
- org.apache.dolphinscheduler.common.utils.OSUtils.exeCmd(runCmd);
- } catch (Exception e) {
- log.error(String.format("Kill yarn application app id [%s] failed:
[%s]", appId, e.getMessage()));
+ if (!f.exists()) {
+ org.apache.commons.io.FileUtils.writeStringToFile(new
File(commandFile), sb.toString(),
+ StandardCharsets.UTF_8);
}
- }
- private TaskExecutionStatus getExecutionStatus(String result) {
- switch (result) {
- case Constants.ACCEPTED:
- return TaskExecutionStatus.SUBMITTED_SUCCESS;
- case Constants.SUCCEEDED:
- case Constants.ENDED:
- return TaskExecutionStatus.SUCCESS;
- case Constants.NEW:
- case Constants.NEW_SAVING:
- case Constants.SUBMITTED:
- case Constants.FAILED:
- return TaskExecutionStatus.FAILURE;
- case Constants.KILLED:
- return TaskExecutionStatus.KILL;
- case Constants.RUNNING:
- default:
- return TaskExecutionStatus.RUNNING_EXECUTION;
- }
- }
-
- /**
- * getAppAddress
- *
- * @param appAddress app address
- * @param rmHa resource manager ha
- * @return app address
- */
- private String getAppAddress(String appAddress, String rmHa) {
-
- String[] split1 = appAddress.split(Constants.DOUBLE_SLASH);
-
- if (split1.length != 2) {
- return null;
- }
-
- String start = split1[0] + Constants.DOUBLE_SLASH;
- String[] split2 = split1[1].split(Constants.COLON);
-
- if (split2.length != 2) {
- return null;
- }
-
- String end = Constants.COLON + split2[1];
-
- // get active ResourceManager
- String activeRM = YarnHAAdminUtils.getActiveRMName(start, rmHa);
-
- if (StringUtils.isEmpty(activeRM)) {
- return null;
- }
-
- return start + activeRM + end;
+ String runCmd = String.format("%s %s", Constants.SH, commandFile);
+ runCmd =
org.apache.dolphinscheduler.common.utils.OSUtils.getSudoCmd(tenantCode, runCmd);
+ log.info("kill cmd:{}", runCmd);
+ org.apache.dolphinscheduler.common.utils.OSUtils.exeCmd(runCmd);
}
/**
@@ -261,66 +109,4 @@ public class YarnApplicationManager implements
ApplicationManager {
}
return kerberosCommandBuilder.toString();
}
-
- /**
- * yarn ha admin utils
- */
- private static final class YarnHAAdminUtils {
-
- /**
- * get active resourcemanager node
- *
- * @param protocol http protocol
- * @param rmIds yarn ha ids
- * @return yarn active node
- */
- public static String getActiveRMName(String protocol, String rmIds) {
-
- String[] rmIdArr = rmIds.split(Constants.COMMA);
-
- String yarnUrl = protocol + "%s:" +
HADOOP_RESOURCE_MANAGER_HTTP_ADDRESS_PORT_VALUE + "/ws/v1/cluster/info";
-
- try {
-
- /**
- * send http get request to rm
- */
-
- for (String rmId : rmIdArr) {
- String state = getRMState(String.format(yarnUrl, rmId));
- if (Constants.HADOOP_RM_STATE_ACTIVE.equals(state)) {
- return rmId;
- }
- }
-
- } catch (Exception e) {
- log.error("yarn ha application url generation failed,
message:{}", e.getMessage());
- }
- return null;
- }
-
- /**
- * get ResourceManager state
- */
- public static String getRMState(String url) {
-
- String retStr = Boolean.TRUE
-
.equals(PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE,
false))
- ? KerberosHttpClient.get(url)
- : HttpUtils.get(url);
-
- if (StringUtils.isEmpty(retStr)) {
- return null;
- }
- // to json
- ObjectNode jsonObject = JSONUtils.parseObject(retStr);
-
- // get ResourceManager state
- if (!jsonObject.has("clusterInfo")) {
- return null;
- }
- return jsonObject.get("clusterInfo").path("haState").asText();
- }
- }
-
}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerTaskKillProcessor.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerTaskKillProcessor.java
index e2f4d74ced..01bd5c85e0 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerTaskKillProcessor.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerTaskKillProcessor.java
@@ -85,9 +85,10 @@ public class WorkerTaskKillProcessor implements
WorkerRpcProcessor {
return;
}
+ this.cancelApplication(taskInstanceId);
+
int processId = taskExecutionContext.getProcessId();
if (processId == 0) {
- this.cancelApplication(taskInstanceId);
workerManager.killTaskBeforeExecuteByInstanceId(taskInstanceId);
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.KILL);
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
@@ -96,8 +97,6 @@ public class WorkerTaskKillProcessor implements
WorkerRpcProcessor {
return;
}
- // if processId > 0, it should call cancelApplication to cancel
remote application too.
- this.cancelApplication(taskInstanceId);
boolean result = doKill(taskExecutionContext);
taskExecutionContext.setCurrentExecutionStatus(