This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch 3.0.1-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/3.0.1-prepare by this push:
new 43d9c00259 [Cherry-pick][Bug] [Master] WorkflowExecuteRunnable will
face a infinite loop #11838 #11864 (#11949)
43d9c00259 is described below
commit 43d9c00259c32020bc442a0f436ed12a2fd8b5bd
Author: caishunfeng <[email protected]>
AuthorDate: Thu Sep 15 10:36:34 2022 +0800
[Cherry-pick][Bug] [Master] WorkflowExecuteRunnable will face a infinite
loop #11838 #11864 (#11949)
Co-authored-by: Yann Ann <[email protected]>
---
.../master/event/TaskTimeoutStateEventHandler.java | 25 ++++++++++++++++++----
.../master/runner/WorkflowExecuteRunnable.java | 1 -
2 files changed, 21 insertions(+), 5 deletions(-)
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java
index 240f10ff2c..f2cdff500b 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java
@@ -28,17 +28,27 @@ import
org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.google.auto.service.AutoService;
@AutoService(StateEventHandler.class)
public class TaskTimeoutStateEventHandler implements StateEventHandler {
+
+ private static final Logger logger =
LoggerFactory.getLogger(TaskTimeoutStateEventHandler.class);
+
@Override
public boolean handleStateEvent(WorkflowExecuteRunnable
workflowExecuteRunnable, StateEvent stateEvent)
throws StateEventHandleError {
TaskMetrics.incTaskTimeout();
workflowExecuteRunnable.checkTaskInstanceByStateEvent(stateEvent);
- TaskInstance taskInstance =
workflowExecuteRunnable.getTaskInstance(stateEvent.getTaskInstanceId()).get();
+ TaskInstance taskInstance =
+
workflowExecuteRunnable.getTaskInstance(stateEvent.getTaskInstanceId()).orElseThrow(
+ () -> new StateEventHandleError(String.format(
+ "Cannot find the task instance from workflow execute
runnable, taskInstanceId: %s",
+ stateEvent.getTaskInstanceId())));
if (TimeoutFlag.CLOSE ==
taskInstance.getTaskDefine().getTimeoutFlag()) {
return true;
@@ -47,10 +57,17 @@ public class TaskTimeoutStateEventHandler implements
StateEventHandler {
Map<Long, ITaskProcessor> activeTaskProcessMap =
workflowExecuteRunnable.getActiveTaskProcessMap();
if (TaskTimeoutStrategy.FAILED == taskTimeoutStrategy
|| TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) {
- ITaskProcessor taskProcessor =
activeTaskProcessMap.get(taskInstance.getTaskCode());
- taskProcessor.action(TaskAction.TIMEOUT);
+ if (activeTaskProcessMap.containsKey(taskInstance.getTaskCode())) {
+ ITaskProcessor taskProcessor =
activeTaskProcessMap.get(taskInstance.getTaskCode());
+ taskProcessor.action(TaskAction.TIMEOUT);
+ } else {
+ logger.warn(
+ "cannot find the task processor for task {}, so skip task
processor action.",
+ taskInstance.getTaskCode());
+ }
}
- if (TaskTimeoutStrategy.WARN == taskTimeoutStrategy ||
TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) {
+ if (TaskTimeoutStrategy.WARN == taskTimeoutStrategy
+ || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) {
workflowExecuteRunnable.processTimeout();
workflowExecuteRunnable.taskTimeout(taskInstance);
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 09e5318ca5..a7c669a735 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -288,7 +288,6 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
-
}
}