This is an automated email from the ASF dual-hosted git repository. zhongjiajie pushed a commit to branch 3.0.3-prepare in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit f677d3311df8ce7200999ad1f13c1c3cfdf3b619 Author: Yann Ann <[email protected]> AuthorDate: Wed Oct 19 09:36:47 2022 +0800 make sure all failed task will save in errorTaskMap (#12424) (cherry picked from commit 38b643f69b65f4de9dd43809404470934bfadc7b) --- .../server/master/runner/WorkflowExecuteRunnable.java | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index f3f84408a6..5c0fec018d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -375,13 +375,13 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> { retryTaskInstance(taskInstance); } else if (taskInstance.getState().typeIsFailure()) { completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); + errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); // There are child nodes and the failure policy is: CONTINUE if (processInstance.getFailureStrategy() == FailureStrategy.CONTINUE && DagHelper.haveAllNodeAfterNode( Long.toString(taskInstance.getTaskCode()), dag)) { submitPostNode(Long.toString(taskInstance.getTaskCode())); } else { - errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); if (processInstance.getFailureStrategy() == FailureStrategy.END) { killAllTasks(); } @@ -405,7 +405,6 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> { /** * release task group * - * @param taskInstance */ public void releaseTaskGroup(TaskInstance taskInstance) { logger.info("Release task group"); @@ -431,7 +430,6 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> { /** * crate new task instance to retry, different objects from the original * - * @param taskInstance */ private void retryTaskInstance(TaskInstance taskInstance) throws StateEventHandleException { if (!taskInstance.taskCanRetry()) { @@ -646,10 +644,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> { } private boolean needComplementProcess() { - if (processInstance.isComplementData() && Flag.NO == processInstance.getIsSubProcess()) { - return true; - } - return false; + return processInstance.isComplementData() && Flag.NO == processInstance.getIsSubProcess(); } /** @@ -1022,7 +1017,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> { /** * clone a new taskInstance for retry and reset some logic fields * - * @return + * @return taskInstance */ public TaskInstance cloneRetryTaskInstance(TaskInstance taskInstance) { TaskNode taskNode = dag.getNode(Long.toString(taskInstance.getTaskCode())); @@ -1044,7 +1039,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> { /** * clone a new taskInstance for tolerant and reset some logic fields * - * @return + * @return taskInstance */ public TaskInstance cloneTolerantTaskInstance(TaskInstance taskInstance) { TaskNode taskNode = dag.getNode(Long.toString(taskInstance.getTaskCode())); @@ -1064,9 +1059,9 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> { /** * new a taskInstance * - * @param processInstance - * @param taskNode - * @return + * @param processInstance process instance + * @param taskNode task node + * @return task instance */ public TaskInstance newTaskInstance(ProcessInstance processInstance, TaskNode taskNode) { TaskInstance taskInstance = new TaskInstance();
