Repository: flink
Updated Branches:
  refs/heads/release-1.1 44f48b34e -> ba5aa10b9


[FLINK-5934] Set the Scheduler in the ExecutionGraph via its constructor

Previously, the scheduler was set only when calling
ExecutionGraph.scheduleForExecution(). This has the disadvantage that the
ExecutionGraph has no scheduler set if something else went wrong before the
scheduleForExecution call. Consequently, the job will be stuck in a restart
loop, because the recovery will fail if there is no Scheduler set. In order
to solve the problem, the Scheduler is now passed to the ExecutionGraph when
it is created.

This closes #3441.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ba5aa10b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ba5aa10b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ba5aa10b

Branch: refs/heads/release-1.1
Commit: ba5aa10b9bd20f6137134fe8ef8c882ce9c40a7c
Parents: 44f48b3
Author: Till Rohrmann <[email protected]>
Authored: Tue Feb 28 15:20:47 2017 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Thu Mar 2 14:02:59 2017 +0100

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionGraph.java  | 19 ++---
 .../flink/runtime/jobmanager/JobManager.scala   |  3 +-
 ...ExecutionGraphCheckpointCoordinatorTest.java | 24 +++---
 .../ExecutionGraphConstructionTest.java         | 25 ++++--
 .../ExecutionGraphDeploymentTest.java           | 44 +++++-----
 .../ExecutionGraphMetricsTest.java              |  3 +-
 .../ExecutionGraphRestartTest.java              | 39 +++++----
 .../ExecutionGraphSignalsTest.java              |  4 +-
 .../executiongraph/ExecutionGraphTestUtils.java |  5 +-
 .../ExecutionStateProgressTest.java             |  4 +-
 .../executiongraph/LocalInputSplitsTest.java    | 41 ++++-----
 .../executiongraph/PointwisePatternTest.java    | 22 +++--
 .../TerminalStateDeadlockTest.java              |  5 +-
 .../VertexLocationConstraintTest.java           | 88 +++++++++++---------
 .../executiongraph/VertexSlotSharingTest.java   |  4 +-
 .../TaskManagerLossFailsTasksTest.scala         |  5 +-
 .../partitioner/RescalePartitionerTest.java     |  2 +
 17 files changed, 195 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index bfd93e4..dde9ef7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -221,6 +221,7 @@ public class ExecutionGraph implements Serializable {
 
        /** Checkpoint stats tracker seperate from the coordinator in order to 
be
         * available after archiving. */
+       @SuppressWarnings("NonSerializableFieldInSerializableClass")
        private CheckpointStatsTracker checkpointStatsTracker;
 
        /** The execution context which is used to execute futures. */
@@ -252,7 +253,8 @@ public class ExecutionGraph implements Serializable {
                        Configuration jobConfig,
                        SerializedValue<ExecutionConfig> serializedConfig,
                        FiniteDuration timeout,
-                       RestartStrategy restartStrategy) throws IOException {
+                       RestartStrategy restartStrategy,
+                       Scheduler scheduler) throws IOException {
                this(
                        futureExecutor,
                        ioExecutor,
@@ -264,6 +266,7 @@ public class ExecutionGraph implements Serializable {
                        restartStrategy,
                        new ArrayList<BlobKey>(),
                        new ArrayList<URL>(),
+                       scheduler,
                        ExecutionGraph.class.getClassLoader(),
                        new UnregisteredMetricsGroup()
                );
@@ -280,13 +283,13 @@ public class ExecutionGraph implements Serializable {
                        RestartStrategy restartStrategy,
                        List<BlobKey> requiredJarFiles,
                        List<URL> requiredClasspaths,
+                       Scheduler scheduler,
                        ClassLoader userClassLoader,
                        MetricGroup metricGroup) throws IOException {
 
                checkNotNull(jobId);
                checkNotNull(jobName);
                checkNotNull(jobConfig);
-               checkNotNull(userClassLoader);
 
                this.jobInformation = new JobInformation(
                        jobId,
@@ -303,7 +306,8 @@ public class ExecutionGraph implements Serializable {
                this.futureExecutionContext = 
ExecutionContext$.MODULE$.fromExecutor(futureExecutor);
                this.ioExecutor = Preconditions.checkNotNull(ioExecutor);
 
-               this.userClassLoader = userClassLoader;
+               this.scheduler = Preconditions.checkNotNull(scheduler, 
"scheduler");
+               this.userClassLoader = 
Preconditions.checkNotNull(userClassLoader, "userClassLoader");
 
                this.tasks = new ConcurrentHashMap<JobVertexID, 
ExecutionJobVertex>();
                this.intermediateResults = new 
ConcurrentHashMap<IntermediateDataSetID, IntermediateResult>();
@@ -749,17 +753,12 @@ public class ExecutionGraph implements Serializable {
                }
        }
 
-       public void scheduleForExecution(Scheduler scheduler) throws 
JobException {
+       public void scheduleForExecution() throws JobException {
                if (scheduler == null) {
                        throw new IllegalArgumentException("Scheduler must not 
be null.");
                }
 
-               if (this.scheduler != null && this.scheduler != scheduler) {
-                       throw new IllegalArgumentException("Cannot use 
different schedulers for the same job");
-               }
-
                if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
-                       this.scheduler = scheduler;
 
                        switch (scheduleMode) {
 
@@ -976,7 +975,7 @@ public class ExecutionGraph implements Serializable {
                                }
                        }
 
-                       scheduleForExecution(scheduler);
+                       scheduleForExecution();
                }
                catch (Throwable t) {
                        LOG.warn("Failed to restart the job.", t);

http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 1720d94..c0ede0d 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1156,6 +1156,7 @@ class JobManager(
               restartStrategy,
               jobGraph.getUserJarBlobKeys,
               jobGraph.getClasspaths,
+              scheduler,
               userCodeLoader,
               jobMetrics)
 
@@ -1366,7 +1367,7 @@ class JobManager(
             // the job.
             log.info(s"Scheduling job $jobId ($jobName).")
 
-            executionGraph.scheduleForExecution(scheduler)
+            executionGraph.scheduleForExecution()
           } else {
             // Remove the job graph. Otherwise it will be lingering around and 
possibly removed from
             // ZooKeeper by this JM.

http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index c23aaf8..cbbe80e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -35,6 +35,7 @@ import 
org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
 import org.junit.AfterClass;
@@ -125,18 +126,19 @@ public class ExecutionGraphCheckpointCoordinatorTest {
                        CheckpointIDCounter counter,
                        CompletedCheckpointStore store) throws Exception {
                ExecutionGraph executionGraph = new ExecutionGraph(
-                               TestingUtils.defaultExecutionContext(),
                        TestingUtils.defaultExecutionContext(),
-                               new JobID(),
-                               "test",
-                               new Configuration(),
-                               new SerializedValue<>(new ExecutionConfig()),
-                               new FiniteDuration(1, TimeUnit.DAYS),
-                               new NoRestartStrategy(),
-                               Collections.<BlobKey>emptyList(),
-                               Collections.<URL>emptyList(),
-                               ClassLoader.getSystemClassLoader(),
-                               new UnregisteredMetricsGroup());
+                       TestingUtils.defaultExecutionContext(),
+                       new JobID(),
+                       "test",
+                       new Configuration(),
+                       new SerializedValue<>(new ExecutionConfig()),
+                       new FiniteDuration(1, TimeUnit.DAYS),
+                       new NoRestartStrategy(),
+                       Collections.<BlobKey>emptyList(),
+                       Collections.<URL>emptyList(),
+                       new Scheduler(TestingUtils.defaultExecutionContext()),
+                       ClassLoader.getSystemClassLoader(),
+                       new UnregisteredMetricsGroup());
 
                executionGraph.enableSnapshotCheckpointing(
                                100,

http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
index bf3a17c..ea25dca 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
 
@@ -119,7 +120,8 @@ public class ExecutionGraphConstructionTest {
                        cfg,
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       new Scheduler(TestingUtils.defaultExecutionContext()));
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -169,7 +171,8 @@ public class ExecutionGraphConstructionTest {
                        cfg,
                                new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       new Scheduler(TestingUtils.defaultExecutionContext()));
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -244,7 +247,8 @@ public class ExecutionGraphConstructionTest {
                        cfg,
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       new Scheduler(TestingUtils.defaultExecutionContext()));
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -504,7 +508,8 @@ public class ExecutionGraphConstructionTest {
                        cfg,
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       new Scheduler(TestingUtils.defaultExecutionContext()));
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -569,7 +574,8 @@ public class ExecutionGraphConstructionTest {
                        cfg,
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       new Scheduler(TestingUtils.defaultExecutionContext()));
                try {
                        eg.attachJobGraph(ordered);
                        fail("Attached wrong jobgraph");
@@ -638,7 +644,8 @@ public class ExecutionGraphConstructionTest {
                                cfg,
                                new SerializedValue<>(new ExecutionConfig()),
                                AkkaUtils.getDefaultTimeout(),
-                               new NoRestartStrategy());
+                               new NoRestartStrategy(),
+                               new 
Scheduler(TestingUtils.defaultExecutionContext()));
                        try {
                                eg.attachJobGraph(ordered);
                        }
@@ -685,7 +692,8 @@ public class ExecutionGraphConstructionTest {
                                cfg,
                                new SerializedValue<>(new ExecutionConfig()),
                                AkkaUtils.getDefaultTimeout(),
-                               new NoRestartStrategy());
+                               new NoRestartStrategy(),
+                               new 
Scheduler(TestingUtils.defaultExecutionContext()));
 
                        try {
                                eg.attachJobGraph(ordered);
@@ -767,7 +775,8 @@ public class ExecutionGraphConstructionTest {
                                cfg,
                                new SerializedValue<>(new ExecutionConfig()),
                                AkkaUtils.getDefaultTimeout(),
-                               new NoRestartStrategy());
+                               new NoRestartStrategy(),
+                               new 
Scheduler(TestingUtils.defaultExecutionContext()));
                        
                        
eg.attachJobGraph(jg.getVerticesSortedTopologicallyFromSources());
                        

http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index cc0ffba..5dd06b7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -52,7 +52,6 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
@@ -100,7 +99,8 @@ public class ExecutionGraphDeploymentTest {
                                new Configuration(),
                                new SerializedValue<>(new ExecutionConfig()),
                                AkkaUtils.getDefaultTimeout(),
-                               new NoRestartStrategy());
+                               new NoRestartStrategy(),
+                               new 
Scheduler(TestingUtils.defaultExecutionContext()));
 
                        List<JobVertex> ordered = Arrays.asList(v1, v2, v3, v4);
 
@@ -315,6 +315,14 @@ public class ExecutionGraphDeploymentTest {
 
                v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, 
ResultPartitionType.BLOCKING);
 
+               Scheduler scheduler = new 
Scheduler(TestingUtils.directExecutionContext());
+               for (int i = 0; i < dop1; i++) {
+                       scheduler.newInstanceAvailable(
+                               ExecutionGraphTestUtils.getInstance(
+                                       new 
ExecutionGraphTestUtils.SimpleActorGateway(
+                                               
TestingUtils.directExecutionContext())));
+               }
+
                // execution graph that executes actions synchronously
                ExecutionGraph eg = new ExecutionGraph(
                        TestingUtils.directExecutionContext(),
@@ -324,24 +332,18 @@ public class ExecutionGraphDeploymentTest {
                        new Configuration(),
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       scheduler);
 
                eg.setQueuedSchedulingAllowed(false);
 
                List<JobVertex> ordered = Arrays.asList(v1, v2);
                eg.attachJobGraph(ordered);
 
-               Scheduler scheduler = new 
Scheduler(TestingUtils.directExecutionContext());
-               for (int i = 0; i < dop1; i++) {
-                       scheduler.newInstanceAvailable(
-                               ExecutionGraphTestUtils.getInstance(
-                                       new 
ExecutionGraphTestUtils.SimpleActorGateway(
-                                               
TestingUtils.directExecutionContext())));
-               }
                assertEquals(dop1, scheduler.getNumberOfAvailableSlots());
 
                // schedule, this triggers mock deployment
-               eg.scheduleForExecution(scheduler);
+               eg.scheduleForExecution();
 
                ExecutionAttemptID attemptID = 
eg.getJobVertex(v1.getID()).getTaskVertices()[0].getCurrentExecutionAttempt().getAttemptId();
                eg.updateState(new TaskExecutionState(jobId, attemptID, 
ExecutionState.RUNNING));
@@ -359,6 +361,14 @@ public class ExecutionGraphDeploymentTest {
                v1.setInvokableClass(BatchTask.class);
                v2.setInvokableClass(BatchTask.class);
 
+               Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
+               for (int i = 0; i < dop1 + dop2; i++) {
+                       scheduler.newInstanceAvailable(
+                               ExecutionGraphTestUtils.getInstance(
+                                       new 
ExecutionGraphTestUtils.SimpleActorGateway(
+                                               
TestingUtils.directExecutionContext())));
+               }
+
                // execution graph that executes actions synchronously
                ExecutionGraph eg = new ExecutionGraph(
                        TestingUtils.directExecutionContext(),
@@ -368,24 +378,18 @@ public class ExecutionGraphDeploymentTest {
                        new Configuration(), 
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       scheduler);
                
                eg.setQueuedSchedulingAllowed(false);
 
                List<JobVertex> ordered = Arrays.asList(v1, v2);
                eg.attachJobGraph(ordered);
 
-               Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
-               for (int i = 0; i < dop1 + dop2; i++) {
-                       scheduler.newInstanceAvailable(
-                                       ExecutionGraphTestUtils.getInstance(
-                                                       new 
ExecutionGraphTestUtils.SimpleActorGateway(
-                                                                       
TestingUtils.directExecutionContext())));
-               }
                assertEquals(dop1 + dop2, 
scheduler.getNumberOfAvailableSlots());
 
                // schedule, this triggers mock deployment
-               eg.scheduleForExecution(scheduler);
+               eg.scheduleForExecution();
 
                Map<ExecutionAttemptID, Execution> executions = 
eg.getRegisteredExecutions();
                assertEquals(dop1 + dop2, executions.size());

http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index 3e6aba5..29e1c14 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -144,6 +144,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
                        testingRestartStrategy,
                        Collections.<BlobKey>emptyList(),
                        Collections.<URL>emptyList(),
+                       scheduler,
                        getClass().getClassLoader(),
                        metricGroup);
 
@@ -161,7 +162,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
                
executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 
                // start execution
-               executionGraph.scheduleForExecution(scheduler);
+               executionGraph.scheduleForExecution();
 
                assertTrue(0L == restartingTime.getValue());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 659a912..db41eb8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -112,12 +112,12 @@ public class ExecutionGraphRestartTest extends TestLogger 
{
                
                //initiate and schedule job
                JobGraph jobGraph = new JobGraph("Pointwise job", groupVertex, 
groupVertex2);
-               ExecutionGraph eg = newExecutionGraph(new 
FixedDelayRestartStrategy(1, 0L));
+               ExecutionGraph eg = newExecutionGraph(new 
FixedDelayRestartStrategy(1, 0L), scheduler);
                
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 
                assertEquals(JobStatus.CREATED, eg.getState());
                
-               eg.scheduleForExecution(scheduler);
+               eg.scheduleForExecution();
                assertEquals(JobStatus.RUNNING, eg.getState());
                
                //sanity checks
@@ -235,7 +235,8 @@ public class ExecutionGraphRestartTest extends TestLogger {
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
                        // We want to manually control the restart and delay
-                       new FixedDelayRestartStrategy(Integer.MAX_VALUE, 
Long.MAX_VALUE));
+                       new FixedDelayRestartStrategy(Integer.MAX_VALUE, 
Long.MAX_VALUE),
+                       scheduler);
 
                JobVertex jobVertex = new JobVertex("NoOpInvokable");
                jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
@@ -247,7 +248,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
                assertEquals(JobStatus.CREATED, executionGraph.getState());
 
-               executionGraph.scheduleForExecution(scheduler);
+               executionGraph.scheduleForExecution();
 
                assertEquals(JobStatus.RUNNING, executionGraph.getState());
 
@@ -378,12 +379,12 @@ public class ExecutionGraphRestartTest extends TestLogger 
{
                JobVertex sender = newJobVertex("Task1", 1, 
Tasks.NoOpInvokable.class);
                JobVertex receiver = newJobVertex("Task2", 1, 
Tasks.NoOpInvokable.class);
                JobGraph jobGraph = new JobGraph("Pointwise job", sender, 
receiver);
-               ExecutionGraph eg = newExecutionGraph(new 
FixedDelayRestartStrategy(1, 1000));
+               ExecutionGraph eg = newExecutionGraph(new 
FixedDelayRestartStrategy(1, 1000), scheduler);
                
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 
                assertEquals(JobStatus.CREATED, eg.getState());
 
-               eg.scheduleForExecution(scheduler);
+               eg.scheduleForExecution();
                assertEquals(JobStatus.RUNNING, eg.getState());
 
                Iterator<ExecutionVertex> executionVertices = 
eg.getAllExecutionVertices().iterator();
@@ -446,13 +447,13 @@ public class ExecutionGraphRestartTest extends TestLogger 
{
                JobGraph jobGraph = new JobGraph("Test Job", vertex);
                jobGraph.setExecutionConfig(executionConfig);
 
-               ExecutionGraph eg = newExecutionGraph(new 
FixedDelayRestartStrategy(1, 1000000));
+               ExecutionGraph eg = newExecutionGraph(new 
FixedDelayRestartStrategy(1, 1000000), scheduler);
 
                
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 
                assertEquals(JobStatus.CREATED, eg.getState());
 
-               eg.scheduleForExecution(scheduler);
+               eg.scheduleForExecution();
                assertEquals(JobStatus.RUNNING, eg.getState());
 
                // Fail right after cancel (for example with concurrent slot 
release)
@@ -491,13 +492,13 @@ public class ExecutionGraphRestartTest extends TestLogger 
{
                JobGraph jobGraph = new JobGraph("Test Job", vertex);
                jobGraph.setExecutionConfig(executionConfig);
 
-               ExecutionGraph eg = newExecutionGraph(new 
FixedDelayRestartStrategy(1, 1000000));
+               ExecutionGraph eg = newExecutionGraph(new 
FixedDelayRestartStrategy(1, 1000000), scheduler);
 
                
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 
                assertEquals(JobStatus.CREATED, eg.getState());
 
-               eg.scheduleForExecution(scheduler);
+               eg.scheduleForExecution();
                assertEquals(JobStatus.RUNNING, eg.getState());
 
                // Fail right after cancel (for example with concurrent slot 
release)
@@ -546,13 +547,14 @@ public class ExecutionGraphRestartTest extends TestLogger 
{
                        new Configuration(),
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       controllableRestartStrategy);
+                       controllableRestartStrategy,
+                       scheduler);
 
                
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 
                assertEquals(JobStatus.CREATED, eg.getState());
 
-               eg.scheduleForExecution(scheduler);
+               eg.scheduleForExecution();
 
                assertEquals(JobStatus.RUNNING, eg.getState());
 
@@ -650,7 +652,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
                JobGraph jobGraph = new JobGraph("Pointwise job", sender);
 
-               ExecutionGraph eg = newExecutionGraph(restartStrategy);
+               ExecutionGraph eg = newExecutionGraph(restartStrategy, 
scheduler);
                if (isSpy) {
                        eg = spy(eg);
                }
@@ -658,7 +660,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
                assertEquals(JobStatus.CREATED, eg.getState());
 
-               eg.scheduleForExecution(scheduler);
+               eg.scheduleForExecution();
                assertEquals(JobStatus.RUNNING, eg.getState());
                return new Tuple2<>(eg, instance);
        }
@@ -670,7 +672,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
                return groupVertex;
        }
 
-       private static ExecutionGraph newExecutionGraph(RestartStrategy 
restartStrategy) throws IOException {
+       private static ExecutionGraph newExecutionGraph(RestartStrategy 
restartStrategy, Scheduler scheduler) throws IOException {
                return new ExecutionGraph(
                        TestingUtils.defaultExecutionContext(),
                        TestingUtils.defaultExecutionContext(),
@@ -679,7 +681,12 @@ public class ExecutionGraphRestartTest extends TestLogger {
                        new Configuration(),
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       restartStrategy);
+                       restartStrategy,
+                       scheduler);
+       }
+
+       private static ExecutionGraph newExecutionGraph(RestartStrategy 
restartStrategy) throws IOException {
+               return newExecutionGraph(restartStrategy, new 
Scheduler(TestingUtils.defaultExecutionContext()));
        }
 
        private static void restartAfterFailure(ExecutionGraph eg, 
FiniteDuration timeout, boolean haltAfterRestart) throws InterruptedException {

http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
index fde967e..fab6505 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
@@ -35,6 +35,7 @@ import 
org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
 
@@ -139,7 +140,8 @@ public class ExecutionGraphSignalsTest {
                        cfg,
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       new Scheduler(TestingUtils.defaultExecutionContext()));
                eg.attachJobGraph(ordered);
 
                f = eg.getClass().getDeclaredField("state");

http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 89d8a9b..cf0136d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -45,6 +45,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.messages.Messages;
 import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
 import 
org.apache.flink.runtime.messages.TaskMessages.FailIntermediateResultPartitions;
@@ -56,6 +57,7 @@ import org.mockito.Matchers;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import scala.concurrent.ExecutionContext;
+import scala.concurrent.ExecutionContext$;
 
 public class ExecutionGraphTestUtils {
 
@@ -180,7 +182,8 @@ public class ExecutionGraphTestUtils {
                        new Configuration(),
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       new 
Scheduler(ExecutionContext$.MODULE$.fromExecutor(executor)));
 
                ExecutionJobVertex ejv = spy(new ExecutionJobVertex(graph, ajv, 
1,
                                AkkaUtils.getDefaultTimeout()));

http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
index 955c9db..64e1e12 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
 import org.junit.Test;
@@ -58,7 +59,8 @@ public class ExecutionStateProgressTest {
                                new Configuration(),
                                new SerializedValue<>(new ExecutionConfig()),
                                AkkaUtils.getDefaultTimeout(),
-                               new NoRestartStrategy());
+                               new NoRestartStrategy(),
+                               new 
Scheduler(TestingUtils.defaultExecutionContext()));
                        graph.attachJobGraph(Collections.singletonList(ajv));
 
                        setGraphStatus(graph, JobStatus.RUNNING);

http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
index 11dad92..b5e04cc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
@@ -268,20 +268,7 @@ public class LocalInputSplitsTest {
                        vertex.setInputSplitSource(new 
TestInputSplitSource(splits));
                        
                        JobGraph jobGraph = new JobGraph("test job", vertex);
-                       
-                       ExecutionGraph eg = new ExecutionGraph(
-                               TestingUtils.defaultExecutionContext(),
-                               TestingUtils.defaultExecutionContext(),
-                               jobGraph.getJobID(),
-                               jobGraph.getName(),  
-                               jobGraph.getJobConfiguration(),
-                               new SerializedValue<>(new ExecutionConfig()),
-                               TIMEOUT,
-                               new NoRestartStrategy());
-                       
-                       
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
-                       eg.setQueuedSchedulingAllowed(false);
-                       
+
                        // create a scheduler with 6 instances where always two 
are on the same host
                        Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
                        Instance i1 = getInstance(new byte[] {10,0,1,1}, 12345, 
"host1", 1);
@@ -297,7 +284,21 @@ public class LocalInputSplitsTest {
                        scheduler.newInstanceAvailable(i5);
                        scheduler.newInstanceAvailable(i6);
                        
-                       eg.scheduleForExecution(scheduler);
+                       ExecutionGraph eg = new ExecutionGraph(
+                               TestingUtils.defaultExecutionContext(),
+                               TestingUtils.defaultExecutionContext(),
+                               jobGraph.getJobID(),
+                               jobGraph.getName(),  
+                               jobGraph.getJobConfiguration(),
+                               new SerializedValue<>(new ExecutionConfig()),
+                               TIMEOUT,
+                               new NoRestartStrategy(),
+                               scheduler);
+                       
+                       
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+                       eg.setQueuedSchedulingAllowed(false);
+                       
+                       eg.scheduleForExecution();
                        
                        ExecutionVertex[] tasks = 
eg.getVerticesTopologically().iterator().next().getTaskVertices();
                        assertEquals(6, tasks.length);
@@ -334,6 +335,8 @@ public class LocalInputSplitsTest {
                vertex.setInputSplitSource(new TestInputSplitSource(splits));
                
                JobGraph jobGraph = new JobGraph("test job", vertex);
+
+               Scheduler scheduler = getScheduler(numHosts, slotsPerHost);
                
                ExecutionGraph eg = new ExecutionGraph(
                        TestingUtils.defaultExecutionContext(),
@@ -343,14 +346,14 @@ public class LocalInputSplitsTest {
                        jobGraph.getJobConfiguration(),
                                new SerializedValue<>(new ExecutionConfig()),
                        TIMEOUT,
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       scheduler);
                
                eg.setQueuedSchedulingAllowed(false);
                
                
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
-               
-               Scheduler scheduler = getScheduler(numHosts, slotsPerHost);
-               eg.scheduleForExecution(scheduler);
+
+               eg.scheduleForExecution();
                
                ExecutionVertex[] tasks = 
eg.getVerticesTopologically().iterator().next().getTaskVertices();
                assertEquals(parallelism, tasks.length);

http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
index 0e147e3..b59dcda 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
 
@@ -73,7 +74,8 @@ public class PointwisePatternTest {
                        cfg,
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       new Scheduler(TestingUtils.defaultExecutionContext()));
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -119,7 +121,8 @@ public class PointwisePatternTest {
                        cfg,
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       new Scheduler(TestingUtils.defaultExecutionContext()));
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -166,7 +169,8 @@ public class PointwisePatternTest {
                        cfg,
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       new Scheduler(TestingUtils.defaultExecutionContext()));
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -214,7 +218,8 @@ public class PointwisePatternTest {
                        cfg,
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       new Scheduler(TestingUtils.defaultExecutionContext()));
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -260,7 +265,8 @@ public class PointwisePatternTest {
                        cfg,
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       new Scheduler(TestingUtils.defaultExecutionContext()));
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -326,7 +332,8 @@ public class PointwisePatternTest {
                        cfg,
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       new Scheduler(TestingUtils.defaultExecutionContext()));
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -383,7 +390,8 @@ public class PointwisePatternTest {
                        cfg,
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       new Scheduler(TestingUtils.defaultExecutionContext()));
                try {
                        eg.attachJobGraph(ordered);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
index 504822b..11e382f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
@@ -190,11 +190,12 @@ public class TerminalStateDeadlockTest {
                                EMPTY_CONFIG,
                                new SerializedValue<>(new ExecutionConfig()),
                                TIMEOUT,
-                               new FixedDelayRestartStrategy(1, 0));
+                               new FixedDelayRestartStrategy(1, 0),
+                               new 
Scheduler(TestingUtils.defaultExecutionContext()));
                }
 
                @Override
-               public void scheduleForExecution(Scheduler scheduler) {
+               public void scheduleForExecution() {
                        // notify that we are done with the "restarting"
                        synchronized (this) {
                                done = true;

http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
index b160561..67e3ede 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
@@ -88,7 +88,8 @@ public class VertexLocationConstraintTest {
                                jg.getJobConfiguration(),
                                new SerializedValue<>(new ExecutionConfig()),
                                timeout,
-                               new NoRestartStrategy());
+                               new NoRestartStrategy(),
+                               new 
Scheduler(TestingUtils.defaultExecutionContext()));
                        eg.attachJobGraph(Collections.singletonList(jobVertex));
                        
                        ExecutionJobVertex ejv = 
eg.getAllVertices().get(jobVertex.getID());
@@ -155,14 +156,15 @@ public class VertexLocationConstraintTest {
                        JobGraph jg = new JobGraph("test job", jobVertex);
                        
                        ExecutionGraph eg = new ExecutionGraph(
-                                       TestingUtils.defaultExecutionContext(),
-                                       TestingUtils.defaultExecutionContext(),
-                                       jg.getJobID(),
-                                       jg.getName(),
-                                       jg.getJobConfiguration(),
-                                       new SerializedValue<>(new 
ExecutionConfig()),
-                                       timeout,
-                                       new NoRestartStrategy());
+                               TestingUtils.defaultExecutionContext(),
+                               TestingUtils.defaultExecutionContext(),
+                               jg.getJobID(),
+                               jg.getName(),
+                               jg.getJobConfiguration(),
+                               new SerializedValue<>(new ExecutionConfig()),
+                               timeout,
+                               new NoRestartStrategy(),
+                               new 
Scheduler(TestingUtils.defaultExecutionContext()));
                        eg.attachJobGraph(Collections.singletonList(jobVertex));
                        
                        ExecutionJobVertex ejv = 
eg.getAllVertices().get(jobVertex.getID());
@@ -233,14 +235,15 @@ public class VertexLocationConstraintTest {
                        JobGraph jg = new JobGraph("test job", jobVertex1, 
jobVertex2);
                        
                        ExecutionGraph eg = new ExecutionGraph(
-                                       TestingUtils.defaultExecutionContext(),
-                                       TestingUtils.defaultExecutionContext(),
-                                       jg.getJobID(),
-                                       jg.getName(),
-                                       jg.getJobConfiguration(),
-                                       new SerializedValue<>(new 
ExecutionConfig()),
-                                       timeout,
-                                       new NoRestartStrategy());
+                               TestingUtils.defaultExecutionContext(),
+                               TestingUtils.defaultExecutionContext(),
+                               jg.getJobID(),
+                               jg.getName(),
+                               jg.getJobConfiguration(),
+                               new SerializedValue<>(new ExecutionConfig()),
+                               timeout,
+                               new NoRestartStrategy(),
+                               new 
Scheduler(TestingUtils.defaultExecutionContext()));
                        eg.attachJobGraph(Arrays.asList(jobVertex1, 
jobVertex2));
                        
                        ExecutionJobVertex ejv = 
eg.getAllVertices().get(jobVertex1.getID());
@@ -302,14 +305,15 @@ public class VertexLocationConstraintTest {
                        JobGraph jg = new JobGraph("test job", jobVertex);
                        
                        ExecutionGraph eg = new ExecutionGraph(
-                                       TestingUtils.defaultExecutionContext(),
-                                       TestingUtils.defaultExecutionContext(),
-                                       jg.getJobID(),
-                                       jg.getName(),
-                                       jg.getJobConfiguration(),
-                                       new SerializedValue<>(new 
ExecutionConfig()),
-                                       timeout,
-                                       new NoRestartStrategy());
+                               TestingUtils.defaultExecutionContext(),
+                               TestingUtils.defaultExecutionContext(),
+                               jg.getJobID(),
+                               jg.getName(),
+                               jg.getJobConfiguration(),
+                               new SerializedValue<>(new ExecutionConfig()),
+                               timeout,
+                               new NoRestartStrategy(),
+                               new 
Scheduler(TestingUtils.defaultExecutionContext()));
                        eg.attachJobGraph(Collections.singletonList(jobVertex));
                        
                        ExecutionJobVertex ejv = 
eg.getAllVertices().get(jobVertex.getID());
@@ -373,14 +377,15 @@ public class VertexLocationConstraintTest {
                        jobVertex2.setSlotSharingGroup(sharingGroup);
                        
                        ExecutionGraph eg = new ExecutionGraph(
-                                       TestingUtils.defaultExecutionContext(),
-                                       TestingUtils.defaultExecutionContext(),
-                                       jg.getJobID(),
-                                       jg.getName(),
-                                       jg.getJobConfiguration(),
-                                       new SerializedValue<>(new 
ExecutionConfig()),
-                                       timeout,
-                                       new NoRestartStrategy());
+                               TestingUtils.defaultExecutionContext(),
+                               TestingUtils.defaultExecutionContext(),
+                               jg.getJobID(),
+                               jg.getName(),
+                               jg.getJobConfiguration(),
+                               new SerializedValue<>(new ExecutionConfig()),
+                               timeout,
+                               new NoRestartStrategy(),
+                               new 
Scheduler(TestingUtils.defaultExecutionContext()));
                        eg.attachJobGraph(Arrays.asList(jobVertex1, 
jobVertex2));
                        
                        ExecutionJobVertex ejv = 
eg.getAllVertices().get(jobVertex1.getID());
@@ -415,14 +420,15 @@ public class VertexLocationConstraintTest {
                        JobGraph jg = new JobGraph("test job", vertex);
                        
                        ExecutionGraph eg = new ExecutionGraph(
-                                       TestingUtils.defaultExecutionContext(),
-                                       TestingUtils.defaultExecutionContext(),
-                                       jg.getJobID(),
-                                       jg.getName(),
-                                       jg.getJobConfiguration(),
-                                       new SerializedValue<>(new 
ExecutionConfig()),
-                                       timeout,
-                                       new NoRestartStrategy());
+                               TestingUtils.defaultExecutionContext(),
+                               TestingUtils.defaultExecutionContext(),
+                               jg.getJobID(),
+                               jg.getName(),
+                               jg.getJobConfiguration(),
+                               new SerializedValue<>(new ExecutionConfig()),
+                               timeout,
+                               new NoRestartStrategy(),
+                               new 
Scheduler(TestingUtils.defaultExecutionContext()));
                        eg.attachJobGraph(Collections.singletonList(vertex));
                        
                        ExecutionVertex ev = 
eg.getAllVertices().get(vertex.getID()).getTaskVertices()[0];

http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
index 27708a2..5c85592 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
@@ -87,7 +88,8 @@ public class VertexSlotSharingTest {
                                new Configuration(),
                                new SerializedValue<>(new ExecutionConfig()),
                                AkkaUtils.getDefaultTimeout(),
-                               new NoRestartStrategy());
+                               new NoRestartStrategy(),
+                               new 
Scheduler(TestingUtils.defaultExecutionContext()));
                        eg.attachJobGraph(vertices);
                        
                        // verify that the vertices are all in the same slot 
sharing group

http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
index 7c77ce7..7470888 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
@@ -61,13 +61,14 @@ class TaskManagerLossFailsTasksTest extends WordSpecLike 
with Matchers {
           new Configuration(),
           new SerializedValue(new ExecutionConfig()),
           AkkaUtils.getDefaultTimeout,
-          new NoRestartStrategy())
+          new NoRestartStrategy(),
+          scheduler)
 
         eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources)
 
         eg.getState should equal(JobStatus.CREATED)
 
-        eg.scheduleForExecution(scheduler)
+        eg.scheduleForExecution()
         eg.getState should equal(JobStatus.RUNNING)
 
         instance1.markDead()

http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
index 03e3535..9660c9e 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
@@ -34,6 +34,7 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -144,6 +145,7 @@ public class RescalePartitionerTest extends TestLogger {
                        new NoRestartStrategy(),
                        new ArrayList<BlobKey>(),
                        new ArrayList<URL>(),
+                       new Scheduler(TestingUtils.defaultExecutionContext()),
                        ExecutionGraph.class.getClassLoader(),
                        new UnregisteredMetricsGroup());
                try {

Reply via email to