This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch 3.2.2-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/3.2.2-prepare by this push:
     new 31e79d10ea Fix task nodes under switch task will not be skipped (#16109)
31e79d10ea is described below

commit 31e79d10ea230da9e9470cd06134e23129b547b7
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed Jun 5 23:41:52 2024 +0800

    Fix task nodes under switch task will not be skipped (#16109)
    
    (cherry picked from commit f687a7d851c206ae30377ac3f9dc025cec35ffb2)
---
 .../runner/task/switchtask/SwitchLogicTask.java    | 39 ++++++++++++++++------
 .../server/master/utils/SwitchTaskUtils.java       |  5 ---
 .../server/master/utils/SwitchTaskUtilsTest.java   | 12 +------
 .../plugin/task/api/log/TaskInstanceLogHeader.java |  8 ++---
 4 files changed, 33 insertions(+), 31 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/switchtask/SwitchLogicTask.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/switchtask/SwitchLogicTask.java
index 1f52f9287d..58f2105394 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/switchtask/SwitchLogicTask.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/switchtask/SwitchLogicTask.java
@@ -61,26 +61,30 @@ public class SwitchLogicTask extends BaseSyncLogicTask<SwitchParameters> {
         this.taskInstance = 
workflowExecuteRunnable.getTaskInstance(taskExecutionContext.getTaskInstanceId())
                 .orElseThrow(() -> new LogicTaskInitializeException(
                         "Cannot find the task instance in workflow execute 
runnable"));
+        // Since the default branch is not in the dependTaskList, we need to 
add it to the end
+        // otherwise the default branch will never be skipped in DAGHelper
+        addDefaultBranchToEnd();
     }
 
     @Override
     public void handle() throws MasterTaskExecuteException {
-        // Calculate the condition result and get the next node
         if (CollectionUtils.isEmpty(taskParameters.getDependTaskList())) {
+            // If the branch is empty then will go into the default branch
+            // This case shouldn't happen, we can directly throw exception and 
forbid the user to set branch
             moveToDefaultBranch();
         } else {
             calculateSwitchBranch();
         }
         taskInstance.setSwitchDependency(taskParameters);
-        log.info("Switch task execute finished");
         
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
+        log.info("Switch task execute finished: {}", 
taskExecutionContext.getCurrentExecutionStatus().name());
     }
 
     private void moveToDefaultBranch() {
-        checkIfBranchExist(taskParameters.getNextNode());
-
         List<SwitchResultVo> switchResultVos = 
taskParameters.getDependTaskList();
-        switchResultVos.add(new SwitchResultVo(null, 
taskParameters.getNextNode()));
+        SwitchResultVo defaultSwitchResultVo = getDefaultSwitchResultVo();
+        checkIfBranchExist(defaultSwitchResultVo.getNextNode());
+
         taskParameters.setResultConditionLocation(switchResultVos.size() - 1);
 
         log.info("The condition is not satisfied, move to the default branch: 
{}",
@@ -90,9 +94,6 @@ public class SwitchLogicTask extends BaseSyncLogicTask<SwitchParameters> {
 
     private void calculateSwitchBranch() {
         List<SwitchResultVo> switchResultVos = 
taskParameters.getDependTaskList();
-        if (CollectionUtils.isEmpty(switchResultVos)) {
-            moveToDefaultBranch();
-        }
         Map<String, Property> globalParams = 
taskExecutionContext.getPrepareParamsMap();
         Map<String, Property> varParams = JSONUtils
                 .toList(taskInstance.getVarPool(), Property.class)
@@ -100,7 +101,8 @@ public class SwitchLogicTask extends BaseSyncLogicTask<SwitchParameters> {
                 .collect(Collectors.toMap(Property::getProp, Property -> 
Property));
 
         int finalConditionLocation = -1;
-        for (int i = 0; i < switchResultVos.size(); i++) {
+        // The last one is the default branch, no need to calculate
+        for (int i = 0; i < switchResultVos.size() - 1; i++) {
             SwitchResultVo switchResultVo = switchResultVos.get(i);
             log.info("Begin to execute {} condition: {} ", i, 
switchResultVo.getCondition());
             String content = 
SwitchTaskUtils.generateContentWithTaskParams(switchResultVo.getCondition(), 
globalParams,
@@ -111,14 +113,18 @@ public class SwitchLogicTask extends BaseSyncLogicTask<SwitchParameters> {
                 result = SwitchTaskUtils.evaluate(content);
                 log.info("Execute condition sentence: {} successfully: {}", 
content, result);
                 if (result) {
+                    // If matched, break the loop
                     finalConditionLocation = i;
+                    break;
                 }
             } catch (Exception e) {
                 log.info("Execute condition sentence: {} failed", content, e);
             }
         }
+        // If the finalConditionLocation is -1, then the default branch will 
be executed
         if (finalConditionLocation >= 0) {
-            
checkIfBranchExist(switchResultVos.get(finalConditionLocation).getNextNode());
+            List<Long> nextNodes = 
switchResultVos.get(finalConditionLocation).getNextNode();
+            checkIfBranchExist(nextNodes);
             log.info("The condition is satisfied, move to the branch: {}",
                     
switchResultVos.get(finalConditionLocation).getNextNode().stream()
                             .map(node -> 
workflowExecuteRunnable.getWorkflowExecuteContext().getWorkflowGraph().getDag()
@@ -126,7 +132,6 @@ public class SwitchLogicTask extends BaseSyncLogicTask<SwitchParameters> {
                             .collect(Collectors.toList()));
             taskParameters.setResultConditionLocation(finalConditionLocation);
         } else {
-            log.info("All conditions are not satisfied, move to the default 
branch");
             moveToDefaultBranch();
         }
     }
@@ -147,4 +152,16 @@ public class SwitchLogicTask extends BaseSyncLogicTask<SwitchParameters> {
         }
     }
 
+    private void addDefaultBranchToEnd() {
+        SwitchResultVo switchResultVo = new SwitchResultVo(null, 
taskParameters.getNextNode());
+        List<SwitchResultVo> dependTaskList = 
taskParameters.getDependTaskList();
+        if (!dependTaskList.contains(switchResultVo)) {
+            dependTaskList.add(switchResultVo);
+        }
+    }
+
+    private SwitchResultVo getDefaultSwitchResultVo() {
+        return 
taskParameters.getDependTaskList().get(taskParameters.getDependTaskList().size()
 - 1);
+    }
+
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/SwitchTaskUtils.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/SwitchTaskUtils.java
index 52f274cbcb..1676df7e01 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/SwitchTaskUtils.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/SwitchTaskUtils.java
@@ -63,7 +63,6 @@ public class SwitchTaskUtils {
         if (MapUtils.isNotEmpty(varParams)) {
             params.putAll(varParams);
         }
-        String originContent = content;
         Pattern pattern = Pattern.compile(rgex);
         Matcher m = pattern.matcher(content);
         while (m.find()) {
@@ -82,10 +81,6 @@ public class SwitchTaskUtils {
             content = content.replace("${" + paramName + "}", value);
         }
 
-        // if not replace any params, throw exception to avoid illegal 
condition
-        if (originContent.equals(content)) {
-            throw new IllegalArgumentException("condition is not valid, please 
check it. condition: " + condition);
-        }
         return content;
     }
 
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/utils/SwitchTaskUtilsTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/utils/SwitchTaskUtilsTest.java
index 7f7ae43bbf..34785ada47 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/utils/SwitchTaskUtilsTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/utils/SwitchTaskUtilsTest.java
@@ -50,7 +50,7 @@ public class SwitchTaskUtilsTest {
         Map<String, Property> globalParams = new HashMap<>();
         Map<String, Property> varParams = new HashMap<>();
         globalParams.put("test", new Property("test", Direct.IN, 
DataType.INTEGER, "1"));
-        Assertions.assertThrowsExactly(IllegalArgumentException.class, () -> {
+        Assertions.assertDoesNotThrow(() -> {
             SwitchTaskUtils.generateContentWithTaskParams(content, 
globalParams, varParams);
         });
 
@@ -70,15 +70,5 @@ public class SwitchTaskUtilsTest {
             SwitchTaskUtils.evaluate(script);
         });
 
-        String contentWithSpecify1 = "cmd.abc";
-        Assertions.assertThrowsExactly(IllegalArgumentException.class, () -> {
-            SwitchTaskUtils.generateContentWithTaskParams(contentWithSpecify1, 
globalParams, varParams);
-        });
-
-        String contentWithSpecify2 = "cmd()";
-        Assertions.assertThrowsExactly(IllegalArgumentException.class, () -> {
-            SwitchTaskUtils.generateContentWithTaskParams(contentWithSpecify2, 
globalParams, varParams);
-        });
-
     }
 }
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskInstanceLogHeader.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskInstanceLogHeader.java
index 02ee2e1c99..3aa76dd114 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskInstanceLogHeader.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskInstanceLogHeader.java
@@ -27,11 +27,11 @@ import com.google.common.collect.Lists;
 public class TaskInstanceLogHeader {
 
     private static final List<String> INITIALIZE_TASK_CONTEXT_HEADER = 
Lists.newArrayList(
-            
"***********************************************************************************************",
+            
"\n***********************************************************************************************",
             "*********************************  Initialize task context  
***********************************",
             
"***********************************************************************************************");
     private static final List<String> LOAD_TASK_INSTANCE_PLUGIN_HEADER = 
Lists.newArrayList(
-            
"***********************************************************************************************",
+            
"\n***********************************************************************************************",
             "*********************************  Load task instance plugin  
*********************************",
             
"***********************************************************************************************");
 
@@ -40,12 +40,12 @@ public class TaskInstanceLogHeader {
     }
 
     private static final List<String> EXECUTE_TASK_HEADER = Lists.newArrayList(
-            
"***********************************************************************************************",
+            
"\n***********************************************************************************************",
             "*********************************  Execute task instance  
*************************************",
             
"***********************************************************************************************");
 
     private static final List<String> FINALIZE_TASK_HEADER = 
Lists.newArrayList(
-            
"***********************************************************************************************",
+            
"\n***********************************************************************************************",
             "*********************************  Finalize task instance  
************************************",
             
"***********************************************************************************************");
 

Reply via email to