This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 4054aa0  [FLINK-19806][runtime] Harden DefaultScheduler for concurrent suspending and failing
4054aa0 is described below

commit 4054aa01ebb70aa7e4bf9b3bf96916f96869b42d
Author: Zhu Zhu <[email protected]>
AuthorDate: Tue Nov 17 12:01:17 2020 +0800

    [FLINK-19806][runtime] Harden DefaultScheduler for concurrent suspending and failing
---
 .../runtime/executiongraph/ExecutionGraph.java     |   7 +-
 .../flink/runtime/scheduler/SchedulerBase.java     |  11 +++
 .../executiongraph/ExecutionGraphSuspendTest.java  | 110 ++++++++++++---------
 3 files changed, 76 insertions(+), 52 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index dfd20ab..880a42d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -1410,7 +1410,7 @@ public class ExecutionGraph implements 
AccessExecutionGraph {
        }
 
        public void failJob(Throwable cause) {
-               if (state == JobStatus.FAILING || 
state.isGloballyTerminalState()) {
+               if (state == JobStatus.FAILING || state.isTerminalState()) {
                        return;
                }
 
@@ -1419,8 +1419,9 @@ public class ExecutionGraph implements 
AccessExecutionGraph {
 
                FutureUtils.assertNoException(
                        cancelVerticesAsync().whenComplete((aVoid, throwable) 
-> {
-                               transitionState(JobStatus.FAILED, cause);
-                               onTerminalState(JobStatus.FAILED);
+                               if (transitionState(JobStatus.FAILING, 
JobStatus.FAILED, cause)) {
+                                       onTerminalState(JobStatus.FAILED);
+                               }
                        }));
        }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 95eec11..42d3e02 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -437,6 +437,17 @@ public abstract class SchedulerBase implements SchedulerNG 
{
                return executionGraph.getCheckpointCoordinator();
        }
 
+       /**
+        * ExecutionGraph is exposed to make it easier to rework tests to be 
based on the new scheduler.
+        * ExecutionGraph is expected to be used only for state check. Yet at 
the moment, before all the
+        * actions are factored out from ExecutionGraph and its sub-components, 
some actions may still
+        * be performed directly on it.
+        */
+       @VisibleForTesting
+       public ExecutionGraph getExecutionGraph() {
+               return executionGraph;
+       }
+
        // 
------------------------------------------------------------------------
        // SchedulerNG
        // 
------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
index 2eba86b..325086c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
@@ -21,12 +21,14 @@ package org.apache.flink.runtime.executiongraph;
 import org.apache.flink.api.common.JobStatus;
 import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.execution.ExecutionState;
-import 
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
-import 
org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.SchedulerBase;
+import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.util.TestLogger;
 
@@ -49,19 +51,20 @@ public class ExecutionGraphSuspendTest extends TestLogger {
        public void testSuspendedOutOfCreated() throws Exception {
                final InteractionsCountingTaskManagerGateway gateway = new 
InteractionsCountingTaskManagerGateway();
                final int parallelism = 10;
-               final ExecutionGraph eg = createExecutionGraph(gateway, 
parallelism);
+               final SchedulerBase scheduler = createScheduler(gateway, 
parallelism);
+               final ExecutionGraph eg = scheduler.getExecutionGraph();
 
                assertEquals(JobStatus.CREATED, eg.getState());
 
                // suspend
 
-               eg.suspend(new Exception("suspend"));
+               scheduler.suspend(new Exception("suspend"));
 
                assertEquals(JobStatus.SUSPENDED, eg.getState());
                validateAllVerticesInState(eg, ExecutionState.CANCELED);
                validateCancelRpcCalls(gateway, 0);
 
-               ensureCannotLeaveSuspendedState(eg, gateway);
+               ensureCannotLeaveSuspendedState(scheduler, gateway);
        }
 
        /**
@@ -71,19 +74,20 @@ public class ExecutionGraphSuspendTest extends TestLogger {
        public void testSuspendedOutOfDeploying() throws Exception {
                final int parallelism = 10;
                final InteractionsCountingTaskManagerGateway gateway = new 
InteractionsCountingTaskManagerGateway(parallelism);
-               final ExecutionGraph eg = createExecutionGraph(gateway, 
parallelism);
+               final SchedulerBase scheduler = createScheduler(gateway, 
parallelism);
+               final ExecutionGraph eg = scheduler.getExecutionGraph();
 
-               eg.scheduleForExecution();
+               scheduler.startScheduling();
                assertEquals(JobStatus.RUNNING, eg.getState());
                validateAllVerticesInState(eg, ExecutionState.DEPLOYING);
 
                // suspend
-               eg.suspend(new Exception("suspend"));
+               scheduler.suspend(new Exception("suspend"));
 
                assertEquals(JobStatus.SUSPENDED, eg.getState());
                validateCancelRpcCalls(gateway, parallelism);
 
-               ensureCannotLeaveSuspendedState(eg, gateway);
+               ensureCannotLeaveSuspendedState(scheduler, gateway);
        }
 
        /**
@@ -93,21 +97,22 @@ public class ExecutionGraphSuspendTest extends TestLogger {
        public void testSuspendedOutOfRunning() throws Exception {
                final int parallelism = 10;
                final InteractionsCountingTaskManagerGateway gateway = new 
InteractionsCountingTaskManagerGateway(parallelism);
-               final ExecutionGraph eg = createExecutionGraph(gateway, 
parallelism);
+               final SchedulerBase scheduler = createScheduler(gateway, 
parallelism);
+               final ExecutionGraph eg = scheduler.getExecutionGraph();
 
-               eg.scheduleForExecution();
+               scheduler.startScheduling();
                ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
 
                assertEquals(JobStatus.RUNNING, eg.getState());
                validateAllVerticesInState(eg, ExecutionState.RUNNING);
 
                // suspend
-               eg.suspend(new Exception("suspend"));
+               scheduler.suspend(new Exception("suspend"));
 
                assertEquals(JobStatus.SUSPENDED, eg.getState());
                validateCancelRpcCalls(gateway, parallelism);
 
-               ensureCannotLeaveSuspendedState(eg, gateway);
+               ensureCannotLeaveSuspendedState(scheduler, gateway);
        }
 
        /**
@@ -117,21 +122,22 @@ public class ExecutionGraphSuspendTest extends TestLogger 
{
        public void testSuspendedOutOfFailing() throws Exception {
                final int parallelism = 10;
                final InteractionsCountingTaskManagerGateway gateway = new 
InteractionsCountingTaskManagerGateway(parallelism);
-               final ExecutionGraph eg = createExecutionGraph(gateway, 
parallelism);
+               final SchedulerBase scheduler = createScheduler(gateway, 
parallelism);
+               final ExecutionGraph eg = scheduler.getExecutionGraph();
 
-               eg.scheduleForExecution();
+               scheduler.startScheduling();
                ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
 
-               eg.failGlobal(new Exception("fail global"));
+               scheduler.handleGlobalFailure(new Exception("fail global"));
 
                assertEquals(JobStatus.FAILING, eg.getState());
                validateCancelRpcCalls(gateway, parallelism);
 
                // suspend
-               eg.suspend(new Exception("suspend"));
+               scheduler.suspend(new Exception("suspend"));
 
                assertEquals(JobStatus.SUSPENDED, eg.getState());
-               ensureCannotLeaveSuspendedState(eg, gateway);
+               ensureCannotLeaveSuspendedState(scheduler, gateway);
        }
 
        /**
@@ -141,12 +147,13 @@ public class ExecutionGraphSuspendTest extends TestLogger 
{
        public void testSuspendedOutOfFailed() throws Exception {
                final InteractionsCountingTaskManagerGateway gateway = new 
InteractionsCountingTaskManagerGateway();
                final int parallelism = 10;
-               final ExecutionGraph eg = createExecutionGraph(gateway, 
parallelism);
+               final SchedulerBase scheduler = createScheduler(gateway, 
parallelism);
+               final ExecutionGraph eg = scheduler.getExecutionGraph();
 
-               eg.scheduleForExecution();
+               scheduler.startScheduling();
                ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
 
-               eg.failGlobal(new Exception("fail global"));
+               scheduler.handleGlobalFailure(new Exception("fail global"));
 
                assertEquals(JobStatus.FAILING, eg.getState());
                validateCancelRpcCalls(gateway, parallelism);
@@ -155,7 +162,7 @@ public class ExecutionGraphSuspendTest extends TestLogger {
                assertEquals(JobStatus.FAILED, eg.getState());
 
                // suspend
-               eg.suspend(new Exception("suspend"));
+               scheduler.suspend(new Exception("suspend"));
 
                // still in failed state
                assertEquals(JobStatus.FAILED, eg.getState());
@@ -169,22 +176,23 @@ public class ExecutionGraphSuspendTest extends TestLogger 
{
        public void testSuspendedOutOfCanceling() throws Exception {
                final int parallelism = 10;
                final InteractionsCountingTaskManagerGateway gateway = new 
InteractionsCountingTaskManagerGateway(parallelism);
-               final ExecutionGraph eg = createExecutionGraph(gateway, 
parallelism);
+               final SchedulerBase scheduler = createScheduler(gateway, 
parallelism);
+               final ExecutionGraph eg = scheduler.getExecutionGraph();
 
-               eg.scheduleForExecution();
+               scheduler.startScheduling();
                ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
 
-               eg.cancel();
+               scheduler.cancel();
 
                assertEquals(JobStatus.CANCELLING, eg.getState());
                validateCancelRpcCalls(gateway, parallelism);
 
                // suspend
-               eg.suspend(new Exception("suspend"));
+               scheduler.suspend(new Exception("suspend"));
 
                assertEquals(JobStatus.SUSPENDED, eg.getState());
 
-               ensureCannotLeaveSuspendedState(eg, gateway);
+               ensureCannotLeaveSuspendedState(scheduler, gateway);
        }
 
        /**
@@ -194,12 +202,13 @@ public class ExecutionGraphSuspendTest extends TestLogger 
{
        public void testSuspendedOutOfCanceled() throws Exception {
                final InteractionsCountingTaskManagerGateway gateway = new 
InteractionsCountingTaskManagerGateway();
                final int parallelism = 10;
-               final ExecutionGraph eg = createExecutionGraph(gateway, 
parallelism);
+               final SchedulerBase scheduler = createScheduler(gateway, 
parallelism);
+               final ExecutionGraph eg = scheduler.getExecutionGraph();
 
-               eg.scheduleForExecution();
+               scheduler.startScheduling();
                ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
 
-               eg.cancel();
+               scheduler.cancel();
 
                assertEquals(JobStatus.CANCELLING, eg.getState());
                validateCancelRpcCalls(gateway, parallelism);
@@ -208,7 +217,7 @@ public class ExecutionGraphSuspendTest extends TestLogger {
                assertEquals(JobStatus.CANCELED, 
eg.getTerminationFuture().get());
 
                // suspend
-               eg.suspend(new Exception("suspend"));
+               scheduler.suspend(new Exception("suspend"));
 
                // still in failed state
                assertEquals(JobStatus.CANCELED, eg.getState());
@@ -220,22 +229,27 @@ public class ExecutionGraphSuspendTest extends TestLogger 
{
         */
        @Test
        public void testSuspendWhileRestarting() throws Exception {
-               final ExecutionGraph eg = 
ExecutionGraphTestUtils.createSimpleTestGraph(new 
InfiniteDelayRestartStrategy(10));
-               
eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
-               eg.scheduleForExecution();
+               final SchedulerBase scheduler = 
SchedulerTestingUtils.newSchedulerBuilder(new JobGraph())
+                       .setRestartBackoffTimeStrategy(new 
TestRestartBackoffTimeStrategy(true, Long.MAX_VALUE))
+                       .build();
+
+               
scheduler.setMainThreadExecutor(ComponentMainThreadExecutorServiceAdapter.forMainThread());
+               scheduler.startScheduling();
+
+               final ExecutionGraph eg = scheduler.getExecutionGraph();
 
                assertEquals(JobStatus.RUNNING, eg.getState());
                ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
 
-               eg.failGlobal(new Exception("test"));
-               assertEquals(JobStatus.FAILING, eg.getState());
+               scheduler.handleGlobalFailure(new Exception("test"));
+               assertEquals(JobStatus.RESTARTING, eg.getState());
 
                ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
                assertEquals(JobStatus.RESTARTING, eg.getState());
 
                final Exception exception = new Exception("Suspended");
 
-               eg.suspend(exception);
+               scheduler.suspend(exception);
 
                assertEquals(JobStatus.SUSPENDED, eg.getState());
 
@@ -246,20 +260,22 @@ public class ExecutionGraphSuspendTest extends TestLogger 
{
        //  utilities
        // 
------------------------------------------------------------------------
 
-       private static void ensureCannotLeaveSuspendedState(ExecutionGraph eg, 
InteractionsCountingTaskManagerGateway gateway) {
+       private static void ensureCannotLeaveSuspendedState(SchedulerBase 
scheduler, InteractionsCountingTaskManagerGateway gateway) {
+               final ExecutionGraph eg = scheduler.getExecutionGraph();
+
                gateway.waitUntilAllTasksAreSubmitted();
                assertEquals(JobStatus.SUSPENDED, eg.getState());
                gateway.resetCounts();
 
-               eg.failGlobal(new Exception("fail"));
+               scheduler.handleGlobalFailure(new Exception("fail"));
                assertEquals(JobStatus.SUSPENDED, eg.getState());
                validateNoInteractions(gateway);
 
-               eg.cancel();
+               scheduler.cancel();
                assertEquals(JobStatus.SUSPENDED, eg.getState());
                validateNoInteractions(gateway);
 
-               eg.suspend(new Exception("suspend again"));
+               scheduler.suspend(new Exception("suspend again"));
                assertEquals(JobStatus.SUSPENDED, eg.getState());
                validateNoInteractions(gateway);
 
@@ -282,19 +298,15 @@ public class ExecutionGraphSuspendTest extends TestLogger 
{
                assertThat(gateway.getCancelTaskCount(), is(num));
        }
 
-       private static ExecutionGraph createExecutionGraph(TaskManagerGateway 
gateway, int parallelism) throws Exception {
+       private static SchedulerBase createScheduler(TaskManagerGateway 
gateway, int parallelism) throws Exception {
                final JobVertex vertex = new JobVertex("vertex");
                vertex.setInvokableClass(NoOpInvokable.class);
                vertex.setParallelism(parallelism);
 
                final SlotProvider slotProvider = new 
SimpleSlotProvider(parallelism, gateway);
 
-               ExecutionGraph simpleTestGraph = 
ExecutionGraphTestUtils.createSimpleTestGraph(
-                       slotProvider,
-                       new FixedDelayRestartStrategy(0, 0),
-                       vertex);
-               
simpleTestGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
-               return simpleTestGraph;
+               final SchedulerBase scheduler = 
SchedulerTestingUtils.createScheduler(new JobGraph(vertex), slotProvider);
+               
scheduler.setMainThreadExecutor(ComponentMainThreadExecutorServiceAdapter.forMainThread());
+               return scheduler;
        }
-
 }

Reply via email to