This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 67dafed0e6 Add Driver.failed() call in FragmentInstanceScheduler
(#5593)
67dafed0e6 is described below
commit 67dafed0e6f1ae7f9bff2d01acd39ca659302719
Author: BaiJian <[email protected]>
AuthorDate: Wed Apr 20 17:24:05 2022 +0800
Add Driver.failed() call in FragmentInstanceScheduler (#5593)
---
.../schedule/FragmentInstanceAbortedException.java | 35 ++++++++++++
.../db/mpp/schedule/FragmentInstanceScheduler.java | 13 ++++-
.../mpp/schedule/FragmentInstanceTaskExecutor.java | 1 +
.../schedule/FragmentInstanceTimeoutSentinel.java | 1 +
.../db/mpp/schedule/queue/L1PriorityQueue.java | 37 +++++++-----
.../db/mpp/schedule/queue/L2PriorityQueue.java | 66 ++++++++++++++--------
.../db/mpp/schedule/task/FragmentInstanceTask.java | 10 ++++
.../mpp/schedule/task/FragmentInstanceTaskID.java | 10 ++--
.../db/mpp/schedule/DefaultTaskSchedulerTest.java | 18 ++++++
.../schedule/FragmentInstanceSchedulerTest.java | 20 +++++++
.../FragmentInstanceTimeoutSentinelTest.java | 55 ++++++++++--------
.../db/mpp/schedule/queue/L1PriorityQueueTest.java | 22 ++++++++
.../db/mpp/schedule/queue/L2PriorityQueueTest.java | 27 +++++++++
13 files changed, 249 insertions(+), 66 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceAbortedException.java
b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceAbortedException.java
new file mode 100644
index 0000000000..20017340f6
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceAbortedException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.schedule;
+
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.execution.Driver;
+
+/** A common exception to pass to {@link Driver#failed(Throwable)} */
+public class FragmentInstanceAbortedException extends Exception {
+
+ public static final String BY_TIMEOUT = "timeout";
+ public static final String BY_FRAGMENT_ABORT_CALLED = "fragment abort
called";
+ public static final String BY_QUERY_CASCADING_ABORTED = "query cascading
aborted";
+ public static final String BY_ALREADY_BEING_CANCELLED = "already being
cancelled";
+
+ public FragmentInstanceAbortedException(FragmentInstanceId id, String
causeMsg) {
+ super(String.format("FragmentInstance %s is aborted by %s", id.toString(),
causeMsg));
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
index 875894f47f..4422e81ba8 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
@@ -78,9 +78,7 @@ public class FragmentInstanceScheduler implements
IFragmentInstanceScheduler, IS
new FragmentInstanceTask());
this.timeoutQueue =
new L1PriorityQueue<>(
- MAX_CAPACITY,
- new FragmentInstanceTask.SchedulePriorityComparator(),
- new FragmentInstanceTask());
+ MAX_CAPACITY, new FragmentInstanceTask.TimeoutComparator(), new
FragmentInstanceTask());
this.queryMap = new ConcurrentHashMap<>();
this.blockedTasks = Collections.synchronizedSet(new HashSet<>());
this.scheduler = new Scheduler();
@@ -154,6 +152,7 @@ public class FragmentInstanceScheduler implements
IFragmentInstanceScheduler, IS
for (FragmentInstanceTask task : queryRelatedTasks) {
task.lock();
try {
+
task.setAbortCause(FragmentInstanceAbortedException.BY_QUERY_CASCADING_ABORTED);
clearFragmentInstanceTask(task);
} finally {
task.unlock();
@@ -170,6 +169,7 @@ public class FragmentInstanceScheduler implements
IFragmentInstanceScheduler, IS
}
task.lock();
try {
+
task.setAbortCause(FragmentInstanceAbortedException.BY_FRAGMENT_ABORT_CALLED);
clearFragmentInstanceTask(task);
} finally {
task.unlock();
@@ -190,6 +190,12 @@ public class FragmentInstanceScheduler implements
IFragmentInstanceScheduler, IS
if (task.getStatus() != FragmentInstanceTaskStatus.FINISHED) {
task.setStatus(FragmentInstanceTaskStatus.ABORTED);
}
+ if (task.getAbortCause() != null) {
+ task.getFragmentInstance()
+ .failed(
+ new FragmentInstanceAbortedException(
+ task.getFragmentInstance().getInfo(), task.getAbortCause()));
+ }
if (task.getStatus() == FragmentInstanceTaskStatus.ABORTED) {
blockManager.forceDeregisterFragmentInstance(
new TFragmentInstanceId(
@@ -345,6 +351,7 @@ public class FragmentInstanceScheduler implements
IFragmentInstanceScheduler, IS
}
otherTask.lock();
try {
+
otherTask.setAbortCause(FragmentInstanceAbortedException.BY_QUERY_CASCADING_ABORTED);
clearFragmentInstanceTask(otherTask);
} finally {
otherTask.unlock();
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java
index ed704c4ca4..fd19b67ee0 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java
@@ -59,6 +59,7 @@ public class FragmentInstanceTaskExecutor extends
AbstractExecutor {
// long cost = System.nanoTime() - startTime;
// If the future is cancelled, the task is in an error and should be
thrown.
if (future.isCancelled()) {
+
task.setAbortCause(FragmentInstanceAbortedException.BY_ALREADY_BEING_CANCELLED);
scheduler.toAborted(task);
return;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinel.java
b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinel.java
index c1327a3db7..e7aaaf4e47 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinel.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinel.java
@@ -50,6 +50,7 @@ public class FragmentInstanceTimeoutSentinel extends
AbstractExecutor {
// After this time, the task must be timeout.
Thread.sleep(waitTime);
}
+ task.setAbortCause(FragmentInstanceAbortedException.BY_TIMEOUT);
scheduler.toAborted(task);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/L1PriorityQueue.java
b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/L1PriorityQueue.java
index 997bbc9296..ccbf9144e2 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/L1PriorityQueue.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/L1PriorityQueue.java
@@ -19,8 +19,10 @@
package org.apache.iotdb.db.mpp.schedule.queue;
import java.util.Comparator;
-import java.util.SortedMap;
-import java.util.TreeMap;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
/**
* An efficient subclass of {@link IndexedBlockingQueue} with 1-level priority
groups.
@@ -36,9 +38,8 @@ import java.util.TreeMap;
*/
public class L1PriorityQueue<E extends IDIndexedAccessible> extends
IndexedBlockingQueue<E> {
- // Here we use a map not a set to act as a queue because we need to get the
element reference
- // after it was removed.
- private final SortedMap<E, E> elements;
+ private final SortedSet<E> sortedElements; // Used for accessing in order
+ private final Map<ID, E> keyedElements; // Used for accessing randomly
/**
* Init the queue with max capacity and specified comparator.
@@ -51,41 +52,51 @@ public class L1PriorityQueue<E extends IDIndexedAccessible>
extends IndexedBlock
*/
public L1PriorityQueue(int maxCapacity, Comparator<E> comparator, E
queryHolder) {
super(maxCapacity, queryHolder);
- this.elements = new TreeMap<>(comparator);
+ this.sortedElements = new TreeSet<>(comparator);
+ this.keyedElements = new HashMap<>();
}
@Override
protected boolean isEmpty() {
- return elements.isEmpty();
+ return keyedElements.isEmpty();
}
@Override
protected E pollFirst() {
- return elements.remove(elements.firstKey());
+ E element = sortedElements.first();
+ sortedElements.remove(element);
+ keyedElements.remove(element.getId());
+ return element;
}
@Override
protected void pushToQueue(E element) {
- elements.put(element, element);
+ keyedElements.put(element.getId(), element);
+ sortedElements.add(element);
}
@Override
protected E remove(E element) {
- return elements.remove(element);
+ E e = keyedElements.remove(element.getId());
+ if (e != null) {
+ sortedElements.remove(e);
+ }
+ return e;
}
@Override
protected boolean contains(E element) {
- return elements.containsKey(element);
+ return keyedElements.containsKey(element.getId());
}
@Override
protected E get(E element) {
- return elements.get(element);
+ return keyedElements.get(element.getId());
}
@Override
protected void clearAllElements() {
- elements.clear();
+ sortedElements.clear();
+ keyedElements.clear();
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/L2PriorityQueue.java
b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/L2PriorityQueue.java
index 50cfd1c29d..90d2c45004 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/L2PriorityQueue.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/L2PriorityQueue.java
@@ -19,8 +19,10 @@
package org.apache.iotdb.db.mpp.schedule.queue;
import java.util.Comparator;
-import java.util.SortedMap;
-import java.util.TreeMap;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
/**
* An efficient subclass of {@link IndexedBlockingQueue} with 2-level priority
groups. The
@@ -38,10 +40,10 @@ import java.util.TreeMap;
*/
public class L2PriorityQueue<E extends IDIndexedAccessible> extends
IndexedBlockingQueue<E> {
- // Here we use a map not a set to act as a queue because we need to get the
element reference
- // after it was removed.
- private SortedMap<E, E> workingElements;
- private SortedMap<E, E> idleElements;
+ private SortedSet<E> workingSortedElements;
+ private SortedSet<E> idleSortedElements;
+ private Map<ID, E> workingKeyedElements;
+ private Map<ID, E> idleKeyedElements;
/**
* Init the queue with max capacity and specified comparator.
@@ -54,56 +56,74 @@ public class L2PriorityQueue<E extends IDIndexedAccessible>
extends IndexedBlock
*/
public L2PriorityQueue(int maxCapacity, Comparator<E> comparator, E
queryHolder) {
super(maxCapacity, queryHolder);
- this.workingElements = new TreeMap<>(comparator);
- this.idleElements = new TreeMap<>(comparator);
+ this.workingSortedElements = new TreeSet<>(comparator);
+ this.idleSortedElements = new TreeSet<>(comparator);
+ this.workingKeyedElements = new HashMap<>();
+ this.idleKeyedElements = new HashMap<>();
}
@Override
protected boolean isEmpty() {
- return workingElements.isEmpty() && idleElements.isEmpty();
+ return workingKeyedElements.isEmpty() && idleKeyedElements.isEmpty();
}
@Override
protected E pollFirst() {
- if (workingElements.isEmpty()) {
- SortedMap<E, E> tmp = workingElements;
- workingElements = idleElements;
- idleElements = tmp;
+ if (workingKeyedElements.isEmpty()) {
+ // Switch the two queues
+ Map<ID, E> tmp = workingKeyedElements;
+ workingKeyedElements = idleKeyedElements;
+ idleKeyedElements = tmp;
+ SortedSet<E> tmpSet = workingSortedElements;
+ workingSortedElements = idleSortedElements;
+ idleSortedElements = tmpSet;
}
- return workingElements.remove(workingElements.firstKey());
+ E element = workingSortedElements.first();
+ workingSortedElements.remove(element);
+ workingKeyedElements.remove(element.getId());
+ return element;
}
@Override
protected void pushToQueue(E element) {
- idleElements.put(element, element);
+ idleKeyedElements.put(element.getId(), element);
+ idleSortedElements.add(element);
}
@Override
protected E remove(E element) {
- E e = workingElements.remove(element);
- if (e == null) {
- e = idleElements.remove(element);
+ E e = workingKeyedElements.remove(element.getId());
+ if (e != null) {
+ workingSortedElements.remove(e);
+ return e;
+ }
+ e = idleKeyedElements.remove(element.getId());
+ if (e != null) {
+ idleSortedElements.remove(e);
}
return e;
}
@Override
protected boolean contains(E element) {
- return workingElements.containsKey(element) ||
idleElements.containsKey(element);
+ return workingKeyedElements.containsKey(element.getId())
+ || idleKeyedElements.containsKey(element.getId());
}
@Override
protected E get(E element) {
- E e = workingElements.get(element);
+ E e = workingKeyedElements.get(element.getId());
if (e != null) {
return e;
}
- return idleElements.get(element);
+ return idleKeyedElements.get(element.getId());
}
@Override
protected void clearAllElements() {
- workingElements.clear();
- idleElements.clear();
+ workingKeyedElements.clear();
+ workingSortedElements.clear();
+ idleKeyedElements.clear();
+ idleSortedElements.clear();
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java
b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java
index e30b1f15bc..abebdaf30d 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java
@@ -53,6 +53,8 @@ public class FragmentInstanceTask implements
IDIndexedAccessible {
// Running stats
private long cpuWallNano;
+ private String abortCause;
+
/** Initialize a dummy instance for queryHolder */
public FragmentInstanceTask() {
this(new StubFragmentInstance(), 0L, null);
@@ -139,6 +141,14 @@ public class FragmentInstanceTask implements
IDIndexedAccessible {
return o instanceof FragmentInstanceTask && ((FragmentInstanceTask)
o).getId().equals(id);
}
+ public String getAbortCause() {
+ return abortCause;
+ }
+
+ public void setAbortCause(String abortCause) {
+ this.abortCause = abortCause;
+ }
+
/** a comparator of ddl, the less the ddl is, the low order it has. */
public static class TimeoutComparator implements
Comparator<FragmentInstanceTask> {
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTaskID.java
b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTaskID.java
index 6ea8b7d4d2..a0081b177c 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTaskID.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTaskID.java
@@ -36,10 +36,12 @@ public class FragmentInstanceTaskID implements ID,
Comparable<FragmentInstanceTa
@Override
public boolean equals(Object o) {
- return o instanceof FragmentInstanceTaskID
- && id.getQueryId().equals(((FragmentInstanceTaskID) o).getQueryId())
- && id.getFragmentId().getId() == ((FragmentInstanceTaskID)
o).getFragmentId().getId()
- && id.getInstanceId().equals(((FragmentInstanceTaskID)
o).getInstanceId());
+ return o instanceof FragmentInstanceTaskID && ((FragmentInstanceTaskID)
o).id.equals(id);
+ }
+
+ @Override
+ public int hashCode() {
+ return id.hashCode();
}
public String toString() {
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java
index e73529aa06..82ca5dfd2c 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java
@@ -94,6 +94,7 @@ public class DefaultTaskSchedulerTest {
Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
+ Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
clear();
}
@@ -141,6 +142,7 @@ public class DefaultTaskSchedulerTest {
Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
+ Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
clear();
}
@@ -193,6 +195,7 @@ public class DefaultTaskSchedulerTest {
Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
+ Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
clear();
}
@@ -245,6 +248,7 @@ public class DefaultTaskSchedulerTest {
Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
+ Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
clear();
}
@@ -296,6 +300,7 @@ public class DefaultTaskSchedulerTest {
Assert.assertNull(manager.getReadyQueue().get(testTask.getId()));
Assert.assertNull(manager.getTimeoutQueue().get(testTask.getId()));
Assert.assertFalse(manager.getQueryMap().containsKey(queryId));
+ Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
clear();
}
@@ -342,6 +347,9 @@ public class DefaultTaskSchedulerTest {
Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask1));
Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask2));
+
+ Mockito.verify(mockDriver1, Mockito.never()).failed(Mockito.any());
+ Mockito.verify(mockDriver2, Mockito.never()).failed(Mockito.any());
clear();
}
FragmentInstanceTaskStatus[] validStates =
@@ -351,6 +359,11 @@ public class DefaultTaskSchedulerTest {
FragmentInstanceTaskStatus.BLOCKED,
};
for (FragmentInstanceTaskStatus status : validStates) {
+ Mockito.reset(mockDriver1);
+ Mockito.when(mockDriver1.getInfo()).thenReturn(instanceId1);
+ Mockito.reset(mockDriver2);
+ Mockito.when(mockDriver2.getInfo()).thenReturn(instanceId2);
+
FragmentInstanceTask testTask1 = new FragmentInstanceTask(mockDriver1,
100L, status);
FragmentInstanceTask testTask2 =
@@ -377,6 +390,11 @@ public class DefaultTaskSchedulerTest {
Assert.assertNull(manager.getTimeoutQueue().get(testTask1.getId()));
Assert.assertNull(manager.getTimeoutQueue().get(testTask2.getId()));
Assert.assertFalse(manager.getQueryMap().containsKey(queryId));
+
+ // The mockDriver1.failed() will be called outside the scheduler
+ Mockito.verify(mockDriver1, Mockito.never()).failed(Mockito.any());
+ Mockito.verify(mockDriver2, Mockito.times(1)).failed(Mockito.any());
+
clear();
}
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java
index 233908ca44..56435496ab 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java
@@ -117,6 +117,8 @@ public class FragmentInstanceSchedulerTest {
Assert.assertEquals(FragmentInstanceTaskStatus.READY, task4.getStatus());
// Abort one FragmentInstance
+ Mockito.reset(mockDriver1);
+ Mockito.when(mockDriver1.getInfo()).thenReturn(instanceId1);
manager.abortFragmentInstance(instanceId1);
Mockito.verify(mockDataBlockManager, Mockito.times(1))
.forceDeregisterFragmentInstance(Mockito.any());
@@ -129,9 +131,18 @@ public class FragmentInstanceSchedulerTest {
Assert.assertEquals(FragmentInstanceTaskStatus.READY, task2.getStatus());
Assert.assertEquals(FragmentInstanceTaskStatus.READY, task3.getStatus());
Assert.assertEquals(FragmentInstanceTaskStatus.READY, task4.getStatus());
+ Mockito.verify(mockDriver1, Mockito.times(1)).failed(Mockito.any());
+ Assert.assertEquals(
+ FragmentInstanceAbortedException.BY_FRAGMENT_ABORT_CALLED,
task1.getAbortCause());
// Abort the whole query
Mockito.reset(mockDataBlockManager);
+ Mockito.reset(mockDriver1);
+ Mockito.when(mockDriver1.getInfo()).thenReturn(instanceId1);
+ Mockito.reset(mockDriver2);
+ Mockito.when(mockDriver2.getInfo()).thenReturn(instanceId2);
+ Mockito.reset(mockDriver3);
+ Mockito.when(mockDriver3.getInfo()).thenReturn(instanceId3);
manager.abortQuery(queryId);
Mockito.verify(mockDataBlockManager, Mockito.times(2))
.forceDeregisterFragmentInstance(Mockito.any());
@@ -144,5 +155,14 @@ public class FragmentInstanceSchedulerTest {
Assert.assertEquals(FragmentInstanceTaskStatus.ABORTED, task2.getStatus());
Assert.assertEquals(FragmentInstanceTaskStatus.ABORTED, task3.getStatus());
Assert.assertEquals(FragmentInstanceTaskStatus.READY, task4.getStatus());
+ Mockito.verify(mockDriver1, Mockito.never()).failed(Mockito.any());
+ Mockito.verify(mockDriver2, Mockito.times(1)).failed(Mockito.any());
+ Mockito.verify(mockDriver3, Mockito.times(1)).failed(Mockito.any());
+ Mockito.verify(mockDriver4, Mockito.never()).failed(Mockito.any());
+ Assert.assertEquals(
+ FragmentInstanceAbortedException.BY_QUERY_CASCADING_ABORTED,
task2.getAbortCause());
+ Assert.assertEquals(
+ FragmentInstanceAbortedException.BY_QUERY_CASCADING_ABORTED,
task3.getAbortCause());
+ Assert.assertNull(task4.getAbortCause());
}
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinelTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinelTest.java
index 87d1de0870..862f4ca207 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinelTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinelTest.java
@@ -69,29 +69,33 @@ public class FragmentInstanceTimeoutSentinelTest {
new FragmentInstanceTask(mockDriver, 100L,
FragmentInstanceTaskStatus.FINISHED);
executor.execute(testTask);
Assert.assertEquals(FragmentInstanceTaskStatus.FINISHED,
testTask.getStatus());
- Mockito.verify(mockDriver, Mockito.times(0)).processFor(Mockito.any());
+ Mockito.verify(mockDriver, Mockito.never()).processFor(Mockito.any());
+ Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
// ABORTED status test
testTask = new FragmentInstanceTask(mockDriver, 100L,
FragmentInstanceTaskStatus.ABORTED);
executor.execute(testTask);
Assert.assertEquals(FragmentInstanceTaskStatus.ABORTED,
testTask.getStatus());
- Mockito.verify(mockDriver, Mockito.times(0)).processFor(Mockito.any());
+ Mockito.verify(mockDriver, Mockito.never()).processFor(Mockito.any());
+ Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
// RUNNING status test
testTask = new FragmentInstanceTask(mockDriver, 100L,
FragmentInstanceTaskStatus.RUNNING);
executor.execute(testTask);
Assert.assertEquals(FragmentInstanceTaskStatus.RUNNING,
testTask.getStatus());
- Mockito.verify(mockDriver, Mockito.times(0)).processFor(Mockito.any());
+ Mockito.verify(mockDriver, Mockito.never()).processFor(Mockito.any());
+ Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
// BLOCKED status test
testTask = new FragmentInstanceTask(mockDriver, 100L,
FragmentInstanceTaskStatus.BLOCKED);
executor.execute(testTask);
Assert.assertEquals(FragmentInstanceTaskStatus.BLOCKED,
testTask.getStatus());
- Mockito.verify(mockDriver, Mockito.times(0)).processFor(Mockito.any());
- Mockito.verify(mockScheduler, Mockito.times(0)).toAborted(Mockito.any());
- Mockito.verify(mockScheduler,
Mockito.times(0)).runningToBlocked(Mockito.any(), Mockito.any());
- Mockito.verify(mockScheduler,
Mockito.times(0)).runningToFinished(Mockito.any(), Mockito.any());
- Mockito.verify(mockScheduler,
Mockito.times(0)).blockedToReady(Mockito.any());
+ Mockito.verify(mockDriver, Mockito.never()).processFor(Mockito.any());
+ Assert.assertNull(testTask.getAbortCause());
+ Mockito.verify(mockScheduler, Mockito.never()).toAborted(Mockito.any());
+ Mockito.verify(mockScheduler,
Mockito.never()).runningToBlocked(Mockito.any(), Mockito.any());
+ Mockito.verify(mockScheduler,
Mockito.never()).runningToFinished(Mockito.any(), Mockito.any());
+ Mockito.verify(mockScheduler,
Mockito.never()).blockedToReady(Mockito.any());
}
@Test
@@ -127,11 +131,13 @@ public class FragmentInstanceTimeoutSentinelTest {
new FragmentInstanceTask(mockDriver, 100L,
FragmentInstanceTaskStatus.READY);
executor.execute(testTask);
Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any());
+ Assert.assertEquals(
+ FragmentInstanceAbortedException.BY_ALREADY_BEING_CANCELLED,
testTask.getAbortCause());
Mockito.verify(mockScheduler, Mockito.times(1)).toAborted(Mockito.any());
- Mockito.verify(mockScheduler,
Mockito.times(0)).runningToReady(Mockito.any(), Mockito.any());
- Mockito.verify(mockScheduler,
Mockito.times(0)).runningToBlocked(Mockito.any(), Mockito.any());
- Mockito.verify(mockScheduler,
Mockito.times(0)).runningToFinished(Mockito.any(), Mockito.any());
- Mockito.verify(mockScheduler,
Mockito.times(0)).blockedToReady(Mockito.any());
+ Mockito.verify(mockScheduler,
Mockito.never()).runningToReady(Mockito.any(), Mockito.any());
+ Mockito.verify(mockScheduler,
Mockito.never()).runningToBlocked(Mockito.any(), Mockito.any());
+ Mockito.verify(mockScheduler,
Mockito.never()).runningToFinished(Mockito.any(), Mockito.any());
+ Mockito.verify(mockScheduler,
Mockito.never()).blockedToReady(Mockito.any());
}
@Test
@@ -166,11 +172,12 @@ public class FragmentInstanceTimeoutSentinelTest {
new FragmentInstanceTask(mockDriver, 100L,
FragmentInstanceTaskStatus.READY);
executor.execute(testTask);
Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any());
- Mockito.verify(mockScheduler, Mockito.times(0)).toAborted(Mockito.any());
- Mockito.verify(mockScheduler,
Mockito.times(0)).runningToReady(Mockito.any(), Mockito.any());
- Mockito.verify(mockScheduler,
Mockito.times(0)).runningToBlocked(Mockito.any(), Mockito.any());
+ Assert.assertNull(testTask.getAbortCause());
+ Mockito.verify(mockScheduler, Mockito.never()).toAborted(Mockito.any());
+ Mockito.verify(mockScheduler,
Mockito.never()).runningToReady(Mockito.any(), Mockito.any());
+ Mockito.verify(mockScheduler,
Mockito.never()).runningToBlocked(Mockito.any(), Mockito.any());
Mockito.verify(mockScheduler,
Mockito.times(1)).runningToFinished(Mockito.any(), Mockito.any());
- Mockito.verify(mockScheduler,
Mockito.times(0)).blockedToReady(Mockito.any());
+ Mockito.verify(mockScheduler,
Mockito.never()).blockedToReady(Mockito.any());
}
@Test
@@ -216,10 +223,11 @@ public class FragmentInstanceTimeoutSentinelTest {
new FragmentInstanceTask(mockDriver, 100L,
FragmentInstanceTaskStatus.READY);
executor.execute(testTask);
Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any());
- Mockito.verify(mockScheduler, Mockito.times(0)).toAborted(Mockito.any());
- Mockito.verify(mockScheduler,
Mockito.times(0)).runningToReady(Mockito.any(), Mockito.any());
+ Assert.assertNull(testTask.getAbortCause());
+ Mockito.verify(mockScheduler, Mockito.never()).toAborted(Mockito.any());
+ Mockito.verify(mockScheduler,
Mockito.never()).runningToReady(Mockito.any(), Mockito.any());
Mockito.verify(mockScheduler,
Mockito.times(1)).runningToBlocked(Mockito.any(), Mockito.any());
- Mockito.verify(mockScheduler,
Mockito.times(0)).runningToFinished(Mockito.any(), Mockito.any());
+ Mockito.verify(mockScheduler,
Mockito.never()).runningToFinished(Mockito.any(), Mockito.any());
Mockito.verify(mockScheduler,
Mockito.times(1)).blockedToReady(Mockito.any());
}
@@ -266,10 +274,11 @@ public class FragmentInstanceTimeoutSentinelTest {
new FragmentInstanceTask(mockDriver, 100L,
FragmentInstanceTaskStatus.READY);
executor.execute(testTask);
Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any());
- Mockito.verify(mockScheduler, Mockito.times(0)).toAborted(Mockito.any());
+ Assert.assertNull(testTask.getAbortCause());
+ Mockito.verify(mockScheduler, Mockito.never()).toAborted(Mockito.any());
Mockito.verify(mockScheduler,
Mockito.times(1)).runningToReady(Mockito.any(), Mockito.any());
- Mockito.verify(mockScheduler,
Mockito.times(0)).runningToBlocked(Mockito.any(), Mockito.any());
- Mockito.verify(mockScheduler,
Mockito.times(0)).runningToFinished(Mockito.any(), Mockito.any());
- Mockito.verify(mockScheduler,
Mockito.times(0)).blockedToReady(Mockito.any());
+ Mockito.verify(mockScheduler,
Mockito.never()).runningToBlocked(Mockito.any(), Mockito.any());
+ Mockito.verify(mockScheduler,
Mockito.never()).runningToFinished(Mockito.any(), Mockito.any());
+ Mockito.verify(mockScheduler,
Mockito.never()).blockedToReady(Mockito.any());
}
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/queue/L1PriorityQueueTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/queue/L1PriorityQueueTest.java
index b668b91b36..a03fefda0a 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/queue/L1PriorityQueueTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/queue/L1PriorityQueueTest.java
@@ -132,6 +132,28 @@ public class L1PriorityQueueTest {
}
}
+ @Test
+ public void testRemoveElement() {
+ IndexedBlockingQueue<QueueElement> queue =
+ new L1PriorityQueue<>(
+ 10,
+ (o1, o2) -> {
+ if (o1.equals(o2)) {
+ return 0;
+ }
+ return Integer.compare(o1.getValue(), o2.getValue());
+ },
+ new QueueElement(new QueueElement.QueueElementID(0), 0));
+ QueueElement e1 = new QueueElement(new QueueElement.QueueElementID(1), 5);
+ queue.push(e1);
+ Assert.assertEquals(1, queue.size());
+ QueueElement e2 = new QueueElement(new QueueElement.QueueElementID(2), 10);
+ queue.push(e2);
+ Assert.assertEquals(2, queue.size());
+ Assert.assertEquals(e2, queue.remove(new QueueElement.QueueElementID(2)));
+ Assert.assertEquals(1, queue.size());
+ }
+
@Test
public void testClear() {
IndexedBlockingQueue<QueueElement> queue =
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/queue/L2PriorityQueueTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/queue/L2PriorityQueueTest.java
index d31a5b4d36..da3a6a1a4c 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/queue/L2PriorityQueueTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/queue/L2PriorityQueueTest.java
@@ -141,6 +141,33 @@ public class L2PriorityQueueTest {
}
}
+ @Test
+ public void testRemoveElement() {
+ IndexedBlockingQueue<QueueElement> queue =
+ new L2PriorityQueue<>(
+ 10,
+ (o1, o2) -> {
+ if (o1.equals(o2)) {
+ return 0;
+ }
+ int res = Integer.compare(o1.getValue(), o2.getValue());
+ if (res != 0) {
+ return res;
+ }
+ return String.CASE_INSENSITIVE_ORDER.compare(
+ o1.getId().toString(), o2.getId().toString());
+ },
+ new QueueElement(new QueueElement.QueueElementID(0), 0));
+ QueueElement e1 = new QueueElement(new QueueElement.QueueElementID(1), 5);
+ queue.push(e1);
+ Assert.assertEquals(1, queue.size());
+ QueueElement e2 = new QueueElement(new QueueElement.QueueElementID(2), 10);
+ queue.push(e2);
+ Assert.assertEquals(2, queue.size());
+ Assert.assertEquals(e2, queue.remove(new QueueElement.QueueElementID(2)));
+ Assert.assertEquals(1, queue.size());
+ }
+
@Test
public void testClear() {
IndexedBlockingQueue<QueueElement> queue =