This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new de16f34 [FLINK-23806][runtime] Avoid StackOverflowException when a
large scale job failed to acquire enough slots in time
de16f34 is described below
commit de16f34193799e7f3aade15b9bc57549f8010621
Author: Zhu Zhu <[email protected]>
AuthorDate: Mon Aug 16 17:22:57 2021 +0800
[FLINK-23806][runtime] Avoid StackOverflowException when a large scale job
failed to acquire enough slots in time
---
.../flink/runtime/scheduler/DefaultScheduler.java | 5 +-
.../runtime/scheduler/DefaultSchedulerTest.java | 67 ++++++++++++++++++++++
.../scheduler/TestExecutionSlotAllocator.java | 19 ++++++
3 files changed, 90 insertions(+), 1 deletion(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index 968ff16..50f7b87 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -312,6 +312,10 @@ public class DefaultScheduler extends SchedulerBase
implements SchedulerOperatio
}
private CompletableFuture<?> cancelTasksAsync(final Set<ExecutionVertexID>
verticesToRestart) {
+ // clean up all the related pending requests first, so that an immediately returned
+ // slot is not used to fulfill the pending requests of these tasks
+ verticesToRestart.stream().forEach(executionSlotAllocator::cancel);
+
final List<CompletableFuture<?>> cancelFutures =
verticesToRestart.stream()
.map(this::cancelExecutionVertex)
@@ -325,7 +329,6 @@ public class DefaultScheduler extends SchedulerBase
implements SchedulerOperatio
notifyCoordinatorOfCancellation(vertex);
- executionSlotAllocator.cancel(executionVertexId);
return executionVertexOperations.cancel(vertex);
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index 3dfdac3..f9c7995 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -87,7 +87,9 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -1035,6 +1037,56 @@ public class DefaultSchedulerTest extends TestLogger {
}
@Test
+ public void
pendingSlotRequestsOfVerticesToRestartWillNotBeFulfilledByReturnedSlots()
+ throws Exception {
+ final int parallelism = 10;
+ final JobGraph jobGraph = sourceSinkJobGraph(parallelism);
+ testExecutionSlotAllocator.disableAutoCompletePendingRequests();
+
testExecutionSlotAllocator.enableCompletePendingRequestsWithReturnedSlots();
+
+ final DefaultScheduler scheduler =
+ createScheduler(
+ jobGraph,
+
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+ new PipelinedRegionSchedulingStrategy.Factory(),
+ new RestartAllFailoverStrategy.Factory());
+ scheduler.startScheduling();
+
+ final ExecutionVertex ev1 =
+
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0);
+
+ final Set<CompletableFuture<LogicalSlot>> pendingLogicalSlotFutures =
+
testExecutionSlotAllocator.getPendingRequests().values().stream()
+
.map(SlotExecutionVertexAssignment::getLogicalSlotFuture)
+ .collect(Collectors.toSet());
+ assertThat(pendingLogicalSlotFutures, hasSize(parallelism * 2));
+
+ testExecutionSlotAllocator.completePendingRequest(ev1.getID());
+ assertThat(
+
pendingLogicalSlotFutures.stream().filter(CompletableFuture::isDone).count(),
+ is(1L));
+
+ final String exceptionMessage = "expected exception";
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ ev1.getCurrentExecutionAttempt().getAttemptId(),
+ ExecutionState.FAILED,
+ new RuntimeException(exceptionMessage)));
+
+ assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(),
hasSize(0));
+
+ // The failed task returns its slot before failover is triggered, and that slot is
+ // re-assigned to another task which is waiting for a slot. Failover is triggered
+ // after that, and the re-assigned slot is returned once its attached task is
+ // canceled. This time the slot will not be assigned to other tasks, because they
+ // are identified to be restarted soon.
+ assertThat(testExecutionSlotAllocator.getReturnedSlots(), hasSize(2));
+ assertThat(
+
pendingLogicalSlotFutures.stream().filter(CompletableFuture::isCancelled).count(),
+ is(parallelism * 2L - 2L));
+ }
+
+ @Test
public void testExceptionHistoryWithGlobalFailOver() {
final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
final DefaultScheduler scheduler =
createSchedulerAndStartScheduling(jobGraph);
@@ -1382,6 +1434,21 @@ public class DefaultSchedulerTest extends TestLogger {
return JobGraphTestUtils.streamingJobGraph(source, sink);
}
+ private static JobGraph sourceSinkJobGraph(final int parallelism) {
+ final JobVertex source = new JobVertex("source");
+ source.setParallelism(parallelism);
+ source.setInvokableClass(NoOpInvokable.class);
+
+ final JobVertex sink = new JobVertex("sink");
+ sink.setParallelism(parallelism);
+ sink.setInvokableClass(NoOpInvokable.class);
+
+ sink.connectNewDataSetAsInput(
+ source, DistributionPattern.ALL_TO_ALL,
ResultPartitionType.PIPELINED);
+
+ return JobGraphTestUtils.streamingJobGraph(source, sink);
+ }
+
private static JobVertex getOnlyJobVertex(final JobGraph jobGraph) {
final List<JobVertex> sortedVertices =
jobGraph.getVerticesSortedTopologicallyFromSources();
Preconditions.checkState(sortedVertices.size() == 1);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocator.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocator.java
index 59fea52..095903e 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocator.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocator.java
@@ -46,6 +46,8 @@ public class TestExecutionSlotAllocator implements
ExecutionSlotAllocator, SlotO
private boolean autoCompletePendingRequests = true;
+ private boolean completePendingRequestsWithReturnedSlots = false;
+
private final List<LogicalSlot> returnedSlots = new ArrayList<>();
public TestExecutionSlotAllocator() {
@@ -133,6 +135,10 @@ public class TestExecutionSlotAllocator implements
ExecutionSlotAllocator, SlotO
autoCompletePendingRequests = false;
}
+ public void enableCompletePendingRequestsWithReturnedSlots() {
+ completePendingRequestsWithReturnedSlots = true;
+ }
+
@Override
public void cancel(final ExecutionVertexID executionVertexId) {
final SlotExecutionVertexAssignment slotVertexAssignment =
@@ -145,6 +151,19 @@ public class TestExecutionSlotAllocator implements
ExecutionSlotAllocator, SlotO
@Override
public void returnLogicalSlot(final LogicalSlot logicalSlot) {
returnedSlots.add(logicalSlot);
+
+ if (completePendingRequestsWithReturnedSlots) {
+ if (pendingRequests.size() > 0) {
+ // logical slots are not re-usable, creating a new one instead.
+ final LogicalSlot slot =
+
logicalSlotBuilder.setSlotOwner(this).createTestingLogicalSlot();
+
+ final SlotExecutionVertexAssignment slotVertexAssignment =
+
pendingRequests.remove(pendingRequests.keySet().stream().findAny().get());
+
+ slotVertexAssignment.getLogicalSlotFuture().complete(slot);
+ }
+ }
}
public List<LogicalSlot> getReturnedSlots() {