This is an automated email from the ASF dual-hosted git repository.

chufenggao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new c812bf9d49 [Bug] [Master] Not got latest task instance by task code 
(#14529)
c812bf9d49 is described below

commit c812bf9d491de04c7ab2e3b58856a7c02b4a5729
Author: 旺阳 <[email protected]>
AuthorDate: Fri Jul 14 14:51:39 2023 +0800

    [Bug] [Master] Not got latest task instance by task code (#14529)
    
    * add task code->instance map
    
    ---------
    
    Co-authored-by: Eric Gao <[email protected]>
---
 .../master/runner/WorkflowExecuteRunnable.java     | 41 +++++++++++-----------
 .../master/runner/WorkflowExecuteRunnableTest.java | 16 +++++++++
 2 files changed, 37 insertions(+), 20 deletions(-)

diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index f7051dbabc..5caab6b709 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -180,6 +180,11 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatus> {
      */
     private final Map<Integer, TaskInstance> taskInstanceMap = new 
ConcurrentHashMap<>();
 
+    /**
+     * task instance hash map, taskCode as key
+     */
+    private final Map<Long, TaskInstance> taskCodeInstanceMap = new 
ConcurrentHashMap<>();
+
     /**
      * TaskCode as Key, TaskExecuteRunnable as Value
      */
@@ -570,6 +575,7 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatus> {
         }
         processService.packageTaskInstance(taskInstance, processInstance);
         taskInstanceMap.put(taskInstance.getId(), taskInstance);
+        taskCodeInstanceMap.put(taskInstance.getTaskCode(), taskInstance);
 
         validTaskMap.remove(taskInstance.getTaskCode());
         if (Flag.YES == taskInstance.getFlag()) {
@@ -617,15 +623,7 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatus> {
     }
 
     public Optional<TaskInstance> getTaskInstance(long taskCode) {
-        if (taskInstanceMap.isEmpty()) {
-            return Optional.empty();
-        }
-        for (TaskInstance taskInstance : taskInstanceMap.values()) {
-            if (taskInstance.getTaskCode() == taskCode) {
-                return Optional.of(taskInstance);
-            }
-        }
-        return Optional.empty();
+        return Optional.ofNullable(taskCodeInstanceMap.get(taskCode));
     }
 
     public Optional<TaskInstance> getActiveTaskInstanceByTaskCode(long 
taskCode) {
@@ -901,6 +899,7 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatus> {
 
                     validTaskMap.put(task.getTaskCode(), task.getId());
                     taskInstanceMap.put(task.getId(), task);
+                    taskCodeInstanceMap.put(task.getTaskCode(), task);
 
                     if (task.isTaskComplete()) {
                         log.info("TaskInstance is already complete.");
@@ -1012,6 +1011,7 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatus> {
 
                 validTaskMap.put(taskInstance.getTaskCode(), 
taskInstance.getId());
                 taskInstanceMap.put(taskInstance.getId(), taskInstance);
+                taskCodeInstanceMap.put(taskInstance.getTaskCode(), 
taskInstance);
                 taskExecuteRunnableMap.put(taskInstance.getTaskCode(), 
taskExecuteRunnable);
                 // 3. acquire the task group.
                 // if we use task group, then need to acquire the task group 
resource
@@ -1307,14 +1307,12 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatus> {
         completeTaskSet.forEach(taskCode -> {
             Optional<TaskInstance> existTaskInstanceOptional = 
getTaskInstance(taskCode);
             if (existTaskInstanceOptional.isPresent()) {
-                TaskInstance taskInstance = 
taskInstanceMap.get(existTaskInstanceOptional.get().getId());
-                if (taskInstance == null) {
-                    // This case will happen when we submit to db failed, then 
the taskInstanceId is 0
-                    log.warn("Cannot find the taskInstance from 
taskInstanceMap, taskConde: {}",
-                            taskCode);
-                } else {
-                    completeTaskInstanceMap.put(Long.toString(taskCode), 
taskInstance);
-                }
+                TaskInstance taskInstance = existTaskInstanceOptional.get();
+                completeTaskInstanceMap.put(Long.toString(taskCode), 
taskInstance);
+            } else {
+                // This case will happen when we submit to db failed, then the 
taskInstanceId is 0
+                log.warn("Cannot find the taskInstance from taskInstanceMap, 
taskConde: {}",
+                        taskCode);
             }
         });
 
@@ -1436,6 +1434,7 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatus> {
                     
defaultTaskExecuteRunnableFactory.createTaskExecuteRunnable(taskInstance));
 
             taskInstanceMap.put(taskInstance.getId(), taskInstance);
+            taskCodeInstanceMap.put(taskInstance.getTaskCode(), taskInstance);
             stateWheelExecuteThread.addTask4TimeoutCheck(processInstance, 
taskInstance);
             stateWheelExecuteThread.addTask4RetryCheck(processInstance, 
taskInstance);
             return true;
@@ -1552,7 +1551,7 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatus> {
             return false;
         }
 
-        TaskExecutionStatus depTaskState = 
taskInstanceMap.get(existTaskInstanceOptional.get().getId()).getState();
+        TaskExecutionStatus depTaskState = 
existTaskInstanceOptional.get().getState();
         return !depTaskState.isFailure();
     }
 
@@ -1568,8 +1567,8 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatus> {
         completeTaskSet.forEach(taskCode -> {
             Optional<TaskInstance> existTaskInstanceOptional = 
getTaskInstance(taskCode);
             if (existTaskInstanceOptional.isPresent()) {
-                TaskInstance taskInstance = 
taskInstanceMap.get(existTaskInstanceOptional.get().getId());
-                if (taskInstance != null && taskInstance.getState() == state) {
+                TaskInstance taskInstance = existTaskInstanceOptional.get();
+                if (taskInstance.getState() == state) {
                     resultList.add(taskInstance);
                 }
             }
@@ -1928,6 +1927,7 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatus> {
                     removeTaskFromStandbyList(task);
                     completeTaskSet.add(task.getTaskCode());
                     taskInstanceMap.put(task.getId(), task);
+                    taskCodeInstanceMap.put(task.getTaskCode(), task);
                     submitPostNode(Long.toString(task.getTaskCode()));
                     continue;
                 }
@@ -1954,6 +1954,7 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatus> {
                     }
                     completeTaskSet.add(task.getTaskCode());
                     taskInstanceMap.put(task.getId(), task);
+                    taskCodeInstanceMap.put(task.getTaskCode(), task);
                     errorTaskMap.put(task.getTaskCode(), task.getId());
 
                     taskExecuteRunnableMap.remove(task.getTaskCode());
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
index 26b637fd1c..332a2617d7 100644
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
@@ -209,6 +209,10 @@ public class WorkflowExecuteRunnableTest {
             taskInstanceMap.put(taskInstance1.getId(), taskInstance1);
             taskInstanceMap.put(taskInstance2.getId(), taskInstance2);
 
+            Map<Long, TaskInstance> taskCodeInstanceMap = new 
ConcurrentHashMap<>();
+            taskCodeInstanceMap.put(taskInstance1.getTaskCode(), 
taskInstance1);
+            taskCodeInstanceMap.put(taskInstance2.getTaskCode(), 
taskInstance2);
+
             Set<Long> completeTaskSet = Sets.newConcurrentHashSet();
             completeTaskSet.add(taskInstance1.getTaskCode());
             completeTaskSet.add(taskInstance2.getTaskCode());
@@ -223,6 +227,10 @@ public class WorkflowExecuteRunnableTest {
             taskInstanceMapField.setAccessible(true);
             taskInstanceMapField.set(workflowExecuteThread, taskInstanceMap);
 
+            Field taskCodeInstanceMapField = 
masterExecThreadClass.getDeclaredField("taskCodeInstanceMap");
+            taskCodeInstanceMapField.setAccessible(true);
+            taskCodeInstanceMapField.set(workflowExecuteThread, 
taskCodeInstanceMap);
+
             workflowExecuteThread.getPreVarPool(taskInstance, preTaskName);
             Assertions.assertNotNull(taskInstance.getVarPool());
 
@@ -281,6 +289,10 @@ public class WorkflowExecuteRunnableTest {
         taskInstanceMap.put(taskInstance1.getId(), taskInstance1);
         taskInstanceMap.put(taskInstance2.getId(), taskInstance2);
 
+        Map<Long, TaskInstance> taskCodeInstanceMap = new 
ConcurrentHashMap<>();
+        taskCodeInstanceMap.put(taskInstance1.getTaskCode(), taskInstance1);
+        taskCodeInstanceMap.put(taskInstance2.getTaskCode(), taskInstance2);
+
         Set<Long> completeTaskSet = Sets.newConcurrentHashSet();
         completeTaskSet.add(taskInstance1.getTaskCode());
         completeTaskSet.add(taskInstance2.getTaskCode());
@@ -295,6 +307,10 @@ public class WorkflowExecuteRunnableTest {
         taskInstanceMapField.setAccessible(true);
         taskInstanceMapField.set(workflowExecuteThread, taskInstanceMap);
 
+        Field taskCodeInstanceMapField = 
masterExecThreadClass.getDeclaredField("taskCodeInstanceMap");
+        taskCodeInstanceMapField.setAccessible(true);
+        taskCodeInstanceMapField.set(workflowExecuteThread, 
taskCodeInstanceMap);
+
         
Mockito.when(processInstance.getCommandType()).thenReturn(CommandType.EXECUTE_TASK);
         Mockito.when(processInstance.getId()).thenReturn(123);
 

Reply via email to