This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 4054aa0 [FLINK-19806][runtime] Harden DefaultScheduler for concurrent suspending and failing
4054aa0 is described below
commit 4054aa01ebb70aa7e4bf9b3bf96916f96869b42d
Author: Zhu Zhu <[email protected]>
AuthorDate: Tue Nov 17 12:01:17 2020 +0800
[FLINK-19806][runtime] Harden DefaultScheduler for concurrent suspending and failing
---
.../runtime/executiongraph/ExecutionGraph.java | 7 +-
.../flink/runtime/scheduler/SchedulerBase.java | 11 +++
.../executiongraph/ExecutionGraphSuspendTest.java | 110 ++++++++++++---------
3 files changed, 76 insertions(+), 52 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index dfd20ab..880a42d 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -1410,7 +1410,7 @@ public class ExecutionGraph implements
AccessExecutionGraph {
}
public void failJob(Throwable cause) {
- if (state == JobStatus.FAILING ||
state.isGloballyTerminalState()) {
+ if (state == JobStatus.FAILING || state.isTerminalState()) {
return;
}
@@ -1419,8 +1419,9 @@ public class ExecutionGraph implements
AccessExecutionGraph {
FutureUtils.assertNoException(
cancelVerticesAsync().whenComplete((aVoid, throwable)
-> {
- transitionState(JobStatus.FAILED, cause);
- onTerminalState(JobStatus.FAILED);
+ if (transitionState(JobStatus.FAILING,
JobStatus.FAILED, cause)) {
+ onTerminalState(JobStatus.FAILED);
+ }
}));
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 95eec11..42d3e02 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -437,6 +437,17 @@ public abstract class SchedulerBase implements SchedulerNG
{
return executionGraph.getCheckpointCoordinator();
}
+ /**
+ * ExecutionGraph is exposed to make it easier to rework tests to be
based on the new scheduler.
+ * ExecutionGraph is expected to be used only for state check. Yet at
the moment, before all the
+ * actions are factored out from ExecutionGraph and its sub-components,
some actions may still
+ * be performed directly on it.
+ */
+ @VisibleForTesting
+ public ExecutionGraph getExecutionGraph() {
+ return executionGraph;
+ }
+
//
------------------------------------------------------------------------
// SchedulerNG
//
------------------------------------------------------------------------
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
index 2eba86b..325086c 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
@@ -21,12 +21,14 @@ package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.JobStatus;
import
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
-import
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
-import
org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import
org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.SchedulerBase;
+import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.TestLogger;
@@ -49,19 +51,20 @@ public class ExecutionGraphSuspendTest extends TestLogger {
public void testSuspendedOutOfCreated() throws Exception {
final InteractionsCountingTaskManagerGateway gateway = new
InteractionsCountingTaskManagerGateway();
final int parallelism = 10;
- final ExecutionGraph eg = createExecutionGraph(gateway,
parallelism);
+ final SchedulerBase scheduler = createScheduler(gateway,
parallelism);
+ final ExecutionGraph eg = scheduler.getExecutionGraph();
assertEquals(JobStatus.CREATED, eg.getState());
// suspend
- eg.suspend(new Exception("suspend"));
+ scheduler.suspend(new Exception("suspend"));
assertEquals(JobStatus.SUSPENDED, eg.getState());
validateAllVerticesInState(eg, ExecutionState.CANCELED);
validateCancelRpcCalls(gateway, 0);
- ensureCannotLeaveSuspendedState(eg, gateway);
+ ensureCannotLeaveSuspendedState(scheduler, gateway);
}
/**
@@ -71,19 +74,20 @@ public class ExecutionGraphSuspendTest extends TestLogger {
public void testSuspendedOutOfDeploying() throws Exception {
final int parallelism = 10;
final InteractionsCountingTaskManagerGateway gateway = new
InteractionsCountingTaskManagerGateway(parallelism);
- final ExecutionGraph eg = createExecutionGraph(gateway,
parallelism);
+ final SchedulerBase scheduler = createScheduler(gateway,
parallelism);
+ final ExecutionGraph eg = scheduler.getExecutionGraph();
- eg.scheduleForExecution();
+ scheduler.startScheduling();
assertEquals(JobStatus.RUNNING, eg.getState());
validateAllVerticesInState(eg, ExecutionState.DEPLOYING);
// suspend
- eg.suspend(new Exception("suspend"));
+ scheduler.suspend(new Exception("suspend"));
assertEquals(JobStatus.SUSPENDED, eg.getState());
validateCancelRpcCalls(gateway, parallelism);
- ensureCannotLeaveSuspendedState(eg, gateway);
+ ensureCannotLeaveSuspendedState(scheduler, gateway);
}
/**
@@ -93,21 +97,22 @@ public class ExecutionGraphSuspendTest extends TestLogger {
public void testSuspendedOutOfRunning() throws Exception {
final int parallelism = 10;
final InteractionsCountingTaskManagerGateway gateway = new
InteractionsCountingTaskManagerGateway(parallelism);
- final ExecutionGraph eg = createExecutionGraph(gateway,
parallelism);
+ final SchedulerBase scheduler = createScheduler(gateway,
parallelism);
+ final ExecutionGraph eg = scheduler.getExecutionGraph();
- eg.scheduleForExecution();
+ scheduler.startScheduling();
ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
assertEquals(JobStatus.RUNNING, eg.getState());
validateAllVerticesInState(eg, ExecutionState.RUNNING);
// suspend
- eg.suspend(new Exception("suspend"));
+ scheduler.suspend(new Exception("suspend"));
assertEquals(JobStatus.SUSPENDED, eg.getState());
validateCancelRpcCalls(gateway, parallelism);
- ensureCannotLeaveSuspendedState(eg, gateway);
+ ensureCannotLeaveSuspendedState(scheduler, gateway);
}
/**
@@ -117,21 +122,22 @@ public class ExecutionGraphSuspendTest extends TestLogger
{
public void testSuspendedOutOfFailing() throws Exception {
final int parallelism = 10;
final InteractionsCountingTaskManagerGateway gateway = new
InteractionsCountingTaskManagerGateway(parallelism);
- final ExecutionGraph eg = createExecutionGraph(gateway,
parallelism);
+ final SchedulerBase scheduler = createScheduler(gateway,
parallelism);
+ final ExecutionGraph eg = scheduler.getExecutionGraph();
- eg.scheduleForExecution();
+ scheduler.startScheduling();
ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
- eg.failGlobal(new Exception("fail global"));
+ scheduler.handleGlobalFailure(new Exception("fail global"));
assertEquals(JobStatus.FAILING, eg.getState());
validateCancelRpcCalls(gateway, parallelism);
// suspend
- eg.suspend(new Exception("suspend"));
+ scheduler.suspend(new Exception("suspend"));
assertEquals(JobStatus.SUSPENDED, eg.getState());
- ensureCannotLeaveSuspendedState(eg, gateway);
+ ensureCannotLeaveSuspendedState(scheduler, gateway);
}
/**
@@ -141,12 +147,13 @@ public class ExecutionGraphSuspendTest extends TestLogger
{
public void testSuspendedOutOfFailed() throws Exception {
final InteractionsCountingTaskManagerGateway gateway = new
InteractionsCountingTaskManagerGateway();
final int parallelism = 10;
- final ExecutionGraph eg = createExecutionGraph(gateway,
parallelism);
+ final SchedulerBase scheduler = createScheduler(gateway,
parallelism);
+ final ExecutionGraph eg = scheduler.getExecutionGraph();
- eg.scheduleForExecution();
+ scheduler.startScheduling();
ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
- eg.failGlobal(new Exception("fail global"));
+ scheduler.handleGlobalFailure(new Exception("fail global"));
assertEquals(JobStatus.FAILING, eg.getState());
validateCancelRpcCalls(gateway, parallelism);
@@ -155,7 +162,7 @@ public class ExecutionGraphSuspendTest extends TestLogger {
assertEquals(JobStatus.FAILED, eg.getState());
// suspend
- eg.suspend(new Exception("suspend"));
+ scheduler.suspend(new Exception("suspend"));
// still in failed state
assertEquals(JobStatus.FAILED, eg.getState());
@@ -169,22 +176,23 @@ public class ExecutionGraphSuspendTest extends TestLogger
{
public void testSuspendedOutOfCanceling() throws Exception {
final int parallelism = 10;
final InteractionsCountingTaskManagerGateway gateway = new
InteractionsCountingTaskManagerGateway(parallelism);
- final ExecutionGraph eg = createExecutionGraph(gateway,
parallelism);
+ final SchedulerBase scheduler = createScheduler(gateway,
parallelism);
+ final ExecutionGraph eg = scheduler.getExecutionGraph();
- eg.scheduleForExecution();
+ scheduler.startScheduling();
ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
- eg.cancel();
+ scheduler.cancel();
assertEquals(JobStatus.CANCELLING, eg.getState());
validateCancelRpcCalls(gateway, parallelism);
// suspend
- eg.suspend(new Exception("suspend"));
+ scheduler.suspend(new Exception("suspend"));
assertEquals(JobStatus.SUSPENDED, eg.getState());
- ensureCannotLeaveSuspendedState(eg, gateway);
+ ensureCannotLeaveSuspendedState(scheduler, gateway);
}
/**
@@ -194,12 +202,13 @@ public class ExecutionGraphSuspendTest extends TestLogger
{
public void testSuspendedOutOfCanceled() throws Exception {
final InteractionsCountingTaskManagerGateway gateway = new
InteractionsCountingTaskManagerGateway();
final int parallelism = 10;
- final ExecutionGraph eg = createExecutionGraph(gateway,
parallelism);
+ final SchedulerBase scheduler = createScheduler(gateway,
parallelism);
+ final ExecutionGraph eg = scheduler.getExecutionGraph();
- eg.scheduleForExecution();
+ scheduler.startScheduling();
ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
- eg.cancel();
+ scheduler.cancel();
assertEquals(JobStatus.CANCELLING, eg.getState());
validateCancelRpcCalls(gateway, parallelism);
@@ -208,7 +217,7 @@ public class ExecutionGraphSuspendTest extends TestLogger {
assertEquals(JobStatus.CANCELED,
eg.getTerminationFuture().get());
// suspend
- eg.suspend(new Exception("suspend"));
+ scheduler.suspend(new Exception("suspend"));
// still in failed state
assertEquals(JobStatus.CANCELED, eg.getState());
@@ -220,22 +229,27 @@ public class ExecutionGraphSuspendTest extends TestLogger
{
*/
@Test
public void testSuspendWhileRestarting() throws Exception {
- final ExecutionGraph eg =
ExecutionGraphTestUtils.createSimpleTestGraph(new
InfiniteDelayRestartStrategy(10));
-
eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
- eg.scheduleForExecution();
+ final SchedulerBase scheduler =
SchedulerTestingUtils.newSchedulerBuilder(new JobGraph())
+ .setRestartBackoffTimeStrategy(new
TestRestartBackoffTimeStrategy(true, Long.MAX_VALUE))
+ .build();
+
+
scheduler.setMainThreadExecutor(ComponentMainThreadExecutorServiceAdapter.forMainThread());
+ scheduler.startScheduling();
+
+ final ExecutionGraph eg = scheduler.getExecutionGraph();
assertEquals(JobStatus.RUNNING, eg.getState());
ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
- eg.failGlobal(new Exception("test"));
- assertEquals(JobStatus.FAILING, eg.getState());
+ scheduler.handleGlobalFailure(new Exception("test"));
+ assertEquals(JobStatus.RESTARTING, eg.getState());
ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
assertEquals(JobStatus.RESTARTING, eg.getState());
final Exception exception = new Exception("Suspended");
- eg.suspend(exception);
+ scheduler.suspend(exception);
assertEquals(JobStatus.SUSPENDED, eg.getState());
@@ -246,20 +260,22 @@ public class ExecutionGraphSuspendTest extends TestLogger
{
// utilities
//
------------------------------------------------------------------------
- private static void ensureCannotLeaveSuspendedState(ExecutionGraph eg,
InteractionsCountingTaskManagerGateway gateway) {
+ private static void ensureCannotLeaveSuspendedState(SchedulerBase
scheduler, InteractionsCountingTaskManagerGateway gateway) {
+ final ExecutionGraph eg = scheduler.getExecutionGraph();
+
gateway.waitUntilAllTasksAreSubmitted();
assertEquals(JobStatus.SUSPENDED, eg.getState());
gateway.resetCounts();
- eg.failGlobal(new Exception("fail"));
+ scheduler.handleGlobalFailure(new Exception("fail"));
assertEquals(JobStatus.SUSPENDED, eg.getState());
validateNoInteractions(gateway);
- eg.cancel();
+ scheduler.cancel();
assertEquals(JobStatus.SUSPENDED, eg.getState());
validateNoInteractions(gateway);
- eg.suspend(new Exception("suspend again"));
+ scheduler.suspend(new Exception("suspend again"));
assertEquals(JobStatus.SUSPENDED, eg.getState());
validateNoInteractions(gateway);
@@ -282,19 +298,15 @@ public class ExecutionGraphSuspendTest extends TestLogger
{
assertThat(gateway.getCancelTaskCount(), is(num));
}
- private static ExecutionGraph createExecutionGraph(TaskManagerGateway
gateway, int parallelism) throws Exception {
+ private static SchedulerBase createScheduler(TaskManagerGateway
gateway, int parallelism) throws Exception {
final JobVertex vertex = new JobVertex("vertex");
vertex.setInvokableClass(NoOpInvokable.class);
vertex.setParallelism(parallelism);
final SlotProvider slotProvider = new
SimpleSlotProvider(parallelism, gateway);
- ExecutionGraph simpleTestGraph =
ExecutionGraphTestUtils.createSimpleTestGraph(
- slotProvider,
- new FixedDelayRestartStrategy(0, 0),
- vertex);
-
simpleTestGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
- return simpleTestGraph;
+ final SchedulerBase scheduler =
SchedulerTestingUtils.createScheduler(new JobGraph(vertex), slotProvider);
+
scheduler.setMainThreadExecutor(ComponentMainThreadExecutorServiceAdapter.forMainThread());
+ return scheduler;
}
-
}