caishunfeng commented on code in PR #16152:
URL:
https://github.com/apache/dolphinscheduler/pull/16152#discussion_r1639136514
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/condition/ConditionLogicTask.java:
##########
@@ -40,37 +38,34 @@
import lombok.extern.slf4j.Slf4j;
@Slf4j
-public class ConditionLogicTask extends BaseSyncLogicTask<DependentParameters>
{
+public class ConditionLogicTask extends
BaseSyncLogicTask<ConditionsParameters> {
public static final String TASK_TYPE = "CONDITIONS";
private final TaskInstanceDao taskInstanceDao;
private final ProcessInstanceDao workflowInstanceDao;
+ private final TaskInstance taskInstance;
+
public ConditionLogicTask(TaskExecutionContext taskExecutionContext,
- ProcessInstanceExecCacheManager
processInstanceExecCacheManager,
+ TaskInstance taskInstance,
TaskInstanceDao taskInstanceDao,
- ProcessInstanceDao workflowInstanceDao) throws
LogicTaskInitializeException {
+ ProcessInstanceDao workflowInstanceDao) {
// todo: we need to change the parameter in front-end, so that we can
directly use json to parse
- super(taskExecutionContext,
-
processInstanceExecCacheManager.getByProcessInstanceId(taskExecutionContext.getProcessInstanceId())
-
.getTaskInstance(taskExecutionContext.getTaskInstanceId())
- .orElseThrow(() -> new LogicTaskInitializeException(
- "Cannot find the task instance in workflow
execute runnable"))
- .getDependency());
- // todoļ¼check the parameters, why we don't use conditionTask?
taskInstance.getDependency();
+ super(taskExecutionContext, taskInstance.getConditionsParameters());
this.taskInstanceDao = taskInstanceDao;
this.workflowInstanceDao = workflowInstanceDao;
+ this.taskInstance = taskInstance;
}
@Override
public void handle() {
// calculate the conditionResult
DependResult conditionResult = calculateConditionResult();
- TaskExecutionStatus taskExecutionStatus =
- (conditionResult == DependResult.SUCCESS) ?
TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE;
- log.info("The condition result is {}, task instance statue will be:
{}", conditionResult, taskExecutionStatus);
- taskExecutionContext.setCurrentExecutionStatus(taskExecutionStatus);
+ log.info("The condition result is {}", conditionResult);
+ taskParameters.setConditionSuccess(conditionResult ==
DependResult.SUCCESS);
+ taskInstance.setConditionsParameters(taskParameters);
+
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
Review Comment:
Need to update the docs too.
see
https://dolphinscheduler.apache.org/zh-cn/docs/3.2.1/guide/task/conditions
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java:
##########
@@ -2015,6 +2013,24 @@ private void saveCacheTaskInstance(TaskInstance
taskInstance) {
}
}
+ /**
+ * Whether the task instance need to put into {@link #errorTaskMap}.
+ * Only the task instance is failed or killed, and it is parent of
condition task.
+ * Then it should be put into {@link #errorTaskMap}.
+ * <p> Once a task instance is put into {@link #errorTaskMap}, it will be
thought as failed and make the workflow be failed.
+ */
Review Comment:
:+1:
##########
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java:
##########
@@ -291,43 +298,13 @@ public void testConditionPostNode() throws IOException {
// 3.complete 1/2/3/4/5/8 expect post:7 skip:6
skipNodeList.clear();
TaskInstance taskInstance1 = new TaskInstance();
- taskInstance.setState(TaskExecutionStatus.SUCCESS);
completeTaskList.put(5L, taskInstance1);
postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag,
completeTaskList);
Assertions.assertEquals(1, postNodes.size());
Assertions.assertTrue(postNodes.contains(7L));
Assertions.assertEquals(1, skipNodeList.size());
Assertions.assertTrue(skipNodeList.containsKey(6L));
- // dag: 1-2-3-5-7 4-3-6
- // 3-if , complete:1/2/3/4
- // 1.failure:3 expect post:6 skip:5/7
- skipNodeList.clear();
- completeTaskList.remove(3L);
- taskInstance = new TaskInstance();
-
- Map<String, Object> taskParamsMap = new HashMap<>();
- taskParamsMap.put(Constants.SWITCH_RESULT, "");
- taskInstance.setTaskParams(JSONUtils.toJsonString(taskParamsMap));
- taskInstance.setState(TaskExecutionStatus.FAILURE);
- completeTaskList.put(3L, taskInstance);
- postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag,
completeTaskList);
- Assertions.assertEquals(1, postNodes.size());
- Assertions.assertTrue(postNodes.contains(6L));
- Assertions.assertEquals(2, skipNodeList.size());
- Assertions.assertTrue(skipNodeList.containsKey(5L));
- Assertions.assertTrue(skipNodeList.containsKey(7L));
-
- // dag: 1-2-3-5-7 4-3-6
- // 3-if , complete:1/2/3/4
- // 1.failure:3 expect post:6 skip:5/7
- dag = generateDag2();
- skipNodeList.clear();
- completeTaskList.clear();
- taskInstance.setSwitchDependency(getSwitchNode());
- completeTaskList.put(1L, taskInstance);
- postNodes = DagHelper.parsePostNodes(1L, skipNodeList, dag,
completeTaskList);
- Assertions.assertEquals(1, postNodes.size());
Review Comment:
Maybe should keep the switch UT?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]