This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3278995372d1ea27b6fd86806e9a860a644694c7 Author: Zhu Zhu <[email protected]> AuthorDate: Fri Jul 15 16:06:13 2022 +0800 [FLINK-28612][runtime] SpeculativeScheduler cancels pending slot allocation after canceling pending executions This closes #20312. --- .../adaptivebatch/SpeculativeScheduler.java | 30 ++++++++++++++++------ .../adaptivebatch/SpeculativeSchedulerTest.java | 27 ++++++++++++++++++- 2 files changed, 48 insertions(+), 9 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java index 6d9302c1226..b1482caec55 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java @@ -178,15 +178,29 @@ public class SpeculativeScheduler extends AdaptiveBatchScheduler private CompletableFuture<?> cancelPendingExecutions( final ExecutionVertexID executionVertexId) { - // cancel all the related pending requests to avoid that slots returned by the canceled - // vertices are used to fulfill these pending requests - // do not cancel the FINISHED execution - cancelAllPendingSlotRequestsForVertex(executionVertexId); - return FutureUtils.combineAll( + final List<Execution> pendingExecutions = getExecutionVertex(executionVertexId).getCurrentExecutions().stream() - .filter(e -> e.getState() != ExecutionState.FINISHED) - .map(this::cancelExecution) - .collect(Collectors.toList())); + .filter( + e -> + !e.getState().isTerminal() + && e.getState() != ExecutionState.CANCELING) + .collect(Collectors.toList()); + if (pendingExecutions.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + + log.info( + "Canceling {} un-finished executions of {} because one of its executions has finished.", + pendingExecutions.size(), + 
executionVertexId); + + final CompletableFuture<?> future = + FutureUtils.combineAll( + pendingExecutions.stream() + .map(this::cancelExecution) + .collect(Collectors.toList())); + cancelAllPendingSlotRequestsForVertex(executionVertexId); + return future; } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java index ef78ad085f7..29d73161153 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java @@ -45,6 +45,8 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.scheduler.DefaultExecutionOperations; import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder; import org.apache.flink.runtime.scheduler.TestExecutionOperationsDecorator; +import org.apache.flink.runtime.scheduler.TestExecutionSlotAllocator; +import org.apache.flink.runtime.scheduler.TestExecutionSlotAllocatorFactory; import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; @@ -90,6 +92,8 @@ class SpeculativeSchedulerTest { private TestExecutionOperationsDecorator testExecutionOperations; private TestBlocklistOperations testBlocklistOperations; private TestRestartBackoffTimeStrategy restartStrategy; + private TestExecutionSlotAllocatorFactory testExecutionSlotAllocatorFactory; + private TestExecutionSlotAllocator testExecutionSlotAllocator; @BeforeEach void setUp() { @@ -100,6 +104,9 @@ class SpeculativeSchedulerTest { new TestExecutionOperationsDecorator(new DefaultExecutionOperations()); testBlocklistOperations = new TestBlocklistOperations(); 
restartStrategy = new TestRestartBackoffTimeStrategy(true, 0); + testExecutionSlotAllocatorFactory = new TestExecutionSlotAllocatorFactory(); + testExecutionSlotAllocator = + testExecutionSlotAllocatorFactory.getTestExecutionSlotAllocator(); } @AfterEach @@ -213,7 +220,7 @@ class SpeculativeSchedulerTest { } @Test - void testCancelOtherCurrentExecutionsWhenAnyExecutionFinished() { + void testCancelOtherDeployedCurrentExecutionsWhenAnyExecutionFinished() { final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling(); final ExecutionVertex ev = getOnlyExecutionVertex(scheduler); final Execution attempt1 = ev.getCurrentExecutionAttempt(); @@ -226,6 +233,23 @@ class SpeculativeSchedulerTest { assertThat(attempt2.getState()).isEqualTo(ExecutionState.CANCELING); } + @Test + void testCancelOtherScheduledCurrentExecutionsWhenAnyExecutionFinished() { + testExecutionSlotAllocator.disableAutoCompletePendingRequests(); + + final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling(); + final ExecutionVertex ev = getOnlyExecutionVertex(scheduler); + final Execution attempt1 = ev.getCurrentExecutionAttempt(); + + testExecutionSlotAllocator.completePendingRequest(attempt1.getAttemptId()); + notifySlowTask(scheduler, attempt1); + final Execution attempt2 = getExecution(ev, 1); + scheduler.updateTaskExecutionState( + new TaskExecutionState(attempt1.getAttemptId(), ExecutionState.FINISHED)); + + assertThat(attempt2.getState()).isEqualTo(ExecutionState.CANCELED); + } + @Test void testExceptionHistoryIfPartitionExceptionHappened() { final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling(); @@ -408,6 +432,7 @@ class SpeculativeSchedulerTest { .setFutureExecutor(futureExecutor) .setDelayExecutor(taskRestartExecutor) .setRestartBackoffTimeStrategy(restartStrategy) + .setExecutionSlotAllocatorFactory(testExecutionSlotAllocatorFactory) .setJobMasterConfiguration(configuration); }
