This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new dcc9d64ef6 Fix Switch logic task doesn't check the branch exist
(#15755)
dcc9d64ef6 is described below
commit dcc9d64ef6c5a8353722aa371eb4920d33d6d756
Author: Wenjun Ruan <[email protected]>
AuthorDate: Sun Mar 24 20:52:00 2024 +0800
Fix Switch logic task doesn't check the branch exist (#15755)
---
.../runner/task/switchtask/SwitchLogicTask.java | 116 ++++++++++-----------
1 file changed, 57 insertions(+), 59 deletions(-)
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/switchtask/SwitchLogicTask.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/switchtask/SwitchLogicTask.java
index 244926c096..1f52f9287d 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/switchtask/SwitchLogicTask.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/switchtask/SwitchLogicTask.java
@@ -18,10 +18,8 @@
package org.apache.dolphinscheduler.server.master.runner.task.switchtask;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo;
@@ -34,7 +32,6 @@ import
org.apache.dolphinscheduler.server.master.runner.task.BaseSyncLogicTask;
import org.apache.dolphinscheduler.server.master.utils.SwitchTaskUtils;
import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
import java.util.List;
import java.util.Map;
@@ -47,7 +44,7 @@ public class SwitchLogicTask extends
BaseSyncLogicTask<SwitchParameters> {
public static final String TASK_TYPE = "SWITCH";
- private final ProcessInstance processInstance;
+ private final WorkflowExecuteRunnable workflowExecuteRunnable;
private final TaskInstance taskInstance;
public SwitchLogicTask(TaskExecutionContext taskExecutionContext,
@@ -59,9 +56,8 @@ public class SwitchLogicTask extends
BaseSyncLogicTask<SwitchParameters> {
.orElseThrow(() -> new LogicTaskInitializeException(
"Cannot find the task instance in workflow
execute runnable"))
.getSwitchDependency());
- WorkflowExecuteRunnable workflowExecuteRunnable =
+ this.workflowExecuteRunnable =
processInstanceExecCacheManager.getByProcessInstanceId(taskExecutionContext.getProcessInstanceId());
- this.processInstance =
workflowExecuteRunnable.getWorkflowExecuteContext().getWorkflowInstance();
this.taskInstance =
workflowExecuteRunnable.getTaskInstance(taskExecutionContext.getTaskInstanceId())
.orElseThrow(() -> new LogicTaskInitializeException(
"Cannot find the task instance in workflow execute
runnable"));
@@ -69,84 +65,86 @@ public class SwitchLogicTask extends
BaseSyncLogicTask<SwitchParameters> {
@Override
public void handle() throws MasterTaskExecuteException {
- DependResult conditionResult = calculateConditionResult();
- TaskExecutionStatus status =
- (conditionResult == DependResult.SUCCESS) ?
TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE;
- log.info("Switch task execute finished, condition result is: {}, task
status is: {}", conditionResult,
- status.name());
- taskExecutionContext.setCurrentExecutionStatus(status);
+ // Calculate the condition result and get the next node
+ if (CollectionUtils.isEmpty(taskParameters.getDependTaskList())) {
+ moveToDefaultBranch();
+ } else {
+ calculateSwitchBranch();
+ }
+ taskInstance.setSwitchDependency(taskParameters);
+ log.info("Switch task execute finished");
+
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
}
- // todo: don't use depend result, use switch result
- private DependResult calculateConditionResult() {
- DependResult conditionResult = DependResult.SUCCESS;
+ private void moveToDefaultBranch() {
+ checkIfBranchExist(taskParameters.getNextNode());
List<SwitchResultVo> switchResultVos =
taskParameters.getDependTaskList();
+ switchResultVos.add(new SwitchResultVo(null,
taskParameters.getNextNode()));
+ taskParameters.setResultConditionLocation(switchResultVos.size() - 1);
- SwitchResultVo switchResultVo = new SwitchResultVo();
- switchResultVo.setNextNode(taskParameters.getNextNode());
- switchResultVos.add(switchResultVo);
- // todo: refactor these calculate code
- int finalConditionLocation = switchResultVos.size() - 1;
- int i = 0;
+ log.info("The condition is not satisfied, move to the default branch:
{}",
+ taskParameters.getNextNode().stream().map(node ->
workflowExecuteRunnable.getWorkflowExecuteContext()
+
.getWorkflowGraph().getDag().getNode(node).getName()).collect(Collectors.toList()));
+ }
- Map<String, Property> globalParams = JSONUtils
- .toList(processInstance.getGlobalParams(), Property.class)
- .stream()
- .collect(Collectors.toMap(Property::getProp, Property ->
Property));
+ private void calculateSwitchBranch() {
+ List<SwitchResultVo> switchResultVos =
taskParameters.getDependTaskList();
+ if (CollectionUtils.isEmpty(switchResultVos)) {
+ moveToDefaultBranch();
+ }
+ Map<String, Property> globalParams =
taskExecutionContext.getPrepareParamsMap();
Map<String, Property> varParams = JSONUtils
.toList(taskInstance.getVarPool(), Property.class)
.stream()
.collect(Collectors.toMap(Property::getProp, Property ->
Property));
- for (SwitchResultVo info : switchResultVos) {
- log.info("Begin to execute {} condition: {} ", (i + 1),
info.getCondition());
- if (StringUtils.isEmpty(info.getCondition())) {
- finalConditionLocation = i;
- break;
- }
- String content =
-
SwitchTaskUtils.generateContentWithTaskParams(info.getCondition(),
globalParams, varParams);
+ int finalConditionLocation = -1;
+ for (int i = 0; i < switchResultVos.size(); i++) {
+ SwitchResultVo switchResultVo = switchResultVos.get(i);
+ log.info("Begin to execute {} condition: {} ", i,
switchResultVo.getCondition());
+ String content =
SwitchTaskUtils.generateContentWithTaskParams(switchResultVo.getCondition(),
globalParams,
+ varParams);
log.info("Format condition sentence::{} successfully", content);
- Boolean result;
+ boolean result;
try {
result = SwitchTaskUtils.evaluate(content);
log.info("Execute condition sentence: {} successfully: {}",
content, result);
+ if (result) {
+ finalConditionLocation = i;
+ }
} catch (Exception e) {
log.info("Execute condition sentence: {} failed", content, e);
- conditionResult = DependResult.FAILED;
- break;
- }
- if (result) {
- finalConditionLocation = i;
- break;
}
- i++;
}
- taskParameters.setDependTaskList(switchResultVos);
- taskParameters.setResultConditionLocation(finalConditionLocation);
- taskInstance.setSwitchDependency(taskParameters);
-
- if (!isValidSwitchResult(switchResultVos.get(finalConditionLocation)))
{
- conditionResult = DependResult.FAILED;
- log.error("The switch task depend result is invalid, result:{},
switch branch:{}", conditionResult,
- finalConditionLocation);
+ if (finalConditionLocation >= 0) {
+
checkIfBranchExist(switchResultVos.get(finalConditionLocation).getNextNode());
+ log.info("The condition is satisfied, move to the branch: {}",
+
switchResultVos.get(finalConditionLocation).getNextNode().stream()
+ .map(node ->
workflowExecuteRunnable.getWorkflowExecuteContext().getWorkflowGraph().getDag()
+ .getNode(node).getName())
+ .collect(Collectors.toList()));
+ taskParameters.setResultConditionLocation(finalConditionLocation);
+ } else {
+ log.info("All conditions are not satisfied, move to the default
branch");
+ moveToDefaultBranch();
}
-
- log.info("The switch task depend result:{}, switch branch:{}",
conditionResult, finalConditionLocation);
- return conditionResult;
}
- private boolean isValidSwitchResult(SwitchResultVo switchResult) {
- if (CollectionUtils.isEmpty(switchResult.getNextNode())) {
- return false;
+ private void checkIfBranchExist(List<Long> branchNode) {
+ if (CollectionUtils.isEmpty(branchNode)) {
+ throw new IllegalArgumentException("The branchNode is empty,
please check the switch task configuration");
}
- for (Long nextNode : switchResult.getNextNode()) {
- if (nextNode == null) {
- return false;
+ for (Long branch : branchNode) {
+ if (branch == null) {
+ throw new IllegalArgumentException("The branch is empty,
please check the switch task configuration");
+ }
+ if
(!workflowExecuteRunnable.getWorkflowExecuteContext().getWorkflowGraph().getDag().containsNode(branch))
{
+ throw new IllegalArgumentException(
+ "The branch(code= " + branchNode
+ + ") is not in the dag, please check the
switch task configuration");
}
}
- return true;
}
}