This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 9115062  [Plugin][Task]Fix Task log may be lost (#6294)
9115062 is described below

commit 91150628385bbac709d7e0030ae042e5e0cc396b
Author: Kirs <[email protected]>
AuthorDate: Thu Sep 23 09:07:12 2021 +0800

    [Plugin][Task]Fix Task log may be lost (#6294)
---
 .../plugin/task/api/AbstractCommandExecutor.java        | 17 +++++++++--------
 .../plugin/task/api/AbstractTaskExecutor.java           | 12 ++++++++++--
 .../plugin/task/api/ShellCommandExecutor.java           |  5 +++--
 .../plugin/task/python/PythonCommandExecutor.java       |  4 ++--
 4 files changed, 24 insertions(+), 14 deletions(-)

diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
index 3f0bfad..9405756 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
@@ -40,6 +40,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
@@ -68,7 +69,7 @@ public abstract class AbstractCommandExecutor {
     /**
      * log handler
      */
-    protected Consumer<List<String>> logHandler;
+    protected Consumer<LinkedBlockingQueue<String>> logHandler;
 
     /**
      * logger
@@ -78,7 +79,7 @@ public abstract class AbstractCommandExecutor {
     /**
      * log list
      */
-    protected List<String> logBuffer;
+    protected LinkedBlockingQueue<String> logBuffer;
 
     protected boolean logOutputIsSuccess = false;
 
@@ -92,16 +93,16 @@ public abstract class AbstractCommandExecutor {
      */
     protected TaskRequest taskRequest;
 
-    public AbstractCommandExecutor(Consumer<List<String>> logHandler,
+    public AbstractCommandExecutor(Consumer<LinkedBlockingQueue<String>> 
logHandler,
                                    TaskRequest taskRequest,
                                    Logger logger) {
         this.logHandler = logHandler;
         this.taskRequest = taskRequest;
         this.logger = logger;
-        this.logBuffer = Collections.synchronizedList(new ArrayList<>());
+        this.logBuffer = new LinkedBlockingQueue<>();
     }
 
-    public AbstractCommandExecutor(List<String> logBuffer) {
+    public AbstractCommandExecutor(LinkedBlockingQueue<String> logBuffer) {
         this.logBuffer = logBuffer;
     }
 
@@ -290,15 +291,15 @@ public abstract class AbstractCommandExecutor {
      */
     private void clear() {
 
-        List<String> markerList = new ArrayList<>();
-        
markerList.add(ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER.toString());
+        LinkedBlockingQueue<String> markerLog = new LinkedBlockingQueue<>(1);
+        
markerLog.add(ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER.toString());
 
         if (!logBuffer.isEmpty()) {
             // log handle
             logHandler.accept(logBuffer);
             logBuffer.clear();
         }
-        logHandler.accept(markerList);
+        logHandler.accept(markerLog);
     }
 
     /**
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskExecutor.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskExecutor.java
index 27c568b..662a15f 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskExecutor.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskExecutor.java
@@ -17,11 +17,15 @@
 
 package org.apache.dolphinscheduler.plugin.task.api;
 
+import static ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER;
+
 import org.apache.dolphinscheduler.plugin.task.util.LoggerUtils;
 import org.apache.dolphinscheduler.spi.task.AbstractTask;
 import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
 
 import java.util.List;
+import java.util.StringJoiner;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,12 +56,16 @@ public abstract class AbstractTaskExecutor extends 
AbstractTask {
      *
      * @param logs log list
      */
-    public void logHandle(List<String> logs) {
+    public void logHandle(LinkedBlockingQueue<String> logs) {
         // note that the "new line" is added here to facilitate log parsing
         if (logs.contains(FINALIZE_SESSION_MARKER.toString())) {
             logger.info(FINALIZE_SESSION_MARKER, 
FINALIZE_SESSION_MARKER.toString());
         } else {
-            logger.info(" -> {}", String.join("\n\t", logs));
+            StringJoiner joiner = new StringJoiner("\n\t");
+            while (!logs.isEmpty()) {
+                joiner.add(logs.poll());
+            }
+            logger.info(" -> {}", joiner);
         }
     }
 }
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java
index bd1f0b4..5272c0e 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java
@@ -28,6 +28,7 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.function.Consumer;
 
 import org.slf4j.Logger;
@@ -54,13 +55,13 @@ public class ShellCommandExecutor extends 
AbstractCommandExecutor {
      * @param taskRequest taskRequest
      * @param logger logger
      */
-    public ShellCommandExecutor(Consumer<List<String>> logHandler,
+    public ShellCommandExecutor(Consumer<LinkedBlockingQueue<String>> 
logHandler,
                                 TaskRequest taskRequest,
                                 Logger logger) {
         super(logHandler, taskRequest, logger);
     }
 
-    public ShellCommandExecutor(List<String> logBuffer) {
+    public ShellCommandExecutor(LinkedBlockingQueue<String> logBuffer) {
         super(logBuffer);
     }
 
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonCommandExecutor.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonCommandExecutor.java
index 007c5dd..8e118f9 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonCommandExecutor.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonCommandExecutor.java
@@ -31,7 +31,7 @@ import java.io.InputStreamReader;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
-import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.function.Consumer;
 import java.util.regex.Pattern;
 
@@ -62,7 +62,7 @@ public class PythonCommandExecutor extends 
AbstractCommandExecutor {
      * @param taskRequest TaskRequest
      * @param logger logger
      */
-    public PythonCommandExecutor(Consumer<List<String>> logHandler,
+    public PythonCommandExecutor(Consumer<LinkedBlockingQueue<String>> 
logHandler,
                                  TaskRequest taskRequest,
                                  Logger logger) {
         super(logHandler, taskRequest, logger);

Reply via email to