[FLINK-7153] Introduce LocationPreferenceConstraint for scheduling

The LocationPreferenceConstraint defines whether all or any preferred locations
have to be taken into consideration when scheduling tasks. Especially for batch
jobs, where we do lazy scheduling, not all input locations might be known for a
consumer task. Therefore, we set the location preference constraint to ANY, which
means that only those locations which are known at scheduling time are taken
into consideration.

[FLINK-7153] Add test cases

Replace AtomicReference with AtomicReferenceFieldUpdater

Fix

Use static import of Preconditions.checkNotNull

Initialize ANY array with number of returned futures

Revert formatting changes in Scheduler

Set flink-runtime log level in log4j-test.properties to OFF

Revert changes to ExecutionVertex#getPreferredLocationsBasedOnInputs

Fix failing FailoverRegionTest

This closes #4916.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3b0fb26b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3b0fb26b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3b0fb26b

Branch: refs/heads/master
Commit: 3b0fb26bfb779a98f8dcadbb4a7ba206e11f9c2c
Parents: c73b2fe
Author: Till Rohrmann <[email protected]>
Authored: Fri Oct 27 09:47:03 2017 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Thu Nov 2 17:04:45 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java | 152 +++++++---
 .../runtime/executiongraph/ExecutionGraph.java  |  13 +-
 .../executiongraph/ExecutionJobVertex.java      |  29 +-
 .../runtime/executiongraph/ExecutionVertex.java |  41 ++-
 .../executiongraph/failover/FailoverRegion.java |   6 +-
 .../apache/flink/runtime/instance/SlotPool.java |  12 +-
 .../flink/runtime/instance/SlotProvider.java    |   8 +-
 .../scheduler/LocationPreferenceConstraint.java |  32 +++
 .../runtime/jobmanager/scheduler/Scheduler.java |  84 +++---
 .../ExecutionGraphDeploymentTest.java           | 107 +++++++
 .../ExecutionGraphMetricsTest.java              |   3 +-
 .../ExecutionGraphSchedulingTest.java           |   4 +-
 .../executiongraph/ExecutionGraphTestUtils.java |  14 +-
 .../runtime/executiongraph/ExecutionTest.java   | 286 +++++++++++++++++++
 .../ExecutionVertexCancelTest.java              |   5 +-
 .../ExecutionVertexDeploymentTest.java          |   4 +-
 .../ExecutionVertexSchedulingTest.java          |  15 +-
 .../executiongraph/FailoverRegionTest.java      |   9 +-
 .../executiongraph/ProgrammedSlotProvider.java  |  29 +-
 .../utils/SimpleSlotProvider.java               |   6 +-
 .../ScheduleWithCoLocationHintTest.java         | 121 ++++----
 .../scheduler/SchedulerIsolatedTasksTest.java   |  46 +--
 .../scheduler/SchedulerSlotSharingTest.java     | 223 ++++++++-------
 .../scheduler/SchedulerTestUtils.java           |  15 +-
 .../taskmanager/LocalTaskManagerLocation.java   |  35 +++
 25 files changed, 972 insertions(+), 327 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 9d3e128..38c3821 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.time.Time;
@@ -37,6 +38,7 @@ import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import 
org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
@@ -45,11 +47,12 @@ import 
org.apache.flink.runtime.messages.StackTraceSampleResponse;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.FlinkRuntimeException;
 
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -58,7 +61,6 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import static org.apache.flink.runtime.execution.ExecutionState.CANCELED;
@@ -96,6 +98,11 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
        private static final AtomicReferenceFieldUpdater<Execution, 
ExecutionState> STATE_UPDATER =
                        AtomicReferenceFieldUpdater.newUpdater(Execution.class, 
ExecutionState.class, "state");
 
+       private static final AtomicReferenceFieldUpdater<Execution, SimpleSlot> 
ASSIGNED_SLOT_UPDATER = AtomicReferenceFieldUpdater.newUpdater(
+               Execution.class,
+               SimpleSlot.class,
+               "assignedResource");
+
        private static final Logger LOG = ExecutionGraph.LOG;
 
        private static final int NUM_CANCEL_CALL_TRIES = 3;
@@ -134,7 +141,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
 
        private volatile ExecutionState state = CREATED;
 
-       private final AtomicReference<SimpleSlot> assignedResource;
+       private volatile SimpleSlot assignedResource;
 
        private volatile Throwable failureCause;          // once assigned, 
never changes
 
@@ -193,7 +200,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                this.terminationFuture = new CompletableFuture<>();
                this.taskManagerLocationFuture = new CompletableFuture<>();
 
-               this.assignedResource = new AtomicReference<>();
+               this.assignedResource = null;
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -234,7 +241,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
        }
 
        public SimpleSlot getAssignedResource() {
-               return assignedResource.get();
+               return assignedResource;
        }
 
        /**
@@ -244,22 +251,23 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
         * @param slot to assign to this execution
         * @return true if the slot could be assigned to the execution, 
otherwise false
         */
