This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch 2.0.4-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/2.0.4-prepare by this push:
     new 21a1d9c  [Fix-8337] [Master] Process instance can not be kill when task is failure and can be retry (#8347)
21a1d9c is described below

commit 21a1d9cd9e7216b9d0ae789331cfccdfcca7ef67
Author: xiangzihao <[email protected]>
AuthorDate: Sat Feb 19 16:35:03 2022 +0800

    [Fix-8337] [Master] Process instance can not be kill when task is failure and can be retry (#8347)
    
    * fix bug_8337
    
    * change kill logic
    
    * change kill logic
---
 .../master/runner/WorkflowExecuteThread.java       | 59 ++++++++++++++++++++--
 .../service/process/ProcessService.java            |  2 +-
 .../queue/PeerTaskInstancePriorityQueue.java       |  8 +++
 3 files changed, 63 insertions(+), 6 deletions(-)

diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 731063b..7527daa 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -379,7 +379,7 @@ public class WorkflowExecuteThread implements Runnable {
                 processInstance.getId(),
                 task.getId(),
                 task.getState());
-        if (task.taskCanRetry()) {
+        if (task.taskCanRetry() && processInstance.getState() != ExecutionStatus.READY_STOP) {
             addTaskToStandByList(task);
             if (!task.retryTaskIntervalOverTime()) {
                 logger.info("failure task will be submitted: process id: {}, task instance id: {} state:{} retry times:{} / {}, interval:{}",
@@ -436,12 +436,20 @@ public class WorkflowExecuteThread implements Runnable {
         try {
             logger.info("process:{} state {} change to {}", processInstance.getId(), processInstance.getState(), stateEvent.getExecutionStatus());
             processInstance = processService.findProcessInstanceById(this.processInstance.getId());
+
+            if (stateEvent.getExecutionStatus() == ExecutionStatus.STOP) {
+                this.updateProcessInstanceState(stateEvent);
+                return true;
+            }
+
             if (processComplementData()) {
                 return true;
             }
+
             if (stateEvent.getExecutionStatus().typeIsFinished()) {
                 endProcess();
             }
+
             if (processInstance.getState() == ExecutionStatus.READY_STOP) {
                 killAllTasks();
             }
@@ -1111,10 +1119,6 @@ public class WorkflowExecuteThread implements Runnable {
             // active task and retry task exists
             return runningState(state);
         }
-        // process failure
-        if (processFailed()) {
-            return ExecutionStatus.FAILURE;
-        }
 
         // waiting thread
         if (hasWaitingThreadTask()) {
@@ -1130,8 +1134,10 @@ public class WorkflowExecuteThread implements Runnable {
         if (state == ExecutionStatus.READY_STOP) {
             List<TaskInstance> stopList = getCompleteTaskByState(ExecutionStatus.STOP);
             List<TaskInstance> killList = getCompleteTaskByState(ExecutionStatus.KILL);
+            List<TaskInstance> failList = getCompleteTaskByState(ExecutionStatus.FAILURE);
             if (CollectionUtils.isNotEmpty(stopList)
                     || CollectionUtils.isNotEmpty(killList)
+                    || CollectionUtils.isNotEmpty(failList)
                     || !isComplementEnd()) {
                 return ExecutionStatus.STOP;
             } else {
@@ -1139,6 +1145,11 @@ public class WorkflowExecuteThread implements Runnable {
             }
         }
 
+        // process failure
+        if (processFailed()) {
+            return ExecutionStatus.FAILURE;
+        }
+
         // success
         if (state == ExecutionStatus.RUNNING_EXECUTION) {
             List<TaskInstance> killTasks = getCompleteTaskByState(ExecutionStatus.KILL);
@@ -1203,6 +1214,26 @@ public class WorkflowExecuteThread implements Runnable {
     }
 
     /**
+     * stateEvent's execution status as process instance state
+     */
+    private void updateProcessInstanceState(StateEvent stateEvent) {
+        ExecutionStatus state = stateEvent.getExecutionStatus();
+        if (processInstance.getState() != state) {
+            logger.info(
+                    "work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}",
+                    processInstance.getId(), processInstance.getName(),
+                    processInstance.getState(), state,
+                    processInstance.getCommandType());
+
+            processInstance.setState(state);
+            if (state.typeIsFinished()) {
+                processInstance.setEndTime(new Date());
+            }
+            processService.updateProcessInstance(processInstance);
+        }
+    }
+
+    /**
      * get task dependency result
      *
      * @param taskInstance task instance
@@ -1282,12 +1313,25 @@ public class WorkflowExecuteThread implements Runnable {
         return false;
     }
 
+    private void addProcessStopEvent(ProcessInstance processInstance) {
+        StateEvent stateEvent = new StateEvent();
+        stateEvent.setType(StateEventType.PROCESS_STATE_CHANGE);
+        stateEvent.setProcessInstanceId(processInstance.getId());
+        stateEvent.setExecutionStatus(ExecutionStatus.STOP);
+        this.addStateEvent(stateEvent);
+    }
+
     /**
      * close the on going tasks
      */
     private void killAllTasks() {
         logger.info("kill called on process instance id: {}, num: {}", processInstance.getId(),
                 activeTaskProcessorMaps.size());
+
+        if (readyToSubmitTaskQueue.size() > 0) {
+            readyToSubmitTaskQueue.clear();
+        }
+
         for (int taskId : activeTaskProcessorMaps.keySet()) {
             TaskInstance taskInstance = processService.findTaskInstanceById(taskId);
             if (taskInstance == null || taskInstance.getState().typeIsFinished()) {
@@ -1303,6 +1347,11 @@ public class WorkflowExecuteThread implements Runnable {
                 taskResponseService.addResponse(taskResponseEvent);
             }
         }
+
+        if (taskRetryCheckList.size() > 0) {
+            this.taskRetryCheckList.clear();
+            this.addProcessStopEvent(processInstance);
+        }
     }
 
     public boolean workFlowFinish() {
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index c40a796..67fdad3 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -1060,7 +1060,7 @@ public class ProcessService {
                 }
                 Thread.sleep(commitInterval);
             } catch (Exception e) {
-                logger.error("task commit to mysql failed", e);
+                logger.error("task commit to db failed", e);
             }
             retryTimes += 1;
         }
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
index b558d42..7502607 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
@@ -105,6 +105,14 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
     }
 
     /**
+     * clear task
+     *
+     */
+    public void clear() {
+        queue.clear();
+    }
+
+    /**
      * whether contains the task instance
      *
      * @param taskInstance task instance

Reply via email to