[FLINK-4459] [distributed runtime] Introduce SlotProvider for Scheduler

This closes #2424


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6e40f590
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6e40f590
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6e40f590

Branch: refs/heads/flip-6
Commit: 6e40f59015dcdb8529691318e8f5a33e831252b8
Parents: 502a79d
Author: Kurt Young <[email protected]>
Authored: Fri Aug 26 17:51:40 2016 +0800
Committer: Stephan Ewen <[email protected]>
Committed: Sun Sep 4 23:09:59 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java |  15 +-
 .../runtime/executiongraph/ExecutionGraph.java  |  32 +-
 .../executiongraph/ExecutionJobVertex.java      |   7 +-
 .../runtime/executiongraph/ExecutionVertex.java |   8 +-
 .../flink/runtime/instance/SlotProvider.java    |  48 +++
 .../runtime/jobmanager/scheduler/Scheduler.java |  27 +-
 .../ExecutionGraphMetricsTest.java              |   8 +-
 .../ExecutionVertexSchedulingTest.java          |  28 +-
 .../TerminalStateDeadlockTest.java              |  27 +-
 .../ScheduleWithCoLocationHintTest.java         | 303 +++++++++++--------
 .../scheduler/SchedulerIsolatedTasksTest.java   |  45 ++-
 .../scheduler/SchedulerSlotSharingTest.java     | 230 +++++++-------
 12 files changed, 430 insertions(+), 348 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 846df49..6826365 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -34,13 +34,13 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
 import 
