This is an automated email from the ASF dual-hosted git repository.
zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 0bdfa0cff9 Fix PeerTaskInstancePriorityQueue cannot contains method
use taskInstanceId to check (#10491)
0bdfa0cff9 is described below
commit 0bdfa0cff958b3ee089d732590e735404bdc81dc
Author: Wenjun Ruan <[email protected]>
AuthorDate: Fri Jun 17 16:14:56 2022 +0800
Fix PeerTaskInstancePriorityQueue cannot contains method use taskInstanceId
to check (#10491)
---
.../master/runner/WorkflowExecuteRunnable.java | 2 +-
.../queue/PeerTaskInstancePriorityQueue.java | 32 +++++++++++++++-------
.../queue/PeerTaskInstancePriorityQueueTest.java | 6 ++++
3 files changed, 29 insertions(+), 11 deletions(-)
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index ae627c44f5..dfaa1810be 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -1103,7 +1103,7 @@ public class WorkflowExecuteRunnable implements Runnable {
}
/**
- * encapsulation task
+ * encapsulation task, this method will only create a new task instance,
the return task instance will not contain id.
*
* @param processInstance process instance
* @param taskNode taskNode
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
index 2e939ee332..cc7f39e402 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
@@ -29,6 +29,8 @@ import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import com.google.common.base.Preconditions;
+
/**
* Task instances priority queue implementation
* All the task instances are in the same process instance.
@@ -43,7 +45,7 @@ public class PeerTaskInstancePriorityQueue implements
TaskPriorityQueue<TaskInst
* queue
*/
private final PriorityQueue<TaskInstance> queue = new
PriorityQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator());
- private final Set<Integer> taskInstanceIdSet =
Collections.synchronizedSet(new HashSet<>());
+ private final Set<String> taskInstanceIdentifySet =
Collections.synchronizedSet(new HashSet<>());
/**
* put task instance to priority queue
@@ -53,8 +55,9 @@ public class PeerTaskInstancePriorityQueue implements
TaskPriorityQueue<TaskInst
*/
@Override
public void put(TaskInstance taskInstance) throws
TaskPriorityQueueException {
+ Preconditions.checkNotNull(taskInstance);
queue.add(taskInstance);
- taskInstanceIdSet.add(taskInstance.getId());
+ taskInstanceIdentifySet.add(getTaskInstanceIdentify(taskInstance));
}
/**
@@ -67,7 +70,7 @@ public class PeerTaskInstancePriorityQueue implements
TaskPriorityQueue<TaskInst
public TaskInstance take() throws TaskPriorityQueueException {
TaskInstance taskInstance = queue.poll();
if (taskInstance != null) {
- taskInstanceIdSet.remove(taskInstance.getId());
+
taskInstanceIdentifySet.remove(getTaskInstanceIdentify(taskInstance));
}
return taskInstance;
}
@@ -114,7 +117,7 @@ public class PeerTaskInstancePriorityQueue implements
TaskPriorityQueue<TaskInst
*/
public void clear() {
queue.clear();
- taskInstanceIdSet.clear();
+ taskInstanceIdentifySet.clear();
}
/**
@@ -124,11 +127,8 @@ public class PeerTaskInstancePriorityQueue implements
TaskPriorityQueue<TaskInst
* @return true is contains
*/
public boolean contains(TaskInstance taskInstance) {
- return this.contains(taskInstance.getId());
- }
-
- public boolean contains(int taskInstanceId) {
- return taskInstanceIdSet.contains(taskInstanceId);
+ Preconditions.checkNotNull(taskInstance);
+ return
taskInstanceIdentifySet.contains(getTaskInstanceIdentify(taskInstance));
}
/**
@@ -138,6 +138,8 @@ public class PeerTaskInstancePriorityQueue implements
TaskPriorityQueue<TaskInst
* @return true if remove success
*/
public boolean remove(TaskInstance taskInstance) {
+ Preconditions.checkNotNull(taskInstance);
+ taskInstanceIdentifySet.remove(getTaskInstanceIdentify(taskInstance));
return queue.remove(taskInstance);
}
@@ -150,10 +152,20 @@ public class PeerTaskInstancePriorityQueue implements
TaskPriorityQueue<TaskInst
return queue.iterator();
}
+ // since the task instance will not contain taskInstanceId until insert
into database
+ // So we use processInstanceId + taskCode + version to identify a
taskInstance.
+ private String getTaskInstanceIdentify(TaskInstance taskInstance) {
+ return String.join(
+ String.valueOf(taskInstance.getProcessInstanceId()),
+ String.valueOf(taskInstance.getTaskCode()),
+ String.valueOf(taskInstance.getTaskDefinitionVersion())
+ , "-");
+ }
+
/**
* TaskInfoComparator
*/
- private class TaskInfoComparator implements Comparator<TaskInstance> {
+ private static class TaskInfoComparator implements
Comparator<TaskInstance> {
/**
* compare o1 o2
diff --git
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java
index 67e40d1189..0daf8bf66a 100644
---
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java
+++
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java
@@ -36,6 +36,8 @@ public class PeerTaskInstancePriorityQueueTest {
queue.put(taskInstanceHigPriority);
queue.put(taskInstanceMediumPriority);
Assert.assertEquals(2, queue.size());
+ Assert.assertTrue(queue.contains(taskInstanceHigPriority));
+ Assert.assertTrue(queue.contains(taskInstanceMediumPriority));
}
@Test
@@ -108,6 +110,9 @@ public class PeerTaskInstancePriorityQueueTest {
TaskInstance taskInstanceMediumPriority = createTaskInstance("medium",
Priority.MEDIUM, 1);
queue.put(taskInstanceMediumPriority);
Assert.assertTrue(queue.contains(taskInstanceMediumPriority));
+ TaskInstance taskInstance2 = createTaskInstance("medium2",
Priority.MEDIUM, 1);
+ taskInstance2.setProcessInstanceId(2);
+ Assert.assertFalse(queue.contains(taskInstance2));
}
@Test
@@ -118,6 +123,7 @@ public class PeerTaskInstancePriorityQueueTest {
int peekBeforeLength = queue.size();
queue.remove(taskInstanceMediumPriority);
Assert.assertNotEquals(peekBeforeLength, queue.size());
+ Assert.assertFalse(queue.contains(taskInstanceMediumPriority));
}
/**