This is an automated email from the ASF dual-hosted git repository.

zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3eb4c0ed1d [Improvement-14054][Worker] Kill multiple yarn apps at the same time (#14055)
3eb4c0ed1d is described below

commit 3eb4c0ed1d0ab15280744bbd893e52ce05140052
Author: Rick Cheng <[email protected]>
AuthorDate: Sat May 6 19:13:12 2023 +0800

    [Improvement-14054][Worker] Kill multiple yarn apps at the same time (#14055)
    
    * [Improvement-14054][Worker] Kill multiple yarn apps at the same time
    
    * Update dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/YarnApplicationManager.java
    
    Co-authored-by: Aaron Wang <[email protected]>
---
 .../plugin/task/api/am/YarnApplicationManager.java | 262 ++-------------------
 .../worker/processor/WorkerTaskKillProcessor.java  |   5 +-
 2 files changed, 26 insertions(+), 241 deletions(-)

diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/YarnApplicationManager.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/YarnApplicationManager.java
index d7f3ffcf10..9ab19d3e24 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/YarnApplicationManager.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/YarnApplicationManager.java
@@ -19,14 +19,8 @@ package org.apache.dolphinscheduler.plugin.task.api.am;
 
 import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.common.enums.ResourceManagerType;
-import org.apache.dolphinscheduler.common.utils.HttpUtils;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.KerberosHttpClient;
 import org.apache.dolphinscheduler.common.utils.PropertyUtils;
 import org.apache.dolphinscheduler.plugin.task.api.TaskException;
-import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
-
-import org.apache.commons.lang3.StringUtils;
 
 import java.io.File;
 import java.nio.charset.StandardCharsets;
@@ -34,20 +28,12 @@ import java.util.List;
 
 import lombok.extern.slf4j.Slf4j;
 
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.auto.service.AutoService;
 
 @Slf4j
 @AutoService(ApplicationManager.class)
 public class YarnApplicationManager implements ApplicationManager {
 
-    private static final String RM_HA_IDS = 
PropertyUtils.getString(Constants.YARN_RESOURCEMANAGER_HA_RM_IDS);
-    private static final String APP_ADDRESS = 
PropertyUtils.getString(Constants.YARN_APPLICATION_STATUS_ADDRESS);
-    private static final String JOB_HISTORY_ADDRESS =
-            PropertyUtils.getString(Constants.YARN_JOB_HISTORY_STATUS_ADDRESS);
-    private static final int HADOOP_RESOURCE_MANAGER_HTTP_ADDRESS_PORT_VALUE =
-            
PropertyUtils.getInt(Constants.HADOOP_RESOURCE_MANAGER_HTTPADDRESS_PORT, 8088);
-
     @Override
     public boolean killApplication(ApplicationManagerContext 
applicationManagerContext) throws TaskException {
         YarnApplicationManagerContext yarnApplicationManagerContext =
@@ -55,20 +41,16 @@ public class YarnApplicationManager implements 
ApplicationManager {
         String executePath = yarnApplicationManagerContext.getExecutePath();
         String tenantCode = yarnApplicationManagerContext.getTenantCode();
         List<String> appIds = yarnApplicationManagerContext.getAppIds();
-        for (String appId : appIds) {
-            try {
-                TaskExecutionStatus applicationStatus = 
getApplicationStatus(appId);
 
-                if (!applicationStatus.isFinished()) {
-                    String commandFile = String.format("%s/%s.kill", 
executePath, appId);
-                    String cmd = getKerberosInitCommand() + "yarn application 
-kill " + appId;
-                    execYarnKillCommand(tenantCode, appId, commandFile, cmd);
-                }
-            } catch (Exception e) {
-                log.error("Get yarn application app id [{}}] status failed", 
appId, e);
-                throw new TaskException(e.getMessage());
-            }
+        try {
+            String commandFile = String.format("%s/%s.kill", executePath, 
String.join(Constants.UNDERLINE, appIds));
+            String cmd = getKerberosInitCommand() + "yarn application -kill " 
+ String.join(Constants.SPACE, appIds);
+            execYarnKillCommand(tenantCode, commandFile, cmd);
+        } catch (Exception e) {
+            log.error("Kill yarn application [{}] failed", appIds, e);
+            throw new TaskException(e.getMessage());
         }
+
         return true;
     }
 
@@ -77,168 +59,34 @@ public class YarnApplicationManager implements 
ApplicationManager {
         return ResourceManagerType.YARN;
     }
 
-    /**
-     * get the state of an application
-     *
-     * @param applicationId application id
-     * @return the return may be null or there may be other parse exceptions
-     */
-    public TaskExecutionStatus getApplicationStatus(String applicationId) 
throws TaskException {
-        if (StringUtils.isEmpty(applicationId)) {
-            return null;
-        }
-
-        String result;
-        String applicationUrl = getApplicationUrl(applicationId);
-        log.debug("generate yarn application url, applicationUrl={}", 
applicationUrl);
-
-        String responseContent = Boolean.TRUE
-                
.equals(PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE,
 false))
-                        ? KerberosHttpClient.get(applicationUrl)
-                        : HttpUtils.get(applicationUrl);
-        if (responseContent != null) {
-            ObjectNode jsonObject = JSONUtils.parseObject(responseContent);
-            if (!jsonObject.has("app")) {
-                return TaskExecutionStatus.FAILURE;
-            }
-            result = jsonObject.path("app").path("finalStatus").asText();
-
-        } else {
-            // may be in job history
-            String jobHistoryUrl = getJobHistoryUrl(applicationId);
-            log.debug("generate yarn job history application url, 
jobHistoryUrl={}", jobHistoryUrl);
-            responseContent = Boolean.TRUE
-                    
.equals(PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE,
 false))
-                            ? KerberosHttpClient.get(jobHistoryUrl)
-                            : HttpUtils.get(jobHistoryUrl);
-
-            if (null != responseContent) {
-                ObjectNode jsonObject = JSONUtils.parseObject(responseContent);
-                if (!jsonObject.has("job")) {
-                    return TaskExecutionStatus.FAILURE;
-                }
-                result = jsonObject.path("job").path("state").asText();
-            } else {
-                return TaskExecutionStatus.FAILURE;
-            }
-        }
-
-        return getExecutionStatus(result);
-    }
-
-    /**
-     * get application url
-     * if rmHaIds contains xx, it signs not use resourcemanager
-     * otherwise:
-     * if rmHaIds is empty, single resourcemanager enabled
-     * if rmHaIds not empty: resourcemanager HA enabled
-     *
-     * @param applicationId application id
-     * @return url of application
-     */
-    private String getApplicationUrl(String applicationId) throws 
TaskException {
-
-        String appUrl = StringUtils.isEmpty(RM_HA_IDS) ? APP_ADDRESS : 
getAppAddress(APP_ADDRESS, RM_HA_IDS);
-        if (StringUtils.isBlank(appUrl)) {
-            throw new TaskException("yarn application url generation failed");
-        }
-        log.debug("yarn application url:{}, applicationId:{}", appUrl, 
applicationId);
-        return String.format(appUrl, 
HADOOP_RESOURCE_MANAGER_HTTP_ADDRESS_PORT_VALUE, applicationId);
-    }
-
-    private String getJobHistoryUrl(String applicationId) {
-        // eg:application_1587475402360_712719 -> job_1587475402360_712719
-        String jobId = applicationId.replace("application", "job");
-        return String.format(JOB_HISTORY_ADDRESS, jobId);
-    }
-
     /**
      * build kill command for yarn application
      *
      * @param tenantCode tenant code
-     * @param appId app id
      * @param commandFile command file
      * @param cmd cmd
      */
-    private void execYarnKillCommand(String tenantCode, String appId, String 
commandFile,
-                                     String cmd) {
-        try {
-            StringBuilder sb = new StringBuilder();
-            sb.append("#!/bin/sh\n");
-            sb.append("BASEDIR=$(cd `dirname $0`; pwd)\n");
-            sb.append("cd $BASEDIR\n");
+    private void execYarnKillCommand(String tenantCode, String commandFile,
+                                     String cmd) throws Exception {
+        StringBuilder sb = new StringBuilder();
+        sb.append("#!/bin/sh\n");
+        sb.append("BASEDIR=$(cd `dirname $0`; pwd)\n");
+        sb.append("cd $BASEDIR\n");
 
-            sb.append("\n\n");
-            sb.append(cmd);
+        sb.append("\n\n");
+        sb.append(cmd);
 
-            File f = new File(commandFile);
+        File f = new File(commandFile);
 
-            if (!f.exists()) {
-                org.apache.commons.io.FileUtils.writeStringToFile(new 
File(commandFile), sb.toString(),
-                        StandardCharsets.UTF_8);
-            }
-
-            String runCmd = String.format("%s %s", Constants.SH, commandFile);
-            runCmd = 
org.apache.dolphinscheduler.common.utils.OSUtils.getSudoCmd(tenantCode, runCmd);
-            log.info("kill cmd:{}", runCmd);
-            org.apache.dolphinscheduler.common.utils.OSUtils.exeCmd(runCmd);
-        } catch (Exception e) {
-            log.error(String.format("Kill yarn application app id [%s] failed: 
[%s]", appId, e.getMessage()));
+        if (!f.exists()) {
+            org.apache.commons.io.FileUtils.writeStringToFile(new 
File(commandFile), sb.toString(),
+                    StandardCharsets.UTF_8);
         }
-    }
 
-    private TaskExecutionStatus getExecutionStatus(String result) {
-        switch (result) {
-            case Constants.ACCEPTED:
-                return TaskExecutionStatus.SUBMITTED_SUCCESS;
-            case Constants.SUCCEEDED:
-            case Constants.ENDED:
-                return TaskExecutionStatus.SUCCESS;
-            case Constants.NEW:
-            case Constants.NEW_SAVING:
-            case Constants.SUBMITTED:
-            case Constants.FAILED:
-                return TaskExecutionStatus.FAILURE;
-            case Constants.KILLED:
-                return TaskExecutionStatus.KILL;
-            case Constants.RUNNING:
-            default:
-                return TaskExecutionStatus.RUNNING_EXECUTION;
-        }
-    }
-
-    /**
-     * getAppAddress
-     *
-     * @param appAddress app address
-     * @param rmHa       resource manager ha
-     * @return app address
-     */
-    private String getAppAddress(String appAddress, String rmHa) {
-
-        String[] split1 = appAddress.split(Constants.DOUBLE_SLASH);
-
-        if (split1.length != 2) {
-            return null;
-        }
-
-        String start = split1[0] + Constants.DOUBLE_SLASH;
-        String[] split2 = split1[1].split(Constants.COLON);
-
-        if (split2.length != 2) {
-            return null;
-        }
-
-        String end = Constants.COLON + split2[1];
-
-        // get active ResourceManager
-        String activeRM = YarnHAAdminUtils.getActiveRMName(start, rmHa);
-
-        if (StringUtils.isEmpty(activeRM)) {
-            return null;
-        }
-
-        return start + activeRM + end;
+        String runCmd = String.format("%s %s", Constants.SH, commandFile);
+        runCmd = 
org.apache.dolphinscheduler.common.utils.OSUtils.getSudoCmd(tenantCode, runCmd);
+        log.info("kill cmd:{}", runCmd);
+        org.apache.dolphinscheduler.common.utils.OSUtils.exeCmd(runCmd);
     }
 
     /**
@@ -261,66 +109,4 @@ public class YarnApplicationManager implements 
ApplicationManager {
         }
         return kerberosCommandBuilder.toString();
     }
-
-    /**
-     * yarn ha admin utils
-     */
-    private static final class YarnHAAdminUtils {
-
-        /**
-         * get active resourcemanager node
-         *
-         * @param protocol http protocol
-         * @param rmIds    yarn ha ids
-         * @return yarn active node
-         */
-        public static String getActiveRMName(String protocol, String rmIds) {
-
-            String[] rmIdArr = rmIds.split(Constants.COMMA);
-
-            String yarnUrl = protocol + "%s:" + 
HADOOP_RESOURCE_MANAGER_HTTP_ADDRESS_PORT_VALUE + "/ws/v1/cluster/info";
-
-            try {
-
-                /**
-                 * send http get request to rm
-                 */
-
-                for (String rmId : rmIdArr) {
-                    String state = getRMState(String.format(yarnUrl, rmId));
-                    if (Constants.HADOOP_RM_STATE_ACTIVE.equals(state)) {
-                        return rmId;
-                    }
-                }
-
-            } catch (Exception e) {
-                log.error("yarn ha application url generation failed, 
message:{}", e.getMessage());
-            }
-            return null;
-        }
-
-        /**
-         * get ResourceManager state
-         */
-        public static String getRMState(String url) {
-
-            String retStr = Boolean.TRUE
-                    
.equals(PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE,
 false))
-                            ? KerberosHttpClient.get(url)
-                            : HttpUtils.get(url);
-
-            if (StringUtils.isEmpty(retStr)) {
-                return null;
-            }
-            // to json
-            ObjectNode jsonObject = JSONUtils.parseObject(retStr);
-
-            // get ResourceManager state
-            if (!jsonObject.has("clusterInfo")) {
-                return null;
-            }
-            return jsonObject.get("clusterInfo").path("haState").asText();
-        }
-    }
-
 }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerTaskKillProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerTaskKillProcessor.java
index e2f4d74ced..01bd5c85e0 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerTaskKillProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerTaskKillProcessor.java
@@ -85,9 +85,10 @@ public class WorkerTaskKillProcessor implements 
WorkerRpcProcessor {
                 return;
             }
 
+            this.cancelApplication(taskInstanceId);
+
             int processId = taskExecutionContext.getProcessId();
             if (processId == 0) {
-                this.cancelApplication(taskInstanceId);
                 
workerManager.killTaskBeforeExecuteByInstanceId(taskInstanceId);
                 
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.KILL);
                 
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
@@ -96,8 +97,6 @@ public class WorkerTaskKillProcessor implements 
WorkerRpcProcessor {
                 return;
             }
 
-            // if processId > 0, it should call cancelApplication to cancel 
remote application too.
-            this.cancelApplication(taskInstanceId);
             boolean result = doKill(taskExecutionContext);
 
             taskExecutionContext.setCurrentExecutionStatus(

Reply via email to