[FLINK-5934] Set the Scheduler in the ExecutionGraph via its constructor

Previously, the scheduler was set when calling
ExecutionGraph.scheduleForExecution(). This has the disadvantage that the
ExecutionGraph has no scheduler set if something else goes wrong before the
scheduleForExecution call. Consequently, the job will be stuck in a restart
loop because the recovery will fail if there is no Scheduler set. In order to
solve the problem, the Scheduler is now passed to the ExecutionGraph when it
is created.

This closes #3437.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5c968563
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5c968563
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5c968563

Branch: refs/heads/master
Commit: 5c968563df26642f81ca94df391f31c51b4f37e6
Parents: cde3cdd
Author: Till Rohrmann <[email protected]>
Authored: Tue Feb 28 15:20:47 2017 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Thu Mar 2 12:23:10 2017 +0100

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionGraph.java  |  64 ++++---
 .../executiongraph/ExecutionGraphBuilder.java   |   3 +
 .../flink/runtime/jobmaster/JobMaster.java      |  33 ++--
 .../flink/runtime/jobmanager/JobManager.scala   |   3 +-
 ...ExecutionGraphCheckpointCoordinatorTest.java |   2 +
 .../ArchivedExecutionGraphTest.java             |   5 +-
 .../ExecutionGraphConstructionTest.java         |  25 ++-
 .../ExecutionGraphDeploymentTest.java           |  47 ++---
 .../ExecutionGraphMetricsTest.java              |   3 +-
 .../ExecutionGraphRestartTest.java              |  39 +++--
 .../ExecutionGraphSchedulingTest.java           | 173 +++++++++----------
 .../ExecutionGraphSignalsTest.java              |   4 +-
 .../executiongraph/ExecutionGraphTestUtils.java |   5 +-
 .../ExecutionStateProgressTest.java             |   4 +-
 .../ExecutionVertexLocalityTest.java            |  26 +--
 .../executiongraph/LegacyJobVertexIdTest.java   |  20 ++-
 .../executiongraph/PointwisePatternTest.java    |  22 ++-
 .../TerminalStateDeadlockTest.java              |   5 +-
 .../executiongraph/VertexSlotSharingTest.java   |   4 +-
 .../TaskManagerLossFailsTasksTest.scala         |   5 +-
 .../partitioner/RescalePartitionerTest.java     |   2 +
 21 files changed, 274 insertions(+), 220 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5c968563/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index a76a421..5fa40fc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -141,6 +141,12 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
         */
        private final SerializedValue<JobInformation> serializedJobInformation;
 
+       /** The executor which is used to execute futures. */
+       private final ScheduledExecutorService futureExecutor;
+
+       /** The executor which is used to execute blocking io operations */
+       private final Executor ioExecutor;
+
        /** {@code true} if all source tasks are stoppable. */
        private boolean isStoppable = true;
 
@@ -172,6 +178,18 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
        /** The timeout for all messages that require a 
response/acknowledgement */
        private final Time rpcCallTimeout;
 
+       /** Strategy to use for restarts */
+       private final RestartStrategy restartStrategy;
+
+       /** The slot provider to use for allocating slots for tasks as they are 
needed */
+       private final SlotProvider slotProvider;
+
+       /** The classloader for the user code. Needed for calls into user code 
classes */
+       private final ClassLoader userClassLoader;
+
+       /** Registered KvState instances reported by the TaskManagers. */
+       private final KvStateLocationRegistry kvStateLocationRegistry;
+
        // ------ Configuration of the Execution -------
 
        /** Flag to indicate whether the scheduler may queue tasks for 
execution, or needs to be able
@@ -199,31 +217,14 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
 
        // ------ Fields that are relevant to the execution and need to be 
cleared before archiving  -------
 
-       /** The slot provider to use for allocating slots for tasks as they are 
needed */
-       private SlotProvider slotProvider;
-
-       /** Strategy to use for restarts */
-       private RestartStrategy restartStrategy;
-
-       /** The classloader for the user code. Needed for calls into user code 
classes */
-       private ClassLoader userClassLoader;
-
        /** The coordinator for checkpoints, if snapshot checkpoints are 
enabled */
        private CheckpointCoordinator checkpointCoordinator;
 
        /** Checkpoint stats tracker separate from the coordinator in order to 
be
         * available after archiving. */
+       @SuppressWarnings("NonSerializableFieldInSerializableClass")
        private CheckpointStatsTracker checkpointStatsTracker;
 
-       /** The executor which is used to execute futures. */
-       private final ScheduledExecutorService futureExecutor;
-
-       /** The executor which is used to execute blocking io operations */
-       private final Executor ioExecutor;
-
-       /** Registered KvState instances reported by the TaskManagers. */
-       private KvStateLocationRegistry kvStateLocationRegistry;
-
        // ------ Fields that are only relevant for archived execution graphs 
------------
        private String jsonPlan;
 
@@ -242,7 +243,8 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                        Configuration jobConfig,
                        SerializedValue<ExecutionConfig> serializedConfig,
                        Time timeout,
-                       RestartStrategy restartStrategy) throws IOException {
+                       RestartStrategy restartStrategy,
+                       SlotProvider slotProvider) throws IOException {
                this(
                        futureExecutor,
                        ioExecutor,
@@ -254,6 +256,7 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                        restartStrategy,
                        Collections.<BlobKey>emptyList(),
                        Collections.<URL>emptyList(),
+                       slotProvider,
                        ExecutionGraph.class.getClassLoader(),
                        new UnregisteredMetricsGroup()
                );
@@ -270,6 +273,7 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                        RestartStrategy restartStrategy,
                        List<BlobKey> requiredJarFiles,
                        List<URL> requiredClasspaths,
+                       SlotProvider slotProvider,
                        ClassLoader userClassLoader,
                        MetricGroup metricGroup) throws IOException {
 
@@ -277,7 +281,6 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                checkNotNull(jobId);
                checkNotNull(jobName);
                checkNotNull(jobConfig);
-               checkNotNull(userClassLoader);
 
                this.jobInformation = new JobInformation(
                        jobId,
@@ -293,12 +296,13 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                this.futureExecutor = 
Preconditions.checkNotNull(futureExecutor);
                this.ioExecutor = Preconditions.checkNotNull(ioExecutor);
 
-               this.userClassLoader = userClassLoader;
+               this.slotProvider = Preconditions.checkNotNull(slotProvider, 
"scheduler");
+               this.userClassLoader = 
Preconditions.checkNotNull(userClassLoader, "userClassLoader");
 
-               this.tasks = new ConcurrentHashMap<JobVertexID, 
ExecutionJobVertex>();
-               this.intermediateResults = new 
ConcurrentHashMap<IntermediateDataSetID, IntermediateResult>();
-               this.verticesInCreationOrder = new 
ArrayList<ExecutionJobVertex>();
-               this.currentExecutions = new 
ConcurrentHashMap<ExecutionAttemptID, Execution>();
+               this.tasks = new ConcurrentHashMap<>(16);
+               this.intermediateResults = new ConcurrentHashMap<>(16);
+               this.verticesInCreationOrder = new ArrayList<>(16);
+               this.currentExecutions = new ConcurrentHashMap<>(16);
 
                this.jobStatusListeners  = new CopyOnWriteArrayList<>();
                this.executionListeners = new CopyOnWriteArrayList<>();
@@ -732,15 +736,9 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                }
        }
 
