This is an automated email from the ASF dual-hosted git repository.

changhaifu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 44c356db4d TaskGroupPriority only compare When TaskGroup is same (#15486)
44c356db4d is described below

commit 44c356db4dda185e585de7727316e756002bb720
Author: Wenjun Ruan <[email protected]>
AuthorDate: Thu Jan 18 10:59:54 2024 +0800

    TaskGroupPriority only compare When TaskGroup is same (#15486)
---
 .../master/runner/WorkflowExecuteRunnable.java     | 33 ++++++++++----------
 ....java => StandByTaskInstancePriorityQueue.java} | 35 +++++++++++-----------
 ...a => StandByTaskInstancePriorityQueueTest.java} | 29 +++++++++---------
 3 files changed, 50 insertions(+), 47 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index b558d6cf37..3759f2209c 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -91,7 +91,7 @@ import 
org.apache.dolphinscheduler.service.exceptions.CronParseException;
 import org.apache.dolphinscheduler.service.expand.CuringParamsService;
 import org.apache.dolphinscheduler.service.model.TaskNode;
 import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
+import 
org.apache.dolphinscheduler.service.queue.StandByTaskInstancePriorityQueue;
 import org.apache.dolphinscheduler.service.utils.DagHelper;
 
 import org.apache.commons.collections4.CollectionUtils;
