This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 0823c2f4ef [Fix-17613] [Master] Task group queue priority always
remains 0 (#17614)
0823c2f4ef is described below
commit 0823c2f4ef9a08cc2d783d16a14b3002dbdceda7
Author: KwongHing <[email protected]>
AuthorDate: Thu Nov 13 19:22:59 2025 +0800
[Fix-17613] [Master] Task group queue priority always remains 0 (#17614)
---
.../apache/dolphinscheduler/dao/entity/TaskDefinition.java | 2 +-
.../server/master/engine/ITaskGroupCoordinator.java | 4 +++-
.../server/master/engine/TaskGroupCoordinator.java | 5 +++--
.../master/engine/task/runnable/TaskExecutionRunnable.java | 2 +-
.../engine/task/statemachine/AbstractTaskStateAction.java | 4 +++-
.../server/master/engine/TaskGroupCoordinatorTest.java | 11 +++++++----
.../start/workflow_with_one_fake_task_using_task_group.yaml | 1 +
7 files changed, 19 insertions(+), 10 deletions(-)
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
index 45d5738ae7..5205dfc5ae 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
@@ -206,7 +206,7 @@ public class TaskDefinition {
*/
private int taskGroupId;
/**
- * task group id
+ * task group priority. TODO: we should set this field on the task instance
when creating the task instance
*/
private int taskGroupPriority;
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/ITaskGroupCoordinator.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/ITaskGroupCoordinator.java
index 75bb63438f..6fd885a21f 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/ITaskGroupCoordinator.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/ITaskGroupCoordinator.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.master.engine;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -66,9 +67,10 @@ public interface ITaskGroupCoordinator extends AutoCloseable
{
* The TaskInstance shouldn't dispatch until there exist available slot,
the taskGroupCoordinator notify it.
*
* @param taskInstance the task instance which want to acquire task group
slot.
+ * @param taskDefinition the task definition which provides the task group priority.
* @throws IllegalArgumentException if the taskInstance is null or the
used taskGroup doesn't exist.
*/
- void acquireTaskGroupSlot(TaskInstance taskInstance);
+ void acquireTaskGroupSlot(TaskInstance taskInstance, TaskDefinition
taskDefinition);
/**
* If the TaskInstance is using TaskGroup then it need to release
TaskGroupSlot.
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java
index 4fe69e9ac4..c4cf3c1f92 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java
@@ -23,6 +23,7 @@ import
org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskGroup;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -334,7 +335,7 @@ public class TaskGroupCoordinator implements
ITaskGroupCoordinator, AutoCloseabl
}
@Override
- public void acquireTaskGroupSlot(TaskInstance taskInstance) {
+ public void acquireTaskGroupSlot(TaskInstance taskInstance, TaskDefinition
taskDefinition) {
if (taskInstance == null || taskInstance.getTaskGroupId() <= 0) {
throw new IllegalArgumentException("The current TaskInstance does
not use task group");
}
@@ -353,7 +354,7 @@ public class TaskGroupCoordinator implements
ITaskGroupCoordinator, AutoCloseabl
.taskName(taskInstance.getName())
.groupId(taskInstance.getTaskGroupId())
.workflowInstanceId(taskInstance.getWorkflowInstanceId())
- .priority(taskInstance.getTaskGroupPriority())
+ .priority(taskDefinition.getTaskGroupPriority())
.inQueue(Flag.YES.getCode())
.forceStart(Flag.NO.getCode())
.status(TaskGroupQueueStatus.WAIT_QUEUE)
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java
index b2e4c33cbc..50b4e15a14 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java
@@ -178,7 +178,7 @@ public class TaskExecutionRunnable implements
ITaskExecutionRunnable {
// larger number, higher priority
int taskGroupPriorityCompareResult =
- taskInstance.getTaskGroupPriority() -
other.getTaskInstance().getTaskGroupPriority();
+ taskDefinition.getTaskGroupPriority() -
other.getTaskDefinition().getTaskGroupPriority();
if (taskGroupPriorityCompareResult != 0) {
return -taskGroupPriorityCompareResult;
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
index e451cc78a7..ad0e652d48 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
@@ -21,6 +21,7 @@ import static
com.google.common.base.Preconditions.checkNotNull;
import static
org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus.DISPATCH;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
@@ -84,7 +85,8 @@ public abstract class AbstractTaskStateAction implements
ITaskStateAction {
*/
protected void acquireTaskGroupSlot(final ITaskExecutionRunnable
taskExecutionRunnable) {
final TaskInstance taskInstance =
taskExecutionRunnable.getTaskInstance();
- taskGroupCoordinator.acquireTaskGroupSlot(taskInstance);
+ final TaskDefinition taskDefinition =
taskExecutionRunnable.getTaskDefinition();
+ taskGroupCoordinator.acquireTaskGroupSlot(taskInstance,
taskDefinition);
}
/**
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinatorTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinatorTest.java
index 4568e329da..6f4b7d516c 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinatorTest.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinatorTest.java
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskGroup;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -110,13 +111,15 @@ class TaskGroupCoordinatorTest {
void acquireTaskGroupSlot() {
// TaskInstance is NULL
IllegalArgumentException illegalArgumentException =
- assertThrows(IllegalArgumentException.class, () ->
taskGroupCoordinator.acquireTaskGroupSlot(null));
+ assertThrows(IllegalArgumentException.class,
+ () -> taskGroupCoordinator.acquireTaskGroupSlot(null,
null));
assertEquals("The current TaskInstance does not use task group",
illegalArgumentException.getMessage());
+ TaskDefinition taskDefinition = new TaskDefinition();
// TaskGroupId is NULL
TaskInstance taskInstance = new TaskInstance();
illegalArgumentException = assertThrows(IllegalArgumentException.class,
- () -> taskGroupCoordinator.acquireTaskGroupSlot(taskInstance));
+ () -> taskGroupCoordinator.acquireTaskGroupSlot(taskInstance,
taskDefinition));
assertEquals("The current TaskInstance does not use task group",
illegalArgumentException.getMessage());
// TaskGroup not exist
@@ -124,12 +127,12 @@ class TaskGroupCoordinatorTest {
taskInstance.setId(1);
when(taskGroupDao.queryById(taskInstance.getTaskGroupId())).thenReturn(null);
illegalArgumentException = assertThrows(IllegalArgumentException.class,
- () -> taskGroupCoordinator.acquireTaskGroupSlot(taskInstance));
+ () -> taskGroupCoordinator.acquireTaskGroupSlot(taskInstance,
taskDefinition));
assertEquals("The current TaskGroup: 1 does not exist",
illegalArgumentException.getMessage());
// TaskGroup exist
when(taskGroupDao.queryById(taskInstance.getTaskGroupId())).thenReturn(new
TaskGroup());
- Assertions.assertDoesNotThrow(() ->
taskGroupCoordinator.acquireTaskGroupSlot(taskInstance));
+ Assertions.assertDoesNotThrow(() ->
taskGroupCoordinator.acquireTaskGroupSlot(taskInstance, taskDefinition));
}
diff --git
a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_fake_task_using_task_group.yaml
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_fake_task_using_task_group.yaml
index d846e0f7ea..e798e334a7 100644
---
a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_fake_task_using_task_group.yaml
+++
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_fake_task_using_task_group.yaml
@@ -46,6 +46,7 @@ tasks:
taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 5"}'
workerGroup: default
taskGroupId: 1
+ taskGroupPriority: 1
createTime: 2024-08-12 00:00:00
updateTime: 2021-08-12 00:00:00
taskExecuteType: BATCH