This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch 2.0.4-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/2.0.4-prepare by this push:
new 21a1d9c [Fix-8337] [Master] Process instance cannot be killed when a task has failed and can be retried (#8347)
21a1d9c is described below
commit 21a1d9cd9e7216b9d0ae789331cfccdfcca7ef67
Author: xiangzihao <[email protected]>
AuthorDate: Sat Feb 19 16:35:03 2022 +0800
[Fix-8337] [Master] Process instance cannot be killed when a task has failed and can be retried (#8347)
* fix bug_8337
* change kill logic
* change kill logic
---
.../master/runner/WorkflowExecuteThread.java | 59 ++++++++++++++++++++--
.../service/process/ProcessService.java | 2 +-
.../queue/PeerTaskInstancePriorityQueue.java | 8 +++
3 files changed, 63 insertions(+), 6 deletions(-)
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 731063b..7527daa 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -379,7 +379,7 @@ public class WorkflowExecuteThread implements Runnable {
processInstance.getId(),
task.getId(),
task.getState());
- if (task.taskCanRetry()) {
+ if (task.taskCanRetry() && processInstance.getState() !=
ExecutionStatus.READY_STOP) {
addTaskToStandByList(task);
if (!task.retryTaskIntervalOverTime()) {
logger.info("failure task will be submitted: process id: {},
task instance id: {} state:{} retry times:{} / {}, interval:{}",
@@ -436,12 +436,20 @@ public class WorkflowExecuteThread implements Runnable {
try {
logger.info("process:{} state {} change to {}",
processInstance.getId(), processInstance.getState(),
stateEvent.getExecutionStatus());
processInstance =
processService.findProcessInstanceById(this.processInstance.getId());
+
+ if (stateEvent.getExecutionStatus() == ExecutionStatus.STOP) {
+ this.updateProcessInstanceState(stateEvent);
+ return true;
+ }
+
if (processComplementData()) {
return true;
}
+
if (stateEvent.getExecutionStatus().typeIsFinished()) {
endProcess();
}
+
if (processInstance.getState() == ExecutionStatus.READY_STOP) {
killAllTasks();
}
@@ -1111,10 +1119,6 @@ public class WorkflowExecuteThread implements Runnable {
// active task and retry task exists
return runningState(state);
}
- // process failure
- if (processFailed()) {
- return ExecutionStatus.FAILURE;
- }
// waiting thread
if (hasWaitingThreadTask()) {
@@ -1130,8 +1134,10 @@ public class WorkflowExecuteThread implements Runnable {
if (state == ExecutionStatus.READY_STOP) {
List<TaskInstance> stopList =
getCompleteTaskByState(ExecutionStatus.STOP);
List<TaskInstance> killList =
getCompleteTaskByState(ExecutionStatus.KILL);
+ List<TaskInstance> failList =
getCompleteTaskByState(ExecutionStatus.FAILURE);
if (CollectionUtils.isNotEmpty(stopList)
|| CollectionUtils.isNotEmpty(killList)
+ || CollectionUtils.isNotEmpty(failList)
|| !isComplementEnd()) {
return ExecutionStatus.STOP;
} else {
@@ -1139,6 +1145,11 @@ public class WorkflowExecuteThread implements Runnable {
}
}
+ // process failure
+ if (processFailed()) {
+ return ExecutionStatus.FAILURE;
+ }
+
// success
if (state == ExecutionStatus.RUNNING_EXECUTION) {
List<TaskInstance> killTasks =
getCompleteTaskByState(ExecutionStatus.KILL);
@@ -1203,6 +1214,26 @@ public class WorkflowExecuteThread implements Runnable {
}
/**
+ * stateEvent's execution status as process instance state
+ */
+ private void updateProcessInstanceState(StateEvent stateEvent) {
+ ExecutionStatus state = stateEvent.getExecutionStatus();
+ if (processInstance.getState() != state) {
+ logger.info(
+ "work flow process instance [id: {}, name:{}], state
change from {} to {}, cmd type: {}",
+ processInstance.getId(), processInstance.getName(),
+ processInstance.getState(), state,
+ processInstance.getCommandType());
+
+ processInstance.setState(state);
+ if (state.typeIsFinished()) {
+ processInstance.setEndTime(new Date());
+ }
+ processService.updateProcessInstance(processInstance);
+ }
+ }
+
+ /**
* get task dependency result
*
* @param taskInstance task instance
@@ -1282,12 +1313,25 @@ public class WorkflowExecuteThread implements Runnable {
return false;
}
+ private void addProcessStopEvent(ProcessInstance processInstance) {
+ StateEvent stateEvent = new StateEvent();
+ stateEvent.setType(StateEventType.PROCESS_STATE_CHANGE);
+ stateEvent.setProcessInstanceId(processInstance.getId());
+ stateEvent.setExecutionStatus(ExecutionStatus.STOP);
+ this.addStateEvent(stateEvent);
+ }
+
/**
* close the on going tasks
*/
private void killAllTasks() {
logger.info("kill called on process instance id: {}, num: {}",
processInstance.getId(),
activeTaskProcessorMaps.size());
+
+ if (readyToSubmitTaskQueue.size() > 0) {
+ readyToSubmitTaskQueue.clear();
+ }
+
for (int taskId : activeTaskProcessorMaps.keySet()) {
TaskInstance taskInstance =
processService.findTaskInstanceById(taskId);
if (taskInstance == null ||
taskInstance.getState().typeIsFinished()) {
@@ -1303,6 +1347,11 @@ public class WorkflowExecuteThread implements Runnable {
taskResponseService.addResponse(taskResponseEvent);
}
}
+
+ if (taskRetryCheckList.size() > 0) {
+ this.taskRetryCheckList.clear();
+ this.addProcessStopEvent(processInstance);
+ }
}
public boolean workFlowFinish() {
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index c40a796..67fdad3 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -1060,7 +1060,7 @@ public class ProcessService {
}
Thread.sleep(commitInterval);
} catch (Exception e) {
- logger.error("task commit to mysql failed", e);
+ logger.error("task commit to db failed", e);
}
retryTimes += 1;
}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
index b558d42..7502607 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
@@ -105,6 +105,14 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
}
/**
+ * clear task
+ *
+ */
+ public void clear() {
+ queue.clear();
+ }
+
+ /**
* whether contains the task instance
*
* @param taskInstance task instance