This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 9115062 [Plugin][Task]Fix Task log may be lost (#6294)
9115062 is described below
commit 91150628385bbac709d7e0030ae042e5e0cc396b
Author: Kirs <[email protected]>
AuthorDate: Thu Sep 23 09:07:12 2021 +0800
[Plugin][Task]Fix Task log may be lost (#6294)
---
.../plugin/task/api/AbstractCommandExecutor.java | 17 +++++++++--------
.../plugin/task/api/AbstractTaskExecutor.java | 12 ++++++++++--
.../plugin/task/api/ShellCommandExecutor.java | 5 +++--
.../plugin/task/python/PythonCommandExecutor.java | 4 ++--
4 files changed, 24 insertions(+), 14 deletions(-)
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
index 3f0bfad..9405756 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
@@ -40,6 +40,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@@ -68,7 +69,7 @@ public abstract class AbstractCommandExecutor {
/**
* log handler
*/
- protected Consumer<List<String>> logHandler;
+ protected Consumer<LinkedBlockingQueue<String>> logHandler;
/**
* logger
@@ -78,7 +79,7 @@ public abstract class AbstractCommandExecutor {
/**
* log list
*/
- protected List<String> logBuffer;
+ protected LinkedBlockingQueue<String> logBuffer;
protected boolean logOutputIsSuccess = false;
@@ -92,16 +93,16 @@ public abstract class AbstractCommandExecutor {
*/
protected TaskRequest taskRequest;
- public AbstractCommandExecutor(Consumer<List<String>> logHandler,
+ public AbstractCommandExecutor(Consumer<LinkedBlockingQueue<String>>
logHandler,
TaskRequest taskRequest,
Logger logger) {
this.logHandler = logHandler;
this.taskRequest = taskRequest;
this.logger = logger;
- this.logBuffer = Collections.synchronizedList(new ArrayList<>());
+ this.logBuffer = new LinkedBlockingQueue<>();
}
- public AbstractCommandExecutor(List<String> logBuffer) {
+ public AbstractCommandExecutor(LinkedBlockingQueue<String> logBuffer) {
this.logBuffer = logBuffer;
}
@@ -290,15 +291,15 @@ public abstract class AbstractCommandExecutor {
*/
private void clear() {
- List<String> markerList = new ArrayList<>();
-
markerList.add(ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER.toString());
+ LinkedBlockingQueue<String> markerLog = new LinkedBlockingQueue<>(1);
+
markerLog.add(ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER.toString());
if (!logBuffer.isEmpty()) {
// log handle
logHandler.accept(logBuffer);
logBuffer.clear();
}
- logHandler.accept(markerList);
+ logHandler.accept(markerLog);
}
/**
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskExecutor.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskExecutor.java
index 27c568b..662a15f 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskExecutor.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskExecutor.java
@@ -17,11 +17,15 @@
package org.apache.dolphinscheduler.plugin.task.api;
+import static ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER;
+
import org.apache.dolphinscheduler.plugin.task.util.LoggerUtils;
import org.apache.dolphinscheduler.spi.task.AbstractTask;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import java.util.List;
+import java.util.StringJoiner;
+import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,12 +56,16 @@ public abstract class AbstractTaskExecutor extends
AbstractTask {
*
* @param logs log list
*/
- public void logHandle(List<String> logs) {
+ public void logHandle(LinkedBlockingQueue<String> logs) {
// note that the "new line" is added here to facilitate log parsing
if (logs.contains(FINALIZE_SESSION_MARKER.toString())) {
logger.info(FINALIZE_SESSION_MARKER,
FINALIZE_SESSION_MARKER.toString());
} else {
- logger.info(" -> {}", String.join("\n\t", logs));
+ StringJoiner joiner = new StringJoiner("\n\t");
+ while (!logs.isEmpty()) {
+ joiner.add(logs.poll());
+ }
+ logger.info(" -> {}", joiner);
}
}
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java
index bd1f0b4..5272c0e 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java
@@ -28,6 +28,7 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import org.slf4j.Logger;
@@ -54,13 +55,13 @@ public class ShellCommandExecutor extends
AbstractCommandExecutor {
* @param taskRequest taskRequest
* @param logger logger
*/
- public ShellCommandExecutor(Consumer<List<String>> logHandler,
+ public ShellCommandExecutor(Consumer<LinkedBlockingQueue<String>>
logHandler,
TaskRequest taskRequest,
Logger logger) {
super(logHandler, taskRequest, logger);
}
- public ShellCommandExecutor(List<String> logBuffer) {
+ public ShellCommandExecutor(LinkedBlockingQueue<String> logBuffer) {
super(logBuffer);
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonCommandExecutor.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonCommandExecutor.java
index 007c5dd..8e118f9 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonCommandExecutor.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonCommandExecutor.java
@@ -31,7 +31,7 @@ import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
-import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.regex.Pattern;
@@ -62,7 +62,7 @@ public class PythonCommandExecutor extends
AbstractCommandExecutor {
* @param taskRequest TaskRequest
* @param logger logger
*/
- public PythonCommandExecutor(Consumer<List<String>> logHandler,
+ public PythonCommandExecutor(Consumer<LinkedBlockingQueue<String>>
logHandler,
TaskRequest taskRequest,
Logger logger) {
super(logHandler, taskRequest, logger);