This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new de16f34  [FLINK-23806][runtime] Avoid StackOverflowException when a large scale job failed to acquire enough slots in time
de16f34 is described below

commit de16f34193799e7f3aade15b9bc57549f8010621
Author: Zhu Zhu <[email protected]>
AuthorDate: Mon Aug 16 17:22:57 2021 +0800

    [FLINK-23806][runtime] Avoid StackOverflowException when a large scale job failed to acquire enough slots in time
---
 .../flink/runtime/scheduler/DefaultScheduler.java  |  5 +-
 .../runtime/scheduler/DefaultSchedulerTest.java    | 67 ++++++++++++++++++++++
 .../scheduler/TestExecutionSlotAllocator.java      | 19 ++++++
 3 files changed, 90 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index 968ff16..50f7b87 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -312,6 +312,10 @@ public class DefaultScheduler extends SchedulerBase 
implements SchedulerOperatio
     }
 
     private CompletableFuture<?> cancelTasksAsync(final Set<ExecutionVertexID> 
verticesToRestart) {
+        // clean up all the related pending requests to avoid that immediately returned slot
+        // is used to fulfill the pending requests of these tasks
+        verticesToRestart.stream().forEach(executionSlotAllocator::cancel);
+
         final List<CompletableFuture<?>> cancelFutures =
                 verticesToRestart.stream()
                         .map(this::cancelExecutionVertex)
@@ -325,7 +329,6 @@ public class DefaultScheduler extends SchedulerBase 
implements SchedulerOperatio
 
         notifyCoordinatorOfCancellation(vertex);
 
-        executionSlotAllocator.cancel(executionVertexId);
         return executionVertexOperations.cancel(vertex);
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index 3dfdac3..f9c7995 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -87,7 +87,9 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -1035,6 +1037,56 @@ public class DefaultSchedulerTest extends TestLogger {
     }
 
     @Test
+    public void 
pendingSlotRequestsOfVerticesToRestartWillNotBeFulfilledByReturnedSlots()
+            throws Exception {
+        final int parallelism = 10;
+        final JobGraph jobGraph = sourceSinkJobGraph(parallelism);
+        testExecutionSlotAllocator.disableAutoCompletePendingRequests();
+        
testExecutionSlotAllocator.enableCompletePendingRequestsWithReturnedSlots();
+
+        final DefaultScheduler scheduler =
+                createScheduler(
+                        jobGraph,
+                        
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+                        new PipelinedRegionSchedulingStrategy.Factory(),
+                        new RestartAllFailoverStrategy.Factory());
+        scheduler.startScheduling();
+
+        final ExecutionVertex ev1 =
+                
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0);
+
+        final Set<CompletableFuture<LogicalSlot>> pendingLogicalSlotFutures =
+                
testExecutionSlotAllocator.getPendingRequests().values().stream()
+                        
.map(SlotExecutionVertexAssignment::getLogicalSlotFuture)
+                        .collect(Collectors.toSet());
+        assertThat(pendingLogicalSlotFutures, hasSize(parallelism * 2));
+
+        testExecutionSlotAllocator.completePendingRequest(ev1.getID());
+        assertThat(
+                
pendingLogicalSlotFutures.stream().filter(CompletableFuture::isDone).count(),
+                is(1L));
+
+        final String exceptionMessage = "expected exception";
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        ev1.getCurrentExecutionAttempt().getAttemptId(),
+                        ExecutionState.FAILED,
+                        new RuntimeException(exceptionMessage)));
+
+        assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), 
hasSize(0));
+
+        // the failed task will return its slot before triggering failover. And the slot
+        // will be returned and re-assigned to another task which is waiting for a slot.
+        // failover will be triggered after that and the re-assigned slot will be returned
+        // once the attached task is canceled, but the slot will not be assigned to other
+        // tasks which are identified to be restarted soon.
+        assertThat(testExecutionSlotAllocator.getReturnedSlots(), hasSize(2));
+        assertThat(
+                
pendingLogicalSlotFutures.stream().filter(CompletableFuture::isCancelled).count(),
+                is(parallelism * 2L - 2L));
+    }
+
+    @Test
     public void testExceptionHistoryWithGlobalFailOver() {
         final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
         final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
@@ -1382,6 +1434,21 @@ public class DefaultSchedulerTest extends TestLogger {
         return JobGraphTestUtils.streamingJobGraph(source, sink);
     }
 
+    private static JobGraph sourceSinkJobGraph(final int parallelism) {
+        final JobVertex source = new JobVertex("source");
+        source.setParallelism(parallelism);
+        source.setInvokableClass(NoOpInvokable.class);
+
+        final JobVertex sink = new JobVertex("sink");
+        sink.setParallelism(parallelism);
+        sink.setInvokableClass(NoOpInvokable.class);
+
+        sink.connectNewDataSetAsInput(
+                source, DistributionPattern.ALL_TO_ALL, 
ResultPartitionType.PIPELINED);
+
+        return JobGraphTestUtils.streamingJobGraph(source, sink);
+    }
+
     private static JobVertex getOnlyJobVertex(final JobGraph jobGraph) {
         final List<JobVertex> sortedVertices = 
jobGraph.getVerticesSortedTopologicallyFromSources();
         Preconditions.checkState(sortedVertices.size() == 1);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocator.java
index 59fea52..095903e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocator.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocator.java
@@ -46,6 +46,8 @@ public class TestExecutionSlotAllocator implements 
ExecutionSlotAllocator, SlotO
 
     private boolean autoCompletePendingRequests = true;
 
+    private boolean completePendingRequestsWithReturnedSlots = false;
+
     private final List<LogicalSlot> returnedSlots = new ArrayList<>();
 
     public TestExecutionSlotAllocator() {
@@ -133,6 +135,10 @@ public class TestExecutionSlotAllocator implements 
ExecutionSlotAllocator, SlotO
         autoCompletePendingRequests = false;
     }
 
+    public void enableCompletePendingRequestsWithReturnedSlots() {
+        completePendingRequestsWithReturnedSlots = true;
+    }
+
     @Override
     public void cancel(final ExecutionVertexID executionVertexId) {
         final SlotExecutionVertexAssignment slotVertexAssignment =
@@ -145,6 +151,19 @@ public class TestExecutionSlotAllocator implements 
ExecutionSlotAllocator, SlotO
     @Override
     public void returnLogicalSlot(final LogicalSlot logicalSlot) {
         returnedSlots.add(logicalSlot);
+
+        if (completePendingRequestsWithReturnedSlots) {
+            if (pendingRequests.size() > 0) {
+                // logical slots are not re-usable, creating a new one instead.
+                final LogicalSlot slot =
+                        
logicalSlotBuilder.setSlotOwner(this).createTestingLogicalSlot();
+
+                final SlotExecutionVertexAssignment slotVertexAssignment =
+                        
pendingRequests.remove(pendingRequests.keySet().stream().findAny().get());
+
+                slotVertexAssignment.getLogicalSlotFuture().complete(slot);
+            }
+        }
     }
 
     public List<LogicalSlot> getReturnedSlots() {

Reply via email to