This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch 3.1.0-prepare in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit 780a509f673359c0a5c408e4f20cfc35fea024d3 Author: caishunfeng <[email protected]> AuthorDate: Thu Sep 15 09:06:10 2022 +0800 fix workflow keep running when task fail (#11930) --- .../server/master/runner/WorkflowExecuteRunnable.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 9e90b8c8a0..9d6eef3259 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -116,6 +116,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.BeanUtils; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; /** * Workflow execute task, used to execute a workflow instance. 
@@ -180,9 +181,9 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> { private final Map<Long, Integer> completeTaskMap = new ConcurrentHashMap<>(); /** - * depend failed task map, taskCode as key, taskId as value + * depend failed task set */ - private final Map<Long, Integer> dependFailedTaskMap = new ConcurrentHashMap<>(); + private final Set<Long> dependFailedTaskSet = Sets.newConcurrentHashSet(); /** * forbidden task map, code as key @@ -804,7 +805,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> { taskFailedSubmit = false; activeTaskProcessorMaps.clear(); - dependFailedTaskMap.clear(); + dependFailedTaskSet.clear(); completeTaskMap.clear(); errorTaskMap.clear(); @@ -904,8 +905,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> { } } } - logger.info("Initialize task queue, dependFailedTaskMap: {}, completeTaskMap: {}, errorTaskMap: {}", - dependFailedTaskMap, + logger.info("Initialize task queue, dependFailedTaskSet: {}, completeTaskMap: {}, errorTaskMap: {}", + dependFailedTaskSet, completeTaskMap, errorTaskMap); } @@ -1484,7 +1485,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> { if (this.errorTaskMap.size() > 0) { return true; } - return this.dependFailedTaskMap.size() > 0; + return this.dependFailedTaskSet.size() > 0; } /** @@ -1835,7 +1836,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> { } } else if (DependResult.FAILED == dependResult) { // if the dependency fails, the current node is not submitted and the state changes to failure. - dependFailedTaskMap.put(task.getTaskCode(), task.getId()); + dependFailedTaskSet.add(task.getTaskCode()); removeTaskFromStandbyList(task); logger.info("Task dependent result is failed, taskInstanceId:{} depend result : {}", task.getId(), dependResult);
