This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new dcc9d64ef6 Fix Switch logic task doesn't check the branch exist (#15755)
dcc9d64ef6 is described below

commit dcc9d64ef6c5a8353722aa371eb4920d33d6d756
Author: Wenjun Ruan <[email protected]>
AuthorDate: Sun Mar 24 20:52:00 2024 +0800

    Fix Switch logic task doesn't check the branch exist (#15755)
---
 .../runner/task/switchtask/SwitchLogicTask.java    | 116 ++++++++++-----------
 1 file changed, 57 insertions(+), 59 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/switchtask/SwitchLogicTask.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/switchtask/SwitchLogicTask.java
index 244926c096..1f52f9287d 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/switchtask/SwitchLogicTask.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/switchtask/SwitchLogicTask.java
@@ -18,10 +18,8 @@
 package org.apache.dolphinscheduler.server.master.runner.task.switchtask;
 
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.plugin.task.api.model.Property;
 import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo;
@@ -34,7 +32,6 @@ import 
org.apache.dolphinscheduler.server.master.runner.task.BaseSyncLogicTask;
 import org.apache.dolphinscheduler.server.master.utils.SwitchTaskUtils;
 
 import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
 
 import java.util.List;
 import java.util.Map;
@@ -47,7 +44,7 @@ public class SwitchLogicTask extends 
BaseSyncLogicTask<SwitchParameters> {
 
     public static final String TASK_TYPE = "SWITCH";
 
-    private final ProcessInstance processInstance;
+    private final WorkflowExecuteRunnable workflowExecuteRunnable;
     private final TaskInstance taskInstance;
 
     public SwitchLogicTask(TaskExecutionContext taskExecutionContext,
@@ -59,9 +56,8 @@ public class SwitchLogicTask extends 
BaseSyncLogicTask<SwitchParameters> {
                         .orElseThrow(() -> new LogicTaskInitializeException(
                                 "Cannot find the task instance in workflow 
execute runnable"))
                         .getSwitchDependency());
-        WorkflowExecuteRunnable workflowExecuteRunnable =
+        this.workflowExecuteRunnable =
                 
processInstanceExecCacheManager.getByProcessInstanceId(taskExecutionContext.getProcessInstanceId());
-        this.processInstance = 
workflowExecuteRunnable.getWorkflowExecuteContext().getWorkflowInstance();
         this.taskInstance = 
workflowExecuteRunnable.getTaskInstance(taskExecutionContext.getTaskInstanceId())
                 .orElseThrow(() -> new LogicTaskInitializeException(
                         "Cannot find the task instance in workflow execute 
runnable"));
@@ -69,84 +65,86 @@ public class SwitchLogicTask extends 
BaseSyncLogicTask<SwitchParameters> {
 
     @Override
     public void handle() throws MasterTaskExecuteException {
-        DependResult conditionResult = calculateConditionResult();
-        TaskExecutionStatus status =
-                (conditionResult == DependResult.SUCCESS) ? 
TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE;
-        log.info("Switch task execute finished, condition result is: {}, task 
status is: {}", conditionResult,
-                status.name());
-        taskExecutionContext.setCurrentExecutionStatus(status);
+        // Calculate the condition result and get the next node
+        if (CollectionUtils.isEmpty(taskParameters.getDependTaskList())) {
+            moveToDefaultBranch();
+        } else {
+            calculateSwitchBranch();
+        }
+        taskInstance.setSwitchDependency(taskParameters);
+        log.info("Switch task execute finished");
+        
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
     }
 
-    // todo: don't use depend result, use switch result
-    private DependResult calculateConditionResult() {
-        DependResult conditionResult = DependResult.SUCCESS;
+    private void moveToDefaultBranch() {
+        checkIfBranchExist(taskParameters.getNextNode());
 
         List<SwitchResultVo> switchResultVos = 
taskParameters.getDependTaskList();
+        switchResultVos.add(new SwitchResultVo(null, 
taskParameters.getNextNode()));
+        taskParameters.setResultConditionLocation(switchResultVos.size() - 1);
 
-        SwitchResultVo switchResultVo = new SwitchResultVo();
-        switchResultVo.setNextNode(taskParameters.getNextNode());
-        switchResultVos.add(switchResultVo);
-        // todo: refactor these calculate code
-        int finalConditionLocation = switchResultVos.size() - 1;
-        int i = 0;
+        log.info("The condition is not satisfied, move to the default branch: 
{}",
+                taskParameters.getNextNode().stream().map(node -> 
workflowExecuteRunnable.getWorkflowExecuteContext()
+                        
.getWorkflowGraph().getDag().getNode(node).getName()).collect(Collectors.toList()));
+    }
 
-        Map<String, Property> globalParams = JSONUtils
-                .toList(processInstance.getGlobalParams(), Property.class)
-                .stream()
-                .collect(Collectors.toMap(Property::getProp, Property -> 
Property));
+    private void calculateSwitchBranch() {
+        List<SwitchResultVo> switchResultVos = 
taskParameters.getDependTaskList();
+        if (CollectionUtils.isEmpty(switchResultVos)) {
+            moveToDefaultBranch();
+        }
+        Map<String, Property> globalParams = 
taskExecutionContext.getPrepareParamsMap();
         Map<String, Property> varParams = JSONUtils
                 .toList(taskInstance.getVarPool(), Property.class)
                 .stream()
                 .collect(Collectors.toMap(Property::getProp, Property -> 
Property));
 
-        for (SwitchResultVo info : switchResultVos) {
-            log.info("Begin to execute {} condition: {} ", (i + 1), 
info.getCondition());
-            if (StringUtils.isEmpty(info.getCondition())) {
-                finalConditionLocation = i;
-                break;
-            }
-            String content =
-                    
SwitchTaskUtils.generateContentWithTaskParams(info.getCondition(), 
globalParams, varParams);
+        int finalConditionLocation = -1;
+        for (int i = 0; i < switchResultVos.size(); i++) {
+            SwitchResultVo switchResultVo = switchResultVos.get(i);
+            log.info("Begin to execute {} condition: {} ", i, 
switchResultVo.getCondition());
+            String content = 
SwitchTaskUtils.generateContentWithTaskParams(switchResultVo.getCondition(), 
globalParams,
+                    varParams);
             log.info("Format condition sentence::{} successfully", content);
-            Boolean result;
+            boolean result;
             try {
                 result = SwitchTaskUtils.evaluate(content);
                 log.info("Execute condition sentence: {} successfully: {}", 
content, result);
+                if (result) {
+                    finalConditionLocation = i;
+                }
             } catch (Exception e) {
                 log.info("Execute condition sentence: {} failed", content, e);
-                conditionResult = DependResult.FAILED;
-                break;
-            }
-            if (result) {
-                finalConditionLocation = i;
-                break;
             }
-            i++;
         }
-        taskParameters.setDependTaskList(switchResultVos);
-        taskParameters.setResultConditionLocation(finalConditionLocation);
-        taskInstance.setSwitchDependency(taskParameters);
-
-        if (!isValidSwitchResult(switchResultVos.get(finalConditionLocation))) 
{
-            conditionResult = DependResult.FAILED;
-            log.error("The switch task depend result is invalid, result:{}, 
switch branch:{}", conditionResult,
-                    finalConditionLocation);
+        if (finalConditionLocation >= 0) {
+            
checkIfBranchExist(switchResultVos.get(finalConditionLocation).getNextNode());
+            log.info("The condition is satisfied, move to the branch: {}",
+                    
switchResultVos.get(finalConditionLocation).getNextNode().stream()
+                            .map(node -> 
workflowExecuteRunnable.getWorkflowExecuteContext().getWorkflowGraph().getDag()
+                                    .getNode(node).getName())
+                            .collect(Collectors.toList()));
+            taskParameters.setResultConditionLocation(finalConditionLocation);
+        } else {
+            log.info("All conditions are not satisfied, move to the default 
branch");
+            moveToDefaultBranch();
         }
-
-        log.info("The switch task depend result:{}, switch branch:{}", 
conditionResult, finalConditionLocation);
-        return conditionResult;
     }
 
-    private boolean isValidSwitchResult(SwitchResultVo switchResult) {
-        if (CollectionUtils.isEmpty(switchResult.getNextNode())) {
-            return false;
+    private void checkIfBranchExist(List<Long> branchNode) {
+        if (CollectionUtils.isEmpty(branchNode)) {
+            throw new IllegalArgumentException("The branchNode is empty, 
please check the switch task configuration");
         }
-        for (Long nextNode : switchResult.getNextNode()) {
-            if (nextNode == null) {
-                return false;
+        for (Long branch : branchNode) {
+            if (branch == null) {
+                throw new IllegalArgumentException("The branch is empty, 
please check the switch task configuration");
+            }
+            if 
(!workflowExecuteRunnable.getWorkflowExecuteContext().getWorkflowGraph().getDag().containsNode(branch))
 {
+                throw new IllegalArgumentException(
+                        "The branch(code= " + branchNode
+                                + ") is not in the dag, please check the 
switch task configuration");
             }
         }
-        return true;
     }
 
 }

Reply via email to