This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new d810c487fd Fix taskGroupQueue doesn't removed from inQueue where
wakeup failed (#14200)
d810c487fd is described below
commit d810c487fd8b015faefb72dca92ed41fb719b60d
Author: Wenjun Ruan <[email protected]>
AuthorDate: Thu May 25 14:42:32 2023 +0800
Fix taskGroupQueue doesn't removed from inQueue where wakeup failed (#14200)
* Fix taskGroupQueue doesn't removed from inQueue where wakeup failed
* Fix the TaskGroupQueue not being taken out of the queue when the available taskGroup count is 0
---
.../master/runner/WorkflowExecuteRunnable.java | 43 ++++++++++++--------
.../runner/operator/TaskTimeoutOperator.java | 35 +++++++++++++++-
.../service/process/ProcessServiceImpl.java | 46 +++++++++++++---------
3 files changed, 87 insertions(+), 37 deletions(-)
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 7f77a5c99f..c7b7a37bac 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -496,24 +496,35 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatus> {
*
*/
public void releaseTaskGroup(TaskInstance taskInstance) throws
RemotingException, InterruptedException {
- if (taskInstance.getTaskGroupId() > 0) {
- TaskInstance nextTaskInstance =
this.processService.releaseTaskGroup(taskInstance);
- if (nextTaskInstance != null) {
- if (nextTaskInstance.getProcessInstanceId() ==
taskInstance.getProcessInstanceId()) {
- TaskStateEvent nextEvent = TaskStateEvent.builder()
- .processInstanceId(processInstance.getId())
- .taskInstanceId(nextTaskInstance.getId())
- .type(StateEventType.WAKE_UP_TASK_GROUP)
- .build();
- this.stateEvents.add(nextEvent);
- } else {
- ProcessInstance processInstance =
-
this.processService.findProcessInstanceById(nextTaskInstance.getProcessInstanceId());
-
this.masterRpcClient.sendSyncCommand(Host.of(processInstance.getHost()),
- new TaskWakeupRequest(processInstance.getId(),
nextTaskInstance.getId()).convert2Command());
- }
- }
+ // todo: use Integer
+ if (taskInstance.getTaskGroupId() <= 0) {
+ log.info("The current TaskInstance: {} doesn't use taskGroup, no
need to release taskGroup",
+ taskInstance.getName());
+ // Without this return the release logic below would still run for tasks that have no taskGroup.
+ return;
+ }
+ TaskInstance nextTaskInstance =
processService.releaseTaskGroup(taskInstance);
+ if (nextTaskInstance == null) {
+ log.info(
+ "The current TaskInstance: {} is the last taskInstance in
the taskGroup, no need to wakeup next taskInstance",
+ taskInstance.getName());
+ return;
+ }
+ if (nextTaskInstance.getProcessInstanceId() ==
taskInstance.getProcessInstanceId()) {
+ TaskStateEvent nextEvent = TaskStateEvent.builder()
+ .processInstanceId(processInstance.getId())
+ .taskInstanceId(nextTaskInstance.getId())
+ .type(StateEventType.WAKE_UP_TASK_GROUP)
+ .build();
+ stateEvents.add(nextEvent);
+ } else {
+ ProcessInstance processInstance =
+
processService.findProcessInstanceById(nextTaskInstance.getProcessInstanceId());
+ masterRpcClient.sendSyncCommand(
+ Host.of(processInstance.getHost()),
+ new TaskWakeupRequest(processInstance.getId(),
nextTaskInstance.getId()).convert2Command());
}
+ log.info("Success send wakeup message to next taskInstance: {}",
nextTaskInstance.getId());
}
/**
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskTimeoutOperator.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskTimeoutOperator.java
index 17de1e9539..146c3b894d 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskTimeoutOperator.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskTimeoutOperator.java
@@ -19,9 +19,18 @@ package
org.apache.dolphinscheduler.server.master.runner.operator;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
+import org.apache.dolphinscheduler.remote.command.task.TaskKillRequest;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.master.rpc.MasterRpcClient;
import
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Date;
+
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -34,6 +43,9 @@ public class TaskTimeoutOperator implements TaskOperator {
@Autowired
private TaskInstanceDao taskInstanceDao;
+ @Autowired
+ private MasterRpcClient masterRpcClient;
+
@Override
public void handle(DefaultTaskExecuteRunnable taskExecuteRunnable) {
// Right now, if the task is running in worker, the timeout strategy
will be handled at worker side.
@@ -48,8 +60,27 @@ public class TaskTimeoutOperator implements TaskOperator {
taskInstance.getName(), taskTimeoutStrategy.name());
return;
}
- taskExecuteRunnable.kill();
- log.info("TaskInstance: {} timeout, killed the task instance",
taskInstance.getName());
+ try {
+ timeoutTaskInstanceInDB(taskInstance);
+ killRemoteTaskInstanceInThreadPool(taskInstance);
+ log.info("TaskInstance: {} timeout, killed the task instance",
taskInstance.getName());
+ } catch (Exception ex) {
+ log.error("TaskInstance timeout {} failed",
taskInstance.getName(), ex);
+ }
}
+ // Persist the timeout: set state FAILURE and end time to now, then save via taskInstanceDao.
+ private void timeoutTaskInstanceInDB(TaskInstance taskInstance) {
+ taskInstance.setState(TaskExecutionStatus.FAILURE);
+ taskInstance.setEndTime(new Date());
+ taskInstanceDao.updateTaskInstance(taskInstance);
+ }
+ // Send a TaskKillRequest to the instance's host; skipped when no host is recorded (null or empty).
+ private void killRemoteTaskInstanceInThreadPool(TaskInstance taskInstance)
throws RemotingException {
+ if (StringUtils.isEmpty(taskInstance.getHost())) {
+ return;
+ }
+ TaskKillRequest killCommand = new
TaskKillRequest(taskInstance.getId());
+ masterRpcClient.send(Host.of(taskInstance.getHost()),
killCommand.convert2Command());
+ }
}
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index d0f56f329f..485e357509 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -2334,14 +2334,17 @@ public class ProcessServiceImpl implements
ProcessService {
TaskGroup taskGroup = taskGroupMapper.selectById(taskGroupId);
if (taskGroup == null) {
// we don't throw exception here, to avoid the task group has been
deleted during workflow running
+ log.warn("The taskGroup is not exist no need to acquire taskGroup,
taskGroupId: {}", taskGroupId);
return true;
}
// if task group is not applicable
if (taskGroup.getStatus() == Flag.NO.getCode()) {
+ log.warn("The taskGroup status is {}, no need to acquire
taskGroup, taskGroupId: {}", taskGroup.getStatus(),
+ taskGroupId);
return true;
}
// Create a waiting taskGroupQueue, after acquire resource, we can
update the status to ACQUIRE_SUCCESS
- TaskGroupQueue taskGroupQueue =
this.taskGroupQueueMapper.queryByTaskId(taskInstanceId);
+ TaskGroupQueue taskGroupQueue =
taskGroupQueueMapper.queryByTaskId(taskInstanceId);
if (taskGroupQueue == null) {
taskGroupQueue = insertIntoTaskGroupQueue(
taskInstanceId,
@@ -2350,14 +2353,12 @@ public class ProcessServiceImpl implements
ProcessService {
workflowInstanceId,
taskGroupPriority,
TaskGroupQueueStatus.WAIT_QUEUE);
+ log.info("Insert TaskGroupQueue: {} successfully",
taskGroupQueue.getId());
} else {
log.info("The task queue is already exist, taskId: {}",
taskInstanceId);
if (taskGroupQueue.getStatus() ==
TaskGroupQueueStatus.ACQUIRE_SUCCESS) {
return true;
}
- taskGroupQueue.setInQueue(Flag.NO.getCode());
- taskGroupQueue.setStatus(TaskGroupQueueStatus.WAIT_QUEUE);
- this.taskGroupQueueMapper.updateById(taskGroupQueue);
}
// check if there already exist higher priority tasks
List<TaskGroupQueue> highPriorityTasks =
taskGroupQueueMapper.queryHighPriorityTasks(
@@ -2368,14 +2369,15 @@ public class ProcessServiceImpl implements
ProcessService {
return false;
}
// try to get taskGroup
- int count = taskGroupMapper.selectAvailableCountById(taskGroupId);
- if (count == 1 && robTaskGroupResource(taskGroupQueue)) {
- log.info("Success acquire taskGroup, taskInstanceId: {},
taskGroupId: {}", taskInstanceId, taskGroupId);
- return true;
+ int availableTaskGroupCount =
taskGroupMapper.selectAvailableCountById(taskGroupId);
+ if (availableTaskGroupCount < 1) {
+ log.info(
+ "Failed to acquire taskGroup, there is no available
taskGroup, taskInstanceId: {}, taskGroupId: {}",
+ taskInstanceId, taskGroupId);
+ taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(),
taskGroupQueue.getId());
+ return false;
}
- log.info("Failed to acquire taskGroup, taskInstanceId: {},
taskGroupId: {}", taskInstanceId, taskGroupId);
- this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(),
taskGroupQueue.getId());
- return false;
+ return robTaskGroupResource(taskGroupQueue);
}
/**
@@ -2387,10 +2389,13 @@ public class ProcessServiceImpl implements
ProcessService {
for (int i = 0; i < 10; i++) {
TaskGroup taskGroup =
taskGroupMapper.selectById(taskGroupQueue.getGroupId());
if (taskGroup.getGroupSize() <= taskGroup.getUseSize()) {
+ // remove
+ taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(),
taskGroupQueue.getId());
log.info("The current task Group is full, taskGroup: {}",
taskGroup);
return false;
}
- int affectedCount =
taskGroupMapper.robTaskGroupResource(taskGroup.getId(),
+ int affectedCount = taskGroupMapper.robTaskGroupResource(
+ taskGroup.getId(),
taskGroup.getUseSize(),
taskGroupQueue.getId(),
TaskGroupQueueStatus.WAIT_QUEUE.getCode());
@@ -2404,6 +2409,7 @@ public class ProcessServiceImpl implements ProcessService
{
}
}
log.info("Failed to rob taskGroup, taskGroupQueue: {}",
taskGroupQueue);
+ taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(),
taskGroupQueue.getId());
return false;
}
@@ -2431,10 +2437,17 @@ public class ProcessServiceImpl implements
ProcessService {
do {
taskGroup =
taskGroupMapper.selectById(taskInstance.getTaskGroupId());
if (taskGroup == null) {
- log.error("The taskGroup is null, taskGroupId: {}",
taskInstance.getTaskGroupId());
+ log.error("The taskGroup is not exist no need to release
taskGroup, taskGroupId: {}",
+ taskInstance.getTaskGroupId());
return null;
}
- thisTaskGroupQueue =
this.taskGroupQueueMapper.queryByTaskId(taskInstance.getId());
+ thisTaskGroupQueue =
taskGroupQueueMapper.queryByTaskId(taskInstance.getId());
+ // queryByTaskId can return null (acquireTaskGroup checks it too); guard before getStatus().
+ if (thisTaskGroupQueue == null) {
+ log.warn("The taskGroupQueue doesn't exist, no need to release taskGroup, taskInstanceId: {}",
+ taskInstance.getId());
+ return null;
+ }
if (thisTaskGroupQueue.getStatus() ==
TaskGroupQueueStatus.RELEASE) {
log.info("The taskGroupQueue's status is release,
taskInstanceId: {}", taskInstance.getId());
return null;
@@ -2458,20 +2471,22 @@ public class ProcessServiceImpl implements
ProcessService {
changeTaskGroupQueueStatus(taskInstance.getId(),
TaskGroupQueueStatus.RELEASE);
TaskGroupQueue taskGroupQueue;
do {
- taskGroupQueue =
this.taskGroupQueueMapper.queryTheHighestPriorityTasks(taskGroup.getId(),
+ taskGroupQueue = taskGroupQueueMapper.queryTheHighestPriorityTasks(
+ taskGroup.getId(),
TaskGroupQueueStatus.WAIT_QUEUE.getCode(),
Flag.NO.getCode(),
Flag.NO.getCode());
if (taskGroupQueue == null) {
- log.info("The taskGroupQueue is null, taskGroup: {}",
taskGroup.getId());
+ log.info("There is no taskGroupQueue need to be wakeup
taskGroup: {}", taskGroup.getId());
return null;
}
- } while (this.taskGroupQueueMapper.updateInQueueCAS(Flag.NO.getCode(),
+ } while (this.taskGroupQueueMapper.updateInQueueCAS(
+ Flag.NO.getCode(),
Flag.YES.getCode(),
taskGroupQueue.getId()) != 1);
log.info("Finished to release task group queue: taskGroupId: {},
taskGroupQueueId: {}",
taskInstance.getTaskGroupId(), taskGroupQueue.getId());
- return this.taskInstanceMapper.selectById(taskGroupQueue.getTaskId());
+ return taskInstanceMapper.selectById(taskGroupQueue.getTaskId());
}
/**
@@ -2505,6 +2520,7 @@ public class ProcessServiceImpl implements ProcessService
{
.processId(workflowInstanceId)
.priority(taskGroupPriority)
.status(status)
+ .forceStart(Flag.NO.getCode())
.inQueue(Flag.NO.getCode())
.createTime(now)
.updateTime(now)