This is an automated email from the ASF dual-hosted git repository.

kerwin pushed a commit to branch 3.1.1-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 35608becac086348712641bd6ccf1f2ae0c0fa8e
Author: Yann Ann <[email protected]>
AuthorDate: Wed Oct 19 09:36:47 2022 +0800

    make sure all failed tasks are saved in errorTaskMap (#12424)
---
 .../master/runner/WorkflowExecuteRunnable.java     | 23 ++++++++--------------
 1 file changed, 8 insertions(+), 15 deletions(-)

diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 9d6eef3259..a6d7aa134a 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -392,13 +392,13 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatue> {
                 retryTaskInstance(taskInstance);
             } else if (taskInstance.getState().isFailure()) {
                 completeTaskMap.put(taskInstance.getTaskCode(), 
taskInstance.getId());
+                errorTaskMap.put(taskInstance.getTaskCode(), 
taskInstance.getId());
                 // There are child nodes and the failure policy is: CONTINUE
                 if (processInstance.getFailureStrategy() == 
FailureStrategy.CONTINUE && DagHelper.haveAllNodeAfterNode(
                         Long.toString(taskInstance.getTaskCode()),
                         dag)) {
                     submitPostNode(Long.toString(taskInstance.getTaskCode()));
                 } else {
-                    errorTaskMap.put(taskInstance.getTaskCode(), 
taskInstance.getId());
                     if (processInstance.getFailureStrategy() == 
FailureStrategy.END) {
                         killAllTasks();
                     }
@@ -422,7 +422,6 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatue> {
     /**
      * release task group
      *
-     * @param taskInstance
      */
     public void releaseTaskGroup(TaskInstance taskInstance) {
         logger.info("Release task group");
@@ -449,7 +448,6 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatue> {
     /**
      * crate new task instance to retry, different objects from the original
      *
-     * @param taskInstance
      */
     private void retryTaskInstance(TaskInstance taskInstance) throws 
StateEventHandleException {
         if (!taskInstance.taskCanRetry()) {
@@ -662,10 +660,7 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatue> {
     }
 
     private boolean needComplementProcess() {
-        if (processInstance.isComplementData() && Flag.NO == 
processInstance.getIsSubProcess()) {
-            return true;
-        }
-        return false;
+        return processInstance.isComplementData() && Flag.NO == 
processInstance.getIsSubProcess();
     }
 
     /**
@@ -1069,7 +1064,7 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatue> {
     /**
      * clone a new taskInstance for retry and reset some logic fields
      *
-     * @return
+     * @return taskInstance
      */
     public TaskInstance cloneRetryTaskInstance(TaskInstance taskInstance) {
         TaskNode taskNode = 
dag.getNode(Long.toString(taskInstance.getTaskCode()));
@@ -1091,7 +1086,7 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatue> {
     /**
      * clone a new taskInstance for tolerant and reset some logic fields
      *
-     * @return
+     * @return taskInstance
      */
     public TaskInstance cloneTolerantTaskInstance(TaskInstance taskInstance) {
         TaskNode taskNode = 
dag.getNode(Long.toString(taskInstance.getTaskCode()));
@@ -1111,9 +1106,9 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatue> {
     /**
      * new a taskInstance
      *
-     * @param processInstance
-     * @param taskNode
-     * @return
+     * @param processInstance process instance
+     * @param taskNode task node
+     * @return task instance
      */
     public TaskInstance newTaskInstance(ProcessInstance processInstance, 
TaskNode taskNode) {
         TaskInstance taskInstance = new TaskInstance();
@@ -1431,9 +1426,7 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatue> {
             long taskCode = Long.parseLong(dependNodeName);
             Integer taskInstanceId = completeTaskMap.get(taskCode);
             TaskExecutionStatus depTaskState = 
taskInstanceMap.get(taskInstanceId).getState();
-            if (depTaskState.isFailure()) {
-                return false;
-            }
+            return !depTaskState.isFailure();
         }
         return true;
     }

Reply via email to