This is an automated email from the ASF dual-hosted git repository.
changhaifu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 44c356db4d TaskGroupPriority only compare When TaskGroup is same (#15486)
44c356db4d is described below
commit 44c356db4dda185e585de7727316e756002bb720
Author: Wenjun Ruan <[email protected]>
AuthorDate: Thu Jan 18 10:59:54 2024 +0800
TaskGroupPriority only compare When TaskGroup is same (#15486)
---
.../master/runner/WorkflowExecuteRunnable.java | 33 ++++++++++----------
....java => StandByTaskInstancePriorityQueue.java} | 35 +++++++++++-----------
...a => StandByTaskInstancePriorityQueueTest.java} | 29 +++++++++---------
3 files changed, 50 insertions(+), 47 deletions(-)
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index b558d6cf37..3759f2209c 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -91,7 +91,7 @@ import
org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.service.model.TaskNode;
import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
+import
org.apache.dolphinscheduler.service.queue.StandByTaskInstancePriorityQueue;
import org.apache.dolphinscheduler.service.utils.DagHelper;
import org.apache.commons.collections4.CollectionUtils;
@@ -208,7 +208,8 @@ public class WorkflowExecuteRunnable implements
IWorkflowExecuteRunnable {
/**
* The StandBy task list, will be executed, need to know, the taskInstance
in this queue may doesn't have id.
*/
- private final PeerTaskInstancePriorityQueue readyToSubmitTaskQueue = new
PeerTaskInstancePriorityQueue();
+ private final StandByTaskInstancePriorityQueue
standByTaskInstancePriorityQueue =
+ new StandByTaskInstancePriorityQueue();
/**
* wait to retry taskInstance map, taskCode as key, taskInstance as value
@@ -249,7 +250,7 @@ public class WorkflowExecuteRunnable implements
IWorkflowExecuteRunnable {
this.taskInstanceDao = taskInstanceDao;
this.defaultTaskExecuteRunnableFactory =
defaultTaskExecuteRunnableFactory;
this.listenerEventAlertManager = listenerEventAlertManager;
- TaskMetrics.registerTaskPrepared(readyToSubmitTaskQueue::size);
+
TaskMetrics.registerTaskPrepared(standByTaskInstancePriorityQueue::size);
}
/**
@@ -1430,7 +1431,7 @@ public class WorkflowExecuteRunnable implements
IWorkflowExecuteRunnable {
// if previous node success , post node submit
for (TaskInstance task : taskInstances) {
- if (readyToSubmitTaskQueue.contains(task)) {
+ if (standByTaskInstancePriorityQueue.contains(task)) {
log.warn("Task is already at submit queue, taskInstanceName:
{}", task.getName());
continue;
}
@@ -1665,7 +1666,7 @@ public class WorkflowExecuteRunnable implements
IWorkflowExecuteRunnable {
return true;
}
if (workflowInstance.getFailureStrategy() ==
FailureStrategy.CONTINUE) {
- return readyToSubmitTaskQueue.size() == 0 &&
taskExecuteRunnableMap.size() == 0
+ return standByTaskInstancePriorityQueue.size() == 0 &&
taskExecuteRunnableMap.size() == 0
&& waitToRetryTaskInstanceMap.size() == 0;
}
}
@@ -1688,7 +1689,7 @@ public class WorkflowExecuteRunnable implements
IWorkflowExecuteRunnable {
List<TaskInstance> pauseList =
getCompleteTaskByState(TaskExecutionStatus.PAUSE);
if (CollectionUtils.isNotEmpty(pauseList) ||
workflowInstance.isBlocked() || !isComplementEnd()
- || readyToSubmitTaskQueue.size() > 0) {
+ || standByTaskInstancePriorityQueue.size() > 0) {
return WorkflowExecutionStatus.PAUSE;
} else {
return WorkflowExecutionStatus.SUCCESS;
@@ -1711,8 +1712,8 @@ public class WorkflowExecuteRunnable implements
IWorkflowExecuteRunnable {
}
}
}
- if (readyToSubmitTaskQueue.size() > 0) {
- for (Iterator<TaskInstance> iter =
readyToSubmitTaskQueue.iterator(); iter.hasNext();) {
+ if (standByTaskInstancePriorityQueue.size() > 0) {
+ for (Iterator<TaskInstance> iter =
standByTaskInstancePriorityQueue.iterator(); iter.hasNext();) {
iter.next().setState(TaskExecutionStatus.PAUSE);
}
}
@@ -1773,7 +1774,7 @@ public class WorkflowExecuteRunnable implements
IWorkflowExecuteRunnable {
// success
if (state == WorkflowExecutionStatus.RUNNING_EXECUTION) {
List<TaskInstance> killTasks =
getCompleteTaskByState(TaskExecutionStatus.KILL);
- if (readyToSubmitTaskQueue.size() > 0 ||
waitToRetryTaskInstanceMap.size() > 0) {
+ if (standByTaskInstancePriorityQueue.size() > 0 ||
waitToRetryTaskInstanceMap.size() > 0) {
// tasks currently pending submission, no retries, indicating
that depend is waiting to complete
return WorkflowExecutionStatus.RUNNING_EXECUTION;
} else if (CollectionUtils.isNotEmpty(killTasks)) {
@@ -1878,7 +1879,7 @@ public class WorkflowExecuteRunnable implements
IWorkflowExecuteRunnable {
* @param taskInstance task instance
*/
public void addTaskToStandByList(TaskInstance taskInstance) {
- if (readyToSubmitTaskQueue.contains(taskInstance)) {
+ if (standByTaskInstancePriorityQueue.contains(taskInstance)) {
log.warn("Task already exists in ready submit queue, no need to
add again, task code:{}",
taskInstance.getTaskCode());
return;
@@ -1888,7 +1889,7 @@ public class WorkflowExecuteRunnable implements
IWorkflowExecuteRunnable {
taskInstance.getId(),
taskInstance.getTaskCode());
TaskMetrics.incTaskInstanceByState("submit");
- readyToSubmitTaskQueue.put(taskInstance);
+ standByTaskInstancePriorityQueue.put(taskInstance);
}
/**
@@ -1897,7 +1898,7 @@ public class WorkflowExecuteRunnable implements
IWorkflowExecuteRunnable {
* @param taskInstance task instance
*/
private boolean removeTaskFromStandbyList(TaskInstance taskInstance) {
- return readyToSubmitTaskQueue.remove(taskInstance);
+ return standByTaskInstancePriorityQueue.remove(taskInstance);
}
/**
@@ -1906,7 +1907,7 @@ public class WorkflowExecuteRunnable implements
IWorkflowExecuteRunnable {
* @return Boolean whether has retry task in standby
*/
private boolean hasRetryTaskInStandBy() {
- for (Iterator<TaskInstance> iter = readyToSubmitTaskQueue.iterator();
iter.hasNext();) {
+ for (Iterator<TaskInstance> iter =
standByTaskInstancePriorityQueue.iterator(); iter.hasNext();) {
if (iter.next().getState().isFailure()) {
return true;
}
@@ -1923,8 +1924,8 @@ public class WorkflowExecuteRunnable implements
IWorkflowExecuteRunnable {
workflowInstance.getId(),
taskExecuteRunnableMap.size());
- if (readyToSubmitTaskQueue.size() > 0) {
- readyToSubmitTaskQueue.clear();
+ if (standByTaskInstancePriorityQueue.size() > 0) {
+ standByTaskInstancePriorityQueue.clear();
}
for (long taskCode : taskExecuteRunnableMap.keySet()) {
@@ -1965,7 +1966,7 @@ public class WorkflowExecuteRunnable implements
IWorkflowExecuteRunnable {
public void submitStandByTask() throws StateEventHandleException {
ProcessInstance workflowInstance =
workflowExecuteContext.getWorkflowInstance();
TaskInstance task;
- while ((task = readyToSubmitTaskQueue.peek()) != null) {
+ while ((task = standByTaskInstancePriorityQueue.peek()) != null) {
// stop tasks which is retrying if forced success happens
if (task.getId() != null && task.taskCanRetry()) {
TaskInstance retryTask =
taskInstanceDao.queryById(task.getId());
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/StandByTaskInstancePriorityQueue.java
similarity index 79%
rename from dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/StandByTaskInstancePriorityQueue.java
index 5bd89ffad5..c11c4fe5a9 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/StandByTaskInstancePriorityQueue.java
@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.service.queue;
-import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import
org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
@@ -35,7 +34,7 @@ import com.google.common.base.Preconditions;
* Task instances priority queue implementation
* All the task instances are in the same process instance.
*/
-public class PeerTaskInstancePriorityQueue implements
TaskPriorityQueue<TaskInstance> {
+public class StandByTaskInstancePriorityQueue implements
TaskPriorityQueue<TaskInstance> {
/**
* queue size
@@ -45,7 +44,8 @@ public class PeerTaskInstancePriorityQueue implements
TaskPriorityQueue<TaskInst
/**
* queue
*/
- private final PriorityQueue<TaskInstance> queue = new
PriorityQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator());
+ private final PriorityQueue<TaskInstance> queue =
+ new PriorityQueue<>(QUEUE_MAX_SIZE, new
TaskInstancePriorityComparator());
private final Set<String> taskInstanceIdentifySet =
Collections.synchronizedSet(new HashSet<>());
/**
@@ -163,24 +163,25 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
}
/**
- * TaskInfoComparator
+ * This comparator is used to sort task instances in the standby queue.
+ * If the TaskInstance is in the same taskGroup, then we will sort the TaskInstance by {@link TaskInstance#getTaskGroupPriority()} in the taskGroup.
+ * Otherwise, we will sort the TaskInstance by {@link TaskInstance#getTaskInstancePriority()} in the workflow.
*/
- private static class TaskInfoComparator implements Comparator<TaskInstance> {
-
- /**
- * compare o1 o2
- *
- * @param o1 o1
- * @param o2 o2
- * @return compare result
- */
+ private static class TaskInstancePriorityComparator implements Comparator<TaskInstance> {
+
@Override
public int compare(TaskInstance o1, TaskInstance o2) {
- if (o1.getTaskInstancePriority().equals(o2.getTaskInstancePriority())) {
- // larger number, higher priority
- return Constants.OPPOSITE_VALUE * Integer.compare(o1.getTaskGroupPriority(), o2.getTaskGroupPriority());
+ int taskPriorityInTaskGroup = -1 * Integer.compare(o1.getTaskGroupPriority(), o2.getTaskGroupPriority());
+ int taskInstancePriorityInWorkflow =
+ Long.compare(o1.getTaskInstancePriority().getCode(), o2.getTaskInstancePriority().getCode());
+
+ if (o1.getTaskGroupId() == o2.getTaskGroupId()) {
+ // If at the same taskGroup
+ if (taskPriorityInTaskGroup != 0) {
+ return taskPriorityInTaskGroup;
+ }
}
- return o1.getTaskInstancePriority().compareTo(o2.getTaskInstancePriority());
+ return taskInstancePriorityInWorkflow;
}
}
}
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/StandByTaskInstancePriorityQueueTest.java
similarity index 84%
rename from dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java
rename to dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/StandByTaskInstancePriorityQueueTest.java
index a430ab3c95..80a2513929 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/StandByTaskInstancePriorityQueueTest.java
@@ -26,11 +26,11 @@ import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
-public class PeerTaskInstancePriorityQueueTest {
+public class StandByTaskInstancePriorityQueueTest {
@Test
public void put() throws TaskPriorityQueueException {
- PeerTaskInstancePriorityQueue queue = new
PeerTaskInstancePriorityQueue();
+ StandByTaskInstancePriorityQueue queue = new
StandByTaskInstancePriorityQueue();
TaskInstance taskInstanceHigPriority = createTaskInstance("high",
Priority.HIGH, 1);
TaskInstance taskInstanceMediumPriority = createTaskInstance("medium",
Priority.MEDIUM, 1);
queue.put(taskInstanceHigPriority);
@@ -42,7 +42,7 @@ public class PeerTaskInstancePriorityQueueTest {
@Test
public void take() throws Exception {
- PeerTaskInstancePriorityQueue queue =
getPeerTaskInstancePriorityQueue();
+ StandByTaskInstancePriorityQueue queue =
getPeerTaskInstancePriorityQueue();
int peekBeforeLength = queue.size();
queue.take();
Assertions.assertTrue(queue.size() < peekBeforeLength);
@@ -50,7 +50,7 @@ public class PeerTaskInstancePriorityQueueTest {
@Test
public void poll() throws Exception {
- PeerTaskInstancePriorityQueue queue =
getPeerTaskInstancePriorityQueue();
+ StandByTaskInstancePriorityQueue queue =
getPeerTaskInstancePriorityQueue();
Assertions.assertThrows(TaskPriorityQueueException.class, () -> {
queue.poll(1000, TimeUnit.MILLISECONDS);
});
@@ -58,14 +58,15 @@ public class PeerTaskInstancePriorityQueueTest {
@Test
public void peek() throws Exception {
- PeerTaskInstancePriorityQueue queue =
getPeerTaskInstancePriorityQueue();
+ StandByTaskInstancePriorityQueue queue =
getPeerTaskInstancePriorityQueue();
int peekBeforeLength = queue.size();
Assertions.assertEquals(peekBeforeLength, queue.size());
}
@Test
public void peekTaskGroupPriority() throws Exception {
- PeerTaskInstancePriorityQueue queue = new
PeerTaskInstancePriorityQueue();
+ StandByTaskInstancePriorityQueue queue = new
StandByTaskInstancePriorityQueue();
+
TaskInstance taskInstanceHigPriority = createTaskInstance("high",
Priority.HIGH, 2);
TaskInstance taskInstanceMediumPriority = createTaskInstance("medium",
Priority.HIGH, 1);
queue.put(taskInstanceMediumPriority);
@@ -80,7 +81,7 @@ public class PeerTaskInstancePriorityQueueTest {
queue.put(taskInstanceHigPriority);
taskInstance = queue.peek();
queue.clear();
- Assertions.assertEquals(taskInstance.getName(), "medium");
+ Assertions.assertEquals("medium", taskInstance.getName());
taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1);
taskInstanceMediumPriority = createTaskInstance("medium",
Priority.MEDIUM, 2);
@@ -88,7 +89,7 @@ public class PeerTaskInstancePriorityQueueTest {
queue.put(taskInstanceHigPriority);
taskInstance = queue.peek();
queue.clear();
- Assertions.assertEquals(taskInstance.getName(), "high");
+ Assertions.assertEquals("medium", taskInstance.getName());
taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1);
taskInstanceMediumPriority = createTaskInstance("medium",
Priority.MEDIUM, 1);
@@ -96,7 +97,7 @@ public class PeerTaskInstancePriorityQueueTest {
queue.put(taskInstanceHigPriority);
taskInstance = queue.peek();
queue.clear();
- Assertions.assertEquals(taskInstance.getName(), "high");
+ Assertions.assertEquals("high", taskInstance.getName());
}
@@ -107,7 +108,7 @@ public class PeerTaskInstancePriorityQueueTest {
@Test
public void contains() throws Exception {
- PeerTaskInstancePriorityQueue queue = new
PeerTaskInstancePriorityQueue();
+ StandByTaskInstancePriorityQueue queue = new
StandByTaskInstancePriorityQueue();
TaskInstance taskInstanceMediumPriority = createTaskInstance("medium",
Priority.MEDIUM, 1);
queue.put(taskInstanceMediumPriority);
Assertions.assertTrue(queue.contains(taskInstanceMediumPriority));
@@ -117,8 +118,8 @@ public class PeerTaskInstancePriorityQueueTest {
}
@Test
- public void remove() throws Exception {
- PeerTaskInstancePriorityQueue queue = new
PeerTaskInstancePriorityQueue();
+ public void remove() {
+ StandByTaskInstancePriorityQueue queue = new
StandByTaskInstancePriorityQueue();
TaskInstance taskInstanceMediumPriority = createTaskInstance("medium",
Priority.MEDIUM, 1);
queue.put(taskInstanceMediumPriority);
int peekBeforeLength = queue.size();
@@ -133,8 +134,8 @@ public class PeerTaskInstancePriorityQueueTest {
* @return queue
* @throws Exception
*/
- private PeerTaskInstancePriorityQueue getPeerTaskInstancePriorityQueue()
throws Exception {
- PeerTaskInstancePriorityQueue queue = new
PeerTaskInstancePriorityQueue();
+ private StandByTaskInstancePriorityQueue
getPeerTaskInstancePriorityQueue() throws Exception {
+ StandByTaskInstancePriorityQueue queue = new
StandByTaskInstancePriorityQueue();
TaskInstance taskInstanceHigPriority = createTaskInstance("high",
Priority.HIGH, 1);
TaskInstance taskInstanceMediumPriority = createTaskInstance("medium",
Priority.MEDIUM, 1);
taskInstanceHigPriority.setTaskGroupPriority(3);