This is an automated email from the ASF dual-hosted git repository. wenjun pushed a commit to branch 3.0.0-prepare in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit 9a590546552e198394e98865c143d639d59ea253 Author: Wenjun Ruan <[email protected]> AuthorDate: Fri Jun 17 16:14:56 2022 +0800 Fix PeerTaskInstancePriorityQueue cannot contains method use taskInstanceId to check (#10491) (cherry picked from commit 0bdfa0cff958b3ee089d732590e735404bdc81dc) --- .../master/runner/WorkflowExecuteRunnable.java | 2 +- .../queue/PeerTaskInstancePriorityQueue.java | 32 +++++++++++++++------- .../queue/PeerTaskInstancePriorityQueueTest.java | 6 ++++ 3 files changed, 29 insertions(+), 11 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 2b11498a32..07e0f14c11 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -1088,7 +1088,7 @@ public class WorkflowExecuteRunnable implements Runnable { } /** - * encapsulation task + * encapsulation task, this method will only create a new task instance, the return task instance will not contain id. * * @param processInstance process instance * @param taskNode taskNode diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java index 2e939ee332..cc7f39e402 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java @@ -29,6 +29,8 @@ import java.util.PriorityQueue; import java.util.Set; import java.util.concurrent.TimeUnit; +import com.google.common.base.Preconditions; + /** * Task instances priority queue implementation * All the task instances are in the same process instance. @@ -43,7 +45,7 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst * queue */ private final PriorityQueue<TaskInstance> queue = new PriorityQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator()); - private final Set<Integer> taskInstanceIdSet = Collections.synchronizedSet(new HashSet<>()); + private final Set<String> taskInstanceIdentifySet = Collections.synchronizedSet(new HashSet<>()); /** * put task instance to priority queue @@ -53,8 +55,9 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst */ @Override public void put(TaskInstance taskInstance) throws TaskPriorityQueueException { + Preconditions.checkNotNull(taskInstance); queue.add(taskInstance); - taskInstanceIdSet.add(taskInstance.getId()); + taskInstanceIdentifySet.add(getTaskInstanceIdentify(taskInstance)); } /** @@ -67,7 +70,7 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst public TaskInstance take() throws TaskPriorityQueueException { TaskInstance taskInstance = queue.poll(); if (taskInstance != null) { - taskInstanceIdSet.remove(taskInstance.getId()); + taskInstanceIdentifySet.remove(getTaskInstanceIdentify(taskInstance)); } return taskInstance; } @@ -114,7 +117,7 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst */ public void clear() { queue.clear(); - taskInstanceIdSet.clear(); + taskInstanceIdentifySet.clear(); } /** @@ -124,11 +127,8 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst * @return true is contains */ public boolean contains(TaskInstance taskInstance) { - return this.contains(taskInstance.getId()); - } - - public boolean contains(int taskInstanceId) { - return taskInstanceIdSet.contains(taskInstanceId); + Preconditions.checkNotNull(taskInstance); + return taskInstanceIdentifySet.contains(getTaskInstanceIdentify(taskInstance)); } /** @@ -138,6 +138,8 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst * @return true if remove success */ public boolean remove(TaskInstance taskInstance) { + Preconditions.checkNotNull(taskInstance); + taskInstanceIdentifySet.remove(getTaskInstanceIdentify(taskInstance)); return queue.remove(taskInstance); } @@ -150,10 +152,20 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst return queue.iterator(); } + // since the task instance will not contain taskInstanceId until insert into database + // So we use processInstanceId + taskCode + version to identify a taskInstance. + private String getTaskInstanceIdentify(TaskInstance taskInstance) { + return String.join( + String.valueOf(taskInstance.getProcessInstanceId()), + String.valueOf(taskInstance.getTaskCode()), + String.valueOf(taskInstance.getTaskDefinitionVersion()) + , "-"); + } + /** * TaskInfoComparator */ - private class TaskInfoComparator implements Comparator<TaskInstance> { + private static class TaskInfoComparator implements Comparator<TaskInstance> { /** * compare o1 o2 diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java index 67e40d1189..0daf8bf66a 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java @@ -36,6 +36,8 @@ public class PeerTaskInstancePriorityQueueTest { queue.put(taskInstanceHigPriority); queue.put(taskInstanceMediumPriority); Assert.assertEquals(2, queue.size()); + Assert.assertTrue(queue.contains(taskInstanceHigPriority)); + Assert.assertTrue(queue.contains(taskInstanceMediumPriority)); } @Test @@ -108,6 +110,9 @@ public class PeerTaskInstancePriorityQueueTest { TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1); queue.put(taskInstanceMediumPriority); Assert.assertTrue(queue.contains(taskInstanceMediumPriority)); + TaskInstance taskInstance2 = createTaskInstance("medium2", Priority.MEDIUM, 1); + taskInstance2.setProcessInstanceId(2); + Assert.assertFalse(queue.contains(taskInstance2)); } @Test @@ -118,6 +123,7 @@ public class PeerTaskInstancePriorityQueueTest { int peekBeforeLength = queue.size(); queue.remove(taskInstanceMediumPriority); Assert.assertNotEquals(peekBeforeLength, queue.size()); + Assert.assertFalse(queue.contains(taskInstanceMediumPriority)); } /**