-       public void scheduleForExecution(SlotProvider slotProvider) throws 
JobException {
-               checkNotNull(slotProvider);
-
-               if (this.slotProvider != null && this.slotProvider != 
slotProvider) {
-                       throw new IllegalArgumentException("Cannot use 
different slot providers for the same job");
-               }
+       public void scheduleForExecution() throws JobException {
 
                if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
-                       this.slotProvider = slotProvider;
 
                        switch (scheduleMode) {
 
@@ -1070,7 +1068,7 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                                }
                        }
 
-                       scheduleForExecution(slotProvider);
+                       scheduleForExecution();
                }
                catch (Throwable t) {
                        LOG.warn("Failed to restart the job.", t);

http://git-wip-us.apache.org/repos/asf/flink/blob/5c968563/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index 2a79302..ec7103c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -33,6 +33,7 @@ import 
org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -68,6 +69,7 @@ public class ExecutionGraphBuilder {
                        Configuration jobManagerConfig,
                        ScheduledExecutorService futureExecutor,
                        Executor ioExecutor,
+                       SlotProvider slotProvider,
                        ClassLoader classLoader,
                        CheckpointRecoveryFactory recoveryFactory,
                        Time timeout,
@@ -98,6 +100,7 @@ public class ExecutionGraphBuilder {
                                                restartStrategy,
                                                jobGraph.getUserJarBlobKeys(),
                                                jobGraph.getClasspaths(),
+                                               slotProvider,
                                                classLoader,
                                                metrics);
                } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/5c968563/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 941248f..145216d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -232,26 +232,27 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
 
                resourceManagerLeaderRetriever = 
highAvailabilityServices.getResourceManagerLeaderRetriever();
 
+               this.slotPool = new SlotPool(rpcService, jobGraph.getJobID());
+               this.slotPoolGateway = slotPool.getSelf();
+
                this.executionGraph = ExecutionGraphBuilder.buildGraph(
-                               null,
-                               jobGraph,
-                               configuration,
-                               executorService,
-                               executorService,
-                               userCodeLoader,
-                               checkpointRecoveryFactory,
-                               rpcAskTimeout,
-                               restartStrategy,
-                               jobMetricGroup,
-                               -1,
-                               log);
+                       null,
+                       jobGraph,
+                       configuration,
+                       executorService,
+                       executorService,
+                       slotPool.getSlotProvider(),
+                       userCodeLoader,
+                       checkpointRecoveryFactory,
+                       rpcAskTimeout,
+                       restartStrategy,
+                       jobMetricGroup,
+                       -1,
+                       log);
 
                // register self as job status change listener
                executionGraph.registerJobStatusListener(new 
JobManagerJobStatusListener());
 
-               this.slotPool = new SlotPool(rpcService, jobGraph.getJobID());
-               this.slotPoolGateway = slotPool.getSelf();
-
                this.registeredTaskManagers = new HashMap<>(4);
        }
 
