This is an automated email from the ASF dual-hosted git repository.
xincheng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new f93b27e121 [Worker] Fix will not kill the subprocess in remote when
stop a remote-shell task #15570 (#15629)
f93b27e121 is described below
commit f93b27e1217cab05e93953b71e5d2ea5e40fb8c8
Author: sleo <[email protected]>
AuthorDate: Wed Apr 10 14:13:52 2024 +0800
[Worker] Fix will not kill the subprocess in remote when stop a
remote-shell task #15570 (#15629)
* fix the subprocess not being killed when stopping a remote-shell task
* move pid-parsing logic into ProcessUtils
* extract common logic
---------
Co-authored-by: 旺阳 <[email protected]>
Co-authored-by: Rick Cheng <[email protected]>
---
.../plugin/task/api/utils/ProcessUtils.java | 43 +++++++++++++--------
.../plugin/task/remoteshell/RemoteExecutor.java | 44 +++++++++++++++++++++-
.../task/remoteshell/RemoteExecutorTest.java | 19 ++++++++++
3 files changed, 89 insertions(+), 17 deletions(-)
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java
index 7b61a1eaec..e8e31faa6d 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java
@@ -39,6 +39,7 @@ import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -117,33 +118,45 @@ public final class ProcessUtils {
* @throws Exception exception
*/
public static String getPidsStr(int processId) throws Exception {
- StringBuilder sb = new StringBuilder();
- Matcher mat = null;
+
+ String rawPidStr;
+
// pstree pid get sub pids
if (SystemUtils.IS_OS_MAC) {
- String pids = OSUtils.exeCmd(String.format("%s -sp %d",
TaskConstants.PSTREE, processId));
- if (StringUtils.isNotEmpty(pids)) {
- mat = MACPATTERN.matcher(pids);
+ rawPidStr = OSUtils.exeCmd(String.format("%s -sp %d",
TaskConstants.PSTREE, processId));
+ } else if (SystemUtils.IS_OS_LINUX) {
+ rawPidStr = OSUtils.exeCmd(String.format("%s -p %d",
TaskConstants.PSTREE, processId));
+ } else {
+ rawPidStr = OSUtils.exeCmd(String.format("%s -p %d",
TaskConstants.PSTREE, processId));
+ }
+
+ return parsePidStr(rawPidStr);
+ }
+
+ public static String parsePidStr(String rawPidStr) {
+
+ log.info("prepare to parse pid, raw pid string: {}", rawPidStr);
+ ArrayList<String> allPidList = new ArrayList<>();
+ Matcher mat = null;
+ if (SystemUtils.IS_OS_MAC) {
+ if (StringUtils.isNotEmpty(rawPidStr)) {
+ mat = MACPATTERN.matcher(rawPidStr);
}
} else if (SystemUtils.IS_OS_LINUX) {
- String pids = OSUtils.exeCmd(String.format("%s -p %d",
TaskConstants.PSTREE, processId));
- if (StringUtils.isNotEmpty(pids)) {
- mat = LINUXPATTERN.matcher(pids);
+ if (StringUtils.isNotEmpty(rawPidStr)) {
+ mat = LINUXPATTERN.matcher(rawPidStr);
}
} else {
- String pids = OSUtils.exeCmd(String.format("%s -p %d",
TaskConstants.PSTREE, processId));
- if (StringUtils.isNotEmpty(pids)) {
- mat = WINDOWSPATTERN.matcher(pids);
+ if (StringUtils.isNotEmpty(rawPidStr)) {
+ mat = WINDOWSPATTERN.matcher(rawPidStr);
}
}
-
if (null != mat) {
while (mat.find()) {
- sb.append(mat.group(1)).append(" ");
+ allPidList.add(mat.group(1));
}
}
-
- return sb.toString().trim();
+ return String.join(" ", allPidList).trim();
}
/**
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutor.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutor.java
index c590fa9e44..334ebb6195 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutor.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutor.java
@@ -17,12 +17,16 @@
package org.apache.dolphinscheduler.plugin.task.remoteshell;
+import static
org.apache.dolphinscheduler.plugin.task.remoteshell.RemoteExecutor.COMMAND.PSTREE_COMMAND;
+
import org.apache.dolphinscheduler.plugin.datasource.ssh.SSHUtils;
import
org.apache.dolphinscheduler.plugin.datasource.ssh.param.SSHConnectionParam;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import
org.apache.dolphinscheduler.plugin.task.api.parser.TaskOutputParameterParser;
+import org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
import org.apache.sshd.client.SshClient;
import org.apache.sshd.client.channel.ChannelExec;
import org.apache.sshd.client.channel.ClientChannelEvent;
@@ -50,7 +54,6 @@ public class RemoteExecutor implements AutoCloseable {
static final int TRACK_INTERVAL = 5000;
protected Map<String, String> taskOutputParams = new HashMap<>();
-
private SshClient sshClient;
private ClientSession session;
private SSHConnectionParam sshConnectionParam;
@@ -154,11 +157,45 @@ public class RemoteExecutor implements AutoCloseable {
public void kill(String taskId) throws IOException {
String pid = getTaskPid(taskId);
- String killCommand = String.format(COMMAND.KILL_COMMAND, pid);
+
+ if (StringUtils.isEmpty(pid)) {
+ log.warn("query remote-shell task remote process id with empty");
+ return;
+ }
+ if (!NumberUtils.isParsable(pid)) {
+ log.error("query remote-shell task remote process id error, pid {}
can not parse to number", pid);
+ return;
+ }
+
+ // query all pid
+ String remotePidStr = getAllRemotePidStr(pid);
+ String killCommand = String.format(COMMAND.KILL_COMMAND, remotePidStr);
+ log.info("prepare to execute kill command in host: {}, kill cmd: {}",
sshConnectionParam.getHost(),
+ killCommand);
runRemote(killCommand);
cleanData(taskId);
}
+ protected String getAllRemotePidStr(String pid) {
+
+ String remoteProcessIdStr = "";
+ String cmd = String.format(PSTREE_COMMAND, pid);
+ log.info("query all process id cmd: {}", cmd);
+
+ try {
+ String rawPidStr = runRemote(cmd);
+ remoteProcessIdStr = ProcessUtils.parsePidStr(rawPidStr);
+ if (!remoteProcessIdStr.startsWith(pid)) {
+ log.error("query remote process id error, [{}] first pid not
equal [{}]", remoteProcessIdStr, pid);
+ remoteProcessIdStr = pid;
+ }
+ } catch (Exception e) {
+ log.error("query remote all process id error", e);
+ remoteProcessIdStr = pid;
+ }
+ return remoteProcessIdStr;
+ }
+
public String getTaskPid(String taskId) throws IOException {
String pidCommand = String.format(COMMAND.GET_PID_COMMAND, taskId);
return runRemote(pidCommand).trim();
@@ -238,6 +275,9 @@ public class RemoteExecutor implements AutoCloseable {
static final String ADD_STATUS_COMMAND = "\necho %s$?";
static final String CAT_FINAL_SCRIPT = "cat %s%s.sh";
+
+ static final String PSTREE_COMMAND = "pstree -p %s";
+
}
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/test/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutorTest.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/test/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutorTest.java
index 975f059695..3cc9757ce1 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/test/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutorTest.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/test/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutorTest.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.plugin.task.remoteshell;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
@@ -135,4 +136,22 @@ public class RemoteExecutorTest {
doReturn("DOLPHINSCHEDULER-REMOTE-SHELL-TASK-STATUS-1").when(remoteExecutor).runRemote(trackCommand);
Assertions.assertEquals(1, remoteExecutor.getTaskExitCode(taskId));
}
+
+ @Test
+ void getAllRemotePidStr() throws IOException {
+
+ RemoteExecutor remoteExecutor = spy(new
RemoteExecutor(sshConnectionParam));
+
doReturn("bash(9527)───sleep(9528)").when(remoteExecutor).runRemote(anyString());
+ String allPidStr = remoteExecutor.getAllRemotePidStr("9527");
+ Assertions.assertEquals("9527 9528", allPidStr);
+
+
doReturn("systemd(1)───sleep(9528)").when(remoteExecutor).runRemote(anyString());
+ allPidStr = remoteExecutor.getAllRemotePidStr("9527");
+ Assertions.assertEquals("9527", allPidStr);
+
+ doThrow(new
TaskException()).when(remoteExecutor).runRemote(anyString());
+ allPidStr = remoteExecutor.getAllRemotePidStr("9527");
+ Assertions.assertEquals("9527", allPidStr);
+
+ }
}