This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 58cc2a5  [FLINK-20626][runtime] Fix issue of concurrent failing and 
canceling of an ExecutionGraph
58cc2a5 is described below

commit 58cc2a5fbd419d6a9e4f9c251ac01ecf59a8c5a2
Author: Zhu Zhu <[email protected]>
AuthorDate: Wed Dec 16 19:56:04 2020 +0800

    [FLINK-20626][runtime] Fix issue of concurrent failing and canceling of an 
ExecutionGraph
    
    Update ExecutionGraphRestartTest to be based on the new scheduler to verify 
the fix.
---
 .../runtime/executiongraph/ExecutionGraph.java     |   5 +
 .../executiongraph/ExecutionGraphRestartTest.java  | 642 ++-------------------
 .../executiongraph/ExecutionGraphSuspendTest.java  |   6 +
 3 files changed, 74 insertions(+), 579 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 8ac151e..a439d84 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -1435,6 +1435,11 @@ public class ExecutionGraph implements 
AccessExecutionGraph {
                        cancelVerticesAsync().whenComplete((aVoid, throwable) 
-> {
                                if (transitionState(JobStatus.FAILING, 
JobStatus.FAILED, cause)) {
                                        onTerminalState(JobStatus.FAILED);
+                               } else if (state == JobStatus.CANCELLING) {
+                                       transitionState(JobStatus.CANCELLING, 
JobStatus.CANCELED);
+                                       onTerminalState(JobStatus.CANCELED);
+                               } else if (!state.isTerminalState()) {
+                                       throw new IllegalStateException("Cannot 
complete job failing from an unexpected state: " + state);
                                }
                        }));
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 28eb45c..870c1b1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -22,26 +22,16 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.execution.SuppressRestartsException;
-import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
-import 
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
-import 
org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
-import 
org.apache.flink.runtime.executiongraph.utils.NotCancelAckingTaskGateway;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
 import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
-import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.ScheduleMode;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import 
org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
@@ -49,39 +39,30 @@ import 
org.apache.flink.runtime.jobmaster.slotpool.Scheduler;
 import org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl;
-import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.jobmaster.slotpool.TestingSlotPoolImpl;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.scheduler.SchedulerBase;
+import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
-import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.Before;
 import org.junit.Test;
 
-import javax.annotation.Nonnull;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
 
-import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.completeCancellingForAllVertices;
-import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex;
-import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.finishAllVertices;
-import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.switchToRunning;
 import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
 
 /**
  * Tests the restart behaviour of the {@link ExecutionGraph}.
@@ -95,30 +76,15 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
        private static final JobID TEST_JOB_ID = new JobID();
 
-       // 
------------------------------------------------------------------------
-
-       @Test
-       public void testNoManualRestart() throws Exception {
-               ExecutionGraph eg = TestingExecutionGraphBuilder
-                       .newBuilder()
-                       .setSlotProvider(new SimpleSlotProvider(NUM_TASKS))
-                       .setJobGraph(createJobGraph())
-                       .build();
-
-               startAndScheduleExecutionGraph(eg);
-
-               eg.getAllExecutionVertices().iterator().next().fail(new 
Exception("Test Exception"));
-
-               completeCanceling(eg);
-
-               assertEquals(JobStatus.FAILED, eg.getState());
+       private ManuallyTriggeredScheduledExecutor taskRestartExecutor;
 
-               // This should not restart the graph.
-               eg.restart(eg.getGlobalModVersion());
-
-               assertEquals(JobStatus.FAILED, eg.getState());
+       @Before
+       public void setUp() {
+               taskRestartExecutor = new ManuallyTriggeredScheduledExecutor();
        }
 
+       // 
------------------------------------------------------------------------
+
        private void completeCanceling(ExecutionGraph eg) {
                executeOperationForAllExecutions(eg, 
Execution::completeCancelling);
        }
@@ -129,34 +95,6 @@ public class ExecutionGraphRestartTest extends TestLogger {
                }
        }
 
-       @Test
-       public void testRestartAutomatically() throws Exception {
-               try (SlotPool slotPool = createSlotPoolImpl()) {
-                       ExecutionGraph executionGraph = 
TestingExecutionGraphBuilder
-                               .newBuilder()
-                               .setJobGraph(createJobGraph())
-                               
.setRestartStrategy(TestRestartStrategy.directExecuting())
-                               
.setSlotProvider(createSchedulerWithSlots(slotPool))
-                               .build();
-
-                       startAndScheduleExecutionGraph(executionGraph);
-
-                       
executionGraph.getAllExecutionVertices().iterator().next().fail(new 
Exception("Test Exception"));
-
-                       assertEquals(JobStatus.FAILING, 
executionGraph.getState());
-
-                       for (ExecutionVertex vertex : 
executionGraph.getAllExecutionVertices()) {
-                               
vertex.getCurrentExecutionAttempt().completeCancelling();
-                       }
-
-                       assertEquals(JobStatus.RUNNING, 
executionGraph.getState());
-                       finishAllVertices(executionGraph);
-                       assertEquals(JobStatus.FINISHED, 
executionGraph.getState());
-               }
-
-       }
-
-       @Nonnull
        private SlotPoolImpl createSlotPoolImpl() {
                return new TestingSlotPoolImpl(TEST_JOB_ID);
        }
@@ -166,95 +104,55 @@ public class ExecutionGraphRestartTest extends TestLogger 
{
                // We want to manually control the restart and delay
                try (SlotPool slotPool = createSlotPoolImpl()) {
                        TaskManagerLocation taskManagerLocation = new 
LocalTaskManagerLocation();
-                       final ExecutionGraph executionGraph = 
TestingExecutionGraphBuilder
-                               .newBuilder()
-                               .setJobGraph(createJobGraph())
-                               .setRestartStrategy(new 
InfiniteDelayRestartStrategy())
-                               
.setSlotProvider(createSchedulerWithSlots(slotPool, taskManagerLocation))
+                       SchedulerBase scheduler = SchedulerTestingUtils
+                               .newSchedulerBuilderWithDefaultSlotAllocator(
+                                       createJobGraph(),
+                                       createSchedulerWithSlots(slotPool, 
taskManagerLocation))
+                               .setRestartBackoffTimeStrategy(new 
TestRestartBackoffTimeStrategy(true, Long.MAX_VALUE))
+                               .setDelayExecutor(taskRestartExecutor)
                                .build();
+                       ExecutionGraph executionGraph = 
scheduler.getExecutionGraph();
 
-                       startAndScheduleExecutionGraph(executionGraph);
+                       startScheduling(scheduler);
 
                        // Release the TaskManager and wait for the job to 
restart
                        
slotPool.releaseTaskManager(taskManagerLocation.getResourceID(), new 
Exception("Test Exception"));
                        assertEquals(JobStatus.RESTARTING, 
executionGraph.getState());
 
                        // Canceling needs to abort the restart
-                       executionGraph.cancel();
+                       scheduler.cancel();
 
                        assertEquals(JobStatus.CANCELED, 
executionGraph.getState());
 
-                       // The restart has been aborted
-                       
executionGraph.restart(executionGraph.getGlobalModVersion());
+                       taskRestartExecutor.triggerScheduledTasks();
 
                        assertEquals(JobStatus.CANCELED, 
executionGraph.getState());
-               }
-
-       }
-
-       @Test
-       public void testFailWhileRestarting() throws Exception {
-               try (SlotPool slotPool = createSlotPoolImpl()) {
-                       TaskManagerLocation taskManagerLocation = new 
LocalTaskManagerLocation();
-                       final ExecutionGraph executionGraph = 
TestingExecutionGraphBuilder
-                               .newBuilder()
-                               .setJobGraph(createJobGraph())
-                               .setRestartStrategy(new 
InfiniteDelayRestartStrategy())
-                               
.setSlotProvider(createSchedulerWithSlots(slotPool, taskManagerLocation))
-                               .build();
-
-                       startAndScheduleExecutionGraph(executionGraph);
-
-                       // Release the TaskManager and wait for the job to 
restart
-                       
slotPool.releaseTaskManager(taskManagerLocation.getResourceID(), new 
Exception("Test Exception"));
-
-                       assertEquals(JobStatus.RESTARTING, 
executionGraph.getState());
-
-                       // If we fail when being in RESTARTING, then we should 
try to restart again
-                       final long globalModVersion = 
executionGraph.getGlobalModVersion();
-                       final Exception testException = new Exception("Test 
exception");
-                       executionGraph.failGlobal(testException);
-
-                       assertNotEquals(globalModVersion, 
executionGraph.getGlobalModVersion());
-                       assertEquals(JobStatus.RESTARTING, 
executionGraph.getState());
-                       assertEquals(testException, 
executionGraph.getFailureCause()); // we should have updated the failure cause
-
-                       // but it should fail when sending a 
SuppressRestartsException
-                       executionGraph.failGlobal(new 
SuppressRestartsException(new Exception("Suppress restart exception")));
-
-                       assertEquals(JobStatus.FAILED, 
executionGraph.getState());
-
-                       // The restart has been aborted
-                       
executionGraph.restart(executionGraph.getGlobalModVersion());
-
-                       assertEquals(JobStatus.FAILED, 
executionGraph.getState());
+                       for (ExecutionVertex vertex : 
executionGraph.getAllExecutionVertices()) {
+                               assertEquals(ExecutionState.FAILED, 
vertex.getExecutionState());
+                       }
                }
        }
 
        @Test
        public void testCancelWhileFailing() throws Exception {
                try (SlotPool slotPool = createSlotPoolImpl()) {
-                       final ExecutionGraph graph = 
TestingExecutionGraphBuilder
-                               .newBuilder()
-                               .setJobGraph(createJobGraph())
-                               .setRestartStrategy(new 
InfiniteDelayRestartStrategy())
-                               
.setSlotProvider(createSchedulerWithSlots(slotPool))
+                       SchedulerBase scheduler = SchedulerTestingUtils
+                               
.newSchedulerBuilderWithDefaultSlotAllocator(createJobGraph(), 
createSchedulerWithSlots(slotPool))
+                               .setRestartBackoffTimeStrategy(new 
TestRestartBackoffTimeStrategy(false, Long.MAX_VALUE))
                                .build();
+                       ExecutionGraph graph = scheduler.getExecutionGraph();
 
-                       startAndScheduleExecutionGraph(graph);
+                       startScheduling(scheduler);
 
                        assertEquals(JobStatus.RUNNING, graph.getState());
 
-                       // switch all tasks to running
-                       for (ExecutionVertex vertex : 
graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
-                               
vertex.getCurrentExecutionAttempt().switchToRunning();
-                       }
+                       switchAllTasksToRunning(graph);
 
-                       graph.failGlobal(new Exception("test"));
+                       scheduler.handleGlobalFailure(new Exception("test"));
 
                        assertEquals(JobStatus.FAILING, graph.getState());
 
-                       graph.cancel();
+                       scheduler.cancel();
 
                        assertEquals(JobStatus.CANCELLING, graph.getState());
 
@@ -263,28 +161,27 @@ public class ExecutionGraphRestartTest extends TestLogger 
{
 
                        assertEquals(JobStatus.CANCELED, graph.getState());
                }
-
        }
 
        @Test
        public void testFailWhileCanceling() throws Exception {
                try (SlotPool slotPool = createSlotPoolImpl()) {
-                       final ExecutionGraph graph = 
TestingExecutionGraphBuilder
-                               .newBuilder()
-                               .setJobGraph(createJobGraph())
-                               
.setSlotProvider(createSchedulerWithSlots(slotPool))
+                       SchedulerBase scheduler = SchedulerTestingUtils
+                               
.newSchedulerBuilderWithDefaultSlotAllocator(createJobGraph(), 
createSchedulerWithSlots(slotPool))
+                               .setRestartBackoffTimeStrategy(new 
TestRestartBackoffTimeStrategy(false, Long.MAX_VALUE))
                                .build();
+                       ExecutionGraph graph = scheduler.getExecutionGraph();
 
-                       startAndScheduleExecutionGraph(graph);
+                       startScheduling(scheduler);
 
                        assertEquals(JobStatus.RUNNING, graph.getState());
                        switchAllTasksToRunning(graph);
 
-                       graph.cancel();
+                       scheduler.cancel();
 
                        assertEquals(JobStatus.CANCELLING, graph.getState());
 
-                       graph.failGlobal(new Exception("test"));
+                       scheduler.handleGlobalFailure(new Exception("test"));
 
                        assertEquals(JobStatus.FAILING, graph.getState());
 
@@ -293,74 +190,12 @@ public class ExecutionGraphRestartTest extends TestLogger 
{
 
                        assertEquals(JobStatus.FAILED, graph.getState());
                }
-
-       }
-
-       @Test
-       public void testTaskFailingWhileGlobalFailing() throws Exception {
-               try (SlotPool slotPool = createSlotPoolImpl()) {
-                       final ExecutionGraph graph = 
TestingExecutionGraphBuilder
-                               .newBuilder()
-                               .setRestartStrategy(new 
InfiniteDelayRestartStrategy())
-                               .setFailoverStrategyFactory(new 
TestFailoverStrategy.Factory())
-                               .setJobGraph(createJobGraph())
-                               
.setSlotProvider(createSchedulerWithSlots(slotPool))
-                               .build();
-
-                       startAndScheduleExecutionGraph(graph);
-
-                       final TestFailoverStrategy failoverStrategy = 
(TestFailoverStrategy) graph.getFailoverStrategy();
-
-                       // switch all tasks to running
-                       for (ExecutionVertex vertex : 
graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
-                               
vertex.getCurrentExecutionAttempt().switchToRunning();
-                       }
-
-                       graph.failGlobal(new Exception("test"));
-
-                       
graph.getAllExecutionVertices().iterator().next().fail(new Exception("Test task 
failure"));
-
-                       // no local failover should happen when in global 
failover cancelling
-                       assertEquals(0, 
failoverStrategy.getLocalFailoverCount());
-               }
-
        }
 
        private void switchAllTasksToRunning(ExecutionGraph graph) {
                executeOperationForAllExecutions(graph, 
Execution::switchToRunning);
        }
 
-       @Test
-       public void testNoRestartOnSuppressException() throws Exception {
-               try (SlotPool slotPool = createSlotPoolImpl()) {
-                       ExecutionGraph eg = TestingExecutionGraphBuilder
-                               .newBuilder()
-                               .setJobGraph(createJobGraph())
-                               .setRestartStrategy(new 
FixedDelayRestartStrategy(Integer.MAX_VALUE, 0))
-                               
.setSlotProvider(createSchedulerWithSlots(slotPool))
-                               .build();
-
-                       startAndScheduleExecutionGraph(eg);
-
-                       // Fail with unrecoverable Exception
-                       eg.getAllExecutionVertices().iterator().next().fail(
-                               new SuppressRestartsException(new 
Exception("Test Exception")));
-
-                       assertEquals(JobStatus.FAILING, eg.getState());
-
-                       completeCanceling(eg);
-
-                       eg.waitUntilTerminal();
-                       assertEquals(JobStatus.FAILED, eg.getState());
-
-                       RestartStrategy restartStrategy = 
eg.getRestartStrategy();
-                       assertTrue(restartStrategy instanceof 
FixedDelayRestartStrategy);
-
-                       assertEquals(0, ((FixedDelayRestartStrategy) 
restartStrategy).getCurrentRestartAttempt());
-               }
-
-       }
-
        /**
         * Tests that a failing execution does not affect a restarted job. This 
is important if a
         * callback handler fails an execution after it has already reached a 
final state and the job
@@ -373,14 +208,16 @@ public class ExecutionGraphRestartTest extends TestLogger 
{
                JobGraph jobGraph = new JobGraph("Pointwise job", sender, 
receiver);
 
                try (SlotPool slotPool = createSlotPoolImpl()) {
-                       ExecutionGraph eg = TestingExecutionGraphBuilder
-                               .newBuilder()
-                               
.setRestartStrategy(TestRestartStrategy.directExecuting())
-                               .setJobGraph(jobGraph)
-                               
.setSlotProvider(createSchedulerWithSlots(slotPool, new 
LocalTaskManagerLocation(), 2))
+                       SchedulerBase scheduler = SchedulerTestingUtils
+                               .newSchedulerBuilderWithDefaultSlotAllocator(
+                                       jobGraph,
+                                       createSchedulerWithSlots(slotPool, new 
LocalTaskManagerLocation(), 2))
+                               .setRestartBackoffTimeStrategy(new 
TestRestartBackoffTimeStrategy(true, Long.MAX_VALUE))
+                               .setDelayExecutor(taskRestartExecutor)
                                .build();
+                       ExecutionGraph eg = scheduler.getExecutionGraph();
 
-                       startAndScheduleExecutionGraph(eg);
+                       startScheduling(scheduler);
 
                        Iterator<ExecutionVertex> executionVertices = 
eg.getAllExecutionVertices().iterator();
 
@@ -392,6 +229,8 @@ public class ExecutionGraphRestartTest extends TestLogger {
                        failedExecution.fail(new Exception("Test Exception"));
                        failedExecution.completeCancelling();
 
+                       taskRestartExecutor.triggerScheduledTasks();
+
                        assertEquals(JobStatus.RUNNING, eg.getState());
 
                        // At this point all resources have been assigned
@@ -422,16 +261,19 @@ public class ExecutionGraphRestartTest extends TestLogger 
{
        @Test
        public void testFailExecutionAfterCancel() throws Exception {
                try (SlotPool slotPool = createSlotPoolImpl()) {
-                       ExecutionGraph eg = 
TestingExecutionGraphBuilder.newBuilder()
-                               .setRestartStrategy(new 
InfiniteDelayRestartStrategy())
-                               .setJobGraph(createJobGraphToCancel())
-                               
.setSlotProvider(createSchedulerWithSlots(slotPool, new 
LocalTaskManagerLocation(), 2))
+                       SchedulerBase scheduler = SchedulerTestingUtils
+                               .newSchedulerBuilderWithDefaultSlotAllocator(
+                                       createJobGraphToCancel(),
+                                       createSchedulerWithSlots(slotPool, new 
LocalTaskManagerLocation(), 2))
+                               .setRestartBackoffTimeStrategy(new 
TestRestartBackoffTimeStrategy(false, Long.MAX_VALUE))
+                               .setDelayExecutor(taskRestartExecutor)
                                .build();
+                       ExecutionGraph eg = scheduler.getExecutionGraph();
 
-                       startAndScheduleExecutionGraph(eg);
+                       startScheduling(scheduler);
 
                        // Fail right after cancel (for example with concurrent 
slot release)
-                       eg.cancel();
+                       scheduler.cancel();
 
                        for (ExecutionVertex v : eg.getAllExecutionVertices()) {
                                v.getCurrentExecutionAttempt().fail(new 
Exception("Test Exception"));
@@ -446,332 +288,15 @@ public class ExecutionGraphRestartTest extends 
TestLogger {
                }
        }
 
-       /**
-        * Tests that it is possible to fail a graph via a call to
-        * {@link ExecutionGraph#failGlobal(Throwable)} after cancellation.
-        */
-       @Test
-       public void testFailExecutionGraphAfterCancel() throws Exception {
-               try (SlotPool slotPool = createSlotPoolImpl()) {
-                       ExecutionGraph eg = TestingExecutionGraphBuilder
-                               .newBuilder()
-                               .setRestartStrategy(new 
InfiniteDelayRestartStrategy())
-                               .setJobGraph(createJobGraphToCancel())
-                               
.setSlotProvider(createSchedulerWithSlots(slotPool, new 
LocalTaskManagerLocation(), 2))
-                               .build();
-
-                       startAndScheduleExecutionGraph(eg);
-
-                       // Fail right after cancel (for example with concurrent 
slot release)
-                       eg.cancel();
-                       assertEquals(JobStatus.CANCELLING, eg.getState());
-
-                       eg.failGlobal(new Exception("Test Exception"));
-                       assertEquals(JobStatus.FAILING, eg.getState());
-
-                       Execution execution = 
eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt();
-
-                       execution.completeCancelling();
-                       assertEquals(JobStatus.RESTARTING, eg.getState());
-               }
-       }
-
-       /**
-        * Tests that a suspend call while restarting a job, will abort the 
restarting.
-        */
-       @Test
-       public void testSuspendWhileRestarting() throws Exception {
-               TestRestartStrategy controllableRestartStrategy = 
TestRestartStrategy.manuallyTriggered();
-               try (SlotPool slotPool = createSlotPoolImpl()) {
-                       TaskManagerLocation taskManagerLocation = new 
LocalTaskManagerLocation();
-                       ExecutionGraph eg = TestingExecutionGraphBuilder
-                               .newBuilder()
-                               .setJobGraph(createJobGraph())
-                               .setRestartStrategy(controllableRestartStrategy)
-                               
.setSlotProvider(createSchedulerWithSlots(slotPool, taskManagerLocation))
-                               .build();
-
-                       startAndScheduleExecutionGraph(eg);
-
-                       // Release the TaskManager and wait for the job to 
restart
-                       
slotPool.releaseTaskManager(taskManagerLocation.getResourceID(), new 
Exception("Test Exception"));
-
-                       assertEquals(1, 
controllableRestartStrategy.getNumberOfQueuedActions());
-
-                       assertEquals(JobStatus.RESTARTING, eg.getState());
-
-                       eg.suspend(new Exception("Test exception"));
-
-                       assertEquals(JobStatus.SUSPENDED, eg.getState());
-
-                       controllableRestartStrategy.triggerAll().join();
-
-                       assertEquals(JobStatus.SUSPENDED, eg.getState());
-               }
-       }
-
-       @Test
-       public void testLocalFailAndRestart() throws Exception {
-               final int parallelism = 10;
-               final TestRestartStrategy triggeredRestartStrategy = 
TestRestartStrategy.manuallyTriggered();
-
-               final JobGraph jobGraph = new JobGraph(TEST_JOB_ID, "Test Job", 
createNoOpVertex(parallelism));
-               jobGraph.setScheduleMode(ScheduleMode.EAGER);
-
-               final ExecutionGraph eg = TestingExecutionGraphBuilder
-                       .newBuilder()
-                       .setJobGraph(jobGraph)
-                       .setRestartStrategy(triggeredRestartStrategy)
-                       .build();
-
-               startAndScheduleExecutionGraph(eg);
-
-               switchToRunning(eg);
-
-               final ExecutionJobVertex vertex = 
eg.getVerticesTopologically().iterator().next();
-               final Execution first = 
vertex.getTaskVertices()[0].getCurrentExecutionAttempt();
-               final Execution last = 
vertex.getTaskVertices()[vertex.getParallelism() - 
1].getCurrentExecutionAttempt();
-
-               // Have two executions fail
-               first.fail(new Exception("intended test failure 1"));
-               last.fail(new Exception("intended test failure 2"));
-
-               assertEquals(JobStatus.FAILING, eg.getState());
-
-               completeCancellingForAllVertices(eg);
-
-               // Now trigger the restart
-               assertEquals(1, 
triggeredRestartStrategy.getNumberOfQueuedActions());
-               triggeredRestartStrategy.triggerAll().join();
-
-               assertEquals(JobStatus.RUNNING, eg.getState());
-
-               switchToRunning(eg);
-               finishAllVertices(eg);
-
-               eg.waitUntilTerminal();
-               assertEquals(JobStatus.FINISHED, eg.getState());
-       }
-
-       @Test
-       public void testGlobalFailAndRestarts() throws Exception {
-               final int parallelism = 10;
-               final JobVertex vertex = createNoOpVertex(parallelism);
-               final NotCancelAckingTaskGateway taskManagerGateway = new 
NotCancelAckingTaskGateway();
-               final SlotProvider slots = new SimpleSlotProvider(parallelism, 
taskManagerGateway);
-               final TestRestartStrategy restartStrategy = 
TestRestartStrategy.manuallyTriggered();
-
-               final JobGraph jobGraph = new JobGraph(TEST_JOB_ID, "Test Job", 
vertex);
-               jobGraph.setScheduleMode(ScheduleMode.EAGER);
-               final ExecutionGraph eg = TestingExecutionGraphBuilder
-                       .newBuilder()
-                       .setJobGraph(jobGraph)
-                       .setSlotProvider(slots)
-                       .setRestartStrategy(restartStrategy)
-                       .build();
-
-               startAndScheduleExecutionGraph(eg);
-
-               switchToRunning(eg);
-
-               // fail into 'RESTARTING'
-               eg.failGlobal(new Exception("intended test failure 1"));
-               assertEquals(JobStatus.FAILING, eg.getState());
-
-               completeCancellingForAllVertices(eg);
-
-               assertEquals(JobStatus.RESTARTING, eg.getState());
-
-               eg.failGlobal(new Exception("intended test failure 2"));
-               assertEquals(JobStatus.RESTARTING, eg.getState());
-
-               restartStrategy.triggerAll().join();
-
-               assertEquals(JobStatus.RUNNING, eg.getState());
-
-               switchToRunning(eg);
-               finishAllVertices(eg);
-
-               eg.waitUntilTerminal();
-               assertEquals(JobStatus.FINISHED, eg.getState());
-
-               assertThat("Too many restarts", eg.getNumberOfRestarts(), 
is(lessThanOrEqualTo(2L)));
-       }
-
-       /**
-        * SlotPool#failAllocation should not fail with a {@link 
java.util.ConcurrentModificationException}
-        * if there is a concurrent scheduling operation. See FLINK-13421.
-        */
-       @Test
-       public void 
slotPoolExecutionGraph_ConcurrentSchedulingAndAllocationFailure_ShouldNotFailWithConcurrentModificationException()
 throws Exception {
-               final SlotSharingGroup group = new SlotSharingGroup();
-               final JobVertex vertex1 = createNoOpVertex("vertex1", 1);
-               vertex1.setSlotSharingGroup(group);
-               final JobVertex vertex2 = createNoOpVertex("vertex2", 3);
-               vertex2.setSlotSharingGroup(group);
-               final JobVertex vertex3 = createNoOpVertex("vertex3", 1);
-               vertex3.setSlotSharingGroup(group);
-               vertex3.connectNewDataSetAsInput(vertex2, 
DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
-
-               try (SlotPool slotPool = createSlotPoolImpl()) {
-                       final SlotProvider slots = 
createSchedulerWithSlots(slotPool, new LocalTaskManagerLocation(), 2);
-
-                       final AllocationID allocationId = 
slotPool.getAvailableSlotsInformation().iterator().next().getAllocationId();
-
-                       final JobGraph jobGraph = new JobGraph(TEST_JOB_ID, 
"Test Job", vertex1, vertex2, vertex3);
-                       jobGraph.setScheduleMode(ScheduleMode.EAGER);
-                       final ExecutionGraph eg = TestingExecutionGraphBuilder
-                               .newBuilder()
-                               .setJobGraph(jobGraph)
-                               .setSlotProvider(slots)
-                               .setAllocationTimeout(Time.minutes(60))
-                               .build();
-
-                       startAndScheduleExecutionGraph(eg);
-
-                       slotPool.failAllocation(
-                               allocationId,
-                               new Exception("test exception"));
-
-                       eg.waitUntilTerminal();
-               }
-       }
-
-       @Test
-       public void testRestartWithEagerSchedulingAndSlotSharing() throws 
Exception {
-               final int parallelism = 20;
-
-               try (SlotPool slotPool = createSlotPoolImpl()) {
-                       final Scheduler scheduler = 
createSchedulerWithSlots(slotPool, new LocalTaskManagerLocation(), parallelism);
-
-                       final SlotSharingGroup sharingGroup = new 
SlotSharingGroup();
-
-                       final JobVertex source = new JobVertex("source");
-                       source.setInvokableClass(NoOpInvokable.class);
-                       source.setParallelism(parallelism);
-                       source.setSlotSharingGroup(sharingGroup);
-
-                       final JobVertex sink = new JobVertex("sink");
-                       sink.setInvokableClass(NoOpInvokable.class);
-                       sink.setParallelism(parallelism);
-                       sink.setSlotSharingGroup(sharingGroup);
-                       sink.connectNewDataSetAsInput(source, 
DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED_BOUNDED);
-
-                       TestRestartStrategy restartStrategy = 
TestRestartStrategy.directExecuting();
-
-                       final JobGraph jobGraph = new JobGraph(TEST_JOB_ID, 
"Test Job", source, sink);
-                       jobGraph.setScheduleMode(ScheduleMode.EAGER);
-                       final ExecutionGraph eg = TestingExecutionGraphBuilder
-                               .newBuilder()
-                               .setJobGraph(jobGraph)
-                               .setSlotProvider(scheduler)
-                               .setRestartStrategy(restartStrategy)
-                               .build();
-
-                       startAndScheduleExecutionGraph(eg);
-
-                       switchToRunning(eg);
-
-                       // fail into 'RESTARTING'
-                       
eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt().fail(
-                               new Exception("intended test failure"));
-
-                       assertEquals(JobStatus.FAILING, eg.getState());
-
-                       completeCancellingForAllVertices(eg);
-
-                       assertEquals(JobStatus.RUNNING, eg.getState());
-
-                       // clean termination
-                       switchToRunning(eg);
-                       finishAllVertices(eg);
-
-                       assertEquals(JobStatus.FINISHED, eg.getState());
-               }
-       }
-
-       /**
-        * Tests that the {@link ExecutionGraph} can handle failures while
-        * being in the RESTARTING state.
-        */
-       @Test
-       public void testFailureWhileRestarting() throws Exception {
-
-               final TestRestartStrategy restartStrategy = 
TestRestartStrategy.manuallyTriggered();
-               final ExecutionGraph executionGraph = 
TestingExecutionGraphBuilder.newBuilder()
-                       .setJobGraph(createJobGraph())
-                       .setRestartStrategy(restartStrategy)
-                       .setSlotProvider(new TestingSlotProvider(ignored -> new 
CompletableFuture<>()))
-                       .build();
-
-               startAndScheduleExecutionGraph(executionGraph);
-
-               assertThat(executionGraph.getState(), is(JobStatus.RUNNING));
-
-               executionGraph.failGlobal(new FlinkException("Test exception"));
-
-               restartStrategy.triggerAll().join();
-
-               executionGraph.failGlobal(new FlinkException("Concurrent 
exception"));
-
-               restartStrategy.triggerAll().join();
-
-               assertEquals(JobStatus.RUNNING, executionGraph.getState());
-       }
-
-       @Test
-       public void 
failGlobalIfExecutionIsStillRunning_failingAnExecutionTwice_ShouldTriggerOnlyOneFailover()
 throws Exception {
-               JobVertex sender = 
ExecutionGraphTestUtils.createJobVertex("Task1", 1, NoOpInvokable.class);
-               JobVertex receiver = 
ExecutionGraphTestUtils.createJobVertex("Task2", 1, NoOpInvokable.class);
-               JobGraph jobGraph = new JobGraph("Pointwise job", sender, 
receiver);
-
-               try (SlotPool slotPool = createSlotPoolImpl()) {
-                       ExecutionGraph eg = TestingExecutionGraphBuilder
-                               .newBuilder()
-                               .setRestartStrategy(new TestRestartStrategy(1, 
false))
-                               .setJobGraph(jobGraph)
-                               
.setSlotProvider(createSchedulerWithSlots(slotPool, new 
LocalTaskManagerLocation(), 2))
-                               .build();
-
-                       startAndScheduleExecutionGraph(eg);
-
-                       Iterator<ExecutionVertex> executionVertices = 
eg.getAllExecutionVertices().iterator();
-
-                       Execution finishedExecution = 
executionVertices.next().getCurrentExecutionAttempt();
-                       Execution failedExecution = 
executionVertices.next().getCurrentExecutionAttempt();
-
-                       finishedExecution.markFinished();
-
-                       failedExecution.fail(new Exception("Test Exception"));
-                       failedExecution.completeCancelling();
-
-                       assertEquals(JobStatus.RUNNING, eg.getState());
-
-                       // At this point all resources have been assigned
-                       for (ExecutionVertex vertex : 
eg.getAllExecutionVertices()) {
-                               assertNotNull("No assigned resource (test 
instability).", vertex.getCurrentAssignedResource());
-                               
vertex.getCurrentExecutionAttempt().switchToRunning();
-                       }
-
-                       // fail global with old finished execution, this should 
not affect the execution
-                       eg.failGlobalIfExecutionIsStillRunning(new 
Exception("This should have no effect"), finishedExecution.getAttemptId());
-
-                       assertThat(eg.getState(), is(JobStatus.RUNNING));
-
-                       // the state of the finished execution should have not 
changed since it is terminal
-                       assertThat(finishedExecution.getState(), 
is(ExecutionState.FINISHED));
-               }
-       }
-
        // ------------------------------------------------------------------------
        //  Utilities
        // ------------------------------------------------------------------------
 
-       private static void startAndScheduleExecutionGraph(ExecutionGraph 
executionGraph) throws Exception {
-               executionGraph.start(mainThreadExecutor);
-               assertThat(executionGraph.getState(), is(JobStatus.CREATED));
-               executionGraph.scheduleForExecution();
-               assertThat(executionGraph.getState(), is(JobStatus.RUNNING));
+       private static void startScheduling(SchedulerBase scheduler) throws Exception {
+               scheduler.setMainThreadExecutor(mainThreadExecutor);
+               assertThat(scheduler.getExecutionGraph().getState(), is(JobStatus.CREATED));
+               scheduler.startScheduling();
+               assertThat(scheduler.getExecutionGraph().getState(), is(JobStatus.RUNNING));
        }
 
        private static Scheduler createSchedulerWithSlots(SlotPool slotPool) throws Exception {
@@ -822,45 +347,4 @@ public class ExecutionGraphRestartTest extends TestLogger {
                jobGraph.setExecutionConfig(executionConfig);
                return jobGraph;
        }
-
-       /**
-        * Test failover strategy which records local failover count.
-        */
-       static class TestFailoverStrategy extends FailoverStrategy {
-
-               private int localFailoverCount = 0;
-
-               @Override
-               public void onTaskFailure(Execution taskExecution, Throwable 
cause) {
-                       localFailoverCount++;
-               }
-
-               @Override
-               public void notifyNewVertices(List<ExecutionJobVertex> 
newJobVerticesTopological) {
-               }
-
-               @Override
-               public String getStrategyName() {
-                       return "Test Failover Strategy";
-               }
-
-               int getLocalFailoverCount() {
-                       return localFailoverCount;
-               }
-
-               // 
------------------------------------------------------------------------
-               //  factory
-               // 
------------------------------------------------------------------------
-
-               /**
-                * Factory that instantiates the TestFailoverStrategy.
-                */
-               public static class Factory implements FailoverStrategy.Factory 
{
-
-                       @Override
-                       public FailoverStrategy create(ExecutionGraph 
executionGraph) {
-                               return new TestFailoverStrategy();
-                       }
-               }
-       }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
index 325086c..68852ff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
@@ -229,8 +230,10 @@ public class ExecutionGraphSuspendTest extends TestLogger {
         */
        @Test
        public void testSuspendWhileRestarting() throws Exception {
+               final ManuallyTriggeredScheduledExecutor taskRestartExecutor = new ManuallyTriggeredScheduledExecutor();
                final SchedulerBase scheduler = SchedulerTestingUtils.newSchedulerBuilder(new JobGraph())
                        .setRestartBackoffTimeStrategy(new TestRestartBackoffTimeStrategy(true, Long.MAX_VALUE))
+                       .setDelayExecutor(taskRestartExecutor)
                        .build();
 
                
scheduler.setMainThreadExecutor(ComponentMainThreadExecutorServiceAdapter.forMainThread());
@@ -254,6 +257,9 @@ public class ExecutionGraphSuspendTest extends TestLogger {
                assertEquals(JobStatus.SUSPENDED, eg.getState());
 
                assertEquals(exception, eg.getFailureCause());
+
+               taskRestartExecutor.triggerScheduledTasks();
+               assertEquals(JobStatus.SUSPENDED, eg.getState());
        }
 
        // ------------------------------------------------------------------------

Reply via email to