@@ -340,7 +341,7 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                        @Override
                        public void run() {
                                try {
-                                       
executionGraph.scheduleForExecution(slotPool.getSlotProvider());
+                                       executionGraph.scheduleForExecution();
                                }
                                catch (Throwable t) {
                                        executionGraph.fail(t);

http://git-wip-us.apache.org/repos/asf/flink/blob/5c968563/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 87cd4ac..9d53aa2 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1292,6 +1292,7 @@ class JobManager(
           flinkConfiguration,
           futureExecutor,
           ioExecutor,
+          scheduler,
           userCodeLoader,
           checkpointRecoveryFactory,
           Time.of(timeout.length, timeout.unit),
@@ -1410,7 +1411,7 @@ class JobManager(
             // the job.
             log.info(s"Scheduling job $jobId ($jobName).")
 
-            executionGraph.scheduleForExecution(scheduler)
+            executionGraph.scheduleForExecution()
           } else {
             // Remove the job graph. Otherwise it will be lingering around and 
possibly removed from
             // ZooKeeper by this JM.

http://git-wip-us.apache.org/repos/asf/flink/blob/5c968563/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index 8f565dd..98b4c4d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
 import org.junit.AfterClass;
@@ -103,6 +104,7 @@ public class ExecutionGraphCheckpointCoordinatorTest {
                        new NoRestartStrategy(),
                        Collections.<BlobKey>emptyList(),
                        Collections.<URL>emptyList(),
+                       new Scheduler(TestingUtils.defaultExecutionContext()),
                        ClassLoader.getSystemClassLoader(),
                        new UnregisteredMetricsGroup());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5c968563/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index b3e9d5d..077ab53 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -35,6 +35,7 @@ import 
org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -103,7 +104,9 @@ public class ArchivedExecutionGraphTest {
                        new Configuration(),
                        new SerializedValue<>(config),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       mock(SlotProvider.class));
+
                runtimeGraph.attachJobGraph(vertices);
 
                List<ExecutionJobVertex> jobVertices = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/5c968563/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
index aed1095..fa48384 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
 
@@ -119,7 +120,8 @@ public class ExecutionGraphConstructionTest {
                        cfg,
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       new Scheduler(TestingUtils.defaultExecutionContext()));
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -169,7 +171,8 @@ public class ExecutionGraphConstructionTest {
                        cfg,
                                new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       new Scheduler(TestingUtils.defaultExecutionContext()));
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -244,7 +247,8 @@ public class ExecutionGraphConstructionTest {
                        cfg,
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       new Scheduler(TestingUtils.defaultExecutionContext()));
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -504,7 +508,8 @@ public class ExecutionGraphConstructionTest {
                        cfg,
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       new Scheduler(TestingUtils.defaultExecutionContext()));
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -569,7 +574,8 @@ public class ExecutionGraphConstructionTest {
                        cfg,
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       new Scheduler(TestingUtils.defaultExecutionContext()));
                try {
                        eg.attachJobGraph(ordered);
                        fail("Attached wrong jobgraph");
@@ -638,7 +644,8 @@ public class ExecutionGraphConstructionTest {
                                cfg,
                                new SerializedValue<>(new ExecutionConfig()),
                                AkkaUtils.getDefaultTimeout(),
-                               new NoRestartStrategy());
+                               new NoRestartStrategy(),
+                               new 
Scheduler(TestingUtils.defaultExecutionContext()));
                        try {
                                eg.attachJobGraph(ordered);
                        }
@@ -685,7 +692,8 @@ public class ExecutionGraphConstructionTest {
                                cfg,
                                new SerializedValue<>(new ExecutionConfig()),
                                AkkaUtils.getDefaultTimeout(),
-                               new NoRestartStrategy());
+                               new NoRestartStrategy(),
+                               new 
Scheduler(TestingUtils.defaultExecutionContext()));
 
                        try {
                                eg.attachJobGraph(ordered);
@@ -767,7 +775,8 @@ public class ExecutionGraphConstructionTest {
                                cfg,
                                new SerializedValue<>(new ExecutionConfig()),
                                AkkaUtils.getDefaultTimeout(),
-                               new NoRestartStrategy());
+                               new NoRestartStrategy(),
+                               new 
Scheduler(TestingUtils.defaultExecutionContext()));
                        
                        
eg.attachJobGraph(jg.getVerticesSortedTopologicallyFromSources());
                        

http://git-wip-us.apache.org/repos/asf/flink/blob/5c968563/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 6b05987..f119671 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -97,7 +97,8 @@ public class ExecutionGraphDeploymentTest {
                                new Configuration(),
                                new SerializedValue<>(new ExecutionConfig()),
                                AkkaUtils.getDefaultTimeout(),
-                               new NoRestartStrategy());
+                               new NoRestartStrategy(),
+                               new 
Scheduler(TestingUtils.defaultExecutionContext()));
 
                        List<JobVertex> ordered = Arrays.asList(v1, v2, v3, v4);
 
@@ -312,6 +313,15 @@ public class ExecutionGraphDeploymentTest {
 
                v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, 
ResultPartitionType.BLOCKING);
 
