This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 58cc2a5 [FLINK-20626][runtime] Fix issue of concurrent failing and
canceling of an ExecutionGraph
58cc2a5 is described below
commit 58cc2a5fbd419d6a9e4f9c251ac01ecf59a8c5a2
Author: Zhu Zhu <[email protected]>
AuthorDate: Wed Dec 16 19:56:04 2020 +0800
[FLINK-20626][runtime] Fix issue of concurrent failing and
canceling of an ExecutionGraph
Update ExecutionGraphRestartTest to be based on the new scheduler to verify
it.
---
.../runtime/executiongraph/ExecutionGraph.java | 5 +
.../executiongraph/ExecutionGraphRestartTest.java | 642 ++-------------------
.../executiongraph/ExecutionGraphSuspendTest.java | 6 +
3 files changed, 74 insertions(+), 579 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 8ac151e..a439d84 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -1435,6 +1435,11 @@ public class ExecutionGraph implements
AccessExecutionGraph {
cancelVerticesAsync().whenComplete((aVoid, throwable)
-> {
if (transitionState(JobStatus.FAILING,
JobStatus.FAILED, cause)) {
onTerminalState(JobStatus.FAILED);
+ } else if (state == JobStatus.CANCELLING) {
+ transitionState(JobStatus.CANCELLING,
JobStatus.CANCELED);
+ onTerminalState(JobStatus.CANCELED);
+ } else if (!state.isTerminalState()) {
+ throw new IllegalStateException("Cannot
complete job failing from an unexpected state: " + state);
}
}));
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 28eb45c..870c1b1 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -22,26 +22,16 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.execution.SuppressRestartsException;
-import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
-import
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
-import
org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
-import
org.apache.flink.runtime.executiongraph.utils.NotCancelAckingTaskGateway;
+import
org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
import
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
-import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.ScheduleMode;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import
org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
@@ -49,39 +39,30 @@ import
org.apache.flink.runtime.jobmaster.slotpool.Scheduler;
import org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl;
-import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.jobmaster.slotpool.TestingSlotPoolImpl;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.scheduler.SchedulerBase;
+import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
-import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
+import org.junit.Before;
import org.junit.Test;
-import javax.annotation.Nonnull;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
-import static
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.completeCancellingForAllVertices;
-import static
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex;
-import static
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.finishAllVertices;
-import static
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.switchToRunning;
import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
/**
* Tests the restart behaviour of the {@link ExecutionGraph}.
@@ -95,30 +76,15 @@ public class ExecutionGraphRestartTest extends TestLogger {
private static final JobID TEST_JOB_ID = new JobID();
- //
------------------------------------------------------------------------
-
- @Test
- public void testNoManualRestart() throws Exception {
- ExecutionGraph eg = TestingExecutionGraphBuilder
- .newBuilder()
- .setSlotProvider(new SimpleSlotProvider(NUM_TASKS))
- .setJobGraph(createJobGraph())
- .build();
-
- startAndScheduleExecutionGraph(eg);
-
- eg.getAllExecutionVertices().iterator().next().fail(new
Exception("Test Exception"));
-
- completeCanceling(eg);
-
- assertEquals(JobStatus.FAILED, eg.getState());
+ private ManuallyTriggeredScheduledExecutor taskRestartExecutor;
- // This should not restart the graph.
- eg.restart(eg.getGlobalModVersion());
-
- assertEquals(JobStatus.FAILED, eg.getState());
+ @Before
+ public void setUp() {
+ taskRestartExecutor = new ManuallyTriggeredScheduledExecutor();
}
+ //
------------------------------------------------------------------------
+
private void completeCanceling(ExecutionGraph eg) {
executeOperationForAllExecutions(eg,
Execution::completeCancelling);
}
@@ -129,34 +95,6 @@ public class ExecutionGraphRestartTest extends TestLogger {
}
}
- @Test
- public void testRestartAutomatically() throws Exception {
- try (SlotPool slotPool = createSlotPoolImpl()) {
- ExecutionGraph executionGraph =
TestingExecutionGraphBuilder
- .newBuilder()
- .setJobGraph(createJobGraph())
-
.setRestartStrategy(TestRestartStrategy.directExecuting())
-
.setSlotProvider(createSchedulerWithSlots(slotPool))
- .build();
-
- startAndScheduleExecutionGraph(executionGraph);
-
-
executionGraph.getAllExecutionVertices().iterator().next().fail(new
Exception("Test Exception"));
-
- assertEquals(JobStatus.FAILING,
executionGraph.getState());
-
- for (ExecutionVertex vertex :
executionGraph.getAllExecutionVertices()) {
-
vertex.getCurrentExecutionAttempt().completeCancelling();
- }
-
- assertEquals(JobStatus.RUNNING,
executionGraph.getState());
- finishAllVertices(executionGraph);
- assertEquals(JobStatus.FINISHED,
executionGraph.getState());
- }
-
- }
-
- @Nonnull
private SlotPoolImpl createSlotPoolImpl() {
return new TestingSlotPoolImpl(TEST_JOB_ID);
}
@@ -166,95 +104,55 @@ public class ExecutionGraphRestartTest extends TestLogger
{
// We want to manually control the restart and delay
try (SlotPool slotPool = createSlotPoolImpl()) {
TaskManagerLocation taskManagerLocation = new
LocalTaskManagerLocation();
- final ExecutionGraph executionGraph =
TestingExecutionGraphBuilder
- .newBuilder()
- .setJobGraph(createJobGraph())
- .setRestartStrategy(new
InfiniteDelayRestartStrategy())
-
.setSlotProvider(createSchedulerWithSlots(slotPool, taskManagerLocation))
+ SchedulerBase scheduler = SchedulerTestingUtils
+ .newSchedulerBuilderWithDefaultSlotAllocator(
+ createJobGraph(),
+ createSchedulerWithSlots(slotPool,
taskManagerLocation))
+ .setRestartBackoffTimeStrategy(new
TestRestartBackoffTimeStrategy(true, Long.MAX_VALUE))
+ .setDelayExecutor(taskRestartExecutor)
.build();
+ ExecutionGraph executionGraph =
scheduler.getExecutionGraph();
- startAndScheduleExecutionGraph(executionGraph);
+ startScheduling(scheduler);
// Release the TaskManager and wait for the job to
restart
slotPool.releaseTaskManager(taskManagerLocation.getResourceID(), new
Exception("Test Exception"));
assertEquals(JobStatus.RESTARTING,
executionGraph.getState());
// Canceling needs to abort the restart
- executionGraph.cancel();
+ scheduler.cancel();
assertEquals(JobStatus.CANCELED,
executionGraph.getState());
- // The restart has been aborted
-
executionGraph.restart(executionGraph.getGlobalModVersion());
+ taskRestartExecutor.triggerScheduledTasks();
assertEquals(JobStatus.CANCELED,
executionGraph.getState());
- }
-
- }
-
- @Test
- public void testFailWhileRestarting() throws Exception {
- try (SlotPool slotPool = createSlotPoolImpl()) {
- TaskManagerLocation taskManagerLocation = new
LocalTaskManagerLocation();
- final ExecutionGraph executionGraph =
TestingExecutionGraphBuilder
- .newBuilder()
- .setJobGraph(createJobGraph())
- .setRestartStrategy(new
InfiniteDelayRestartStrategy())
-
.setSlotProvider(createSchedulerWithSlots(slotPool, taskManagerLocation))
- .build();
-
- startAndScheduleExecutionGraph(executionGraph);
-
- // Release the TaskManager and wait for the job to
restart
-
slotPool.releaseTaskManager(taskManagerLocation.getResourceID(), new
Exception("Test Exception"));
-
- assertEquals(JobStatus.RESTARTING,
executionGraph.getState());
-
- // If we fail when being in RESTARTING, then we should
try to restart again
- final long globalModVersion =
executionGraph.getGlobalModVersion();
- final Exception testException = new Exception("Test
exception");
- executionGraph.failGlobal(testException);
-
- assertNotEquals(globalModVersion,
executionGraph.getGlobalModVersion());
- assertEquals(JobStatus.RESTARTING,
executionGraph.getState());
- assertEquals(testException,
executionGraph.getFailureCause()); // we should have updated the failure cause
-
- // but it should fail when sending a
SuppressRestartsException
- executionGraph.failGlobal(new
SuppressRestartsException(new Exception("Suppress restart exception")));
-
- assertEquals(JobStatus.FAILED,
executionGraph.getState());
-
- // The restart has been aborted
-
executionGraph.restart(executionGraph.getGlobalModVersion());
-
- assertEquals(JobStatus.FAILED,
executionGraph.getState());
+ for (ExecutionVertex vertex :
executionGraph.getAllExecutionVertices()) {
+ assertEquals(ExecutionState.FAILED,
vertex.getExecutionState());
+ }
}
}
@Test
public void testCancelWhileFailing() throws Exception {
try (SlotPool slotPool = createSlotPoolImpl()) {
- final ExecutionGraph graph =
TestingExecutionGraphBuilder
- .newBuilder()
- .setJobGraph(createJobGraph())
- .setRestartStrategy(new
InfiniteDelayRestartStrategy())
-
.setSlotProvider(createSchedulerWithSlots(slotPool))
+ SchedulerBase scheduler = SchedulerTestingUtils
+
.newSchedulerBuilderWithDefaultSlotAllocator(createJobGraph(),
createSchedulerWithSlots(slotPool))
+ .setRestartBackoffTimeStrategy(new
TestRestartBackoffTimeStrategy(false, Long.MAX_VALUE))
.build();
+ ExecutionGraph graph = scheduler.getExecutionGraph();
- startAndScheduleExecutionGraph(graph);
+ startScheduling(scheduler);
assertEquals(JobStatus.RUNNING, graph.getState());
- // switch all tasks to running
- for (ExecutionVertex vertex :
graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
-
vertex.getCurrentExecutionAttempt().switchToRunning();
- }
+ switchAllTasksToRunning(graph);
- graph.failGlobal(new Exception("test"));
+ scheduler.handleGlobalFailure(new Exception("test"));
assertEquals(JobStatus.FAILING, graph.getState());
- graph.cancel();
+ scheduler.cancel();
assertEquals(JobStatus.CANCELLING, graph.getState());
@@ -263,28 +161,27 @@ public class ExecutionGraphRestartTest extends TestLogger
{
assertEquals(JobStatus.CANCELED, graph.getState());
}
-
}
@Test
public void testFailWhileCanceling() throws Exception {
try (SlotPool slotPool = createSlotPoolImpl()) {
- final ExecutionGraph graph =
TestingExecutionGraphBuilder
- .newBuilder()
- .setJobGraph(createJobGraph())
-
.setSlotProvider(createSchedulerWithSlots(slotPool))
+ SchedulerBase scheduler = SchedulerTestingUtils
+
.newSchedulerBuilderWithDefaultSlotAllocator(createJobGraph(),
createSchedulerWithSlots(slotPool))
+ .setRestartBackoffTimeStrategy(new
TestRestartBackoffTimeStrategy(false, Long.MAX_VALUE))
.build();
+ ExecutionGraph graph = scheduler.getExecutionGraph();
- startAndScheduleExecutionGraph(graph);
+ startScheduling(scheduler);
assertEquals(JobStatus.RUNNING, graph.getState());
switchAllTasksToRunning(graph);
- graph.cancel();
+ scheduler.cancel();
assertEquals(JobStatus.CANCELLING, graph.getState());
- graph.failGlobal(new Exception("test"));
+ scheduler.handleGlobalFailure(new Exception("test"));
assertEquals(JobStatus.FAILING, graph.getState());
@@ -293,74 +190,12 @@ public class ExecutionGraphRestartTest extends TestLogger
{
assertEquals(JobStatus.FAILED, graph.getState());
}
-
- }
-
- @Test
- public void testTaskFailingWhileGlobalFailing() throws Exception {
- try (SlotPool slotPool = createSlotPoolImpl()) {
- final ExecutionGraph graph =
TestingExecutionGraphBuilder
- .newBuilder()
- .setRestartStrategy(new
InfiniteDelayRestartStrategy())
- .setFailoverStrategyFactory(new
TestFailoverStrategy.Factory())
- .setJobGraph(createJobGraph())
-
.setSlotProvider(createSchedulerWithSlots(slotPool))
- .build();
-
- startAndScheduleExecutionGraph(graph);
-
- final TestFailoverStrategy failoverStrategy =
(TestFailoverStrategy) graph.getFailoverStrategy();
-
- // switch all tasks to running
- for (ExecutionVertex vertex :
graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
-
vertex.getCurrentExecutionAttempt().switchToRunning();
- }
-
- graph.failGlobal(new Exception("test"));
-
-
graph.getAllExecutionVertices().iterator().next().fail(new Exception("Test task
failure"));
-
- // no local failover should happen when in global
failover cancelling
- assertEquals(0,
failoverStrategy.getLocalFailoverCount());
- }
-
}
private void switchAllTasksToRunning(ExecutionGraph graph) {
executeOperationForAllExecutions(graph,
Execution::switchToRunning);
}
- @Test
- public void testNoRestartOnSuppressException() throws Exception {
- try (SlotPool slotPool = createSlotPoolImpl()) {
- ExecutionGraph eg = TestingExecutionGraphBuilder
- .newBuilder()
- .setJobGraph(createJobGraph())
- .setRestartStrategy(new
FixedDelayRestartStrategy(Integer.MAX_VALUE, 0))
-
.setSlotProvider(createSchedulerWithSlots(slotPool))
- .build();
-
- startAndScheduleExecutionGraph(eg);
-
- // Fail with unrecoverable Exception
- eg.getAllExecutionVertices().iterator().next().fail(
- new SuppressRestartsException(new
Exception("Test Exception")));
-
- assertEquals(JobStatus.FAILING, eg.getState());
-
- completeCanceling(eg);
-
- eg.waitUntilTerminal();
- assertEquals(JobStatus.FAILED, eg.getState());
-
- RestartStrategy restartStrategy =
eg.getRestartStrategy();
- assertTrue(restartStrategy instanceof
FixedDelayRestartStrategy);
-
- assertEquals(0, ((FixedDelayRestartStrategy)
restartStrategy).getCurrentRestartAttempt());
- }
-
- }
-
/**
* Tests that a failing execution does not affect a restarted job. This
is important if a
* callback handler fails an execution after it has already reached a
final state and the job
@@ -373,14 +208,16 @@ public class ExecutionGraphRestartTest extends TestLogger
{
JobGraph jobGraph = new JobGraph("Pointwise job", sender,
receiver);
try (SlotPool slotPool = createSlotPoolImpl()) {
- ExecutionGraph eg = TestingExecutionGraphBuilder
- .newBuilder()
-
.setRestartStrategy(TestRestartStrategy.directExecuting())
- .setJobGraph(jobGraph)
-
.setSlotProvider(createSchedulerWithSlots(slotPool, new
LocalTaskManagerLocation(), 2))
+ SchedulerBase scheduler = SchedulerTestingUtils
+ .newSchedulerBuilderWithDefaultSlotAllocator(
+ jobGraph,
+ createSchedulerWithSlots(slotPool, new
LocalTaskManagerLocation(), 2))
+ .setRestartBackoffTimeStrategy(new
TestRestartBackoffTimeStrategy(true, Long.MAX_VALUE))
+ .setDelayExecutor(taskRestartExecutor)
.build();
+ ExecutionGraph eg = scheduler.getExecutionGraph();
- startAndScheduleExecutionGraph(eg);
+ startScheduling(scheduler);
Iterator<ExecutionVertex> executionVertices =
eg.getAllExecutionVertices().iterator();
@@ -392,6 +229,8 @@ public class ExecutionGraphRestartTest extends TestLogger {
failedExecution.fail(new Exception("Test Exception"));
failedExecution.completeCancelling();
+ taskRestartExecutor.triggerScheduledTasks();
+
assertEquals(JobStatus.RUNNING, eg.getState());
// At this point all resources have been assigned
@@ -422,16 +261,19 @@ public class ExecutionGraphRestartTest extends TestLogger
{
@Test
public void testFailExecutionAfterCancel() throws Exception {
try (SlotPool slotPool = createSlotPoolImpl()) {
- ExecutionGraph eg =
TestingExecutionGraphBuilder.newBuilder()
- .setRestartStrategy(new
InfiniteDelayRestartStrategy())
- .setJobGraph(createJobGraphToCancel())
-
.setSlotProvider(createSchedulerWithSlots(slotPool, new
LocalTaskManagerLocation(), 2))
+ SchedulerBase scheduler = SchedulerTestingUtils
+ .newSchedulerBuilderWithDefaultSlotAllocator(
+ createJobGraphToCancel(),
+ createSchedulerWithSlots(slotPool, new
LocalTaskManagerLocation(), 2))
+ .setRestartBackoffTimeStrategy(new
TestRestartBackoffTimeStrategy(false, Long.MAX_VALUE))
+ .setDelayExecutor(taskRestartExecutor)
.build();
+ ExecutionGraph eg = scheduler.getExecutionGraph();
- startAndScheduleExecutionGraph(eg);
+ startScheduling(scheduler);
// Fail right after cancel (for example with concurrent
slot release)
- eg.cancel();
+ scheduler.cancel();
for (ExecutionVertex v : eg.getAllExecutionVertices()) {
v.getCurrentExecutionAttempt().fail(new
Exception("Test Exception"));
@@ -446,332 +288,15 @@ public class ExecutionGraphRestartTest extends
TestLogger {
}
}
- /**
- * Tests that it is possible to fail a graph via a call to
- * {@link ExecutionGraph#failGlobal(Throwable)} after cancellation.
- */
- @Test
- public void testFailExecutionGraphAfterCancel() throws Exception {
- try (SlotPool slotPool = createSlotPoolImpl()) {
- ExecutionGraph eg = TestingExecutionGraphBuilder
- .newBuilder()
- .setRestartStrategy(new
InfiniteDelayRestartStrategy())
- .setJobGraph(createJobGraphToCancel())
-
.setSlotProvider(createSchedulerWithSlots(slotPool, new
LocalTaskManagerLocation(), 2))
- .build();
-
- startAndScheduleExecutionGraph(eg);
-
- // Fail right after cancel (for example with concurrent
slot release)
- eg.cancel();
- assertEquals(JobStatus.CANCELLING, eg.getState());
-
- eg.failGlobal(new Exception("Test Exception"));
- assertEquals(JobStatus.FAILING, eg.getState());
-
- Execution execution =
eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt();
-
- execution.completeCancelling();
- assertEquals(JobStatus.RESTARTING, eg.getState());
- }
- }
-
- /**
- * Tests that a suspend call while restarting a job, will abort the
restarting.
- */
- @Test
- public void testSuspendWhileRestarting() throws Exception {
- TestRestartStrategy controllableRestartStrategy =
TestRestartStrategy.manuallyTriggered();
- try (SlotPool slotPool = createSlotPoolImpl()) {
- TaskManagerLocation taskManagerLocation = new
LocalTaskManagerLocation();
- ExecutionGraph eg = TestingExecutionGraphBuilder
- .newBuilder()
- .setJobGraph(createJobGraph())
- .setRestartStrategy(controllableRestartStrategy)
-
.setSlotProvider(createSchedulerWithSlots(slotPool, taskManagerLocation))
- .build();
-
- startAndScheduleExecutionGraph(eg);
-
- // Release the TaskManager and wait for the job to
restart
-
slotPool.releaseTaskManager(taskManagerLocation.getResourceID(), new
Exception("Test Exception"));
-
- assertEquals(1,
controllableRestartStrategy.getNumberOfQueuedActions());
-
- assertEquals(JobStatus.RESTARTING, eg.getState());
-
- eg.suspend(new Exception("Test exception"));
-
- assertEquals(JobStatus.SUSPENDED, eg.getState());
-
- controllableRestartStrategy.triggerAll().join();
-
- assertEquals(JobStatus.SUSPENDED, eg.getState());
- }
- }
-
- @Test
- public void testLocalFailAndRestart() throws Exception {
- final int parallelism = 10;
- final TestRestartStrategy triggeredRestartStrategy =
TestRestartStrategy.manuallyTriggered();
-
- final JobGraph jobGraph = new JobGraph(TEST_JOB_ID, "Test Job",
createNoOpVertex(parallelism));
- jobGraph.setScheduleMode(ScheduleMode.EAGER);
-
- final ExecutionGraph eg = TestingExecutionGraphBuilder
- .newBuilder()
- .setJobGraph(jobGraph)
- .setRestartStrategy(triggeredRestartStrategy)
- .build();
-
- startAndScheduleExecutionGraph(eg);
-
- switchToRunning(eg);
-
- final ExecutionJobVertex vertex =
eg.getVerticesTopologically().iterator().next();
- final Execution first =
vertex.getTaskVertices()[0].getCurrentExecutionAttempt();
- final Execution last =
vertex.getTaskVertices()[vertex.getParallelism() -
1].getCurrentExecutionAttempt();
-
- // Have two executions fail
- first.fail(new Exception("intended test failure 1"));
- last.fail(new Exception("intended test failure 2"));
-
- assertEquals(JobStatus.FAILING, eg.getState());
-
- completeCancellingForAllVertices(eg);
-
- // Now trigger the restart
- assertEquals(1,
triggeredRestartStrategy.getNumberOfQueuedActions());
- triggeredRestartStrategy.triggerAll().join();
-
- assertEquals(JobStatus.RUNNING, eg.getState());
-
- switchToRunning(eg);
- finishAllVertices(eg);
-
- eg.waitUntilTerminal();
- assertEquals(JobStatus.FINISHED, eg.getState());
- }
-
- @Test
- public void testGlobalFailAndRestarts() throws Exception {
- final int parallelism = 10;
- final JobVertex vertex = createNoOpVertex(parallelism);
- final NotCancelAckingTaskGateway taskManagerGateway = new
NotCancelAckingTaskGateway();
- final SlotProvider slots = new SimpleSlotProvider(parallelism,
taskManagerGateway);
- final TestRestartStrategy restartStrategy =
TestRestartStrategy.manuallyTriggered();
-
- final JobGraph jobGraph = new JobGraph(TEST_JOB_ID, "Test Job",
vertex);
- jobGraph.setScheduleMode(ScheduleMode.EAGER);
- final ExecutionGraph eg = TestingExecutionGraphBuilder
- .newBuilder()
- .setJobGraph(jobGraph)
- .setSlotProvider(slots)
- .setRestartStrategy(restartStrategy)
- .build();
-
- startAndScheduleExecutionGraph(eg);
-
- switchToRunning(eg);
-
- // fail into 'RESTARTING'
- eg.failGlobal(new Exception("intended test failure 1"));
- assertEquals(JobStatus.FAILING, eg.getState());
-
- completeCancellingForAllVertices(eg);
-
- assertEquals(JobStatus.RESTARTING, eg.getState());
-
- eg.failGlobal(new Exception("intended test failure 2"));
- assertEquals(JobStatus.RESTARTING, eg.getState());
-
- restartStrategy.triggerAll().join();
-
- assertEquals(JobStatus.RUNNING, eg.getState());
-
- switchToRunning(eg);
- finishAllVertices(eg);
-
- eg.waitUntilTerminal();
- assertEquals(JobStatus.FINISHED, eg.getState());
-
- assertThat("Too many restarts", eg.getNumberOfRestarts(),
is(lessThanOrEqualTo(2L)));
- }
-
- /**
- * SlotPool#failAllocation should not fail with a {@link
java.util.ConcurrentModificationException}
- * if there is a concurrent scheduling operation. See FLINK-13421.
- */
- @Test
- public void
slotPoolExecutionGraph_ConcurrentSchedulingAndAllocationFailure_ShouldNotFailWithConcurrentModificationException()
throws Exception {
- final SlotSharingGroup group = new SlotSharingGroup();
- final JobVertex vertex1 = createNoOpVertex("vertex1", 1);
- vertex1.setSlotSharingGroup(group);
- final JobVertex vertex2 = createNoOpVertex("vertex2", 3);
- vertex2.setSlotSharingGroup(group);
- final JobVertex vertex3 = createNoOpVertex("vertex3", 1);
- vertex3.setSlotSharingGroup(group);
- vertex3.connectNewDataSetAsInput(vertex2,
DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
-
- try (SlotPool slotPool = createSlotPoolImpl()) {
- final SlotProvider slots =
createSchedulerWithSlots(slotPool, new LocalTaskManagerLocation(), 2);
-
- final AllocationID allocationId =
slotPool.getAvailableSlotsInformation().iterator().next().getAllocationId();
-
- final JobGraph jobGraph = new JobGraph(TEST_JOB_ID,
"Test Job", vertex1, vertex2, vertex3);
- jobGraph.setScheduleMode(ScheduleMode.EAGER);
- final ExecutionGraph eg = TestingExecutionGraphBuilder
- .newBuilder()
- .setJobGraph(jobGraph)
- .setSlotProvider(slots)
- .setAllocationTimeout(Time.minutes(60))
- .build();
-
- startAndScheduleExecutionGraph(eg);
-
- slotPool.failAllocation(
- allocationId,
- new Exception("test exception"));
-
- eg.waitUntilTerminal();
- }
- }
-
- @Test
- public void testRestartWithEagerSchedulingAndSlotSharing() throws
Exception {
- final int parallelism = 20;
-
- try (SlotPool slotPool = createSlotPoolImpl()) {
- final Scheduler scheduler =
createSchedulerWithSlots(slotPool, new LocalTaskManagerLocation(), parallelism);
-
- final SlotSharingGroup sharingGroup = new
SlotSharingGroup();
-
- final JobVertex source = new JobVertex("source");
- source.setInvokableClass(NoOpInvokable.class);
- source.setParallelism(parallelism);
- source.setSlotSharingGroup(sharingGroup);
-
- final JobVertex sink = new JobVertex("sink");
- sink.setInvokableClass(NoOpInvokable.class);
- sink.setParallelism(parallelism);
- sink.setSlotSharingGroup(sharingGroup);
- sink.connectNewDataSetAsInput(source,
DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED_BOUNDED);
-
- TestRestartStrategy restartStrategy =
TestRestartStrategy.directExecuting();
-
- final JobGraph jobGraph = new JobGraph(TEST_JOB_ID,
"Test Job", source, sink);
- jobGraph.setScheduleMode(ScheduleMode.EAGER);
- final ExecutionGraph eg = TestingExecutionGraphBuilder
- .newBuilder()
- .setJobGraph(jobGraph)
- .setSlotProvider(scheduler)
- .setRestartStrategy(restartStrategy)
- .build();
-
- startAndScheduleExecutionGraph(eg);
-
- switchToRunning(eg);
-
- // fail into 'RESTARTING'
-
eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt().fail(
- new Exception("intended test failure"));
-
- assertEquals(JobStatus.FAILING, eg.getState());
-
- completeCancellingForAllVertices(eg);
-
- assertEquals(JobStatus.RUNNING, eg.getState());
-
- // clean termination
- switchToRunning(eg);
- finishAllVertices(eg);
-
- assertEquals(JobStatus.FINISHED, eg.getState());
- }
- }
-
- /**
- * Tests that the {@link ExecutionGraph} can handle failures while
- * being in the RESTARTING state.
- */
- @Test
- public void testFailureWhileRestarting() throws Exception {
-
- final TestRestartStrategy restartStrategy =
TestRestartStrategy.manuallyTriggered();
- final ExecutionGraph executionGraph =
TestingExecutionGraphBuilder.newBuilder()
- .setJobGraph(createJobGraph())
- .setRestartStrategy(restartStrategy)
- .setSlotProvider(new TestingSlotProvider(ignored -> new
CompletableFuture<>()))
- .build();
-
- startAndScheduleExecutionGraph(executionGraph);
-
- assertThat(executionGraph.getState(), is(JobStatus.RUNNING));
-
- executionGraph.failGlobal(new FlinkException("Test exception"));
-
- restartStrategy.triggerAll().join();
-
- executionGraph.failGlobal(new FlinkException("Concurrent
exception"));
-
- restartStrategy.triggerAll().join();
-
- assertEquals(JobStatus.RUNNING, executionGraph.getState());
- }
-
- @Test
- public void
failGlobalIfExecutionIsStillRunning_failingAnExecutionTwice_ShouldTriggerOnlyOneFailover()
throws Exception {
- JobVertex sender =
ExecutionGraphTestUtils.createJobVertex("Task1", 1, NoOpInvokable.class);
- JobVertex receiver =
ExecutionGraphTestUtils.createJobVertex("Task2", 1, NoOpInvokable.class);
- JobGraph jobGraph = new JobGraph("Pointwise job", sender,
receiver);
-
- try (SlotPool slotPool = createSlotPoolImpl()) {
- ExecutionGraph eg = TestingExecutionGraphBuilder
- .newBuilder()
- .setRestartStrategy(new TestRestartStrategy(1,
false))
- .setJobGraph(jobGraph)
-
.setSlotProvider(createSchedulerWithSlots(slotPool, new
LocalTaskManagerLocation(), 2))
- .build();
-
- startAndScheduleExecutionGraph(eg);
-
- Iterator<ExecutionVertex> executionVertices =
eg.getAllExecutionVertices().iterator();
-
- Execution finishedExecution =
executionVertices.next().getCurrentExecutionAttempt();
- Execution failedExecution =
executionVertices.next().getCurrentExecutionAttempt();
-
- finishedExecution.markFinished();
-
- failedExecution.fail(new Exception("Test Exception"));
- failedExecution.completeCancelling();
-
- assertEquals(JobStatus.RUNNING, eg.getState());
-
- // At this point all resources have been assigned
- for (ExecutionVertex vertex :
eg.getAllExecutionVertices()) {
- assertNotNull("No assigned resource (test
instability).", vertex.getCurrentAssignedResource());
-
vertex.getCurrentExecutionAttempt().switchToRunning();
- }
-
- // fail global with old finished execution, this should
not affect the execution
- eg.failGlobalIfExecutionIsStillRunning(new
Exception("This should have no effect"), finishedExecution.getAttemptId());
-
- assertThat(eg.getState(), is(JobStatus.RUNNING));
-
- // the state of the finished execution should have not
changed since it is terminal
- assertThat(finishedExecution.getState(),
is(ExecutionState.FINISHED));
- }
- }
-
//
------------------------------------------------------------------------
// Utilities
//
------------------------------------------------------------------------
- private static void startAndScheduleExecutionGraph(ExecutionGraph
executionGraph) throws Exception {
- executionGraph.start(mainThreadExecutor);
- assertThat(executionGraph.getState(), is(JobStatus.CREATED));
- executionGraph.scheduleForExecution();
- assertThat(executionGraph.getState(), is(JobStatus.RUNNING));
+ private static void startScheduling(SchedulerBase scheduler) throws
Exception {
+ scheduler.setMainThreadExecutor(mainThreadExecutor);
+ assertThat(scheduler.getExecutionGraph().getState(),
is(JobStatus.CREATED));
+ scheduler.startScheduling();
+ assertThat(scheduler.getExecutionGraph().getState(),
is(JobStatus.RUNNING));
}
private static Scheduler createSchedulerWithSlots(SlotPool slotPool)
throws Exception {
@@ -822,45 +347,4 @@ public class ExecutionGraphRestartTest extends TestLogger {
jobGraph.setExecutionConfig(executionConfig);
return jobGraph;
}
-
- /**
- * Test failover strategy which records local failover count.
- */
- static class TestFailoverStrategy extends FailoverStrategy {
-
- private int localFailoverCount = 0;
-
- @Override
- public void onTaskFailure(Execution taskExecution, Throwable
cause) {
- localFailoverCount++;
- }
-
- @Override
- public void notifyNewVertices(List<ExecutionJobVertex>
newJobVerticesTopological) {
- }
-
- @Override
- public String getStrategyName() {
- return "Test Failover Strategy";
- }
-
- int getLocalFailoverCount() {
- return localFailoverCount;
- }
-
- //
------------------------------------------------------------------------
- // factory
- //
------------------------------------------------------------------------
-
- /**
- * Factory that instantiates the TestFailoverStrategy.
- */
- public static class Factory implements FailoverStrategy.Factory
{
-
- @Override
- public FailoverStrategy create(ExecutionGraph
executionGraph) {
- return new TestFailoverStrategy();
- }
- }
- }
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
index 325086c..68852ff 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.JobStatus;
import
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import
org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
@@ -229,8 +230,10 @@ public class ExecutionGraphSuspendTest extends TestLogger {
*/
@Test
public void testSuspendWhileRestarting() throws Exception {
+ final ManuallyTriggeredScheduledExecutor taskRestartExecutor =
new ManuallyTriggeredScheduledExecutor();
final SchedulerBase scheduler =
SchedulerTestingUtils.newSchedulerBuilder(new JobGraph())
.setRestartBackoffTimeStrategy(new
TestRestartBackoffTimeStrategy(true, Long.MAX_VALUE))
+ .setDelayExecutor(taskRestartExecutor)
.build();
scheduler.setMainThreadExecutor(ComponentMainThreadExecutorServiceAdapter.forMainThread());
@@ -254,6 +257,9 @@ public class ExecutionGraphSuspendTest extends TestLogger {
assertEquals(JobStatus.SUSPENDED, eg.getState());
assertEquals(exception, eg.getFailureCause());
+
+ taskRestartExecutor.triggerScheduledTasks();
+ assertEquals(JobStatus.SUSPENDED, eg.getState());
}
//
------------------------------------------------------------------------