This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.12 by this push:
new 5e83f3e [FLINK-23806][runtime] Avoid StackOverflowException when a
large scale job failed to acquire enough slots in time
5e83f3e is described below
commit 5e83f3e6f3d9bef893a28e68b6ed2534589f1e30
Author: Zhu Zhu <[email protected]>
AuthorDate: Mon Aug 16 17:22:57 2021 +0800
[FLINK-23806][runtime] Avoid StackOverflowException when a large scale job
failed to acquire enough slots in time
---
.../flink/runtime/scheduler/DefaultScheduler.java | 5 +-
.../runtime/scheduler/DefaultSchedulerTest.java | 90 ++++++++++++++++++++++
.../scheduler/TestExecutionSlotAllocator.java | 24 ++++++
3 files changed, 118 insertions(+), 1 deletion(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index c9244db..9d24949 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -322,6 +322,10 @@ public class DefaultScheduler extends SchedulerBase
implements SchedulerOperatio
}
private CompletableFuture<?> cancelTasksAsync(final Set<ExecutionVertexID>
verticesToRestart) {
+ // clean up all the related pending requests to avoid that immediately returned slot
+ // is used to fulfill the pending requests of these tasks
+ verticesToRestart.stream().forEach(executionSlotAllocator::cancel);
+
final List<CompletableFuture<?>> cancelFutures =
verticesToRestart.stream()
.map(this::cancelExecutionVertex)
@@ -335,7 +339,6 @@ public class DefaultScheduler extends SchedulerBase
implements SchedulerOperatio
notifyCoordinatorOfCancellation(vertex);
- executionSlotAllocator.cancel(executionVertexId);
return executionVertexOperations.cancel(vertex);
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index 7e7c32d..5c4a2af 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -34,6 +34,9 @@ import
org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
+import
org.apache.flink.runtime.executiongraph.failover.flip1.RestartAllFailoverStrategy;
+import
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy;
import
org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
import
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -45,10 +48,12 @@ import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import
org.apache.flink.runtime.scheduler.strategy.LazyFromSourcesSchedulingStrategy;
+import
org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
@@ -72,6 +77,8 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -898,6 +905,57 @@ public class DefaultSchedulerTest extends TestLogger {
assertThat(sourceVertex.getLocationConstraint().getSlotRequestId(),
is(nullValue()));
}
+ @Test
+ public void
pendingSlotRequestsOfVerticesToRestartWillNotBeFulfilledByReturnedSlots()
+ throws Exception {
+ final int parallelism = 10;
+ final JobGraph jobGraph = sourceSinkJobGraph(parallelism);
+ testExecutionSlotAllocator.disableAutoCompletePendingRequests();
+
testExecutionSlotAllocator.enableCompletePendingRequestsWithReturnedSlots();
+
+ final DefaultScheduler scheduler =
+ createScheduler(
+ jobGraph,
+ new PipelinedRegionSchedulingStrategy.Factory(),
+ new RestartAllFailoverStrategy.Factory());
+
scheduler.setMainThreadExecutor(ComponentMainThreadExecutorServiceAdapter.forMainThread());
+ scheduler.startScheduling();
+
+ final ExecutionVertex ev1 =
+
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0);
+
+ final Set<CompletableFuture<LogicalSlot>> pendingLogicalSlotFutures =
+
testExecutionSlotAllocator.getPendingRequests().values().stream()
+
.map(SlotExecutionVertexAssignment::getLogicalSlotFuture)
+ .collect(Collectors.toSet());
+ assertThat(pendingLogicalSlotFutures, hasSize(parallelism * 2));
+
+ testExecutionSlotAllocator.completePendingRequest(ev1.getID());
+ assertThat(
+
pendingLogicalSlotFutures.stream().filter(CompletableFuture::isDone).count(),
+ is(1L));
+
+ final String exceptionMessage = "expected exception";
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+ ev1.getCurrentExecutionAttempt().getAttemptId(),
+ ExecutionState.FAILED,
+ new RuntimeException(exceptionMessage)));
+
+ assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(),
hasSize(0));
+
+ // the failed task will return its slot before triggering failover. And the slot
+ // will be returned and re-assigned to another task which is waiting for a slot.
+ // failover will be triggered after that and the re-assigned slot will be returned
+ // once the attached task is canceled, but the slot will not be assigned to other
+ // tasks which are identified to be restarted soon.
+ assertThat(testExecutionSlotAllocator.getReturnedSlots(), hasSize(2));
+ assertThat(
+
pendingLogicalSlotFutures.stream().filter(CompletableFuture::isCancelled).count(),
+ is(parallelism * 2L - 2L));
+ }
+
private static JobVertex createVertexWithAllInputConstraints(String name,
int parallelism) {
final JobVertex v = new JobVertex(name);
v.setParallelism(parallelism);
@@ -951,6 +1009,26 @@ public class DefaultSchedulerTest extends TestLogger {
return jobGraph;
}
+ private static JobGraph sourceSinkJobGraph(final int parallelism) {
+ final JobGraph jobGraph = new JobGraph(TEST_JOB_ID, "Testjob");
+ jobGraph.setScheduleMode(ScheduleMode.EAGER);
+
+ final JobVertex source = new JobVertex("source");
+ source.setParallelism(parallelism);
+ source.setInvokableClass(NoOpInvokable.class);
+ jobGraph.addVertex(source);
+
+ final JobVertex sink = new JobVertex("sink");
+ sink.setParallelism(parallelism);
+ sink.setInvokableClass(NoOpInvokable.class);
+ jobGraph.addVertex(sink);
+
+ sink.connectNewDataSetAsInput(
+ source, DistributionPattern.ALL_TO_ALL,
ResultPartitionType.PIPELINED);
+
+ return jobGraph;
+ }
+
private static JobVertex getOnlyJobVertex(final JobGraph jobGraph) {
final List<JobVertex> sortedVertices =
jobGraph.getVerticesSortedTopologicallyFromSources();
Preconditions.checkState(sortedVertices.size() == 1);
@@ -975,6 +1053,17 @@ public class DefaultSchedulerTest extends TestLogger {
private DefaultScheduler createScheduler(
final JobGraph jobGraph, final SchedulingStrategyFactory
schedulingStrategyFactory)
throws Exception {
+ return createScheduler(
+ jobGraph,
+ schedulingStrategyFactory,
+ new RestartPipelinedRegionFailoverStrategy.Factory());
+ }
+
+ private DefaultScheduler createScheduler(
+ final JobGraph jobGraph,
+ final SchedulingStrategyFactory schedulingStrategyFactory,
+ final FailoverStrategy.Factory failoverStrategyFactory)
+ throws Exception {
return SchedulerTestingUtils.newSchedulerBuilder(jobGraph)
.setLogger(log)
@@ -987,6 +1076,7 @@ public class DefaultSchedulerTest extends TestLogger {
.setExecutionVertexOperations(testExecutionVertexOperations)
.setExecutionVertexVersioner(executionVertexVersioner)
.setExecutionSlotAllocatorFactory(executionSlotAllocatorFactory)
+ .setFailoverStrategyFactory(failoverStrategyFactory)
.build();
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocator.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocator.java
index ebbefea..84848ef 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocator.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocator.java
@@ -27,6 +27,7 @@ import
org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -45,6 +46,8 @@ public class TestExecutionSlotAllocator implements
ExecutionSlotAllocator, SlotO
private boolean autoCompletePendingRequests = true;
+ private boolean completePendingRequestsWithReturnedSlots = false;
+
private final List<LogicalSlot> returnedSlots = new ArrayList<>();
public TestExecutionSlotAllocator() {}
@@ -130,6 +133,10 @@ public class TestExecutionSlotAllocator implements
ExecutionSlotAllocator, SlotO
autoCompletePendingRequests = false;
}
+ public void enableCompletePendingRequestsWithReturnedSlots() {
+ completePendingRequestsWithReturnedSlots = true;
+ }
+
@Override
public void cancel(final ExecutionVertexID executionVertexId) {
final SlotExecutionVertexAssignment slotVertexAssignment =
@@ -142,6 +149,23 @@ public class TestExecutionSlotAllocator implements
ExecutionSlotAllocator, SlotO
@Override
public void returnLogicalSlot(final LogicalSlot logicalSlot) {
returnedSlots.add(logicalSlot);
+
+ if (completePendingRequestsWithReturnedSlots) {
+ if (pendingRequests.size() > 0) {
+ // logical slots are not re-usable, creating a new one instead.
+ final LogicalSlot slot =
+
logicalSlotBuilder.setSlotOwner(this).createTestingLogicalSlot();
+
+ final SlotExecutionVertexAssignment slotVertexAssignment =
+
pendingRequests.remove(pendingRequests.keySet().stream().findAny().get());
+
+ slotVertexAssignment.getLogicalSlotFuture().complete(slot);
+ }
+ }
+ }
+
+ public Map<ExecutionVertexID, SlotExecutionVertexAssignment>
getPendingRequests() {
+ return Collections.unmodifiableMap(pendingRequests);
}
public List<LogicalSlot> getReturnedSlots() {