+               Scheduler scheduler = new 
Scheduler(TestingUtils.directExecutionContext());
+               for (int i = 0; i < dop1; i++) {
+                       scheduler.newInstanceAvailable(
+                               ExecutionGraphTestUtils.getInstance(
+                                       new ActorTaskManagerGateway(
+                                               new 
ExecutionGraphTestUtils.SimpleActorGateway(
+                                                       
TestingUtils.directExecutionContext()))));
+               }
+
                // execution graph that executes actions synchronously
                ExecutionGraph eg = new ExecutionGraph(
                        new DirectScheduledExecutorService(),
@@ -321,25 +331,18 @@ public class ExecutionGraphDeploymentTest {
                        new Configuration(),
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       scheduler);
 
                eg.setQueuedSchedulingAllowed(false);
 
                List<JobVertex> ordered = Arrays.asList(v1, v2);
                eg.attachJobGraph(ordered);
 
-               Scheduler scheduler = new 
Scheduler(TestingUtils.directExecutionContext());
-               for (int i = 0; i < dop1; i++) {
-                       scheduler.newInstanceAvailable(
-                               ExecutionGraphTestUtils.getInstance(
-                                       new ActorTaskManagerGateway(
-                                               new 
ExecutionGraphTestUtils.SimpleActorGateway(
-                                                       
TestingUtils.directExecutionContext()))));
-               }
                assertEquals(dop1, scheduler.getNumberOfAvailableSlots());
 
                // schedule, this triggers mock deployment
-               eg.scheduleForExecution(scheduler);
+               eg.scheduleForExecution();
 
                ExecutionAttemptID attemptID = 
eg.getJobVertex(v1.getID()).getTaskVertices()[0].getCurrentExecutionAttempt().getAttemptId();
                eg.updateState(new TaskExecutionState(jobId, attemptID, 
ExecutionState.RUNNING));
@@ -357,6 +360,15 @@ public class ExecutionGraphDeploymentTest {
                v1.setInvokableClass(BatchTask.class);
                v2.setInvokableClass(BatchTask.class);
 
+               Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
+               for (int i = 0; i < dop1 + dop2; i++) {
+                       scheduler.newInstanceAvailable(
+                               ExecutionGraphTestUtils.getInstance(
+                                       new ActorTaskManagerGateway(
+                                               new 
ExecutionGraphTestUtils.SimpleActorGateway(
+                                                       
TestingUtils.directExecutionContext()))));
+               }
+
                // execution graph that executes actions synchronously
                ExecutionGraph eg = new ExecutionGraph(
                        new DirectScheduledExecutorService(),
@@ -366,25 +378,18 @@ public class ExecutionGraphDeploymentTest {
                        new Configuration(), 
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       scheduler);
                
                eg.setQueuedSchedulingAllowed(false);
 
                List<JobVertex> ordered = Arrays.asList(v1, v2);
                eg.attachJobGraph(ordered);
 
-               Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
-               for (int i = 0; i < dop1 + dop2; i++) {
-                       scheduler.newInstanceAvailable(
-                               ExecutionGraphTestUtils.getInstance(
-                                       new ActorTaskManagerGateway(
-                                               new 
ExecutionGraphTestUtils.SimpleActorGateway(
-                                                       
TestingUtils.directExecutionContext()))));
-               }
                assertEquals(dop1 + dop2, 
scheduler.getNumberOfAvailableSlots());
 
                // schedule, this triggers mock deployment
-               eg.scheduleForExecution(scheduler);
+               eg.scheduleForExecution();
 
                Map<ExecutionAttemptID, Execution> executions = 
eg.getRegisteredExecutions();
                assertEquals(dop1 + dop2, executions.size());

http://git-wip-us.apache.org/repos/asf/flink/blob/5c968563/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index 6b28984..203c547 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -162,6 +162,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
                                testingRestartStrategy,
                                Collections.<BlobKey>emptyList(),
                                Collections.<URL>emptyList(),
+                       scheduler,
                                getClass().getClassLoader(),
                                metricGroup);
        
@@ -180,7 +181,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
                        
executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
        
                        // start execution
-                       executionGraph.scheduleForExecution(scheduler);
+               executionGraph.scheduleForExecution();
 
                        assertTrue(0L == restartingTime.getValue());
        

http://git-wip-us.apache.org/repos/asf/flink/blob/5c968563/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index e4f49bb..1729582 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -116,12 +116,12 @@ public class ExecutionGraphRestartTest extends TestLogger 
{
                
                //initiate and schedule job
                JobGraph jobGraph = new JobGraph("Pointwise job", groupVertex, 
groupVertex2);
-               ExecutionGraph eg = newExecutionGraph(new 
FixedDelayRestartStrategy(1, 0L));
+               ExecutionGraph eg = newExecutionGraph(new 
FixedDelayRestartStrategy(1, 0L), scheduler);
                
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 
                assertEquals(JobStatus.CREATED, eg.getState());
                
-               eg.scheduleForExecution(scheduler);
+               eg.scheduleForExecution();
                assertEquals(JobStatus.RUNNING, eg.getState());
                
                //sanity checks
@@ -240,7 +240,8 @@ public class ExecutionGraphRestartTest extends TestLogger {
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
                        // We want to manually control the restart and delay
-                       new InfiniteDelayRestartStrategy());
+                       new InfiniteDelayRestartStrategy(),
+                       scheduler);
 
                JobVertex jobVertex = new JobVertex("NoOpInvokable");
                jobVertex.setInvokableClass(NoOpInvokable.class);
@@ -252,7 +253,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
                assertEquals(JobStatus.CREATED, executionGraph.getState());
 
-               executionGraph.scheduleForExecution(scheduler);
+               executionGraph.scheduleForExecution();
 
                assertEquals(JobStatus.RUNNING, executionGraph.getState());
 
@@ -384,12 +385,12 @@ public class ExecutionGraphRestartTest extends TestLogger 
{
                JobVertex sender = newJobVertex("Task1", 1, 
NoOpInvokable.class);
                JobVertex receiver = newJobVertex("Task2", 1, 
NoOpInvokable.class);
                JobGraph jobGraph = new JobGraph("Pointwise job", sender, 
receiver);
-               ExecutionGraph eg = newExecutionGraph(new 
FixedDelayRestartStrategy(1, 1000));
+               ExecutionGraph eg = newExecutionGraph(new 
FixedDelayRestartStrategy(1, 1000), scheduler);
                
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 
                assertEquals(JobStatus.CREATED, eg.getState());
 
-               eg.scheduleForExecution(scheduler);
+               eg.scheduleForExecution();
                assertEquals(JobStatus.RUNNING, eg.getState());
 
                Iterator<ExecutionVertex> executionVertices = 
eg.getAllExecutionVertices().iterator();
@@ -453,13 +454,13 @@ public class ExecutionGraphRestartTest extends TestLogger 
{
                JobGraph jobGraph = new JobGraph("Test Job", vertex);
                jobGraph.setExecutionConfig(executionConfig);
 
-               ExecutionGraph eg = newExecutionGraph(new 
InfiniteDelayRestartStrategy());
+               ExecutionGraph eg = newExecutionGraph(new 
InfiniteDelayRestartStrategy(), scheduler);
 
                
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 
                assertEquals(JobStatus.CREATED, eg.getState());
 
-               eg.scheduleForExecution(scheduler);
+               eg.scheduleForExecution();
                assertEquals(JobStatus.RUNNING, eg.getState());
 
                // Fail right after cancel (for example with concurrent slot 
release)
@@ -499,13 +500,13 @@ public class ExecutionGraphRestartTest extends TestLogger 
{
                JobGraph jobGraph = new JobGraph("Test Job", vertex);
                jobGraph.setExecutionConfig(executionConfig);
 
-               ExecutionGraph eg = newExecutionGraph(new 
InfiniteDelayRestartStrategy());
+               ExecutionGraph eg = newExecutionGraph(new 
InfiniteDelayRestartStrategy(), scheduler);
 
                
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 
                assertEquals(JobStatus.CREATED, eg.getState());
 
-               eg.scheduleForExecution(scheduler);
+               eg.scheduleForExecution();
                assertEquals(JobStatus.RUNNING, eg.getState());
 
                // Fail right after cancel (for example with concurrent slot 
release)
@@ -555,13 +556,14 @@ public class ExecutionGraphRestartTest extends TestLogger 
{
                        new Configuration(),
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       controllableRestartStrategy);
+                       controllableRestartStrategy,
+                       scheduler);
 
                
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 
                assertEquals(JobStatus.CREATED, eg.getState());
 
-               eg.scheduleForExecution(scheduler);
+               eg.scheduleForExecution();
 
                assertEquals(JobStatus.RUNNING, eg.getState());
 
@@ -660,7 +662,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
                JobGraph jobGraph = new JobGraph("Pointwise job", sender);
 
-               ExecutionGraph eg = newExecutionGraph(restartStrategy);
+               ExecutionGraph eg = newExecutionGraph(restartStrategy, 
scheduler);
                if (isSpy) {
                        eg = spy(eg);
                }
@@ -668,7 +670,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
                assertEquals(JobStatus.CREATED, eg.getState());
 
-               eg.scheduleForExecution(scheduler);
+               eg.scheduleForExecution();
                assertEquals(JobStatus.RUNNING, eg.getState());
                return new Tuple2<>(eg, instance);
        }
@@ -680,7 +682,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
                return groupVertex;
        }
 
