This is an automated email from the ASF dual-hosted git repository.

kerwin pushed a commit to branch 3.1.1-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 703f9991b42ef99ee9d153e527f13eea3f414c79
Author: sgw <[email protected]>
AuthorDate: Thu Sep 29 16:01:29 2022 +0800

    [DS-12154][worker] Optimize the log printing of the worker module (#12183)
    
    * [DS-12154][worker] Optimize the log printing of the worker module 
according to the log specification.
---
 .../server/worker/processor/TaskDispatchProcessor.java    |  5 +++--
 .../worker/processor/TaskExecuteResultAckProcessor.java   |  2 +-
 .../server/worker/processor/TaskRejectAckProcessor.java   |  3 +++
 .../server/worker/processor/TaskSavePointProcessor.java   | 15 +++++++++++----
 .../server/worker/runner/WorkerManagerThread.java         |  1 +
 5 files changed, 19 insertions(+), 7 deletions(-)

diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java
index 4bb0c92124..288a5d9b33 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java
@@ -95,7 +95,7 @@ public class TaskDispatchProcessor implements 
NettyRequestProcessor {
             return;
         }
         final String workflowMasterAddress = 
taskDispatchCommand.getMessageSenderAddress();
-        logger.info("task execute request message: {}", taskDispatchCommand);
+        logger.info("Receive task dispatch request, command: {}", 
taskDispatchCommand);
 
         TaskExecutionContext taskExecutionContext = 
taskDispatchCommand.getTaskExecutionContext();
 
@@ -133,7 +133,8 @@ public class TaskDispatchProcessor implements 
NettyRequestProcessor {
             if (!offer) {
                 logger.warn("submit task to wait queue error, queue is full, 
current queue size is {}, will send a task reject message to master", 
workerManager.getWaitSubmitQueueSize());
                 workerMessageSender.sendMessageWithRetry(taskExecutionContext, 
workflowMasterAddress, CommandType.TASK_REJECT);
-            }
+            } else
+                logger.info("Submit task to wait queue success, current queue 
size is {}", workerManager.getWaitSubmitQueueSize());
         } finally {
             LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
         }
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResultAckProcessor.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResultAckProcessor.java
index e59902d6fc..bff92e5d11 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResultAckProcessor.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResultAckProcessor.java
@@ -54,10 +54,10 @@ public class TaskExecuteResultAckProcessor implements 
NettyRequestProcessor {
             logger.error("task execute response ack command is null");
             return;
         }
-        logger.info("task execute response ack command : {}", 
taskExecuteAckMessage);
 
         try {
             
LoggerUtils.setTaskInstanceIdMDC(taskExecuteAckMessage.getTaskInstanceId());
+            logger.info("Receive task execute response ack command : {}", 
taskExecuteAckMessage);
             if (taskExecuteAckMessage.isSuccess()) {
                 
messageRetryRunner.removeRetryMessage(taskExecuteAckMessage.getTaskInstanceId(),
                         CommandType.TASK_EXECUTE_RESULT);
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRejectAckProcessor.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRejectAckProcessor.java
index a18223b90d..b6eb954b5d 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRejectAckProcessor.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRejectAckProcessor.java
@@ -47,10 +47,13 @@ public class TaskRejectAckProcessor implements 
NettyRequestProcessor {
         TaskRejectAckCommand taskRejectAckMessage = 
JSONUtils.parseObject(command.getBody(),
                 TaskRejectAckCommand.class);
         if (taskRejectAckMessage == null) {
+            logger.error("Receive task reject response, the response message 
is null");
             return;
         }
+
         try {
             
LoggerUtils.setTaskInstanceIdMDC(taskRejectAckMessage.getTaskInstanceId());
+            logger.info("Receive task reject response ack command: {}", 
taskRejectAckMessage);
             if (taskRejectAckMessage.isSuccess()) {
                 
messageRetryRunner.removeRetryMessage(taskRejectAckMessage.getTaskInstanceId(),
                         CommandType.TASK_REJECT);
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskSavePointProcessor.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskSavePointProcessor.java
index 899ac7d9b7..1a621a39e7 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskSavePointProcessor.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskSavePointProcessor.java
@@ -22,6 +22,7 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import 
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
@@ -67,7 +68,7 @@ public class TaskSavePointProcessor implements 
NettyRequestProcessor {
             logger.error("task savepoint request command is null");
             return;
         }
-        logger.info("task savepoint command : {}", 
taskSavePointRequestCommand);
+        logger.info("Receive task savepoint command : {}", 
taskSavePointRequestCommand);
 
         int taskInstanceId = taskSavePointRequestCommand.getTaskInstanceId();
         TaskExecutionContext taskExecutionContext = 
TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
@@ -76,9 +77,14 @@ public class TaskSavePointProcessor implements 
NettyRequestProcessor {
             return;
         }
 
-        doSavePoint(taskInstanceId);
+        try {
+            LoggerUtils.setTaskInstanceIdMDC(taskInstanceId);
+            doSavePoint(taskInstanceId);
 
-        sendTaskSavePointResponseCommand(channel, taskExecutionContext);
+            sendTaskSavePointResponseCommand(channel, taskExecutionContext);
+        } finally {
+            LoggerUtils.removeTaskInstanceIdMDC();
+        }
     }
 
     private void sendTaskSavePointResponseCommand(Channel channel, 
TaskExecutionContext taskExecutionContext) {
@@ -89,7 +95,8 @@ public class TaskSavePointProcessor implements 
NettyRequestProcessor {
             public void operationComplete(ChannelFuture future) throws 
Exception {
                 if (!future.isSuccess()) {
                     logger.error("Submit kill response to master error, kill 
command: {}", taskSavePointResponseCommand);
-                }
+                } else
+                    logger.info("Submit kill response to master success, kill 
command: {}", taskSavePointResponseCommand);
             }
         });
     }
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
index 41824a9a44..1e6a0ba6b7 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
@@ -93,6 +93,7 @@ public class WorkerManagerThread implements Runnable {
 
     public boolean offer(WorkerDelayTaskExecuteRunnable 
workerDelayTaskExecuteRunnable) {
         if (waitSubmitQueue.size() > workerExecThreads) {
+            logger.warn("Wait submit queue is full, will retry submit task 
later");
             WorkerServerMetrics.incWorkerSubmitQueueIsFullCount();
             // if waitSubmitQueue is full, it will wait 1s, then try add
             ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);

Reply via email to