[FLINK-5934] Set the Scheduler in the ExecutionGraph via its constructor

Previously, the scheduler was set only when calling 
ExecutionGraph.scheduleForExecution(). This
has the disadvantage that the ExecutionGraph has no scheduler set if something 
else
went wrong before the scheduleForExecution call. Consequently, the job will be 
stuck
in a restart loop because the recovery will fail if there is no Scheduler set. 
In
order to solve the problem, the Scheduler is now passed to the ExecutionGraph 
when
it is created.

This closes #3440.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c22efce0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c22efce0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c22efce0

Branch: refs/heads/release-1.2
Commit: c22efce098c14e8f08bad1e0065dbd02df6e4dbb
Parents: 53b4542
Author: Till Rohrmann <[email protected]>
Authored: Tue Feb 28 15:20:47 2017 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Thu Mar 2 13:59:41 2017 +0100

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionGraph.java  | 65 +++++++++-----------
 .../executiongraph/ExecutionGraphBuilder.java   |  3 +
 .../flink/runtime/jobmanager/JobManager.scala   |  3 +-
 ...ExecutionGraphCheckpointCoordinatorTest.java |  2 +
 .../ArchivedExecutionGraphTest.java             |  5 +-
 .../ExecutionGraphConstructionTest.java         | 25 +++++---
 .../ExecutionGraphDeploymentTest.java           | 47 +++++++-------
 .../ExecutionGraphMetricsTest.java              |  3 +-
 .../ExecutionGraphRestartTest.java              | 39 +++++++-----
 .../ExecutionGraphSignalsTest.java              |  4 +-
 .../executiongraph/ExecutionGraphTestUtils.java |  5 +-
 .../ExecutionStateProgressTest.java             |  4 +-
 .../executiongraph/LegacyJobVertexIdTest.java   | 18 +++---
 .../executiongraph/PointwisePatternTest.java    | 22 ++++---
 .../TerminalStateDeadlockTest.java              |  5 +-
 .../executiongraph/VertexSlotSharingTest.java   |  4 +-
 .../TaskManagerLossFailsTasksTest.scala         |  5 +-
 .../partitioner/RescalePartitionerTest.java     |  2 +
 18 files changed, 155 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c22efce0/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 66aec8b..10b2c51 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -128,6 +128,12 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
         */
        private final SerializedValue<JobInformation> serializedJobInformation;
 
+       /** The executor which is used to execute futures. */
+       private final Executor futureExecutor;
+
+       /** The executor which is used to execute blocking io operations */
+       private final Executor ioExecutor;
+
        /** {@code true} if all source tasks are stoppable. */
        private boolean isStoppable = true;
 
@@ -159,6 +165,18 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
        /** The timeout for all messages that require a 
response/acknowledgement */
        private final Time timeout;
 
+       /** Strategy to use for restarts */
+       private final RestartStrategy restartStrategy;
+
+       /** The slot provider to use for allocating slots for tasks as they are 
needed */
+       private final SlotProvider slotProvider;
+
+       /** The classloader for the user code. Needed for calls into user code 
classes */
+       private final ClassLoader userClassLoader;
+
+       /** Registered KvState instances reported by the TaskManagers. */
+       private final KvStateLocationRegistry kvStateLocationRegistry;
+
        // ------ Configuration of the Execution -------
 
        /** Flag to indicate whether the scheduler may queue tasks for 
execution, or needs to be able
@@ -184,15 +202,6 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
 
        // ------ Fields that are relevant to the execution and need to be 
cleared before archiving  -------
 
-       /** The slot provider to use for allocating slots for tasks as they are 
needed */
-       private SlotProvider slotProvider;
-
-       /** Strategy to use for restarts */
-       private RestartStrategy restartStrategy;
-
-       /** The classloader for the user code. Needed for calls into user code 
classes */
-       private ClassLoader userClassLoader;
-
        /** The coordinator for checkpoints, if snapshot checkpoints are 
enabled */
        private CheckpointCoordinator checkpointCoordinator;
 
