This is an automated email from the ASF dual-hosted git repository.

kerwin pushed a commit to branch 3.1.8-release
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/3.1.8-release by this push:
     new ef90774669 [3.1.8] [fix-#14537] the branch that needs to be executed 
overlaps with another branch, it may not be able to complete the normal 
execution #14563 (#14689)
ef90774669 is described below

commit ef9077466921ab599781caa1c1f4a0b4e739d28e
Author: Kerwin <[email protected]>
AuthorDate: Wed Aug 2 19:59:13 2023 +0800

    [3.1.8] [fix-#14537] the branch that needs to be executed overlaps with 
another branch, it may not be able to complete the normal execution #14563 
(#14689)
    
    Co-authored-by: fuchanghai <[email protected]>
---
 .../dolphinscheduler/service/utils/DagHelper.java  | 56 ++++++++++++++++++++--
 .../service/utils/DagHelperTest.java               | 49 +++++++++++++++----
 2 files changed, 91 insertions(+), 14 deletions(-)

diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java
index 5365d1f719..6a9619c02c 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java
@@ -413,28 +413,74 @@ public class DagHelper {
         return conditionTaskList;
     }
 
-    private static List<String> skipTaskNode4Switch(TaskNode taskNode, 
Map<String, TaskNode> skipTaskNodeList,
-                                                    Map<String, TaskInstance> 
completeTaskList,
-                                                    DAG<String, TaskNode, 
TaskNodeRelation> dag) {
+
+    public static List<String> skipTaskNode4Switch(TaskNode taskNode,
+                                                 Map<String, TaskNode> 
skipTaskNodeList,
+                                                 Map<String, TaskInstance> 
completeTaskList,
+                                                 DAG<String, TaskNode, 
TaskNodeRelation> dag) {
 
         SwitchParameters switchParameters =
                 
completeTaskList.get(Long.toString(taskNode.getCode())).getSwitchDependency();
         int resultConditionLocation = 
switchParameters.getResultConditionLocation();
         List<SwitchResultVo> conditionResultVoList = 
switchParameters.getDependTaskList();
+
         List<String> switchTaskList = 
conditionResultVoList.get(resultConditionLocation).getNextNode();
+        Set<String> switchNeedWorkCodes = new HashSet<>();
         if (CollectionUtils.isEmpty(switchTaskList)) {
-            switchTaskList = new ArrayList<>();
+            return new ArrayList<>();
+        }
+        // get all downstream nodes of the branch that the switch node needs 
to execute
+        for (String switchTaskCode : switchTaskList) {
+            getSwitchNeedWorkCodes(switchTaskCode, dag, switchNeedWorkCodes);
         }
         conditionResultVoList.remove(resultConditionLocation);
         for (SwitchResultVo info : conditionResultVoList) {
             if (CollectionUtils.isEmpty(info.getNextNode())) {
                 continue;
             }
-            setTaskNodeSkip(info.getNextNode().get(0), dag, completeTaskList, 
skipTaskNodeList);
+            for (String nextNode : info.getNextNode()) {
+                setSwitchTaskNodeSkip(nextNode, dag, completeTaskList, 
skipTaskNodeList,
+                        switchNeedWorkCodes);
+            }
         }
         return switchTaskList;
     }
 
+    /**
+     * Recursively collects {@code taskCode} and every node reachable from it via {@code dag.getSubsequentNodes} into {@code switchNeedWorkCodes}.
+     * @param taskCode code of the node to start from; it is always added to the result set itself
+     * @param dag graph that is asked for the direct successors of each visited node
+     * @param switchNeedWorkCodes output set, mutated in place; NOTE(review): it is not used as a visited guard, so a node reachable over several paths is walked once per path
+     */
+    public static void getSwitchNeedWorkCodes(String taskCode, DAG<String, 
TaskNode, TaskNodeRelation> dag,
+                                              Set<String> switchNeedWorkCodes) 
{
+        switchNeedWorkCodes.add(taskCode);
+        Set<String> subsequentNodes = dag.getSubsequentNodes(taskCode);
+        if 
(org.apache.commons.collections.CollectionUtils.isNotEmpty(subsequentNodes)) {
+            for (String subCode : subsequentNodes) {
+                getSwitchNeedWorkCodes(subCode, dag, switchNeedWorkCodes);
+            }
+        }
+    }
+
+    private static void setSwitchTaskNodeSkip(String skipNodeCode,
+                                              DAG<String, TaskNode, 
TaskNodeRelation> dag,
+                                              Map<String, TaskInstance> 
completeTaskList,
+                                              Map<String, TaskNode> 
skipTaskNodeList,
+                                              Set<String> switchNeedWorkCodes) 
{
+        // Nothing to mark when the node is not in the DAG, or when it also lies on the branch the switch selected (it is listed in switchNeedWorkCodes).
+        if (!dag.containsNode(skipNodeCode) || 
switchNeedWorkCodes.contains(skipNodeCode)) {
+            return;
+        }
+        skipTaskNodeList.putIfAbsent(skipNodeCode, dag.getNode(skipNodeCode)); // an entry already recorded for this code is kept
+        Collection<String> postNodeList = dag.getSubsequentNodes(skipNodeCode);
+        for (String post : postNodeList) {
+            TaskNode postNode = dag.getNode(post);
+            if (isTaskNodeNeedSkip(postNode, skipTaskNodeList)) { // the decision is delegated to isTaskNodeNeedSkip (not shown in this hunk)
+                setTaskNodeSkip(post, dag, completeTaskList, skipTaskNodeList); // successors use the generic skip routine, which is not given switchNeedWorkCodes
+            }
+        }
+    }
     /**
      * set task node and the post nodes skip flag
      */
diff --git 
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java
 
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java
index 4257cd9704..e121bdb8d6 100644
--- 
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java
+++ 
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java
@@ -330,6 +330,24 @@ public class DagHelperTest {
         Assert.assertEquals(1, postNodes.size());
     }
 
+    @Test
+    public void testSwitchPostNode() throws IOException { // exercises DagHelper.skipTaskNode4Switch on the DAG built by generateDag2()
+        DAG<String, TaskNode, TaskNodeRelation> dag = generateDag2();
+        Map<String, TaskNode> skipTaskNodeList = new HashMap<>();
+        Map<String, TaskInstance> completeTaskList = new HashMap<>();
+        completeTaskList.put("0", new TaskInstance()); // code "0" is registered with a blank TaskInstance
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setState(TaskExecutionStatus.SUCCESS);
+        taskInstance.setTaskCode(1);
+        Map<String, Object> taskParamsMap = new HashMap<>();
+        taskParamsMap.put(Constants.SWITCH_RESULT, "");
+        taskInstance.setTaskParams(JSONUtils.toJsonString(taskParamsMap));
+        taskInstance.setSwitchDependency(getSwitchNode()); // the switch outcome is whatever getSwitchNode() (defined elsewhere in this test class) returns
+        completeTaskList.put("1", taskInstance); // keyed by the task code as a string, which is how skipTaskNode4Switch looks it up
+        DagHelper.skipTaskNode4Switch(dag.getNode("1"), skipTaskNodeList, 
completeTaskList, dag);
+        Assert.assertNotNull(skipTaskNodeList.get("2")); // node "2" must have been marked as skipped
+        Assert.assertEquals(1, skipTaskNodeList.size()); // and it must be the only skipped node
+    }
     /**
      * process:
      * 1->2->3->5->7
@@ -436,11 +454,13 @@ public class DagHelperTest {
 
     /**
      * DAG graph:
-     *    2
-     *    ↑
-     * 0->1(switch)
-     *    ↓
-     *    4
+     *           -> 2->
+     *         /        \
+     *      /               \
+     * 0->1(switch)->5          6
+     *      \               /
+     *        \         /
+     *          -> 4->
      *
      * @return dag
      * @throws JsonProcessingException if error throws JsonProcessingException
@@ -484,15 +504,26 @@ public class DagHelperTest {
         taskNodeList.add(node4);
 
         TaskNode node5 = new TaskNode();
-        node5.setId("4");
-        node5.setName("4");
-        node5.setCode(4);
+        node5.setId("5");
+        node5.setName("5");
+        node5.setCode(5);
         node5.setType("SHELL");
         List<String> dep5 = new ArrayList<>();
         dep5.add("1");
         node5.setPreTasks(JSONUtils.toJsonString(dep5));
         taskNodeList.add(node5);
 
+        TaskNode node6 = new TaskNode(); // join node fed by both node 2 and node 4
+        node6.setId("6"); // configure node6 itself; the setters previously targeted node5 and overwrote its id/name/code
+        node6.setName("6");
+        node6.setCode(6);
+        node6.setType("SHELL");
+        List<String> dep6 = new ArrayList<>();
+        dep6.add("2"); // the predecessors belong in dep6, the list that is serialized below (they used to be appended to dep5)
+        dep6.add("4");
+        node6.setPreTasks(JSONUtils.toJsonString(dep6));
+        taskNodeList.add(node6);
+
         List<String> startNodes = new ArrayList<>();
         List<String> recoveryNodes = new ArrayList<>();
         List<TaskNode> destTaskNodeList = 
DagHelper.generateFlowNodeListByStartNode(taskNodeList,
@@ -518,7 +549,7 @@ public class DagHelperTest {
         conditionsParameters.setDependTaskList(list);
         conditionsParameters.setNextNode("5");
         conditionsParameters.setRelation("AND");
-
+        conditionsParameters.setResultConditionLocation(1);
         // in: AND(AND(1 is SUCCESS))
         return conditionsParameters;
     }

Reply via email to