This is an automated email from the ASF dual-hosted git repository.
zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 5c5b44c7d0 [Improvement-17788] [TaskPlugin] Optimization of log
processing for RemoteShellTask (#17800)
5c5b44c7d0 is described below
commit 5c5b44c7d03b74f14645002329d70efdf331407f
Author: Qiong Zhou <[email protected]>
AuthorDate: Mon Jan 12 10:56:38 2026 +0800
[Improvement-17788] [TaskPlugin] Optimization of log processing for
RemoteShellTask (#17800)
---
.../plugin/task/remoteshell/RemoteExecutor.java | 41 +++++--
.../task/remoteshell/RemoteExecutorTest.java | 130 ++++++++++++++++++++-
2 files changed, 154 insertions(+), 17 deletions(-)
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutor.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutor.java
index 907d496d3d..7271e4524f 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutor.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutor.java
@@ -34,14 +34,19 @@ import org.apache.sshd.client.session.ClientSession;
import org.apache.sshd.sftp.client.SftpClientFactory;
import org.apache.sshd.sftp.client.fs.SftpFileSystem;
+import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
+import java.util.function.Consumer;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@@ -109,13 +114,15 @@ public class RemoteExecutor implements AutoCloseable {
do {
pid = getTaskPid(taskId);
String trackCommand = String.format(COMMAND.TRACK_COMMAND, logN +
1, getRemoteShellHome(), taskId);
- String logLine = runRemote(trackCommand);
- if (StringUtils.isEmpty(logLine)) {
- Thread.sleep(TRACK_INTERVAL);
+ int readLines = runRemoteAndProcessLines(trackCommand, line -> {
+ log.info(line);
+ taskOutputParameterParser.appendParseLog(line);
+ });
+ if (readLines > 0) {
+ logN += readLines;
+
} else {
- logN += logLine.split("\n").length;
- log.info(logLine);
- taskOutputParameterParser.appendParseLog(logLine);
+ Thread.sleep(TRACK_INTERVAL);
}
} while (StringUtils.isNotEmpty(pid));
taskOutputParams.putAll(taskOutputParameterParser.getTaskOutputParams());
@@ -220,23 +227,33 @@ public class RemoteExecutor implements AutoCloseable {
}
public String runRemote(String command) throws IOException {
+ StringBuilder out = new StringBuilder();
+ runRemoteAndProcessLines(command, line ->
out.append(line).append(System.lineSeparator()));
+ return out.toString();
+ }
+
+ private int runRemoteAndProcessLines(String command, Consumer<String>
lineConsumer) throws IOException {
try (
ChannelExec channel = getSession().createExecChannel(command);
- ByteArrayOutputStream out = new ByteArrayOutputStream();
ByteArrayOutputStream err = new ByteArrayOutputStream()) {
-
- channel.setOut(System.out);
- channel.setOut(out);
+ InputStream out = channel.getInvertedOut();
channel.setErr(err);
channel.open();
+ int readLines = 0;
+ try (BufferedReader reader = new BufferedReader(new
InputStreamReader(out, StandardCharsets.UTF_8))) {
+ String line;
+ while ((line = reader.readLine()) != null) {
+ readLines++;
+ lineConsumer.accept(line);
+ }
+ }
channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), 0);
- channel.close();
Integer exitStatus = channel.getExitStatus();
if (exitStatus == null || exitStatus != 0) {
throw new TaskException(
"Remote shell task error, exitStatus: " + exitStatus +
" error message: " + err);
}
- return out.toString();
+ return readLines;
}
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/test/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutorTest.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/test/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutorTest.java
index 1971140e09..81ae92f67c 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/test/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutorTest.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/test/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutorTest.java
@@ -32,22 +32,28 @@ import
org.apache.dolphinscheduler.plugin.datasource.ssh.param.SSHDataSourcePara
import
org.apache.dolphinscheduler.plugin.datasource.ssh.param.SSHDataSourceProcessor;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.input.NullInputStream;
+import org.apache.commons.lang3.SystemUtils;
import org.apache.sshd.client.channel.ChannelExec;
+import org.apache.sshd.client.channel.ClientChannelEvent;
import org.apache.sshd.client.session.ClientSession;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.EnumSet;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
-@Disabled
@ExtendWith(MockitoExtension.class)
public class RemoteExecutorTest {
@@ -83,9 +89,19 @@ public class RemoteExecutorTest {
when(clientSession.auth().verify().isSuccess()).thenReturn(true);
when(clientSession.createExecChannel(Mockito.anyString())).thenReturn(channel);
when(channel.getExitStatus()).thenReturn(1);
+ when(channel.getInvertedOut()).thenReturn(new NullInputStream());
Assertions.assertThrows(TaskException.class, () ->
remoteExecutor.runRemote("ls -l"));
+
+ // Mock the streaming runRemote to simulate log output
+ String output = "total 26392\n" +
+ "dr-xr-xr-x. 6 root root 3072 Aug 15 2023 boot\n" +
+ "drwxr-xr-x 18 root root 3120 Sep 23 2023 dev\n" +
+ "drwxr-xr-x. 91 root root 4096 Sep 23 2023 etc\n";
+ InputStream inputStream = IOUtils.toInputStream(output,
StandardCharsets.UTF_8);
+ when(channel.getInvertedOut()).thenReturn(inputStream);
when(channel.getExitStatus()).thenReturn(0);
- Assertions.assertDoesNotThrow(() -> remoteExecutor.runRemote("ls -l"));
+ String actualOut = Assertions.assertDoesNotThrow(() ->
remoteExecutor.runRemote("ls -l"));
+ Assertions.assertEquals(output, actualOut);
}
@Test
@@ -141,11 +157,20 @@ public class RemoteExecutorTest {
void getAllRemotePidStr() throws IOException {
RemoteExecutor remoteExecutor = spy(new
RemoteExecutor(sshConnectionParam));
-
doReturn("bash(9527)───sleep(9528)").when(remoteExecutor).runRemote(anyString());
+ // Mock pstree output based on OS
+ if (SystemUtils.IS_OS_MAC) {
+ doReturn("-+= 9527 root\n \\-+= 9528
root").when(remoteExecutor).runRemote(anyString());
+ } else {
+
doReturn("bash(9527)───sleep(9528)").when(remoteExecutor).runRemote(anyString());
+ }
String allPidStr = remoteExecutor.getAllRemotePidStr("9527");
Assertions.assertEquals("9527 9528", allPidStr);
-
doReturn("systemd(1)───sleep(9528)").when(remoteExecutor).runRemote(anyString());
+ if (SystemUtils.IS_OS_MAC) {
+ doReturn("-+= 1 root\n \\-+= 9528
root").when(remoteExecutor).runRemote(anyString());
+ } else {
+
doReturn("systemd(1)───sleep(9528)").when(remoteExecutor).runRemote(anyString());
+ }
allPidStr = remoteExecutor.getAllRemotePidStr("9527");
Assertions.assertEquals("9527", allPidStr);
@@ -154,4 +179,99 @@ public class RemoteExecutorTest {
Assertions.assertEquals("9527", allPidStr);
}
+
+ @Test
+ void testTrack() throws Exception {
+ RemoteExecutor remoteExecutor = spy(new
RemoteExecutor(sshConnectionParam));
+ String taskId = "1234";
+ ChannelExec channel = Mockito.mock(ChannelExec.class,
RETURNS_DEEP_STUBS);
+
+ // Mock getTaskPid to control the loop, return a valid pid 2 times,
then return empty
+ doReturn("9527")
+ .doReturn("9527")
+ .doReturn("").when(remoteExecutor).getTaskPid(taskId);
+ when(clientSession.auth().verify().isSuccess()).thenReturn(true);
+ when(clientSession.createExecChannel(anyString())).thenReturn(channel);
+
+ // Mock the streaming runRemote to simulate log output
+ String logContent = "some log line 1\n"
+ + "echo \"${setValue(my_prop=my_value)}\"\n"
+ + "some log line 2\n";
+ InputStream inputStream = IOUtils.toInputStream(logContent,
StandardCharsets.UTF_8);
+ when(channel.getInvertedOut()).thenReturn(inputStream);
+ when(channel.getExitStatus()).thenReturn(0);
+
+ remoteExecutor.track(taskId);
+
+ // Verify that the output parameter was parsed and stored
+ Assertions.assertEquals(1,
remoteExecutor.getTaskOutputParams().size());
+ Assertions.assertEquals("my_value",
remoteExecutor.getTaskOutputParams().get("my_prop"));
+ }
+
+ @Test
+ void testRunRemoteWithEmptyOutput() throws Exception {
+ // Test empty output scenario (readLines = 0)
+ RemoteExecutor remoteExecutor = spy(new
RemoteExecutor(sshConnectionParam));
+ ChannelExec channel = Mockito.mock(ChannelExec.class,
RETURNS_DEEP_STUBS);
+
+ when(clientSession.auth().verify().isSuccess()).thenReturn(true);
+ when(clientSession.createExecChannel(anyString())).thenReturn(channel);
+ when(channel.getInvertedOut()).thenReturn(new ByteArrayInputStream(new
byte[0]));
+ when(channel.getExitStatus()).thenReturn(0);
+ when(channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), 0))
+ .thenReturn(EnumSet.of(ClientChannelEvent.CLOSED));
+
+ String result = Assertions.assertDoesNotThrow(() ->
remoteExecutor.runRemote("echo"));
+ Assertions.assertEquals("", result);
+ }
+
+ @Test
+ void testRunRemoteWithNonZeroExitStatus() throws Exception {
+ // Test command failure scenario (exitStatus != 0)
+ RemoteExecutor remoteExecutor = spy(new
RemoteExecutor(sshConnectionParam));
+ ChannelExec channel = Mockito.mock(ChannelExec.class,
RETURNS_DEEP_STUBS);
+
+ when(clientSession.auth().verify().isSuccess()).thenReturn(true);
+ when(clientSession.createExecChannel(anyString())).thenReturn(channel);
+ when(channel.getInvertedOut()).thenReturn(IOUtils.toInputStream("error
output", StandardCharsets.UTF_8));
+ when(channel.getExitStatus()).thenReturn(1);
+ when(channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), 0))
+ .thenReturn(EnumSet.of(ClientChannelEvent.CLOSED));
+
+ Assertions.assertThrows(TaskException.class, () ->
remoteExecutor.runRemote("failing_command"));
+ }
+
+ @Test
+ void testRunRemoteWithNullExitStatus() throws Exception {
+ // Test null exitStatus scenario
+ RemoteExecutor remoteExecutor = spy(new
RemoteExecutor(sshConnectionParam));
+ ChannelExec channel = Mockito.mock(ChannelExec.class,
RETURNS_DEEP_STUBS);
+
+ when(clientSession.auth().verify().isSuccess()).thenReturn(true);
+ when(clientSession.createExecChannel(anyString())).thenReturn(channel);
+ when(channel.getInvertedOut()).thenReturn(IOUtils.toInputStream("some
output", StandardCharsets.UTF_8));
+ when(channel.getExitStatus()).thenReturn(null);
+ when(channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), 0))
+ .thenReturn(EnumSet.of(ClientChannelEvent.CLOSED));
+
+ Assertions.assertThrows(TaskException.class, () ->
remoteExecutor.runRemote("command"));
+ }
+
+ @Test
+ void testTrackWithEmptyLogOutput() throws Exception {
+ // Test track with empty log output (readLines = 0 scenario in track
loop)
+ RemoteExecutor remoteExecutor = spy(new
RemoteExecutor(sshConnectionParam));
+ String taskId = "1234";
+ ChannelExec channel = Mockito.mock(ChannelExec.class,
RETURNS_DEEP_STUBS);
+
+ doReturn("9527").doReturn("").when(remoteExecutor).getTaskPid(taskId);
+ when(clientSession.auth().verify().isSuccess()).thenReturn(true);
+ when(clientSession.createExecChannel(anyString())).thenReturn(channel);
+ when(channel.getInvertedOut()).thenReturn(new ByteArrayInputStream(new
byte[0]));
+ when(channel.getExitStatus()).thenReturn(0);
+ when(channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), 0))
+ .thenReturn(EnumSet.of(ClientChannelEvent.CLOSED));
+
+ Assertions.assertDoesNotThrow(() -> remoteExecutor.track(taskId));
+ }
}