This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3278995372d1ea27b6fd86806e9a860a644694c7
Author: Zhu Zhu <[email protected]>
AuthorDate: Fri Jul 15 16:06:13 2022 +0800

    [FLINK-28612][runtime] SpeculativeScheduler cancels pending slot allocation after canceling pending executions
    
    This closes #20312.
---
 .../adaptivebatch/SpeculativeScheduler.java        | 30 ++++++++++++++++------
 .../adaptivebatch/SpeculativeSchedulerTest.java    | 27 ++++++++++++++++++-
 2 files changed, 48 insertions(+), 9 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
index 6d9302c1226..b1482caec55 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
@@ -178,15 +178,29 @@ public class SpeculativeScheduler extends AdaptiveBatchScheduler
 
     private CompletableFuture<?> cancelPendingExecutions(
             final ExecutionVertexID executionVertexId) {
-        // cancel all the related pending requests to avoid that slots 
returned by the canceled
-        // vertices are used to fulfill these pending requests
-        // do not cancel the FINISHED execution
-        cancelAllPendingSlotRequestsForVertex(executionVertexId);
-        return FutureUtils.combineAll(
+        final List<Execution> pendingExecutions =
                 
getExecutionVertex(executionVertexId).getCurrentExecutions().stream()
-                        .filter(e -> e.getState() != ExecutionState.FINISHED)
-                        .map(this::cancelExecution)
-                        .collect(Collectors.toList()));
+                        .filter(
+                                e ->
+                                        !e.getState().isTerminal()
+                                                && e.getState() != 
ExecutionState.CANCELING)
+                        .collect(Collectors.toList());
+        if (pendingExecutions.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        log.info(
+                "Canceling {} un-finished executions of {} because one of its 
executions has finished.",
+                pendingExecutions.size(),
+                executionVertexId);
+
+        final CompletableFuture<?> future =
+                FutureUtils.combineAll(
+                        pendingExecutions.stream()
+                                .map(this::cancelExecution)
+                                .collect(Collectors.toList()));
+        cancelAllPendingSlotRequestsForVertex(executionVertexId);
+        return future;
     }
 
     @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java
index ef78ad085f7..29d73161153 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java
@@ -45,6 +45,8 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.scheduler.DefaultExecutionOperations;
 import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
 import org.apache.flink.runtime.scheduler.TestExecutionOperationsDecorator;
+import org.apache.flink.runtime.scheduler.TestExecutionSlotAllocator;
+import org.apache.flink.runtime.scheduler.TestExecutionSlotAllocatorFactory;
 import 
org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
@@ -90,6 +92,8 @@ class SpeculativeSchedulerTest {
     private TestExecutionOperationsDecorator testExecutionOperations;
     private TestBlocklistOperations testBlocklistOperations;
     private TestRestartBackoffTimeStrategy restartStrategy;
+    private TestExecutionSlotAllocatorFactory 
testExecutionSlotAllocatorFactory;
+    private TestExecutionSlotAllocator testExecutionSlotAllocator;
 
     @BeforeEach
     void setUp() {
@@ -100,6 +104,9 @@ class SpeculativeSchedulerTest {
                 new TestExecutionOperationsDecorator(new 
DefaultExecutionOperations());
         testBlocklistOperations = new TestBlocklistOperations();
         restartStrategy = new TestRestartBackoffTimeStrategy(true, 0);
+        testExecutionSlotAllocatorFactory = new 
TestExecutionSlotAllocatorFactory();
+        testExecutionSlotAllocator =
+                
testExecutionSlotAllocatorFactory.getTestExecutionSlotAllocator();
     }
 
     @AfterEach
@@ -213,7 +220,7 @@ class SpeculativeSchedulerTest {
     }
 
     @Test
-    void testCancelOtherCurrentExecutionsWhenAnyExecutionFinished() {
+    void testCancelOtherDeployedCurrentExecutionsWhenAnyExecutionFinished() {
         final SpeculativeScheduler scheduler = 
createSchedulerAndStartScheduling();
         final ExecutionVertex ev = getOnlyExecutionVertex(scheduler);
         final Execution attempt1 = ev.getCurrentExecutionAttempt();
@@ -226,6 +233,23 @@ class SpeculativeSchedulerTest {
         assertThat(attempt2.getState()).isEqualTo(ExecutionState.CANCELING);
     }
 
+    @Test
+    void testCancelOtherScheduledCurrentExecutionsWhenAnyExecutionFinished() {
+        testExecutionSlotAllocator.disableAutoCompletePendingRequests();
+
+        final SpeculativeScheduler scheduler = 
createSchedulerAndStartScheduling();
+        final ExecutionVertex ev = getOnlyExecutionVertex(scheduler);
+        final Execution attempt1 = ev.getCurrentExecutionAttempt();
+
+        
testExecutionSlotAllocator.completePendingRequest(attempt1.getAttemptId());
+        notifySlowTask(scheduler, attempt1);
+        final Execution attempt2 = getExecution(ev, 1);
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(attempt1.getAttemptId(), 
ExecutionState.FINISHED));
+
+        assertThat(attempt2.getState()).isEqualTo(ExecutionState.CANCELED);
+    }
+
     @Test
     void testExceptionHistoryIfPartitionExceptionHappened() {
         final SpeculativeScheduler scheduler = 
createSchedulerAndStartScheduling();
@@ -408,6 +432,7 @@ class SpeculativeSchedulerTest {
                 .setFutureExecutor(futureExecutor)
                 .setDelayExecutor(taskRestartExecutor)
                 .setRestartBackoffTimeStrategy(restartStrategy)
+                
.setExecutionSlotAllocatorFactory(testExecutionSlotAllocatorFactory)
                 .setJobMasterConfiguration(configuration);
     }
 

Reply via email to