This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch 2.0.1-release
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/2.0.1-release by this push:
new 79ab272 [cherry-pick-2.0.1][Bug-7319][MasterServer] fix NPE when
switch else branch is empty (#7314)
79ab272 is described below
commit 79ab27299ee9511170fb2f8f1aa8c8f911a58c8c
Author: wind <[email protected]>
AuthorDate: Fri Dec 10 16:17:28 2021 +0800
[cherry-pick-2.0.1][Bug-7319][MasterServer] fix NPE when switch else branch
is empty (#7314)
* fix taskNode NPE
* fix switch node NPE when no else branch
Co-authored-by: caishunfeng <[email protected]>
---
.../dolphinscheduler/dao/utils/DagHelper.java | 14 +++++--------
.../master/runner/task/SwitchTaskProcessor.java | 24 +++++++++++++++++++++-
2 files changed, 28 insertions(+), 10 deletions(-)
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
index d419505..154c584 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
@@ -50,7 +50,6 @@ public class DagHelper {
private static final Logger logger =
LoggerFactory.getLogger(DagHelper.class);
-
/**
* generate flow node relation list by task node list;
* Edges that are not in the task Node List will not be added to the result
@@ -110,7 +109,7 @@ public class DagHelper {
List<TaskNode> childNodeList = new ArrayList<>();
if (startNode == null) {
logger.error("start node name [{}] is not in task node
list [{}] ",
- startNodeCode,
+ startNodeCode,
taskNodeList
);
continue;
@@ -135,7 +134,6 @@ public class DagHelper {
return destTaskNodeList;
}
-
/**
* find all the nodes that depended on the start node
*
@@ -160,7 +158,6 @@ public class DagHelper {
return resultList;
}
-
/**
* find all nodes that start nodes depend on.
*
@@ -310,6 +307,10 @@ public class DagHelper {
}
for (String subsequent : startVertexes) {
TaskNode taskNode = dag.getNode(subsequent);
+ if (taskNode == null) {
+ logger.error("taskNode {} is null, please check dag",
subsequent);
+ continue;
+ }
if (isTaskNodeNeedSkip(taskNode, skipTaskNodeList)) {
setTaskNodeSkip(subsequent, dag, completeTaskList,
skipTaskNodeList);
continue;
@@ -343,7 +344,6 @@ public class DagHelper {
return true;
}
-
/**
* parse condition task find the branch process
* set skip flag for another one.
@@ -382,9 +382,6 @@ public class DagHelper {
/**
* parse condition task find the branch process
* set skip flag for another one.
- *
- * @param nodeCode
- * @return
*/
public static List<String> parseSwitchTask(String nodeCode,
Map<String, TaskNode>
skipTaskNodeList,
@@ -443,7 +440,6 @@ public class DagHelper {
}
}
-
/***
* build dag graph
* @param processDag processDag
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
index bcc33ab..8e9316f 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
@@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.server.utils.SwitchTaskUtils;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import java.util.Date;
@@ -171,7 +172,13 @@ public class SwitchTaskProcessor extends BaseTaskProcessor
{
switchParameters.setResultConditionLocation(finalConditionLocation);
taskInstance.setSwitchDependency(switchParameters);
- logger.info("the switch task depend result : {}", conditionResult);
+ if (!isValidSwitchResult(switchResultVos.get(finalConditionLocation)))
{
+ conditionResult = DependResult.FAILED;
+ logger.error("the switch task depend result is invalid, result:{},
switch branch:{}", conditionResult, finalConditionLocation);
+ return true;
+ }
+
+ logger.info("the switch task depend result:{}, switch branch:{}",
conditionResult, finalConditionLocation);
return true;
}
@@ -216,4 +223,19 @@ public class SwitchTaskProcessor extends BaseTaskProcessor
{
return content;
}
+ /**
+ * check whether switch result is valid
+ *
+ * <p>A switch result is treated as invalid when its next-node collection is
+ * null or empty, or when any entry of that collection is a null or empty
+ * string. A branch that names no next node, or names a blank one, is
+ * therefore reported as invalid instead of being handed on as-is.
+ *
+ * @param switchResult the switch branch to inspect; it is dereferenced
+ * without a null check, so callers must pass a non-null value
+ * @return {@code true} only when at least one next node is listed and every
+ * listed next node is a non-empty string; {@code false} otherwise
+ */
+ private boolean isValidSwitchResult(SwitchResultVo switchResult) {
+ if (CollectionUtils.isEmpty(switchResult.getNextNode())) {
+ return false;
+ }
+ for (String nextNode : switchResult.getNextNode()) {
+ if (StringUtils.isEmpty(nextNode)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
}