This is an automated email from the ASF dual-hosted git repository.
chufenggao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new c812bf9d49 [Bug] [Mater] Not got latest task intance by task code
(#14529)
c812bf9d49 is described below
commit c812bf9d491de04c7ab2e3b58856a7c02b4a5729
Author: 旺阳 <[email protected]>
AuthorDate: Fri Jul 14 14:51:39 2023 +0800
[Bug] [Mater] Not got latest task intance by task code (#14529)
* add task code->instance map
---------
Co-authored-by: Eric Gao <[email protected]>
---
.../master/runner/WorkflowExecuteRunnable.java | 41 +++++++++++-----------
.../master/runner/WorkflowExecuteRunnableTest.java | 16 +++++++++
2 files changed, 37 insertions(+), 20 deletions(-)
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index f7051dbabc..5caab6b709 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -180,6 +180,11 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatus> {
*/
private final Map<Integer, TaskInstance> taskInstanceMap = new
ConcurrentHashMap<>();
+ /**
+ * task instance hash map, taskCode as key
+ */
+ private final Map<Long, TaskInstance> taskCodeInstanceMap = new
ConcurrentHashMap<>();
+
/**
* TaskCode as Key, TaskExecuteRunnable as Value
*/
@@ -570,6 +575,7 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatus> {
}
processService.packageTaskInstance(taskInstance, processInstance);
taskInstanceMap.put(taskInstance.getId(), taskInstance);
+ taskCodeInstanceMap.put(taskInstance.getTaskCode(), taskInstance);
validTaskMap.remove(taskInstance.getTaskCode());
if (Flag.YES == taskInstance.getFlag()) {
@@ -617,15 +623,7 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatus> {
}
public Optional<TaskInstance> getTaskInstance(long taskCode) {
- if (taskInstanceMap.isEmpty()) {
- return Optional.empty();
- }
- for (TaskInstance taskInstance : taskInstanceMap.values()) {
- if (taskInstance.getTaskCode() == taskCode) {
- return Optional.of(taskInstance);
- }
- }
- return Optional.empty();
+ return Optional.ofNullable(taskCodeInstanceMap.get(taskCode));
}
public Optional<TaskInstance> getActiveTaskInstanceByTaskCode(long
taskCode) {
@@ -901,6 +899,7 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatus> {
validTaskMap.put(task.getTaskCode(), task.getId());
taskInstanceMap.put(task.getId(), task);
+ taskCodeInstanceMap.put(task.getTaskCode(), task);
if (task.isTaskComplete()) {
log.info("TaskInstance is already complete.");
@@ -1012,6 +1011,7 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatus> {
validTaskMap.put(taskInstance.getTaskCode(),
taskInstance.getId());
taskInstanceMap.put(taskInstance.getId(), taskInstance);
+ taskCodeInstanceMap.put(taskInstance.getTaskCode(),
taskInstance);
taskExecuteRunnableMap.put(taskInstance.getTaskCode(),
taskExecuteRunnable);
// 3. acquire the task group.
// if we use task group, then need to acquire the task group
resource
@@ -1307,14 +1307,12 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatus> {
completeTaskSet.forEach(taskCode -> {
Optional<TaskInstance> existTaskInstanceOptional =
getTaskInstance(taskCode);
if (existTaskInstanceOptional.isPresent()) {
- TaskInstance taskInstance =
taskInstanceMap.get(existTaskInstanceOptional.get().getId());
- if (taskInstance == null) {
- // This case will happen when we submit to db failed, then
the taskInstanceId is 0
- log.warn("Cannot find the taskInstance from
taskInstanceMap, taskConde: {}",
- taskCode);
- } else {
- completeTaskInstanceMap.put(Long.toString(taskCode),
taskInstance);
- }
+ TaskInstance taskInstance = existTaskInstanceOptional.get();
+ completeTaskInstanceMap.put(Long.toString(taskCode),
taskInstance);
+ } else {
+ // This case will happen when we submit to db failed, then the
taskInstanceId is 0
+ log.warn("Cannot find the taskInstance from taskInstanceMap,
taskConde: {}",
+ taskCode);
}
});
@@ -1436,6 +1434,7 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatus> {
defaultTaskExecuteRunnableFactory.createTaskExecuteRunnable(taskInstance));
taskInstanceMap.put(taskInstance.getId(), taskInstance);
+ taskCodeInstanceMap.put(taskInstance.getTaskCode(), taskInstance);
stateWheelExecuteThread.addTask4TimeoutCheck(processInstance,
taskInstance);
stateWheelExecuteThread.addTask4RetryCheck(processInstance,
taskInstance);
return true;
@@ -1552,7 +1551,7 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatus> {
return false;
}
- TaskExecutionStatus depTaskState =
taskInstanceMap.get(existTaskInstanceOptional.get().getId()).getState();
+ TaskExecutionStatus depTaskState =
existTaskInstanceOptional.get().getState();
return !depTaskState.isFailure();
}
@@ -1568,8 +1567,8 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatus> {
completeTaskSet.forEach(taskCode -> {
Optional<TaskInstance> existTaskInstanceOptional =
getTaskInstance(taskCode);
if (existTaskInstanceOptional.isPresent()) {
- TaskInstance taskInstance =
taskInstanceMap.get(existTaskInstanceOptional.get().getId());
- if (taskInstance != null && taskInstance.getState() == state) {
+ TaskInstance taskInstance = existTaskInstanceOptional.get();
+ if (taskInstance.getState() == state) {
resultList.add(taskInstance);
}
}
@@ -1928,6 +1927,7 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatus> {
removeTaskFromStandbyList(task);
completeTaskSet.add(task.getTaskCode());
taskInstanceMap.put(task.getId(), task);
+ taskCodeInstanceMap.put(task.getTaskCode(), task);
submitPostNode(Long.toString(task.getTaskCode()));
continue;
}
@@ -1954,6 +1954,7 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatus> {
}
completeTaskSet.add(task.getTaskCode());
taskInstanceMap.put(task.getId(), task);
+ taskCodeInstanceMap.put(task.getTaskCode(), task);
errorTaskMap.put(task.getTaskCode(), task.getId());
taskExecuteRunnableMap.remove(task.getTaskCode());
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
index 26b637fd1c..332a2617d7 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
@@ -209,6 +209,10 @@ public class WorkflowExecuteRunnableTest {
taskInstanceMap.put(taskInstance1.getId(), taskInstance1);
taskInstanceMap.put(taskInstance2.getId(), taskInstance2);
+ Map<Long, TaskInstance> taskCodeInstanceMap = new
ConcurrentHashMap<>();
+ taskCodeInstanceMap.put(taskInstance1.getTaskCode(),
taskInstance1);
+ taskCodeInstanceMap.put(taskInstance2.getTaskCode(),
taskInstance2);
+
Set<Long> completeTaskSet = Sets.newConcurrentHashSet();
completeTaskSet.add(taskInstance1.getTaskCode());
completeTaskSet.add(taskInstance2.getTaskCode());
@@ -223,6 +227,10 @@ public class WorkflowExecuteRunnableTest {
taskInstanceMapField.setAccessible(true);
taskInstanceMapField.set(workflowExecuteThread, taskInstanceMap);
+ Field taskCodeInstanceMapField =
masterExecThreadClass.getDeclaredField("taskCodeInstanceMap");
+ taskCodeInstanceMapField.setAccessible(true);
+ taskCodeInstanceMapField.set(workflowExecuteThread,
taskCodeInstanceMap);
+
workflowExecuteThread.getPreVarPool(taskInstance, preTaskName);
Assertions.assertNotNull(taskInstance.getVarPool());
@@ -281,6 +289,10 @@ public class WorkflowExecuteRunnableTest {
taskInstanceMap.put(taskInstance1.getId(), taskInstance1);
taskInstanceMap.put(taskInstance2.getId(), taskInstance2);
+ Map<Long, TaskInstance> taskCodeInstanceMap = new
ConcurrentHashMap<>();
+ taskCodeInstanceMap.put(taskInstance1.getTaskCode(), taskInstance1);
+ taskCodeInstanceMap.put(taskInstance2.getTaskCode(), taskInstance2);
+
Set<Long> completeTaskSet = Sets.newConcurrentHashSet();
completeTaskSet.add(taskInstance1.getTaskCode());
completeTaskSet.add(taskInstance2.getTaskCode());
@@ -295,6 +307,10 @@ public class WorkflowExecuteRunnableTest {
taskInstanceMapField.setAccessible(true);
taskInstanceMapField.set(workflowExecuteThread, taskInstanceMap);
+ Field taskCodeInstanceMapField =
masterExecThreadClass.getDeclaredField("taskCodeInstanceMap");
+ taskCodeInstanceMapField.setAccessible(true);
+ taskCodeInstanceMapField.set(workflowExecuteThread,
taskCodeInstanceMap);
+
Mockito.when(processInstance.getCommandType()).thenReturn(CommandType.EXECUTE_TASK);
Mockito.when(processInstance.getId()).thenReturn(123);