+       @VisibleForTesting
        boolean tryAssignResource(final SimpleSlot slot) {
-               Preconditions.checkNotNull(slot);
+               checkNotNull(slot);
 
                // only allow to set the assigned resource in state SCHEDULED 
or CREATED
                // note: we also accept resource assignment when being in state 
CREATED for testing purposes
                if (state == SCHEDULED || state == CREATED) {
-                       if (assignedResource.compareAndSet(null, slot)) {
+                       if (ASSIGNED_SLOT_UPDATER.compareAndSet(this, null, 
slot)) {
                                // check for concurrent modification (e.g. 
cancelling call)
                                if (state == SCHEDULED || state == CREATED) {
-                                       
Preconditions.checkState(!taskManagerLocationFuture.isDone(), "The 
TaskManagerLocationFuture should not be set if we haven't assigned a resource 
yet.");
+                                       
checkState(!taskManagerLocationFuture.isDone(), "The TaskManagerLocationFuture 
should not be set if we haven't assigned a resource yet.");
                                        
taskManagerLocationFuture.complete(slot.getTaskManagerLocation());
 
                                        return true;
                                } else {
                                        // free assigned resource and return 
false
-                                       assignedResource.set(null);
+                                       ASSIGNED_SLOT_UPDATER.set(this, null);
                                        return false;
                                }
                        } else {
@@ -275,7 +283,8 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
        @Override
        public TaskManagerLocation getAssignedResourceLocation() {
                // returns non-null only when a location is already assigned
-               return assignedResource.get() != null ? 
assignedResource.get().getTaskManagerLocation() : null;
+               final SimpleSlot currentAssignedResource = assignedResource;
+               return currentAssignedResource != null ? 
currentAssignedResource.getTaskManagerLocation() : null;
        }
 
        public Throwable getFailureCause() {
@@ -333,7 +342,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
        public boolean scheduleForExecution() {
                SlotProvider resourceProvider = 
getVertex().getExecutionGraph().getSlotProvider();
                boolean allowQueued = 
getVertex().getExecutionGraph().isQueuedSchedulingAllowed();
-               return scheduleForExecution(resourceProvider, allowQueued);
+               return scheduleForExecution(resourceProvider, allowQueued, 
LocationPreferenceConstraint.ANY);
        }
 
        /**
@@ -344,12 +353,19 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
         * @param slotProvider The slot provider to use to allocate slot for 
this execution attempt.
         * @param queued Flag to indicate whether the scheduler may queue this 
task if it cannot
         *               immediately deploy it.
+        * @param locationPreferenceConstraint constraint for the location 
preferences
         * 
         * @throws IllegalStateException Thrown, if the vertex is not in 
CREATED state, which is the only state that permits scheduling.
         */
-       public boolean scheduleForExecution(SlotProvider slotProvider, boolean 
queued) {
+       public boolean scheduleForExecution(
+               SlotProvider slotProvider,
+               boolean queued,
+               LocationPreferenceConstraint locationPreferenceConstraint) {
                try {
-                       final CompletableFuture<Execution> allocationFuture = 
allocateAndAssignSlotForExecution(slotProvider, queued);
+                       final CompletableFuture<Execution> allocationFuture = 
allocateAndAssignSlotForExecution(
+                               slotProvider,
+                               queued,
+                               locationPreferenceConstraint);
 
                        // IMPORTANT: We have to use the synchronous handle 
operation (direct executor) here so
                        // that we directly deploy the tasks if the slot 
allocation future is completed. This is
@@ -387,11 +403,15 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
         *
         * @param slotProvider to obtain a new slot from
         * @param queued if the allocation can be queued
+        * @param locationPreferenceConstraint constraint for the location 
preferences
         * @return Future which is completed with this execution once the slot 
has been assigned
         *                      or with an exception if an error occurred.
         * @throws IllegalExecutionStateException if this method has been 
called while not being in the CREATED state
         */
-       public CompletableFuture<Execution> 
allocateAndAssignSlotForExecution(SlotProvider slotProvider, boolean queued) 
throws IllegalExecutionStateException {
+       public CompletableFuture<Execution> allocateAndAssignSlotForExecution(
+                       SlotProvider slotProvider,
+                       boolean queued,
+                       LocationPreferenceConstraint 
locationPreferenceConstraint) throws IllegalExecutionStateException {
 
                checkNotNull(slotProvider);
 
@@ -411,18 +431,27 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                                        new ScheduledUnit(this, sharingGroup) :
                                        new ScheduledUnit(this, sharingGroup, 
locationConstraint);
 
-                       CompletableFuture<SimpleSlot> slotFuture = 
slotProvider.allocateSlot(toSchedule, queued);
-
-                       return slotFuture.thenApply(slot -> {
-                               if (tryAssignResource(slot)) {
-                                       return this;
-                               } else {
-                                       // release the slot
-                                       slot.releaseSlot();
+                       // calculate the preferred locations
+                       final 
CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture = 
calculatePreferredLocations(locationPreferenceConstraint);
+
+                       return preferredLocationsFuture
+                               .thenCompose(
+                                       (Collection<TaskManagerLocation> 
preferredLocations) ->
+                                               slotProvider.allocateSlot(
+                                                       toSchedule,
+                                                       queued,
+                                                       preferredLocations))
+                               .thenApply(
+                                       (SimpleSlot slot) -> {
+                                               if (tryAssignResource(slot)) {
+                                                       return this;
+                                               } else {
+                                                       // release the slot
+                                                       slot.releaseSlot();
 
-                                       throw new CompletionException(new 
FlinkException("Could not assign slot " + slot + " to execution " + this + " 
because it has already been assigned "));
-                               }
-                       });
+                                                       throw new 
CompletionException(new FlinkException("Could not assign slot " + slot + " to 
execution " + this + " because it has already been assigned "));
+                                               }
+                                       });
                }
                else {
                        // call race, already deployed, or already done
@@ -436,7 +465,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
         * @throws JobException if the execution cannot be deployed to the 
assigned resource
         */
        public void deploy() throws JobException {
-               final SimpleSlot slot  = assignedResource.get();
+               final SimpleSlot slot  = assignedResource;
 
                checkNotNull(slot, "In order to deploy the execution we first 
have to assign a resource via tryAssignResource.");
 
@@ -516,7 +545,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
         * Sends stop RPC call.
         */
        public void stop() {
-               final SimpleSlot slot = assignedResource.get();
+               final SimpleSlot slot = assignedResource;
 
                if (slot != null) {
                        final TaskManagerGateway taskManagerGateway = 
slot.getTaskManagerGateway();
@@ -579,7 +608,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                                        try {
                                                
vertex.getExecutionGraph().deregisterExecution(this);
 
-                                               final SimpleSlot slot = 
assignedResource.get();
+                                               final SimpleSlot slot = 
assignedResource;
 
                                                if (slot != null) {
                                                        slot.releaseSlot();
@@ -640,8 +669,9 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                                        () -> {
                                                try {
                                                        
consumerVertex.scheduleForExecution(
-                                                                       
consumerVertex.getExecutionGraph().getSlotProvider(),
-                                                                       
consumerVertex.getExecutionGraph().isQueuedSchedulingAllowed());
+                                                               
consumerVertex.getExecutionGraph().getSlotProvider(),
+                                                               
consumerVertex.getExecutionGraph().isQueuedSchedulingAllowed(),
+                                                               
LocationPreferenceConstraint.ANY); // there must be at least one known location
                                                } catch (Throwable t) {
                                                        consumerVertex.fail(new 
IllegalStateException("Could not schedule consumer " +
                                                                        "vertex 
" + consumerVertex, t));
@@ -748,7 +778,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                        int maxStrackTraceDepth,
                        Time timeout) {
 
-               final SimpleSlot slot = assignedResource.get();
+               final SimpleSlot slot = assignedResource;
 
                if (slot != null) {
                        final TaskManagerGateway taskManagerGateway = 
slot.getTaskManagerGateway();
@@ -772,7 +802,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
         * @param timestamp of the completed checkpoint
         */
        public void notifyCheckpointComplete(long checkpointId, long timestamp) 
{
-               final SimpleSlot slot = assignedResource.get();
+               final SimpleSlot slot = assignedResource;
 
                if (slot != null) {
                        final TaskManagerGateway taskManagerGateway = 
slot.getTaskManagerGateway();
@@ -792,7 +822,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
         * @param checkpointOptions of the checkpoint to trigger
         */
        public void triggerCheckpoint(long checkpointId, long timestamp, 
CheckpointOptions checkpointOptions) {
-               final SimpleSlot slot = assignedResource.get();
+               final SimpleSlot slot = assignedResource;
 
                if (slot != null) {
                        final TaskManagerGateway taskManagerGateway = 
slot.getTaskManagerGateway();
@@ -850,7 +880,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
 
                                                
updateAccumulatorsAndMetrics(userAccumulators, metrics);
 
-                                               final SimpleSlot slot = 
assignedResource.get();
+                                               final SimpleSlot slot = 
assignedResource;
 
                                                if (slot != null) {
                                                        slot.releaseSlot();
@@ -908,7 +938,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
 
                                if (transitionState(current, CANCELED)) {
                                        try {
-                                               final SimpleSlot slot = 
assignedResource.get();
+                                               final SimpleSlot slot = 
assignedResource;
 
                                                if (slot != null) {
                                                        slot.releaseSlot();
@@ -1005,7 +1035,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                                updateAccumulatorsAndMetrics(userAccumulators, 
metrics);
 
                                try {
-                                       final SimpleSlot slot = 
assignedResource.get();
+                                       final SimpleSlot slot = 
assignedResource;
                                        if (slot != null) {
                                                slot.releaseSlot();
                                        }
@@ -1022,7 +1052,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                                        }
 
                                        try {
-                                               if (assignedResource.get() != 
null) {
+                                               if (assignedResource != null) {
                                                        sendCancelRpcCall();
                                                }
                                        } catch (Throwable tt) {
@@ -1089,7 +1119,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
         * The sending is tried up to NUM_CANCEL_CALL_TRIES times.
         */
        private void sendCancelRpcCall() {
-               final SimpleSlot slot = assignedResource.get();
+               final SimpleSlot slot = assignedResource;
 
                if (slot != null) {
                        final TaskManagerGateway taskManagerGateway = 
slot.getTaskManagerGateway();
@@ -1110,7 +1140,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
        }
 
        private void sendFailIntermediateResultPartitionsRpcCall() {
-               final SimpleSlot slot = assignedResource.get();
+               final SimpleSlot slot = assignedResource;
 
                if (slot != null) {
                        final TaskManagerGateway taskManagerGateway = 
slot.getTaskManagerGateway();
@@ -1128,7 +1158,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
        private void sendUpdatePartitionInfoRpcCall(
                        final Iterable<PartitionInfo> partitionInfos) {
 
-               final SimpleSlot slot = assignedResource.get();
+               final SimpleSlot slot = assignedResource;
 
                if (slot != null) {
                        final TaskManagerGateway taskManagerGateway = 
slot.getTaskManagerGateway();
@@ -1151,6 +1181,46 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
        //  Miscellaneous
        // 
--------------------------------------------------------------------------------------------
 
+       /**
+        * Calculates the preferred locations based on the location preference 
constraint.
+        *
+        * @param locationPreferenceConstraint constraint for the location 
preference
+        * @return Future containing the collection of preferred locations. 
This might not be completed if not all inputs
+        *              have been a resource assigned.
+        */
+       @VisibleForTesting
+       public CompletableFuture<Collection<TaskManagerLocation>> 
calculatePreferredLocations(LocationPreferenceConstraint 
locationPreferenceConstraint) {
+               final Collection<CompletableFuture<TaskManagerLocation>> 
preferredLocationFutures = getVertex().getPreferredLocationsBasedOnInputs();
+               final CompletableFuture<Collection<TaskManagerLocation>> 
preferredLocationsFuture;
+
+               switch(locationPreferenceConstraint) {
+                       case ALL:
+                               preferredLocationsFuture = 
FutureUtils.combineAll(preferredLocationFutures);
+                               break;
+                       case ANY:
+                               final ArrayList<TaskManagerLocation> 
completedTaskManagerLocations = new 
ArrayList<>(preferredLocationFutures.size());
+
+                               for (CompletableFuture<TaskManagerLocation> 
preferredLocationFuture : preferredLocationFutures) {
+                                       if (preferredLocationFuture.isDone() && 
!preferredLocationFuture.isCompletedExceptionally()) {
+                                               final TaskManagerLocation 
taskManagerLocation = preferredLocationFuture.getNow(null);
+
+                                               if (taskManagerLocation == 
null) {
+                                                       throw new 
FlinkRuntimeException("TaskManagerLocationFuture was completed with null. This 
indicates a programming bug.");
+                                               }
+
+                                               
completedTaskManagerLocations.add(taskManagerLocation);
+                                       }
+                               }
+
+                               preferredLocationsFuture = 
CompletableFuture.completedFuture(completedTaskManagerLocations);
+                               break;
+                       default:
+                               throw new RuntimeException("Unknown 
LocationPreferenceConstraint " + locationPreferenceConstraint + '.');
+               }
+
+               return preferredLocationsFuture;
+       }
+
        private boolean transitionState(ExecutionState currentState, 
ExecutionState targetState) {
                return transitionState(currentState, targetState, null);
        }
@@ -1248,7 +1318,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
        
        @Override
        public String toString() {
-               final SimpleSlot slot = assignedResource.get();
+               final SimpleSlot slot = assignedResource;
 
                return String.format("Attempt #%d (%s) @ %s - [%s]", 
attemptNumber, vertex.getTaskNameWithSubtaskIndex(),
                                (slot == null ? "(unassigned)" : slot), state);

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 62c6e99..8a74001 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -60,6 +60,7 @@ import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import 
org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
 import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
 import org.apache.flink.runtime.state.SharedStateRegistry;
@@ -853,11 +854,14 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                }
        }
 
-       private void scheduleLazy(SlotProvider slotProvider) throws 
NoResourceAvailableException {
+       private void scheduleLazy(SlotProvider slotProvider) {
                // simply take the vertices without inputs.
                for (ExecutionJobVertex ejv : verticesInCreationOrder) {
                        if (ejv.getJobVertex().isInputVertex()) {
-                               ejv.scheduleAll(slotProvider, 
allowQueuedScheduling);
+                               ejv.scheduleAll(
+                                       slotProvider,
+                                       allowQueuedScheduling,
+                                       LocationPreferenceConstraint.ALL); // 
since it is an input vertex, the input based location preferences should be 
empty
                        }
                }
        }
@@ -884,7 +888,10 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                // allocate the slots (obtain all their futures
                for (ExecutionJobVertex ejv : getVerticesTopologically()) {
                        // these calls are not blocking, they only return 
futures
-                       Collection<CompletableFuture<Execution>> 
allocationFutures = ejv.allocateResourcesForAll(slotProvider, queued);
+                       Collection<CompletableFuture<Execution>> 
allocationFutures = ejv.allocateResourcesForAll(
+                               slotProvider,
+                               queued,
+                               LocationPreferenceConstraint.ALL);
 
                        allAllocationFutures.addAll(allocationFutures);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 98191d0..fff7ce1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import 
org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.types.Either;
@@ -455,14 +456,24 @@ public class ExecutionJobVertex implements 
AccessExecutionJobVertex, Archiveable
        
//---------------------------------------------------------------------------------------------
        //  Actions
        
//---------------------------------------------------------------------------------------------
-       
-       public void scheduleAll(SlotProvider slotProvider, boolean queued) {
+
+       /**
+        * Schedules all execution vertices of this ExecutionJobVertex.
+        *
+        * @param slotProvider to allocate the slots from
+        * @param queued if the allocations can be queued
+        * @param locationPreferenceConstraint constraint for the location 
preferences
+        */
+       public void scheduleAll(
+                       SlotProvider slotProvider,
+                       boolean queued,
+                       LocationPreferenceConstraint 
locationPreferenceConstraint) {
                
                final ExecutionVertex[] vertices = this.taskVertices;
 
                // kick off the tasks
                for (ExecutionVertex ev : vertices) {
-                       ev.scheduleForExecution(slotProvider, queued);
+                       ev.scheduleForExecution(slotProvider, queued, 
locationPreferenceConstraint);
                }
        }
 
@@ -474,8 +485,13 @@ public class ExecutionJobVertex implements 
AccessExecutionJobVertex, Archiveable
         * <p>If this method throws an exception, it makes sure to release all 
so far requested slots.
         * 
         * @param resourceProvider The resource provider from whom the slots 
are requested.
+        * @param queued if the allocation can be queued
+        * @param locationPreferenceConstraint constraint for the location 
preferences
         */
-       public Collection<CompletableFuture<Execution>> 
allocateResourcesForAll(SlotProvider resourceProvider, boolean queued) {
+       public Collection<CompletableFuture<Execution>> allocateResourcesForAll(
+                       SlotProvider resourceProvider,
+                       boolean queued,
+                       LocationPreferenceConstraint 
locationPreferenceConstraint) {
                final ExecutionVertex[] vertices = this.taskVertices;
                final CompletableFuture<Execution>[] slots = new 
CompletableFuture[vertices.length];
 
@@ -484,7 +500,10 @@ public class ExecutionJobVertex implements 
AccessExecutionJobVertex, Archiveable
                for (int i = 0; i < vertices.length; i++) {
                        // allocate the next slot (future)
                        final Execution exec = 
vertices[i].getCurrentExecutionAttempt();
-                       final CompletableFuture<Execution> allocationFuture = 
exec.allocateAndAssignSlotForExecution(resourceProvider, queued);
+                       final CompletableFuture<Execution> allocationFuture = 
exec.allocateAndAssignSlotForExecution(
+                               resourceProvider,
+                               queued,
+                               locationPreferenceConstraint);
                        slots[i] = allocationFuture;
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index e87a5a0..6d45d06 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.jobgraph.JobEdge;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import 
org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.EvictingBoundedList;
@@ -487,7 +488,8 @@ public class ExecutionVertex implements 
AccessExecutionVertex, Archiveable<Archi
                        return Collections.emptySet();
                }
                else {
-                       Set<CompletableFuture<TaskManagerLocation>> 
inputLocations = new HashSet<>(4);
+                       Set<CompletableFuture<TaskManagerLocation>> locations = 
new HashSet<>(getTotalNumberOfParallelSubtasks());
+                       Set<CompletableFuture<TaskManagerLocation>> 
inputLocations = new HashSet<>(getTotalNumberOfParallelSubtasks());
 
                        // go over all inputs
                        for (int i = 0; i < inputEdges.length; i++) {
@@ -497,17 +499,26 @@ public class ExecutionVertex implements 
AccessExecutionVertex, Archiveable<Archi
                                        // go over all input sources
                                        for (int k = 0; k < sources.length; 
k++) {
                                                // look-up assigned slot of 
input source
-                                               
CompletableFuture<TaskManagerLocation> taskManagerLocationFuture = 
sources[k].getSource().getProducer().getCurrentTaskManagerLocationFuture();
-                                               
inputLocations.add(taskManagerLocationFuture);
-
+                                               
CompletableFuture<TaskManagerLocation> locationFuture = 
sources[k].getSource().getProducer().getCurrentTaskManagerLocationFuture();
+                                               // add input location
+                                               
inputLocations.add(locationFuture);
+                                               // inputs which have too many 
distinct sources are not considered
                                                if (inputLocations.size() > 
MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
-                                                       return 
Collections.emptyList();
+                                                       inputLocations.clear();
+                                                       break;
                                                }
                                        }
                                }
+                               // keep the locations of the input with the 
fewest preferred locations
+                               if (locations.isEmpty() || // nothing assigned 
yet
+                                               (!inputLocations.isEmpty() && 
inputLocations.size() < locations.size())) {
+                                       // current input has fewer preferred 
locations
+                                       locations.clear();
+                                       locations.addAll(inputLocations);
+                               }
                        }
 
-                       return inputLocations.isEmpty() ? 
Collections.emptyList() : inputLocations;
+                       return locations.isEmpty() ? Collections.emptyList() : 
locations;
                }
        }
 
@@ -587,8 +598,22 @@ public class ExecutionVertex implements 
AccessExecutionVertex, Archiveable<Archi
                }
        }
 
-       public boolean scheduleForExecution(SlotProvider slotProvider, boolean 
queued) {
-               return this.currentExecution.scheduleForExecution(slotProvider, 
queued);
+       /**
+        * Schedules the current execution of this ExecutionVertex.
+        *
+        * @param slotProvider to allocate the slots from
+        * @param queued if the allocation can be queued
+        * @param locationPreferenceConstraint constraint for the location 
preferences
+        * @return the result of scheduling the current execution of this vertex
+        */
+       public boolean scheduleForExecution(
+                       SlotProvider slotProvider,
+                       boolean queued,
+                       LocationPreferenceConstraint 
locationPreferenceConstraint) {
+               return this.currentExecution.scheduleForExecution(
+                       slotProvider,
+                       queued,
+                       locationPreferenceConstraint);
        }
 
        @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
index 1919c61..0b00c0e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import 
org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.FlinkException;
 
@@ -216,8 +217,9 @@ public class FailoverRegion {
                                for (ExecutionVertex ev : 
connectedExecutionVertexes) {
                                        try {
                                                ev.scheduleForExecution(
-                                                               
executionGraph.getSlotProvider(),
-                                                               
executionGraph.isQueuedSchedulingAllowed());
+                                                       
executionGraph.getSlotProvider(),
+                                                       
executionGraph.isQueuedSchedulingAllowed(),
+                                                       
LocationPreferenceConstraint.ANY); // some inputs not belonging to the failover 
region might have failed concurrently
                                        }
                                        catch (Throwable e) {
                                                
failover(globalModVersionOfFailover);

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index fcf7d40..1944b38 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -1002,14 +1002,12 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway {
                }
 
                @Override
-               public CompletableFuture<SimpleSlot> allocateSlot(ScheduledUnit 
task, boolean allowQueued) {
-                       Collection<CompletableFuture<TaskManagerLocation>> 
locationPreferenceFutures =
-                               
task.getTaskToExecute().getVertex().getPreferredLocations();
+               public CompletableFuture<SimpleSlot> allocateSlot(
+                               ScheduledUnit task,
+                               boolean allowQueued,
+                               Collection<TaskManagerLocation> 
preferredLocations) {
 
-                       CompletableFuture<Collection<TaskManagerLocation>> 
locationPreferencesFuture = FutureUtils.combineAll(locationPreferenceFutures);
-
-                       return locationPreferencesFuture.thenCompose(
-                               locationPreferences -> 
gateway.allocateSlot(task, ResourceProfile.UNKNOWN, locationPreferences, 
timeout));
+                       return gateway.allocateSlot(task, 
ResourceProfile.UNKNOWN, preferredLocations, timeout);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
index 23e6749..ef988b4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
@@ -19,7 +19,9 @@
 package org.apache.flink.runtime.instance;
 
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -40,7 +42,11 @@ public interface SlotProvider {
         *
         * @param task         The task to allocate the slot for
         * @param allowQueued  Whether allow the task be queued if we do not 
have enough resource
+        * @param preferredLocations preferred locations for the slot allocation
         * @return The future of the allocation
         */
-       CompletableFuture<SimpleSlot> allocateSlot(ScheduledUnit task, boolean 
allowQueued);
+       CompletableFuture<SimpleSlot> allocateSlot(
+               ScheduledUnit task,
+               boolean allowQueued,
+               Collection<TaskManagerLocation> preferredLocations);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/LocationPreferenceConstraint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/LocationPreferenceConstraint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/LocationPreferenceConstraint.java
new file mode 100644
index 0000000..e890512
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/LocationPreferenceConstraint.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+/**
+ * Defines the location preference constraint.
+ *
+ * <p>Currently, two modes are supported: either all input locations have to be
+ * taken into consideration, or only those which are known at scheduling time.
+ * Note that if all input locations are considered, then the scheduling operation
+ * can potentially take a while until all inputs have locations assigned.
+ */
+public enum LocationPreferenceConstraint {
+       ALL, // wait for all inputs to have a location assigned
+       ANY // only consider those inputs which already have a location assigned
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index 9b1ffbe..1995c12 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -18,6 +18,26 @@
 
 package org.apache.flink.runtime.jobmanager.scheduler;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceDiedException;
+import org.apache.flink.runtime.instance.InstanceListener;
+import org.apache.flink.runtime.instance.SharedSlot;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.instance.SlotSharingGroupAssignment;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -32,31 +52,9 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.instance.SlotProvider;
-import org.apache.flink.runtime.instance.SlotSharingGroupAssignment;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.instance.SharedSlot;
-import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.InstanceDiedException;
-import org.apache.flink.runtime.instance.InstanceListener;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.util.ExceptionUtils;
-
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  * The scheduler is responsible for distributing the ready-to-run tasks among 
instances and slots.
  * 
@@ -135,31 +133,29 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener, Sl
 
 
        @Override
-       public CompletableFuture<SimpleSlot> allocateSlot(ScheduledUnit task, 
boolean allowQueued) {
-               Collection<CompletableFuture<TaskManagerLocation>> 
preferredLocationFutures = 
task.getTaskToExecute().getVertex().getPreferredLocationsBasedOnInputs();
+       public CompletableFuture<SimpleSlot> allocateSlot(
+                       ScheduledUnit task,
+                       boolean allowQueued,
+                       Collection<TaskManagerLocation> preferredLocations) {
 
-               CompletableFuture<Collection<TaskManagerLocation>> 
preferredLocationsFuture = FutureUtils.combineAll(preferredLocationFutures);
+               try {
+                       final Object ret = scheduleTask(task, allowQueued, 
preferredLocations);
 
-               return preferredLocationsFuture.thenCompose(
-                       preferredLocations -> {
-                               try {
-                                       final Object ret = scheduleTask(task, 
allowQueued, preferredLocations);
-
-                                       if (ret instanceof SimpleSlot) {
-                                               return 
CompletableFuture.completedFuture((SimpleSlot) ret);
-                                       } else if (ret instanceof 
CompletableFuture) {
-                                               @SuppressWarnings("unchecked")
-                                               CompletableFuture<SimpleSlot> 
typed = (CompletableFuture<SimpleSlot>) ret;
-                                               return typed;
-                                       } else {
-                                               // this should never happen, 
simply guard this case with an exception
-                                               throw new RuntimeException();
-                                       }
-                               } catch (NoResourceAvailableException e) {
-                                       throw new CompletionException(e);
-                               }
+                       if (ret instanceof SimpleSlot) {
+                               return 
CompletableFuture.completedFuture((SimpleSlot) ret);
+                       }
+                       else if (ret instanceof CompletableFuture) {
+                               @SuppressWarnings("unchecked")
+                               CompletableFuture<SimpleSlot> typed = 
(CompletableFuture<SimpleSlot>) ret;
+                               return typed;
                        }
-               );
+                       else {
+                               // this should never happen, simply guard this 
case with an exception
+                               throw new RuntimeException();
+                       }
+               } catch (NoResourceAvailableException e) {
+                       return FutureUtils.completedExceptionally(e);
+               }
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 5c80405..de91d78 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -38,6 +38,7 @@ import 
org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -46,14 +47,19 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
+import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
 import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.util.TestLogger;
 
@@ -67,14 +73,18 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 import static junit.framework.TestCase.assertTrue;
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
 
 /**
  * Tests for {@link ExecutionGraph} deployment.
@@ -555,6 +565,103 @@ public class ExecutionGraphDeploymentTest extends 
TestLogger {
                        
eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
        }
 
+       /**
+        * Tests that eager scheduling will wait until all input locations have 
been set before
+        * scheduling a task.
+        */
+       @Test
+       public void testEagerSchedulingWaitsOnAllInputPreferredLocations() 
throws Exception {
+               final int parallelism = 2;
+               final ProgrammedSlotProvider slotProvider = new 
ProgrammedSlotProvider(parallelism);
+
+               final Time timeout = Time.hours(1L);
+               final JobVertexID sourceVertexId = new JobVertexID();
+               final JobVertex sourceVertex = new JobVertex("Test source", 
sourceVertexId);
+               sourceVertex.setInvokableClass(NoOpInvokable.class);
+               sourceVertex.setParallelism(parallelism);
+
+               final JobVertexID sinkVertexId = new JobVertexID();
+               final JobVertex sinkVertex = new JobVertex("Test sink", 
sinkVertexId);
+               sinkVertex.setInvokableClass(NoOpInvokable.class);
+               sinkVertex.setParallelism(parallelism);
+
+               sinkVertex.connectNewDataSetAsInput(sourceVertex, 
DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+
+               final Map<JobVertexID, CompletableFuture<SimpleSlot>[]> 
slotFutures = new HashMap<>(2);
+
+               for (JobVertexID jobVertexID : Arrays.asList(sourceVertexId, 
sinkVertexId)) {
+                       CompletableFuture<SimpleSlot>[] slotFutureArray = new 
CompletableFuture[parallelism];
+
+                       for (int i = 0; i < parallelism; i++) {
+                               slotFutureArray[i] = new CompletableFuture<>();
+                       }
+
+                       slotFutures.put(jobVertexID, slotFutureArray);
+                       slotProvider.addSlots(jobVertexID, slotFutureArray);
+               }
+
+               final ScheduledExecutorService scheduledExecutorService = new 
ScheduledThreadPoolExecutor(3);
+
+               final ExecutionGraph executionGraph = 
ExecutionGraphTestUtils.createExecutionGraph(
+                       new JobID(),
+                       slotProvider,
+                       new NoRestartStrategy(),
+                       scheduledExecutorService,
+                       timeout,
+                       sourceVertex,
+                       sinkVertex);
+
+               executionGraph.setScheduleMode(ScheduleMode.EAGER);
+               executionGraph.scheduleForExecution();
+
+               // all tasks should be in state SCHEDULED
+               for (ExecutionVertex executionVertex : 
executionGraph.getAllExecutionVertices()) {
+                       assertEquals(ExecutionState.SCHEDULED, 
executionVertex.getCurrentExecutionAttempt().getState());
+               }
+
+               // wait until the source vertex slots have been requested
+               assertTrue(slotProvider.getSlotRequestedFuture(sourceVertexId, 
0).get());
+               assertTrue(slotProvider.getSlotRequestedFuture(sourceVertexId, 
1).get());
+
+               // check that the sinks have not requested their slots because 
they need the location
+               // information of the sources
+               assertFalse(slotProvider.getSlotRequestedFuture(sinkVertexId, 
0).isDone());
+               assertFalse(slotProvider.getSlotRequestedFuture(sinkVertexId, 
1).isDone());
+
+               final TaskManagerLocation localTaskManagerLocation = new 
LocalTaskManagerLocation();
+
+               final SimpleSlot sourceSlot1 = 
createSlot(executionGraph.getJobID(), localTaskManagerLocation, 0);
+
+               final SimpleSlot sourceSlot2 = 
createSlot(executionGraph.getJobID(), localTaskManagerLocation, 1);
+
+               final SimpleSlot sinkSlot1 = 
createSlot(executionGraph.getJobID(), localTaskManagerLocation, 0);
+
+               final SimpleSlot sinkSlot2 = 
createSlot(executionGraph.getJobID(), localTaskManagerLocation, 1);
+
+               slotFutures.get(sourceVertexId)[0].complete(sourceSlot1);
+               slotFutures.get(sourceVertexId)[1].complete(sourceSlot2);
+
+               // wait until the sink vertex slots have been requested after 
we completed the source slots
+               assertTrue(slotProvider.getSlotRequestedFuture(sinkVertexId, 
0).get());
+               assertTrue(slotProvider.getSlotRequestedFuture(sinkVertexId, 
1).get());
+
+               slotFutures.get(sinkVertexId)[0].complete(sinkSlot1);
+               slotFutures.get(sinkVertexId)[1].complete(sinkSlot2);
+
+               for (ExecutionVertex executionVertex : 
executionGraph.getAllExecutionVertices()) {
+                       
ExecutionGraphTestUtils.waitUntilExecutionState(executionVertex.getCurrentExecutionAttempt(),
 ExecutionState.DEPLOYING, 5000L);
+               }
+       }
+
+       private SimpleSlot createSlot(JobID jobId, TaskManagerLocation 
taskManagerLocation, int index) {
+               return new SimpleSlot(
+                       jobId,
+                       mock(SlotOwner.class),
+                       taskManagerLocation,
+                       index,
+                       new SimpleAckingTaskManagerGateway());
+       }
+
        @SuppressWarnings("serial")
        public static class FailingFinalizeJobVertex extends JobVertex {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index f4e8b30..d3cec30 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -53,6 +53,7 @@ import org.mockito.Matchers;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
@@ -115,7 +116,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 
                        CompletableFuture<SimpleSlot> future = new 
CompletableFuture<>();
                        future.complete(simpleSlot);
-                       when(scheduler.allocateSlot(any(ScheduledUnit.class), 
anyBoolean())).thenReturn(future);
+                       when(scheduler.allocateSlot(any(ScheduledUnit.class), 
anyBoolean(), any(Collection.class))).thenReturn(future);
 
                        when(rootSlot.getSlotNumber()).thenReturn(0);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index 69a679a..90136a6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -306,7 +306,7 @@ public class ExecutionGraphSchedulingTest extends 
TestLogger {
 
                for (int i = 0; i < parallelism; i += 2) {
                        sourceFutures[i].complete(sourceSlots[i]);
-                       targetFutures[i + 1].complete(targetSlots[i + 1]);
+                       targetFutures[i].complete(targetSlots[i]);
                }
 
                //
@@ -331,7 +331,7 @@ public class ExecutionGraphSchedulingTest extends 
TestLogger {
                // all completed futures must have been returns
                for (int i = 0; i < parallelism; i += 2) {
                        assertTrue(sourceSlots[i].isCanceled());
-                       assertTrue(targetSlots[i + 1].isCanceled());
+                       assertTrue(targetSlots[i].isCanceled());
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 5feeabc..42a63ec 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -320,9 +320,21 @@ public class ExecutionGraphTestUtils {
                        ScheduledExecutorService executor,
                        JobVertex... vertices) throws Exception {
 
+                       return createExecutionGraph(jid, slotProvider, 
restartStrategy, executor, Time.seconds(10L), vertices);
+       }
+
+       public static ExecutionGraph createExecutionGraph(
+                       JobID jid,
+                       SlotProvider slotProvider,
+                       RestartStrategy restartStrategy,
+                       ScheduledExecutorService executor,
+                       Time timeout,
+                       JobVertex... vertices) throws Exception {
+
                checkNotNull(jid);
                checkNotNull(restartStrategy);
                checkNotNull(vertices);
+               checkNotNull(timeout);
 
                return ExecutionGraphBuilder.buildGraph(
                        null,
@@ -333,7 +345,7 @@ public class ExecutionGraphTestUtils {
                        slotProvider,
                        ExecutionGraphTestUtils.class.getClassLoader(),
                        new StandaloneCheckpointRecoveryFactory(),
-                       Time.seconds(10),
+                       timeout,
                        restartStrategy,
                        new UnregisteredMetricsGroup(),
                        1,

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
new file mode 100644
index 0000000..fa845cf
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.Slot;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import 
org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils;
+import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the {@link Execution}.
+ */
+public class ExecutionTest extends TestLogger {
+
+       /**
+        * Tests that slots are released if we cannot assign the allocated 
resource to the
+        * Execution. In this case, a concurrent cancellation precedes the 
assignment.
+        */
+       @Test
+       public void testSlotReleaseOnFailedResourceAssignment() throws 
Exception {
+               final JobVertexID jobVertexId = new JobVertexID();
+               final JobVertex jobVertex = new JobVertex("Test vertex", 
jobVertexId);
+               jobVertex.setInvokableClass(NoOpInvokable.class);
+
+               final CompletableFuture<SimpleSlot> slotFuture = new 
CompletableFuture<>();
+               final ProgrammedSlotProvider slotProvider = new 
ProgrammedSlotProvider(1);
+               slotProvider.addSlot(jobVertexId, 0, slotFuture);
+
+               ExecutionGraph executionGraph = 
ExecutionGraphTestUtils.createSimpleTestGraph(
+                       new JobID(),
+                       slotProvider,
+                       new NoRestartStrategy(),
+                       jobVertex);
+
+               ExecutionJobVertex executionJobVertex = 
executionGraph.getJobVertex(jobVertexId);
+
+               final Execution execution = 
executionJobVertex.getTaskVertices()[0].getCurrentExecutionAttempt();
+
+               final TestingSlotOwner slotOwner = new TestingSlotOwner();
+
+               final SimpleSlot slot = new SimpleSlot(
+                       new JobID(),
+                       slotOwner,
+                       new LocalTaskManagerLocation(),
+                       0,
+                       new SimpleAckingTaskManagerGateway());
+
+               CompletableFuture<Execution> allocationFuture = 
execution.allocateAndAssignSlotForExecution(
+                       slotProvider,
+                       false,
+                       LocationPreferenceConstraint.ALL);
+
+               assertFalse(allocationFuture.isDone());
+
+               assertEquals(ExecutionState.SCHEDULED, execution.getState());
+
+               // cancelling the execution should move it into state CANCELED; 
this happens before
+               // the slot future has been completed
+               execution.cancel();
+
+               assertEquals(ExecutionState.CANCELED, execution.getState());
+
+               // completing the future now should cause the slot to be 
released
+               slotFuture.complete(slot);
+
+               assertEquals(slot, slotOwner.getReturnedSlotFuture().get());
+       }
+
+       /**
+        * Tests that the slot is released in case of an execution cancellation 
when having
+        * a slot assigned and being in state SCHEDULED.
+        */
+       @Test
+       public void testSlotReleaseOnExecutionCancellationInScheduled() throws 
Exception {
+               final JobVertexID jobVertexId = new JobVertexID();
+               final JobVertex jobVertex = new JobVertex("Test vertex", 
jobVertexId);
+               jobVertex.setInvokableClass(NoOpInvokable.class);
+
+               final TestingSlotOwner slotOwner = new TestingSlotOwner();
+
+               final SimpleSlot slot = new SimpleSlot(
+                       new JobID(),
+                       slotOwner,
+                       new LocalTaskManagerLocation(),
+                       0,
+                       new SimpleAckingTaskManagerGateway());
+
+               final ProgrammedSlotProvider slotProvider = new 
ProgrammedSlotProvider(1);
+               slotProvider.addSlot(jobVertexId, 0, 
CompletableFuture.completedFuture(slot));
+
+               ExecutionGraph executionGraph = 
ExecutionGraphTestUtils.createSimpleTestGraph(
+                       new JobID(),
+                       slotProvider,
+                       new NoRestartStrategy(),
+                       jobVertex);
+
+               ExecutionJobVertex executionJobVertex = 
executionGraph.getJobVertex(jobVertexId);
+
+               final Execution execution = 
executionJobVertex.getTaskVertices()[0].getCurrentExecutionAttempt();
+
+               CompletableFuture<Execution> allocationFuture = 
execution.allocateAndAssignSlotForExecution(
+                       slotProvider,
+                       false,
+                       LocationPreferenceConstraint.ALL);
+
+               assertTrue(allocationFuture.isDone());
+
+               assertEquals(ExecutionState.SCHEDULED, execution.getState());
+
+               assertEquals(slot, execution.getAssignedResource());
+
+               // cancelling the execution should move it into state CANCELED
+               execution.cancel();
+               assertEquals(ExecutionState.CANCELED, execution.getState());
+
+               assertEquals(slot, slotOwner.getReturnedSlotFuture().get());
+       }
+
+       /**
+        * Tests that the slot is released in case of an execution cancellation 
when being in state
+        * RUNNING.
+        */
+       @Test
+       public void testSlotReleaseOnExecutionCancellationInRunning() throws 
Exception {
+               final JobVertexID jobVertexId = new JobVertexID();
+               final JobVertex jobVertex = new JobVertex("Test vertex", 
jobVertexId);
+               jobVertex.setInvokableClass(NoOpInvokable.class);
+
+               final TestingSlotOwner slotOwner = new TestingSlotOwner();
+
+               final SimpleSlot slot = new SimpleSlot(
+                       new JobID(),
+                       slotOwner,
+                       new LocalTaskManagerLocation(),
+                       0,
+                       new SimpleAckingTaskManagerGateway());
+
+               final ProgrammedSlotProvider slotProvider = new 
ProgrammedSlotProvider(1);
+               slotProvider.addSlot(jobVertexId, 0, 
CompletableFuture.completedFuture(slot));
+
+               ExecutionGraph executionGraph = 
ExecutionGraphTestUtils.createSimpleTestGraph(
+                       new JobID(),
+                       slotProvider,
+                       new NoRestartStrategy(),
+                       jobVertex);
+
+               ExecutionJobVertex executionJobVertex = 
executionGraph.getJobVertex(jobVertexId);
+
+               final Execution execution = 
executionJobVertex.getTaskVertices()[0].getCurrentExecutionAttempt();
+
+               CompletableFuture<Execution> allocationFuture = 
execution.allocateAndAssignSlotForExecution(
+                       slotProvider,
+                       false,
+                       LocationPreferenceConstraint.ALL);
+
+               assertTrue(allocationFuture.isDone());
+
+               assertEquals(ExecutionState.SCHEDULED, execution.getState());
+
+               assertEquals(slot, execution.getAssignedResource());
+
+               execution.deploy();
+
+               execution.switchToRunning();
+
+               // cancelling the execution should move it into state CANCELING
+               execution.cancel();
+               assertEquals(ExecutionState.CANCELING, execution.getState());
+
+               execution.cancelingComplete();
+
+               assertEquals(slot, slotOwner.getReturnedSlotFuture().get());
+       }
+
+       /**
+        * Tests that with LocationPreferenceConstraint.ALL the preferred locations are only calculated once all location futures have completed.
+        */
+       @Test
+       public void testAllPreferredLocationCalculation() throws 
ExecutionException, InterruptedException {
+               final TaskManagerLocation taskManagerLocation1 = new 
LocalTaskManagerLocation();
+               final TaskManagerLocation taskManagerLocation2 = new 
LocalTaskManagerLocation();
+               final TaskManagerLocation taskManagerLocation3 = new 
LocalTaskManagerLocation();
+
+               final CompletableFuture<TaskManagerLocation> locationFuture1 = 
CompletableFuture.completedFuture(taskManagerLocation1);
+               final CompletableFuture<TaskManagerLocation> locationFuture2 = 
new CompletableFuture<>();
+               final CompletableFuture<TaskManagerLocation> locationFuture3 = 
new CompletableFuture<>();
+
+               final Execution execution = 
SchedulerTestUtils.getTestVertex(Arrays.asList(locationFuture1, 
locationFuture2, locationFuture3));
+
+               CompletableFuture<Collection<TaskManagerLocation>> 
preferredLocationsFuture = 
execution.calculatePreferredLocations(LocationPreferenceConstraint.ALL);
+
+               assertFalse(preferredLocationsFuture.isDone());
+
+               locationFuture3.complete(taskManagerLocation3);
+
+               assertFalse(preferredLocationsFuture.isDone());
+
+               locationFuture2.complete(taskManagerLocation2);
+
+               assertTrue(preferredLocationsFuture.isDone());
+
+               final Collection<TaskManagerLocation> preferredLocations = 
preferredLocationsFuture.get();
+
+               assertThat(preferredLocations, 
containsInAnyOrder(taskManagerLocation1, taskManagerLocation2, 
taskManagerLocation3));
+       }
+
+       /**
+        * Tests that with LocationPreferenceConstraint.ANY the preferred locations are calculated from the already completed location futures only.
+        */
+       @Test
+       public void testAnyPreferredLocationCalculation() throws 
ExecutionException, InterruptedException {
+               final TaskManagerLocation taskManagerLocation1 = new 
LocalTaskManagerLocation();
+               final TaskManagerLocation taskManagerLocation3 = new 
LocalTaskManagerLocation();
+
+               final CompletableFuture<TaskManagerLocation> locationFuture1 = 
CompletableFuture.completedFuture(taskManagerLocation1);
+               final CompletableFuture<TaskManagerLocation> locationFuture2 = 
new CompletableFuture<>();
+               final CompletableFuture<TaskManagerLocation> locationFuture3 = 
CompletableFuture.completedFuture(taskManagerLocation3);
+
+               final Execution execution = 
SchedulerTestUtils.getTestVertex(Arrays.asList(locationFuture1, 
locationFuture2, locationFuture3));
+
+               CompletableFuture<Collection<TaskManagerLocation>> 
preferredLocationsFuture = 
execution.calculatePreferredLocations(LocationPreferenceConstraint.ANY);
+
+               assertTrue(preferredLocationsFuture.isDone());
+
+               final Collection<TaskManagerLocation> preferredLocations = 
preferredLocationsFuture.get();
+
+               assertThat(preferredLocations, 
containsInAnyOrder(taskManagerLocation1, taskManagerLocation3));
+       }
+
+       /**
+        * Slot owner which records the first returned slot.
+        */
+       public static final class TestingSlotOwner implements SlotOwner {
+
+               final CompletableFuture<Slot> returnedSlot = new 
CompletableFuture<>();
+
+               public CompletableFuture<Slot> getReturnedSlotFuture() {
+                       return returnedSlot;
+               }
+
+               @Override
+               public boolean returnAllocatedSlot(Slot slot) {
+                       return returnedSlot.complete(slot);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index 9908dae..cb31d15 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import 
org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -447,7 +448,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
                        // it can occur as the result of races
                        {
                                Scheduler scheduler = mock(Scheduler.class);
-                               vertex.scheduleForExecution(scheduler, false);
+                               vertex.scheduleForExecution(scheduler, false, 
LocationPreferenceConstraint.ALL);
 
                                assertEquals(ExecutionState.CANCELED, 
vertex.getExecutionState());
                        }
@@ -486,7 +487,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
                                setVertexState(vertex, 
ExecutionState.CANCELING);
 
                                Scheduler scheduler = mock(Scheduler.class);
-                               vertex.scheduleForExecution(scheduler, false);
+                               vertex.scheduleForExecution(scheduler, false, 
LocationPreferenceConstraint.ALL);
                        }
                        catch (Exception e) {
                                fail("should not throw an exception");

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index 26cb3f1..cf08687 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -35,6 +35,8 @@ import 
org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
 import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
 import java.util.Collection;
@@ -52,7 +54,7 @@ import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-public class ExecutionVertexDeploymentTest {
+public class ExecutionVertexDeploymentTest extends TestLogger {
 
        @Test
        public void testDeployCall() {

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index 4eac4aa..27f7f51 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.instance.DummyActorGateway;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import 
org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
@@ -32,6 +33,7 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.Test;
 import org.mockito.Matchers;
 
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
@@ -39,6 +41,7 @@ import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.ge
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.anyBoolean;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -62,11 +65,11 @@ public class ExecutionVertexSchedulingTest {
                        Scheduler scheduler = mock(Scheduler.class);
                        CompletableFuture<SimpleSlot> future = new 
CompletableFuture<>();
                        future.complete(slot);
-                       
when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), 
anyBoolean())).thenReturn(future);
+                       
when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean(), 
any(Collection.class))).thenReturn(future);
 
                        assertEquals(ExecutionState.CREATED, 
vertex.getExecutionState());
                        // try to deploy to the slot
-                       vertex.scheduleForExecution(scheduler, false);
+                       vertex.scheduleForExecution(scheduler, false, 
LocationPreferenceConstraint.ALL);
 
                        // will have failed
                        assertEquals(ExecutionState.FAILED, 
vertex.getExecutionState());
@@ -94,11 +97,11 @@ public class ExecutionVertexSchedulingTest {
                        final CompletableFuture<SimpleSlot> future = new 
CompletableFuture<>();
 
                        Scheduler scheduler = mock(Scheduler.class);
-                       
when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), 
anyBoolean())).thenReturn(future);
+                       
when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean(), 
any(Collection.class))).thenReturn(future);
 
                        assertEquals(ExecutionState.CREATED, 
vertex.getExecutionState());
                        // try to deploy to the slot
-                       vertex.scheduleForExecution(scheduler, true);
+                       vertex.scheduleForExecution(scheduler, true, 
LocationPreferenceConstraint.ALL);
 
                        // future has not yet a slot
                        assertEquals(ExecutionState.SCHEDULED, 
vertex.getExecutionState());
@@ -128,12 +131,12 @@ public class ExecutionVertexSchedulingTest {
                        Scheduler scheduler = mock(Scheduler.class);
                        CompletableFuture<SimpleSlot> future = new 
CompletableFuture<>();
                        future.complete(slot);
-                       
when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), 
anyBoolean())).thenReturn(future);
+                       
when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean(), 
any(Collection.class))).thenReturn(future);
 
                        assertEquals(ExecutionState.CREATED, 
vertex.getExecutionState());
 
                        // try to deploy to the slot
-                       vertex.scheduleForExecution(scheduler, false);
+                       vertex.scheduleForExecution(scheduler, false, 
LocationPreferenceConstraint.ALL);
                        assertEquals(ExecutionState.DEPLOYING, 
vertex.getExecutionState());
                }
                catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
index 4e89d43..f1e0f7c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import 
org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -156,7 +157,7 @@ public class FailoverRegionTest extends TestLogger {
 
                assertEquals(JobStatus.RUNNING, 
strategy.getFailoverRegion(ev11).getState());
 
-               ev21.scheduleForExecution(slotProvider, true);
+               ev21.scheduleForExecution(slotProvider, true, 
LocationPreferenceConstraint.ALL);
                ev21.getCurrentExecutionAttempt().fail(new Exception("New 
fail"));
                assertEquals(JobStatus.CANCELLING, 
strategy.getFailoverRegion(ev11).getState());
                assertEquals(JobStatus.RUNNING, 
strategy.getFailoverRegion(ev22).getState());
@@ -169,7 +170,7 @@ public class FailoverRegionTest extends TestLogger {
 
                ev11.getCurrentExecutionAttempt().markFinished();
                ev21.getCurrentExecutionAttempt().markFinished();
-               ev22.scheduleForExecution(slotProvider, true);
+               ev22.scheduleForExecution(slotProvider, true, 
LocationPreferenceConstraint.ALL);
                ev22.getCurrentExecutionAttempt().markFinished();
                assertEquals(JobStatus.RUNNING, 
strategy.getFailoverRegion(ev11).getState());
                assertEquals(JobStatus.RUNNING, 
strategy.getFailoverRegion(ev22).getState());
@@ -209,11 +210,11 @@ public class FailoverRegionTest extends TestLogger {
        }
 
        /**
-        * Tests that two failover regions failover at the same time, they will 
not influence each orther
+        * Tests that two failover regions failover at the same time, they will 
not influence each other
         * @throws Exception
         */
        @Test
-       public void testMutilRegionFailoverAtSameTime() throws Exception {
+       public void testMultiRegionFailoverAtSameTime() throws Exception {
                Instance instance = ExecutionGraphTestUtils.getInstance(
                                new ActorTaskManagerGateway(
                                                new 
SimpleActorGateway(TestingUtils.directExecutionContext())),

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
index fef6aaa..5d7fa1f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
@@ -22,7 +22,9 @@ import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -38,6 +40,8 @@ class ProgrammedSlotProvider implements SlotProvider {
 
        private final Map<JobVertexID, CompletableFuture<SimpleSlot>[]> 
slotFutures = new HashMap<>();
 
+       private final Map<JobVertexID, CompletableFuture<Boolean>[]> 
slotFutureRequested = new HashMap<>();
+
        private final int parallelism;
 
        public ProgrammedSlotProvider(int parallelism) {
@@ -51,14 +55,20 @@ class ProgrammedSlotProvider implements SlotProvider {
                checkArgument(subtaskIndex >= 0 && subtaskIndex < parallelism);
 
                CompletableFuture<SimpleSlot>[] futures = 
slotFutures.get(vertex);
+               CompletableFuture<Boolean>[] requestedFutures = 
slotFutureRequested.get(vertex);
+
                if (futures == null) {
                        @SuppressWarnings("unchecked")
                        CompletableFuture<SimpleSlot>[] newArray = 
(CompletableFuture<SimpleSlot>[]) new CompletableFuture<?>[parallelism];
                        futures = newArray;
                        slotFutures.put(vertex, futures);
+
+                       requestedFutures = new CompletableFuture[parallelism];
+                       slotFutureRequested.put(vertex, requestedFutures);
                }
 
                futures[subtaskIndex] = future;
+               requestedFutures[subtaskIndex] = new CompletableFuture<>();
        }
 
        public void addSlots(JobVertexID vertex, 
CompletableFuture<SimpleSlot>[] futures) {
@@ -67,10 +77,25 @@ class ProgrammedSlotProvider implements SlotProvider {
                checkArgument(futures.length == parallelism);
 
                slotFutures.put(vertex, futures);
+
+               CompletableFuture<Boolean>[] requestedFutures = new CompletableFuture[futures.length];
+
+               for (int i = 0; i < futures.length; i++) {
+                       requestedFutures[i] = new CompletableFuture<>();
+               }
+
+               slotFutureRequested.put(vertex, requestedFutures);
+       }
+
+       public CompletableFuture<Boolean> getSlotRequestedFuture(JobVertexID jobVertexId, int subtaskIndex) {
+               return slotFutureRequested.get(jobVertexId)[subtaskIndex];
        }
 
        @Override
-       public CompletableFuture<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
+       public CompletableFuture<SimpleSlot> allocateSlot(
+                       ScheduledUnit task,
+                       boolean allowQueued,
+                       Collection<TaskManagerLocation> preferredLocations) {
                JobVertexID vertexId = task.getTaskToExecute().getVertex().getJobvertexId();
                int subtask = task.getTaskToExecute().getParallelSubtaskIndex();
 
@@ -78,6 +103,8 @@ class ProgrammedSlotProvider implements SlotProvider {
                if (forTask != null) {
                        CompletableFuture<SimpleSlot> future = forTask[subtask];
                        if (future != null) {
+                               slotFutureRequested.get(vertexId)[subtask].complete(true);
+
                                return future;
                        }
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
index be5282a..a2323bf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 import java.net.InetAddress;
 import java.util.ArrayDeque;
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -70,7 +71,10 @@ public class SimpleSlotProvider implements SlotProvider, SlotOwner {
        }
 
        @Override
-       public CompletableFuture<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
+       public CompletableFuture<SimpleSlot> allocateSlot(
+                       ScheduledUnit task,
+                       boolean allowQueued,
+                       Collection<TaskManagerLocation> preferredLocations) {
                final AllocatedSlot slot;
 
                synchronized (slots) {

Reply via email to