This is an automated email from the ASF dual-hosted git repository.
kerwin pushed a commit to branch 3.1.8-release
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/3.1.8-release by this push:
new ef90774669 [3.1.8] [fix-#14537] the branch that needs to be executed
overlaps with another branch, it may not be able to complete the normal
execution #14563 (#14689)
ef90774669 is described below
commit ef9077466921ab599781caa1c1f4a0b4e739d28e
Author: Kerwin <[email protected]>
AuthorDate: Wed Aug 2 19:59:13 2023 +0800
[3.1.8] [fix-#14537] the branch that needs to be executed overlaps with
another branch, it may not be able to complete the normal execution #14563
(#14689)
Co-authored-by: fuchanghai <[email protected]>
---
.../dolphinscheduler/service/utils/DagHelper.java | 56 ++++++++++++++++++++--
.../service/utils/DagHelperTest.java | 49 +++++++++++++++----
2 files changed, 91 insertions(+), 14 deletions(-)
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java
index 5365d1f719..6a9619c02c 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java
@@ -413,28 +413,74 @@ public class DagHelper {
return conditionTaskList;
}
- private static List<String> skipTaskNode4Switch(TaskNode taskNode,
Map<String, TaskNode> skipTaskNodeList,
- Map<String, TaskInstance>
completeTaskList,
- DAG<String, TaskNode,
TaskNodeRelation> dag) {
+ /** Returns the nextNode codes of the switch branch selected by the completed switch task; nodes of the other branches go through setSwitchTaskNodeSkip. */
+ public static List<String> skipTaskNode4Switch(TaskNode taskNode,
+ Map<String, TaskNode>
skipTaskNodeList,
+ Map<String, TaskInstance>
completeTaskList,
+ DAG<String, TaskNode,
TaskNodeRelation> dag) {
SwitchParameters switchParameters =
completeTaskList.get(Long.toString(taskNode.getCode())).getSwitchDependency();
int resultConditionLocation =
switchParameters.getResultConditionLocation();
List<SwitchResultVo> conditionResultVoList =
switchParameters.getDependTaskList();
+ // nextNode codes of the entry at resultConditionLocation; when there are none, return an empty list without skipping any branch
List<String> switchTaskList =
conditionResultVoList.get(resultConditionLocation).getNextNode();
+ Set<String> switchNeedWorkCodes = new HashSet<>();
if (CollectionUtils.isEmpty(switchTaskList)) {
- switchTaskList = new ArrayList<>();
+ return new ArrayList<>();
+ }
+ // collect each selected node together with everything downstream of it; these codes are exempt from skipping below
+ for (String switchTaskCode : switchTaskList) {
+ getSwitchNeedWorkCodes(switchTaskCode, dag, switchNeedWorkCodes);
}
conditionResultVoList.remove(resultConditionLocation);
for (SwitchResultVo info : conditionResultVoList) {
if (CollectionUtils.isEmpty(info.getNextNode())) {
continue;
}
- setTaskNodeSkip(info.getNextNode().get(0), dag, completeTaskList,
skipTaskNodeList);
+ for (String nextNode : info.getNextNode()) {
+ setSwitchTaskNodeSkip(nextNode, dag, completeTaskList,
skipTaskNodeList,
+ switchNeedWorkCodes);
+ }
}
return switchTaskList;
}
+ /**
+ * Collects taskCode and every node downstream of it into switchNeedWorkCodes; each node is walked at most once.
+ * @param taskCode code of the node the traversal starts from
+ * @param dag DAG whose getSubsequentNodes supplies the downstream codes
+ * @param switchNeedWorkCodes accumulator of collected codes; a code already present is treated as visited
+ */
+ public static void getSwitchNeedWorkCodes(String taskCode, DAG<String, TaskNode, TaskNodeRelation> dag,
+ Set<String> switchNeedWorkCodes) {
+ // Set.add is false for an already-collected node: its downstream was walked on the first visit, so skip the lookup
+ Set<String> subsequentNodes = switchNeedWorkCodes.add(taskCode) ? dag.getSubsequentNodes(taskCode) : null;
+ if (org.apache.commons.collections.CollectionUtils.isNotEmpty(subsequentNodes)) {
+ for (String subCode : subsequentNodes) {
+ getSwitchNeedWorkCodes(subCode, dag, switchNeedWorkCodes);
+ }
+ }
+ }
+
+ /**
+ * put a node of a non-selected switch branch into the skip list, then skip the post nodes that isTaskNodeNeedSkip accepts
+ */
+ private static void setSwitchTaskNodeSkip(String skipNodeCode,
+ DAG<String, TaskNode, TaskNodeRelation> dag,
+ Map<String, TaskInstance> completeTaskList,
+ Map<String, TaskNode> skipTaskNodeList,
+ Set<String> switchNeedWorkCodes) {
+ // only act on nodes the dag knows and that are not on the branch the switch node needs to execute
+ if (dag.containsNode(skipNodeCode) && !switchNeedWorkCodes.contains(skipNodeCode)) {
+ skipTaskNodeList.putIfAbsent(skipNodeCode, dag.getNode(skipNodeCode));
+ for (String downstreamCode : dag.getSubsequentNodes(skipNodeCode)) {
+ if (isTaskNodeNeedSkip(dag.getNode(downstreamCode), skipTaskNodeList)) {
+ setTaskNodeSkip(downstreamCode, dag, completeTaskList, skipTaskNodeList);
+ }
+ }
+ }
+ }
/**
* set task node and the post nodes skip flag
*/
diff --git
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java
index 4257cd9704..e121bdb8d6 100644
---
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java
+++
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java
@@ -330,6 +330,24 @@ public class DagHelperTest {
Assert.assertEquals(1, postNodes.size());
}
+ @Test
+ public void testSwitchPostNode() throws IOException {
+ DAG<String, TaskNode, TaskNodeRelation> dag = generateDag2();
+ Map<String, Object> switchTaskParams = new HashMap<>();
+ switchTaskParams.put(Constants.SWITCH_RESULT, "");
+ TaskInstance switchInstance = new TaskInstance(); // completed switch task with code 1
+ switchInstance.setState(TaskExecutionStatus.SUCCESS);
+ switchInstance.setTaskCode(1);
+ switchInstance.setTaskParams(JSONUtils.toJsonString(switchTaskParams));
+ switchInstance.setSwitchDependency(getSwitchNode());
+ Map<String, TaskInstance> completeTaskList = new HashMap<>();
+ completeTaskList.put("0", new TaskInstance());
+ completeTaskList.put("1", switchInstance);
+ Map<String, TaskNode> skipTaskNodeList = new HashMap<>();
+ DagHelper.skipTaskNode4Switch(dag.getNode("1"), skipTaskNodeList, completeTaskList, dag);
+ Assert.assertNotNull(skipTaskNodeList.get("2")); // node "2" must be in the skip list
+ Assert.assertEquals(1, skipTaskNodeList.size()); // and it must be the only entry
+ }
/**
* process:
* 1->2->3->5->7
@@ -436,11 +454,13 @@ public class DagHelperTest {
/**
* DAG graph:
- * 2
- * ↑
- * 0->1(switch)
- * ↓
- * 4
+ * -> 2->
+ * / \
+ * / \
+ * 0->1(switch)->5 6
+ * \ /
+ * \ /
+ * -> 4->
*
* @return dag
* @throws JsonProcessingException if error throws JsonProcessingException
@@ -484,15 +504,26 @@ public class DagHelperTest {
taskNodeList.add(node4);
TaskNode node5 = new TaskNode();
- node5.setId("4");
- node5.setName("4");
- node5.setCode(4);
+ node5.setId("5");
+ node5.setName("5");
+ node5.setCode(5);
node5.setType("SHELL");
List<String> dep5 = new ArrayList<>();
dep5.add("1");
node5.setPreTasks(JSONUtils.toJsonString(dep5));
taskNodeList.add(node5);
+ TaskNode node6 = new TaskNode(); // node "6" depends on both "2" and "4", matching the DAG graph comment
+ node6.setId("6");
+ node6.setName("6");
+ node6.setCode(6);
+ node6.setType("SHELL");
+ List<String> dep6 = new ArrayList<>();
+ dep6.add("2");
+ dep6.add("4");
+ node6.setPreTasks(JSONUtils.toJsonString(dep6));
+ taskNodeList.add(node6);
+
List<String> startNodes = new ArrayList<>();
List<String> recoveryNodes = new ArrayList<>();
List<TaskNode> destTaskNodeList =
DagHelper.generateFlowNodeListByStartNode(taskNodeList,
@@ -518,7 +549,7 @@ public class DagHelperTest {
conditionsParameters.setDependTaskList(list);
conditionsParameters.setNextNode("5");
conditionsParameters.setRelation("AND");
-
+ conditionsParameters.setResultConditionLocation(1);
// in: AND(AND(1 is SUCCESS))
return conditionsParameters;
}