This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch 3.0.1-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/3.0.1-prepare by this push:
     new 43d9c00259 [Cherry-pick][Bug] [Master] WorkflowExecuteRunnable will 
face a infinite loop #11838 #11864 (#11949)
43d9c00259 is described below

commit 43d9c00259c32020bc442a0f436ed12a2fd8b5bd
Author: caishunfeng <[email protected]>
AuthorDate: Thu Sep 15 10:36:34 2022 +0800

    [Cherry-pick][Bug] [Master] WorkflowExecuteRunnable will face a infinite 
loop #11838 #11864 (#11949)
    
    Co-authored-by: Yann Ann <[email protected]>
---
 .../master/event/TaskTimeoutStateEventHandler.java | 25 ++++++++++++++++++----
 .../master/runner/WorkflowExecuteRunnable.java     |  1 -
 2 files changed, 21 insertions(+), 5 deletions(-)

diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java
index 240f10ff2c..f2cdff500b 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java
@@ -28,17 +28,27 @@ import 
org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
 
 import java.util.Map;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.auto.service.AutoService;
 
 @AutoService(StateEventHandler.class)
 public class TaskTimeoutStateEventHandler implements StateEventHandler {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(TaskTimeoutStateEventHandler.class);
+
     @Override
     public boolean handleStateEvent(WorkflowExecuteRunnable 
workflowExecuteRunnable, StateEvent stateEvent)
         throws StateEventHandleError {
         TaskMetrics.incTaskTimeout();
         workflowExecuteRunnable.checkTaskInstanceByStateEvent(stateEvent);
 
-        TaskInstance taskInstance = 
workflowExecuteRunnable.getTaskInstance(stateEvent.getTaskInstanceId()).get();
+        TaskInstance taskInstance =
+            
workflowExecuteRunnable.getTaskInstance(stateEvent.getTaskInstanceId()).orElseThrow(
+                () -> new StateEventHandleError(String.format(
+                    "Cannot find the task instance from workflow execute 
runnable, taskInstanceId: %s",
+                    stateEvent.getTaskInstanceId())));
 
         if (TimeoutFlag.CLOSE == 
taskInstance.getTaskDefine().getTimeoutFlag()) {
             return true;
@@ -47,10 +57,17 @@ public class TaskTimeoutStateEventHandler implements 
StateEventHandler {
         Map<Long, ITaskProcessor> activeTaskProcessMap = 
workflowExecuteRunnable.getActiveTaskProcessMap();
         if (TaskTimeoutStrategy.FAILED == taskTimeoutStrategy
             || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) {
-            ITaskProcessor taskProcessor = 
activeTaskProcessMap.get(taskInstance.getTaskCode());
-            taskProcessor.action(TaskAction.TIMEOUT);
+            if (activeTaskProcessMap.containsKey(taskInstance.getTaskCode())) {
+                ITaskProcessor taskProcessor = 
activeTaskProcessMap.get(taskInstance.getTaskCode());
+                taskProcessor.action(TaskAction.TIMEOUT);
+            } else {
+                logger.warn(
+                    "cannot find the task processor for task {}, so skip task 
processor action.",
+                    taskInstance.getTaskCode());
+            }
         }
-        if (TaskTimeoutStrategy.WARN == taskTimeoutStrategy || 
TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) {
+        if (TaskTimeoutStrategy.WARN == taskTimeoutStrategy
+            || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) {
             workflowExecuteRunnable.processTimeout();
             workflowExecuteRunnable.taskTimeout(taskInstance);
         }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 09e5318ca5..a7c669a735 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -288,7 +288,6 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatue> {
             } finally {
                 LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
             }
-
         }
     }
 

Reply via email to