This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 5df4b1cbc3 Fix condition task will cause workflow instance failed
(#16152)
5df4b1cbc3 is described below
commit 5df4b1cbc3c6c6836fc0a1022953f1b01303af2f
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed Jun 19 11:28:35 2024 +0800
Fix condition task will cause workflow instance failed (#16152)
---
.../dolphinscheduler/dao/entity/TaskInstance.java | 44 ++++++++++++++++++
.../master/runner/WorkflowExecuteRunnable.java | 36 ++++++++++-----
.../runner/task/condition/ConditionLogicTask.java | 29 +++++-------
.../condition/ConditionLogicTaskPluginFactory.java | 11 ++++-
.../service/alert/ListenerEventAlertManager.java | 9 +++-
.../dolphinscheduler/service/utils/DagHelper.java | 52 +++++++++++-----------
.../alert/ListenerEventAlertManagerTest.java | 20 +++++----
.../service/utils/DagHelperTest.java | 51 ++++++---------------
.../task/api/parameters/ConditionsParameters.java | 18 +++-----
9 files changed, 157 insertions(+), 113 deletions(-)
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
index 4897449753..6fc14d5cac 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
@@ -31,11 +31,15 @@ import
org.apache.dolphinscheduler.common.enums.TaskExecuteType;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+import
org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters;
import
org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
+import org.apache.commons.lang3.StringUtils;
+
import java.io.Serializable;
import java.util.Date;
+import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -192,6 +196,9 @@ public class TaskInstance implements Serializable {
@TableField(exist = false)
private DependentParameters dependency;
+ @TableField(exist = false)
+ private ConditionsParameters conditionsParameters;
+
/**
* switch dependency
*/
@@ -318,6 +325,43 @@ public class TaskInstance implements Serializable {
this.dependency = dependency;
}
+ public ConditionsParameters getConditionsParameters() {
+ if (this.conditionsParameters == null) {
+ Map<String, Object> taskParamsMap =
+ JSONUtils.parseObject(this.getTaskParams(), new
TypeReference<Map<String, Object>>() {
+ });
+ this.conditionsParameters =
+ JSONUtils.parseObject((String)
taskParamsMap.get(Constants.DEPENDENCE), ConditionsParameters.class);
+ }
+ return conditionsParameters;
+ }
+
+ public ConditionsParameters.ConditionResult getConditionResult() {
+ Map<String, Object> taskParamsMap =
+ JSONUtils.parseObject(this.getTaskParams(), new
TypeReference<Map<String, Object>>() {
+ });
+ String conditionResult = (String)
taskParamsMap.getOrDefault(Constants.CONDITION_RESULT, "");
+ if (StringUtils.isNotEmpty(conditionResult)) {
+ return JSONUtils.parseObject(conditionResult, new
TypeReference<ConditionsParameters.ConditionResult>() {
+ });
+ }
+ return null;
+ }
+
+ public void setConditionResult(ConditionsParameters conditionsParameters) {
+ if (conditionsParameters == null) {
+ return;
+ }
+ Map<String, Object> taskParamsMap =
+ JSONUtils.parseObject(this.getTaskParams(), new
TypeReference<Map<String, Object>>() {
+ });
+ if (taskParamsMap == null) {
+ taskParamsMap = new HashMap<>();
+ }
+ taskParamsMap.put(Constants.CONDITION_RESULT,
JSONUtils.toJsonString(conditionsParameters));
+ this.setTaskParams(JSONUtils.toJsonString(taskParamsMap));
+ }
+
public SwitchParameters getSwitchDependency() {
// todo: We need to directly use Jackson to deserialize the taskParam,
rather than parse the map and get from
// field.
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 919ca844c4..21a23fbe5b 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -385,18 +385,19 @@ public class WorkflowExecuteRunnable implements
IWorkflowExecuteRunnable {
// retry task
log.info("Retry taskInstance taskInstance state: {}",
taskInstance.getState());
retryTaskInstance(taskInstance);
- } else if (taskInstance.getState().isFailure()) {
+ } else if (taskInstance.getState().isFailure() ||
taskInstance.getState().isKill()
+ || taskInstance.getState().isStop()) {
completeTaskSet.add(taskInstance.getTaskCode());
- ProjectUser projectUser =
-
processService.queryProjectWithUserByProcessInstanceId(workflowInstance.getId());
-
listenerEventAlertManager.publishTaskFailListenerEvent(workflowInstance,
taskInstance, projectUser);
+
listenerEventAlertManager.publishTaskFailListenerEvent(workflowInstance,
taskInstance);
+ if (isTaskNeedPutIntoErrorMap(taskInstance)) {
+ errorTaskMap.put(taskInstance.getTaskCode(),
taskInstance.getId());
+ }
// There are child nodes and the failure policy is: CONTINUE
if (workflowInstance.getFailureStrategy() ==
FailureStrategy.CONTINUE && DagHelper.haveAllNodeAfterNode(
taskInstance.getTaskCode(),
workflowExecuteContext.getWorkflowGraph().getDag())) {
submitPostNode(taskInstance.getTaskCode());
} else {
- errorTaskMap.put(taskInstance.getTaskCode(),
taskInstance.getId());
if (workflowInstance.getFailureStrategy() ==
FailureStrategy.END) {
killAllTasks();
}
@@ -805,10 +806,7 @@ public class WorkflowExecuteRunnable implements
IWorkflowExecuteRunnable {
completeTaskSet.add(task.getTaskCode());
continue;
}
- if (task.isConditionsTask() ||
DagHelper.haveConditionsAfterNode(task.getTaskCode(),
-
workflowExecuteContext.getWorkflowGraph().getDag())) {
- continue;
- }
+
if (task.taskCanRetry()) {
if (task.getState().isNeedFaultTolerance()) {
log.info("TaskInstance needs fault tolerance, will
be added to standby list.");
@@ -824,7 +822,7 @@ public class WorkflowExecuteRunnable implements
IWorkflowExecuteRunnable {
}
continue;
}
- if (task.getState().isFailure()) {
+ if (isTaskNeedPutIntoErrorMap(task)) {
errorTaskMap.put(task.getTaskCode(), task.getId());
}
} finally {
@@ -2015,6 +2013,24 @@ public class WorkflowExecuteRunnable implements
IWorkflowExecuteRunnable {
}
}
+ /**
+ * Whether the task instance needs to be put into {@link #errorTaskMap}.
+ * Only when the task instance is failed, stopped or killed, and it is not
the parent of a condition task,
+ * should it be put into {@link #errorTaskMap}.
+ * <p> Once a task instance is put into {@link #errorTaskMap}, it will be
treated as failed and make the workflow fail.
+ */
+ private boolean isTaskNeedPutIntoErrorMap(TaskInstance taskInstance) {
+ if (!taskInstance.getState().isFailure() &&
!taskInstance.getState().isStop()
+ && !taskInstance.getState().isKill()) {
+ return false;
+ }
+ TaskNode taskNode =
workflowExecuteContext.getWorkflowGraph().getTaskNodeByCode(taskInstance.getTaskCode());
+ if (DagHelper.haveConditionsAfterNode(taskNode.getCode(),
workflowExecuteContext.getWorkflowGraph().getDag())) {
+ return false;
+ }
+ return true;
+ }
+
private enum WorkflowRunnableStatus {
CREATED, INITIALIZE_QUEUE, STARTED,
;
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/condition/ConditionLogicTask.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/condition/ConditionLogicTask.java
index 803a8043ff..b54f10d48d 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/condition/ConditionLogicTask.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/condition/ConditionLogicTask.java
@@ -25,10 +25,8 @@ import
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem;
-import
org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
+import
org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters;
import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils;
-import
org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
-import
org.apache.dolphinscheduler.server.master.exception.LogicTaskInitializeException;
import org.apache.dolphinscheduler.server.master.runner.task.BaseSyncLogicTask;
import java.util.List;
@@ -40,37 +38,34 @@ import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
@Slf4j
-public class ConditionLogicTask extends BaseSyncLogicTask<DependentParameters>
{
+public class ConditionLogicTask extends
BaseSyncLogicTask<ConditionsParameters> {
public static final String TASK_TYPE = "CONDITIONS";
private final TaskInstanceDao taskInstanceDao;
private final ProcessInstanceDao workflowInstanceDao;
+ private final TaskInstance taskInstance;
+
public ConditionLogicTask(TaskExecutionContext taskExecutionContext,
- ProcessInstanceExecCacheManager
processInstanceExecCacheManager,
+ TaskInstance taskInstance,
TaskInstanceDao taskInstanceDao,
- ProcessInstanceDao workflowInstanceDao) throws
LogicTaskInitializeException {
+ ProcessInstanceDao workflowInstanceDao) {
// todo: we need to change the parameter in front-end, so that we can
directly use json to parse
- super(taskExecutionContext,
-
processInstanceExecCacheManager.getByProcessInstanceId(taskExecutionContext.getProcessInstanceId())
-
.getTaskInstance(taskExecutionContext.getTaskInstanceId())
- .orElseThrow(() -> new LogicTaskInitializeException(
- "Cannot find the task instance in workflow
execute runnable"))
- .getDependency());
- // todo:check the parameters, why we don't use conditionTask?
taskInstance.getDependency();
+ super(taskExecutionContext, taskInstance.getConditionsParameters());
this.taskInstanceDao = taskInstanceDao;
this.workflowInstanceDao = workflowInstanceDao;
+ this.taskInstance = taskInstance;
}
@Override
public void handle() {
// calculate the conditionResult
DependResult conditionResult = calculateConditionResult();
- TaskExecutionStatus taskExecutionStatus =
- (conditionResult == DependResult.SUCCESS) ?
TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE;
- log.info("The condition result is {}, task instance statue will be:
{}", conditionResult, taskExecutionStatus);
- taskExecutionContext.setCurrentExecutionStatus(taskExecutionStatus);
+ log.info("The condition result is {}", conditionResult);
+ taskParameters.setConditionSuccess(conditionResult ==
DependResult.SUCCESS);
+ taskInstance.setConditionsParameters(taskParameters);
+
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
}
private DependResult calculateConditionResult() {
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/condition/ConditionLogicTaskPluginFactory.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/condition/ConditionLogicTaskPluginFactory.java
index d6887df6b5..4aee27f36d 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/condition/ConditionLogicTaskPluginFactory.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/condition/ConditionLogicTaskPluginFactory.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.runner.task.condition;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -43,7 +44,15 @@ public class ConditionLogicTaskPluginFactory implements
ILogicTaskPluginFactory<
@Override
public ConditionLogicTask createLogicTask(TaskExecutionContext
taskExecutionContext) throws LogicTaskInitializeException {
- return new ConditionLogicTask(taskExecutionContext,
processInstanceExecCacheManager, taskInstanceDao,
+ TaskInstance taskInstance =
+
processInstanceExecCacheManager.getByProcessInstanceId(taskExecutionContext.getProcessInstanceId())
+
.getTaskInstance(taskExecutionContext.getTaskInstanceId())
+ .orElseThrow(() -> new LogicTaskInitializeException(
+ "Cannot find the task instance in workflow
execute runnable"));
+ return new ConditionLogicTask(
+ taskExecutionContext,
+ taskInstance,
+ taskInstanceDao,
processInstanceDao);
}
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ListenerEventAlertManager.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ListenerEventAlertManager.java
index 182ed678a0..3056a14f5b 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ListenerEventAlertManager.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ListenerEventAlertManager.java
@@ -43,6 +43,7 @@ import
org.apache.dolphinscheduler.dao.entity.event.TaskFailListenerEvent;
import org.apache.dolphinscheduler.dao.entity.event.TaskStartListenerEvent;
import org.apache.dolphinscheduler.dao.mapper.AlertPluginInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ListenerEventMapper;
+import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.collections4.CollectionUtils;
@@ -71,6 +72,9 @@ public class ListenerEventAlertManager {
@Autowired
private AlertPluginInstanceMapper alertPluginInstanceMapper;
+ @Autowired
+ private ProcessService processService;
+
public void publishServerDownListenerEvent(String host, String type) {
ServerDownListenerEvent event = new ServerDownListenerEvent();
event.setEventTime(new Date());
@@ -214,8 +218,9 @@ public class ListenerEventAlertManager {
}
public void publishTaskFailListenerEvent(ProcessInstance processInstance,
- TaskInstance taskInstance,
- ProjectUser projectUser) {
+ TaskInstance taskInstance) {
+ ProjectUser projectUser =
processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
+
TaskFailListenerEvent event = new TaskFailListenerEvent();
event.setProjectCode(projectUser.getProjectCode());
event.setProjectName(projectUser.getProjectName());
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java
index ee5e97cf55..4a6235c0e0 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java
@@ -76,10 +76,10 @@ public class DagHelper {
/**
* generate task nodes needed by dag
*
- * @param taskNodeList taskNodeList
- * @param startNodeNameList startNodeNameList
+ * @param taskNodeList taskNodeList
+ * @param startNodeNameList startNodeNameList
* @param recoveryNodeCodeList recoveryNodeCodeList
- * @param taskDependType taskDependType
+ * @param taskDependType taskDependType
* @return task node list
*/
public static List<TaskNode>
generateFlowNodeListByStartNode(List<TaskNode> taskNodeList,
@@ -139,7 +139,7 @@ public class DagHelper {
/**
* find all the nodes that depended on the start node
*
- * @param startNode startNode
+ * @param startNode startNode
* @param taskNodeList taskNodeList
* @return task node list
*/
@@ -166,9 +166,9 @@ public class DagHelper {
/**
* find all nodes that start nodes depend on.
*
- * @param startNode startNode
+ * @param startNode startNode
* @param recoveryNodeCodeList recoveryNodeCodeList
- * @param taskNodeList taskNodeList
+ * @param taskNodeList taskNodeList
* @return task node list
*/
private static List<TaskNode> getFlowNodeListPre(TaskNode startNode,
@@ -204,10 +204,10 @@ public class DagHelper {
/**
* generate dag by start nodes and recovery nodes
*
- * @param totalTaskNodeList totalTaskNodeList
- * @param startNodeNameList startNodeNameList
+ * @param totalTaskNodeList totalTaskNodeList
+ * @param startNodeNameList startNodeNameList
* @param recoveryNodeCodeList recoveryNodeCodeList
- * @param depNodeType depNodeType
+ * @param depNodeType depNodeType
* @return process dag
* @throws Exception if error throws Exception
*/
@@ -232,7 +232,7 @@ public class DagHelper {
* find node by node name
*
* @param nodeDetails nodeDetails
- * @param nodeName nodeName
+ * @param nodeName nodeName
* @return task node
*/
public static TaskNode findNodeByName(List<TaskNode> nodeDetails, String
nodeName) {
@@ -248,7 +248,7 @@ public class DagHelper {
* find node by node code
*
* @param nodeDetails nodeDetails
- * @param nodeCode nodeCode
+ * @param nodeCode nodeCode
* @return task node
*/
public static TaskNode findNodeByCode(List<TaskNode> nodeDetails, Long
nodeCode) {
@@ -263,8 +263,8 @@ public class DagHelper {
/**
* the task can be submit when all the depends nodes are forbidden or
complete
*
- * @param taskNode taskNode
- * @param dag dag
+ * @param taskNode taskNode
+ * @param dag dag
* @param completeTaskList completeTaskList
* @return can submit
*/
@@ -369,22 +369,20 @@ public class DagHelper {
return conditionTaskList;
}
TaskInstance taskInstance = completeTaskList.get(nodeCode);
- ConditionsParameters conditionsParameters =
- JSONUtils.parseObject(taskNode.getConditionResult(),
ConditionsParameters.class);
+ ConditionsParameters conditionsParameters =
taskInstance.getConditionsParameters();
+ ConditionsParameters.ConditionResult conditionResult =
taskInstance.getConditionResult();
+
List<Long> skipNodeList = new ArrayList<>();
- if (taskInstance.getState().isSuccess()) {
- conditionTaskList = conditionsParameters.getSuccessNode();
- skipNodeList = conditionsParameters.getFailedNode();
- } else if (taskInstance.getState().isFailure()) {
- conditionTaskList = conditionsParameters.getFailedNode();
- skipNodeList = conditionsParameters.getSuccessNode();
+ if (conditionsParameters.isConditionSuccess()) {
+ conditionTaskList = conditionResult.getSuccessNode();
+ skipNodeList = conditionResult.getFailedNode();
} else {
- conditionTaskList.add(nodeCode);
+ conditionTaskList = conditionResult.getFailedNode();
+ skipNodeList = conditionResult.getSuccessNode();
}
- // the skipNodeList maybe null if no next task
- skipNodeList = Optional.ofNullable(skipNodeList).orElse(new
ArrayList<>());
- for (Long failedNode : skipNodeList) {
- setTaskNodeSkip(failedNode, dag, completeTaskList,
skipTaskNodeList);
+
+ if (CollectionUtils.isNotEmpty(skipNodeList)) {
+ skipNodeList.forEach(skipNode -> setTaskNodeSkip(skipNode, dag,
completeTaskList, skipTaskNodeList));
}
// the conditionTaskList maybe null if no next task
conditionTaskList = Optional.ofNullable(conditionTaskList).orElse(new
ArrayList<>());
@@ -447,6 +445,7 @@ public class DagHelper {
/**
* get all downstream nodes of the branch that the switch node needs to
execute
+ *
* @param taskCode
* @param dag
* @param switchNeedWorkCodes
@@ -480,6 +479,7 @@ public class DagHelper {
}
}
}
+
/**
* set task node and the post nodes skip flag
*/
diff --git
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/alert/ListenerEventAlertManagerTest.java
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/alert/ListenerEventAlertManagerTest.java
index 981b0a8050..4ba958e71d 100644
---
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/alert/ListenerEventAlertManagerTest.java
+++
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/alert/ListenerEventAlertManagerTest.java
@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.service.alert;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
@@ -30,6 +31,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.AlertPluginInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ListenerEventMapper;
+import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.ArrayList;
import java.util.List;
@@ -40,8 +42,6 @@ import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* ProcessAlertManager Test
@@ -49,8 +49,6 @@ import org.slf4j.LoggerFactory;
@ExtendWith(MockitoExtension.class)
public class ListenerEventAlertManagerTest {
- private static final Logger logger =
LoggerFactory.getLogger(ListenerEventAlertManagerTest.class);
-
@InjectMocks
ListenerEventAlertManager listenerEventAlertManager;
@@ -60,6 +58,9 @@ public class ListenerEventAlertManagerTest {
@Mock
ListenerEventMapper listenerEventMapper;
+ @Mock
+ ProcessService processService;
+
@Test
public void sendServerDownListenerEventTest() {
String host = "127.0.0.1";
@@ -67,7 +68,7 @@ public class ListenerEventAlertManagerTest {
List<AlertPluginInstance> globalPluginInstanceList = new ArrayList<>();
AlertPluginInstance instance = new AlertPluginInstance(1,
"instanceParams", "instanceName");
globalPluginInstanceList.add(instance);
-
Mockito.when(alertPluginInstanceMapper.queryAllGlobalAlertPluginInstanceList())
+ when(alertPluginInstanceMapper.queryAllGlobalAlertPluginInstanceList())
.thenReturn(globalPluginInstanceList);
Mockito.doNothing().when(listenerEventMapper).insertServerDownEvent(any(),
any());
listenerEventAlertManager.publishServerDownListenerEvent(host, type);
@@ -82,9 +83,9 @@ public class ListenerEventAlertManagerTest {
AlertPluginInstance instance = new AlertPluginInstance(1,
"instanceParams", "instanceName");
List<AlertPluginInstance> globalPluginInstanceList = new ArrayList<>();
globalPluginInstanceList.add(instance);
-
Mockito.when(alertPluginInstanceMapper.queryAllGlobalAlertPluginInstanceList())
+ when(alertPluginInstanceMapper.queryAllGlobalAlertPluginInstanceList())
.thenReturn(globalPluginInstanceList);
- Mockito.when(listenerEventMapper.insert(any())).thenReturn(1);
+ when(listenerEventMapper.insert(any())).thenReturn(1);
listenerEventAlertManager.publishProcessDefinitionCreatedListenerEvent(user,
processDefinition,
taskDefinitionLogs, processTaskRelationLogs);
}
@@ -142,7 +143,8 @@ public class ListenerEventAlertManagerTest {
public void sendTaskFailListenerEvent() {
ProcessInstance processInstance = Mockito.mock(ProcessInstance.class);
TaskInstance taskInstance = Mockito.mock(TaskInstance.class);
- ProjectUser projectUser = Mockito.mock(ProjectUser.class);
-
listenerEventAlertManager.publishTaskFailListenerEvent(processInstance,
taskInstance, projectUser);
+
when(processService.queryProjectWithUserByProcessInstanceId(processInstance.getId()))
+ .thenReturn(new ProjectUser());
+
listenerEventAlertManager.publishTaskFailListenerEvent(processInstance,
taskInstance);
}
}
diff --git
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java
index c19812303c..a36c414272 100644
---
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java
+++
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java
@@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo;
+import
org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
import org.apache.dolphinscheduler.service.model.TaskNode;
import org.apache.dolphinscheduler.service.process.ProcessDag;
@@ -259,9 +260,14 @@ public class DagHelperTest {
completeTaskList.put(1L, new TaskInstance());
completeTaskList.put(2L, new TaskInstance());
completeTaskList.put(4L, new TaskInstance());
- TaskNode node3 = dag.getNode(3L);
- node3.setType(TASK_TYPE_CONDITIONS);
- node3.setConditionResult("{\n"
+
+ TaskInstance taskInstance3 = new TaskInstance();
+ taskInstance3.setTaskType(TASK_TYPE_CONDITIONS);
+ Map<String, Object> params = new HashMap<>();
+ ConditionsParameters conditionsParameters = new ConditionsParameters();
+ conditionsParameters.setConditionSuccess(true);
+ params.put(Constants.DEPENDENCE, "{\"conditionSuccess\": true}");
+ params.put(Constants.CONDITION_RESULT, "{\n"
+
" \"successNode\": [5\n"
+
@@ -272,11 +278,12 @@ public class DagHelperTest {
" ]\n"
+
" }");
- completeTaskList.remove(3L);
- TaskInstance taskInstance = new TaskInstance();
- taskInstance.setState(TaskExecutionStatus.SUCCESS);
+ taskInstance3.setTaskParams(JSONUtils.toJsonString(params));
+ taskInstance3.setState(TaskExecutionStatus.SUCCESS);
+ TaskNode node3 = dag.getNode(3L);
+ node3.setType(TASK_TYPE_CONDITIONS);
// complete 1/2/3/4 expect:8
- completeTaskList.put(3L, taskInstance);
+ completeTaskList.put(3L, taskInstance3);
postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag,
completeTaskList);
Assertions.assertEquals(1, postNodes.size());
Assertions.assertTrue(postNodes.contains(8L));
@@ -291,7 +298,6 @@ public class DagHelperTest {
// 3.complete 1/2/3/4/5/8 expect post:7 skip:6
skipNodeList.clear();
TaskInstance taskInstance1 = new TaskInstance();
- taskInstance.setState(TaskExecutionStatus.SUCCESS);
completeTaskList.put(5L, taskInstance1);
postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag,
completeTaskList);
Assertions.assertEquals(1, postNodes.size());
@@ -299,35 +305,6 @@ public class DagHelperTest {
Assertions.assertEquals(1, skipNodeList.size());
Assertions.assertTrue(skipNodeList.containsKey(6L));
- // dag: 1-2-3-5-7 4-3-6
- // 3-if , complete:1/2/3/4
- // 1.failure:3 expect post:6 skip:5/7
- skipNodeList.clear();
- completeTaskList.remove(3L);
- taskInstance = new TaskInstance();
-
- Map<String, Object> taskParamsMap = new HashMap<>();
- taskParamsMap.put(Constants.SWITCH_RESULT, "");
- taskInstance.setTaskParams(JSONUtils.toJsonString(taskParamsMap));
- taskInstance.setState(TaskExecutionStatus.FAILURE);
- completeTaskList.put(3L, taskInstance);
- postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag,
completeTaskList);
- Assertions.assertEquals(1, postNodes.size());
- Assertions.assertTrue(postNodes.contains(6L));
- Assertions.assertEquals(2, skipNodeList.size());
- Assertions.assertTrue(skipNodeList.containsKey(5L));
- Assertions.assertTrue(skipNodeList.containsKey(7L));
-
- // dag: 1-2-3-5-7 4-3-6
- // 3-if , complete:1/2/3/4
- // 1.failure:3 expect post:6 skip:5/7
- dag = generateDag2();
- skipNodeList.clear();
- completeTaskList.clear();
- taskInstance.setSwitchDependency(getSwitchNode());
- completeTaskList.put(1L, taskInstance);
- postNodes = DagHelper.parsePostNodes(1L, skipNodeList, dag,
completeTaskList);
- Assertions.assertEquals(1, postNodes.size());
}
@Test
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/ConditionsParameters.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/ConditionsParameters.java
index 15141937b0..59b0d08fc5 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/ConditionsParameters.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/ConditionsParameters.java
@@ -31,13 +31,9 @@ public class ConditionsParameters extends AbstractParameters
{
// depend node list and state, only need task name
private List<DependentTaskModel> dependTaskList;
- private DependentRelation dependRelation;
+ private DependentRelation relation;
- // node list to run when success
- private List<Long> successNode;
-
- // node list to run when failed
- private List<Long> failedNode;
+ private boolean conditionSuccess;
@Override
public boolean checkParameters() {
@@ -49,11 +45,11 @@ public class ConditionsParameters extends
AbstractParameters {
return new ArrayList<>();
}
- public String getConditionResult() {
- return "{"
- + "\"successNode\": [\"" + successNode.get(0)
- + "\"],\"failedNode\": [\"" + failedNode.get(0)
- + "\"]}";
+ @Data
+ public static class ConditionResult {
+
+ private List<Long> successNode;
+ private List<Long> failedNode;
}
}