@@ -208,7 +208,8 @@ public class WorkflowExecuteRunnable implements 
IWorkflowExecuteRunnable {
     /**
      * The StandBy task list, will be executed, need to know, the taskInstance 
in this queue may doesn't have id.
      */
-    private final PeerTaskInstancePriorityQueue readyToSubmitTaskQueue = new 
PeerTaskInstancePriorityQueue();
+    private final StandByTaskInstancePriorityQueue 
standByTaskInstancePriorityQueue =
+            new StandByTaskInstancePriorityQueue();
 
     /**
      * wait to retry taskInstance map, taskCode as key, taskInstance as value
@@ -249,7 +250,7 @@ public class WorkflowExecuteRunnable implements 
IWorkflowExecuteRunnable {
         this.taskInstanceDao = taskInstanceDao;
         this.defaultTaskExecuteRunnableFactory = 
defaultTaskExecuteRunnableFactory;
         this.listenerEventAlertManager = listenerEventAlertManager;
-        TaskMetrics.registerTaskPrepared(readyToSubmitTaskQueue::size);
+        
TaskMetrics.registerTaskPrepared(standByTaskInstancePriorityQueue::size);
     }
 
     /**
@@ -1430,7 +1431,7 @@ public class WorkflowExecuteRunnable implements 
IWorkflowExecuteRunnable {
         // if previous node success , post node submit
         for (TaskInstance task : taskInstances) {
 
-            if (readyToSubmitTaskQueue.contains(task)) {
+            if (standByTaskInstancePriorityQueue.contains(task)) {
                 log.warn("Task is already at submit queue, taskInstanceName: 
{}", task.getName());
                 continue;
             }
@@ -1665,7 +1666,7 @@ public class WorkflowExecuteRunnable implements 
IWorkflowExecuteRunnable {
                 return true;
             }
             if (workflowInstance.getFailureStrategy() == 
FailureStrategy.CONTINUE) {
-                return readyToSubmitTaskQueue.size() == 0 && 
taskExecuteRunnableMap.size() == 0
+                return standByTaskInstancePriorityQueue.size() == 0 && 
taskExecuteRunnableMap.size() == 0
                         && waitToRetryTaskInstanceMap.size() == 0;
             }
         }
@@ -1688,7 +1689,7 @@ public class WorkflowExecuteRunnable implements 
IWorkflowExecuteRunnable {
 
         List<TaskInstance> pauseList = 
getCompleteTaskByState(TaskExecutionStatus.PAUSE);
         if (CollectionUtils.isNotEmpty(pauseList) || 
workflowInstance.isBlocked() || !isComplementEnd()
-                || readyToSubmitTaskQueue.size() > 0) {
+                || standByTaskInstancePriorityQueue.size() > 0) {
             return WorkflowExecutionStatus.PAUSE;
         } else {
             return WorkflowExecutionStatus.SUCCESS;
@@ -1711,8 +1712,8 @@ public class WorkflowExecuteRunnable implements 
IWorkflowExecuteRunnable {
                 }
             }
         }
-        if (readyToSubmitTaskQueue.size() > 0) {
-            for (Iterator<TaskInstance> iter = 
readyToSubmitTaskQueue.iterator(); iter.hasNext();) {
+        if (standByTaskInstancePriorityQueue.size() > 0) {
+            for (Iterator<TaskInstance> iter = 
standByTaskInstancePriorityQueue.iterator(); iter.hasNext();) {
                 iter.next().setState(TaskExecutionStatus.PAUSE);
             }
         }
@@ -1773,7 +1774,7 @@ public class WorkflowExecuteRunnable implements 
IWorkflowExecuteRunnable {
         // success
         if (state == WorkflowExecutionStatus.RUNNING_EXECUTION) {
             List<TaskInstance> killTasks = 
getCompleteTaskByState(TaskExecutionStatus.KILL);
-            if (readyToSubmitTaskQueue.size() > 0 || 
waitToRetryTaskInstanceMap.size() > 0) {
+            if (standByTaskInstancePriorityQueue.size() > 0 || 
waitToRetryTaskInstanceMap.size() > 0) {
                 // tasks currently pending submission, no retries, indicating 
that depend is waiting to complete
                 return WorkflowExecutionStatus.RUNNING_EXECUTION;
             } else if (CollectionUtils.isNotEmpty(killTasks)) {
@@ -1878,7 +1879,7 @@ public class WorkflowExecuteRunnable implements 
IWorkflowExecuteRunnable {
      * @param taskInstance task instance
      */
     public void addTaskToStandByList(TaskInstance taskInstance) {
-        if (readyToSubmitTaskQueue.contains(taskInstance)) {
+        if (standByTaskInstancePriorityQueue.contains(taskInstance)) {
             log.warn("Task already exists in ready submit queue, no need to 
add again, task code:{}",
                     taskInstance.getTaskCode());
             return;
@@ -1888,7 +1889,7 @@ public class WorkflowExecuteRunnable implements 
IWorkflowExecuteRunnable {
                 taskInstance.getId(),
                 taskInstance.getTaskCode());
         TaskMetrics.incTaskInstanceByState("submit");
-        readyToSubmitTaskQueue.put(taskInstance);
+        standByTaskInstancePriorityQueue.put(taskInstance);
     }
 
     /**
@@ -1897,7 +1898,7 @@ public class WorkflowExecuteRunnable implements 
IWorkflowExecuteRunnable {
      * @param taskInstance task instance
      */
     private boolean removeTaskFromStandbyList(TaskInstance taskInstance) {
-        return readyToSubmitTaskQueue.remove(taskInstance);
+        return standByTaskInstancePriorityQueue.remove(taskInstance);
     }
 
     /**
@@ -1906,7 +1907,7 @@ public class WorkflowExecuteRunnable implements 
IWorkflowExecuteRunnable {
      * @return Boolean whether has retry task in standby
      */
     private boolean hasRetryTaskInStandBy() {
-        for (Iterator<TaskInstance> iter = readyToSubmitTaskQueue.iterator(); 
iter.hasNext();) {
+        for (Iterator<TaskInstance> iter = 
standByTaskInstancePriorityQueue.iterator(); iter.hasNext();) {
             if (iter.next().getState().isFailure()) {
                 return true;
             }
@@ -1923,8 +1924,8 @@ public class WorkflowExecuteRunnable implements 
IWorkflowExecuteRunnable {
                 workflowInstance.getId(),
                 taskExecuteRunnableMap.size());
 
-        if (readyToSubmitTaskQueue.size() > 0) {
-            readyToSubmitTaskQueue.clear();
+        if (standByTaskInstancePriorityQueue.size() > 0) {
+            standByTaskInstancePriorityQueue.clear();
         }
 
         for (long taskCode : taskExecuteRunnableMap.keySet()) {
@@ -1965,7 +1966,7 @@ public class WorkflowExecuteRunnable implements 
IWorkflowExecuteRunnable {
     public void submitStandByTask() throws StateEventHandleException {
         ProcessInstance workflowInstance = 
workflowExecuteContext.getWorkflowInstance();
         TaskInstance task;
-        while ((task = readyToSubmitTaskQueue.peek()) != null) {
+        while ((task = standByTaskInstancePriorityQueue.peek()) != null) {
             // stop tasks which is retrying if forced success happens
             if (task.getId() != null && task.taskCanRetry()) {
                 TaskInstance retryTask = 
taskInstanceDao.queryById(task.getId());
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/StandByTaskInstancePriorityQueue.java
similarity index 79%
rename from dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/StandByTaskInstancePriorityQueue.java
index 5bd89ffad5..c11c4fe5a9 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/StandByTaskInstancePriorityQueue.java
@@ -17,7 +17,6 @@
 
 package org.apache.dolphinscheduler.service.queue;
 
-import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import 
org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
 
@@ -35,7 +34,7 @@ import com.google.common.base.Preconditions;
  * Task instances priority queue implementation
  * All the task instances are in the same process instance.
  */
-public class PeerTaskInstancePriorityQueue implements 
TaskPriorityQueue<TaskInstance> {
+public class StandByTaskInstancePriorityQueue implements 
TaskPriorityQueue<TaskInstance> {
 
     /**
      * queue size
@@ -45,7 +44,8 @@ public class PeerTaskInstancePriorityQueue implements 
TaskPriorityQueue<TaskInst
     /**
      * queue
      */
-    private final PriorityQueue<TaskInstance> queue = new 
PriorityQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator());
+    private final PriorityQueue<TaskInstance> queue =
+            new PriorityQueue<>(QUEUE_MAX_SIZE, new 
TaskInstancePriorityComparator());
     private final Set<String> taskInstanceIdentifySet = 
Collections.synchronizedSet(new HashSet<>());
 
     /**
@@ -163,24 +163,25 @@ public class PeerTaskInstancePriorityQueue implements 
TaskPriorityQueue<TaskInst
     }
 
     /**
-     * TaskInfoComparator
+     * This comparator is used to sort task instances in the standby queue.
+     * If the TaskInstance is in the same taskGroup, then we will sort the TaskInstance by {@link TaskInstance#getTaskGroupPriority()} in the taskGroup.
+     * Otherwise, we will sort the TaskInstance by {@link TaskInstance#getTaskInstancePriority()} in the workflow.
      */
-    private static class TaskInfoComparator implements 
Comparator<TaskInstance> {
-
-        /**
-         * compare o1 o2
-         *
-         * @param o1 o1
-         * @param o2 o2
-         * @return compare result
-         */
+    private static class TaskInstancePriorityComparator implements 
Comparator<TaskInstance> {
+
         @Override
         public int compare(TaskInstance o1, TaskInstance o2) {
-            if 
(o1.getTaskInstancePriority().equals(o2.getTaskInstancePriority())) {
-                // larger number, higher priority
-                return Constants.OPPOSITE_VALUE * 
Integer.compare(o1.getTaskGroupPriority(), o2.getTaskGroupPriority());
+            int taskPriorityInTaskGroup = -1 * 
Integer.compare(o1.getTaskGroupPriority(), o2.getTaskGroupPriority());
+            int taskInstancePriorityInWorkflow =
+                    Long.compare(o1.getTaskInstancePriority().getCode(), 
o2.getTaskInstancePriority().getCode());
+
+            if (o1.getTaskGroupId() == o2.getTaskGroupId()) {
+                // If at the same taskGroup
+                if (taskPriorityInTaskGroup != 0) {
+                    return taskPriorityInTaskGroup;
+                }
             }
-            return 
o1.getTaskInstancePriority().compareTo(o2.getTaskInstancePriority());
+            return taskInstancePriorityInWorkflow;
         }
     }
 }
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/StandByTaskInstancePriorityQueueTest.java
similarity index 84%
rename from dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java
rename to dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/StandByTaskInstancePriorityQueueTest.java
index a430ab3c95..80a2513929 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/StandByTaskInstancePriorityQueueTest.java
@@ -26,11 +26,11 @@ import java.util.concurrent.TimeUnit;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
-public class PeerTaskInstancePriorityQueueTest {
+public class StandByTaskInstancePriorityQueueTest {
 
     @Test
     public void put() throws TaskPriorityQueueException {
-        PeerTaskInstancePriorityQueue queue = new 
PeerTaskInstancePriorityQueue();
+        StandByTaskInstancePriorityQueue queue = new 
StandByTaskInstancePriorityQueue();
         TaskInstance taskInstanceHigPriority = createTaskInstance("high", 
Priority.HIGH, 1);
         TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", 
Priority.MEDIUM, 1);
         queue.put(taskInstanceHigPriority);
@@ -42,7 +42,7 @@ public class PeerTaskInstancePriorityQueueTest {
 
     @Test
     public void take() throws Exception {
-        PeerTaskInstancePriorityQueue queue = 
getPeerTaskInstancePriorityQueue();
+        StandByTaskInstancePriorityQueue queue = 
getPeerTaskInstancePriorityQueue();
         int peekBeforeLength = queue.size();
         queue.take();
         Assertions.assertTrue(queue.size() < peekBeforeLength);
@@ -50,7 +50,7 @@ public class PeerTaskInstancePriorityQueueTest {
 
     @Test
     public void poll() throws Exception {
-        PeerTaskInstancePriorityQueue queue = 
getPeerTaskInstancePriorityQueue();
+        StandByTaskInstancePriorityQueue queue = 
getPeerTaskInstancePriorityQueue();
         Assertions.assertThrows(TaskPriorityQueueException.class, () -> {
             queue.poll(1000, TimeUnit.MILLISECONDS);
         });
@@ -58,14 +58,15 @@ public class PeerTaskInstancePriorityQueueTest {
 
     @Test
     public void peek() throws Exception {
-        PeerTaskInstancePriorityQueue queue = 
getPeerTaskInstancePriorityQueue();
+        StandByTaskInstancePriorityQueue queue = 
getPeerTaskInstancePriorityQueue();
         int peekBeforeLength = queue.size();
         Assertions.assertEquals(peekBeforeLength, queue.size());
     }
 
     @Test
     public void peekTaskGroupPriority() throws Exception {
-        PeerTaskInstancePriorityQueue queue = new 
PeerTaskInstancePriorityQueue();
+        StandByTaskInstancePriorityQueue queue = new 
StandByTaskInstancePriorityQueue();
+
         TaskInstance taskInstanceHigPriority = createTaskInstance("high", 
Priority.HIGH, 2);
         TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", 
Priority.HIGH, 1);
         queue.put(taskInstanceMediumPriority);
@@ -80,7 +81,7 @@ public class PeerTaskInstancePriorityQueueTest {
         queue.put(taskInstanceHigPriority);
         taskInstance = queue.peek();
         queue.clear();
-        Assertions.assertEquals(taskInstance.getName(), "medium");
+        Assertions.assertEquals("medium", taskInstance.getName());
 
         taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1);
         taskInstanceMediumPriority = createTaskInstance("medium", 
Priority.MEDIUM, 2);
@@ -88,7 +89,7 @@ public class PeerTaskInstancePriorityQueueTest {
         queue.put(taskInstanceHigPriority);
         taskInstance = queue.peek();
         queue.clear();
-        Assertions.assertEquals(taskInstance.getName(), "high");
+        Assertions.assertEquals("medium", taskInstance.getName());
 
         taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1);
         taskInstanceMediumPriority = createTaskInstance("medium", 
Priority.MEDIUM, 1);
@@ -96,7 +97,7 @@ public class PeerTaskInstancePriorityQueueTest {
         queue.put(taskInstanceHigPriority);
         taskInstance = queue.peek();
         queue.clear();
-        Assertions.assertEquals(taskInstance.getName(), "high");
+        Assertions.assertEquals("high", taskInstance.getName());
 
     }
 
@@ -107,7 +108,7 @@ public class PeerTaskInstancePriorityQueueTest {
 
     @Test
     public void contains() throws Exception {
-        PeerTaskInstancePriorityQueue queue = new 
PeerTaskInstancePriorityQueue();
+        StandByTaskInstancePriorityQueue queue = new 
StandByTaskInstancePriorityQueue();
         TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", 
Priority.MEDIUM, 1);
         queue.put(taskInstanceMediumPriority);
         Assertions.assertTrue(queue.contains(taskInstanceMediumPriority));
@@ -117,8 +118,8 @@ public class PeerTaskInstancePriorityQueueTest {
     }
 
     @Test
-    public void remove() throws Exception {
-        PeerTaskInstancePriorityQueue queue = new 
PeerTaskInstancePriorityQueue();
+    public void remove() {
+        StandByTaskInstancePriorityQueue queue = new 
StandByTaskInstancePriorityQueue();
         TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", 
Priority.MEDIUM, 1);
         queue.put(taskInstanceMediumPriority);
         int peekBeforeLength = queue.size();
@@ -133,8 +134,8 @@ public class PeerTaskInstancePriorityQueueTest {
      * @return queue
      * @throws Exception
      */
-    private PeerTaskInstancePriorityQueue getPeerTaskInstancePriorityQueue() 
throws Exception {
-        PeerTaskInstancePriorityQueue queue = new 
PeerTaskInstancePriorityQueue();
+    private StandByTaskInstancePriorityQueue 
getPeerTaskInstancePriorityQueue() throws Exception {
+        StandByTaskInstancePriorityQueue queue = new 
StandByTaskInstancePriorityQueue();
         TaskInstance taskInstanceHigPriority = createTaskInstance("high", 
Priority.HIGH, 1);
         TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", 
Priority.MEDIUM, 1);
         taskInstanceHigPriority.setTaskGroupPriority(3);

Reply via email to