This is an automated email from the ASF dual-hosted git repository.

zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5c5b44c7d0 [Improvement-17788] [TaskPlugin] Optimization of log processing for RemoteShellTask (#17800)
5c5b44c7d0 is described below

commit 5c5b44c7d03b74f14645002329d70efdf331407f
Author: Qiong Zhou <[email protected]>
AuthorDate: Mon Jan 12 10:56:38 2026 +0800

    [Improvement-17788] [TaskPlugin] Optimization of log processing for RemoteShellTask (#17800)
---
 .../plugin/task/remoteshell/RemoteExecutor.java    |  41 +++++--
 .../task/remoteshell/RemoteExecutorTest.java       | 130 ++++++++++++++++++++-
 2 files changed, 154 insertions(+), 17 deletions(-)

diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutor.java
index 907d496d3d..7271e4524f 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutor.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutor.java
@@ -34,14 +34,19 @@ import org.apache.sshd.client.session.ClientSession;
 import org.apache.sshd.sftp.client.SftpClientFactory;
 import org.apache.sshd.sftp.client.fs.SftpFileSystem;
 
+import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.function.Consumer;
 
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
@@ -109,13 +114,15 @@ public class RemoteExecutor implements AutoCloseable {
         do {
             pid = getTaskPid(taskId);
             String trackCommand = String.format(COMMAND.TRACK_COMMAND, logN + 1, getRemoteShellHome(), taskId);
-            String logLine = runRemote(trackCommand);
-            if (StringUtils.isEmpty(logLine)) {
-                Thread.sleep(TRACK_INTERVAL);
+            int readLines = runRemoteAndProcessLines(trackCommand, line -> {
+                log.info(line);
+                taskOutputParameterParser.appendParseLog(line);
+            });
+            if (readLines > 0) {
+                logN += readLines;
+
             } else {
-                logN += logLine.split("\n").length;
-                log.info(logLine);
-                taskOutputParameterParser.appendParseLog(logLine);
+                Thread.sleep(TRACK_INTERVAL);
             }
         } while (StringUtils.isNotEmpty(pid));
         
taskOutputParams.putAll(taskOutputParameterParser.getTaskOutputParams());
@@ -220,23 +227,33 @@ public class RemoteExecutor implements AutoCloseable {
     }
 
     public String runRemote(String command) throws IOException {
+        StringBuilder out = new StringBuilder();
+        runRemoteAndProcessLines(command, line -> out.append(line).append(System.lineSeparator()));
+        return out.toString();
+    }
+
+    private int runRemoteAndProcessLines(String command, Consumer<String> lineConsumer) throws IOException {
         try (
                 ChannelExec channel = getSession().createExecChannel(command);
-                ByteArrayOutputStream out = new ByteArrayOutputStream();
                 ByteArrayOutputStream err = new ByteArrayOutputStream()) {
-
-            channel.setOut(System.out);
-            channel.setOut(out);
+            InputStream out = channel.getInvertedOut();
             channel.setErr(err);
             channel.open();
+            int readLines = 0;
+            try (BufferedReader reader = new BufferedReader(new InputStreamReader(out, StandardCharsets.UTF_8))) {
+                String line;
+                while ((line = reader.readLine()) != null) {
+                    readLines++;
+                    lineConsumer.accept(line);
+                }
+            }
             channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), 0);
-            channel.close();
             Integer exitStatus = channel.getExitStatus();
             if (exitStatus == null || exitStatus != 0) {
                 throw new TaskException(
                         "Remote shell task error, exitStatus: " + exitStatus + 
" error message: " + err);
             }
-            return out.toString();
+            return readLines;
         }
     }
 
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/test/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutorTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/test/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutorTest.java
index 1971140e09..81ae92f67c 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/test/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutorTest.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/test/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutorTest.java
@@ -32,22 +32,28 @@ import 
org.apache.dolphinscheduler.plugin.datasource.ssh.param.SSHDataSourcePara
 import 
org.apache.dolphinscheduler.plugin.datasource.ssh.param.SSHDataSourceProcessor;
 import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.input.NullInputStream;
+import org.apache.commons.lang3.SystemUtils;
 import org.apache.sshd.client.channel.ChannelExec;
+import org.apache.sshd.client.channel.ClientChannelEvent;
 import org.apache.sshd.client.session.ClientSession;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.EnumSet;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;
 
-@Disabled
 @ExtendWith(MockitoExtension.class)
 public class RemoteExecutorTest {
 
@@ -83,9 +89,19 @@ public class RemoteExecutorTest {
         when(clientSession.auth().verify().isSuccess()).thenReturn(true);
         
when(clientSession.createExecChannel(Mockito.anyString())).thenReturn(channel);
         when(channel.getExitStatus()).thenReturn(1);
+        when(channel.getInvertedOut()).thenReturn(new NullInputStream());
         Assertions.assertThrows(TaskException.class, () -> 
remoteExecutor.runRemote("ls -l"));
+
+        // Mock the streaming runRemote to simulate log output
+        String output = "total 26392\n" +
+                "dr-xr-xr-x.   6 root root      3072 Aug 15  2023 boot\n" +
+                "drwxr-xr-x   18 root root      3120 Sep 23  2023 dev\n" +
+                "drwxr-xr-x.  91 root root      4096 Sep 23  2023 etc\n";
+        InputStream inputStream = IOUtils.toInputStream(output, 
StandardCharsets.UTF_8);
+        when(channel.getInvertedOut()).thenReturn(inputStream);
         when(channel.getExitStatus()).thenReturn(0);
-        Assertions.assertDoesNotThrow(() -> remoteExecutor.runRemote("ls -l"));
+        String actualOut = Assertions.assertDoesNotThrow(() -> 
remoteExecutor.runRemote("ls -l"));
+        Assertions.assertEquals(output, actualOut);
     }
 
     @Test
@@ -141,11 +157,20 @@ public class RemoteExecutorTest {
     void getAllRemotePidStr() throws IOException {
 
         RemoteExecutor remoteExecutor = spy(new 
RemoteExecutor(sshConnectionParam));
-        
doReturn("bash(9527)───sleep(9528)").when(remoteExecutor).runRemote(anyString());
+        // Mock pstree output based on OS
+        if (SystemUtils.IS_OS_MAC) {
+            doReturn("-+= 9527 root\n \\-+= 9528 
root").when(remoteExecutor).runRemote(anyString());
+        } else {
+            
doReturn("bash(9527)───sleep(9528)").when(remoteExecutor).runRemote(anyString());
+        }
         String allPidStr = remoteExecutor.getAllRemotePidStr("9527");
         Assertions.assertEquals("9527 9528", allPidStr);
 
-        
doReturn("systemd(1)───sleep(9528)").when(remoteExecutor).runRemote(anyString());
+        if (SystemUtils.IS_OS_MAC) {
+            doReturn("-+= 1 root\n \\-+= 9528 
root").when(remoteExecutor).runRemote(anyString());
+        } else {
+            
doReturn("systemd(1)───sleep(9528)").when(remoteExecutor).runRemote(anyString());
+        }
         allPidStr = remoteExecutor.getAllRemotePidStr("9527");
         Assertions.assertEquals("9527", allPidStr);
 
@@ -154,4 +179,99 @@ public class RemoteExecutorTest {
         Assertions.assertEquals("9527", allPidStr);
 
     }
+
+    @Test
+    void testTrack() throws Exception {
+        RemoteExecutor remoteExecutor = spy(new 
RemoteExecutor(sshConnectionParam));
+        String taskId = "1234";
+        ChannelExec channel = Mockito.mock(ChannelExec.class, 
RETURNS_DEEP_STUBS);
+
+        // Mock getTaskPid to control the loop, return a valid pid 2 times, 
then return empty
+        doReturn("9527")
+                .doReturn("9527")
+                .doReturn("").when(remoteExecutor).getTaskPid(taskId);
+        when(clientSession.auth().verify().isSuccess()).thenReturn(true);
+        when(clientSession.createExecChannel(anyString())).thenReturn(channel);
+
+        // Mock the streaming runRemote to simulate log output
+        String logContent = "some log line 1\n"
+                + "echo \"${setValue(my_prop=my_value)}\"\n"
+                + "some log line 2\n";
+        InputStream inputStream = IOUtils.toInputStream(logContent, 
StandardCharsets.UTF_8);
+        when(channel.getInvertedOut()).thenReturn(inputStream);
+        when(channel.getExitStatus()).thenReturn(0);
+
+        remoteExecutor.track(taskId);
+
+        // Verify that the output parameter was parsed and stored
+        Assertions.assertEquals(1, 
remoteExecutor.getTaskOutputParams().size());
+        Assertions.assertEquals("my_value", 
remoteExecutor.getTaskOutputParams().get("my_prop"));
+    }
+
+    @Test
+    void testRunRemoteWithEmptyOutput() throws Exception {
+        // Test empty output scenario (readLines = 0)
+        RemoteExecutor remoteExecutor = spy(new 
RemoteExecutor(sshConnectionParam));
+        ChannelExec channel = Mockito.mock(ChannelExec.class, 
RETURNS_DEEP_STUBS);
+
+        when(clientSession.auth().verify().isSuccess()).thenReturn(true);
+        when(clientSession.createExecChannel(anyString())).thenReturn(channel);
+        when(channel.getInvertedOut()).thenReturn(new ByteArrayInputStream(new 
byte[0]));
+        when(channel.getExitStatus()).thenReturn(0);
+        when(channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), 0))
+                .thenReturn(EnumSet.of(ClientChannelEvent.CLOSED));
+
+        String result = Assertions.assertDoesNotThrow(() -> 
remoteExecutor.runRemote("echo"));
+        Assertions.assertEquals("", result);
+    }
+
+    @Test
+    void testRunRemoteWithNonZeroExitStatus() throws Exception {
+        // Test command failure scenario (exitStatus != 0)
+        RemoteExecutor remoteExecutor = spy(new 
RemoteExecutor(sshConnectionParam));
+        ChannelExec channel = Mockito.mock(ChannelExec.class, 
RETURNS_DEEP_STUBS);
+
+        when(clientSession.auth().verify().isSuccess()).thenReturn(true);
+        when(clientSession.createExecChannel(anyString())).thenReturn(channel);
+        when(channel.getInvertedOut()).thenReturn(IOUtils.toInputStream("error 
output", StandardCharsets.UTF_8));
+        when(channel.getExitStatus()).thenReturn(1);
+        when(channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), 0))
+                .thenReturn(EnumSet.of(ClientChannelEvent.CLOSED));
+
+        Assertions.assertThrows(TaskException.class, () -> 
remoteExecutor.runRemote("failing_command"));
+    }
+
+    @Test
+    void testRunRemoteWithNullExitStatus() throws Exception {
+        // Test null exitStatus scenario
+        RemoteExecutor remoteExecutor = spy(new 
RemoteExecutor(sshConnectionParam));
+        ChannelExec channel = Mockito.mock(ChannelExec.class, 
RETURNS_DEEP_STUBS);
+
+        when(clientSession.auth().verify().isSuccess()).thenReturn(true);
+        when(clientSession.createExecChannel(anyString())).thenReturn(channel);
+        when(channel.getInvertedOut()).thenReturn(IOUtils.toInputStream("some 
output", StandardCharsets.UTF_8));
+        when(channel.getExitStatus()).thenReturn(null);
+        when(channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), 0))
+                .thenReturn(EnumSet.of(ClientChannelEvent.CLOSED));
+
+        Assertions.assertThrows(TaskException.class, () -> 
remoteExecutor.runRemote("command"));
+    }
+
+    @Test
+    void testTrackWithEmptyLogOutput() throws Exception {
+        // Test track with empty log output (readLines = 0 scenario in track 
loop)
+        RemoteExecutor remoteExecutor = spy(new 
RemoteExecutor(sshConnectionParam));
+        String taskId = "1234";
+        ChannelExec channel = Mockito.mock(ChannelExec.class, 
RETURNS_DEEP_STUBS);
+
+        doReturn("9527").doReturn("").when(remoteExecutor).getTaskPid(taskId);
+        when(clientSession.auth().verify().isSuccess()).thenReturn(true);
+        when(clientSession.createExecChannel(anyString())).thenReturn(channel);
+        when(channel.getInvertedOut()).thenReturn(new ByteArrayInputStream(new 
byte[0]));
+        when(channel.getExitStatus()).thenReturn(0);
+        when(channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), 0))
+                .thenReturn(EnumSet.of(ClientChannelEvent.CLOSED));
+
+        Assertions.assertDoesNotThrow(() -> remoteExecutor.track(taskId));
+    }
 }

Reply via email to