This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 0823c2f4ef [Fix-17613] [Master] Task group queue priority always remains 0 (#17614)
0823c2f4ef is described below

commit 0823c2f4ef9a08cc2d783d16a14b3002dbdceda7
Author: KwongHing <[email protected]>
AuthorDate: Thu Nov 13 19:22:59 2025 +0800

    [Fix-17613] [Master] Task group queue priority always remains 0 (#17614)
---
 .../apache/dolphinscheduler/dao/entity/TaskDefinition.java    |  2 +-
 .../server/master/engine/ITaskGroupCoordinator.java           |  4 +++-
 .../server/master/engine/TaskGroupCoordinator.java            |  5 +++--
 .../master/engine/task/runnable/TaskExecutionRunnable.java    |  2 +-
 .../engine/task/statemachine/AbstractTaskStateAction.java     |  4 +++-
 .../server/master/engine/TaskGroupCoordinatorTest.java        | 11 +++++++----
 .../start/workflow_with_one_fake_task_using_task_group.yaml   |  1 +
 7 files changed, 19 insertions(+), 10 deletions(-)

diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
index 45d5738ae7..5205dfc5ae 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
@@ -206,7 +206,7 @@ public class TaskDefinition {
      */
     private int taskGroupId;
     /**
-     * task group id
+     * task group priority, todo: we should add this field to task instance when create task instance
      */
     private int taskGroupPriority;
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/ITaskGroupCoordinator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/ITaskGroupCoordinator.java
index 75bb63438f..6fd885a21f 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/ITaskGroupCoordinator.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/ITaskGroupCoordinator.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.master.engine;
 
 import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 
@@ -66,9 +67,10 @@ public interface ITaskGroupCoordinator extends AutoCloseable {
      * The TaskInstance shouldn't dispatch until there exist available slot, the taskGroupCoordinator notify it.
      *
      * @param taskInstance the task instance which want to acquire task group slot.
+     * @param taskDefinition the task definition which contains the task group.
      * @throws IllegalArgumentException if the taskInstance is null or the used taskGroup doesn't exist.
      */
-    void acquireTaskGroupSlot(TaskInstance taskInstance);
+    void acquireTaskGroupSlot(TaskInstance taskInstance, TaskDefinition taskDefinition);
 
     /**
      * If the TaskInstance is using TaskGroup then it need to release TaskGroupSlot.
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java
index 4fe69e9ac4..c4cf3c1f92 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java
@@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
 import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
 import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskGroup;
 import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -334,7 +335,7 @@ public class TaskGroupCoordinator implements ITaskGroupCoordinator, AutoCloseabl
     }
 
     @Override
-    public void acquireTaskGroupSlot(TaskInstance taskInstance) {
+    public void acquireTaskGroupSlot(TaskInstance taskInstance, TaskDefinition taskDefinition) {
         if (taskInstance == null || taskInstance.getTaskGroupId() <= 0) {
             throw new IllegalArgumentException("The current TaskInstance does not use task group");
         }
@@ -353,7 +354,7 @@ public class TaskGroupCoordinator implements ITaskGroupCoordinator, AutoCloseabl
                 .taskName(taskInstance.getName())
                 .groupId(taskInstance.getTaskGroupId())
                 .workflowInstanceId(taskInstance.getWorkflowInstanceId())
-                .priority(taskInstance.getTaskGroupPriority())
+                .priority(taskDefinition.getTaskGroupPriority())
                 .inQueue(Flag.YES.getCode())
                 .forceStart(Flag.NO.getCode())
                 .status(TaskGroupQueueStatus.WAIT_QUEUE)
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java
index b2e4c33cbc..50b4e15a14 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java
@@ -178,7 +178,7 @@ public class TaskExecutionRunnable implements ITaskExecutionRunnable {
 
         // larger number, higher priority
         int taskGroupPriorityCompareResult =
-                taskInstance.getTaskGroupPriority() - other.getTaskInstance().getTaskGroupPriority();
+                taskDefinition.getTaskGroupPriority() - other.getTaskDefinition().getTaskGroupPriority();
         if (taskGroupPriorityCompareResult != 0) {
             return -taskGroupPriorityCompareResult;
         }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
index e451cc78a7..ad0e652d48 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus.DISPATCH;
 
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
@@ -84,7 +85,8 @@ public abstract class AbstractTaskStateAction implements ITaskStateAction {
      */
     protected void acquireTaskGroupSlot(final ITaskExecutionRunnable taskExecutionRunnable) {
         final TaskInstance taskInstance = taskExecutionRunnable.getTaskInstance();
-        taskGroupCoordinator.acquireTaskGroupSlot(taskInstance);
+        final TaskDefinition taskDefinition = taskExecutionRunnable.getTaskDefinition();
+        taskGroupCoordinator.acquireTaskGroupSlot(taskInstance, taskDefinition);
     }
 
     /**
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinatorTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinatorTest.java
index 4568e329da..6f4b7d516c 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinatorTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinatorTest.java
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskGroup;
 import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -110,13 +111,15 @@ class TaskGroupCoordinatorTest {
     void acquireTaskGroupSlot() {
         // TaskInstance is NULL
         IllegalArgumentException illegalArgumentException =
-                assertThrows(IllegalArgumentException.class, () -> taskGroupCoordinator.acquireTaskGroupSlot(null));
+                assertThrows(IllegalArgumentException.class,
+                        () -> taskGroupCoordinator.acquireTaskGroupSlot(null, null));
         assertEquals("The current TaskInstance does not use task group", illegalArgumentException.getMessage());
 
+        TaskDefinition taskDefinition = new TaskDefinition();
         // TaskGroupId is NULL
         TaskInstance taskInstance = new TaskInstance();
         illegalArgumentException = assertThrows(IllegalArgumentException.class,
-                () -> taskGroupCoordinator.acquireTaskGroupSlot(taskInstance));
+                () -> taskGroupCoordinator.acquireTaskGroupSlot(taskInstance, taskDefinition));
         assertEquals("The current TaskInstance does not use task group", illegalArgumentException.getMessage());
 
         // TaskGroup not exist
@@ -124,12 +127,12 @@ class TaskGroupCoordinatorTest {
         taskInstance.setId(1);
         when(taskGroupDao.queryById(taskInstance.getTaskGroupId())).thenReturn(null);
         illegalArgumentException = assertThrows(IllegalArgumentException.class,
-                () -> taskGroupCoordinator.acquireTaskGroupSlot(taskInstance));
+                () -> taskGroupCoordinator.acquireTaskGroupSlot(taskInstance, taskDefinition));
         assertEquals("The current TaskGroup: 1 does not exist", illegalArgumentException.getMessage());
 
         // TaskGroup exist
         when(taskGroupDao.queryById(taskInstance.getTaskGroupId())).thenReturn(new TaskGroup());
-        Assertions.assertDoesNotThrow(() -> taskGroupCoordinator.acquireTaskGroupSlot(taskInstance));
+        Assertions.assertDoesNotThrow(() -> taskGroupCoordinator.acquireTaskGroupSlot(taskInstance, taskDefinition));
 
     }
 
diff --git a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_fake_task_using_task_group.yaml b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_fake_task_using_task_group.yaml
index d846e0f7ea..e798e334a7 100644
--- a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_fake_task_using_task_group.yaml
+++ b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_fake_task_using_task_group.yaml
@@ -46,6 +46,7 @@ tasks:
     taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 5"}'
     workerGroup: default
     taskGroupId: 1
+    taskGroupPriority: 1
     createTime: 2024-08-12 00:00:00
     updateTime: 2021-08-12 00:00:00
     taskExecuteType: BATCH

Reply via email to