This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new d810c487fd Fix taskGroupQueue doesn't removed from inQueue where wakeup failed (#14200)
d810c487fd is described below

commit d810c487fd8b015faefb72dca92ed41fb719b60d
Author: Wenjun Ruan <[email protected]>
AuthorDate: Thu May 25 14:42:32 2023 +0800

    Fix taskGroupQueue doesn't removed from inQueue where wakeup failed (#14200)
    
    * Fix taskGroupQueue doesn't removed from inQueue where wakeup failed
    
    * Fix avaliable taskGroup is 0 the TaskGroupQueue doesn't out queue
---
 .../master/runner/WorkflowExecuteRunnable.java     | 43 ++++++++++++--------
 .../runner/operator/TaskTimeoutOperator.java       | 35 +++++++++++++++-
 .../service/process/ProcessServiceImpl.java        | 46 +++++++++++++---------
 3 files changed, 87 insertions(+), 37 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 7f77a5c99f..c7b7a37bac 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -496,24 +496,33 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
      *
      */
     public void releaseTaskGroup(TaskInstance taskInstance) throws 
RemotingException, InterruptedException {
-        if (taskInstance.getTaskGroupId() > 0) {
-            TaskInstance nextTaskInstance = 
this.processService.releaseTaskGroup(taskInstance);
-            if (nextTaskInstance != null) {
-                if (nextTaskInstance.getProcessInstanceId() == 
taskInstance.getProcessInstanceId()) {
-                    TaskStateEvent nextEvent = TaskStateEvent.builder()
-                            .processInstanceId(processInstance.getId())
-                            .taskInstanceId(nextTaskInstance.getId())
-                            .type(StateEventType.WAKE_UP_TASK_GROUP)
-                            .build();
-                    this.stateEvents.add(nextEvent);
-                } else {
-                    ProcessInstance processInstance =
-                            
this.processService.findProcessInstanceById(nextTaskInstance.getProcessInstanceId());
-                    
this.masterRpcClient.sendSyncCommand(Host.of(processInstance.getHost()),
-                            new TaskWakeupRequest(processInstance.getId(), 
nextTaskInstance.getId()).convert2Command());
-                }
-            }
+        // todo: use Integer
+        if (taskInstance.getTaskGroupId() <= 0) {
+            log.info("The current TaskInstance: {} doesn't use taskGroup, no 
need to release taskGroup",
+                    taskInstance.getName());
+        }
+        TaskInstance nextTaskInstance = 
processService.releaseTaskGroup(taskInstance);
+        if (nextTaskInstance == null) {
+            log.info(
+                    "The current TaskInstance: {} is the last taskInstance in 
the taskGroup, no need to wakeup next taskInstance",
+                    taskInstance.getName());
+            return;
+        }
+        if (nextTaskInstance.getProcessInstanceId() == 
taskInstance.getProcessInstanceId()) {
+            TaskStateEvent nextEvent = TaskStateEvent.builder()
+                    .processInstanceId(processInstance.getId())
+                    .taskInstanceId(nextTaskInstance.getId())
+                    .type(StateEventType.WAKE_UP_TASK_GROUP)
+                    .build();
+            stateEvents.add(nextEvent);
+        } else {
+            ProcessInstance processInstance =
+                    
processService.findProcessInstanceById(nextTaskInstance.getProcessInstanceId());
+            masterRpcClient.sendSyncCommand(
+                    Host.of(processInstance.getHost()),
+                    new TaskWakeupRequest(processInstance.getId(), 
nextTaskInstance.getId()).convert2Command());
         }
+        log.info("Success send wakeup message to next taskInstance: {}", 
nextTaskInstance.getId());
     }
 
     /**
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskTimeoutOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskTimeoutOperator.java
index 17de1e9539..146c3b894d 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskTimeoutOperator.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskTimeoutOperator.java
@@ -19,9 +19,18 @@ package org.apache.dolphinscheduler.server.master.runner.operator;
 
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
+import org.apache.dolphinscheduler.remote.command.task.TaskKillRequest;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.master.rpc.MasterRpcClient;
 import 
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;
 
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Date;
+
 import lombok.extern.slf4j.Slf4j;
 
 import org.springframework.beans.factory.annotation.Autowired;
@@ -34,6 +43,9 @@ public class TaskTimeoutOperator implements TaskOperator {
     @Autowired
     private TaskInstanceDao taskInstanceDao;
 
+    @Autowired
+    private MasterRpcClient masterRpcClient;
+
     @Override
     public void handle(DefaultTaskExecuteRunnable taskExecuteRunnable) {
         // Right now, if the task is running in worker, the timeout strategy 
will be handled at worker side.
@@ -48,8 +60,27 @@ public class TaskTimeoutOperator implements TaskOperator {
                     taskInstance.getName(), taskTimeoutStrategy.name());
             return;
         }
-        taskExecuteRunnable.kill();
-        log.info("TaskInstance: {} timeout, killed the task instance", 
taskInstance.getName());
+        try {
+            timeoutTaskInstanceInDB(taskInstance);
+            killRemoteTaskInstanceInThreadPool(taskInstance);
+            log.info("TaskInstance: {} timeout, killed the task instance", 
taskInstance.getName());
+        } catch (Exception ex) {
+            log.error("TaskInstance timeout {} failed", 
taskInstance.getName(), ex);
+        }
 
     }
+
+    private void timeoutTaskInstanceInDB(TaskInstance taskInstance) {
+        taskInstance.setState(TaskExecutionStatus.FAILURE);
+        taskInstance.setEndTime(new Date());
+        taskInstanceDao.updateTaskInstance(taskInstance);
+    }
+
+    private void killRemoteTaskInstanceInThreadPool(TaskInstance taskInstance) 
throws RemotingException {
+        if (StringUtils.isEmpty(taskInstance.getHost())) {
+            return;
+        }
+        TaskKillRequest killCommand = new 
TaskKillRequest(taskInstance.getId());
+        masterRpcClient.send(Host.of(taskInstance.getHost()), 
killCommand.convert2Command());
+    }
 }
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index d0f56f329f..485e357509 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -2334,14 +2334,17 @@ public class ProcessServiceImpl implements ProcessService {
         TaskGroup taskGroup = taskGroupMapper.selectById(taskGroupId);
         if (taskGroup == null) {
             // we don't throw exception here, to avoid the task group has been 
deleted during workflow running
+            log.warn("The taskGroup is not exist no need to acquire taskGroup, 
taskGroupId: {}", taskGroupId);
             return true;
         }
         // if task group is not applicable
         if (taskGroup.getStatus() == Flag.NO.getCode()) {
+            log.warn("The taskGroup status is {}, no need to acquire 
taskGroup, taskGroupId: {}", taskGroup.getStatus(),
+                    taskGroupId);
             return true;
         }
         // Create a waiting taskGroupQueue, after acquire resource, we can 
update the status to ACQUIRE_SUCCESS
-        TaskGroupQueue taskGroupQueue = 
this.taskGroupQueueMapper.queryByTaskId(taskInstanceId);
+        TaskGroupQueue taskGroupQueue = 
taskGroupQueueMapper.queryByTaskId(taskInstanceId);
         if (taskGroupQueue == null) {
             taskGroupQueue = insertIntoTaskGroupQueue(
                     taskInstanceId,
@@ -2350,14 +2353,12 @@ public class ProcessServiceImpl implements ProcessService {
                     workflowInstanceId,
                     taskGroupPriority,
                     TaskGroupQueueStatus.WAIT_QUEUE);
+            log.info("Insert TaskGroupQueue: {} successfully", 
taskGroupQueue.getId());
         } else {
             log.info("The task queue is already exist, taskId: {}", 
taskInstanceId);
             if (taskGroupQueue.getStatus() == 
TaskGroupQueueStatus.ACQUIRE_SUCCESS) {
                 return true;
             }
-            taskGroupQueue.setInQueue(Flag.NO.getCode());
-            taskGroupQueue.setStatus(TaskGroupQueueStatus.WAIT_QUEUE);
-            this.taskGroupQueueMapper.updateById(taskGroupQueue);
         }
         // check if there already exist higher priority tasks
         List<TaskGroupQueue> highPriorityTasks = 
taskGroupQueueMapper.queryHighPriorityTasks(
@@ -2368,14 +2369,15 @@ public class ProcessServiceImpl implements ProcessService {
             return false;
         }
         // try to get taskGroup
-        int count = taskGroupMapper.selectAvailableCountById(taskGroupId);
-        if (count == 1 && robTaskGroupResource(taskGroupQueue)) {
-            log.info("Success acquire taskGroup, taskInstanceId: {}, 
taskGroupId: {}", taskInstanceId, taskGroupId);
-            return true;
+        int availableTaskGroupCount = 
taskGroupMapper.selectAvailableCountById(taskGroupId);
+        if (availableTaskGroupCount < 1) {
+            log.info(
+                    "Failed to acquire taskGroup, there is no avaliable 
taskGroup, taskInstanceId: {}, taskGroupId: {}",
+                    taskInstanceId, taskGroupId);
+            taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), 
taskGroupQueue.getId());
+            return false;
         }
-        log.info("Failed to acquire taskGroup, taskInstanceId: {}, 
taskGroupId: {}", taskInstanceId, taskGroupId);
-        this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), 
taskGroupQueue.getId());
-        return false;
+        return robTaskGroupResource(taskGroupQueue);
     }
 
     /**
@@ -2387,10 +2389,13 @@ public class ProcessServiceImpl implements ProcessService {
         for (int i = 0; i < 10; i++) {
             TaskGroup taskGroup = 
taskGroupMapper.selectById(taskGroupQueue.getGroupId());
             if (taskGroup.getGroupSize() <= taskGroup.getUseSize()) {
+                // remove
+                taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), 
taskGroupQueue.getId());
                 log.info("The current task Group is full, taskGroup: {}", 
taskGroup);
                 return false;
             }
-            int affectedCount = 
taskGroupMapper.robTaskGroupResource(taskGroup.getId(),
+            int affectedCount = taskGroupMapper.robTaskGroupResource(
+                    taskGroup.getId(),
                     taskGroup.getUseSize(),
                     taskGroupQueue.getId(),
                     TaskGroupQueueStatus.WAIT_QUEUE.getCode());
@@ -2404,6 +2409,7 @@ public class ProcessServiceImpl implements ProcessService {
             }
         }
         log.info("Failed to rob taskGroup, taskGroupQueue: {}", 
taskGroupQueue);
+        taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), 
taskGroupQueue.getId());
         return false;
     }
 
@@ -2431,10 +2437,11 @@ public class ProcessServiceImpl implements ProcessService {
             do {
                 taskGroup = 
taskGroupMapper.selectById(taskInstance.getTaskGroupId());
                 if (taskGroup == null) {
-                    log.error("The taskGroup is null, taskGroupId: {}", 
taskInstance.getTaskGroupId());
+                    log.error("The taskGroup is not exist no need to release 
taskGroup, taskGroupId: {}",
+                            taskInstance.getTaskGroupId());
                     return null;
                 }
-                thisTaskGroupQueue = 
this.taskGroupQueueMapper.queryByTaskId(taskInstance.getId());
+                thisTaskGroupQueue = 
taskGroupQueueMapper.queryByTaskId(taskInstance.getId());
                 if (thisTaskGroupQueue.getStatus() == 
TaskGroupQueueStatus.RELEASE) {
                     log.info("The taskGroupQueue's status is release, 
taskInstanceId: {}", taskInstance.getId());
                     return null;
@@ -2458,20 +2465,22 @@ public class ProcessServiceImpl implements ProcessService {
         changeTaskGroupQueueStatus(taskInstance.getId(), 
TaskGroupQueueStatus.RELEASE);
         TaskGroupQueue taskGroupQueue;
         do {
-            taskGroupQueue = 
this.taskGroupQueueMapper.queryTheHighestPriorityTasks(taskGroup.getId(),
+            taskGroupQueue = taskGroupQueueMapper.queryTheHighestPriorityTasks(
+                    taskGroup.getId(),
                     TaskGroupQueueStatus.WAIT_QUEUE.getCode(),
                     Flag.NO.getCode(),
                     Flag.NO.getCode());
             if (taskGroupQueue == null) {
-                log.info("The taskGroupQueue is null, taskGroup: {}", 
taskGroup.getId());
+                log.info("There is no taskGroupQueue need to be wakeup 
taskGroup: {}", taskGroup.getId());
                 return null;
             }
-        } while (this.taskGroupQueueMapper.updateInQueueCAS(Flag.NO.getCode(),
+        } while (this.taskGroupQueueMapper.updateInQueueCAS(
+                Flag.NO.getCode(),
                 Flag.YES.getCode(),
                 taskGroupQueue.getId()) != 1);
         log.info("Finished to release task group queue: taskGroupId: {}, 
taskGroupQueueId: {}",
                 taskInstance.getTaskGroupId(), taskGroupQueue.getId());
-        return this.taskInstanceMapper.selectById(taskGroupQueue.getTaskId());
+        return taskInstanceMapper.selectById(taskGroupQueue.getTaskId());
     }
 
     /**
@@ -2505,6 +2514,7 @@ public class ProcessServiceImpl implements ProcessService {
                 .processId(workflowInstanceId)
                 .priority(taskGroupPriority)
                 .status(status)
+                .forceStart(Flag.NO.getCode())
                 .inQueue(Flag.NO.getCode())
                 .createTime(now)
                 .updateTime(now)

Reply via email to