Repository: flink Updated Branches: refs/heads/release-1.1 44f48b34e -> ba5aa10b9
[FLINK-5934] Set the Scheduler in the ExecutionGraph via its constructor Before the scheduler was set when calling ExecutionGraph.scheduleForExecution(). This has the disadvantage that the ExecutionGraph has not scheduler set if something else went wrong before the scheduleForExecution call. Consequently, the job will be stuck in a restart loop because the recovery will fail if there is no Scheduler set. In order to solve the problem, the Scheduler is not passed to the ExecutionGraph when it is created. This closes #3441. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ba5aa10b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ba5aa10b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ba5aa10b Branch: refs/heads/release-1.1 Commit: ba5aa10b9bd20f6137134fe8ef8c882ce9c40a7c Parents: 44f48b3 Author: Till Rohrmann <[email protected]> Authored: Tue Feb 28 15:20:47 2017 +0100 Committer: Till Rohrmann <[email protected]> Committed: Thu Mar 2 14:02:59 2017 +0100 ---------------------------------------------------------------------- .../runtime/executiongraph/ExecutionGraph.java | 19 ++--- .../flink/runtime/jobmanager/JobManager.scala | 3 +- ...ExecutionGraphCheckpointCoordinatorTest.java | 24 +++--- .../ExecutionGraphConstructionTest.java | 25 ++++-- .../ExecutionGraphDeploymentTest.java | 44 +++++----- .../ExecutionGraphMetricsTest.java | 3 +- .../ExecutionGraphRestartTest.java | 39 +++++---- .../ExecutionGraphSignalsTest.java | 4 +- .../executiongraph/ExecutionGraphTestUtils.java | 5 +- .../ExecutionStateProgressTest.java | 4 +- .../executiongraph/LocalInputSplitsTest.java | 41 ++++----- .../executiongraph/PointwisePatternTest.java | 22 +++-- .../TerminalStateDeadlockTest.java | 5 +- .../VertexLocationConstraintTest.java | 88 +++++++++++--------- .../executiongraph/VertexSlotSharingTest.java | 4 +- .../TaskManagerLossFailsTasksTest.scala | 5 +- .../partitioner/RescalePartitionerTest.java | 2 + 17 files changed, 195 insertions(+), 142 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index bfd93e4..dde9ef7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -221,6 +221,7 @@ public class ExecutionGraph implements Serializable { /** Checkpoint stats tracker seperate from the coordinator in order to be * available after archiving. */ + @SuppressWarnings("NonSerializableFieldInSerializableClass") private CheckpointStatsTracker checkpointStatsTracker; /** The execution context which is used to execute futures. */ @@ -252,7 +253,8 @@ public class ExecutionGraph implements Serializable { Configuration jobConfig, SerializedValue<ExecutionConfig> serializedConfig, FiniteDuration timeout, - RestartStrategy restartStrategy) throws IOException { + RestartStrategy restartStrategy, + Scheduler scheduler) throws IOException { this( futureExecutor, ioExecutor, @@ -264,6 +266,7 @@ public class ExecutionGraph implements Serializable { restartStrategy, new ArrayList<BlobKey>(), new ArrayList<URL>(), + scheduler, ExecutionGraph.class.getClassLoader(), new UnregisteredMetricsGroup() ); @@ -280,13 +283,13 @@ public class ExecutionGraph implements Serializable { RestartStrategy restartStrategy, List<BlobKey> requiredJarFiles, List<URL> requiredClasspaths, + Scheduler scheduler, ClassLoader userClassLoader, MetricGroup metricGroup) throws IOException { checkNotNull(jobId); checkNotNull(jobName); checkNotNull(jobConfig); - checkNotNull(userClassLoader); this.jobInformation = new JobInformation( jobId, @@ -303,7 +306,8 @@ public class ExecutionGraph implements Serializable { this.futureExecutionContext = ExecutionContext$.MODULE$.fromExecutor(futureExecutor); this.ioExecutor = Preconditions.checkNotNull(ioExecutor); - this.userClassLoader = userClassLoader; + this.scheduler = Preconditions.checkNotNull(scheduler, "scheduler"); + this.userClassLoader = Preconditions.checkNotNull(userClassLoader, "userClassLoader"); this.tasks = new ConcurrentHashMap<JobVertexID, ExecutionJobVertex>(); this.intermediateResults = new ConcurrentHashMap<IntermediateDataSetID, IntermediateResult>(); @@ -749,17 +753,12 @@ public class ExecutionGraph implements Serializable { } } - public void scheduleForExecution(Scheduler scheduler) throws JobException { + public void scheduleForExecution() throws JobException { if (scheduler == null) { throw new IllegalArgumentException("Scheduler must not be null."); } - if (this.scheduler != null && this.scheduler != scheduler) { - throw new IllegalArgumentException("Cannot use different schedulers for the same job"); - } - if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) { - this.scheduler = scheduler; switch (scheduleMode) { @@ -976,7 +975,7 @@ public class ExecutionGraph implements Serializable { } } - scheduleForExecution(scheduler); + scheduleForExecution(); } catch (Throwable t) { LOG.warn("Failed to restart the job.", t); http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 1720d94..c0ede0d 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -1156,6 +1156,7 @@ class JobManager( restartStrategy, jobGraph.getUserJarBlobKeys, jobGraph.getClasspaths, + scheduler, userCodeLoader, jobMetrics) @@ -1366,7 +1367,7 @@ class JobManager( // the job. log.info(s"Scheduling job $jobId ($jobName).") - executionGraph.scheduleForExecution(scheduler) + executionGraph.scheduleForExecution() } else { // Remove the job graph. Otherwise it will be lingering around and possibly removed from // ZooKeeper by this JM. http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java index c23aaf8..cbbe80e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.SerializedValue; import org.junit.AfterClass; @@ -125,18 +126,19 @@ public class ExecutionGraphCheckpointCoordinatorTest { CheckpointIDCounter counter, CompletedCheckpointStore store) throws Exception { ExecutionGraph executionGraph = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), TestingUtils.defaultExecutionContext(), - new JobID(), - "test", - new Configuration(), - new SerializedValue<>(new ExecutionConfig()), - new FiniteDuration(1, TimeUnit.DAYS), - new NoRestartStrategy(), - Collections.<BlobKey>emptyList(), - Collections.<URL>emptyList(), - ClassLoader.getSystemClassLoader(), - new UnregisteredMetricsGroup()); + TestingUtils.defaultExecutionContext(), + new JobID(), + "test", + new Configuration(), + new SerializedValue<>(new ExecutionConfig()), + new FiniteDuration(1, TimeUnit.DAYS), + new NoRestartStrategy(), + Collections.<BlobKey>emptyList(), + Collections.<URL>emptyList(), + new Scheduler(TestingUtils.defaultExecutionContext()), + ClassLoader.getSystemClassLoader(), + new UnregisteredMetricsGroup()); executionGraph.enableSnapshotCheckpointing( 100, http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java index bf3a17c..ea25dca 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.SerializedValue; @@ -119,7 +120,8 @@ public class ExecutionGraphConstructionTest { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); try { eg.attachJobGraph(ordered); } @@ -169,7 +171,8 @@ public class ExecutionGraphConstructionTest { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); try { eg.attachJobGraph(ordered); } @@ -244,7 +247,8 @@ public class ExecutionGraphConstructionTest { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); try { eg.attachJobGraph(ordered); } @@ -504,7 +508,8 @@ public class ExecutionGraphConstructionTest { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); try { eg.attachJobGraph(ordered); } @@ -569,7 +574,8 @@ public class ExecutionGraphConstructionTest { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); try { eg.attachJobGraph(ordered); fail("Attached wrong jobgraph"); @@ -638,7 +644,8 @@ public class ExecutionGraphConstructionTest { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); try { eg.attachJobGraph(ordered); } @@ -685,7 +692,8 @@ public class ExecutionGraphConstructionTest { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); try { eg.attachJobGraph(ordered); @@ -767,7 +775,8 @@ public class ExecutionGraphConstructionTest { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); eg.attachJobGraph(jg.getVerticesSortedTopologicallyFromSources()); http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java index cc0ffba..5dd06b7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java @@ -52,7 +52,6 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.operators.BatchTask; import org.apache.flink.runtime.taskmanager.TaskExecutionState; @@ -100,7 +99,8 @@ public class ExecutionGraphDeploymentTest { new Configuration(), new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); List<JobVertex> ordered = Arrays.asList(v1, v2, v3, v4); @@ -315,6 +315,14 @@ public class ExecutionGraphDeploymentTest { v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING); + Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext()); + for (int i = 0; i < dop1; i++) { + scheduler.newInstanceAvailable( + ExecutionGraphTestUtils.getInstance( + new ExecutionGraphTestUtils.SimpleActorGateway( + TestingUtils.directExecutionContext()))); + } + // execution graph that executes actions synchronously ExecutionGraph eg = new ExecutionGraph( TestingUtils.directExecutionContext(), @@ -324,24 +332,18 @@ public class ExecutionGraphDeploymentTest { new Configuration(), new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + scheduler); eg.setQueuedSchedulingAllowed(false); List<JobVertex> ordered = Arrays.asList(v1, v2); eg.attachJobGraph(ordered); - Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext()); - for (int i = 0; i < dop1; i++) { - scheduler.newInstanceAvailable( - ExecutionGraphTestUtils.getInstance( - new ExecutionGraphTestUtils.SimpleActorGateway( - TestingUtils.directExecutionContext()))); - } assertEquals(dop1, scheduler.getNumberOfAvailableSlots()); // schedule, this triggers mock deployment - eg.scheduleForExecution(scheduler); + eg.scheduleForExecution(); ExecutionAttemptID attemptID = eg.getJobVertex(v1.getID()).getTaskVertices()[0].getCurrentExecutionAttempt().getAttemptId(); eg.updateState(new TaskExecutionState(jobId, attemptID, ExecutionState.RUNNING)); @@ -359,6 +361,14 @@ public class ExecutionGraphDeploymentTest { v1.setInvokableClass(BatchTask.class); v2.setInvokableClass(BatchTask.class); + Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); + for (int i = 0; i < dop1 + dop2; i++) { + scheduler.newInstanceAvailable( + ExecutionGraphTestUtils.getInstance( + new ExecutionGraphTestUtils.SimpleActorGateway( + TestingUtils.directExecutionContext()))); + } + // execution graph that executes actions synchronously ExecutionGraph eg = new ExecutionGraph( TestingUtils.directExecutionContext(), @@ -368,24 +378,18 @@ public class ExecutionGraphDeploymentTest { new Configuration(), new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + scheduler); eg.setQueuedSchedulingAllowed(false); List<JobVertex> ordered = Arrays.asList(v1, v2); eg.attachJobGraph(ordered); - Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); - for (int i = 0; i < dop1 + dop2; i++) { - scheduler.newInstanceAvailable( - ExecutionGraphTestUtils.getInstance( - new ExecutionGraphTestUtils.SimpleActorGateway( - TestingUtils.directExecutionContext()))); - } assertEquals(dop1 + dop2, scheduler.getNumberOfAvailableSlots()); // schedule, this triggers mock deployment - eg.scheduleForExecution(scheduler); + eg.scheduleForExecution(); Map<ExecutionAttemptID, Execution> executions = eg.getRegisteredExecutions(); assertEquals(dop1 + dop2, executions.size()); http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java index 3e6aba5..29e1c14 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java @@ -144,6 +144,7 @@ public class ExecutionGraphMetricsTest extends TestLogger { testingRestartStrategy, Collections.<BlobKey>emptyList(), Collections.<URL>emptyList(), + scheduler, getClass().getClassLoader(), metricGroup); @@ -161,7 +162,7 @@ public class ExecutionGraphMetricsTest extends TestLogger { executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); // start execution - executionGraph.scheduleForExecution(scheduler); + executionGraph.scheduleForExecution(); assertTrue(0L == restartingTime.getValue()); http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java index 659a912..db41eb8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java @@ -112,12 +112,12 @@ public class ExecutionGraphRestartTest extends TestLogger { //initiate and schedule job JobGraph jobGraph = new JobGraph("Pointwise job", groupVertex, groupVertex2); - ExecutionGraph eg = newExecutionGraph(new FixedDelayRestartStrategy(1, 0L)); + ExecutionGraph eg = newExecutionGraph(new FixedDelayRestartStrategy(1, 0L), scheduler); eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); assertEquals(JobStatus.CREATED, eg.getState()); - eg.scheduleForExecution(scheduler); + eg.scheduleForExecution(); assertEquals(JobStatus.RUNNING, eg.getState()); //sanity checks @@ -235,7 +235,8 @@ public class ExecutionGraphRestartTest extends TestLogger { new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), // We want to manually control the restart and delay - new FixedDelayRestartStrategy(Integer.MAX_VALUE, Long.MAX_VALUE)); + new FixedDelayRestartStrategy(Integer.MAX_VALUE, Long.MAX_VALUE), + scheduler); JobVertex jobVertex = new JobVertex("NoOpInvokable"); jobVertex.setInvokableClass(Tasks.NoOpInvokable.class); @@ -247,7 +248,7 @@ public class ExecutionGraphRestartTest extends TestLogger { assertEquals(JobStatus.CREATED, executionGraph.getState()); - executionGraph.scheduleForExecution(scheduler); + executionGraph.scheduleForExecution(); assertEquals(JobStatus.RUNNING, executionGraph.getState()); @@ -378,12 +379,12 @@ public class ExecutionGraphRestartTest extends TestLogger { JobVertex sender = newJobVertex("Task1", 1, Tasks.NoOpInvokable.class); JobVertex receiver = newJobVertex("Task2", 1, Tasks.NoOpInvokable.class); JobGraph jobGraph = new JobGraph("Pointwise job", sender, receiver); - ExecutionGraph eg = newExecutionGraph(new FixedDelayRestartStrategy(1, 1000)); + ExecutionGraph eg = newExecutionGraph(new FixedDelayRestartStrategy(1, 1000), scheduler); eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); assertEquals(JobStatus.CREATED, eg.getState()); - eg.scheduleForExecution(scheduler); + eg.scheduleForExecution(); assertEquals(JobStatus.RUNNING, eg.getState()); Iterator<ExecutionVertex> executionVertices = eg.getAllExecutionVertices().iterator(); @@ -446,13 +447,13 @@ public class ExecutionGraphRestartTest extends TestLogger { JobGraph jobGraph = new JobGraph("Test Job", vertex); jobGraph.setExecutionConfig(executionConfig); - ExecutionGraph eg = newExecutionGraph(new FixedDelayRestartStrategy(1, 1000000)); + ExecutionGraph eg = newExecutionGraph(new FixedDelayRestartStrategy(1, 1000000), scheduler); eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); assertEquals(JobStatus.CREATED, eg.getState()); - eg.scheduleForExecution(scheduler); + eg.scheduleForExecution(); assertEquals(JobStatus.RUNNING, eg.getState()); // Fail right after cancel (for example with concurrent slot release) @@ -491,13 +492,13 @@ public class ExecutionGraphRestartTest extends TestLogger { JobGraph jobGraph = new JobGraph("Test Job", vertex); jobGraph.setExecutionConfig(executionConfig); - ExecutionGraph eg = newExecutionGraph(new FixedDelayRestartStrategy(1, 1000000)); + ExecutionGraph eg = newExecutionGraph(new FixedDelayRestartStrategy(1, 1000000), scheduler); eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); assertEquals(JobStatus.CREATED, eg.getState()); - eg.scheduleForExecution(scheduler); + eg.scheduleForExecution(); assertEquals(JobStatus.RUNNING, eg.getState()); // Fail right after cancel (for example with concurrent slot release) @@ -546,13 +547,14 @@ public class ExecutionGraphRestartTest extends TestLogger { new Configuration(), new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - controllableRestartStrategy); + controllableRestartStrategy, + scheduler); eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); assertEquals(JobStatus.CREATED, eg.getState()); - eg.scheduleForExecution(scheduler); + eg.scheduleForExecution(); assertEquals(JobStatus.RUNNING, eg.getState()); @@ -650,7 +652,7 @@ public class ExecutionGraphRestartTest extends TestLogger { JobGraph jobGraph = new JobGraph("Pointwise job", sender); - ExecutionGraph eg = newExecutionGraph(restartStrategy); + ExecutionGraph eg = newExecutionGraph(restartStrategy, scheduler); if (isSpy) { eg = spy(eg); } @@ -658,7 +660,7 @@ public class ExecutionGraphRestartTest extends TestLogger { assertEquals(JobStatus.CREATED, eg.getState()); - eg.scheduleForExecution(scheduler); + eg.scheduleForExecution(); assertEquals(JobStatus.RUNNING, eg.getState()); return new Tuple2<>(eg, instance); } @@ -670,7 +672,7 @@ public class ExecutionGraphRestartTest extends TestLogger { return groupVertex; } - private static ExecutionGraph newExecutionGraph(RestartStrategy restartStrategy) throws IOException { + private static ExecutionGraph newExecutionGraph(RestartStrategy restartStrategy, Scheduler scheduler) throws IOException { return new ExecutionGraph( TestingUtils.defaultExecutionContext(), TestingUtils.defaultExecutionContext(), @@ -679,7 +681,12 @@ public class ExecutionGraphRestartTest extends TestLogger { new Configuration(), new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - restartStrategy); + restartStrategy, + scheduler); + } + + private static ExecutionGraph newExecutionGraph(RestartStrategy restartStrategy) throws IOException { + return newExecutionGraph(restartStrategy, new Scheduler(TestingUtils.defaultExecutionContext())); } private static void restartAfterFailure(ExecutionGraph eg, FiniteDuration timeout, boolean haltAfterRestart) throws InterruptedException { http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java index fde967e..fab6505 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.SerializedValue; @@ -139,7 +140,8 @@ public class ExecutionGraphSignalsTest { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); eg.attachJobGraph(ordered); f = eg.getClass().getDeclaredField("state"); http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java index 89d8a9b..cf0136d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java @@ -45,6 +45,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.messages.Messages; import org.apache.flink.runtime.messages.TaskMessages.SubmitTask; import org.apache.flink.runtime.messages.TaskMessages.FailIntermediateResultPartitions; @@ -56,6 +57,7 @@ import org.mockito.Matchers; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import scala.concurrent.ExecutionContext; +import scala.concurrent.ExecutionContext$; public class ExecutionGraphTestUtils { @@ -180,7 +182,8 @@ public class ExecutionGraphTestUtils { new Configuration(), new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(ExecutionContext$.MODULE$.fromExecutor(executor))); ExecutionJobVertex ejv = spy(new ExecutionJobVertex(graph, ajv, 1, AkkaUtils.getDefaultTimeout())); http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java index 955c9db..64e1e12 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java @@ -34,6 +34,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.SerializedValue; import org.junit.Test; @@ -58,7 +59,8 @@ public class ExecutionStateProgressTest { new Configuration(), new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); graph.attachJobGraph(Collections.singletonList(ajv)); setGraphStatus(graph, JobStatus.RUNNING); http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java index 11dad92..b5e04cc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java @@ -268,20 +268,7 @@ public class LocalInputSplitsTest { vertex.setInputSplitSource(new TestInputSplitSource(splits)); JobGraph jobGraph = new JobGraph("test job", vertex); - - ExecutionGraph eg = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), - TestingUtils.defaultExecutionContext(), - jobGraph.getJobID(), - jobGraph.getName(), - jobGraph.getJobConfiguration(), - new SerializedValue<>(new ExecutionConfig()), - TIMEOUT, - new NoRestartStrategy()); - - eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); - eg.setQueuedSchedulingAllowed(false); - + // create a scheduler with 6 instances where always two are on the same host Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); Instance i1 = getInstance(new byte[] {10,0,1,1}, 12345, "host1", 1); @@ -297,7 +284,21 @@ public class LocalInputSplitsTest { scheduler.newInstanceAvailable(i5); scheduler.newInstanceAvailable(i6); - eg.scheduleForExecution(scheduler); + ExecutionGraph eg = new ExecutionGraph( + TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), + jobGraph.getJobID(), + jobGraph.getName(), + jobGraph.getJobConfiguration(), + new SerializedValue<>(new ExecutionConfig()), + TIMEOUT, + new NoRestartStrategy(), + scheduler); + + eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); + eg.setQueuedSchedulingAllowed(false); + + eg.scheduleForExecution(); ExecutionVertex[] tasks = eg.getVerticesTopologically().iterator().next().getTaskVertices(); assertEquals(6, tasks.length); @@ -334,6 +335,8 @@ public class LocalInputSplitsTest { vertex.setInputSplitSource(new TestInputSplitSource(splits)); JobGraph jobGraph = new JobGraph("test job", vertex); + + Scheduler scheduler = getScheduler(numHosts, slotsPerHost); ExecutionGraph eg = new ExecutionGraph( TestingUtils.defaultExecutionContext(), @@ -343,14 +346,14 @@ public class LocalInputSplitsTest { jobGraph.getJobConfiguration(), new SerializedValue<>(new ExecutionConfig()), TIMEOUT, - new NoRestartStrategy()); + new NoRestartStrategy(), + scheduler); eg.setQueuedSchedulingAllowed(false); eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); - - Scheduler scheduler = getScheduler(numHosts, slotsPerHost); - eg.scheduleForExecution(scheduler); + + eg.scheduleForExecution(); ExecutionVertex[] tasks = eg.getVerticesTopologically().iterator().next().getTaskVertices(); assertEquals(parallelism, tasks.length); http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java index 0e147e3..b59dcda 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java @@ -26,6 +26,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.SerializedValue; @@ -73,7 +74,8 @@ public class PointwisePatternTest { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); try { eg.attachJobGraph(ordered); } @@ -119,7 +121,8 @@ public class PointwisePatternTest { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); try { eg.attachJobGraph(ordered); } @@ -166,7 +169,8 @@ public class PointwisePatternTest { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); try { eg.attachJobGraph(ordered); } @@ -214,7 +218,8 @@ public class PointwisePatternTest { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); try { eg.attachJobGraph(ordered); } @@ -260,7 +265,8 @@ public class PointwisePatternTest { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); try { eg.attachJobGraph(ordered); } @@ -326,7 +332,8 @@ public class PointwisePatternTest { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); try { eg.attachJobGraph(ordered); } @@ -383,7 +390,8 @@ public class PointwisePatternTest { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); try { eg.attachJobGraph(ordered); } http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java index 504822b..11e382f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java @@ -190,11 +190,12 @@ public class TerminalStateDeadlockTest { EMPTY_CONFIG, new SerializedValue<>(new ExecutionConfig()), TIMEOUT, - new FixedDelayRestartStrategy(1, 0)); + new FixedDelayRestartStrategy(1, 0), + new Scheduler(TestingUtils.defaultExecutionContext())); } @Override - public void scheduleForExecution(Scheduler scheduler) { + public void scheduleForExecution() { // notify that we are done with the "restarting" synchronized (this) { done = true; http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java index b160561..67e3ede 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java @@ -88,7 +88,8 @@ public class VertexLocationConstraintTest { jg.getJobConfiguration(), new SerializedValue<>(new ExecutionConfig()), timeout, - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); eg.attachJobGraph(Collections.singletonList(jobVertex)); ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex.getID()); @@ -155,14 +156,15 @@ public class VertexLocationConstraintTest { JobGraph jg = new JobGraph("test job", jobVertex); ExecutionGraph eg = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), - TestingUtils.defaultExecutionContext(), - jg.getJobID(), - jg.getName(), - jg.getJobConfiguration(), - new SerializedValue<>(new ExecutionConfig()), - timeout, - new NoRestartStrategy()); + TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), + jg.getJobID(), + jg.getName(), + jg.getJobConfiguration(), + new SerializedValue<>(new ExecutionConfig()), + timeout, + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); eg.attachJobGraph(Collections.singletonList(jobVertex)); ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex.getID()); @@ -233,14 +235,15 @@ public class VertexLocationConstraintTest { JobGraph jg = new JobGraph("test job", jobVertex1, jobVertex2); ExecutionGraph eg = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), - TestingUtils.defaultExecutionContext(), - jg.getJobID(), - jg.getName(), - jg.getJobConfiguration(), - new SerializedValue<>(new ExecutionConfig()), - timeout, - new NoRestartStrategy()); + TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), + jg.getJobID(), + jg.getName(), + jg.getJobConfiguration(), + new SerializedValue<>(new ExecutionConfig()), + timeout, + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); eg.attachJobGraph(Arrays.asList(jobVertex1, jobVertex2)); ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex1.getID()); @@ -302,14 +305,15 @@ public class VertexLocationConstraintTest { JobGraph jg = new JobGraph("test job", jobVertex); ExecutionGraph eg = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), - TestingUtils.defaultExecutionContext(), - jg.getJobID(), - jg.getName(), - jg.getJobConfiguration(), - new SerializedValue<>(new ExecutionConfig()), - timeout, - new NoRestartStrategy()); + TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), + jg.getJobID(), + jg.getName(), + jg.getJobConfiguration(), + new SerializedValue<>(new ExecutionConfig()), + timeout, + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); eg.attachJobGraph(Collections.singletonList(jobVertex)); ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex.getID()); @@ -373,14 +377,15 @@ public class VertexLocationConstraintTest { jobVertex2.setSlotSharingGroup(sharingGroup); ExecutionGraph eg = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), - TestingUtils.defaultExecutionContext(), - jg.getJobID(), - jg.getName(), - jg.getJobConfiguration(), - new SerializedValue<>(new ExecutionConfig()), - timeout, - new NoRestartStrategy()); + TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), + jg.getJobID(), + jg.getName(), + jg.getJobConfiguration(), + new SerializedValue<>(new ExecutionConfig()), + timeout, + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); eg.attachJobGraph(Arrays.asList(jobVertex1, jobVertex2)); ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex1.getID()); @@ -415,14 +420,15 @@ public class VertexLocationConstraintTest { JobGraph jg = new JobGraph("test job", vertex); ExecutionGraph eg = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), - TestingUtils.defaultExecutionContext(), - jg.getJobID(), - jg.getName(), - jg.getJobConfiguration(), - new SerializedValue<>(new ExecutionConfig()), - timeout, - new NoRestartStrategy()); + TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), + jg.getJobID(), + jg.getName(), + jg.getJobConfiguration(), + new SerializedValue<>(new ExecutionConfig()), + timeout, + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); eg.attachJobGraph(Collections.singletonList(vertex)); ExecutionVertex ev = eg.getAllVertices().get(vertex.getID()).getTaskVertices()[0]; http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java index 27708a2..5c85592 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.SerializedValue; @@ -87,7 +88,8 @@ public class VertexSlotSharingTest { new Configuration(), new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); eg.attachJobGraph(vertices); // verify that the vertices are all in the same slot sharing group http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala index 7c77ce7..7470888 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala @@ -61,13 +61,14 @@ class TaskManagerLossFailsTasksTest extends WordSpecLike with Matchers { new Configuration(), new SerializedValue(new ExecutionConfig()), AkkaUtils.getDefaultTimeout, - new NoRestartStrategy()) + new NoRestartStrategy(), + scheduler) eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources) eg.getState should equal(JobStatus.CREATED) - eg.scheduleForExecution(scheduler) + eg.scheduleForExecution() eg.getState should equal(JobStatus.RUNNING) instance1.markDead() http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java index 03e3535..9660c9e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java @@ -34,6 +34,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.plugable.SerializationDelegate; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.streaming.api.datastream.DataStream; @@ -144,6 +145,7 @@ public class RescalePartitionerTest extends TestLogger { new NoRestartStrategy(), new ArrayList<BlobKey>(), new ArrayList<URL>(), + new Scheduler(TestingUtils.defaultExecutionContext()), ExecutionGraph.class.getClassLoader(), new UnregisteredMetricsGroup()); try {
