This is an automated email from the ASF dual-hosted git repository.
gallardot pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 34b74525d9 [Improvement] Delete taskGroupQueue when release (#16425)
34b74525d9 is described below
commit 34b74525d9f1bcd5e650d8a00898a5b03ded40b9
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed Aug 7 20:51:52 2024 +0800
[Improvement] Delete taskGroupQueue when release (#16425)
---
.../runner/taskgroup/TaskGroupCoordinator.java | 33 ++++++++--------------
.../runner/taskgroup/TaskGroupCoordinatorTest.java | 5 +---
2 files changed, 12 insertions(+), 26 deletions(-)
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinator.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinator.java
index bd7af94611..aecc32287b 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinator.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinator.java
@@ -166,7 +166,7 @@ public class TaskGroupCoordinator extends BaseDaemonThread {
}
/**
- * Make sure the TaskGroupQueue status is {@link
TaskGroupQueueStatus#RELEASE} when the related {@link TaskInstance} is not
exist or status is finished.
+ * Clear the TaskGroupQueue when the related {@link TaskInstance} is not
exist or status is finished.
*/
private void amendTaskGroupQueueStatus() {
int minTaskGroupQueueId = -1;
@@ -188,7 +188,7 @@ public class TaskGroupCoordinator extends BaseDaemonThread {
}
/**
- * Make sure the TaskGroupQueue status is {@link
TaskGroupQueueStatus#RELEASE} when the related {@link TaskInstance} is not
exist or status is finished.
+ * Clear the TaskGroupQueue when the related {@link TaskInstance} is not
exist or status is finished.
*/
private void amendTaskGroupQueueStatus(List<TaskGroupQueue>
taskGroupQueues) {
List<Integer> taskInstanceIds = taskGroupQueues.stream()
@@ -205,14 +205,14 @@ public class TaskGroupCoordinator extends
BaseDaemonThread {
if (taskInstance == null) {
log.warn("The TaskInstance: {} is not exist, will release the
TaskGroupQueue: {}", taskId,
taskGroupQueue);
- releaseTaskGroupQueueSlot(taskGroupQueue);
+ deleteTaskGroupQueueSlot(taskGroupQueue);
continue;
}
if (taskInstance.getState().isFinished()) {
log.warn("The TaskInstance: {} state: {} finished, will
release the TaskGroupQueue: {}",
taskInstance.getName(), taskInstance.getState(),
taskGroupQueue);
- releaseTaskGroupQueueSlot(taskGroupQueue);
+ deleteTaskGroupQueueSlot(taskGroupQueue);
continue;
}
}
@@ -257,13 +257,10 @@ public class TaskGroupCoordinator extends
BaseDaemonThread {
taskGroupQueue.getTaskName(),
taskGroupQueue.getId());
- taskGroupQueue.setInQueue(Flag.NO.getCode());
- taskGroupQueue.setStatus(TaskGroupQueueStatus.RELEASE);
- taskGroupQueue.setUpdateTime(new Date());
- taskGroupQueueDao.updateById(taskGroupQueue);
+ deleteTaskGroupQueueSlot(taskGroupQueue);
log.info("Release the force start TaskGroupQueue {}",
taskGroupQueue);
} catch (UnsupportedOperationException
unsupportedOperationException) {
- releaseTaskGroupQueueSlot(taskGroupQueue);
+ deleteTaskGroupQueueSlot(taskGroupQueue);
log.info(
"Notify the ForceStart TaskInstance: {} for
taskGroupQueue: {} failed, will release the taskGroupQueue",
taskGroupQueue.getTaskName(), taskGroupQueue.getId(),
unsupportedOperationException);
@@ -322,7 +319,7 @@ public class TaskGroupCoordinator extends BaseDaemonThread {
taskGroupQueue.setUpdateTime(new Date());
taskGroupQueueDao.updateById(taskGroupQueue);
} catch (UnsupportedOperationException
unsupportedOperationException) {
- releaseTaskGroupQueueSlot(taskGroupQueue);
+ deleteTaskGroupQueueSlot(taskGroupQueue);
log.info(
"Notify the Waiting TaskInstance: {} for
taskGroupQueue: {} failed, will release the taskGroupQueue",
taskGroupQueue.getTaskName(),
taskGroupQueue.getId(), unsupportedOperationException);
@@ -417,7 +414,7 @@ public class TaskGroupCoordinator extends BaseDaemonThread {
/**
* Release the task group slot for the given {@link TaskInstance}.
* <p>
- * When taskInstance want to release a TaskGroup slot, should call this
method. The release method will move the TaskGroupQueue out queue and set
status to {@link TaskGroupQueueStatus#RELEASE}.
+ * When taskInstance want to release a TaskGroup slot, should call this
method. The release method will delete the taskGroupQueue.
* This method is idempotent, this means that if the task group slot is
already released, this method will do nothing.
*
* @param taskInstance the task instance which want to release task group
slot.
@@ -429,7 +426,7 @@ public class TaskGroupCoordinator extends BaseDaemonThread {
}
List<TaskGroupQueue> taskGroupQueues =
taskGroupQueueDao.queryByTaskInstanceId(taskInstance.getId());
for (TaskGroupQueue taskGroupQueue : taskGroupQueues) {
- releaseTaskGroupQueueSlot(taskGroupQueue);
+ deleteTaskGroupQueueSlot(taskGroupQueue);
}
}
@@ -479,16 +476,8 @@ public class TaskGroupCoordinator extends BaseDaemonThread
{
log.info("Wake up TaskInstance: {} success", taskInstance.getName());
}
- private void releaseTaskGroupQueueSlot(TaskGroupQueue taskGroupQueue) {
- if (TaskGroupQueueStatus.RELEASE.equals(taskGroupQueue.getStatus())
- && Flag.NO.getCode() == taskGroupQueue.getInQueue()) {
- log.info("The TaskGroupQueue: {} is already released",
taskGroupQueue);
- return;
- }
- taskGroupQueue.setInQueue(Flag.NO.getCode());
- taskGroupQueue.setStatus(TaskGroupQueueStatus.RELEASE);
- taskGroupQueue.setUpdateTime(new Date());
- taskGroupQueueDao.updateById(taskGroupQueue);
+ private void deleteTaskGroupQueueSlot(TaskGroupQueue taskGroupQueue) {
+ taskGroupQueueDao.deleteById(taskGroupQueue);
log.info("Success release TaskGroupQueue: {}", taskGroupQueue);
}
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinatorTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinatorTest.java
index f654550b75..9214afaba8 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinatorTest.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinatorTest.java
@@ -25,7 +25,6 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.apache.dolphinscheduler.common.enums.Flag;
-import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.dao.entity.TaskGroup;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -174,9 +173,7 @@ class TaskGroupCoordinatorTest {
when(taskGroupQueueDao.queryByTaskInstanceId(taskInstance.getId())).thenReturn(taskGroupQueues);
taskGroupCoordinator.releaseTaskGroupSlot(taskInstance);
- assertEquals(Flag.NO.getCode(), taskGroupQueue.getInQueue());
- assertEquals(TaskGroupQueueStatus.RELEASE, taskGroupQueue.getStatus());
- verify(taskGroupQueueDao, Mockito.times(1)).updateById(taskGroupQueue);
+ verify(taskGroupQueueDao, Mockito.times(1)).deleteById(taskGroupQueue);
}
}