This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch upstream-dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 8ce0367bd5b334f835dbcec53e0a4633b37209b4
Author: Wenjun Ruan <[email protected]>
AuthorDate: Thu Jun 13 22:04:39 2024 +0800

    Fix condition task will cause workflow instance failed
---
 .../dolphinscheduler/dao/entity/TaskInstance.java  | 44 ++++++++++++++++++
 .../master/runner/WorkflowExecuteRunnable.java     | 36 ++++++++++-----
 .../runner/task/condition/ConditionLogicTask.java  | 29 +++++-------
 .../condition/ConditionLogicTaskPluginFactory.java | 11 ++++-
 .../service/alert/ListenerEventAlertManager.java   |  9 +++-
 .../dolphinscheduler/service/utils/DagHelper.java  | 52 +++++++++++-----------
 .../alert/ListenerEventAlertManagerTest.java       | 20 +++++----
 .../service/utils/DagHelperTest.java               | 51 ++++++---------------
 .../task/api/parameters/ConditionsParameters.java  | 18 +++-----
 9 files changed, 157 insertions(+), 113 deletions(-)

diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
index 4897449753..6fc14d5cac 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
@@ -31,11 +31,15 @@ import 
org.apache.dolphinscheduler.common.enums.TaskExecuteType;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+import 
org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters;
 import 
org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
 
+import org.apache.commons.lang3.StringUtils;
+
 import java.io.Serializable;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
