This is an automated email from the ASF dual-hosted git repository. ericpai pushed a commit to branch feature/mpp-sche-clean in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e61bffb4d24c7c6377250c1cce4579bdc932432c Author: ericpai <[email protected]> AuthorDate: Tue Apr 19 17:25:52 2022 +0800 Add Driver.failed() call in FragmentInstanceScheduler --- .../schedule/FragmentInstanceAbortedException.java | 35 ++++++++++++++ .../db/mpp/schedule/FragmentInstanceScheduler.java | 9 ++++ .../mpp/schedule/FragmentInstanceTaskExecutor.java | 1 + .../schedule/FragmentInstanceTimeoutSentinel.java | 1 + .../db/mpp/schedule/task/FragmentInstanceTask.java | 10 ++++ .../db/mpp/schedule/DefaultTaskSchedulerTest.java | 18 +++++++ .../schedule/FragmentInstanceSchedulerTest.java | 20 ++++++++ .../FragmentInstanceTimeoutSentinelTest.java | 55 +++++++++++++--------- 8 files changed, 126 insertions(+), 23 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceAbortedException.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceAbortedException.java new file mode 100644 index 0000000000..20017340f6 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceAbortedException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.mpp.schedule; + +import org.apache.iotdb.db.mpp.common.FragmentInstanceId; +import org.apache.iotdb.db.mpp.execution.Driver; + +/** A common exception to pass to {@link Driver#failed(Throwable)} */ +public class FragmentInstanceAbortedException extends Exception { + + public static final String BY_TIMEOUT = "timeout"; + public static final String BY_FRAGMENT_ABORT_CALLED = "fragment abort called"; + public static final String BY_QUERY_CASCADING_ABORTED = "query cascading aborted"; + public static final String BY_ALREADY_BEING_CANCELLED = "already being cancelled"; + + public FragmentInstanceAbortedException(FragmentInstanceId id, String causeMsg) { + super(String.format("FragmentInstance %s is aborted by %s", id.toString(), causeMsg)); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java index c7bdb95285..5c25e7e129 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java @@ -154,6 +154,7 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS for (FragmentInstanceTask task : queryRelatedTasks) { task.lock(); try { + task.setAbortCause(FragmentInstanceAbortedException.BY_QUERY_CASCADING_ABORTED); clearFragmentInstanceTask(task); } finally { task.unlock(); @@ -170,6 +171,7 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS } task.lock(); try { + task.setAbortCause(FragmentInstanceAbortedException.BY_FRAGMENT_ABORT_CALLED); clearFragmentInstanceTask(task); } finally { task.unlock(); @@ -190,6 +192,12 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS if (task.getStatus() != FragmentInstanceTaskStatus.FINISHED) { task.setStatus(FragmentInstanceTaskStatus.ABORTED); } + if (task.getAbortCause() != null) { + task.getFragmentInstance() + .failed( + new FragmentInstanceAbortedException( + task.getFragmentInstance().getInfo(), task.getAbortCause())); + } if (task.getStatus() == FragmentInstanceTaskStatus.ABORTED) { blockManager.forceDeregisterFragmentInstance( new TFragmentInstanceId( @@ -345,6 +353,7 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS } otherTask.lock(); try { + otherTask.setAbortCause(FragmentInstanceAbortedException.BY_QUERY_CASCADING_ABORTED); clearFragmentInstanceTask(otherTask); } finally { otherTask.unlock(); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java index ed704c4ca4..fd19b67ee0 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java @@ -59,6 +59,7 @@ public class FragmentInstanceTaskExecutor extends AbstractExecutor { // long cost = System.nanoTime() - startTime; // If the future is cancelled, the task is in an error and should be thrown. if (future.isCancelled()) { + task.setAbortCause(FragmentInstanceAbortedException.BY_ALREADY_BEING_CANCELLED); scheduler.toAborted(task); return; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinel.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinel.java index c1327a3db7..e7aaaf4e47 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinel.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinel.java @@ -50,6 +50,7 @@ public class FragmentInstanceTimeoutSentinel extends AbstractExecutor { // After this time, the task must be timeout. Thread.sleep(waitTime); } + task.setAbortCause(FragmentInstanceAbortedException.BY_TIMEOUT); scheduler.toAborted(task); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java index e30b1f15bc..abebdaf30d 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java @@ -53,6 +53,8 @@ public class FragmentInstanceTask implements IDIndexedAccessible { // Running stats private long cpuWallNano; + private String abortCause; + /** Initialize a dummy instance for queryHolder */ public FragmentInstanceTask() { this(new StubFragmentInstance(), 0L, null); @@ -139,6 +141,14 @@ public class FragmentInstanceTask implements IDIndexedAccessible { return o instanceof FragmentInstanceTask && ((FragmentInstanceTask) o).getId().equals(id); } + public String getAbortCause() { + return abortCause; + } + + public void setAbortCause(String abortCause) { + this.abortCause = abortCause; + } + /** a comparator of ddl, the less the ddl is, the low order it has. */ public static class TimeoutComparator implements Comparator<FragmentInstanceTask> { diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java index e73529aa06..82ca5dfd2c 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java @@ -94,6 +94,7 @@ public class DefaultTaskSchedulerTest { Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId())); Assert.assertTrue(manager.getQueryMap().containsKey(queryId)); Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask)); + Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any()); clear(); } @@ -141,6 +142,7 @@ public class DefaultTaskSchedulerTest { Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId())); Assert.assertTrue(manager.getQueryMap().containsKey(queryId)); Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask)); + Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any()); clear(); } @@ -193,6 +195,7 @@ public class DefaultTaskSchedulerTest { Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId())); Assert.assertTrue(manager.getQueryMap().containsKey(queryId)); Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask)); + Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any()); clear(); } @@ -245,6 +248,7 @@ public class DefaultTaskSchedulerTest { Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId())); Assert.assertTrue(manager.getQueryMap().containsKey(queryId)); Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask)); + Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any()); clear(); } @@ -296,6 +300,7 @@ public class DefaultTaskSchedulerTest { Assert.assertNull(manager.getReadyQueue().get(testTask.getId())); Assert.assertNull(manager.getTimeoutQueue().get(testTask.getId())); Assert.assertFalse(manager.getQueryMap().containsKey(queryId)); + Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any()); clear(); } @@ -342,6 +347,9 @@ public class DefaultTaskSchedulerTest { Assert.assertTrue(manager.getQueryMap().containsKey(queryId)); Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask1)); Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask2)); + + Mockito.verify(mockDriver1, Mockito.never()).failed(Mockito.any()); + Mockito.verify(mockDriver2, Mockito.never()).failed(Mockito.any()); clear(); } FragmentInstanceTaskStatus[] validStates = @@ -351,6 +359,11 @@ public class DefaultTaskSchedulerTest { FragmentInstanceTaskStatus.BLOCKED, }; for (FragmentInstanceTaskStatus status : validStates) { + Mockito.reset(mockDriver1); + Mockito.when(mockDriver1.getInfo()).thenReturn(instanceId1); + Mockito.reset(mockDriver2); + Mockito.when(mockDriver2.getInfo()).thenReturn(instanceId2); + FragmentInstanceTask testTask1 = new FragmentInstanceTask(mockDriver1, 100L, status); FragmentInstanceTask testTask2 = @@ -377,6 +390,11 @@ public class DefaultTaskSchedulerTest { Assert.assertNull(manager.getTimeoutQueue().get(testTask1.getId())); Assert.assertNull(manager.getTimeoutQueue().get(testTask2.getId())); Assert.assertFalse(manager.getQueryMap().containsKey(queryId)); + + // The mockDriver1.failed() will be called outside the scheduler + Mockito.verify(mockDriver1, Mockito.never()).failed(Mockito.any()); + Mockito.verify(mockDriver2, Mockito.times(1)).failed(Mockito.any()); + clear(); } } diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java index 233908ca44..56435496ab 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java @@ -117,6 +117,8 @@ public class FragmentInstanceSchedulerTest { Assert.assertEquals(FragmentInstanceTaskStatus.READY, task4.getStatus()); // Abort one FragmentInstance + Mockito.reset(mockDriver1); + Mockito.when(mockDriver1.getInfo()).thenReturn(instanceId1); manager.abortFragmentInstance(instanceId1); Mockito.verify(mockDataBlockManager, Mockito.times(1)) .forceDeregisterFragmentInstance(Mockito.any()); @@ -129,9 +131,18 @@ public class FragmentInstanceSchedulerTest { Assert.assertEquals(FragmentInstanceTaskStatus.READY, task2.getStatus()); Assert.assertEquals(FragmentInstanceTaskStatus.READY, task3.getStatus()); Assert.assertEquals(FragmentInstanceTaskStatus.READY, task4.getStatus()); + Mockito.verify(mockDriver1, Mockito.times(1)).failed(Mockito.any()); + Assert.assertEquals( + FragmentInstanceAbortedException.BY_FRAGMENT_ABORT_CALLED, task1.getAbortCause()); // Abort the whole query Mockito.reset(mockDataBlockManager); + Mockito.reset(mockDriver1); + Mockito.when(mockDriver1.getInfo()).thenReturn(instanceId1); + Mockito.reset(mockDriver2); + Mockito.when(mockDriver2.getInfo()).thenReturn(instanceId2); + Mockito.reset(mockDriver3); + Mockito.when(mockDriver3.getInfo()).thenReturn(instanceId3); manager.abortQuery(queryId); Mockito.verify(mockDataBlockManager, Mockito.times(2)) .forceDeregisterFragmentInstance(Mockito.any()); @@ -144,5 +155,14 @@ public class FragmentInstanceSchedulerTest { Assert.assertEquals(FragmentInstanceTaskStatus.ABORTED, task2.getStatus()); Assert.assertEquals(FragmentInstanceTaskStatus.ABORTED, task3.getStatus()); Assert.assertEquals(FragmentInstanceTaskStatus.READY, task4.getStatus()); + Mockito.verify(mockDriver1, Mockito.never()).failed(Mockito.any()); + Mockito.verify(mockDriver2, Mockito.times(1)).failed(Mockito.any()); + Mockito.verify(mockDriver3, Mockito.times(1)).failed(Mockito.any()); + Mockito.verify(mockDriver4, Mockito.never()).failed(Mockito.any()); + Assert.assertEquals( + FragmentInstanceAbortedException.BY_QUERY_CASCADING_ABORTED, task2.getAbortCause()); + Assert.assertEquals( + FragmentInstanceAbortedException.BY_QUERY_CASCADING_ABORTED, task3.getAbortCause()); + Assert.assertNull(task4.getAbortCause()); } } diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinelTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinelTest.java index 87d1de0870..862f4ca207 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinelTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinelTest.java @@ -69,29 +69,33 @@ public class FragmentInstanceTimeoutSentinelTest { new FragmentInstanceTask(mockDriver, 100L, FragmentInstanceTaskStatus.FINISHED); executor.execute(testTask); Assert.assertEquals(FragmentInstanceTaskStatus.FINISHED, testTask.getStatus()); - Mockito.verify(mockDriver, Mockito.times(0)).processFor(Mockito.any()); + Mockito.verify(mockDriver, Mockito.never()).processFor(Mockito.any()); + Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any()); // ABORTED status test testTask = new FragmentInstanceTask(mockDriver, 100L, FragmentInstanceTaskStatus.ABORTED); executor.execute(testTask); Assert.assertEquals(FragmentInstanceTaskStatus.ABORTED, testTask.getStatus()); - Mockito.verify(mockDriver, Mockito.times(0)).processFor(Mockito.any()); + Mockito.verify(mockDriver, Mockito.never()).processFor(Mockito.any()); + Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any()); // RUNNING status test testTask = new FragmentInstanceTask(mockDriver, 100L, FragmentInstanceTaskStatus.RUNNING); executor.execute(testTask); Assert.assertEquals(FragmentInstanceTaskStatus.RUNNING, testTask.getStatus()); - Mockito.verify(mockDriver, Mockito.times(0)).processFor(Mockito.any()); + Mockito.verify(mockDriver, Mockito.never()).processFor(Mockito.any()); + Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any()); // BLOCKED status test testTask = new FragmentInstanceTask(mockDriver, 100L, FragmentInstanceTaskStatus.BLOCKED); executor.execute(testTask); Assert.assertEquals(FragmentInstanceTaskStatus.BLOCKED, testTask.getStatus()); - Mockito.verify(mockDriver, Mockito.times(0)).processFor(Mockito.any()); - Mockito.verify(mockScheduler, Mockito.times(0)).toAborted(Mockito.any()); - Mockito.verify(mockScheduler, Mockito.times(0)).runningToBlocked(Mockito.any(), Mockito.any()); - Mockito.verify(mockScheduler, Mockito.times(0)).runningToFinished(Mockito.any(), Mockito.any()); - Mockito.verify(mockScheduler, Mockito.times(0)).blockedToReady(Mockito.any()); + Mockito.verify(mockDriver, Mockito.never()).processFor(Mockito.any()); + Assert.assertNull(testTask.getAbortCause()); + Mockito.verify(mockScheduler, Mockito.never()).toAborted(Mockito.any()); + Mockito.verify(mockScheduler, Mockito.never()).runningToBlocked(Mockito.any(), Mockito.any()); + Mockito.verify(mockScheduler, Mockito.never()).runningToFinished(Mockito.any(), Mockito.any()); + Mockito.verify(mockScheduler, Mockito.never()).blockedToReady(Mockito.any()); } @Test @@ -127,11 +131,13 @@ public class FragmentInstanceTimeoutSentinelTest { new FragmentInstanceTask(mockDriver, 100L, FragmentInstanceTaskStatus.READY); executor.execute(testTask); Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any()); + Assert.assertEquals( + FragmentInstanceAbortedException.BY_ALREADY_BEING_CANCELLED, testTask.getAbortCause()); Mockito.verify(mockScheduler, Mockito.times(1)).toAborted(Mockito.any()); - Mockito.verify(mockScheduler, Mockito.times(0)).runningToReady(Mockito.any(), Mockito.any()); - Mockito.verify(mockScheduler, Mockito.times(0)).runningToBlocked(Mockito.any(), Mockito.any()); - Mockito.verify(mockScheduler, Mockito.times(0)).runningToFinished(Mockito.any(), Mockito.any()); - Mockito.verify(mockScheduler, Mockito.times(0)).blockedToReady(Mockito.any()); + Mockito.verify(mockScheduler, Mockito.never()).runningToReady(Mockito.any(), Mockito.any()); + Mockito.verify(mockScheduler, Mockito.never()).runningToBlocked(Mockito.any(), Mockito.any()); + Mockito.verify(mockScheduler, Mockito.never()).runningToFinished(Mockito.any(), Mockito.any()); + Mockito.verify(mockScheduler, Mockito.never()).blockedToReady(Mockito.any()); } @Test @@ -166,11 +172,12 @@ public class FragmentInstanceTimeoutSentinelTest { new FragmentInstanceTask(mockDriver, 100L, FragmentInstanceTaskStatus.READY); executor.execute(testTask); Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any()); - Mockito.verify(mockScheduler, Mockito.times(0)).toAborted(Mockito.any()); - Mockito.verify(mockScheduler, Mockito.times(0)).runningToReady(Mockito.any(), Mockito.any()); - Mockito.verify(mockScheduler, Mockito.times(0)).runningToBlocked(Mockito.any(), Mockito.any()); + Assert.assertNull(testTask.getAbortCause()); + Mockito.verify(mockScheduler, Mockito.never()).toAborted(Mockito.any()); + Mockito.verify(mockScheduler, Mockito.never()).runningToReady(Mockito.any(), Mockito.any()); + Mockito.verify(mockScheduler, Mockito.never()).runningToBlocked(Mockito.any(), Mockito.any()); Mockito.verify(mockScheduler, Mockito.times(1)).runningToFinished(Mockito.any(), Mockito.any()); - Mockito.verify(mockScheduler, Mockito.times(0)).blockedToReady(Mockito.any()); + Mockito.verify(mockScheduler, Mockito.never()).blockedToReady(Mockito.any()); } @Test @@ -216,10 +223,11 @@ public class FragmentInstanceTimeoutSentinelTest { new FragmentInstanceTask(mockDriver, 100L, FragmentInstanceTaskStatus.READY); executor.execute(testTask); Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any()); - Mockito.verify(mockScheduler, Mockito.times(0)).toAborted(Mockito.any()); - Mockito.verify(mockScheduler, Mockito.times(0)).runningToReady(Mockito.any(), Mockito.any()); + Assert.assertNull(testTask.getAbortCause()); + Mockito.verify(mockScheduler, Mockito.never()).toAborted(Mockito.any()); + Mockito.verify(mockScheduler, Mockito.never()).runningToReady(Mockito.any(), Mockito.any()); Mockito.verify(mockScheduler, Mockito.times(1)).runningToBlocked(Mockito.any(), Mockito.any()); - Mockito.verify(mockScheduler, Mockito.times(0)).runningToFinished(Mockito.any(), Mockito.any()); + Mockito.verify(mockScheduler, Mockito.never()).runningToFinished(Mockito.any(), Mockito.any()); Mockito.verify(mockScheduler, Mockito.times(1)).blockedToReady(Mockito.any()); } @@ -266,10 +274,11 @@ public class FragmentInstanceTimeoutSentinelTest { new FragmentInstanceTask(mockDriver, 100L, FragmentInstanceTaskStatus.READY); executor.execute(testTask); Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any()); - Mockito.verify(mockScheduler, Mockito.times(0)).toAborted(Mockito.any()); + Assert.assertNull(testTask.getAbortCause()); + Mockito.verify(mockScheduler, Mockito.never()).toAborted(Mockito.any()); Mockito.verify(mockScheduler, Mockito.times(1)).runningToReady(Mockito.any(), Mockito.any()); - Mockito.verify(mockScheduler, Mockito.times(0)).runningToBlocked(Mockito.any(), Mockito.any()); - Mockito.verify(mockScheduler, Mockito.times(0)).runningToFinished(Mockito.any(), Mockito.any()); - Mockito.verify(mockScheduler, Mockito.times(0)).blockedToReady(Mockito.any()); + Mockito.verify(mockScheduler, Mockito.never()).runningToBlocked(Mockito.any(), Mockito.any()); + Mockito.verify(mockScheduler, Mockito.never()).runningToFinished(Mockito.any(), Mockito.any()); + Mockito.verify(mockScheduler, Mockito.never()).blockedToReady(Mockito.any()); } }
