This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 99b8ec6  [Bug-7319][MasterServer] fix taskNode NPE when switch else branch is empty (#7320)
99b8ec6 is described below

commit 99b8ec649213270df91b5a2808a26868c675a0d3
Author: wind <[email protected]>
AuthorDate: Mon Dec 13 22:46:48 2021 +0800

    [Bug-7319][MasterServer] fix taskNode NPE when switch else branch is empty (#7320)
    
    Co-authored-by: caishunfeng <[email protected]>
---
 .../dolphinscheduler/dao/utils/DagHelper.java      |  9 ++++----
 .../master/runner/task/SwitchTaskProcessor.java    | 25 ++++++++++++++++++++--
 2 files changed, 27 insertions(+), 7 deletions(-)

diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
index d419505..e7404fe 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
@@ -50,7 +50,6 @@ public class DagHelper {
 
     private static final Logger logger = LoggerFactory.getLogger(DagHelper.class);
 
-
     /**
      * generate flow node relation list by task node list;
      * Edges that are not in the task Node List will not be added to the result
@@ -135,7 +134,6 @@ public class DagHelper {
         return destTaskNodeList;
     }
 
-
     /**
      * find all the nodes that depended on the start node
      *
@@ -160,7 +158,6 @@ public class DagHelper {
         return resultList;
     }
 
-
     /**
      * find all nodes that start nodes depend on.
      *
@@ -310,6 +307,10 @@ public class DagHelper {
         }
         for (String subsequent : startVertexes) {
             TaskNode taskNode = dag.getNode(subsequent);
+            if (taskNode == null) {
+                logger.error("taskNode {} is null, please check dag", subsequent);
+                continue;
+            }
             if (isTaskNodeNeedSkip(taskNode, skipTaskNodeList)) {
                 setTaskNodeSkip(subsequent, dag, completeTaskList, skipTaskNodeList);
                 continue;
@@ -343,7 +344,6 @@ public class DagHelper {
         return true;
     }
 
-
     /**
      * parse condition task find the branch process
      * set skip flag for another one.
@@ -443,7 +443,6 @@ public class DagHelper {
         }
     }
 
-
     /***
      * build dag graph
      * @param processDag processDag
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
index 5378649..0fa6e5d 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
@@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.utils.LogUtils;
 import org.apache.dolphinscheduler.server.utils.SwitchTaskUtils;
 
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
 
 import java.util.Date;
@@ -75,7 +76,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
         taskDefinition = processService.findTaskDefinition(
                 taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()
         );
-        taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(),processInstance.getProcessDefinitionCode(),
+        taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(), processInstance.getProcessDefinitionCode(),
                 processInstance.getProcessDefinitionVersion(),
                 taskInstance.getProcessInstanceId(),
                 taskInstance.getId()));
@@ -176,7 +177,13 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
         switchParameters.setResultConditionLocation(finalConditionLocation);
         taskInstance.setSwitchDependency(switchParameters);
 
-        logger.info("the switch task depend result : {}", conditionResult);
+        if (!isValidSwitchResult(switchResultVos.get(finalConditionLocation))) {
+            conditionResult = DependResult.FAILED;
+            logger.error("the switch task depend result is invalid, result:{}, switch branch:{}", conditionResult, finalConditionLocation);
+            return true;
+        }
+
+        logger.info("the switch task depend result:{}, switch branch:{}", conditionResult, finalConditionLocation);
         return true;
     }
 
@@ -221,4 +228,18 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
         return content;
     }
 
+    /**
+     * check whether switch result is valid
+     */
+    private boolean isValidSwitchResult(SwitchResultVo switchResult) {
+        if (CollectionUtils.isEmpty(switchResult.getNextNode())) {
+            return false;
+        }
+        for (String nextNode : switchResult.getNextNode()) {
+            if (StringUtils.isEmpty(nextNode)) {
+                return false;
+            }
+        }
+        return true;
+    }
 }

Reply via email to