JorringHsiao opened a new issue, #17788: URL: https://github.com/apache/dolphinscheduler/issues/17788
### Search before asking

- [x] I had searched in the [issues](https://github.com/apache/dolphinscheduler/issues?q=is%3Aissue) and found no similar feature requirement.

### Description

## risk

With the current log-processing logic, if the remotely executed script emits a large amount of log output at one point in time, the server builds a very large string to hold it. If many such tasks run simultaneously and each handles that much log output, the resulting large temporary strings cause frequent GC and can even lead to an OOM.

## source code

```java
public class RemoteExecutor implements AutoCloseable {

    // ...

    public void track(String taskId) throws Exception {
        int logN = 0;
        String pid;
        log.info("Remote shell task log:");
        TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser();
        do {
            pid = getTaskPid(taskId);
            String trackCommand = String.format(COMMAND.TRACK_COMMAND, logN + 1, getRemoteShellHome(), taskId);
            // 👇 tail -n +XXX XXX.log
            String logLine = runRemote(trackCommand);
            if (StringUtils.isEmpty(logLine)) {
                Thread.sleep(TRACK_INTERVAL);
            } else {
                // 👇 Many temporary String objects are created here just to count the lines.
                logN += logLine.split("\n").length;
                log.info(logLine);
                taskOutputParameterParser.appendParseLog(logLine);
            }
        } while (StringUtils.isNotEmpty(pid));
        taskOutputParams.putAll(taskOutputParameterParser.getTaskOutputParams());
    }

    public String runRemote(String command) throws IOException {
        try (
                ChannelExec channel = getSession().createExecChannel(command);
                // 👇 a large amount of log output in one poll --> large byte array
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                ByteArrayOutputStream err = new ByteArrayOutputStream()) {
            channel.setOut(System.out);
            channel.setOut(out);
            channel.setErr(err);
            channel.open();
            channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), 0);
            channel.close();
            Integer exitStatus = channel.getExitStatus();
            if (exitStatus == null || exitStatus != 0) {
                throw new TaskException(
                        "Remote shell task error, exitStatus: " + exitStatus + " error message: " + err);
            }
            // 👇 converting to a String copies the data again, multiplying the number of large objects
            return out.toString();
        }
    }
}
```

## a simple example of improvement

remote --> client --> PipedOutputStream --> PipedInputStream --> consumer reads line by line

```java
public void runRemote(String command, Consumer<InputStream> handlerOut) throws IOException {
    try (
            ChannelExec channel = getSession().createExecChannel(command);
            PipedInputStream in = new PipedInputStream(); // 👈
            PipedOutputStream out = new PipedOutputStream(in); // 👈
            ByteArrayOutputStream err = new ByteArrayOutputStream()) {
        channel.setOut(System.out);
        channel.setOut(out);
        channel.setErr(err);
        channel.open();
        handlerOut.accept(in); // 👈
        channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), 0);
        channel.close();
        Integer exitStatus = channel.getExitStatus();
        if (exitStatus == null || exitStatus != 0) {
            throw new TaskException(
                    "Remote shell task error, exitStatus: " + exitStatus + " error message: " + err);
        }
    }
}

public void track(String taskId) throws Exception {
    AtomicInteger logN = new AtomicInteger();
    String pid;
    log.info("Remote shell task log:");
    TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser();
    do {
        pid = getTaskPid(taskId);
        int lastLogN = logN.get(); // 👈
        String trackCommand = String.format(COMMAND.TRACK_COMMAND, logN.get() + 1, getRemoteShellHome(), taskId);
        // 👇 consume the output line by line instead of materializing it as one String
        runRemote(trackCommand, in -> {
            try (
                    InputStreamReader inReader = new InputStreamReader(in);
                    BufferedReader reader = new BufferedReader(inReader)) {
                String logLine;
                while ((logLine = reader.readLine()) != null) {
                    logN.incrementAndGet();
                    log.info(logLine);
                    taskOutputParameterParser.appendParseLog(logLine);
                }
            } catch (IOException e) {
                // do sth ...
            }
        });
        // 👇 no new lines were read in this round, so wait before polling again
        if (lastLogN == logN.get()) {
            Thread.sleep(TRACK_INTERVAL);
        }
    } while (StringUtils.isNotEmpty(pid));
    taskOutputParams.putAll(taskOutputParameterParser.getTaskOutputParams());
}
```

### Are you willing to submit a PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
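
### Standalone sketch of the piped approach

To make the pipe-based flow in the description easier to try out in isolation, here is a minimal, self-contained sketch. It does not use the SSH client at all: `PipedLogDemo`, `startProducer`, and `consume` are hypothetical names, and the producer thread is only a stand-in for the remote channel writing its output. It relies on the producer and the reader running on different threads, because the `PipedInputStream` Javadoc warns that using both ends of a pipe from a single thread may deadlock. Whether the real channel writes from a separate I/O thread and closes the output stream at end of output is an assumption that would need to be checked against the SSH library.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

public class PipedLogDemo {

    /** Hypothetical stand-in for the remote channel: writes `lines` log lines on its own thread. */
    static Thread startProducer(OutputStream out, int lines) {
        Thread t = new Thread(() -> {
            // Closing the write end signals end-of-stream to the reader.
            try (OutputStream o = out) {
                for (int i = 1; i <= lines; i++) {
                    o.write(("log line " + i + "\n").getBytes(StandardCharsets.UTF_8));
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        t.start();
        return t;
    }

    /** Reads the piped output line by line and returns the number of lines seen. */
    public static int consume(int lines, Consumer<String> lineHandler)
            throws IOException, InterruptedException {
        int count = 0;
        // The pipe buffer (8 KiB here) bounds how much unread output is held in memory.
        try (PipedInputStream in = new PipedInputStream(8192);
             PipedOutputStream out = new PipedOutputStream(in)) {
            Thread producer = startProducer(out, lines);
            try (BufferedReader reader =
                         new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    count++;                  // count lines without split("\n")
                    lineHandler.accept(line); // e.g. log the line and feed the parameter parser
                }
            }
            producer.join();
        }
        return count;
    }

    public static void main(String[] args) throws Exception {
        int n = consume(100_000, line -> { /* handle one line at a time */ });
        System.out.println(n); // prints 100000
    }
}
```

In this sketch only one line plus the pipe buffer is live at a time, so the line count is obtained without ever holding the full output as a single byte array or `String`.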