@@ -192,6 +196,9 @@ public class TaskInstance implements Serializable {
     @TableField(exist = false)
     private DependentParameters dependency;
 
+    @TableField(exist = false)
+    private ConditionsParameters conditionsParameters;
+
     /**
      * switch dependency
      */
@@ -318,6 +325,43 @@ public class TaskInstance implements Serializable {
         this.dependency = dependency;
     }
 
+    public ConditionsParameters getConditionsParameters() {
+        if (this.conditionsParameters == null) {
+            Map<String, Object> taskParamsMap =
+                    JSONUtils.parseObject(this.getTaskParams(), new 
TypeReference<Map<String, Object>>() {
+                    });
+            this.conditionsParameters =
+                    JSONUtils.parseObject((String) 
taskParamsMap.get(Constants.DEPENDENCE), ConditionsParameters.class);
+        }
+        return conditionsParameters;
+    }
+
+    public ConditionsParameters.ConditionResult getConditionResult() {
+        Map<String, Object> taskParamsMap =
+                JSONUtils.parseObject(this.getTaskParams(), new 
TypeReference<Map<String, Object>>() {
+                });
+        String conditionResult = (String) 
taskParamsMap.getOrDefault(Constants.CONDITION_RESULT, "");
+        if (StringUtils.isNotEmpty(conditionResult)) {
+            return JSONUtils.parseObject(conditionResult, new 
TypeReference<ConditionsParameters.ConditionResult>() {
+            });
+        }
+        return null;
+    }
+
+    public void setConditionResult(ConditionsParameters conditionsParameters) {
+        if (conditionsParameters == null) {
+            return;
+        }
+        Map<String, Object> taskParamsMap =
+                JSONUtils.parseObject(this.getTaskParams(), new 
TypeReference<Map<String, Object>>() {
+                });
+        if (taskParamsMap == null) {
+            taskParamsMap = new HashMap<>();
+        }
+        taskParamsMap.put(Constants.CONDITION_RESULT, 
JSONUtils.toJsonString(conditionsParameters));
+        this.setTaskParams(JSONUtils.toJsonString(taskParamsMap));
+    }
+
     public SwitchParameters getSwitchDependency() {
         // todo: We need to directly use Jackson to deserialize the taskParam, 
rather than parse the map and get from
         // field.
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 919ca844c4..21a23fbe5b 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -385,18 +385,19 @@ public class WorkflowExecuteRunnable implements 
IWorkflowExecuteRunnable {
                 // retry task
                 log.info("Retry taskInstance taskInstance state: {}", 
taskInstance.getState());
                 retryTaskInstance(taskInstance);
-            } else if (taskInstance.getState().isFailure()) {
+            } else if (taskInstance.getState().isFailure() || 
taskInstance.getState().isKill()
+                    || taskInstance.getState().isStop()) {
                 completeTaskSet.add(taskInstance.getTaskCode());
-                ProjectUser projectUser =
-                        
processService.queryProjectWithUserByProcessInstanceId(workflowInstance.getId());
-                
listenerEventAlertManager.publishTaskFailListenerEvent(workflowInstance, 
taskInstance, projectUser);
+                
listenerEventAlertManager.publishTaskFailListenerEvent(workflowInstance, 
taskInstance);
+                if (isTaskNeedPutIntoErrorMap(taskInstance)) {
+                    errorTaskMap.put(taskInstance.getTaskCode(), 
taskInstance.getId());
+                }
                 // There are child nodes and the failure policy is: CONTINUE
                 if (workflowInstance.getFailureStrategy() == 
FailureStrategy.CONTINUE && DagHelper.haveAllNodeAfterNode(
                         taskInstance.getTaskCode(),
                         workflowExecuteContext.getWorkflowGraph().getDag())) {
                     submitPostNode(taskInstance.getTaskCode());
                 } else {
-                    errorTaskMap.put(taskInstance.getTaskCode(), 
taskInstance.getId());
                     if (workflowInstance.getFailureStrategy() == 
FailureStrategy.END) {
                         killAllTasks();
                     }
@@ -805,10 +806,7 @@ public class WorkflowExecuteRunnable implements 
IWorkflowExecuteRunnable {
                         completeTaskSet.add(task.getTaskCode());
                         continue;
                     }
-                    if (task.isConditionsTask() || 
DagHelper.haveConditionsAfterNode(task.getTaskCode(),
-                            
workflowExecuteContext.getWorkflowGraph().getDag())) {
-                        continue;
-                    }
+
                     if (task.taskCanRetry()) {
                         if (task.getState().isNeedFaultTolerance()) {
                             log.info("TaskInstance needs fault tolerance, will 
be added to standby list.");
@@ -824,7 +822,7 @@ public class WorkflowExecuteRunnable implements 
IWorkflowExecuteRunnable {
                         }
                         continue;
                     }
-                    if (task.getState().isFailure()) {
+                    if (isTaskNeedPutIntoErrorMap(task)) {
                         errorTaskMap.put(task.getTaskCode(), task.getId());
                     }
                 } finally {
@@ -2015,6 +2013,24 @@ public class WorkflowExecuteRunnable implements 
IWorkflowExecuteRunnable {
         }
     }
 
+    /**
+     * Whether the task instance needs to be put into {@link #errorTaskMap}.
+     * Only a task instance that is failed, stopped or killed, and is not the parent of a 
condition task,
+     * should be put into {@link #errorTaskMap}.
+     * <p> Once a task instance is put into {@link #errorTaskMap}, it will be 
treated as failed and will make the workflow fail.
+     */
+    private boolean isTaskNeedPutIntoErrorMap(TaskInstance taskInstance) {
+        if (!taskInstance.getState().isFailure() && 
!taskInstance.getState().isStop()
+                && !taskInstance.getState().isKill()) {
+            return false;
+        }
+        TaskNode taskNode = 
workflowExecuteContext.getWorkflowGraph().getTaskNodeByCode(taskInstance.getTaskCode());
+        if (DagHelper.haveConditionsAfterNode(taskNode.getCode(), 
workflowExecuteContext.getWorkflowGraph().getDag())) {
+            return false;
+        }
+        return true;
+    }
+
     private enum WorkflowRunnableStatus {
         CREATED, INITIALIZE_QUEUE, STARTED,
         ;
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/condition/ConditionLogicTask.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/condition/ConditionLogicTask.java
index 803a8043ff..b54f10d48d 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/condition/ConditionLogicTask.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/condition/ConditionLogicTask.java
@@ -25,10 +25,8 @@ import 
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem;
-import 
org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
+import 
org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters;
 import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils;
-import 
org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
-import 
org.apache.dolphinscheduler.server.master.exception.LogicTaskInitializeException;
 import org.apache.dolphinscheduler.server.master.runner.task.BaseSyncLogicTask;
 
 import java.util.List;
@@ -40,37 +38,34 @@ import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
-public class ConditionLogicTask extends BaseSyncLogicTask<DependentParameters> 
{
+public class ConditionLogicTask extends 
BaseSyncLogicTask<ConditionsParameters> {
 
     public static final String TASK_TYPE = "CONDITIONS";
 
     private final TaskInstanceDao taskInstanceDao;
     private final ProcessInstanceDao workflowInstanceDao;
 
+    private final TaskInstance taskInstance;
+
     public ConditionLogicTask(TaskExecutionContext taskExecutionContext,
-                              ProcessInstanceExecCacheManager 
processInstanceExecCacheManager,
+                              TaskInstance taskInstance,
                               TaskInstanceDao taskInstanceDao,
-                              ProcessInstanceDao workflowInstanceDao) throws 
LogicTaskInitializeException {
+                              ProcessInstanceDao workflowInstanceDao) {
         // todo: we need to change the parameter in front-end, so that we can 
directly use json to parse
-        super(taskExecutionContext,
-                
processInstanceExecCacheManager.getByProcessInstanceId(taskExecutionContext.getProcessInstanceId())
-                        
.getTaskInstance(taskExecutionContext.getTaskInstanceId())
-                        .orElseThrow(() -> new LogicTaskInitializeException(
-                                "Cannot find the task instance in workflow 
execute runnable"))
-                        .getDependency());
-        // todo:check the parameters, why we don't use conditionTask? 
taskInstance.getDependency();
+        super(taskExecutionContext, taskInstance.getConditionsParameters());
         this.taskInstanceDao = taskInstanceDao;
         this.workflowInstanceDao = workflowInstanceDao;
+        this.taskInstance = taskInstance;
     }
 
     @Override
     public void handle() {
         // calculate the conditionResult
         DependResult conditionResult = calculateConditionResult();
-        TaskExecutionStatus taskExecutionStatus =
-                (conditionResult == DependResult.SUCCESS) ? 
TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE;
-        log.info("The condition result is {}, task instance statue will be: 
{}", conditionResult, taskExecutionStatus);
-        taskExecutionContext.setCurrentExecutionStatus(taskExecutionStatus);
+        log.info("The condition result is {}", conditionResult);
+        taskParameters.setConditionSuccess(conditionResult == 
DependResult.SUCCESS);
+        taskInstance.setConditionsParameters(taskParameters);
+        
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
     }
 
     private DependResult calculateConditionResult() {
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/condition/ConditionLogicTaskPluginFactory.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/condition/ConditionLogicTaskPluginFactory.java
index d6887df6b5..4aee27f36d 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/condition/ConditionLogicTaskPluginFactory.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/condition/ConditionLogicTaskPluginFactory.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.server.master.runner.task.condition;
 
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -43,7 +44,15 @@ public class ConditionLogicTaskPluginFactory implements 
ILogicTaskPluginFactory<
 
     @Override
     public ConditionLogicTask createLogicTask(TaskExecutionContext 
taskExecutionContext) throws LogicTaskInitializeException {
-        return new ConditionLogicTask(taskExecutionContext, 
processInstanceExecCacheManager, taskInstanceDao,
+        TaskInstance taskInstance =
+                
processInstanceExecCacheManager.getByProcessInstanceId(taskExecutionContext.getProcessInstanceId())
+                        
.getTaskInstance(taskExecutionContext.getTaskInstanceId())
+                        .orElseThrow(() -> new LogicTaskInitializeException(
+                                "Cannot find the task instance in workflow 
execute runnable"));
+        return new ConditionLogicTask(
+                taskExecutionContext,
+                taskInstance,
+                taskInstanceDao,
                 processInstanceDao);
     }
 
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ListenerEventAlertManager.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ListenerEventAlertManager.java
index 182ed678a0..3056a14f5b 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ListenerEventAlertManager.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ListenerEventAlertManager.java
@@ -43,6 +43,7 @@ import 
org.apache.dolphinscheduler.dao.entity.event.TaskFailListenerEvent;
 import org.apache.dolphinscheduler.dao.entity.event.TaskStartListenerEvent;
 import org.apache.dolphinscheduler.dao.mapper.AlertPluginInstanceMapper;
 import org.apache.dolphinscheduler.dao.mapper.ListenerEventMapper;
+import org.apache.dolphinscheduler.service.process.ProcessService;
 
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.collections4.CollectionUtils;
@@ -71,6 +72,9 @@ public class ListenerEventAlertManager {
     @Autowired
     private AlertPluginInstanceMapper alertPluginInstanceMapper;
 
+    @Autowired
+    private ProcessService processService;
+
     public void publishServerDownListenerEvent(String host, String type) {
         ServerDownListenerEvent event = new ServerDownListenerEvent();
         event.setEventTime(new Date());
@@ -214,8 +218,9 @@ public class ListenerEventAlertManager {
     }
 
     public void publishTaskFailListenerEvent(ProcessInstance processInstance,
-                                             TaskInstance taskInstance,
-                                             ProjectUser projectUser) {
+                                             TaskInstance taskInstance) {
+        ProjectUser projectUser = 
processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
+
         TaskFailListenerEvent event = new TaskFailListenerEvent();
         event.setProjectCode(projectUser.getProjectCode());
         event.setProjectName(projectUser.getProjectName());
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java
index ee5e97cf55..4a6235c0e0 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java
@@ -76,10 +76,10 @@ public class DagHelper {
     /**
      * generate task nodes needed by dag
      *
-     * @param taskNodeList taskNodeList
-     * @param startNodeNameList startNodeNameList
+     * @param taskNodeList         taskNodeList
+     * @param startNodeNameList    startNodeNameList
      * @param recoveryNodeCodeList recoveryNodeCodeList
-     * @param taskDependType taskDependType
+     * @param taskDependType       taskDependType
      * @return task node list
      */
     public static List<TaskNode> 
generateFlowNodeListByStartNode(List<TaskNode> taskNodeList,
@@ -139,7 +139,7 @@ public class DagHelper {
     /**
      * find all the nodes that depended on the start node
      *
-     * @param startNode startNode
+     * @param startNode    startNode
      * @param taskNodeList taskNodeList
      * @return task node list
      */
@@ -166,9 +166,9 @@ public class DagHelper {
     /**
      * find all nodes that start nodes depend on.
      *
-     * @param startNode startNode
+     * @param startNode            startNode
      * @param recoveryNodeCodeList recoveryNodeCodeList
-     * @param taskNodeList taskNodeList
+     * @param taskNodeList         taskNodeList
      * @return task node list
      */
     private static List<TaskNode> getFlowNodeListPre(TaskNode startNode,
@@ -204,10 +204,10 @@ public class DagHelper {
     /**
      * generate dag by start nodes and recovery nodes
      *
-     * @param totalTaskNodeList totalTaskNodeList
-     * @param startNodeNameList startNodeNameList
+     * @param totalTaskNodeList    totalTaskNodeList
+     * @param startNodeNameList    startNodeNameList
      * @param recoveryNodeCodeList recoveryNodeCodeList
-     * @param depNodeType depNodeType
+     * @param depNodeType          depNodeType
      * @return process dag
      * @throws Exception if error throws Exception
      */
@@ -232,7 +232,7 @@ public class DagHelper {
      * find node by node name
      *
      * @param nodeDetails nodeDetails
-     * @param nodeName nodeName
+     * @param nodeName    nodeName
      * @return task node
      */
     public static TaskNode findNodeByName(List<TaskNode> nodeDetails, String 
nodeName) {
@@ -248,7 +248,7 @@ public class DagHelper {
      * find node by node code
      *
      * @param nodeDetails nodeDetails
-     * @param nodeCode nodeCode
+     * @param nodeCode    nodeCode
      * @return task node
      */
     public static TaskNode findNodeByCode(List<TaskNode> nodeDetails, Long 
nodeCode) {
@@ -263,8 +263,8 @@ public class DagHelper {
     /**
      * the task can be submit when  all the depends nodes are forbidden or 
complete
      *
-     * @param taskNode taskNode
-     * @param dag dag
+     * @param taskNode         taskNode
+     * @param dag              dag
      * @param completeTaskList completeTaskList
      * @return can submit
      */
@@ -369,22 +369,20 @@ public class DagHelper {
             return conditionTaskList;
         }
         TaskInstance taskInstance = completeTaskList.get(nodeCode);
-        ConditionsParameters conditionsParameters =
-                JSONUtils.parseObject(taskNode.getConditionResult(), 
ConditionsParameters.class);
+        ConditionsParameters conditionsParameters = 
taskInstance.getConditionsParameters();
+        ConditionsParameters.ConditionResult conditionResult = 
taskInstance.getConditionResult();
+
         List<Long> skipNodeList = new ArrayList<>();
-        if (taskInstance.getState().isSuccess()) {
-            conditionTaskList = conditionsParameters.getSuccessNode();
-            skipNodeList = conditionsParameters.getFailedNode();
-        } else if (taskInstance.getState().isFailure()) {
-            conditionTaskList = conditionsParameters.getFailedNode();
-            skipNodeList = conditionsParameters.getSuccessNode();
+        if (conditionsParameters.isConditionSuccess()) {
+            conditionTaskList = conditionResult.getSuccessNode();
+            skipNodeList = conditionResult.getFailedNode();
         } else {
-            conditionTaskList.add(nodeCode);
+            conditionTaskList = conditionResult.getFailedNode();
+            skipNodeList = conditionResult.getSuccessNode();
         }
-        // the skipNodeList maybe null if no next task
-        skipNodeList = Optional.ofNullable(skipNodeList).orElse(new 
ArrayList<>());
-        for (Long failedNode : skipNodeList) {
-            setTaskNodeSkip(failedNode, dag, completeTaskList, 
skipTaskNodeList);
+
+        if (CollectionUtils.isNotEmpty(skipNodeList)) {
+            skipNodeList.forEach(skipNode -> setTaskNodeSkip(skipNode, dag, 
completeTaskList, skipTaskNodeList));
         }
         // the conditionTaskList maybe null if no next task
         conditionTaskList = Optional.ofNullable(conditionTaskList).orElse(new 
ArrayList<>());
@@ -447,6 +445,7 @@ public class DagHelper {
 
     /**
      * get all downstream nodes of the branch that the switch node needs to 
execute
+     *
      * @param taskCode
      * @param dag
      * @param switchNeedWorkCodes
@@ -480,6 +479,7 @@ public class DagHelper {
             }
         }
     }
+
     /**
      * set task node and the post nodes skip flag
      */
diff --git 
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/alert/ListenerEventAlertManagerTest.java
 
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/alert/ListenerEventAlertManagerTest.java
index 981b0a8050..4ba958e71d 100644
--- 
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/alert/ListenerEventAlertManagerTest.java
+++ 
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/alert/ListenerEventAlertManagerTest.java
@@ -18,6 +18,7 @@
 package org.apache.dolphinscheduler.service.alert;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
 
 import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
@@ -30,6 +31,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.mapper.AlertPluginInstanceMapper;
 import org.apache.dolphinscheduler.dao.mapper.ListenerEventMapper;
+import org.apache.dolphinscheduler.service.process.ProcessService;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -40,8 +42,6 @@ import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * ProcessAlertManager Test
@@ -49,8 +49,6 @@ import org.slf4j.LoggerFactory;
 @ExtendWith(MockitoExtension.class)
 public class ListenerEventAlertManagerTest {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(ListenerEventAlertManagerTest.class);
-
     @InjectMocks
     ListenerEventAlertManager listenerEventAlertManager;
 
@@ -60,6 +58,9 @@ public class ListenerEventAlertManagerTest {
     @Mock
     ListenerEventMapper listenerEventMapper;
 
+    @Mock
+    ProcessService processService;
+
     @Test
     public void sendServerDownListenerEventTest() {
         String host = "127.0.0.1";
@@ -67,7 +68,7 @@ public class ListenerEventAlertManagerTest {
         List<AlertPluginInstance> globalPluginInstanceList = new ArrayList<>();
         AlertPluginInstance instance = new AlertPluginInstance(1, 
"instanceParams", "instanceName");
         globalPluginInstanceList.add(instance);
-        
Mockito.when(alertPluginInstanceMapper.queryAllGlobalAlertPluginInstanceList())
+        when(alertPluginInstanceMapper.queryAllGlobalAlertPluginInstanceList())
                 .thenReturn(globalPluginInstanceList);
         
Mockito.doNothing().when(listenerEventMapper).insertServerDownEvent(any(), 
any());
         listenerEventAlertManager.publishServerDownListenerEvent(host, type);
@@ -82,9 +83,9 @@ public class ListenerEventAlertManagerTest {
         AlertPluginInstance instance = new AlertPluginInstance(1, 
"instanceParams", "instanceName");
         List<AlertPluginInstance> globalPluginInstanceList = new ArrayList<>();
         globalPluginInstanceList.add(instance);
-        
Mockito.when(alertPluginInstanceMapper.queryAllGlobalAlertPluginInstanceList())
+        when(alertPluginInstanceMapper.queryAllGlobalAlertPluginInstanceList())
                 .thenReturn(globalPluginInstanceList);
-        Mockito.when(listenerEventMapper.insert(any())).thenReturn(1);
+        when(listenerEventMapper.insert(any())).thenReturn(1);
         
listenerEventAlertManager.publishProcessDefinitionCreatedListenerEvent(user, 
processDefinition,
                 taskDefinitionLogs, processTaskRelationLogs);
     }
@@ -142,7 +143,8 @@ public class ListenerEventAlertManagerTest {
     public void sendTaskFailListenerEvent() {
         ProcessInstance processInstance = Mockito.mock(ProcessInstance.class);
         TaskInstance taskInstance = Mockito.mock(TaskInstance.class);
-        ProjectUser projectUser = Mockito.mock(ProjectUser.class);
-        
listenerEventAlertManager.publishTaskFailListenerEvent(processInstance, 
taskInstance, projectUser);
+        
when(processService.queryProjectWithUserByProcessInstanceId(processInstance.getId()))
+                .thenReturn(new ProjectUser());
+        
listenerEventAlertManager.publishTaskFailListenerEvent(processInstance, 
taskInstance);
     }
 }
diff --git 
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java
 
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java
index c19812303c..a36c414272 100644
--- 
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java
+++ 
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java
@@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo;
+import 
org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
 import org.apache.dolphinscheduler.service.model.TaskNode;
 import org.apache.dolphinscheduler.service.process.ProcessDag;
@@ -259,9 +260,14 @@ public class DagHelperTest {
         completeTaskList.put(1L, new TaskInstance());
         completeTaskList.put(2L, new TaskInstance());
         completeTaskList.put(4L, new TaskInstance());
-        TaskNode node3 = dag.getNode(3L);
-        node3.setType(TASK_TYPE_CONDITIONS);
-        node3.setConditionResult("{\n"
+
+        TaskInstance taskInstance3 = new TaskInstance();
+        taskInstance3.setTaskType(TASK_TYPE_CONDITIONS);
+        Map<String, Object> params = new HashMap<>();
+        ConditionsParameters conditionsParameters = new ConditionsParameters();
+        conditionsParameters.setConditionSuccess(true);
+        params.put(Constants.DEPENDENCE, "{\"conditionSuccess\": true}");
+        params.put(Constants.CONDITION_RESULT, "{\n"
                 +
                 "                \"successNode\": [5\n"
                 +
@@ -272,11 +278,12 @@ public class DagHelperTest {
                 "                ]\n"
                 +
                 "            }");
-        completeTaskList.remove(3L);
-        TaskInstance taskInstance = new TaskInstance();
-        taskInstance.setState(TaskExecutionStatus.SUCCESS);
+        taskInstance3.setTaskParams(JSONUtils.toJsonString(params));
+        taskInstance3.setState(TaskExecutionStatus.SUCCESS);
+        TaskNode node3 = dag.getNode(3L);
+        node3.setType(TASK_TYPE_CONDITIONS);
         // complete 1/2/3/4 expect:8
-        completeTaskList.put(3L, taskInstance);
+        completeTaskList.put(3L, taskInstance3);
         postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, 
completeTaskList);
         Assertions.assertEquals(1, postNodes.size());
         Assertions.assertTrue(postNodes.contains(8L));
@@ -291,7 +298,6 @@ public class DagHelperTest {
         // 3.complete 1/2/3/4/5/8 expect post:7 skip:6
         skipNodeList.clear();
         TaskInstance taskInstance1 = new TaskInstance();
-        taskInstance.setState(TaskExecutionStatus.SUCCESS);
         completeTaskList.put(5L, taskInstance1);
         postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, 
completeTaskList);
         Assertions.assertEquals(1, postNodes.size());
@@ -299,35 +305,6 @@ public class DagHelperTest {
         Assertions.assertEquals(1, skipNodeList.size());
         Assertions.assertTrue(skipNodeList.containsKey(6L));
 
-        // dag: 1-2-3-5-7 4-3-6
-        // 3-if , complete:1/2/3/4
-        // 1.failure:3 expect post:6 skip:5/7
-        skipNodeList.clear();
-        completeTaskList.remove(3L);
-        taskInstance = new TaskInstance();
-
-        Map<String, Object> taskParamsMap = new HashMap<>();
-        taskParamsMap.put(Constants.SWITCH_RESULT, "");
-        taskInstance.setTaskParams(JSONUtils.toJsonString(taskParamsMap));
-        taskInstance.setState(TaskExecutionStatus.FAILURE);
-        completeTaskList.put(3L, taskInstance);
-        postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, 
completeTaskList);
-        Assertions.assertEquals(1, postNodes.size());
-        Assertions.assertTrue(postNodes.contains(6L));
-        Assertions.assertEquals(2, skipNodeList.size());
-        Assertions.assertTrue(skipNodeList.containsKey(5L));
-        Assertions.assertTrue(skipNodeList.containsKey(7L));
-
-        // dag: 1-2-3-5-7 4-3-6
-        // 3-if , complete:1/2/3/4
-        // 1.failure:3 expect post:6 skip:5/7
-        dag = generateDag2();
-        skipNodeList.clear();
-        completeTaskList.clear();
-        taskInstance.setSwitchDependency(getSwitchNode());
-        completeTaskList.put(1L, taskInstance);
-        postNodes = DagHelper.parsePostNodes(1L, skipNodeList, dag, 
completeTaskList);
-        Assertions.assertEquals(1, postNodes.size());
     }
 
     @Test
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/ConditionsParameters.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/ConditionsParameters.java
index 15141937b0..59b0d08fc5 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/ConditionsParameters.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/ConditionsParameters.java
@@ -31,13 +31,9 @@ public class ConditionsParameters extends AbstractParameters 
{
 
     // depend node list and state, only need task name
     private List<DependentTaskModel> dependTaskList;
-    private DependentRelation dependRelation;
+    private DependentRelation relation;
 
-    // node list to run when success
-    private List<Long> successNode;
-
-    // node list to run when failed
-    private List<Long> failedNode;
+    private boolean conditionSuccess;
 
     @Override
     public boolean checkParameters() {
@@ -49,11 +45,11 @@ public class ConditionsParameters extends 
AbstractParameters {
         return new ArrayList<>();
     }
 
-    public String getConditionResult() {
-        return "{"
-                + "\"successNode\": [\"" + successNode.get(0)
-                + "\"],\"failedNode\": [\"" + failedNode.get(0)
-                + "\"]}";
+    @Data
+    public static class ConditionResult {
+
+        private List<Long> successNode;
+        private List<Long> failedNode;
     }
 
 }

Reply via email to