@@ -200,15 +209,6 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
         * available after archiving. */
        private CheckpointStatsTracker checkpointStatsTracker;
 
-       /** The executor which is used to execute futures. */
-       private final Executor futureExecutor;
-
-       /** The executor which is used to execute blocking io operations */
-       private final Executor ioExecutor;
-
-       /** Registered KvState instances reported by the TaskManagers. */
-       private KvStateLocationRegistry kvStateLocationRegistry;
-
        // ------ Fields that are only relevant for archived execution graphs 
------------
        private String jsonPlan;
 
@@ -227,7 +227,8 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                        Configuration jobConfig,
                        SerializedValue<ExecutionConfig> serializedConfig,
                        Time timeout,
-                       RestartStrategy restartStrategy) throws IOException {
+                       RestartStrategy restartStrategy,
+                       SlotProvider slotProvider) throws IOException {
                this(
                        futureExecutor,
                        ioExecutor,
@@ -239,6 +240,7 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                        restartStrategy,
                        new ArrayList<BlobKey>(),
                        new ArrayList<URL>(),
+                       slotProvider,
                        ExecutionGraph.class.getClassLoader(),
                        new UnregisteredMetricsGroup()
                );
@@ -255,6 +257,7 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                        RestartStrategy restartStrategy,
                        List<BlobKey> requiredJarFiles,
                        List<URL> requiredClasspaths,
+                       SlotProvider slotProvider,
                        ClassLoader userClassLoader,
                        MetricGroup metricGroup) throws IOException {
 
@@ -262,7 +265,6 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                checkNotNull(jobId);
                checkNotNull(jobName);
                checkNotNull(jobConfig);
-               checkNotNull(userClassLoader);
 
                this.jobInformation = new JobInformation(
                        jobId,
@@ -278,12 +280,13 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                this.futureExecutor = 
Preconditions.checkNotNull(futureExecutor);
                this.ioExecutor = Preconditions.checkNotNull(ioExecutor);
 
-               this.userClassLoader = userClassLoader;
+               this.slotProvider = Preconditions.checkNotNull(slotProvider, 
"scheduler");
+               this.userClassLoader = 
Preconditions.checkNotNull(userClassLoader, "userClassLoader");
 
-               this.tasks = new ConcurrentHashMap<JobVertexID, 
ExecutionJobVertex>();
-               this.intermediateResults = new 
ConcurrentHashMap<IntermediateDataSetID, IntermediateResult>();
-               this.verticesInCreationOrder = new 
ArrayList<ExecutionJobVertex>();
-               this.currentExecutions = new 
ConcurrentHashMap<ExecutionAttemptID, Execution>();
+               this.tasks = new ConcurrentHashMap<>(16);
+               this.intermediateResults = new ConcurrentHashMap<>(16);
+               this.verticesInCreationOrder = new ArrayList<>(16);
+               this.currentExecutions = new ConcurrentHashMap<>(16);
 
                this.jobStatusListeners  = new CopyOnWriteArrayList<>();
                this.executionListeners = new CopyOnWriteArrayList<>();
@@ -700,17 +703,9 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                }
        }
 
