This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new b391b74df4 [fix-#14537] the branch that needs to be executed overlaps
with another branch, it may not be able to complete the normal execution
(#14563)
b391b74df4 is described below
commit b391b74df4baf7b6f90df79df584e391ee799279
Author: fuchanghai <[email protected]>
AuthorDate: Tue Aug 1 11:04:31 2023 +0800
[fix-#14537] the branch that needs to be executed overlaps with another
branch, it may not be able to complete the normal execution (#14563)
---
.../dolphinscheduler/service/utils/DagHelper.java | 55 +++++++++++++++++++---
.../service/utils/DagHelperTest.java | 49 +++++++++++++++----
2 files changed, 89 insertions(+), 15 deletions(-)
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java
index b966d93997..ee5e97cf55 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java
@@ -414,29 +414,72 @@ public class DagHelper {
return conditionTaskList;
}
- private static List<Long> skipTaskNode4Switch(TaskNode taskNode,
- Map<Long, TaskNode>
skipTaskNodeList,
- Map<Long, TaskInstance>
completeTaskList,
- DAG<Long, TaskNode,
TaskNodeRelation> dag) {
+ /**
+ * Resolves the branch selected by a completed switch task and records the
+ * nodes of the other branches in {@code skipTaskNodeList}.
+ * <p>
+ * The switch result is read from the task instance stored in
+ * {@code completeTaskList} under the code of {@code taskNode}; that entry is
+ * dereferenced without a null check. All downstream codes of the selected
+ * branch are collected first, then every next node of every branch is passed
+ * to {@code setSwitchTaskNodeSkip}, which ignores the collected codes.
+ *
+ * @param taskNode the switch node whose code is the lookup key into completeTaskList
+ * @param skipTaskNodeList map that receives the nodes to skip; modified in place
+ * @param completeTaskList task instances keyed by task code
+ * @param dag DAG used to walk downstream nodes
+ * @return the next-node codes of the selected branch, or a new empty list when
+ * that branch has no next nodes (in which case nothing is recorded as skipped)
+ */
+ public static List<Long> skipTaskNode4Switch(TaskNode taskNode,
+ Map<Long, TaskNode>
skipTaskNodeList,
+ Map<Long, TaskInstance>
completeTaskList,
+ DAG<Long, TaskNode,
TaskNodeRelation> dag) {
SwitchParameters switchParameters =
completeTaskList.get(taskNode.getCode()).getSwitchDependency();
int resultConditionLocation =
switchParameters.getResultConditionLocation();
List<SwitchResultVo> conditionResultVoList =
switchParameters.getDependTaskList();
List<Long> switchTaskList =
conditionResultVoList.get(resultConditionLocation).getNextNode();
+ Set<Long> switchNeedWorkCodes = new HashSet<>();
if (CollectionUtils.isEmpty(switchTaskList)) {
- switchTaskList = new ArrayList<>();
+ return new ArrayList<>();
+ }
+ // get all downstream nodes of the branch that the switch node needs
to execute
+ for (Long switchTaskCode : switchTaskList) {
+ getSwitchNeedWorkCodes(switchTaskCode, dag, switchNeedWorkCodes);
}
// conditionResultVoList.remove(resultConditionLocation);
for (SwitchResultVo info : conditionResultVoList) {
if (CollectionUtils.isEmpty(info.getNextNode())) {
continue;
}
- setTaskNodeSkip(info.getNextNode().get(0), dag, completeTaskList,
skipTaskNodeList);
+ for (Long nextNode : info.getNextNode()) {
+ setSwitchTaskNodeSkip(nextNode, dag, completeTaskList,
skipTaskNodeList,
+ switchNeedWorkCodes);
+ }
}
return switchTaskList;
}
+    /**
+     * Collects the given task code and every task code reachable downstream of it in the DAG,
+     * i.e. all nodes of a branch that the switch node needs to execute.
+     *
+     * <p>A code that is already present in {@code switchNeedWorkCodes} is not expanded again, so a
+     * node shared by several paths (for example the join of a diamond) is traversed only once.
+     *
+     * @param taskCode            code of the node to start collecting from
+     * @param dag                 DAG used to look up the subsequent nodes of each code
+     * @param switchNeedWorkCodes accumulator that receives the collected codes; modified in place
+     */
+    public static void getSwitchNeedWorkCodes(Long taskCode, DAG<Long, TaskNode, TaskNodeRelation> dag,
+                                              Set<Long> switchNeedWorkCodes) {
+        // Set#add returns false when the code was already recorded: its downstream nodes have been
+        // (or are being) collected, so walking them again would only repeat work.
+        if (!switchNeedWorkCodes.add(taskCode)) {
+            return;
+        }
+        Set<Long> subsequentNodes = dag.getSubsequentNodes(taskCode);
+        if (org.apache.commons.collections.CollectionUtils.isNotEmpty(subsequentNodes)) {
+            for (Long subCode : subsequentNodes) {
+                getSwitchNeedWorkCodes(subCode, dag, switchNeedWorkCodes);
+            }
+        }
+    }
+
+ /**
+ * Records {@code skipNodeCode} in {@code skipTaskNodeList} unless the code is
+ * missing from the DAG or belongs to {@code switchNeedWorkCodes}, then hands
+ * each subsequent node for which {@code isTaskNodeNeedSkip} returns true to
+ * {@code setTaskNodeSkip}.
+ * <p>
+ * NOTE(review): {@code switchNeedWorkCodes} is not passed on to
+ * {@code setTaskNodeSkip}; whether deeper nodes of the executed branch stay
+ * protected depends on {@code isTaskNodeNeedSkip} - confirm.
+ *
+ * @param skipNodeCode code of the candidate node to skip
+ * @param dag DAG used for the containment check and successor lookup
+ * @param completeTaskList forwarded unchanged to setTaskNodeSkip
+ * @param skipTaskNodeList map that receives the skipped nodes; modified in place
+ * @param switchNeedWorkCodes codes that must not be recorded as skipped here
+ */
+ private static void setSwitchTaskNodeSkip(Long skipNodeCode,
+ DAG<Long, TaskNode,
TaskNodeRelation> dag,
+ Map<Long, TaskInstance>
completeTaskList,
+ Map<Long, TaskNode>
skipTaskNodeList,
+ Set<Long> switchNeedWorkCodes) {
+ // ignore when the node that needs to be skipped exists on the branch
that the switch type node needs to execute
+ if (!dag.containsNode(skipNodeCode) ||
switchNeedWorkCodes.contains(skipNodeCode)) {
+ return;
+ }
+ skipTaskNodeList.putIfAbsent(skipNodeCode, dag.getNode(skipNodeCode));
+ Collection<Long> postNodeList = dag.getSubsequentNodes(skipNodeCode);
+ for (Long post : postNodeList) {
+ TaskNode postNode = dag.getNode(post);
+ if (isTaskNodeNeedSkip(postNode, skipTaskNodeList)) {
+ setTaskNodeSkip(post, dag, completeTaskList, skipTaskNodeList);
+ }
+ }
+ }
/**
* set task node and the post nodes skip flag
*/
diff --git
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java
index 85c9296248..c19812303c 100644
---
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java
+++
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java
@@ -330,6 +330,24 @@ public class DagHelperTest {
Assertions.assertEquals(1, postNodes.size());
}
+    /**
+     * Runs {@code DagHelper.skipTaskNode4Switch} for switch node 1 on the DAG built by
+     * {@code generateDag2()} and checks that node 2 is the only node recorded in the skip list.
+     */
+    @Test
+    public void testSwitchPostNode() throws IOException {
+        DAG<Long, TaskNode, TaskNodeRelation> dag = generateDag2();
+        Map<Long, TaskNode> skipTaskNodeList = new HashMap<>();
+        Map<Long, TaskInstance> completeTaskList = new HashMap<>();
+        // upper-case 'L' suffix: a lower-case 'l' is easily misread as the digit 1
+        completeTaskList.put(0L, new TaskInstance());
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setState(TaskExecutionStatus.SUCCESS);
+        taskInstance.setTaskCode(1L);
+        Map<String, Object> taskParamsMap = new HashMap<>();
+        taskParamsMap.put(Constants.SWITCH_RESULT, "");
+        taskInstance.setTaskParams(JSONUtils.toJsonString(taskParamsMap));
+        taskInstance.setSwitchDependency(getSwitchNode());
+        completeTaskList.put(1L, taskInstance);
+        DagHelper.skipTaskNode4Switch(dag.getNode(1L), skipTaskNodeList, completeTaskList, dag);
+        Assertions.assertNotNull(skipTaskNodeList.get(2L));
+        Assertions.assertEquals(1, skipTaskNodeList.size());
+    }
/**
* process:
* 1->2->3->5->7
@@ -436,11 +454,13 @@ public class DagHelperTest {
/**
* DAG graph:
- * 2
- * ↑
- * 0->1(switch)
- * ↓
- * 4
+ * -> 2->
+ * / \
+ * / \
+ * 0->1(switch)->5 6
+ * \ /
+ * \ /
+ * -> 4->
*
* @return dag
* @throws JsonProcessingException if error throws JsonProcessingException
@@ -484,15 +504,26 @@ public class DagHelperTest {
taskNodeList.add(node4);
TaskNode node5 = new TaskNode();
- node5.setId("4");
- node5.setName("4");
- node5.setCode(4);
+ node5.setId("5");
+ node5.setName("5");
+ node5.setCode(5);
node5.setType("SHELL");
List<Long> dep5 = new ArrayList<>();
dep5.add(1L);
node5.setPreTasks(JSONUtils.toJsonString(dep5));
taskNodeList.add(node5);
+        // node 6 is the join node: it depends on both node 2 and node 4 (see the DAG graph in the Javadoc)
+        TaskNode node6 = new TaskNode();
+        node6.setId("6");
+        node6.setName("6");
+        node6.setCode(6);
+        node6.setType("SHELL");
+        List<Long> dep6 = new ArrayList<>();
+        dep6.add(2L);
+        dep6.add(4L);
+        node6.setPreTasks(JSONUtils.toJsonString(dep6));
+        taskNodeList.add(node6);
+
List<Long> startNodes = new ArrayList<>();
List<Long> recoveryNodes = new ArrayList<>();
List<TaskNode> destTaskNodeList =
DagHelper.generateFlowNodeListByStartNode(taskNodeList,
@@ -518,7 +549,7 @@ public class DagHelperTest {
conditionsParameters.setDependTaskList(list);
conditionsParameters.setNextNode(5L);
conditionsParameters.setRelation("AND");
-
+ conditionsParameters.setResultConditionLocation(1);
// in: AND(AND(1 is SUCCESS))
return conditionsParameters;
}