This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch 3.2.2-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/3.2.2-prepare by this push:
new 31e79d10ea Fix task nodes under switch task will not be skipped (#16109)
31e79d10ea is described below
commit 31e79d10ea230da9e9470cd06134e23129b547b7
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed Jun 5 23:41:52 2024 +0800
Fix task nodes under switch task will not be skipped (#16109)
(cherry picked from commit f687a7d851c206ae30377ac3f9dc025cec35ffb2)
---
.../runner/task/switchtask/SwitchLogicTask.java | 39 ++++++++++++++++------
.../server/master/utils/SwitchTaskUtils.java | 5 ---
.../server/master/utils/SwitchTaskUtilsTest.java | 12 +------
.../plugin/task/api/log/TaskInstanceLogHeader.java | 8 ++---
4 files changed, 33 insertions(+), 31 deletions(-)
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/switchtask/SwitchLogicTask.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/switchtask/SwitchLogicTask.java
index 1f52f9287d..58f2105394 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/switchtask/SwitchLogicTask.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/switchtask/SwitchLogicTask.java
@@ -61,26 +61,30 @@ public class SwitchLogicTask extends
BaseSyncLogicTask<SwitchParameters> {
this.taskInstance =
workflowExecuteRunnable.getTaskInstance(taskExecutionContext.getTaskInstanceId())
.orElseThrow(() -> new LogicTaskInitializeException(
"Cannot find the task instance in workflow execute
runnable"));
+ // Since the default branch is not in the dependTaskList, we need to
add it to the end
+ // otherwise the default branch will never be skipped in DAGHelper
+ addDefaultBranchToEnd();
}
@Override
public void handle() throws MasterTaskExecuteException {
- // Calculate the condition result and get the next node
if (CollectionUtils.isEmpty(taskParameters.getDependTaskList())) {
+ // If the branch is empty then will go into the default branch
+ // This case shouldn't happen, we can directly throw exception and
forbid the user to set branch
moveToDefaultBranch();
} else {
calculateSwitchBranch();
}
taskInstance.setSwitchDependency(taskParameters);
- log.info("Switch task execute finished");
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
+ log.info("Switch task execute finished: {}",
taskExecutionContext.getCurrentExecutionStatus().name());
}
private void moveToDefaultBranch() {
- checkIfBranchExist(taskParameters.getNextNode());
-
List<SwitchResultVo> switchResultVos =
taskParameters.getDependTaskList();
- switchResultVos.add(new SwitchResultVo(null,
taskParameters.getNextNode()));
+ SwitchResultVo defaultSwitchResultVo = getDefaultSwitchResultVo();
+ checkIfBranchExist(defaultSwitchResultVo.getNextNode());
+
taskParameters.setResultConditionLocation(switchResultVos.size() - 1);
log.info("The condition is not satisfied, move to the default branch:
{}",
@@ -90,9 +94,6 @@ public class SwitchLogicTask extends
BaseSyncLogicTask<SwitchParameters> {
private void calculateSwitchBranch() {
List<SwitchResultVo> switchResultVos =
taskParameters.getDependTaskList();
- if (CollectionUtils.isEmpty(switchResultVos)) {
- moveToDefaultBranch();
- }
Map<String, Property> globalParams =
taskExecutionContext.getPrepareParamsMap();
Map<String, Property> varParams = JSONUtils
.toList(taskInstance.getVarPool(), Property.class)
@@ -100,7 +101,8 @@ public class SwitchLogicTask extends
BaseSyncLogicTask<SwitchParameters> {
.collect(Collectors.toMap(Property::getProp, Property ->
Property));
int finalConditionLocation = -1;
- for (int i = 0; i < switchResultVos.size(); i++) {
+ // The last one is the default branch, no need to calculate
+ for (int i = 0; i < switchResultVos.size() - 1; i++) {
SwitchResultVo switchResultVo = switchResultVos.get(i);
log.info("Begin to execute {} condition: {} ", i,
switchResultVo.getCondition());
String content =
SwitchTaskUtils.generateContentWithTaskParams(switchResultVo.getCondition(),
globalParams,
@@ -111,14 +113,18 @@ public class SwitchLogicTask extends
BaseSyncLogicTask<SwitchParameters> {
result = SwitchTaskUtils.evaluate(content);
log.info("Execute condition sentence: {} successfully: {}",
content, result);
if (result) {
+ // If matched, break the loop
finalConditionLocation = i;
+ break;
}
} catch (Exception e) {
log.info("Execute condition sentence: {} failed", content, e);
}
}
+ // If the finalConditionLocation is -1, then the default branch will
be executed
if (finalConditionLocation >= 0) {
-
checkIfBranchExist(switchResultVos.get(finalConditionLocation).getNextNode());
+ List<Long> nextNodes =
switchResultVos.get(finalConditionLocation).getNextNode();
+ checkIfBranchExist(nextNodes);
log.info("The condition is satisfied, move to the branch: {}",
switchResultVos.get(finalConditionLocation).getNextNode().stream()
.map(node ->
workflowExecuteRunnable.getWorkflowExecuteContext().getWorkflowGraph().getDag()
@@ -126,7 +132,6 @@ public class SwitchLogicTask extends
BaseSyncLogicTask<SwitchParameters> {
.collect(Collectors.toList()));
taskParameters.setResultConditionLocation(finalConditionLocation);
} else {
- log.info("All conditions are not satisfied, move to the default
branch");
moveToDefaultBranch();
}
}
@@ -147,4 +152,16 @@ public class SwitchLogicTask extends
BaseSyncLogicTask<SwitchParameters> {
}
}
+ private void addDefaultBranchToEnd() {
+ SwitchResultVo switchResultVo = new SwitchResultVo(null,
taskParameters.getNextNode());
+ List<SwitchResultVo> dependTaskList =
taskParameters.getDependTaskList();
+ if (!dependTaskList.contains(switchResultVo)) {
+ dependTaskList.add(switchResultVo);
+ }
+ }
+
+ private SwitchResultVo getDefaultSwitchResultVo() {
+ return
taskParameters.getDependTaskList().get(taskParameters.getDependTaskList().size()
- 1);
+ }
+
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/SwitchTaskUtils.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/SwitchTaskUtils.java
index 52f274cbcb..1676df7e01 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/SwitchTaskUtils.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/SwitchTaskUtils.java
@@ -63,7 +63,6 @@ public class SwitchTaskUtils {
if (MapUtils.isNotEmpty(varParams)) {
params.putAll(varParams);
}
- String originContent = content;
Pattern pattern = Pattern.compile(rgex);
Matcher m = pattern.matcher(content);
while (m.find()) {
@@ -82,10 +81,6 @@ public class SwitchTaskUtils {
content = content.replace("${" + paramName + "}", value);
}
- // if not replace any params, throw exception to avoid illegal
condition
- if (originContent.equals(content)) {
- throw new IllegalArgumentException("condition is not valid, please
check it. condition: " + condition);
- }
return content;
}
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/utils/SwitchTaskUtilsTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/utils/SwitchTaskUtilsTest.java
index 7f7ae43bbf..34785ada47 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/utils/SwitchTaskUtilsTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/utils/SwitchTaskUtilsTest.java
@@ -50,7 +50,7 @@ public class SwitchTaskUtilsTest {
Map<String, Property> globalParams = new HashMap<>();
Map<String, Property> varParams = new HashMap<>();
globalParams.put("test", new Property("test", Direct.IN,
DataType.INTEGER, "1"));
- Assertions.assertThrowsExactly(IllegalArgumentException.class, () -> {
+ Assertions.assertDoesNotThrow(() -> {
SwitchTaskUtils.generateContentWithTaskParams(content,
globalParams, varParams);
});
@@ -70,15 +70,5 @@ public class SwitchTaskUtilsTest {
SwitchTaskUtils.evaluate(script);
});
- String contentWithSpecify1 = "cmd.abc";
- Assertions.assertThrowsExactly(IllegalArgumentException.class, () -> {
- SwitchTaskUtils.generateContentWithTaskParams(contentWithSpecify1,
globalParams, varParams);
- });
-
- String contentWithSpecify2 = "cmd()";
- Assertions.assertThrowsExactly(IllegalArgumentException.class, () -> {
- SwitchTaskUtils.generateContentWithTaskParams(contentWithSpecify2,
globalParams, varParams);
- });
-
}
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskInstanceLogHeader.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskInstanceLogHeader.java
index 02ee2e1c99..3aa76dd114 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskInstanceLogHeader.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskInstanceLogHeader.java
@@ -27,11 +27,11 @@ import com.google.common.collect.Lists;
public class TaskInstanceLogHeader {
private static final List<String> INITIALIZE_TASK_CONTEXT_HEADER =
Lists.newArrayList(
-
"***********************************************************************************************",
+
"\n***********************************************************************************************",
"********************************* Initialize task context
***********************************",
"***********************************************************************************************");
private static final List<String> LOAD_TASK_INSTANCE_PLUGIN_HEADER =
Lists.newArrayList(
-
"***********************************************************************************************",
+
"\n***********************************************************************************************",
"********************************* Load task instance plugin
*********************************",
"***********************************************************************************************");
@@ -40,12 +40,12 @@ public class TaskInstanceLogHeader {
}
private static final List<String> EXECUTE_TASK_HEADER = Lists.newArrayList(
-
"***********************************************************************************************",
+
"\n***********************************************************************************************",
"********************************* Execute task instance
*************************************",
"***********************************************************************************************");
private static final List<String> FINALIZE_TASK_HEADER =
Lists.newArrayList(
-
"***********************************************************************************************",
+
"\n***********************************************************************************************",
"********************************* Finalize task instance
************************************",
"***********************************************************************************************");