-       public void scheduleForExecution(SlotProvider slotProvider) throws 
JobException {
-               if (slotProvider == null) {
-                       throw new IllegalArgumentException("Scheduler must not 
be null.");
-               }
-
-               if (this.slotProvider != null && this.slotProvider != 
slotProvider) {
-                       throw new IllegalArgumentException("Cannot use 
different slot providers for the same job");
-               }
+       public void scheduleForExecution() throws JobException {
 
                if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
-                       this.slotProvider = slotProvider;
 
                        switch (scheduleMode) {
 
@@ -914,7 +909,7 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                                }
                        }
 
-                       scheduleForExecution(slotProvider);
+                       scheduleForExecution();
                }
                catch (Throwable t) {
                        LOG.warn("Failed to restart the job.", t);

http://git-wip-us.apache.org/repos/asf/flink/blob/c22efce0/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index 386f202..4545777 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -32,6 +32,7 @@ import 
org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -62,6 +63,7 @@ public class ExecutionGraphBuilder {
                        Configuration jobManagerConfig,
                        Executor futureExecutor,
                        Executor ioExecutor,
+                       SlotProvider slotProvider,
                        ClassLoader classLoader,
                        CheckpointRecoveryFactory recoveryFactory,
                        Time timeout,
@@ -92,6 +94,7 @@ public class ExecutionGraphBuilder {
                                                restartStrategy,
                                                jobGraph.getUserJarBlobKeys(),
                                                jobGraph.getClasspaths(),
+                                               slotProvider,
                                                classLoader,
                                                metrics);
                } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c22efce0/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 3025727..00128a0 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1287,6 +1287,7 @@ class JobManager(
           flinkConfiguration,
           futureExecutor,
           ioExecutor,
+          scheduler,
           userCodeLoader,
           checkpointRecoveryFactory,
           Time.of(timeout.length, timeout.unit),
@@ -1405,7 +1406,7 @@ class JobManager(
             // the job.
             log.info(s"Scheduling job $jobId ($jobName).")
 
-            executionGraph.scheduleForExecution(scheduler)
+            executionGraph.scheduleForExecution()
           } else {
             // Remove the job graph. Otherwise it will be lingering around and 
possibly removed from
             // ZooKeeper by this JM.

http://git-wip-us.apache.org/repos/asf/flink/blob/c22efce0/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index bc95de7..99305e0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
 import org.junit.AfterClass;
@@ -103,6 +104,7 @@ public class ExecutionGraphCheckpointCoordinatorTest {
                        new NoRestartStrategy(),
                        Collections.<BlobKey>emptyList(),
                        Collections.<URL>emptyList(),
+                       new Scheduler(TestingUtils.defaultExecutionContext()),
                        ClassLoader.getSystemClassLoader(),
                        new UnregisteredMetricsGroup());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c22efce0/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index 9b1064d..ae1a6d5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -35,6 +35,7 @@ import 
org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -103,7 +104,9 @@ public class ArchivedExecutionGraphTest {
                        new Configuration(),
                        new SerializedValue<>(config),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       mock(SlotProvider.class));
+
                runtimeGraph.attachJobGraph(vertices);
 
                CheckpointStatsTracker statsTracker = new 
CheckpointStatsTracker(

http://git-wip-us.apache.org/repos/asf/flink/blob/c22efce0/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
index bf3a17c..ea25dca 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
 
@@ -119,7 +120,8 @@ public class ExecutionGraphConstructionTest {
                        cfg,
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       new Scheduler(TestingUtils.defaultExecutionContext()));
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -169,7 +171,8 @@ public class ExecutionGraphConstructionTest {
                        cfg,
                                new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       new Scheduler(TestingUtils.defaultExecutionContext()));
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -244,7 +247,8 @@ public class ExecutionGraphConstructionTest {
                        cfg,
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       new Scheduler(TestingUtils.defaultExecutionContext()));
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -504,7 +508,8 @@ public class ExecutionGraphConstructionTest {
                        cfg,
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       new Scheduler(TestingUtils.defaultExecutionContext()));
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -569,7 +574,8 @@ public class ExecutionGraphConstructionTest {
                        cfg,
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       new Scheduler(TestingUtils.defaultExecutionContext()));
                try {
                        eg.attachJobGraph(ordered);
                        fail("Attached wrong jobgraph");
@@ -638,7 +644,8 @@ public class ExecutionGraphConstructionTest {
                                cfg,
                                new SerializedValue<>(new ExecutionConfig()),
                                AkkaUtils.getDefaultTimeout(),
-                               new NoRestartStrategy());
+                               new NoRestartStrategy(),
+                               new 
Scheduler(TestingUtils.defaultExecutionContext()));
                        try {
                                eg.attachJobGraph(ordered);
                        }
@@ -685,7 +692,8 @@ public class ExecutionGraphConstructionTest {
                                cfg,
                                new SerializedValue<>(new ExecutionConfig()),
                                AkkaUtils.getDefaultTimeout(),
-                               new NoRestartStrategy());
+                               new NoRestartStrategy(),
+                               new 
Scheduler(TestingUtils.defaultExecutionContext()));
 
                        try {
                                eg.attachJobGraph(ordered);
@@ -767,7 +775,8 @@ public class ExecutionGraphConstructionTest {
                                cfg,
                                new SerializedValue<>(new ExecutionConfig()),
                                AkkaUtils.getDefaultTimeout(),
-                               new NoRestartStrategy());
+                               new NoRestartStrategy(),
+                               new 
Scheduler(TestingUtils.defaultExecutionContext()));
                        
                        
eg.attachJobGraph(jg.getVerticesSortedTopologicallyFromSources());
                        

http://git-wip-us.apache.org/repos/asf/flink/blob/c22efce0/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index ef4f74c..bff4584 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -96,7 +96,8 @@ public class ExecutionGraphDeploymentTest {
                                new Configuration(),
                                new SerializedValue<>(new ExecutionConfig()),
                                AkkaUtils.getDefaultTimeout(),
-                               new NoRestartStrategy());
+                               new NoRestartStrategy(),
+                               new 
Scheduler(TestingUtils.defaultExecutionContext()));
 
                        List<JobVertex> ordered = Arrays.asList(v1, v2, v3, v4);
 
@@ -311,6 +312,15 @@ public class ExecutionGraphDeploymentTest {
 
                v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, 
ResultPartitionType.BLOCKING);
 
+               Scheduler scheduler = new 
Scheduler(TestingUtils.directExecutionContext());
+               for (int i = 0; i < dop1; i++) {
+                       scheduler.newInstanceAvailable(
+                               ExecutionGraphTestUtils.getInstance(
+                                       new ActorTaskManagerGateway(
+                                               new 
ExecutionGraphTestUtils.SimpleActorGateway(
+                                                       
TestingUtils.directExecutionContext()))));
+               }
+
                // execution graph that executes actions synchronously
                ExecutionGraph eg = new ExecutionGraph(
                        TestingUtils.directExecutionContext(),
@@ -320,25 +330,18 @@ public class ExecutionGraphDeploymentTest {
                        new Configuration(),
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       scheduler);
 
                eg.setQueuedSchedulingAllowed(false);
 
                List<JobVertex> ordered = Arrays.asList(v1, v2);
                eg.attachJobGraph(ordered);
 
-               Scheduler scheduler = new 
Scheduler(TestingUtils.directExecutionContext());
-               for (int i = 0; i < dop1; i++) {
-                       scheduler.newInstanceAvailable(
-                               ExecutionGraphTestUtils.getInstance(
-                                       new ActorTaskManagerGateway(
-                                               new 
ExecutionGraphTestUtils.SimpleActorGateway(
-                                                       
TestingUtils.directExecutionContext()))));
-               }
                assertEquals(dop1, scheduler.getNumberOfAvailableSlots());
 
                // schedule, this triggers mock deployment
-               eg.scheduleForExecution(scheduler);
+               eg.scheduleForExecution();
 
                ExecutionAttemptID attemptID = 
eg.getJobVertex(v1.getID()).getTaskVertices()[0].getCurrentExecutionAttempt().getAttemptId();
                eg.updateState(new TaskExecutionState(jobId, attemptID, 
ExecutionState.RUNNING));
@@ -356,6 +359,15 @@ public class ExecutionGraphDeploymentTest {
                v1.setInvokableClass(BatchTask.class);
                v2.setInvokableClass(BatchTask.class);
 
+               Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
+               for (int i = 0; i < dop1 + dop2; i++) {
+                       scheduler.newInstanceAvailable(
+                               ExecutionGraphTestUtils.getInstance(
+                                       new ActorTaskManagerGateway(
+                                               new 
ExecutionGraphTestUtils.SimpleActorGateway(
+                                                       
TestingUtils.directExecutionContext()))));
+               }
+
                // execution graph that executes actions synchronously
                ExecutionGraph eg = new ExecutionGraph(
                        TestingUtils.directExecutionContext(),
@@ -365,25 +377,18 @@ public class ExecutionGraphDeploymentTest {
                        new Configuration(), 
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       scheduler);
                
                eg.setQueuedSchedulingAllowed(false);
 
                List<JobVertex> ordered = Arrays.asList(v1, v2);
                eg.attachJobGraph(ordered);
 
-               Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
-               for (int i = 0; i < dop1 + dop2; i++) {
-                       scheduler.newInstanceAvailable(
-                               ExecutionGraphTestUtils.getInstance(
-                                       new ActorTaskManagerGateway(
-                                               new 
ExecutionGraphTestUtils.SimpleActorGateway(
-                                                       
TestingUtils.directExecutionContext()))));
-               }
                assertEquals(dop1 + dop2, 
scheduler.getNumberOfAvailableSlots());
 
                // schedule, this triggers mock deployment
-               eg.scheduleForExecution(scheduler);
+               eg.scheduleForExecution();
 
                Map<ExecutionAttemptID, Execution> executions = 
eg.getRegisteredExecutions();
                assertEquals(dop1 + dop2, executions.size());

http://git-wip-us.apache.org/repos/asf/flink/blob/c22efce0/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index d8d8e24..02e2d38 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -156,6 +156,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
                                testingRestartStrategy,
                                Collections.<BlobKey>emptyList(),
                                Collections.<URL>emptyList(),
+                       scheduler,
                                getClass().getClassLoader(),
                                metricGroup);
        
@@ -174,7 +175,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
                        
executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
        
                        // start execution
-                       executionGraph.scheduleForExecution(scheduler);
+               executionGraph.scheduleForExecution();
        
                        assertTrue(0L == restartingTime.getValue());
        

http://git-wip-us.apache.org/repos/asf/flink/blob/c22efce0/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 52bfc96..9ef72a9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -115,12 +115,12 @@ public class ExecutionGraphRestartTest extends TestLogger 
{
                
                //initiate and schedule job
                JobGraph jobGraph = new JobGraph("Pointwise job", groupVertex, 
groupVertex2);
-               ExecutionGraph eg = newExecutionGraph(new 
FixedDelayRestartStrategy(1, 0L));
+               ExecutionGraph eg = newExecutionGraph(new 
FixedDelayRestartStrategy(1, 0L), scheduler);
                
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 
                assertEquals(JobStatus.CREATED, eg.getState());
                
-               eg.scheduleForExecution(scheduler);
+               eg.scheduleForExecution();
                assertEquals(JobStatus.RUNNING, eg.getState());
                
                //sanity checks
@@ -239,7 +239,8 @@ public class ExecutionGraphRestartTest extends TestLogger {
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
                        // We want to manually control the restart and delay
-                       new InfiniteDelayRestartStrategy());
+                       new InfiniteDelayRestartStrategy(),
+                       scheduler);
 
                JobVertex jobVertex = new JobVertex("NoOpInvokable");
                jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
@@ -251,7 +252,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
                assertEquals(JobStatus.CREATED, executionGraph.getState());
 
-               executionGraph.scheduleForExecution(scheduler);
+               executionGraph.scheduleForExecution();
 
                assertEquals(JobStatus.RUNNING, executionGraph.getState());
 
@@ -383,12 +384,12 @@ public class ExecutionGraphRestartTest extends TestLogger 
{
                JobVertex sender = newJobVertex("Task1", 1, 
Tasks.NoOpInvokable.class);
                JobVertex receiver = newJobVertex("Task2", 1, 
Tasks.NoOpInvokable.class);
                JobGraph jobGraph = new JobGraph("Pointwise job", sender, 
receiver);
-               ExecutionGraph eg = newExecutionGraph(new 
FixedDelayRestartStrategy(1, 1000));
+               ExecutionGraph eg = newExecutionGraph(new 
FixedDelayRestartStrategy(1, 1000), scheduler);
                
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 
                assertEquals(JobStatus.CREATED, eg.getState());
 
-               eg.scheduleForExecution(scheduler);
+               eg.scheduleForExecution();
                assertEquals(JobStatus.RUNNING, eg.getState());
 
                Iterator<ExecutionVertex> executionVertices = 
eg.getAllExecutionVertices().iterator();
@@ -452,13 +453,13 @@ public class ExecutionGraphRestartTest extends TestLogger 
{
                JobGraph jobGraph = new JobGraph("Test Job", vertex);
                jobGraph.setExecutionConfig(executionConfig);
 
-               ExecutionGraph eg = newExecutionGraph(new 
InfiniteDelayRestartStrategy());
+               ExecutionGraph eg = newExecutionGraph(new 
InfiniteDelayRestartStrategy(), scheduler);
 
                
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 
                assertEquals(JobStatus.CREATED, eg.getState());
 
-               eg.scheduleForExecution(scheduler);
+               eg.scheduleForExecution();
                assertEquals(JobStatus.RUNNING, eg.getState());
 
                // Fail right after cancel (for example with concurrent slot 
release)
@@ -498,13 +499,13 @@ public class ExecutionGraphRestartTest extends TestLogger 
{
                JobGraph jobGraph = new JobGraph("Test Job", vertex);
                jobGraph.setExecutionConfig(executionConfig);
 
-               ExecutionGraph eg = newExecutionGraph(new 
InfiniteDelayRestartStrategy());
+               ExecutionGraph eg = newExecutionGraph(new 
InfiniteDelayRestartStrategy(), scheduler);
 
                
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 
                assertEquals(JobStatus.CREATED, eg.getState());
 
-               eg.scheduleForExecution(scheduler);
+               eg.scheduleForExecution();
                assertEquals(JobStatus.RUNNING, eg.getState());
 
                // Fail right after cancel (for example with concurrent slot 
release)
@@ -554,13 +555,14 @@ public class ExecutionGraphRestartTest extends TestLogger 
{
                        new Configuration(),
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       controllableRestartStrategy);
+                       controllableRestartStrategy,
+                       scheduler);
 
                
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 
                assertEquals(JobStatus.CREATED, eg.getState());
 
-               eg.scheduleForExecution(scheduler);
+               eg.scheduleForExecution();
 
                assertEquals(JobStatus.RUNNING, eg.getState());
 
@@ -659,7 +661,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
                JobGraph jobGraph = new JobGraph("Pointwise job", sender);
 
-               ExecutionGraph eg = newExecutionGraph(restartStrategy);
+               ExecutionGraph eg = newExecutionGraph(restartStrategy, 
scheduler);
                if (isSpy) {
                        eg = spy(eg);
                }
@@ -667,7 +669,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
                assertEquals(JobStatus.CREATED, eg.getState());
 
-               eg.scheduleForExecution(scheduler);
+               eg.scheduleForExecution();
                assertEquals(JobStatus.RUNNING, eg.getState());
                return new Tuple2<>(eg, instance);
        }
@@ -679,7 +681,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
                return groupVertex;
        }
 
-       private static ExecutionGraph newExecutionGraph(RestartStrategy 
restartStrategy) throws IOException {
+       private static ExecutionGraph newExecutionGraph(RestartStrategy 
restartStrategy, Scheduler scheduler) throws IOException {
                return new ExecutionGraph(
                        TestingUtils.defaultExecutionContext(),
                        TestingUtils.defaultExecutionContext(),
@@ -688,7 +690,12 @@ public class ExecutionGraphRestartTest extends TestLogger {
                        new Configuration(),
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       restartStrategy);
+                       restartStrategy,
+                       scheduler);
+       }
+
+       private static ExecutionGraph newExecutionGraph(RestartStrategy 
restartStrategy) throws IOException {
+               return newExecutionGraph(restartStrategy, new 
Scheduler(TestingUtils.defaultExecutionContext()));
        }
 
        private static void restartAfterFailure(ExecutionGraph eg, 
FiniteDuration timeout, boolean haltAfterRestart) throws InterruptedException {

http://git-wip-us.apache.org/repos/asf/flink/blob/c22efce0/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
index fde967e..fab6505 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
@@ -35,6 +35,7 @@ import 
org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
 
@@ -139,7 +140,8 @@ public class ExecutionGraphSignalsTest {
                        cfg,
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       new Scheduler(TestingUtils.defaultExecutionContext()));
                eg.attachJobGraph(ordered);
 
                f = eg.getClass().getDeclaredField("state");

http://git-wip-us.apache.org/repos/asf/flink/blob/c22efce0/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 71ae3b6..ef4926e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -46,6 +46,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
 import 
org.apache.flink.runtime.messages.TaskMessages.FailIntermediateResultPartitions;
 import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
@@ -55,6 +56,7 @@ import org.mockito.Matchers;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import scala.concurrent.ExecutionContext;
+import scala.concurrent.ExecutionContext$;
 
 public class ExecutionGraphTestUtils {
 
@@ -180,7 +182,8 @@ public class ExecutionGraphTestUtils {
                        new Configuration(),
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       new 
Scheduler(ExecutionContext$.MODULE$.fromExecutor(executor)));
 
                ExecutionJobVertex ejv = spy(new ExecutionJobVertex(graph, ajv, 
1,
                                AkkaUtils.getDefaultTimeout()));

http://git-wip-us.apache.org/repos/asf/flink/blob/c22efce0/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
index 19e2d6d..e458d02 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
@@ -59,7 +60,8 @@ public class ExecutionStateProgressTest {
                                new Configuration(),
                                new SerializedValue<>(new ExecutionConfig()),
                                AkkaUtils.getDefaultTimeout(),
-                               new NoRestartStrategy());
+                               new NoRestartStrategy(),
+                               new 
Scheduler(TestingUtils.defaultExecutionContext()));
                        graph.attachJobGraph(Collections.singletonList(ajv));
 
                        setGraphStatus(graph, JobStatus.RUNNING);

http://git-wip-us.apache.org/repos/asf/flink/blob/c22efce0/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java
index 44dc0a4..a4da0bf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -49,13 +50,14 @@ public class LegacyJobVertexIdTest {
 
                ExecutionGraph executionGraph = new ExecutionGraph(
                                mock(Executor.class),
-                               mock(Executor.class),
-                               new JobID(),
-                               "test",
-                               mock(Configuration.class),
-                               mock(SerializedValue.class),
-                               Time.seconds(1),
-                               mock(RestartStrategy.class));
+                       mock(Executor.class),
+                       new JobID(),
+                       "test",
+                       mock(Configuration.class),
+                       mock(SerializedValue.class),
+                       Time.seconds(1),
+                       mock(RestartStrategy.class),
+                       mock(SlotProvider.class));
 
                ExecutionJobVertex executionJobVertex =
                                new ExecutionJobVertex(executionGraph, 
jobVertex, 1, Time.seconds(1));
@@ -74,4 +76,4 @@ public class LegacyJobVertexIdTest {
                Assert.assertEquals(executionJobVertex, 
idToVertex.get(legacyId1));
                Assert.assertEquals(executionJobVertex, 
idToVertex.get(legacyId2));
        }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c22efce0/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
index 0e147e3..b59dcda 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
 
@@ -73,7 +74,8 @@ public class PointwisePatternTest {
                        cfg,
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       new Scheduler(TestingUtils.defaultExecutionContext()));
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -119,7 +121,8 @@ public class PointwisePatternTest {
                        cfg,
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       new Scheduler(TestingUtils.defaultExecutionContext()));
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -166,7 +169,8 @@ public class PointwisePatternTest {
                        cfg,
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       new Scheduler(TestingUtils.defaultExecutionContext()));
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -214,7 +218,8 @@ public class PointwisePatternTest {
                        cfg,
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       new Scheduler(TestingUtils.defaultExecutionContext()));
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -260,7 +265,8 @@ public class PointwisePatternTest {
                        cfg,
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       new Scheduler(TestingUtils.defaultExecutionContext()));
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -326,7 +332,8 @@ public class PointwisePatternTest {
                        cfg,
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       new Scheduler(TestingUtils.defaultExecutionContext()));
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -383,7 +390,8 @@ public class PointwisePatternTest {
                        cfg,
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
-                       new NoRestartStrategy());
+                       new NoRestartStrategy(),
+                       new Scheduler(TestingUtils.defaultExecutionContext()));
                try {
                        eg.attachJobGraph(ordered);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/c22efce0/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
index 5b1a03e..32ae093 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
@@ -187,11 +187,12 @@ public class TerminalStateDeadlockTest {
                                EMPTY_CONFIG,
                                new SerializedValue<>(new ExecutionConfig()),
                                TIMEOUT,
-                               new FixedDelayRestartStrategy(1, 0));
+                               new FixedDelayRestartStrategy(1, 0),
+                               new 
Scheduler(TestingUtils.defaultExecutionContext()));
                }
 
                @Override
-               public void scheduleForExecution(SlotProvider slotProvider) {
+               public void scheduleForExecution() {
                        // notify that we are done with the "restarting"
                        synchronized (this) {
                                done = true;

http://git-wip-us.apache.org/repos/asf/flink/blob/c22efce0/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
index 27708a2..5c85592 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
@@ -87,7 +88,8 @@ public class VertexSlotSharingTest {
                                new Configuration(),
                                new SerializedValue<>(new ExecutionConfig()),
                                AkkaUtils.getDefaultTimeout(),
-                               new NoRestartStrategy());
+                               new NoRestartStrategy(),
+                               new 
Scheduler(TestingUtils.defaultExecutionContext()));
                        eg.attachJobGraph(vertices);
                        
                        // verify that the vertices are all in the same slot 
sharing group

http://git-wip-us.apache.org/repos/asf/flink/blob/c22efce0/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
index 1cbd605..5e4a802 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
@@ -64,13 +64,14 @@ class TaskManagerLossFailsTasksTest extends WordSpecLike 
with Matchers {
           new Configuration(),
           new SerializedValue(new ExecutionConfig()),
           AkkaUtils.getDefaultTimeout,
-          new NoRestartStrategy())
+          new NoRestartStrategy(),
+          scheduler)
 
         eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources)
 
         eg.getState should equal(JobStatus.CREATED)
 
-        eg.scheduleForExecution(scheduler)
+        eg.scheduleForExecution()
         eg.getState should equal(JobStatus.RUNNING)
 
         instance1.markDead()

http://git-wip-us.apache.org/repos/asf/flink/blob/c22efce0/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
index e77cbb3..2e05101 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
@@ -34,6 +34,7 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -148,6 +149,7 @@ public class RescalePartitionerTest extends TestLogger {
                        new NoRestartStrategy(),
                        new ArrayList<BlobKey>(),
                        new ArrayList<URL>(),
+                       new Scheduler(TestingUtils.defaultExecutionContext()),
                        ExecutionGraph.class.getClassLoader(),
                        new UnregisteredMetricsGroup());
                try {

Reply via email to