This is an automated email from the ASF dual-hosted git repository.

gallardot pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 34b74525d9 [Improvement] Delete taskGroupQueue when release (#16425)
34b74525d9 is described below

commit 34b74525d9f1bcd5e650d8a00898a5b03ded40b9
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed Aug 7 20:51:52 2024 +0800

    [Improvement] Delete taskGroupQueue when release (#16425)
---
 .../runner/taskgroup/TaskGroupCoordinator.java     | 33 ++++++++--------------
 .../runner/taskgroup/TaskGroupCoordinatorTest.java |  5 +---
 2 files changed, 12 insertions(+), 26 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinator.java
index bd7af94611..aecc32287b 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinator.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinator.java
@@ -166,7 +166,7 @@ public class TaskGroupCoordinator extends BaseDaemonThread {
     }
 
     /**
-     * Make sure the TaskGroupQueue status is {@link TaskGroupQueueStatus#RELEASE} when the related {@link TaskInstance} is not exist or status is finished.
+     * Clear the TaskGroupQueue when the related {@link TaskInstance} is not exist or status is finished.
      */
     private void amendTaskGroupQueueStatus() {
         int minTaskGroupQueueId = -1;
@@ -188,7 +188,7 @@ public class TaskGroupCoordinator extends BaseDaemonThread {
     }
 
     /**
-     * Make sure the TaskGroupQueue status is {@link TaskGroupQueueStatus#RELEASE} when the related {@link TaskInstance} is not exist or status is finished.
+     * Clear the TaskGroupQueue when the related {@link TaskInstance} is not exist or status is finished.
      */
     private void amendTaskGroupQueueStatus(List<TaskGroupQueue> taskGroupQueues) {
         List<Integer> taskInstanceIds = taskGroupQueues.stream()
@@ -205,14 +205,14 @@ public class TaskGroupCoordinator extends BaseDaemonThread {
             if (taskInstance == null) {
                 log.warn("The TaskInstance: {} is not exist, will release the TaskGroupQueue: {}", taskId,
                         taskGroupQueue);
-                releaseTaskGroupQueueSlot(taskGroupQueue);
+                deleteTaskGroupQueueSlot(taskGroupQueue);
                 continue;
             }
 
             if (taskInstance.getState().isFinished()) {
                 log.warn("The TaskInstance: {} state: {} finished, will release the TaskGroupQueue: {}",
                         taskInstance.getName(), taskInstance.getState(), taskGroupQueue);
-                releaseTaskGroupQueueSlot(taskGroupQueue);
+                deleteTaskGroupQueueSlot(taskGroupQueue);
                 continue;
             }
         }
@@ -257,13 +257,10 @@ public class TaskGroupCoordinator extends BaseDaemonThread {
                         taskGroupQueue.getTaskName(),
                         taskGroupQueue.getId());
 
-                taskGroupQueue.setInQueue(Flag.NO.getCode());
-                taskGroupQueue.setStatus(TaskGroupQueueStatus.RELEASE);
-                taskGroupQueue.setUpdateTime(new Date());
-                taskGroupQueueDao.updateById(taskGroupQueue);
+                deleteTaskGroupQueueSlot(taskGroupQueue);
                 log.info("Release the force start TaskGroupQueue {}", taskGroupQueue);
             } catch (UnsupportedOperationException unsupportedOperationException) {
-                releaseTaskGroupQueueSlot(taskGroupQueue);
+                deleteTaskGroupQueueSlot(taskGroupQueue);
                 log.info(
                         "Notify the ForceStart TaskInstance: {} for taskGroupQueue: {} failed, will release the taskGroupQueue",
                         taskGroupQueue.getTaskName(), taskGroupQueue.getId(), unsupportedOperationException);
@@ -322,7 +319,7 @@ public class TaskGroupCoordinator extends BaseDaemonThread {
                     taskGroupQueue.setUpdateTime(new Date());
                     taskGroupQueueDao.updateById(taskGroupQueue);
                 } catch (UnsupportedOperationException unsupportedOperationException) {
-                    releaseTaskGroupQueueSlot(taskGroupQueue);
+                    deleteTaskGroupQueueSlot(taskGroupQueue);
                     log.info(
                             "Notify the Waiting TaskInstance: {} for taskGroupQueue: {} failed, will release the taskGroupQueue",
                             taskGroupQueue.getTaskName(), taskGroupQueue.getId(), unsupportedOperationException);
@@ -417,7 +414,7 @@ public class TaskGroupCoordinator extends BaseDaemonThread {
     /**
      * Release the task group slot for the given {@link TaskInstance}.
      * <p>
-     * When taskInstance want to release a TaskGroup slot, should call this method. The release method will move the TaskGroupQueue out queue and set status to {@link TaskGroupQueueStatus#RELEASE}.
+     * When taskInstance want to release a TaskGroup slot, should call this method. The release method will delete the taskGroupQueue.
      * This method is idempotent, this means that if the task group slot is already released, this method will do nothing.
      *
      * @param taskInstance the task instance which want to release task group slot.
@@ -429,7 +426,7 @@ public class TaskGroupCoordinator extends BaseDaemonThread {
         }
         List<TaskGroupQueue> taskGroupQueues = taskGroupQueueDao.queryByTaskInstanceId(taskInstance.getId());
         for (TaskGroupQueue taskGroupQueue : taskGroupQueues) {
-            releaseTaskGroupQueueSlot(taskGroupQueue);
+            deleteTaskGroupQueueSlot(taskGroupQueue);
         }
     }
 
@@ -479,16 +476,8 @@ public class TaskGroupCoordinator extends BaseDaemonThread {
         log.info("Wake up TaskInstance: {} success", taskInstance.getName());
     }
 
-    private void releaseTaskGroupQueueSlot(TaskGroupQueue taskGroupQueue) {
-        if (TaskGroupQueueStatus.RELEASE.equals(taskGroupQueue.getStatus())
-                && Flag.NO.getCode() == taskGroupQueue.getInQueue()) {
-            log.info("The TaskGroupQueue: {} is already released", taskGroupQueue);
-            return;
-        }
-        taskGroupQueue.setInQueue(Flag.NO.getCode());
-        taskGroupQueue.setStatus(TaskGroupQueueStatus.RELEASE);
-        taskGroupQueue.setUpdateTime(new Date());
-        taskGroupQueueDao.updateById(taskGroupQueue);
+    private void deleteTaskGroupQueueSlot(TaskGroupQueue taskGroupQueue) {
+        taskGroupQueueDao.deleteById(taskGroupQueue);
         log.info("Success release TaskGroupQueue: {}", taskGroupQueue);
     }
 
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinatorTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinatorTest.java
index f654550b75..9214afaba8 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinatorTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinatorTest.java
@@ -25,7 +25,6 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import org.apache.dolphinscheduler.common.enums.Flag;
-import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
 import org.apache.dolphinscheduler.dao.entity.TaskGroup;
 import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -174,9 +173,7 @@ class TaskGroupCoordinatorTest {
         when(taskGroupQueueDao.queryByTaskInstanceId(taskInstance.getId())).thenReturn(taskGroupQueues);
         taskGroupCoordinator.releaseTaskGroupSlot(taskInstance);
 
-        assertEquals(Flag.NO.getCode(), taskGroupQueue.getInQueue());
-        assertEquals(TaskGroupQueueStatus.RELEASE, taskGroupQueue.getStatus());
-        verify(taskGroupQueueDao, Mockito.times(1)).updateById(taskGroupQueue);
+        verify(taskGroupQueueDao, Mockito.times(1)).deleteById(taskGroupQueue);
 
     }
 }

Reply via email to