This is an automated email from the ASF dual-hosted git repository.
xincheng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 465e7ae6ee [Bug] [Master] Workflow keep running when task has no id (#14315)
465e7ae6ee is described below
commit 465e7ae6ee44d50f6f319ddfaab82bdf77fec0e5
Author: 旺阳 <[email protected]>
AuthorDate: Wed Jun 14 12:53:16 2023 +0800
[Bug] [Master] Workflow keep running when task has no id (#14315)
* change completeTaskMap to completeTaskSet
* Update dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
Co-authored-by: Wenjun Ruan <[email protected]>
* change method name
* fix ut
* fix spotless
---------
Co-authored-by: Wenjun Ruan <[email protected]>
---
.../server/master/event/TaskStateEventHandler.java | 7 +-
.../master/runner/WorkflowExecuteRunnable.java | 142 +++++++++++++--------
.../master/runner/WorkflowExecuteRunnableTest.java | 32 ++---
3 files changed, 106 insertions(+), 75 deletions(-)
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java
index 524d0d4a6d..cf9fc262ad 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java
@@ -23,8 +23,8 @@ import
org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
import
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
-import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import lombok.extern.slf4j.Slf4j;
@@ -55,7 +55,7 @@ public class TaskStateEventHandler implements
StateEventHandler {
"Handle task instance state event, the current task instance
state {} will be changed to {}",
task.getState().name(), taskStateEvent.getStatus().name());
- Map<Long, Integer> completeTaskMap =
workflowExecuteRunnable.getCompleteTaskMap();
+ Set<Long> completeTaskSet =
workflowExecuteRunnable.getCompleteTaskCodes();
if (task.getState().isFinished()
&& (taskStateEvent.getStatus() != null &&
taskStateEvent.getStatus().isRunning())) {
String errorMessage = String.format(
@@ -67,8 +67,7 @@ public class TaskStateEventHandler implements
StateEventHandler {
}
if (task.getState().isFinished()) {
- if (completeTaskMap.containsKey(task.getTaskCode())
- &&
completeTaskMap.get(task.getTaskCode()).equals(task.getId())) {
+ if (completeTaskSet.contains(task.getTaskCode())) {
log.warn("The task instance is already complete, stateEvent:
{}", stateEvent);
return true;
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 1564adbc14..88a1937b59 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -199,10 +199,10 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatus> {
private final Map<Long, Integer> errorTaskMap = new ConcurrentHashMap<>();
/**
- * complete task map, taskCode as key, taskInstanceId as value
+ * complete task set
* in a DAG, only one taskInstance per taskCode is valid
*/
- private final Map<Long, Integer> completeTaskMap = new
ConcurrentHashMap<>();
+ private final Set<Long> completeTaskSet = Sets.newConcurrentHashSet();
/**
* depend failed task set
@@ -444,7 +444,7 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatus> {
stateWheelExecuteThread.removeTask4RetryCheck(processInstance,
taskInstance);
if (taskInstance.getState().isSuccess()) {
- completeTaskMap.put(taskInstance.getTaskCode(),
taskInstance.getId());
+ completeTaskSet.add(taskInstance.getTaskCode());
mergeTaskInstanceVarPool(taskInstance);
processInstanceDao.upsertProcessInstance(processInstance);
// save the cacheKey only if the task is defined as cache task
and the task is success
@@ -459,7 +459,7 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatus> {
log.info("Retry taskInstance taskInstance state: {}",
taskInstance.getState());
retryTaskInstance(taskInstance);
} else if (taskInstance.getState().isFailure()) {
- completeTaskMap.put(taskInstance.getTaskCode(),
taskInstance.getId());
+ completeTaskSet.add(taskInstance.getTaskCode());
// There are child nodes and the failure policy is: CONTINUE
if (processInstance.getFailureStrategy() ==
FailureStrategy.CONTINUE && DagHelper.haveAllNodeAfterNode(
Long.toString(taskInstance.getTaskCode()),
@@ -472,8 +472,8 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatus> {
}
}
} else if (taskInstance.getState().isFinished()) {
- // todo: when the task instance type is pause, then it should
not in completeTaskMap
- completeTaskMap.put(taskInstance.getTaskCode(),
taskInstance.getId());
+ // todo: when the task instance type is pause, then it should
not in completeTaskSet
+ completeTaskSet.add(taskInstance.getTaskCode());
}
log.info("TaskInstance finished will try to update the workflow
instance state, task code:{} state:{}",
taskInstance.getTaskCode(),
@@ -482,9 +482,9 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatus> {
sendTaskLogOnMasterToRemoteIfNeeded(taskInstance.getLogPath(),
taskInstance.getHost());
} catch (Exception ex) {
- log.error("Task finish failed, get a exception, will remove this
taskInstance from completeTaskMap", ex);
+ log.error("Task finish failed, get a exception, will remove this
taskInstance from completeTaskSet", ex);
// remove the task from complete map, so that we can finish in the
next time.
- completeTaskMap.remove(taskInstance.getTaskCode());
+ completeTaskSet.remove(taskInstance.getTaskCode());
throw ex;
}
}
@@ -873,7 +873,7 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatus> {
// do we need to clear?
taskExecuteRunnableMap.clear();
dependFailedTaskSet.clear();
- completeTaskMap.clear();
+ completeTaskSet.clear();
errorTaskMap.clear();
if (!isNewProcessInstance()) {
@@ -909,7 +909,7 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatus> {
if (task.isTaskComplete()) {
log.info("TaskInstance is already complete.");
- completeTaskMap.put(task.getTaskCode(), task.getId());
+ completeTaskSet.add(task.getTaskCode());
continue;
}
if (task.isConditionsTask() ||
DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()),
@@ -980,9 +980,9 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatus> {
}
}
}
- log.info("Initialize task queue, dependFailedTaskSet: {},
completeTaskMap: {}, errorTaskMap: {}",
+ log.info("Initialize task queue, dependFailedTaskSet: {},
completeTaskSet: {}, errorTaskMap: {}",
dependFailedTaskSet,
- completeTaskMap,
+ completeTaskSet,
errorTaskMap);
}
@@ -1236,7 +1236,12 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatus> {
Map<String, TaskInstance> allTaskInstance = new HashMap<>();
if (CollectionUtils.isNotEmpty(preTask)) {
for (String preTaskCode : preTask) {
- Integer taskId =
completeTaskMap.get(Long.parseLong(preTaskCode));
+ Optional<TaskInstance> existTaskInstanceOptional =
getTaskInstance(Long.parseLong(preTaskCode));
+ if (!existTaskInstanceOptional.isPresent()) {
+ continue;
+ }
+
+ Integer taskId = existTaskInstanceOptional.get().getId();
if (taskId == null) {
continue;
}
@@ -1303,20 +1308,21 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatus> {
*/
private Map<String, TaskInstance> getCompleteTaskInstanceMap() {
Map<String, TaskInstance> completeTaskInstanceMap = new HashMap<>();
- for (Map.Entry<Long, Integer> entry : completeTaskMap.entrySet()) {
- Long taskConde = entry.getKey();
- Integer taskInstanceId = entry.getValue();
- TaskInstance taskInstance = taskInstanceMap.get(taskInstanceId);
- if (taskInstance == null) {
- log.warn("Cannot find the taskInstance from taskInstanceMap,
taskInstanceId: {}, taskConde: {}",
- taskInstanceId,
- taskConde);
- // This case will happen when we submit to db failed, then the
taskInstanceId is 0
- continue;
+
+ completeTaskSet.forEach(taskCode -> {
+ Optional<TaskInstance> existTaskInstanceOptional =
getTaskInstance(taskCode);
+ if (existTaskInstanceOptional.isPresent()) {
+ TaskInstance taskInstance =
taskInstanceMap.get(existTaskInstanceOptional.get().getId());
+ if (taskInstance == null) {
+ // This case will happen when we submit to db failed, then
the taskInstanceId is 0
+ log.warn("Cannot find the taskInstance from
taskInstanceMap, taskConde: {}",
+ taskCode);
+ } else {
+ completeTaskInstanceMap.put(Long.toString(taskCode),
taskInstance);
+ }
}
-
completeTaskInstanceMap.put(Long.toString(taskInstance.getTaskCode()),
taskInstance);
+ });
- }
return completeTaskInstanceMap;
}
@@ -1364,17 +1370,21 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatus> {
}
// the end node of the branch of the dag
if (StringUtils.isNotEmpty(parentNodeCode) &&
dag.getEndNode().contains(parentNodeCode)) {
- TaskInstance endTaskInstance =
taskInstanceMap.get(completeTaskMap.get(NumberUtils.toLong(parentNodeCode)));
- String taskInstanceVarPool = endTaskInstance.getVarPool();
- if (StringUtils.isNotEmpty(taskInstanceVarPool)) {
- Set<Property> taskProperties = new
HashSet<>(JSONUtils.toList(taskInstanceVarPool, Property.class));
- String processInstanceVarPool = processInstance.getVarPool();
- if (StringUtils.isNotEmpty(processInstanceVarPool)) {
- Set<Property> properties = new
HashSet<>(JSONUtils.toList(processInstanceVarPool, Property.class));
- properties.addAll(taskProperties);
-
processInstance.setVarPool(JSONUtils.toJsonString(properties));
- } else {
-
processInstance.setVarPool(JSONUtils.toJsonString(taskProperties));
+ Optional<TaskInstance> existTaskInstanceOptional =
getTaskInstance(NumberUtils.toLong(parentNodeCode));
+ if (existTaskInstanceOptional.isPresent()) {
+ TaskInstance endTaskInstance =
taskInstanceMap.get(existTaskInstanceOptional.get().getId());
+ String taskInstanceVarPool = endTaskInstance.getVarPool();
+ if (StringUtils.isNotEmpty(taskInstanceVarPool)) {
+ Set<Property> taskProperties = new
HashSet<>(JSONUtils.toList(taskInstanceVarPool, Property.class));
+ String processInstanceVarPool =
processInstance.getVarPool();
+ if (StringUtils.isNotEmpty(processInstanceVarPool)) {
+ Set<Property> properties =
+ new
HashSet<>(JSONUtils.toList(processInstanceVarPool, Property.class));
+ properties.addAll(taskProperties);
+
processInstance.setVarPool(JSONUtils.toJsonString(properties));
+ } else {
+
processInstance.setVarPool(JSONUtils.toJsonString(taskProperties));
+ }
}
}
}
@@ -1387,7 +1397,7 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatus> {
continue;
}
- if (task.getId() != null &&
completeTaskMap.containsKey(task.getTaskCode())) {
+ if (task.getId() != null &&
completeTaskSet.contains(task.getTaskCode())) {
log.info("Task has already run success, taskName: {}",
task.getName());
continue;
}
@@ -1459,12 +1469,18 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatus> {
for (String depsNode : indirectDepCodeList) {
if (dag.containsNode(depsNode) &&
!skipTaskNodeMap.containsKey(depsNode)) {
// dependencies must be fully completed
- Long despNodeTaskCode = Long.parseLong(depsNode);
- if (!completeTaskMap.containsKey(despNodeTaskCode)) {
+ long despNodeTaskCode = Long.parseLong(depsNode);
+ if (!completeTaskSet.contains(despNodeTaskCode)) {
return DependResult.WAITING;
}
- Integer depsTaskId = completeTaskMap.get(despNodeTaskCode);
- TaskExecutionStatus depTaskState =
taskInstanceMap.get(depsTaskId).getState();
+
+ Optional<TaskInstance> existTaskInstanceOptional =
getTaskInstance(despNodeTaskCode);
+ if (!existTaskInstanceOptional.isPresent()) {
+ return DependResult.NON_EXEC;
+ }
+
+ TaskExecutionStatus depTaskState =
+
taskInstanceMap.get(existTaskInstanceOptional.get().getId()).getState();
if (depTaskState.isKill()) {
return DependResult.NON_EXEC;
}
@@ -1484,7 +1500,7 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatus> {
}
}
log.info("The dependTasks of task all success, currentTaskCode: {},
dependTaskCodes: {}",
- taskCode, Arrays.toString(completeTaskMap.keySet().toArray()));
+ taskCode, Arrays.toString(completeTaskSet.toArray()));
return DependResult.SUCCESS;
}
@@ -1535,8 +1551,12 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatus> {
.contains(nextNodeCode);
}
long taskCode = Long.parseLong(dependNodeCode);
- Integer taskInstanceId = completeTaskMap.get(taskCode);
- TaskExecutionStatus depTaskState =
taskInstanceMap.get(taskInstanceId).getState();
+ Optional<TaskInstance> existTaskInstanceOptional =
getTaskInstance(taskCode);
+ if (!existTaskInstanceOptional.isPresent()) {
+ return false;
+ }
+
+ TaskExecutionStatus depTaskState =
taskInstanceMap.get(existTaskInstanceOptional.get().getId()).getState();
return !depTaskState.isFailure();
}
@@ -1548,12 +1568,17 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatus> {
*/
private List<TaskInstance> getCompleteTaskByState(TaskExecutionStatus
state) {
List<TaskInstance> resultList = new ArrayList<>();
- for (Integer taskInstanceId : completeTaskMap.values()) {
- TaskInstance taskInstance = taskInstanceMap.get(taskInstanceId);
- if (taskInstance != null && taskInstance.getState() == state) {
- resultList.add(taskInstance);
+
+ completeTaskSet.forEach(taskCode -> {
+ Optional<TaskInstance> existTaskInstanceOptional =
getTaskInstance(taskCode);
+ if (existTaskInstanceOptional.isPresent()) {
+ TaskInstance taskInstance =
taskInstanceMap.get(existTaskInstanceOptional.get().getId());
+ if (taskInstance != null && taskInstance.getState() == state) {
+ resultList.add(taskInstance);
+ }
}
- }
+ });
+
return resultList;
}
@@ -1905,7 +1930,7 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatus> {
"Task {} has been forced success, put it into
complete task list and stop retrying, taskInstanceId: {}",
task.getName(), task.getId());
removeTaskFromStandbyList(task);
- completeTaskMap.put(task.getTaskCode(), task.getId());
+ completeTaskSet.add(task.getTaskCode());
taskInstanceMap.put(task.getId(), task);
submitPostNode(Long.toString(task.getTaskCode()));
continue;
@@ -1931,7 +1956,7 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatus> {
processInstance.getId(),
task.getTaskCode());
}
- completeTaskMap.put(task.getTaskCode(), task.getId());
+ completeTaskSet.add(task.getTaskCode());
taskInstanceMap.put(task.getId(), task);
errorTaskMap.put(task.getTaskCode(), task.getId());
@@ -2066,8 +2091,8 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatus> {
return false;
}
- public Map<Long, Integer> getCompleteTaskMap() {
- return completeTaskMap;
+ public Set<Long> getCompleteTaskCodes() {
+ return completeTaskSet;
}
public Map<Long, DefaultTaskExecuteRunnable> getTaskExecuteRunnableMap() {
@@ -2117,7 +2142,7 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatus> {
* 1. find all task code from sub dag (only contains related task)
* 2. set the flag of tasks to Flag.NO
* 3. clear varPool data from re-execute task instance in process instance
- * 4. remove related task instance from taskInstanceMap, completeTaskMap,
validTaskMap, errorTaskMap
+ * 4. remove related task instance from taskInstanceMap, completeTaskSet,
validTaskMap, errorTaskMap
*
* @return task instance
*/
@@ -2176,9 +2201,14 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatus> {
processInstance.setVarPool(JSONUtils.toJsonString(processProperties));
processInstanceDao.updateById(processInstance);
- // remove task instance from taskInstanceMap, completeTaskMap,
validTaskMap, errorTaskMap
+ // remove task instance from taskInstanceMap, completeTaskSet,
validTaskMap, errorTaskMap
+ // completeTaskSet remove dependency taskInstanceMap, so the sort
can't change
+ completeTaskSet.removeIf(set -> {
+ Optional<TaskInstance> existTaskInstanceOptional =
getTaskInstance(set);
+ return existTaskInstanceOptional
+ .filter(taskInstance ->
dag.containsNode(Integer.toString(taskInstance.getId()))).isPresent();
+ });
taskInstanceMap.entrySet().removeIf(map ->
dag.containsNode(Long.toString(map.getValue().getTaskCode())));
- completeTaskMap.entrySet().removeIf(map ->
dag.containsNode(Long.toString(map.getKey())));
validTaskMap.entrySet().removeIf(map ->
dag.containsNode(Long.toString(map.getKey())));
errorTaskMap.entrySet().removeIf(map ->
dag.containsNode(Long.toString(map.getKey())));
}
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
index f224ff36c6..26b637fd1c 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
@@ -69,6 +69,8 @@ import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.springframework.context.ApplicationContext;
+import com.google.common.collect.Sets;
+
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
public class WorkflowExecuteRunnableTest {
@@ -207,15 +209,15 @@ public class WorkflowExecuteRunnableTest {
taskInstanceMap.put(taskInstance1.getId(), taskInstance1);
taskInstanceMap.put(taskInstance2.getId(), taskInstance2);
- Map<Long, Integer> completeTaskList = new ConcurrentHashMap<>();
- completeTaskList.put(taskInstance1.getTaskCode(),
taskInstance1.getId());
- completeTaskList.put(taskInstance2.getTaskCode(),
taskInstance2.getId());
+ Set<Long> completeTaskSet = Sets.newConcurrentHashSet();
+ completeTaskSet.add(taskInstance1.getTaskCode());
+ completeTaskSet.add(taskInstance2.getTaskCode());
Class<WorkflowExecuteRunnable> masterExecThreadClass =
WorkflowExecuteRunnable.class;
- Field completeTaskMapField =
masterExecThreadClass.getDeclaredField("completeTaskMap");
- completeTaskMapField.setAccessible(true);
- completeTaskMapField.set(workflowExecuteThread, completeTaskList);
+ Field completeTaskSetField =
masterExecThreadClass.getDeclaredField("completeTaskSet");
+ completeTaskSetField.setAccessible(true);
+ completeTaskSetField.set(workflowExecuteThread, completeTaskSet);
Field taskInstanceMapField =
masterExecThreadClass.getDeclaredField("taskInstanceMap");
taskInstanceMapField.setAccessible(true);
@@ -225,10 +227,10 @@ public class WorkflowExecuteRunnableTest {
Assertions.assertNotNull(taskInstance.getVarPool());
taskInstance2.setVarPool("[{\"direct\":\"OUT\",\"prop\":\"test1\",\"type\":\"VARCHAR\",\"value\":\"2\"}]");
- completeTaskList.put(taskInstance2.getTaskCode(),
taskInstance2.getId());
+ completeTaskSet.add(taskInstance2.getTaskCode());
- completeTaskMapField.setAccessible(true);
- completeTaskMapField.set(workflowExecuteThread, completeTaskList);
+ completeTaskSetField.setAccessible(true);
+ completeTaskSetField.set(workflowExecuteThread, completeTaskSet);
taskInstanceMapField.setAccessible(true);
taskInstanceMapField.set(workflowExecuteThread, taskInstanceMap);
@@ -279,15 +281,15 @@ public class WorkflowExecuteRunnableTest {
taskInstanceMap.put(taskInstance1.getId(), taskInstance1);
taskInstanceMap.put(taskInstance2.getId(), taskInstance2);
- Map<Long, Integer> completeTaskList = new ConcurrentHashMap<>();
- completeTaskList.put(taskInstance1.getTaskCode(),
taskInstance1.getId());
- completeTaskList.put(taskInstance2.getTaskCode(),
taskInstance2.getId());
+ Set<Long> completeTaskSet = Sets.newConcurrentHashSet();
+ completeTaskSet.add(taskInstance1.getTaskCode());
+ completeTaskSet.add(taskInstance2.getTaskCode());
Class<WorkflowExecuteRunnable> masterExecThreadClass =
WorkflowExecuteRunnable.class;
- Field completeTaskMapField =
masterExecThreadClass.getDeclaredField("completeTaskMap");
+ Field completeTaskMapField =
masterExecThreadClass.getDeclaredField("completeTaskSet");
completeTaskMapField.setAccessible(true);
- completeTaskMapField.set(workflowExecuteThread, completeTaskList);
+ completeTaskMapField.set(workflowExecuteThread, completeTaskSet);
Field taskInstanceMapField =
masterExecThreadClass.getDeclaredField("taskInstanceMap");
taskInstanceMapField.setAccessible(true);
@@ -318,7 +320,7 @@ public class WorkflowExecuteRunnableTest {
workflowExecuteThread.clearDataIfExecuteTask();
Assertions.assertEquals(1, taskInstanceMap.size());
- Assertions.assertEquals(1, completeTaskList.size());
+ Assertions.assertEquals(1, completeTaskSet.size());
}