This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 5a9cb7ee64 Fix task log might delay until task finished due to batch
flush (#13502)
5a9cb7ee64 is described below
commit 5a9cb7ee64783776ae6fa0865937d25234e0159a
Author: Wenjun Ruan <[email protected]>
AuthorDate: Thu Feb 2 21:48:03 2023 +0800
Fix task log might delay until task finished due to batch flush (#13502)
---
.../plugin/task/api/AbstractCommandExecutor.java | 35 ++++------------------
.../plugin/task/api/TaskConstants.java | 5 ----
2 files changed, 6 insertions(+), 34 deletions(-)
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
index c79b4a8746..201684ddca 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
@@ -359,7 +359,7 @@ public abstract class AbstractCommandExecutor {
}
logOutputIsSuccess = true;
} catch (Exception e) {
- logger.error(e.getMessage(), e);
+ logger.error("Parse var pool error", e);
logOutputIsSuccess = true;
}
});
@@ -369,17 +369,16 @@ public abstract class AbstractCommandExecutor {
ExecutorService parseProcessOutputExecutorService =
newDaemonSingleThreadExecutor(threadLoggerInfoName);
parseProcessOutputExecutorService.submit(() -> {
try {
- long lastFlushTime = System.currentTimeMillis();
- while (logBuffer.size() > 0 || !logOutputIsSuccess) {
- if (logBuffer.size() > 0) {
- lastFlushTime = flush(lastFlushTime);
+ while (!logBuffer.isEmpty() || !logOutputIsSuccess) {
+ if (!logBuffer.isEmpty()) {
+ logHandler.accept(logBuffer);
+ logBuffer.clear();
} else {
Thread.sleep(TaskConstants.DEFAULT_LOG_FLUSH_INTERVAL);
}
}
} catch (Exception e) {
- Thread.currentThread().interrupt();
- logger.error(e.getMessage(), e);
+ logger.error("Output task log error", e);
} finally {
clear();
}
@@ -438,28 +437,6 @@ public abstract class AbstractCommandExecutor {
return processId;
}
- /**
- * when log buffer siz or flush time reach condition , then flush
- *
- * @param lastFlushTime last flush time
- * @return last flush time
- */
- private long flush(long lastFlushTime) {
- long now = System.currentTimeMillis();
-
- /*
- * when log buffer siz or flush time reach condition , then flush
- */
- if (logBuffer.size() >= TaskConstants.DEFAULT_LOG_ROWS_NUM
- || now - lastFlushTime > TaskConstants.DEFAULT_LOG_FLUSH_INTERVAL) {
- lastFlushTime = now;
- logHandler.accept(logBuffer);
-
- logBuffer.clear();
- }
- return lastFlushTime;
- }
-
protected abstract String buildCommandFilePath();
protected abstract void createCommandFileIfNotExists(String execCommand,
String commandFile) throws IOException;
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
index 8337dc6f31..a0928d0eac 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
@@ -134,11 +134,6 @@ public class TaskConstants {
public static final String SH = "sh";
- /**
- * default log cache rows num,output when reach the number
- */
- public static final int DEFAULT_LOG_ROWS_NUM = 4 * 16;
-
/**
* log flush interval, output when reach the interval
*/