-       private static ExecutionGraph newExecutionGraph(RestartStrategy 
restartStrategy) throws IOException {
+       private static ExecutionGraph newExecutionGraph(RestartStrategy 
restartStrategy, Scheduler scheduler) throws IOException {
                return new ExecutionGraph(
                        TestingUtils.defaultExecutor(),
                        TestingUtils.defaultExecutor(),
@@ -689,7 +691,12 @@ public class ExecutionGraphRestartTest extends TestLogger {
                        new Configuration(),
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       restartStrategy);
+                       restartStrategy,
+                       scheduler);
+       }
+
+       private static ExecutionGraph newExecutionGraph(RestartStrategy 
restartStrategy) throws IOException {
+               return newExecutionGraph(restartStrategy, new 
Scheduler(TestingUtils.defaultExecutionContext()));
        }
 
        private static void restartAfterFailure(ExecutionGraph eg, 
FiniteDuration timeout, boolean haltAfterRestart) throws InterruptedException {

http://git-wip-us.apache.org/repos/asf/flink/blob/5c968563/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index 9834dc6..c8a0422 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -109,9 +109,15 @@ public class ExecutionGraphSchedulingTest extends 
TestLogger {
                final JobID jobId = new JobID();
                final JobGraph jobGraph = new JobGraph(jobId, "test", 
sourceVertex, targetVertex);
 
-               final ExecutionGraph eg = createExecutionGraph(jobGraph);
+               final FlinkCompletableFuture<SimpleSlot> sourceFuture = new 
FlinkCompletableFuture<>();
+               final FlinkCompletableFuture<SimpleSlot> targetFuture = new 
FlinkCompletableFuture<>();
+
+               ProgrammedSlotProvider slotProvider = new 
ProgrammedSlotProvider(parallelism);
+               slotProvider.addSlot(sourceVertex.getID(), 0, sourceFuture);
+               slotProvider.addSlot(targetVertex.getID(), 0, targetFuture);
+
+               final ExecutionGraph eg = createExecutionGraph(jobGraph, 
slotProvider);
 
-               //
                //  set up two TaskManager gateways and slots
 
                final TaskManagerGateway gatewaySource = createTaskManager();
@@ -120,16 +126,9 @@ public class ExecutionGraphSchedulingTest extends 
TestLogger {
                final SimpleSlot sourceSlot = createSlot(gatewaySource, jobId);
                final SimpleSlot targetSlot = createSlot(gatewayTarget, jobId);
 
-               final FlinkCompletableFuture<SimpleSlot> sourceFuture = new 
FlinkCompletableFuture<>();
-               final FlinkCompletableFuture<SimpleSlot> targetFuture = new 
FlinkCompletableFuture<>();
-
-               ProgrammedSlotProvider slotProvider = new 
ProgrammedSlotProvider(parallelism);
-               slotProvider.addSlot(sourceVertex.getID(), 0, sourceFuture);
-               slotProvider.addSlot(targetVertex.getID(), 0, targetFuture);
-
                eg.setScheduleMode(ScheduleMode.EAGER);
                eg.setQueuedSchedulingAllowed(true);
-               eg.scheduleForExecution(slotProvider);
+               eg.scheduleForExecution();
 
                // job should be running
                assertEquals(JobStatus.RUNNING, eg.getState());
@@ -177,7 +176,10 @@ public class ExecutionGraphSchedulingTest extends 
TestLogger {
                final JobID jobId = new JobID();
                final JobGraph jobGraph = new JobGraph(jobId, "test", 
sourceVertex, targetVertex);
 
-               final ExecutionGraph eg = createExecutionGraph(jobGraph);
+               @SuppressWarnings({"unchecked", "rawtypes"})
+               final FlinkCompletableFuture<SimpleSlot>[] sourceFutures = new 
FlinkCompletableFuture[parallelism];
+               @SuppressWarnings({"unchecked", "rawtypes"})
+               final FlinkCompletableFuture<SimpleSlot>[] targetFutures = new 
FlinkCompletableFuture[parallelism];
 
                //
                //  Create the slots, futures, and the slot provider
@@ -188,11 +190,6 @@ public class ExecutionGraphSchedulingTest extends 
TestLogger {
                final SimpleSlot[] sourceSlots = new SimpleSlot[parallelism];
                final SimpleSlot[] targetSlots = new SimpleSlot[parallelism];
 
-               @SuppressWarnings({"unchecked", "rawtypes"})
-               final FlinkCompletableFuture<SimpleSlot>[] sourceFutures = new 
FlinkCompletableFuture[parallelism];
-               @SuppressWarnings({"unchecked", "rawtypes"})
-               final FlinkCompletableFuture<SimpleSlot>[] targetFutures = new 
FlinkCompletableFuture[parallelism];
-
                for (int i = 0; i < parallelism; i++) {
                        sourceTaskManagers[i] = createTaskManager();
                        targetTaskManagers[i] = createTaskManager();
@@ -208,6 +205,8 @@ public class ExecutionGraphSchedulingTest extends 
TestLogger {
                slotProvider.addSlots(sourceVertex.getID(), sourceFutures);
                slotProvider.addSlots(targetVertex.getID(), targetFutures);
 
+               final ExecutionGraph eg = createExecutionGraph(jobGraph, 
slotProvider);
+
                //
                //  we complete some of the futures
 
@@ -220,7 +219,7 @@ public class ExecutionGraphSchedulingTest extends 
TestLogger {
 
                eg.setScheduleMode(ScheduleMode.EAGER);
                eg.setQueuedSchedulingAllowed(true);
-               eg.scheduleForExecution(slotProvider);
+               eg.scheduleForExecution();
 
                verifyNothingDeployed(eg, sourceTaskManagers);
 
@@ -274,10 +273,6 @@ public class ExecutionGraphSchedulingTest extends 
TestLogger {
                final JobID jobId = new JobID();
                final JobGraph jobGraph = new JobGraph(jobId, "test", 
sourceVertex, targetVertex);
 
-               final ExecutionGraph eg = createExecutionGraph(jobGraph);
-               TerminalJobStatusListener testListener = new 
TerminalJobStatusListener();
-               eg.registerJobStatusListener(testListener);
-
                //
                //  Create the slots, futures, and the slot provider
 
@@ -304,6 +299,10 @@ public class ExecutionGraphSchedulingTest extends 
TestLogger {
                slotProvider.addSlots(sourceVertex.getID(), sourceFutures);
                slotProvider.addSlots(targetVertex.getID(), targetFutures);
 
+               final ExecutionGraph eg = createExecutionGraph(jobGraph, 
slotProvider);
+               TerminalJobStatusListener testListener = new 
TerminalJobStatusListener();
+               eg.registerJobStatusListener(testListener);
+
                //
                //  we complete some of the futures
 
@@ -317,7 +316,7 @@ public class ExecutionGraphSchedulingTest extends 
TestLogger {
 
                eg.setScheduleMode(ScheduleMode.EAGER);
                eg.setQueuedSchedulingAllowed(true);
-               eg.scheduleForExecution(slotProvider);
+               eg.scheduleForExecution();
 
                // fail one slot
                sourceFutures[1].completeExceptionally(new 
TestRuntimeException());
@@ -356,10 +355,6 @@ public class ExecutionGraphSchedulingTest extends 
TestLogger {
                final JobID jobId = new JobID();
                final JobGraph jobGraph = new JobGraph(jobId, "test", vertex);
 
-               final ExecutionGraph eg = createExecutionGraph(jobGraph, 
Time.milliseconds(20));
-               final TerminalJobStatusListener statusListener = new 
TerminalJobStatusListener();
-               eg.registerJobStatusListener(statusListener);
-
                final SlotOwner slotOwner = mock(SlotOwner.class);
 
                final TaskManagerGateway taskManager = 
mock(TaskManagerGateway.class);
@@ -375,6 +370,10 @@ public class ExecutionGraphSchedulingTest extends 
TestLogger {
                ProgrammedSlotProvider slotProvider = new 
ProgrammedSlotProvider(parallelism);
                slotProvider.addSlots(vertex.getID(), slotFutures);
 
+               final ExecutionGraph eg = createExecutionGraph(jobGraph, 
slotProvider, Time.milliseconds(20));
+               final TerminalJobStatusListener statusListener = new 
TerminalJobStatusListener();
+               eg.registerJobStatusListener(statusListener);
+
                //  we complete one future
                slotFutures[1].complete(slots[1]);
 
@@ -382,7 +381,7 @@ public class ExecutionGraphSchedulingTest extends 
TestLogger {
 
                eg.setScheduleMode(ScheduleMode.EAGER);
                eg.setQueuedSchedulingAllowed(true);
-               eg.scheduleForExecution(slotProvider);
+               eg.scheduleForExecution();
 
                //  we complete another future
                slotFutures[2].complete(slots[2]);
@@ -419,9 +418,6 @@ public class ExecutionGraphSchedulingTest extends 
TestLogger {
                final JobID jobId = new JobID();
                final JobGraph jobGraph = new JobGraph(jobId, "test", vertex);
 
-               final ExecutionGraph eg = createExecutionGraph(jobGraph);
-               final ExecutionJobVertex ejv = eg.getJobVertex(vertex.getID());
-
                // set up some available slots and some slot owner that accepts 
released slots back
                final List<SimpleSlot> returnedSlots = new ArrayList<>();
                final SlotOwner recycler = new SlotOwner() {
@@ -432,35 +428,37 @@ public class ExecutionGraphSchedulingTest extends 
TestLogger {
                        }
                };
 
+               // slot provider that hand out parallelism / 3 slots, then 
throws an exception
+               final SlotProvider slotProvider = mock(SlotProvider.class);
+
                final TaskManagerGateway taskManager = 
mock(TaskManagerGateway.class);
                final List<SimpleSlot> availableSlots = new 
ArrayList<>(Arrays.asList(
-                               createSlot(taskManager, jobId, recycler),
-                               createSlot(taskManager, jobId, recycler),
-                               createSlot(taskManager, jobId, recycler)));
-
-
-               // slot provider that hand out parallelism / 3 slots, then 
throws an exception
-               final SlotProvider slots = mock(SlotProvider.class);
-               
-               when(slots.allocateSlot(any(ScheduledUnit.class), 
anyBoolean())).then(
-                               new Answer<Future<SimpleSlot>>() {
-
-                                       @Override
-                                       public Future<SimpleSlot> 
answer(InvocationOnMock invocation) {
-                                               if (availableSlots.isEmpty()) {
-                                                       throw new 
TestRuntimeException();
-                                               } else {
-                                                       return 
FlinkCompletableFuture.completed(availableSlots.remove(0));
-                                               }
+                       createSlot(taskManager, jobId, recycler),
+                       createSlot(taskManager, jobId, recycler),
+                       createSlot(taskManager, jobId, recycler)));
+
+               when(slotProvider.allocateSlot(any(ScheduledUnit.class), 
anyBoolean())).then(
+                       new Answer<Future<SimpleSlot>>() {
+
+                               @Override
+                               public Future<SimpleSlot> 
answer(InvocationOnMock invocation) {
+                                       if (availableSlots.isEmpty()) {
+                                               throw new 
TestRuntimeException();
+                                       } else {
+                                               return 
FlinkCompletableFuture.completed(availableSlots.remove(0));
                                        }
-                               });
+                               }
+                       });
+
+               final ExecutionGraph eg = createExecutionGraph(jobGraph, 
slotProvider);
+               final ExecutionJobVertex ejv = eg.getJobVertex(vertex.getID());
 
                // acquire resources and check that all are back after the 
failure
 
                final int numSlotsToExpectBack = availableSlots.size();
 
                try {
-                       ejv.allocateResourcesForAll(slots, false);
+                       ejv.allocateResourcesForAll(slotProvider, false);
                        fail("should have failed with an exception");
                }
                catch (TestRuntimeException e) {
@@ -471,7 +469,7 @@ public class ExecutionGraphSchedulingTest extends 
TestLogger {
        }
 
        /**
-        * Tests that the {@link 
ExecutionGraph#scheduleForExecution(SlotProvider)} method
+        * Tests that the {@link ExecutionGraph#scheduleForExecution()} method
         * releases partially acquired resources upon exception.
         */
        @Test
@@ -495,8 +493,6 @@ public class ExecutionGraphSchedulingTest extends 
TestLogger {
                final JobID jobId = new JobID();
                final JobGraph jobGraph = new JobGraph(jobId, "test", 
sourceVertex, targetVertex);
 
-               final ExecutionGraph eg = createExecutionGraph(jobGraph);
-
                // set up some available slots and some slot owner that accepts 
released slots back
                final List<SimpleSlot> returnedSlots = new ArrayList<>();
                final SlotOwner recycler = new SlotOwner() {
@@ -509,28 +505,30 @@ public class ExecutionGraphSchedulingTest extends 
TestLogger {
 
                final TaskManagerGateway taskManager = 
mock(TaskManagerGateway.class);
                final List<SimpleSlot> availableSlots = new 
ArrayList<>(Arrays.asList(
-                               createSlot(taskManager, jobId, recycler),
-                               createSlot(taskManager, jobId, recycler),
-                               createSlot(taskManager, jobId, recycler),
-                               createSlot(taskManager, jobId, recycler),
-                               createSlot(taskManager, jobId, recycler)));
+                       createSlot(taskManager, jobId, recycler),
+                       createSlot(taskManager, jobId, recycler),
+                       createSlot(taskManager, jobId, recycler),
+                       createSlot(taskManager, jobId, recycler),
+                       createSlot(taskManager, jobId, recycler)));
 
 
                // slot provider that hand out parallelism / 3 slots, then 
throws an exception
-               final SlotProvider slots = mock(SlotProvider.class);
-
-               when(slots.allocateSlot(any(ScheduledUnit.class), 
anyBoolean())).then(
-                               new Answer<Future<SimpleSlot>>() {
-
-                                       @Override
-                                       public Future<SimpleSlot> 
answer(InvocationOnMock invocation) {
-                                               if (availableSlots.isEmpty()) {
-                                                       throw new 
TestRuntimeException();
-                                               } else {
-                                                       return 
FlinkCompletableFuture.completed(availableSlots.remove(0));
-                                               }
+               final SlotProvider slotProvider = mock(SlotProvider.class);
+
+               when(slotProvider.allocateSlot(any(ScheduledUnit.class), 
anyBoolean())).then(
+                       new Answer<Future<SimpleSlot>>() {
+
+                               @Override
+                               public Future<SimpleSlot> 
answer(InvocationOnMock invocation) {
+                                       if (availableSlots.isEmpty()) {
+                                               throw new 
TestRuntimeException();
+                                       } else {
+                                               return 
FlinkCompletableFuture.completed(availableSlots.remove(0));
                                        }
-                               });
+                               }
+                       });
+
+               final ExecutionGraph eg = createExecutionGraph(jobGraph, 
slotProvider);
 
                // acquire resources and check that all are back after the 
failure
 
@@ -538,7 +536,7 @@ public class ExecutionGraphSchedulingTest extends 
TestLogger {
 
                try {
                        eg.setScheduleMode(ScheduleMode.EAGER);
-                       eg.scheduleForExecution(slots);
+                       eg.scheduleForExecution();
                        fail("should have failed with an exception");
                }
                catch (TestRuntimeException e) {
@@ -552,24 +550,25 @@ public class ExecutionGraphSchedulingTest extends 
TestLogger {
        //  Utilities
        // 
------------------------------------------------------------------------
 
-       private ExecutionGraph createExecutionGraph(JobGraph jobGraph) throws 
Exception {
-               return createExecutionGraph(jobGraph, Time.minutes(10));
+       private ExecutionGraph createExecutionGraph(JobGraph jobGraph, 
SlotProvider slotProvider) throws Exception {
+               return createExecutionGraph(jobGraph, slotProvider, 
Time.minutes(10));
        }
 
-       private ExecutionGraph createExecutionGraph(JobGraph jobGraph, Time 
timeout) throws Exception {
+       private ExecutionGraph createExecutionGraph(JobGraph jobGraph, 
SlotProvider slotProvider, Time timeout) throws Exception {
                return ExecutionGraphBuilder.buildGraph(
-                               null,
-                               jobGraph,
-                               new Configuration(),
-                               executor,
-                               executor,
-                               getClass().getClassLoader(),
-                               new StandaloneCheckpointRecoveryFactory(),
-                               timeout,
-                               new NoRestartStrategy(),
-                               new UnregisteredMetricsGroup(),
-                               1,
-                               log);
+                       null,
+                       jobGraph,
+                       new Configuration(),
+                       executor,
+                       executor,
+                       slotProvider,
+                       getClass().getClassLoader(),
+                       new StandaloneCheckpointRecoveryFactory(),
+                       timeout,
+                       new NoRestartStrategy(),
+                       new UnregisteredMetricsGroup(),
+                       1,
+                       log);
        }
 
        private SimpleSlot createSlot(TaskManagerGateway taskManager, JobID 
jobId) {

http://git-wip-us.apache.org/repos/asf/flink/blob/5c968563/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
index b7850fa..64b9aa2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
@@ -35,6 +35,7 @@ import 
org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
 
@@ -139,7 +140,8 @@ public class ExecutionGraphSignalsTest {
                        cfg,
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       new Scheduler(TestingUtils.defaultExecutionContext()));
                eg.attachJobGraph(ordered);
 
                f = eg.getClass().getDeclaredField("state");

http://git-wip-us.apache.org/repos/asf/flink/blob/5c968563/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index ef94917..b0137fa 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -46,6 +46,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
 import 
org.apache.flink.runtime.messages.TaskMessages.FailIntermediateResultPartitions;
 import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
@@ -55,6 +56,7 @@ import org.mockito.Matchers;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import scala.concurrent.ExecutionContext;
+import scala.concurrent.ExecutionContext$;
 
 public class ExecutionGraphTestUtils {
 
@@ -183,7 +185,8 @@ public class ExecutionGraphTestUtils {
                        new Configuration(),
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       new 
Scheduler(ExecutionContext$.MODULE$.fromExecutor(executor)));
 
                ExecutionJobVertex ejv = spy(new ExecutionJobVertex(graph, ajv, 
1,
                                AkkaUtils.getDefaultTimeout()));

http://git-wip-us.apache.org/repos/asf/flink/blob/5c968563/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
index 7427a1d..bd51c81 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
@@ -59,7 +60,8 @@ public class ExecutionStateProgressTest {
                                new Configuration(),
                                new SerializedValue<>(new ExecutionConfig()),
                                AkkaUtils.getDefaultTimeout(),
-                               new NoRestartStrategy());
+                               new NoRestartStrategy(),
+                               new Scheduler(TestingUtils.defaultExecutionContext()));
                        graph.attachJobGraph(Collections.singletonList(ajv));
 
                        setGraphStatus(graph, JobStatus.RUNNING);

http://git-wip-us.apache.org/repos/asf/flink/blob/5c968563/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
index 47863ac..cfd4665 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
 import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -204,18 +205,19 @@ public class ExecutionVertexLocalityTest extends TestLogger {
                JobGraph testJob = new JobGraph(jobId, "test job", source, target);
 
                return ExecutionGraphBuilder.buildGraph(
-                               null,
-                               testJob,
-                               new Configuration(),
-                               TestingUtils.defaultExecutor(),
-                               TestingUtils.defaultExecutor(),
-                               getClass().getClassLoader(),
-                               new StandaloneCheckpointRecoveryFactory(),
-                               Time.of(10, TimeUnit.SECONDS),
-                               new FixedDelayRestartStrategy(10, 0L),
-                               new UnregisteredMetricsGroup(),
-                               1,
-                               log);
+                       null,
+                       testJob,
+                       new Configuration(),
+                       TestingUtils.defaultExecutor(),
+                       TestingUtils.defaultExecutor(),
+                       mock(SlotProvider.class),
+                       getClass().getClassLoader(),
+                       new StandaloneCheckpointRecoveryFactory(),
+                       Time.of(10, TimeUnit.SECONDS),
+                       new FixedDelayRestartStrategy(10, 0L),
+                       new UnregisteredMetricsGroup(),
+                       1,
+                       log);
        }
 
        private void initializeLocation(ExecutionVertex vertex, TaskManagerLocation location) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/5c968563/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java
index 32867bf..89db3a1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -49,14 +50,15 @@ public class LegacyJobVertexIdTest {
                jobVertex.setInvokableClass(AbstractInvokable.class);
 
                ExecutionGraph executionGraph = new ExecutionGraph(
-                               mock(ScheduledExecutorService.class),
-                               mock(Executor.class),
-                               new JobID(),
-                               "test",
-                               mock(Configuration.class),
-                               mock(SerializedValue.class),
-                               Time.seconds(1),
-                               mock(RestartStrategy.class));
+                       mock(ScheduledExecutorService.class),
+                       mock(Executor.class),
+                       new JobID(),
+                       "test",
+                       mock(Configuration.class),
+                       mock(SerializedValue.class),
+                       Time.seconds(1),
+                       mock(RestartStrategy.class),
+                       mock(SlotProvider.class));
 
                ExecutionJobVertex executionJobVertex =
                                new ExecutionJobVertex(executionGraph, jobVertex, 1, Time.seconds(1));
@@ -75,4 +77,4 @@ public class LegacyJobVertexIdTest {
                Assert.assertEquals(executionJobVertex, idToVertex.get(legacyId1));
                Assert.assertEquals(executionJobVertex, idToVertex.get(legacyId2));
        }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5c968563/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
index 006f894..5629c0b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
 
@@ -73,7 +74,8 @@ public class PointwisePatternTest {
                        cfg,
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       new Scheduler(TestingUtils.defaultExecutionContext()));
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -119,7 +121,8 @@ public class PointwisePatternTest {
                        cfg,
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       new Scheduler(TestingUtils.defaultExecutionContext()));
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -166,7 +169,8 @@ public class PointwisePatternTest {
                        cfg,
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       new Scheduler(TestingUtils.defaultExecutionContext()));
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -214,7 +218,8 @@ public class PointwisePatternTest {
                        cfg,
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       new Scheduler(TestingUtils.defaultExecutionContext()));
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -260,7 +265,8 @@ public class PointwisePatternTest {
                        cfg,
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       new Scheduler(TestingUtils.defaultExecutionContext()));
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -326,7 +332,8 @@ public class PointwisePatternTest {
                        cfg,
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       new Scheduler(TestingUtils.defaultExecutionContext()));
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -383,7 +390,8 @@ public class PointwisePatternTest {
                        cfg,
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       new Scheduler(TestingUtils.defaultExecutionContext()));
                try {
                        eg.attachJobGraph(ordered);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/5c968563/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
index afdafe3..d717986 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
@@ -187,11 +187,12 @@ public class TerminalStateDeadlockTest {
                                EMPTY_CONFIG,
                                new SerializedValue<>(new ExecutionConfig()),
                                TIMEOUT,
-                               new FixedDelayRestartStrategy(1, 0));
+                               new FixedDelayRestartStrategy(1, 0),
+                               new Scheduler(TestingUtils.defaultExecutionContext()));
                }
 
                @Override
-               public void scheduleForExecution(SlotProvider slotProvider) {
+               public void scheduleForExecution() {
                        // notify that we are done with the "restarting"
                        synchronized (this) {
                                done = true;

http://git-wip-us.apache.org/repos/asf/flink/blob/5c968563/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
index 4e1bfae..bf17485 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
@@ -87,7 +88,8 @@ public class VertexSlotSharingTest {
                                new Configuration(),
                                new SerializedValue<>(new ExecutionConfig()),
                                AkkaUtils.getDefaultTimeout(),
-                               new NoRestartStrategy());
+                               new NoRestartStrategy(),
+                               new Scheduler(TestingUtils.defaultExecutionContext()));
                        eg.attachJobGraph(vertices);
                        
                        // verify that the vertices are all in the same slot sharing group

http://git-wip-us.apache.org/repos/asf/flink/blob/5c968563/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
index 258e44e..5f4ea25 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
@@ -70,13 +70,14 @@ class TaskManagerLossFailsTasksTest extends WordSpecLike with Matchers {
           new Configuration(),
           new SerializedValue(new ExecutionConfig()),
           AkkaUtils.getDefaultTimeout,
-          new NoRestartStrategy())
+          new NoRestartStrategy(),
+          scheduler)
 
         eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources)
 
         eg.getState should equal(JobStatus.CREATED)
 
-        eg.scheduleForExecution(scheduler)
+        eg.scheduleForExecution()
         eg.getState should equal(JobStatus.RUNNING)
 
         instance1.markDead()

http://git-wip-us.apache.org/repos/asf/flink/blob/5c968563/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
index 3c81112..43fe169 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -148,6 +149,7 @@ public class RescalePartitionerTest extends TestLogger {
                        new NoRestartStrategy(),
                        new ArrayList<BlobKey>(),
                        new ArrayList<URL>(),
+                       new Scheduler(TestingUtils.defaultExecutionContext()),
                        ExecutionGraph.class.getClassLoader(),
                        new UnregisteredMetricsGroup());
                try {

Reply via email to