This is an automated email from the ASF dual-hosted git repository.

xincheng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new f93b27e121 [Worker] Fix will not kill the subprocess in remote when 
stop a remote-shell task #15570 (#15629)
f93b27e121 is described below

commit f93b27e1217cab05e93953b71e5d2ea5e40fb8c8
Author: sleo <[email protected]>
AuthorDate: Wed Apr 10 14:13:52 2024 +0800

    [Worker] Fix will not kill the subprocess in remote when stop a 
remote-shell task #15570 (#15629)
    
    * fix cannot kill the subprocess when stop a remote-shell task
    
    * move parse pid logic into ProcessUtils
    
    * extract common logic
    
    ---------
    
    Co-authored-by: 旺阳 <[email protected]>
    Co-authored-by: Rick Cheng <[email protected]>
---
 .../plugin/task/api/utils/ProcessUtils.java        | 43 +++++++++++++--------
 .../plugin/task/remoteshell/RemoteExecutor.java    | 44 +++++++++++++++++++++-
 .../task/remoteshell/RemoteExecutorTest.java       | 19 ++++++++++
 3 files changed, 89 insertions(+), 17 deletions(-)

diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java
index 7b61a1eaec..e8e31faa6d 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java
@@ -39,6 +39,7 @@ import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.SystemUtils;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -117,33 +118,45 @@ public final class ProcessUtils {
      * @throws Exception exception
      */
     public static String getPidsStr(int processId) throws Exception {
-        StringBuilder sb = new StringBuilder();
-        Matcher mat = null;
+
+        String rawPidStr;
+
         // pstree pid get sub pids
         if (SystemUtils.IS_OS_MAC) {
-            String pids = OSUtils.exeCmd(String.format("%s -sp %d", 
TaskConstants.PSTREE, processId));
-            if (StringUtils.isNotEmpty(pids)) {
-                mat = MACPATTERN.matcher(pids);
+            rawPidStr = OSUtils.exeCmd(String.format("%s -sp %d", 
TaskConstants.PSTREE, processId));
+        } else if (SystemUtils.IS_OS_LINUX) {
+            rawPidStr = OSUtils.exeCmd(String.format("%s -p %d", 
TaskConstants.PSTREE, processId));
+        } else {
+            rawPidStr = OSUtils.exeCmd(String.format("%s -p %d", 
TaskConstants.PSTREE, processId));
+        }
+
+        return parsePidStr(rawPidStr);
+    }
+
+    public static String parsePidStr(String rawPidStr) {
+
+        log.info("prepare to parse pid, raw pid string: {}", rawPidStr);
+        ArrayList<String> allPidList = new ArrayList<>();
+        Matcher mat = null;
+        if (SystemUtils.IS_OS_MAC) {
+            if (StringUtils.isNotEmpty(rawPidStr)) {
+                mat = MACPATTERN.matcher(rawPidStr);
             }
         } else if (SystemUtils.IS_OS_LINUX) {
-            String pids = OSUtils.exeCmd(String.format("%s -p %d", 
TaskConstants.PSTREE, processId));
-            if (StringUtils.isNotEmpty(pids)) {
-                mat = LINUXPATTERN.matcher(pids);
+            if (StringUtils.isNotEmpty(rawPidStr)) {
+                mat = LINUXPATTERN.matcher(rawPidStr);
             }
         } else {
-            String pids = OSUtils.exeCmd(String.format("%s -p %d", 
TaskConstants.PSTREE, processId));
-            if (StringUtils.isNotEmpty(pids)) {
-                mat = WINDOWSPATTERN.matcher(pids);
+            if (StringUtils.isNotEmpty(rawPidStr)) {
+                mat = WINDOWSPATTERN.matcher(rawPidStr);
             }
         }
-
         if (null != mat) {
             while (mat.find()) {
-                sb.append(mat.group(1)).append(" ");
+                allPidList.add(mat.group(1));
             }
         }
-
-        return sb.toString().trim();
+        return String.join(" ", allPidList).trim();
     }
 
     /**
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutor.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutor.java
index c590fa9e44..334ebb6195 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutor.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutor.java
@@ -17,12 +17,16 @@
 
 package org.apache.dolphinscheduler.plugin.task.remoteshell;
 
+import static 
org.apache.dolphinscheduler.plugin.task.remoteshell.RemoteExecutor.COMMAND.PSTREE_COMMAND;
+
 import org.apache.dolphinscheduler.plugin.datasource.ssh.SSHUtils;
 import 
org.apache.dolphinscheduler.plugin.datasource.ssh.param.SSHConnectionParam;
 import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import 
org.apache.dolphinscheduler.plugin.task.api.parser.TaskOutputParameterParser;
+import org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.sshd.client.SshClient;
 import org.apache.sshd.client.channel.ChannelExec;
 import org.apache.sshd.client.channel.ClientChannelEvent;
@@ -50,7 +54,6 @@ public class RemoteExecutor implements AutoCloseable {
     static final int TRACK_INTERVAL = 5000;
 
     protected Map<String, String> taskOutputParams = new HashMap<>();
-
     private SshClient sshClient;
     private ClientSession session;
     private SSHConnectionParam sshConnectionParam;
@@ -154,11 +157,45 @@ public class RemoteExecutor implements AutoCloseable {
 
     public void kill(String taskId) throws IOException {
         String pid = getTaskPid(taskId);
-        String killCommand = String.format(COMMAND.KILL_COMMAND, pid);
+
+        if (StringUtils.isEmpty(pid)) {
+            log.warn("query remote-shell task remote process id with empty");
+            return;
+        }
+        if (!NumberUtils.isParsable(pid)) {
+            log.error("query remote-shell task remote process id error, pid {} 
can not parse to number", pid);
+            return;
+        }
+
+        // query all pid
+        String remotePidStr = getAllRemotePidStr(pid);
+        String killCommand = String.format(COMMAND.KILL_COMMAND, remotePidStr);
+        log.info("prepare to execute kill command in host: {}, kill cmd: {}", 
sshConnectionParam.getHost(),
+                killCommand);
         runRemote(killCommand);
         cleanData(taskId);
     }
 
+    protected String getAllRemotePidStr(String pid) {
+
+        String remoteProcessIdStr = "";
+        String cmd = String.format(PSTREE_COMMAND, pid);
+        log.info("query all process id cmd: {}", cmd);
+
+        try {
+            String rawPidStr = runRemote(cmd);
+            remoteProcessIdStr = ProcessUtils.parsePidStr(rawPidStr);
+            if (!remoteProcessIdStr.startsWith(pid)) {
+                log.error("query remote process id error, [{}] first pid not 
equal [{}]", remoteProcessIdStr, pid);
+                remoteProcessIdStr = pid;
+            }
+        } catch (Exception e) {
+            log.error("query remote all process id error", e);
+            remoteProcessIdStr = pid;
+        }
+        return remoteProcessIdStr;
+    }
+
     public String getTaskPid(String taskId) throws IOException {
         String pidCommand = String.format(COMMAND.GET_PID_COMMAND, taskId);
         return runRemote(pidCommand).trim();
@@ -238,6 +275,9 @@ public class RemoteExecutor implements AutoCloseable {
         static final String ADD_STATUS_COMMAND = "\necho %s$?";
 
         static final String CAT_FINAL_SCRIPT = "cat %s%s.sh";
+
+        static final String PSTREE_COMMAND = "pstree -p %s";
+
     }
 
 }
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/test/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutorTest.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/test/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutorTest.java
index 975f059695..3cc9757ce1 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/test/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutorTest.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/test/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutorTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.plugin.task.remoteshell;
 
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
@@ -135,4 +136,22 @@ public class RemoteExecutorTest {
         
doReturn("DOLPHINSCHEDULER-REMOTE-SHELL-TASK-STATUS-1").when(remoteExecutor).runRemote(trackCommand);
         Assertions.assertEquals(1, remoteExecutor.getTaskExitCode(taskId));
     }
+
+    @Test
+    void getAllRemotePidStr() throws IOException {
+
+        RemoteExecutor remoteExecutor = spy(new 
RemoteExecutor(sshConnectionParam));
+        
doReturn("bash(9527)───sleep(9528)").when(remoteExecutor).runRemote(anyString());
+        String allPidStr = remoteExecutor.getAllRemotePidStr("9527");
+        Assertions.assertEquals("9527 9528", allPidStr);
+
+        
doReturn("systemd(1)───sleep(9528)").when(remoteExecutor).runRemote(anyString());
+        allPidStr = remoteExecutor.getAllRemotePidStr("9527");
+        Assertions.assertEquals("9527", allPidStr);
+
+        doThrow(new 
TaskException()).when(remoteExecutor).runRemote(anyString());
+        allPidStr = remoteExecutor.getAllRemotePidStr("9527");
+        Assertions.assertEquals("9527", allPidStr);
+
+    }
 }

Reply via email to