This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 12dd60fa46 Fix task wake up failed will block the event (#13466)
12dd60fa46 is described below

commit 12dd60fa46046659021d1c93d4af2849f22accdf
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed Feb 1 15:31:04 2023 +0800

    Fix task wake up failed will block the event (#13466)
---
 .../apache/dolphinscheduler/common/enums/StateEventType.java   |  2 +-
 .../server/master/event/TaskWaitTaskGroupStateHandler.java     | 10 ++++++----
 .../server/master/processor/TaskEventProcessor.java            |  2 +-
 .../server/master/runner/WorkflowExecuteRunnable.java          |  2 +-
 .../server/master/runner/WorkflowExecuteThreadPool.java        |  2 +-
 .../dolphinscheduler/service/process/ProcessServiceImpl.java   |  1 +
 6 files changed, 11 insertions(+), 8 deletions(-)

diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java
index bc021e5e08..5afadaaf06 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java
@@ -25,7 +25,7 @@ public enum StateEventType {
     TASK_STATE_CHANGE(1, "task state change"),
     PROCESS_TIMEOUT(2, "process timeout"),
     TASK_TIMEOUT(3, "task timeout"),
-    WAIT_TASK_GROUP(4, "wait task group"),
+    WAKE_UP_TASK_GROUP(4, "wait task group"),
     TASK_RETRY(5, "task retry"),
     PROCESS_BLOCKED(6, "process blocked");
 
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java
index bb0c7b8b70..b5fd02258e 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java
@@ -32,16 +32,18 @@ public class TaskWaitTaskGroupStateHandler implements 
StateEventHandler {
 
     @Override
     public boolean handleStateEvent(WorkflowExecuteRunnable 
workflowExecuteRunnable,
-                                    StateEvent stateEvent) throws 
StateEventHandleFailure {
+                                    StateEvent stateEvent) {
         logger.info("Handle task instance wait task group event, 
taskInstanceId: {}", stateEvent.getTaskInstanceId());
-        if (!workflowExecuteRunnable.checkForceStartAndWakeUp(stateEvent)) {
-            throw new StateEventHandleFailure("Task state event handle failed 
due to robing taskGroup resource failed");
+        if (workflowExecuteRunnable.checkForceStartAndWakeUp(stateEvent)) {
+            logger.info("Success wake up task instance, taskInstanceId: {}", 
stateEvent.getTaskInstanceId());
+        } else {
+            logger.info("Failed to wake up task instance, taskInstanceId: {}", 
stateEvent.getTaskInstanceId());
         }
         return true;
     }
 
     @Override
     public StateEventType getEventType() {
-        return StateEventType.WAIT_TASK_GROUP;
+        return StateEventType.WAKE_UP_TASK_GROUP;
     }
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java
index 43822fce58..e5f0e0f099 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java
@@ -58,7 +58,7 @@ public class TaskEventProcessor implements 
NettyRequestProcessor {
                 
.processInstanceId(taskEventChangeCommand.getProcessInstanceId())
                 .taskInstanceId(taskEventChangeCommand.getTaskInstanceId())
                 .key(taskEventChangeCommand.getKey())
-                .type(StateEventType.WAIT_TASK_GROUP)
+                .type(StateEventType.WAKE_UP_TASK_GROUP)
                 .build();
         try {
             
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(),
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 5f981cf92e..7c75a6517f 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -482,7 +482,7 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatue> {
                     TaskStateEvent nextEvent = TaskStateEvent.builder()
                             .processInstanceId(processInstance.getId())
                             .taskInstanceId(nextTaskInstance.getId())
-                            .type(StateEventType.WAIT_TASK_GROUP)
+                            .type(StateEventType.WAKE_UP_TASK_GROUP)
                             .build();
                     this.stateEvents.add(nextEvent);
                 } else {
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
index 509028277a..be4d55eeed 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
@@ -109,7 +109,7 @@ public class WorkflowExecuteThreadPool extends 
ThreadPoolTaskExecutor {
             return;
         }
         if (multiThreadFilterMap.containsKey(workflowExecuteThread.getKey())) {
-            logger.warn("The workflow has been executed by another thread");
+            logger.debug("The workflow has been executed by another thread");
             return;
         }
         multiThreadFilterMap.put(workflowExecuteThread.getKey(), 
workflowExecuteThread);
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 3591bec514..c2dad29de1 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -2637,6 +2637,7 @@ public class ProcessServiceImpl implements ProcessService 
{
                 processInstance.getId(), taskId);
         Host host = new Host(processInstance.getHost());
         stateEventCallbackService.sendResult(host, 
taskEventChangeCommand.convert2Command(taskType));
+        logger.info("Success send command to master: {}, command: {}", host, 
taskEventChangeCommand);
     }
 
     @Override

Reply via email to