org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFutureAction;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
@@ -271,15 +271,15 @@ public class Execution {
         *       to be scheduled immediately and no resource is available. If 
the task is accepted by the schedule, any
         *       error sets the vertex state to failed and triggers the 
recovery logic.
         * 
-        * @param scheduler The scheduler to use to schedule this execution 
attempt.
+        * @param slotProvider The slot provider to use to allocate a slot for 
this execution attempt.
         * @param queued Flag to indicate whether the scheduler may queue this 
task if it cannot
         *               immediately deploy it.
         * 
         * @throws IllegalStateException Thrown, if the vertex is not in 
CREATED state, which is the only state that permits scheduling.
         * @throws NoResourceAvailableException Thrown is no queued scheduling 
is allowed and no resources are currently available.
         */
-       public boolean scheduleForExecution(Scheduler scheduler, boolean 
queued) throws NoResourceAvailableException {
-               if (scheduler == null) {
+       public boolean scheduleForExecution(SlotProvider slotProvider, boolean 
queued) throws NoResourceAvailableException {
+               if (slotProvider == null) {
                        throw new IllegalArgumentException("Cannot send null 
Scheduler when scheduling execution.");
                }
 
@@ -299,9 +299,8 @@ public class Execution {
 
                        // IMPORTANT: To prevent leaks of cluster resources, we 
need to make sure that slots are returned
                        //     in all cases where the deployment failed. we use 
many try {} finally {} clauses to assure that
+                       final SlotAllocationFuture future = 
slotProvider.allocateSlot(toSchedule, queued);
                        if (queued) {
-                               SlotAllocationFuture future = 
scheduler.scheduleQueued(toSchedule);
-
                                future.setFutureAction(new 
SlotAllocationFutureAction() {
                                        @Override
                                        public void slotAllocated(SimpleSlot 
slot) {
@@ -319,7 +318,7 @@ public class Execution {
                                });
                        }
                        else {
-                               SimpleSlot slot = 
scheduler.scheduleImmediately(toSchedule);
+                               SimpleSlot slot = future.get();
                                try {
                                        deployToSlot(slot);
                                }
@@ -560,7 +559,7 @@ public class Execution {
                                        public Boolean call() throws Exception {
                                                try {
                                                        
consumerVertex.scheduleForExecution(
-                                                                       
consumerVertex.getExecutionGraph().getScheduler(),
+                                                                       
consumerVertex.getExecutionGraph().getSlotProvider(),
                                                                        
consumerVertex.getExecutionGraph().isQueuedSchedulingAllowed());
                                                } catch (Throwable t) {
                                                        consumerVertex.fail(new 
IllegalStateException("Could not schedule consumer " +

http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 92cab41..585e9f3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -40,6 +40,7 @@ import 
org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -47,7 +48,6 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.util.SerializableObject;
@@ -197,8 +197,8 @@ public class ExecutionGraph {
 
        // ------ Fields that are relevant to the execution and need to be 
cleared before archiving  -------
 
-       /** The scheduler to use for scheduling new tasks as they are needed */
-       private Scheduler scheduler;
+       /** The slot provider to use for allocating slots for tasks as they are 
needed */
+       private SlotProvider slotProvider;
 
        /** Strategy to use for restarts */
        private RestartStrategy restartStrategy;
@@ -470,8 +470,8 @@ public class ExecutionGraph {
                return jsonPlan;
        }
 
-       public Scheduler getScheduler() {
-               return scheduler;
+       public SlotProvider getSlotProvider() {
+               return slotProvider;
        }
 
        public JobID getJobID() {
@@ -670,17 +670,17 @@ public class ExecutionGraph {
                }
        }
 
-       public void scheduleForExecution(Scheduler scheduler) throws 
JobException {
-               if (scheduler == null) {
+       public void scheduleForExecution(SlotProvider slotProvider) throws 
JobException {
+               if (slotProvider == null) {
                        throw new IllegalArgumentException("Scheduler must not 
be null.");
                }
 
-               if (this.scheduler != null && this.scheduler != scheduler) {
-                       throw new IllegalArgumentException("Cannot use 
different schedulers for the same job");
+               if (this.slotProvider != null && this.slotProvider != 
slotProvider) {
+                       throw new IllegalArgumentException("Cannot use 
different slot providers for the same job");
                }
 
                if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
-                       this.scheduler = scheduler;
+                       this.slotProvider = slotProvider;
 
                        switch (scheduleMode) {
 
@@ -688,14 +688,14 @@ public class ExecutionGraph {
                                        // simply take the vertices without 
inputs.
                                        for (ExecutionJobVertex ejv : 
this.tasks.values()) {
                                                if 
(ejv.getJobVertex().isInputVertex()) {
-                                                       
ejv.scheduleAll(scheduler, allowQueuedScheduling);
+                                                       
ejv.scheduleAll(slotProvider, allowQueuedScheduling);
                                                }
                                        }
                                        break;
 
                                case EAGER:
                                        for (ExecutionJobVertex ejv : 
getVerticesTopologically()) {
-                                               ejv.scheduleAll(scheduler, 
allowQueuedScheduling);
+                                               ejv.scheduleAll(slotProvider, 
allowQueuedScheduling);
                                        }
                                        break;
 
@@ -850,8 +850,8 @@ public class ExecutionGraph {
                                        throw new IllegalStateException("Can 
only restart job from state restarting.");
                                }
 
-                               if (scheduler == null) {
-                                       throw new IllegalStateException("The 
execution graph has not been scheduled before - scheduler is null.");
+                               if (slotProvider == null) {
+                                       throw new IllegalStateException("The 
execution graph has not been scheduled before - slotProvider is null.");
                                }
 
                                this.currentExecutions.clear();
@@ -885,7 +885,7 @@ public class ExecutionGraph {
                                }
                        }
 
-                       scheduleForExecution(scheduler);
+                       scheduleForExecution(slotProvider);
                }
                catch (Throwable t) {
                        fail(t);
@@ -917,7 +917,7 @@ public class ExecutionGraph {
 
                // clear the non-serializable fields
                restartStrategy = null;
-               scheduler = null;
+               slotProvider = null;
                checkpointCoordinator = null;
                executionContext = null;
                kvStateLocationRegistry = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index d3dc8fe..1ac9522 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -37,7 +38,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.util.Preconditions;
@@ -289,12 +289,13 @@ public class ExecutionJobVertex {
        //  Actions
        
//---------------------------------------------------------------------------------------------
        
-       public void scheduleAll(Scheduler scheduler, boolean queued) throws 
NoResourceAvailableException {
+       public void scheduleAll(SlotProvider slotProvider, boolean queued) 
throws NoResourceAvailableException {
+               
                ExecutionVertex[] vertices = this.taskVertices;
 
                // kick off the tasks
                for (ExecutionVertex ev : vertices) {
-                       ev.scheduleForExecution(scheduler, queued);
+                       ev.scheduleForExecution(slotProvider, queued);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 88e1b88..a8d5ee4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescript
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.SimpleSlot;
@@ -40,12 +41,11 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.util.SerializedValue;
-
 import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
+
 import org.slf4j.Logger;
 
 import scala.concurrent.duration.FiniteDuration;
@@ -443,8 +443,8 @@ public class ExecutionVertex {
                }
        }
 
-       public boolean scheduleForExecution(Scheduler scheduler, boolean 
queued) throws NoResourceAvailableException {
-               return this.currentExecution.scheduleForExecution(scheduler, 
queued);
+       public boolean scheduleForExecution(SlotProvider slotProvider, boolean 
queued) throws NoResourceAvailableException {
+               return this.currentExecution.scheduleForExecution(slotProvider, 
queued);
        }
 
        public void deployToSlot(SimpleSlot slot) throws JobException {

http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
new file mode 100644
index 0000000..b2c23a5
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
+
+/**
+ * The slot provider is responsible for preparing slots for ready-to-run tasks.
+ * 
+ * <p>It supports two allocation modes:
+ * <ul>
+ *     <li>Immediate allocation: A request for a task slot is satisfied 
immediately; the caller can use
+ *         {@link SlotAllocationFuture#get()} to get the allocated slot.</li>
+ *     <li>Queued allocation: A request for a task slot is queued and returns 
a future that will be
+ *         fulfilled as soon as a slot becomes available.</li>
+ * </ul>
+ */
+public interface SlotProvider {
+
+       /**
+        * Allocates a slot that satisfies the requirements of the given task.
+        *
+        * @param task         The task to allocate the slot for
+        * @param allowQueued  Whether the task may be queued if not enough 
resources are available
+        * @return The future of the allocation
+        * 
+        * @throws NoResourceAvailableException Thrown if queueing is not allowed and no resources are currently available.
+        */
+       SlotAllocationFuture allocateSlot(ScheduledUnit task, boolean 
allowQueued) throws NoResourceAvailableException;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index 734972d..c9cdd00 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -39,6 +39,7 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.instance.SlotSharingGroupAssignment;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.instance.SharedSlot;
@@ -65,7 +66,7 @@ import scala.concurrent.ExecutionContext;
  *         fulfilled as soon as a slot becomes available.</li>
  * </ul>
  */
-public class Scheduler implements InstanceListener, SlotAvailabilityListener {
+public class Scheduler implements InstanceListener, SlotAvailabilityListener, 
SlotProvider {
 
        /** Scheduler-wide logger */
        private static final Logger LOG = 
LoggerFactory.getLogger(Scheduler.class);
@@ -129,30 +130,24 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
        // 
------------------------------------------------------------------------
        //  Scheduling
        // 
------------------------------------------------------------------------
-       
-       public SimpleSlot scheduleImmediately(ScheduledUnit task) throws 
NoResourceAvailableException {
-               Object ret = scheduleTask(task, false);
-               if (ret instanceof SimpleSlot) {
-                       return (SimpleSlot) ret;
-               }
-               else {
-                       throw new RuntimeException();
-               }
-       }
-       
-       public SlotAllocationFuture scheduleQueued(ScheduledUnit task) throws 
NoResourceAvailableException {
-               Object ret = scheduleTask(task, true);
+
+
+       @Override
+       public SlotAllocationFuture allocateSlot(ScheduledUnit task, boolean 
allowQueued) 
+                       throws NoResourceAvailableException {
+
+               final Object ret = scheduleTask(task, allowQueued);
                if (ret instanceof SimpleSlot) {
                        return new SlotAllocationFuture((SimpleSlot) ret);
                }
-               if (ret instanceof SlotAllocationFuture) {
+               else if (ret instanceof SlotAllocationFuture) {
                        return (SlotAllocationFuture) ret;
                }
                else {
                        throw new RuntimeException();
                }
        }
-       
+
        /**
         * Returns either a {@link 
org.apache.flink.runtime.instance.SimpleSlot}, or a {@link 
SlotAllocationFuture}.
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index d5520fd..aa5925f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.Slot;
@@ -70,8 +71,8 @@ import java.util.concurrent.TimeUnit;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+
+import static org.mockito.Mockito.*;
 
 public class ExecutionGraphMetricsTest extends TestLogger {
 
@@ -135,7 +136,8 @@ public class ExecutionGraphMetricsTest extends TestLogger {
                        
when(simpleSlot.setExecutedVertex(Matchers.any(Execution.class))).thenReturn(true);
                        when(simpleSlot.getRoot()).thenReturn(rootSlot);
 
-                       
when(scheduler.scheduleImmediately(Matchers.any(ScheduledUnit.class))).thenReturn(simpleSlot);
+                       when(scheduler.allocateSlot(any(ScheduledUnit.class), 
anyBoolean()))
+                                       .thenReturn(new 
SlotAllocationFuture(simpleSlot));
 
                        
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index 5e9ee33..c576ce5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -18,25 +18,29 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
-import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.DummyActorGateway;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.junit.Test;
 
+import org.junit.Test;
 import org.mockito.Matchers;
 
+import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
+import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.anyBoolean;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 public class ExecutionVertexSchedulingTest {
 
        @Test
@@ -54,7 +58,8 @@ public class ExecutionVertexSchedulingTest {
                        assertTrue(slot.isReleased());
 
                        Scheduler scheduler = mock(Scheduler.class);
-                       
when(scheduler.scheduleImmediately(Matchers.any(ScheduledUnit.class))).thenReturn(slot);
+                       
when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean()))
+                               .thenReturn(new SlotAllocationFuture(slot));
 
                        assertEquals(ExecutionState.CREATED, 
vertex.getExecutionState());
                        // try to deploy to the slot
@@ -86,7 +91,7 @@ public class ExecutionVertexSchedulingTest {
                        final SlotAllocationFuture future = new 
SlotAllocationFuture();
 
                        Scheduler scheduler = mock(Scheduler.class);
-                       
when(scheduler.scheduleQueued(Matchers.any(ScheduledUnit.class))).thenReturn(future);
+                       
when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), 
anyBoolean())).thenReturn(future);
 
                        assertEquals(ExecutionState.CREATED, 
vertex.getExecutionState());
                        // try to deploy to the slot
@@ -117,7 +122,8 @@ public class ExecutionVertexSchedulingTest {
                        final SimpleSlot slot = 
instance.allocateSimpleSlot(ejv.getJobId());
 
                        Scheduler scheduler = mock(Scheduler.class);
-                       
when(scheduler.scheduleImmediately(Matchers.any(ScheduledUnit.class))).thenReturn(slot);
+                       
when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean()))
+                               .thenReturn(new SlotAllocationFuture(slot));
 
                        assertEquals(ExecutionState.CREATED, 
vertex.getExecutionState());
 
@@ -130,4 +136,4 @@ public class ExecutionVertexSchedulingTest {
                        fail(e.getMessage());
                }
        }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
index 870ae05..4cae7c2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -54,12 +55,12 @@ import java.util.concurrent.TimeUnit;
 import static org.junit.Assert.*;
 
 public class TerminalStateDeadlockTest {
-       
+
        private final Field stateField;
        private final Field resourceField;
        private final Field execGraphStateField;
-       private final Field execGraphSchedulerField;
-       
+       private final Field execGraphSlotProviderField;
+
        private final SimpleSlot resource;
 
 
@@ -75,8 +76,8 @@ public class TerminalStateDeadlockTest {
                        this.execGraphStateField = 
ExecutionGraph.class.getDeclaredField("state");
                        this.execGraphStateField.setAccessible(true);
 
-                       this.execGraphSchedulerField = 
ExecutionGraph.class.getDeclaredField("scheduler");
-                       this.execGraphSchedulerField.setAccessible(true);
+                       this.execGraphSlotProviderField = 
ExecutionGraph.class.getDeclaredField("slotProvider");
+                       this.execGraphSlotProviderField.setAccessible(true);
                        
                        // the dummy resource
                        ResourceID resourceId = ResourceID.generate();
@@ -96,11 +97,9 @@ public class TerminalStateDeadlockTest {
                        throw new RuntimeException();
                }
        }
-       
-        
-       
+
        // 
------------------------------------------------------------------------
-       
+
        @Test
        public void testProvokeDeadlock() {
                try {
@@ -135,7 +134,7 @@ public class TerminalStateDeadlockTest {
                                initializeExecution(e2);
 
                                execGraphStateField.set(eg, JobStatus.FAILING);
-                               execGraphSchedulerField.set(eg, scheduler);
+                               execGraphSlotProviderField.set(eg, scheduler);
                                
                                Runnable r1 = new Runnable() {
                                        @Override
@@ -173,12 +172,10 @@ public class TerminalStateDeadlockTest {
        
        static class TestExecGraph extends ExecutionGraph {
 
-               private static final long serialVersionUID = 
-7606144898417942044L;
-               
                private static final Configuration EMPTY_CONFIG = new 
Configuration();
 
                private static final FiniteDuration TIMEOUT = new 
FiniteDuration(30, TimeUnit.SECONDS);
-               
+
                private volatile boolean done;
 
                TestExecGraph(JobID jobId) throws IOException {
@@ -193,14 +190,14 @@ public class TerminalStateDeadlockTest {
                }
 
                @Override
-               public void scheduleForExecution(Scheduler scheduler) {
+               public void scheduleForExecution(SlotProvider slotProvider) {
                        // notify that we are done with the "restarting"
                        synchronized (this) {
                                done = true;
                                this.notifyAll();
                        }
                }
-               
+
                public void waitTillDone() {
                        try {
                                synchronized (this) {

http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
index eab4fea..b803702 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
@@ -63,18 +63,18 @@ public class ScheduleWithCoLocationHintTest {
                        CoLocationConstraint c6 = new CoLocationConstraint(ccg);
 
                        // schedule 4 tasks from the first vertex group
-                       SimpleSlot s1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 0, 6), sharingGroup, c1));
-                       SimpleSlot s2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 1, 6), sharingGroup, c2));
-                       SimpleSlot s3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 2, 6), sharingGroup, c3));
-                       SimpleSlot s4 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 3, 6), sharingGroup, c4));
-                       SimpleSlot s5 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 0, 6), sharingGroup, c1));
-                       SimpleSlot s6 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 1, 6), sharingGroup, c2));
-                       SimpleSlot s7 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 2, 6), sharingGroup, c3));
-                       SimpleSlot s8 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 4, 6), sharingGroup, c5));
-                       SimpleSlot s9 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 5, 6), sharingGroup, c6));
-                       SimpleSlot s10 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 3, 6), sharingGroup, c4));
-                       SimpleSlot s11 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 4, 6), sharingGroup, c5));
-                       SimpleSlot s12 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 5, 6), sharingGroup, c6));
+                       SimpleSlot s1 = scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 0, 6), sharingGroup, c1), false).get();
+                       SimpleSlot s2 = scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 1, 6), sharingGroup, c2), false).get();
+                       SimpleSlot s3 = scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 2, 6), sharingGroup, c3), false).get();
+                       SimpleSlot s4 = scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 3, 6), sharingGroup, c4), false).get();
+                       SimpleSlot s5 = scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 0, 6), sharingGroup, c1), false).get();
+                       SimpleSlot s6 = scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 1, 6), sharingGroup, c2), false).get();
+                       SimpleSlot s7 = scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 2, 6), sharingGroup, c3), false).get();
+                       SimpleSlot s8 = scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 4, 6), sharingGroup, c5), false).get();
+                       SimpleSlot s9 = scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 5, 6), sharingGroup, c6), false).get();
+                       SimpleSlot s10 = scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 3, 6), sharingGroup, c4), false).get();
+                       SimpleSlot s11 = scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 4, 6), sharingGroup, c5), false).get();
+                       SimpleSlot s12 = scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 5, 6), sharingGroup, c6), false).get();
 
                        assertNotNull(s1);
                        assertNotNull(s2);
@@ -109,22 +109,22 @@ public class ScheduleWithCoLocationHintTest {
                        assertEquals(s4.getTaskManagerID(), 
s10.getTaskManagerID());
                        assertEquals(s8.getTaskManagerID(), 
s11.getTaskManagerID());
                        assertEquals(s9.getTaskManagerID(), 
s12.getTaskManagerID());
-                       
+
                        assertEquals(c1.getLocation(), 
s1.getTaskManagerLocation());
                        assertEquals(c2.getLocation(), 
s2.getTaskManagerLocation());
                        assertEquals(c3.getLocation(), 
s3.getTaskManagerLocation());
                        assertEquals(c4.getLocation(), 
s4.getTaskManagerLocation());
                        assertEquals(c5.getLocation(), 
s8.getTaskManagerLocation());
                        assertEquals(c6.getLocation(), 
s9.getTaskManagerLocation());
-                       
+
                        // check the scheduler's bookkeeping
                        assertEquals(0, scheduler.getNumberOfAvailableSlots());
-                       
+
                        // the first assignments are unconstrained, 
co.-scheduling is constrained
                        assertEquals(6, 
scheduler.getNumberOfLocalizedAssignments());
                        assertEquals(0, 
scheduler.getNumberOfNonLocalizedAssignments());
                        assertEquals(6, 
scheduler.getNumberOfUnconstrainedAssignments());
-                       
+
                        // release some slots, be sure that new available ones 
come up
                        s1.releaseSlot();
                        s2.releaseSlot();
@@ -135,10 +135,11 @@ public class ScheduleWithCoLocationHintTest {
                        s11.releaseSlot();
                        s12.releaseSlot();
                        assertTrue(scheduler.getNumberOfAvailableSlots() >= 1);
-                       
-                       SimpleSlot single = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(new JobVertexID(), 0, 1)));
+
+                       SimpleSlot single = scheduler.allocateSlot(
+                                       new ScheduledUnit(getTestVertex(new 
JobVertexID(), 0, 1)), false).get();
                        assertNotNull(single);
-                       
+
                        s1.releaseSlot();
                        s2.releaseSlot();
                        s3.releaseSlot();
@@ -149,9 +150,9 @@ public class ScheduleWithCoLocationHintTest {
                        s9.releaseSlot();
                        s11.releaseSlot();
                        s12.releaseSlot();
-                       
+
                        assertEquals(5, scheduler.getNumberOfAvailableSlots());
-                       
+
                        assertEquals(6, 
scheduler.getNumberOfLocalizedAssignments());
                        assertEquals(0, 
scheduler.getNumberOfNonLocalizedAssignments());
                        assertEquals(7, 
scheduler.getNumberOfUnconstrainedAssignments());
@@ -161,7 +162,7 @@ public class ScheduleWithCoLocationHintTest {
                        fail(e.getMessage());
                }
        }
-       
+
        @Test
        public void scheduleWithIntermediateRelease() {
                try {
@@ -169,34 +170,37 @@ public class ScheduleWithCoLocationHintTest {
                        JobVertexID jid2 = new JobVertexID();
                        JobVertexID jid3 = new JobVertexID();
                        JobVertexID jid4 = new JobVertexID();
-                       
+
                        Scheduler scheduler = new 
Scheduler(TestingUtils.directExecutionContext());
-                       
+
                        Instance i1 = getRandomInstance(1);
                        Instance i2 = getRandomInstance(1);
-                       
+
                        scheduler.newInstanceAvailable(i1);
                        scheduler.newInstanceAvailable(i2);
-                       
+
                        assertEquals(2, scheduler.getNumberOfAvailableSlots());
-                       
+
                        SlotSharingGroup sharingGroup = new SlotSharingGroup();
                        CoLocationConstraint c1 = new CoLocationConstraint(new 
CoLocationGroup());
-                       
-                       SimpleSlot s1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1));
-                       SimpleSlot s2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup, c1));
-                       
-                       SimpleSlot sSolo = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid4, 0, 1)));
-                       
+
+                       SimpleSlot s1 = scheduler.allocateSlot(
+                                       new ScheduledUnit(getTestVertex(jid1, 
0, 1), sharingGroup, c1), false).get();
+                       SimpleSlot s2 = scheduler.allocateSlot(
+                                       new ScheduledUnit(getTestVertex(jid2, 
0, 1), sharingGroup, c1), false).get();
+
+                       SimpleSlot sSolo = scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(jid4, 0, 1)), false).get();
+
                        ResourceID taskManager = s1.getTaskManagerID();
-                       
+
                        s1.releaseSlot();
                        s2.releaseSlot();
                        sSolo.releaseSlot();
-                       
-                       SimpleSlot sNew = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup, c1));
+
+                       SimpleSlot sNew = scheduler.allocateSlot(
+                                       new ScheduledUnit(getTestVertex(jid3, 
0, 1), sharingGroup, c1), false).get();
                        assertEquals(taskManager, sNew.getTaskManagerID());
-                       
+
                        assertEquals(2, 
scheduler.getNumberOfLocalizedAssignments());
                        assertEquals(0, 
scheduler.getNumberOfNonLocalizedAssignments());
                        assertEquals(2, 
scheduler.getNumberOfUnconstrainedAssignments());
@@ -206,41 +210,41 @@ public class ScheduleWithCoLocationHintTest {
                        fail(e.getMessage());
                }
        }
-       
+
        @Test
        public void scheduleWithReleaseNoResource() {
                try {
                        JobVertexID jid1 = new JobVertexID();
                        JobVertexID jid2 = new JobVertexID();
                        JobVertexID jid3 = new JobVertexID();
-                       
+
                        Scheduler scheduler = new 
Scheduler(TestingUtils.directExecutionContext());
-                       
+
                        Instance i1 = getRandomInstance(1);
                        Instance i2 = getRandomInstance(1);
-                       
+
                        scheduler.newInstanceAvailable(i1);
                        scheduler.newInstanceAvailable(i2);
-                       
+
                        assertEquals(2, scheduler.getNumberOfAvailableSlots());
-                       
+
                        SlotSharingGroup sharingGroup = new SlotSharingGroup();
                        CoLocationConstraint c1 = new CoLocationConstraint(new 
CoLocationGroup());
-                       
-                       SimpleSlot s1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1));
+
+                       SimpleSlot s1 = scheduler.allocateSlot(
+                                       new ScheduledUnit(getTestVertex(jid1, 
0, 1), sharingGroup, c1), false).get();
                        s1.releaseSlot();
-                       
-                       scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 0, 1)));
-                       scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 1, 2)));
-                       
-                       
+
+                       scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 0, 1)), false).get();
+                       scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(jid2, 1, 2)), false).get();
+
                        try {
-                               scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup, c1));
+                               scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup, c1), false).get();
                                fail("Scheduled even though no resource was 
available.");
                        } catch (NoResourceAvailableException e) {
                                // expected
                        }
-                       
+
                        assertEquals(0, 
scheduler.getNumberOfLocalizedAssignments());
                        assertEquals(0, 
scheduler.getNumberOfNonLocalizedAssignments());
                        assertEquals(3, 
scheduler.getNumberOfUnconstrainedAssignments());
@@ -250,7 +254,7 @@ public class ScheduleWithCoLocationHintTest {
                        fail(e.getMessage());
                }
        }
-       
+
        @Test
        public void scheduleMixedCoLocationSlotSharing() {
                try {
@@ -276,27 +280,35 @@ public class ScheduleWithCoLocationHintTest {
                        SlotSharingGroup shareGroup = new SlotSharingGroup();
 
                        // first wave
-                       scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 0, 4), shareGroup));
-                       scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 2, 4), shareGroup));
-                       scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 1, 4), shareGroup));
-                       scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid1, 3, 4), shareGroup));
+                       scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 0, 4), shareGroup), false);
+                       scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 2, 4), shareGroup), false);
+                       scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 1, 4), shareGroup), false);
+                       scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(jid1, 3, 4), shareGroup), false);
                        
                        // second wave
-                       SimpleSlot s21 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 0, 4), shareGroup, clc1));
-                       SimpleSlot s22 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 2, 4), shareGroup, clc2));
-                       SimpleSlot s23 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 1, 4), shareGroup, clc3));
-                       SimpleSlot s24 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid2, 3, 4), shareGroup, clc4));
+                       SimpleSlot s21 = scheduler.allocateSlot(
+                                       new ScheduledUnit(getTestVertex(jid2, 
0, 4), shareGroup, clc1), false).get();
+                       SimpleSlot s22 = scheduler.allocateSlot(
+                                       new ScheduledUnit(getTestVertex(jid2, 
2, 4), shareGroup, clc2), false).get();
+                       SimpleSlot s23 = scheduler.allocateSlot(
+                                       new ScheduledUnit(getTestVertex(jid2, 
1, 4), shareGroup, clc3), false).get();
+                       SimpleSlot s24 = scheduler.allocateSlot(
+                                       new ScheduledUnit(getTestVertex(jid2, 
3, 4), shareGroup, clc4), false).get();
                        
                        // third wave
-                       SimpleSlot s31 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid3, 1, 4), shareGroup, clc2));
-                       SimpleSlot s32 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid3, 2, 4), shareGroup, clc3));
-                       SimpleSlot s33 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid3, 3, 4), shareGroup, clc4));
-                       SimpleSlot s34 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid3, 0, 4), shareGroup, clc1));
-                       
-                       scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid4, 0, 4), shareGroup));
-                       scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid4, 1, 4), shareGroup));
-                       scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid4, 2, 4), shareGroup));
-                       scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(jid4, 3, 4), shareGroup));
+                       SimpleSlot s31 = scheduler.allocateSlot(
+                                       new ScheduledUnit(getTestVertex(jid3, 
1, 4), shareGroup, clc2), false).get();
+                       SimpleSlot s32 = scheduler.allocateSlot(
+                                       new ScheduledUnit(getTestVertex(jid3, 
2, 4), shareGroup, clc3), false).get();
+                       SimpleSlot s33 = scheduler.allocateSlot(
+                                       new ScheduledUnit(getTestVertex(jid3, 
3, 4), shareGroup, clc4), false).get();
+                       SimpleSlot s34 = scheduler.allocateSlot(
+                                       new ScheduledUnit(getTestVertex(jid3, 
0, 4), shareGroup, clc1), false).get();
+                       
+                       scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(jid4, 0, 4), shareGroup), false);
+                       scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(jid4, 1, 4), shareGroup), false);
+                       scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(jid4, 2, 4), shareGroup), false);
+                       scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(jid4, 3, 4), shareGroup), false);
                        
                        assertEquals(s21.getTaskManagerID(), 
s34.getTaskManagerID());
                        assertEquals(s22.getTaskManagerID(), 
s31.getTaskManagerID());
@@ -341,20 +353,26 @@ public class ScheduleWithCoLocationHintTest {
                        CoLocationConstraint cc2 = new 
CoLocationConstraint(ccg);
 
                        // schedule something into the shared group so that 
both instances are in the sharing group
-                       SimpleSlot s1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup));
-                       SimpleSlot s2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup));
+                       SimpleSlot s1 = scheduler.allocateSlot(
+                                       new 
ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), 
false).get();
+                       SimpleSlot s2 = scheduler.allocateSlot(
+                                       new 
ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup), 
false).get();
                        
                        // schedule one locally to instance 1
-                       SimpleSlot s3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup, cc1));
+                       SimpleSlot s3 = scheduler.allocateSlot(
+                                       new 
ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup, cc1), 
false).get();
 
                        // schedule with co location constraint (yet 
unassigned) and a preference for
                        // instance 1, but it can only get instance 2
-                       SimpleSlot s4 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc2));
+                       SimpleSlot s4 = scheduler.allocateSlot(
+                                       new 
ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc2), 
false).get();
                        
                        // schedule something into the assigned co-location 
constraints and check that they override the
                        // other preferences
-                       SimpleSlot s5 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid3, 0, 2, loc2), sharingGroup, cc1));
-                       SimpleSlot s6 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid3, 1, 2, loc1), sharingGroup, cc2));
+                       SimpleSlot s5 = scheduler.allocateSlot(
+                                       new 
ScheduledUnit(getTestVertexWithLocation(jid3, 0, 2, loc2), sharingGroup, cc1), 
false).get();
+                       SimpleSlot s6 = scheduler.allocateSlot(
+                                       new 
ScheduledUnit(getTestVertexWithLocation(jid3, 1, 2, loc1), sharingGroup, cc2), 
false).get();
                        
                        // check that each slot got three
                        assertEquals(3, s1.getRoot().getNumberLeaves());
@@ -386,13 +404,13 @@ public class ScheduleWithCoLocationHintTest {
                        fail(e.getMessage());
                }
        }
-       
+
        @Test
        public void testSlotReleasedInBetween() {
                try {
                        JobVertexID jid1 = new JobVertexID();
                        JobVertexID jid2 = new JobVertexID();
-                       
+
                        Scheduler scheduler = new 
Scheduler(TestingUtils.directExecutionContext());
 
                        Instance i1 = getRandomInstance(1);
@@ -403,36 +421,40 @@ public class ScheduleWithCoLocationHintTest {
 
                        scheduler.newInstanceAvailable(i2);
                        scheduler.newInstanceAvailable(i1);
-                       
+
                        assertEquals(2, scheduler.getNumberOfAvailableSlots());
-                       
+
                        SlotSharingGroup sharingGroup = new SlotSharingGroup();
-                       
+
                        CoLocationGroup ccg = new CoLocationGroup();
                        CoLocationConstraint cc1 = new 
CoLocationConstraint(ccg);
                        CoLocationConstraint cc2 = new 
CoLocationConstraint(ccg);
 
-                       SimpleSlot s1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1));
-                       SimpleSlot s2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2));
-                       
+                       SimpleSlot s1 = scheduler.allocateSlot(
+                                       new 
ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), 
false).get();
+                       SimpleSlot s2 = scheduler.allocateSlot(
+                                       new 
ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2), 
false).get();
+
                        s1.releaseSlot();
                        s2.releaseSlot();
-                       
+
                        assertEquals(2, scheduler.getNumberOfAvailableSlots());
                        assertEquals(0, 
sharingGroup.getTaskAssignment().getNumberOfSlots());
 
-                       SimpleSlot s3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup, cc1));
-                       SimpleSlot s4 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc2));
-                       
+                       SimpleSlot s3 = scheduler.allocateSlot(
+                                       new 
ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup, cc1), 
false).get();
+                       SimpleSlot s4 = scheduler.allocateSlot(
+                                       new 
ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc2), 
false).get();
+
                        // still preserves the previous instance mapping)
                        assertEquals(i1.getTaskManagerID(), 
s3.getTaskManagerID());
                        assertEquals(i2.getTaskManagerID(), 
s4.getTaskManagerID());
-                       
+
                        s3.releaseSlot();
                        s4.releaseSlot();
 
                        assertEquals(2, scheduler.getNumberOfAvailableSlots());
-                       
+
                        assertEquals(4, 
scheduler.getNumberOfLocalizedAssignments());
                        assertEquals(0, 
scheduler.getNumberOfNonLocalizedAssignments());
                        assertEquals(0, 
scheduler.getNumberOfUnconstrainedAssignments());
@@ -442,14 +464,14 @@ public class ScheduleWithCoLocationHintTest {
                        fail(e.getMessage());
                }
        }
-       
+
        @Test
        public void testSlotReleasedInBetweenAndNoNewLocal() {
                try {
                        JobVertexID jid1 = new JobVertexID();
                        JobVertexID jid2 = new JobVertexID();
                        JobVertexID jidx = new JobVertexID();
-                       
+
                        Scheduler scheduler = new 
Scheduler(TestingUtils.directExecutionContext());
 
                        Instance i1 = getRandomInstance(1);
@@ -460,41 +482,46 @@ public class ScheduleWithCoLocationHintTest {
 
                        scheduler.newInstanceAvailable(i2);
                        scheduler.newInstanceAvailable(i1);
-                       
+
                        assertEquals(2, scheduler.getNumberOfAvailableSlots());
-                       
+
                        SlotSharingGroup sharingGroup = new SlotSharingGroup();
-                       
+
                        CoLocationGroup ccg = new CoLocationGroup();
                        CoLocationConstraint cc1 = new 
CoLocationConstraint(ccg);
                        CoLocationConstraint cc2 = new 
CoLocationConstraint(ccg);
 
-                       SimpleSlot s1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1));
-                       SimpleSlot s2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2));
-                       
+                       SimpleSlot s1 = scheduler.allocateSlot(
+                                       new 
ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), 
false).get();
+                       SimpleSlot s2 = scheduler.allocateSlot(
+                                       new 
ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2), 
false).get();
+
                        s1.releaseSlot();
                        s2.releaseSlot();
-                       
+
                        assertEquals(2, scheduler.getNumberOfAvailableSlots());
                        assertEquals(0, 
sharingGroup.getTaskAssignment().getNumberOfSlots());
 
-                       SimpleSlot sa = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jidx, 0, 2)));
-                       SimpleSlot sb = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jidx, 1, 2)));
-                       
+                       SimpleSlot sa = scheduler.allocateSlot(
+                                       new 
ScheduledUnit(getTestVertexWithLocation(jidx, 0, 2)), false).get();
+                       SimpleSlot sb = scheduler.allocateSlot(
+                                       new 
ScheduledUnit(getTestVertexWithLocation(jidx, 1, 2)), false).get();
+
                        try {
-                               scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup, cc1));
+                               scheduler.allocateSlot(
+                                               new 
ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup, cc1), 
false);
                                fail("should not be able to find a resource");
                        } catch (NoResourceAvailableException e) {
                                // good
                        } catch (Exception e) {
                                fail("wrong exception");
                        }
-                       
+
                        sa.releaseSlot();
                        sb.releaseSlot();
 
                        assertEquals(2, scheduler.getNumberOfAvailableSlots());
-                       
+
                        assertEquals(2, 
scheduler.getNumberOfLocalizedAssignments());
                        assertEquals(0, 
scheduler.getNumberOfNonLocalizedAssignments());
                        assertEquals(2, 
scheduler.getNumberOfUnconstrainedAssignments());
@@ -504,15 +531,15 @@ public class ScheduleWithCoLocationHintTest {
                        fail(e.getMessage());
                }
        }
-       
+
        @Test
        public void testScheduleOutOfOrder() {
                try {
                        JobVertexID jid1 = new JobVertexID();
                        JobVertexID jid2 = new JobVertexID();
-                       
+
                        Scheduler scheduler = new 
Scheduler(TestingUtils.directExecutionContext());
-                       
+
                        Instance i1 = getRandomInstance(1);
                        Instance i2 = getRandomInstance(1);
 
@@ -520,11 +547,11 @@ public class ScheduleWithCoLocationHintTest {
 
                        scheduler.newInstanceAvailable(i2);
                        scheduler.newInstanceAvailable(i1);
-                       
+
                        assertEquals(2, scheduler.getNumberOfAvailableSlots());
-                       
+
                        SlotSharingGroup sharingGroup = new SlotSharingGroup();
-                       
+
                        CoLocationGroup ccg = new CoLocationGroup();
                        CoLocationConstraint cc1 = new 
CoLocationConstraint(ccg);
                        CoLocationConstraint cc2 = new 
CoLocationConstraint(ccg);
@@ -532,33 +559,37 @@ public class ScheduleWithCoLocationHintTest {
                        // schedule something from the second job vertex id 
before the first is filled,
                        // and give locality preferences that hint at using the 
same shared slot for both
                        // co location constraints (which we seek to prevent)
-                       SimpleSlot s1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1));
-                       SimpleSlot s2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup, cc2));
+                       SimpleSlot s1 = scheduler.allocateSlot(
+                                       new 
ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), 
false).get();
+                       SimpleSlot s2 = scheduler.allocateSlot(
+                                       new 
ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup, cc2), 
false).get();
+
+                       SimpleSlot s3 = scheduler.allocateSlot(
+                                       new 
ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc1), 
false).get();
+                       SimpleSlot s4 = scheduler.allocateSlot(
+                                       new 
ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup, cc2), 
false).get();
 
-                       SimpleSlot s3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc1));
-                       SimpleSlot s4 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup, cc2));
-                       
                        // check that each slot got two
                        assertEquals(2, s1.getRoot().getNumberLeaves());
                        assertEquals(2, s2.getRoot().getNumberLeaves());
-                       
+
                        assertEquals(s1.getTaskManagerID(), 
s3.getTaskManagerID());
                        assertEquals(s2.getTaskManagerID(), 
s4.getTaskManagerID());
-                       
+
                        // check the scheduler's bookkeeping
                        assertEquals(0, scheduler.getNumberOfAvailableSlots());
-                       
+
                        assertEquals(3, 
scheduler.getNumberOfLocalizedAssignments());
                        assertEquals(1, 
scheduler.getNumberOfNonLocalizedAssignments());
                        assertEquals(0, 
scheduler.getNumberOfUnconstrainedAssignments());
-                       
+
                        // release some slots, be sure that new available ones 
come up
                        s1.releaseSlot();
                        s2.releaseSlot();
                        s3.releaseSlot();
                        s4.releaseSlot();
                        assertEquals(2, scheduler.getNumberOfAvailableSlots());
-                       
+
                        assertEquals(0, 
sharingGroup.getTaskAssignment().getNumberOfSlots());
                        assertEquals(0, 
sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1));
                        assertEquals(0, 
sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid2));
@@ -568,15 +599,15 @@ public class ScheduleWithCoLocationHintTest {
                        fail(e.getMessage());
                }
        }
-       
+
        @Test
        public void nonColocationFollowsCoLocation() {
                try {
                        JobVertexID jid1 = new JobVertexID();
                        JobVertexID jid2 = new JobVertexID();
-                       
+
                        Scheduler scheduler = new 
Scheduler(TestingUtils.directExecutionContext());
-                       
+
                        Instance i1 = getRandomInstance(1);
                        Instance i2 = getRandomInstance(1);
 
@@ -585,32 +616,36 @@ public class ScheduleWithCoLocationHintTest {
 
                        scheduler.newInstanceAvailable(i2);
                        scheduler.newInstanceAvailable(i1);
-                       
+
                        assertEquals(2, scheduler.getNumberOfAvailableSlots());
                        
                        SlotSharingGroup sharingGroup = new SlotSharingGroup();
-                       
+
                        CoLocationGroup ccg = new CoLocationGroup();
                        CoLocationConstraint cc1 = new 
CoLocationConstraint(ccg);
                        CoLocationConstraint cc2 = new 
CoLocationConstraint(ccg);
 
-                       SimpleSlot s1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1));
-                       SimpleSlot s2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2));
+                       SimpleSlot s1 = scheduler.allocateSlot(
+                                       new 
ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), 
false).get();
+                       SimpleSlot s2 = scheduler.allocateSlot(
+                                       new 
ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2), 
false).get();
+
+                       SimpleSlot s3 = scheduler.allocateSlot(
+                                       new 
ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup), 
false).get();
+                       SimpleSlot s4 = scheduler.allocateSlot(
+                                       new 
ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup), 
false).get();
 
-                       SimpleSlot s3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup));
-                       SimpleSlot s4 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup));
-                       
                        // check that each slot got two
                        assertEquals(2, s1.getRoot().getNumberLeaves());
                        assertEquals(2, s2.getRoot().getNumberLeaves());
-                       
+
                        s1.releaseSlot();
                        s2.releaseSlot();
                        s3.releaseSlot();
                        s4.releaseSlot();
-                       
+
                        assertEquals(2, scheduler.getNumberOfAvailableSlots());
-                       
+
                        assertEquals(0, 
sharingGroup.getTaskAssignment().getNumberOfSlots());
                        assertEquals(0, 
sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1));
                        assertEquals(0, 
sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid2));

http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
index 25498c4..d78f551 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
@@ -35,7 +35,6 @@ import static 
org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.a
 import static 
org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getDummyTask;
 import static 
org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getRandomInstance;
 import static 
org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getTestVertex;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -122,17 +121,17 @@ public class SchedulerIsolatedTasksTest {
                        assertEquals(5, scheduler.getNumberOfAvailableSlots());
                        
                        // schedule something into all slots
-                       SimpleSlot s1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getDummyTask()));
-                       SimpleSlot s2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getDummyTask()));
-                       SimpleSlot s3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getDummyTask()));
-                       SimpleSlot s4 = scheduler.scheduleImmediately(new 
ScheduledUnit(getDummyTask()));
-                       SimpleSlot s5 = scheduler.scheduleImmediately(new 
ScheduledUnit(getDummyTask()));
+                       SimpleSlot s1 = scheduler.allocateSlot(new 
ScheduledUnit(getDummyTask()), false).get();
+                       SimpleSlot s2 = scheduler.allocateSlot(new 
ScheduledUnit(getDummyTask()), false).get();
+                       SimpleSlot s3 = scheduler.allocateSlot(new 
ScheduledUnit(getDummyTask()), false).get();
+                       SimpleSlot s4 = scheduler.allocateSlot(new 
ScheduledUnit(getDummyTask()), false).get();
+                       SimpleSlot s5 = scheduler.allocateSlot(new 
ScheduledUnit(getDummyTask()), false).get();
                        
                        // the slots should all be different
                        assertTrue(areAllDistinct(s1, s2, s3, s4, s5));
                        
                        try {
-                               scheduler.scheduleImmediately(new 
ScheduledUnit(getDummyTask()));
+                               scheduler.allocateSlot(new 
ScheduledUnit(getDummyTask()), false);
                                fail("Scheduler accepted scheduling request 
without available resource.");
                        }
                        catch (NoResourceAvailableException e) {
@@ -145,8 +144,8 @@ public class SchedulerIsolatedTasksTest {
                        assertEquals(2, scheduler.getNumberOfAvailableSlots());
                        
                        // now we can schedule some more slots
-                       SimpleSlot s6 = scheduler.scheduleImmediately(new 
ScheduledUnit(getDummyTask()));
-                       SimpleSlot s7 = scheduler.scheduleImmediately(new 
ScheduledUnit(getDummyTask()));
+                       SimpleSlot s6 = scheduler.allocateSlot(new 
ScheduledUnit(getDummyTask()), false).get();
+                       SimpleSlot s7 = scheduler.allocateSlot(new 
ScheduledUnit(getDummyTask()), false).get();
                        
                        assertTrue(areAllDistinct(s1, s2, s3, s4, s5, s6, s7));
                        
@@ -245,7 +244,7 @@ public class SchedulerIsolatedTasksTest {
                        disposeThread.start();
 
                        for (int i = 0; i < NUM_TASKS_TO_SCHEDULE; i++) {
-                               SlotAllocationFuture future = 
scheduler.scheduleQueued(new ScheduledUnit(getDummyTask()));
+                               SlotAllocationFuture future = 
scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), true);
                                future.setFutureAction(action);
                                allAllocatedSlots.add(future);
                        }
@@ -287,11 +286,11 @@ public class SchedulerIsolatedTasksTest {
                        scheduler.newInstanceAvailable(i3);
                        
                        List<SimpleSlot> slots = new ArrayList<SimpleSlot>();
-                       slots.add(scheduler.scheduleImmediately(new 
ScheduledUnit(getDummyTask())));
-                       slots.add(scheduler.scheduleImmediately(new 
ScheduledUnit(getDummyTask())));
-                       slots.add(scheduler.scheduleImmediately(new 
ScheduledUnit(getDummyTask())));
-                       slots.add(scheduler.scheduleImmediately(new 
ScheduledUnit(getDummyTask())));
-                       slots.add(scheduler.scheduleImmediately(new 
ScheduledUnit(getDummyTask())));
+                       slots.add(scheduler.allocateSlot(new 
ScheduledUnit(getDummyTask()), false).get());
+                       slots.add(scheduler.allocateSlot(new 
ScheduledUnit(getDummyTask()), false).get());
+                       slots.add(scheduler.allocateSlot(new 
ScheduledUnit(getDummyTask()), false).get());
+                       slots.add(scheduler.allocateSlot(new 
ScheduledUnit(getDummyTask()), false).get());
+                       slots.add(scheduler.allocateSlot(new 
ScheduledUnit(getDummyTask()), false).get());
                        
                        i2.markDead();
                        
@@ -312,7 +311,7 @@ public class SchedulerIsolatedTasksTest {
                        
                        // cannot get another slot, since all instances are dead
                        try {
-                               scheduler.scheduleImmediately(new 
ScheduledUnit(getDummyTask()));
+                               scheduler.allocateSlot(new 
ScheduledUnit(getDummyTask()), false).get();
                                fail("Scheduler served a slot from a dead 
instance");
                        }
                        catch (NoResourceAvailableException e) {
@@ -347,7 +346,7 @@ public class SchedulerIsolatedTasksTest {
                        scheduler.newInstanceAvailable(i3);
                        
                        // schedule something on an arbitrary instance
-                       SimpleSlot s1 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(new Instance[0])));
+                       SimpleSlot s1 = scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(new Instance[0])), false).get();
                        
                        // figure out how we use the location hints
                        Instance first = (Instance) s1.getOwner();
@@ -355,28 +354,28 @@ public class SchedulerIsolatedTasksTest {
                        Instance third = first == i3 ? i2 : i3;
                        
                        // something that needs to go to the first instance 
again
-                       SimpleSlot s2 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(s1.getTaskManagerLocation())));
+                       SimpleSlot s2 = scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(s1.getTaskManagerLocation())), false).get();
                        assertEquals(first, s2.getOwner());
 
                        // first or second --> second, because first is full
-                       SimpleSlot s3 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(first, second)));
+                       SimpleSlot s3 = scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(first, second)), false).get();
                        assertEquals(second, s3.getOwner());
                        
                        // first or third --> third (because first is full)
-                       SimpleSlot s4 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(first, third)));
-                       SimpleSlot s5 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(first, third)));
+                       SimpleSlot s4 = scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(first, third)), false).get();
+                       SimpleSlot s5 = scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(first, third)), false).get();
                        assertEquals(third, s4.getOwner());
                        assertEquals(third, s5.getOwner());
                        
                        // first or third --> second, because all others are 
full
-                       SimpleSlot s6 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(first, third)));
+                       SimpleSlot s6 = scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(first, third)), false).get();
                        assertEquals(second, s6.getOwner());
                        
                        // release something on the first and second instance
                        s2.releaseSlot();
                        s6.releaseSlot();
                        
-                       SimpleSlot s7 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertex(first, third)));
+                       SimpleSlot s7 = scheduler.allocateSlot(new 
ScheduledUnit(getTestVertex(first, third)), false).get();
                        assertEquals(first, s7.getOwner());
                        
                        assertEquals(1, 
scheduler.getNumberOfUnconstrainedAssignments());

Reply via email to