This is an automated email from the ASF dual-hosted git repository.

xincheng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 465e7ae6ee [Bug] [Master] Workflow keep running when task has no id 
(#14315)
465e7ae6ee is described below

commit 465e7ae6ee44d50f6f319ddfaab82bdf77fec0e5
Author: 旺阳 <[email protected]>
AuthorDate: Wed Jun 14 12:53:16 2023 +0800

    [Bug] [Master] Workflow keep running when task has no id (#14315)
    
    * change completeTaskMap to completeTaskSet
    
    * Update 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
    
    Co-authored-by: Wenjun Ruan <[email protected]>
    
    * change method name
    
    * fix ut
    
    * fix spotless
    
    ---------
    
    Co-authored-by: Wenjun Ruan <[email protected]>
---
 .../server/master/event/TaskStateEventHandler.java |   7 +-
 .../master/runner/WorkflowExecuteRunnable.java     | 142 +++++++++++++--------
 .../master/runner/WorkflowExecuteRunnableTest.java |  32 ++---
 3 files changed, 106 insertions(+), 75 deletions(-)

diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java
index 524d0d4a6d..cf9fc262ad 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java
@@ -23,8 +23,8 @@ import 
org.apache.dolphinscheduler.remote.exceptions.RemotingException;
 import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
 import 
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
 
-import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -55,7 +55,7 @@ public class TaskStateEventHandler implements 
StateEventHandler {
                 "Handle task instance state event, the current task instance 
state {} will be changed to {}",
                 task.getState().name(), taskStateEvent.getStatus().name());
 
-        Map<Long, Integer> completeTaskMap = 
workflowExecuteRunnable.getCompleteTaskMap();
+        Set<Long> completeTaskSet = 
workflowExecuteRunnable.getCompleteTaskCodes();
         if (task.getState().isFinished()
                 && (taskStateEvent.getStatus() != null && 
taskStateEvent.getStatus().isRunning())) {
             String errorMessage = String.format(
@@ -67,8 +67,7 @@ public class TaskStateEventHandler implements 
StateEventHandler {
         }
 
         if (task.getState().isFinished()) {
-            if (completeTaskMap.containsKey(task.getTaskCode())
-                    && 
completeTaskMap.get(task.getTaskCode()).equals(task.getId())) {
+            if (completeTaskSet.contains(task.getTaskCode())) {
                 log.warn("The task instance is already complete, stateEvent: 
{}", stateEvent);
                 return true;
             }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 1564adbc14..88a1937b59 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -199,10 +199,10 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatus> {
     private final Map<Long, Integer> errorTaskMap = new ConcurrentHashMap<>();
 
     /**
-     * complete task map, taskCode as key, taskInstanceId as value
+     * complete task set
      * in a DAG, only one taskInstance per taskCode is valid
      */
-    private final Map<Long, Integer> completeTaskMap = new 
ConcurrentHashMap<>();
+    private final Set<Long> completeTaskSet = Sets.newConcurrentHashSet();
 
     /**
      * depend failed task set
@@ -444,7 +444,7 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatus> {
             stateWheelExecuteThread.removeTask4RetryCheck(processInstance, 
taskInstance);
 
             if (taskInstance.getState().isSuccess()) {
-                completeTaskMap.put(taskInstance.getTaskCode(), 
taskInstance.getId());
+                completeTaskSet.add(taskInstance.getTaskCode());
                 mergeTaskInstanceVarPool(taskInstance);
                 processInstanceDao.upsertProcessInstance(processInstance);
                 // save the cacheKey only if the task is defined as cache task 
and the task is success
@@ -459,7 +459,7 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatus> {
                 log.info("Retry taskInstance taskInstance state: {}", 
taskInstance.getState());
                 retryTaskInstance(taskInstance);
             } else if (taskInstance.getState().isFailure()) {
-                completeTaskMap.put(taskInstance.getTaskCode(), 
taskInstance.getId());
+                completeTaskSet.add(taskInstance.getTaskCode());
                 // There are child nodes and the failure policy is: CONTINUE
                 if (processInstance.getFailureStrategy() == 
FailureStrategy.CONTINUE && DagHelper.haveAllNodeAfterNode(
                         Long.toString(taskInstance.getTaskCode()),
@@ -472,8 +472,8 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatus> {
                     }
                 }
             } else if (taskInstance.getState().isFinished()) {
-                // todo: when the task instance type is pause, then it should 
not in completeTaskMap
-                completeTaskMap.put(taskInstance.getTaskCode(), 
taskInstance.getId());
+                // todo: when the task instance type is pause, then it should 
not in completeTaskSet
+                completeTaskSet.add(taskInstance.getTaskCode());
             }
             log.info("TaskInstance finished will try to update the workflow 
instance state, task code:{} state:{}",
                     taskInstance.getTaskCode(),
@@ -482,9 +482,9 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatus> {
 
             sendTaskLogOnMasterToRemoteIfNeeded(taskInstance.getLogPath(), 
taskInstance.getHost());
         } catch (Exception ex) {
-            log.error("Task finish failed, get a exception, will remove this 
taskInstance from completeTaskMap", ex);
+            log.error("Task finish failed, get a exception, will remove this 
taskInstance from completeTaskSet", ex);
             // remove the task from complete map, so that we can finish in the 
next time.
-            completeTaskMap.remove(taskInstance.getTaskCode());
+            completeTaskSet.remove(taskInstance.getTaskCode());
             throw ex;
         }
     }
@@ -873,7 +873,7 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatus> {
         // do we need to clear?
         taskExecuteRunnableMap.clear();
         dependFailedTaskSet.clear();
-        completeTaskMap.clear();
+        completeTaskSet.clear();
         errorTaskMap.clear();
 
         if (!isNewProcessInstance()) {
@@ -909,7 +909,7 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatus> {
 
                     if (task.isTaskComplete()) {
                         log.info("TaskInstance is already complete.");
-                        completeTaskMap.put(task.getTaskCode(), task.getId());
+                        completeTaskSet.add(task.getTaskCode());
                         continue;
                     }
                     if (task.isConditionsTask() || 
DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()),
@@ -980,9 +980,9 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatus> {
                 }
             }
         }
-        log.info("Initialize task queue, dependFailedTaskSet: {}, 
completeTaskMap: {}, errorTaskMap: {}",
+        log.info("Initialize task queue, dependFailedTaskSet: {}, 
completeTaskSet: {}, errorTaskMap: {}",
                 dependFailedTaskSet,
-                completeTaskMap,
+                completeTaskSet,
                 errorTaskMap);
     }
 
@@ -1236,7 +1236,12 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatus> {
         Map<String, TaskInstance> allTaskInstance = new HashMap<>();
         if (CollectionUtils.isNotEmpty(preTask)) {
             for (String preTaskCode : preTask) {
-                Integer taskId = 
completeTaskMap.get(Long.parseLong(preTaskCode));
+                Optional<TaskInstance> existTaskInstanceOptional = 
getTaskInstance(Long.parseLong(preTaskCode));
+                if (!existTaskInstanceOptional.isPresent()) {
+                    continue;
+                }
+
+                Integer taskId = existTaskInstanceOptional.get().getId();
                 if (taskId == null) {
                     continue;
                 }
@@ -1303,20 +1308,21 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatus> {
      */
     private Map<String, TaskInstance> getCompleteTaskInstanceMap() {
         Map<String, TaskInstance> completeTaskInstanceMap = new HashMap<>();
-        for (Map.Entry<Long, Integer> entry : completeTaskMap.entrySet()) {
-            Long taskConde = entry.getKey();
-            Integer taskInstanceId = entry.getValue();
-            TaskInstance taskInstance = taskInstanceMap.get(taskInstanceId);
-            if (taskInstance == null) {
-                log.warn("Cannot find the taskInstance from taskInstanceMap, 
taskInstanceId: {}, taskConde: {}",
-                        taskInstanceId,
-                        taskConde);
-                // This case will happen when we submit to db failed, then the 
taskInstanceId is 0
-                continue;
+
+        completeTaskSet.forEach(taskCode -> {
+            Optional<TaskInstance> existTaskInstanceOptional = 
getTaskInstance(taskCode);
+            if (existTaskInstanceOptional.isPresent()) {
+                TaskInstance taskInstance = 
taskInstanceMap.get(existTaskInstanceOptional.get().getId());
+                if (taskInstance == null) {
+                    // This case will happen when we submit to db failed, then 
the taskInstanceId is 0
+                    log.warn("Cannot find the taskInstance from 
taskInstanceMap, taskConde: {}",
+                            taskCode);
+                } else {
+                    completeTaskInstanceMap.put(Long.toString(taskCode), 
taskInstance);
+                }
             }
-            
completeTaskInstanceMap.put(Long.toString(taskInstance.getTaskCode()), 
taskInstance);
+        });
 
-        }
         return completeTaskInstanceMap;
     }
 
@@ -1364,17 +1370,21 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatus> {
         }
         // the end node of the branch of the dag
         if (StringUtils.isNotEmpty(parentNodeCode) && 
dag.getEndNode().contains(parentNodeCode)) {
-            TaskInstance endTaskInstance = 
taskInstanceMap.get(completeTaskMap.get(NumberUtils.toLong(parentNodeCode)));
-            String taskInstanceVarPool = endTaskInstance.getVarPool();
-            if (StringUtils.isNotEmpty(taskInstanceVarPool)) {
-                Set<Property> taskProperties = new 
HashSet<>(JSONUtils.toList(taskInstanceVarPool, Property.class));
-                String processInstanceVarPool = processInstance.getVarPool();
-                if (StringUtils.isNotEmpty(processInstanceVarPool)) {
-                    Set<Property> properties = new 
HashSet<>(JSONUtils.toList(processInstanceVarPool, Property.class));
-                    properties.addAll(taskProperties);
-                    
processInstance.setVarPool(JSONUtils.toJsonString(properties));
-                } else {
-                    
processInstance.setVarPool(JSONUtils.toJsonString(taskProperties));
+            Optional<TaskInstance> existTaskInstanceOptional = 
getTaskInstance(NumberUtils.toLong(parentNodeCode));
+            if (existTaskInstanceOptional.isPresent()) {
+                TaskInstance endTaskInstance = 
taskInstanceMap.get(existTaskInstanceOptional.get().getId());
+                String taskInstanceVarPool = endTaskInstance.getVarPool();
+                if (StringUtils.isNotEmpty(taskInstanceVarPool)) {
+                    Set<Property> taskProperties = new 
HashSet<>(JSONUtils.toList(taskInstanceVarPool, Property.class));
+                    String processInstanceVarPool = 
processInstance.getVarPool();
+                    if (StringUtils.isNotEmpty(processInstanceVarPool)) {
+                        Set<Property> properties =
+                                new 
HashSet<>(JSONUtils.toList(processInstanceVarPool, Property.class));
+                        properties.addAll(taskProperties);
+                        
processInstance.setVarPool(JSONUtils.toJsonString(properties));
+                    } else {
+                        
processInstance.setVarPool(JSONUtils.toJsonString(taskProperties));
+                    }
                 }
             }
         }
@@ -1387,7 +1397,7 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatus> {
                 continue;
             }
 
-            if (task.getId() != null && 
completeTaskMap.containsKey(task.getTaskCode())) {
+            if (task.getId() != null && 
completeTaskSet.contains(task.getTaskCode())) {
                 log.info("Task has already run success, taskName: {}", 
task.getName());
                 continue;
             }
@@ -1459,12 +1469,18 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatus> {
         for (String depsNode : indirectDepCodeList) {
             if (dag.containsNode(depsNode) && 
!skipTaskNodeMap.containsKey(depsNode)) {
                 // dependencies must be fully completed
-                Long despNodeTaskCode = Long.parseLong(depsNode);
-                if (!completeTaskMap.containsKey(despNodeTaskCode)) {
+                long despNodeTaskCode = Long.parseLong(depsNode);
+                if (!completeTaskSet.contains(despNodeTaskCode)) {
                     return DependResult.WAITING;
                 }
-                Integer depsTaskId = completeTaskMap.get(despNodeTaskCode);
-                TaskExecutionStatus depTaskState = 
taskInstanceMap.get(depsTaskId).getState();
+
+                Optional<TaskInstance> existTaskInstanceOptional = 
getTaskInstance(despNodeTaskCode);
+                if (!existTaskInstanceOptional.isPresent()) {
+                    return DependResult.NON_EXEC;
+                }
+
+                TaskExecutionStatus depTaskState =
+                        
taskInstanceMap.get(existTaskInstanceOptional.get().getId()).getState();
                 if (depTaskState.isKill()) {
                     return DependResult.NON_EXEC;
                 }
@@ -1484,7 +1500,7 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatus> {
             }
         }
         log.info("The dependTasks of task all success, currentTaskCode: {}, 
dependTaskCodes: {}",
-                taskCode, Arrays.toString(completeTaskMap.keySet().toArray()));
+                taskCode, Arrays.toString(completeTaskSet.toArray()));
         return DependResult.SUCCESS;
     }
 
@@ -1535,8 +1551,12 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatus> {
                     .contains(nextNodeCode);
         }
         long taskCode = Long.parseLong(dependNodeCode);
-        Integer taskInstanceId = completeTaskMap.get(taskCode);
-        TaskExecutionStatus depTaskState = 
taskInstanceMap.get(taskInstanceId).getState();
+        Optional<TaskInstance> existTaskInstanceOptional = 
getTaskInstance(taskCode);
+        if (!existTaskInstanceOptional.isPresent()) {
+            return false;
+        }
+
+        TaskExecutionStatus depTaskState = 
taskInstanceMap.get(existTaskInstanceOptional.get().getId()).getState();
         return !depTaskState.isFailure();
     }
 
@@ -1548,12 +1568,17 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatus> {
      */
     private List<TaskInstance> getCompleteTaskByState(TaskExecutionStatus 
state) {
         List<TaskInstance> resultList = new ArrayList<>();
-        for (Integer taskInstanceId : completeTaskMap.values()) {
-            TaskInstance taskInstance = taskInstanceMap.get(taskInstanceId);
-            if (taskInstance != null && taskInstance.getState() == state) {
-                resultList.add(taskInstance);
+
+        completeTaskSet.forEach(taskCode -> {
+            Optional<TaskInstance> existTaskInstanceOptional = 
getTaskInstance(taskCode);
+            if (existTaskInstanceOptional.isPresent()) {
+                TaskInstance taskInstance = 
taskInstanceMap.get(existTaskInstanceOptional.get().getId());
+                if (taskInstance != null && taskInstance.getState() == state) {
+                    resultList.add(taskInstance);
+                }
             }
-        }
+        });
+
         return resultList;
     }
 
@@ -1905,7 +1930,7 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatus> {
                             "Task {} has been forced success, put it into 
complete task list and stop retrying, taskInstanceId: {}",
                             task.getName(), task.getId());
                     removeTaskFromStandbyList(task);
-                    completeTaskMap.put(task.getTaskCode(), task.getId());
+                    completeTaskSet.add(task.getTaskCode());
                     taskInstanceMap.put(task.getId(), task);
                     submitPostNode(Long.toString(task.getTaskCode()));
                     continue;
@@ -1931,7 +1956,7 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatus> {
                                 processInstance.getId(),
                                 task.getTaskCode());
                     }
-                    completeTaskMap.put(task.getTaskCode(), task.getId());
+                    completeTaskSet.add(task.getTaskCode());
                     taskInstanceMap.put(task.getId(), task);
                     errorTaskMap.put(task.getTaskCode(), task.getId());
 
@@ -2066,8 +2091,8 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatus> {
         return false;
     }
 
-    public Map<Long, Integer> getCompleteTaskMap() {
-        return completeTaskMap;
+    public Set<Long> getCompleteTaskCodes() {
+        return completeTaskSet;
     }
 
     public Map<Long, DefaultTaskExecuteRunnable> getTaskExecuteRunnableMap() {
@@ -2117,7 +2142,7 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatus> {
      * 1. find all task code from sub dag (only contains related task)
      * 2. set the flag of tasks to Flag.NO
      * 3. clear varPool data from re-execute task instance in process instance
-     * 4. remove related task instance from taskInstanceMap, completeTaskMap, 
validTaskMap, errorTaskMap
+     * 4. remove related task instance from taskInstanceMap, completeTaskSet, 
validTaskMap, errorTaskMap
      *
      * @return task instance
      */
@@ -2176,9 +2201,14 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatus> {
         processInstance.setVarPool(JSONUtils.toJsonString(processProperties));
         processInstanceDao.updateById(processInstance);
 
-        // remove task instance from taskInstanceMap, completeTaskMap, 
validTaskMap, errorTaskMap
+        // remove task instance from taskInstanceMap, completeTaskSet, 
validTaskMap, errorTaskMap
+        // completeTaskSet remove dependency taskInstanceMap, so the sort 
can't change
+        completeTaskSet.removeIf(set -> {
+            Optional<TaskInstance> existTaskInstanceOptional = 
getTaskInstance(set);
+            return existTaskInstanceOptional
+                    .filter(taskInstance -> 
dag.containsNode(Integer.toString(taskInstance.getId()))).isPresent();
+        });
         taskInstanceMap.entrySet().removeIf(map -> 
dag.containsNode(Long.toString(map.getValue().getTaskCode())));
-        completeTaskMap.entrySet().removeIf(map -> 
dag.containsNode(Long.toString(map.getKey())));
         validTaskMap.entrySet().removeIf(map -> 
dag.containsNode(Long.toString(map.getKey())));
         errorTaskMap.entrySet().removeIf(map -> 
dag.containsNode(Long.toString(map.getKey())));
     }
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
index f224ff36c6..26b637fd1c 100644
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
@@ -69,6 +69,8 @@ import org.mockito.junit.jupiter.MockitoSettings;
 import org.mockito.quality.Strictness;
 import org.springframework.context.ApplicationContext;
 
+import com.google.common.collect.Sets;
+
 @ExtendWith(MockitoExtension.class)
 @MockitoSettings(strictness = Strictness.LENIENT)
 public class WorkflowExecuteRunnableTest {
@@ -207,15 +209,15 @@ public class WorkflowExecuteRunnableTest {
             taskInstanceMap.put(taskInstance1.getId(), taskInstance1);
             taskInstanceMap.put(taskInstance2.getId(), taskInstance2);
 
-            Map<Long, Integer> completeTaskList = new ConcurrentHashMap<>();
-            completeTaskList.put(taskInstance1.getTaskCode(), 
taskInstance1.getId());
-            completeTaskList.put(taskInstance2.getTaskCode(), 
taskInstance2.getId());
+            Set<Long> completeTaskSet = Sets.newConcurrentHashSet();
+            completeTaskSet.add(taskInstance1.getTaskCode());
+            completeTaskSet.add(taskInstance2.getTaskCode());
 
             Class<WorkflowExecuteRunnable> masterExecThreadClass = 
WorkflowExecuteRunnable.class;
 
-            Field completeTaskMapField = 
masterExecThreadClass.getDeclaredField("completeTaskMap");
-            completeTaskMapField.setAccessible(true);
-            completeTaskMapField.set(workflowExecuteThread, completeTaskList);
+            Field completeTaskSetField = 
masterExecThreadClass.getDeclaredField("completeTaskSet");
+            completeTaskSetField.setAccessible(true);
+            completeTaskSetField.set(workflowExecuteThread, completeTaskSet);
 
             Field taskInstanceMapField = 
masterExecThreadClass.getDeclaredField("taskInstanceMap");
             taskInstanceMapField.setAccessible(true);
@@ -225,10 +227,10 @@ public class WorkflowExecuteRunnableTest {
             Assertions.assertNotNull(taskInstance.getVarPool());
 
             
taskInstance2.setVarPool("[{\"direct\":\"OUT\",\"prop\":\"test1\",\"type\":\"VARCHAR\",\"value\":\"2\"}]");
-            completeTaskList.put(taskInstance2.getTaskCode(), 
taskInstance2.getId());
+            completeTaskSet.add(taskInstance2.getTaskCode());
 
-            completeTaskMapField.setAccessible(true);
-            completeTaskMapField.set(workflowExecuteThread, completeTaskList);
+            completeTaskSetField.setAccessible(true);
+            completeTaskSetField.set(workflowExecuteThread, completeTaskSet);
             taskInstanceMapField.setAccessible(true);
             taskInstanceMapField.set(workflowExecuteThread, taskInstanceMap);
 
@@ -279,15 +281,15 @@ public class WorkflowExecuteRunnableTest {
         taskInstanceMap.put(taskInstance1.getId(), taskInstance1);
         taskInstanceMap.put(taskInstance2.getId(), taskInstance2);
 
-        Map<Long, Integer> completeTaskList = new ConcurrentHashMap<>();
-        completeTaskList.put(taskInstance1.getTaskCode(), 
taskInstance1.getId());
-        completeTaskList.put(taskInstance2.getTaskCode(), 
taskInstance2.getId());
+        Set<Long> completeTaskSet = Sets.newConcurrentHashSet();
+        completeTaskSet.add(taskInstance1.getTaskCode());
+        completeTaskSet.add(taskInstance2.getTaskCode());
 
         Class<WorkflowExecuteRunnable> masterExecThreadClass = 
WorkflowExecuteRunnable.class;
 
-        Field completeTaskMapField = 
masterExecThreadClass.getDeclaredField("completeTaskMap");
+        Field completeTaskMapField = 
masterExecThreadClass.getDeclaredField("completeTaskSet");
         completeTaskMapField.setAccessible(true);
-        completeTaskMapField.set(workflowExecuteThread, completeTaskList);
+        completeTaskMapField.set(workflowExecuteThread, completeTaskSet);
 
         Field taskInstanceMapField = 
masterExecThreadClass.getDeclaredField("taskInstanceMap");
         taskInstanceMapField.setAccessible(true);
@@ -318,7 +320,7 @@ public class WorkflowExecuteRunnableTest {
         workflowExecuteThread.clearDataIfExecuteTask();
 
         Assertions.assertEquals(1, taskInstanceMap.size());
-        Assertions.assertEquals(1, completeTaskList.size());
+        Assertions.assertEquals(1, completeTaskSet.size());
 
     }
 

Reply via email to