[FLINK-8781][scheduler] Try to reschedule failed tasks to previous allocation

This closes #5403.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d63bc75f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d63bc75f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d63bc75f

Branch: refs/heads/master
Commit: d63bc75ffa3ad1b0d4de82cd218b9c4d268b41ab
Parents: 647c552
Author: Stefan Richter <[email protected]>
Authored: Thu Feb 1 16:02:28 2018 +0100
Committer: Stefan Richter <[email protected]>
Committed: Mon Feb 26 15:32:01 2018 +0100

----------------------------------------------------------------------
 .../clusterframework/types/SlotProfile.java     | 283 +++++++++++++++++++
 .../flink/runtime/executiongraph/Execution.java |  29 +-
 .../runtime/executiongraph/ExecutionVertex.java |  27 +-
 .../runtime/jobmanager/scheduler/Scheduler.java |  14 +-
 .../jobmanager/slots/SlotAndLocality.java       |  14 +-
 .../runtime/jobmaster/slotpool/SlotPool.java    | 134 +++------
 .../jobmaster/slotpool/SlotPoolGateway.java     |   7 +-
 .../jobmaster/slotpool/SlotProvider.java        |  11 +-
 .../jobmaster/slotpool/SlotSharingManager.java  | 102 +------
 .../clusterframework/types/SlotProfileTest.java | 133 +++++++++
 .../ExecutionGraphMetricsTest.java              |   4 +-
 .../ExecutionVertexSchedulingTest.java          |   8 +-
 .../executiongraph/ProgrammedSlotProvider.java  |   5 +-
 .../utils/SimpleSlotProvider.java               |  16 +-
 .../ScheduleWithCoLocationHintTest.java         | 128 +++++----
 .../scheduler/SchedulerIsolatedTasksTest.java   |  50 ++--
 .../scheduler/SchedulerSlotSharingTest.java     | 228 +++++++--------
 .../jobmanager/scheduler/SchedulerTest.java     |   4 +-
 .../jobmanager/scheduler/SchedulerTestBase.java |  19 +-
 .../jobmaster/slotpool/AvailableSlotsTest.java  |   5 +-
 .../slotpool/SlotPoolCoLocationTest.java        |  10 +-
 .../jobmaster/slotpool/SlotPoolRpcTest.java     |  16 +-
 .../slotpool/SlotPoolSlotSharingTest.java       |  22 +-
 .../jobmaster/slotpool/SlotPoolTest.java        |  39 ++-
 .../slotpool/SlotSharingManagerTest.java        |  12 +-
 25 files changed, 833 insertions(+), 487 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java
new file mode 100644
index 0000000..5b1fa08
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.types;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+
+/**
+ * A slot profile describes the profile of a slot into which a task wants to 
be scheduled. The profile contains
+ * attributes such as resource or locality constraints, some of which may be 
hard or soft. A matcher can be generated
+ * to filter out candidate slots by matching their {@link SlotContext} against 
the slot profile and, potentially,
+ * further requirements.
+ */
+public class SlotProfile {
+
+       /** Singleton object for a slot profile without any requirements. */
+       private static final SlotProfile NO_REQUIREMENTS = 
noLocality(ResourceProfile.UNKNOWN);
+
+       /** This specifies the desired resource profile for the slot. */
+       @Nonnull
+       private final ResourceProfile resourceProfile;
+
+       /** This specifies the preferred locations for the slot. */
+       @Nonnull
+       private final Collection<TaskManagerLocation> preferredLocations;
+
+       /** This contains desired allocation ids of the slot. */
+       @Nonnull
+       private final Collection<AllocationID> priorAllocations;
+
+       public SlotProfile(
+               @Nonnull ResourceProfile resourceProfile,
+               @Nonnull Collection<TaskManagerLocation> preferredLocations,
+               @Nonnull Collection<AllocationID> priorAllocations) {
+
+               this.resourceProfile = resourceProfile;
+               this.preferredLocations = preferredLocations;
+               this.priorAllocations = priorAllocations;
+       }
+
+       /**
+        * Returns the desired resource profile for the slot.
+        */
+       @Nonnull
+       public ResourceProfile getResourceProfile() {
+               return resourceProfile;
+       }
+
+       /**
+        * Returns the preferred locations for the slot.
+        */
+       @Nonnull
+       public Collection<TaskManagerLocation> getPreferredLocations() {
+               return preferredLocations;
+       }
+
+       /**
+        * Returns the desired allocation ids for the slot.
+        */
+       @Nonnull
+       public Collection<AllocationID> getPriorAllocations() {
+               return priorAllocations;
+       }
+
+       /**
+        * Returns the matcher for this profile that helps to find slots that 
fit the profile.
+        */
+       public ProfileToSlotContextMatcher matcher() {
+               if (priorAllocations.isEmpty()) {
+                       return new 
LocalityAwareRequirementsToSlotMatcher(preferredLocations);
+               } else {
+                       return new 
PreviousAllocationProfileToSlotContextMatcher(priorAllocations);
+               }
+       }
+
+       /**
+        * Classes that implement this interface provide a method to match 
objects that somehow represent slot candidates
+        * against the {@link SlotProfile} that produced the matcher object. A 
matching candidate is transformed into a
+        * desired result. If the matcher does not find a matching candidate, 
it returns null.
+        */
+       public interface ProfileToSlotContextMatcher {
+
+               /**
+                * This method takes the candidate slots, extracts slot 
contexts from them, filters them by the profile
+                * requirements and potentially by additional requirements, and 
produces a result from a match.
+                *
+                * @param candidates                   stream of candidates to 
match against.
+                * @param contextExtractor             function to extract the 
{@link SlotContext} from the candidates.
+                * @param additionalRequirementsFilter predicate to specify 
additional requirements for each candidate.
+                * @param resultProducer               function to produce a 
result from a matching candidate input.
+                * @param <IN>                         type of the objects 
against which we match the profile.
+                * @param <OUT>                        type of the produced 
output from a matching object.
+                * @return the result produced by resultProducer if a matching 
candidate was found or null otherwise.
+                */
+               @Nullable
+               <IN, OUT> OUT findMatchWithLocality(
+                       @Nonnull Stream<IN> candidates,
+                       @Nonnull Function<IN, SlotContext> contextExtractor,
+                       @Nonnull Predicate<IN> additionalRequirementsFilter,
+                       @Nonnull BiFunction<IN, Locality, OUT> resultProducer);
+       }
+
+       /**
+        * This matcher implementation is used in the presence of prior allocations. 
Prior allocations are supposed to overrule
+        * other locality requirements, such as preferred locations. Prior 
allocations also require strict matching and
+        * this matcher returns null if it cannot find a candidate for the same 
prior allocation. The background is that
+        * this will force the scheduler to request a new slot that is 
guaranteed not to be the prior location of any
+        * other subtask, so that subtasks do not steal another subtask's prior 
allocation in case that their own prior
+        * allocation is no longer available (e.g. machine failure). This is 
important to enable local recovery for all
+        * tasks that can still return to their prior allocation.
+        */
+       @VisibleForTesting
+       public static class PreviousAllocationProfileToSlotContextMatcher 
implements ProfileToSlotContextMatcher {
+
+               /** Set of prior allocations. */
+               private final HashSet<AllocationID> priorAllocations;
+
+               @VisibleForTesting
+               PreviousAllocationProfileToSlotContextMatcher(@Nonnull 
Collection<AllocationID> priorAllocations) {
+                       this.priorAllocations = new HashSet<>(priorAllocations);
+                       Preconditions.checkState(
+                               this.priorAllocations.size() > 0,
+                               "This matcher should only be used if there are 
prior allocations!");
+               }
+
+               public <I, O> O findMatchWithLocality(
+                       @Nonnull Stream<I> candidates,
+                       @Nonnull Function<I, SlotContext> contextExtractor,
+                       @Nonnull Predicate<I> additionalRequirementsFilter,
+                       @Nonnull BiFunction<I, Locality, O> resultProducer) {
+
+                       Predicate<I> filterByAllocation =
+                               (candidate) -> 
priorAllocations.contains(contextExtractor.apply(candidate).getAllocationId());
+
+                       return candidates
+                               
.filter(filterByAllocation.and(additionalRequirementsFilter))
+                               .findFirst()
+                               .map((result) -> resultProducer.apply(result, 
Locality.LOCAL)) // TODO introduce special locality?
+                               .orElse(null);
+               }
+       }
+
+       /**
+        * This matcher is used whenever no prior allocation was specified in 
the {@link SlotProfile}. This implementation
+        * tries to achieve best possible locality if a preferred location is 
specified in the profile.
+        */
+       @VisibleForTesting
+       public static class LocalityAwareRequirementsToSlotMatcher implements 
ProfileToSlotContextMatcher {
+
+               private final Collection<TaskManagerLocation> 
locationPreferences;
+
+               @VisibleForTesting
+               public LocalityAwareRequirementsToSlotMatcher(@Nonnull 
Collection<TaskManagerLocation> locationPreferences) {
+                       this.locationPreferences = new 
ArrayList<>(locationPreferences);
+               }
+
+               @Override
+               public <IN, OUT> OUT findMatchWithLocality(
+                       @Nonnull Stream<IN> candidates,
+                       @Nonnull Function<IN, SlotContext> contextExtractor,
+                       @Nonnull Predicate<IN> additionalRequirementsFilter,
+                       @Nonnull BiFunction<IN, Locality, OUT> resultProducer) {
+
+                       // if we have no location preferences, we can only 
filter by the additional requirements.
+                       if (locationPreferences.isEmpty()) {
+                               return candidates
+                                       .filter(additionalRequirementsFilter)
+                                       .findFirst()
+                                       .map((result) -> 
resultProducer.apply(result, Locality.UNCONSTRAINED))
+                                       .orElse(null);
+                       }
+
+                       // we build up two indexes, one for resource id and one 
for host names of the preferred locations.
+                       HashSet<ResourceID> preferredResourceIDs = new 
HashSet<>(locationPreferences.size());
+                       HashSet<String> preferredFQHostNames = new 
HashSet<>(locationPreferences.size());
+
+                       for (TaskManagerLocation locationPreference : 
locationPreferences) {
+                               
preferredResourceIDs.add(locationPreference.getResourceID());
+                               
preferredFQHostNames.add(locationPreference.getFQDNHostname());
+                       }
+
+                       Iterator<IN> iterator = candidates.iterator();
+
+                       IN matchByHostName = null;
+                       IN matchByAdditionalRequirements = null;
+
+                       while (iterator.hasNext()) {
+
+                               IN candidate = iterator.next();
+                               SlotContext slotContext = 
contextExtractor.apply(candidate);
+
+                               // this if checks if the candidate is a 
local slot
+                               if 
(preferredResourceIDs.contains(slotContext.getTaskManagerLocation().getResourceID()))
 {
+                                       if 
(additionalRequirementsFilter.test(candidate)) {
+                                               // we can stop, because we 
found a match with best possible locality.
+                                               return 
resultProducer.apply(candidate, Locality.LOCAL);
+                                       } else {
+                                               // next candidate because this 
failed on the additional requirements.
+                                               continue;
+                                       }
+                               }
+
+                               // this if checks if the candidate is at least 
host-local, if we did not find another host-local
+                               // candidate before.
+                               if (matchByHostName == null) {
+                                       if 
(preferredFQHostNames.contains(slotContext.getTaskManagerLocation().getFQDNHostname()))
 {
+                                               if 
(additionalRequirementsFilter.test(candidate)) {
+                                                       // We remember the 
candidate, but still continue because there might still be a candidate
+                                                       // that is local to the 
desired task manager.
+                                                       matchByHostName = 
candidate;
+                                               } else {
+                                                       // next candidate 
because this failed on the additional requirements.
+                                                       continue;
+                                               }
+                                       }
+
+                                       // this if checks if the candidate at 
least fulfils the resource requirements, and is only required
+                                       // if we did not yet find a valid 
candidate with better locality.
+                                       if (matchByAdditionalRequirements == 
null
+                                               && 
additionalRequirementsFilter.test(candidate)) {
+                                               // Again, we remember but 
continue in hope for a candidate with better locality.
+                                               matchByAdditionalRequirements = 
candidate;
+                                       }
+                               }
+                       }
+
+                       // at the end of the iteration, we return the candidate 
with best possible locality or null.
+                       if (matchByHostName != null) {
+                               return resultProducer.apply(matchByHostName, 
Locality.HOST_LOCAL);
+                       } else if (matchByAdditionalRequirements != null) {
+                               return 
resultProducer.apply(matchByAdditionalRequirements, Locality.NON_LOCAL);
+                       } else {
+                               return null;
+                       }
+               }
+       }
+
+       /**
+        * Returns a slot profile that has no requirements.
+        */
+       public static SlotProfile noRequirements() {
+               return NO_REQUIREMENTS;
+       }
+
+       /**
+        * Returns a slot profile for the given resource profile, without any 
locality requirements.
+        */
+       public static SlotProfile noLocality(ResourceProfile resourceProfile) {
+               return new SlotProfile(resourceProfile, 
Collections.emptyList(), Collections.emptyList());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 946f6e4..2fb831a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -26,7 +26,10 @@ import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import 
org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
@@ -155,6 +158,10 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
        @Nullable
        private volatile JobManagerTaskRestore taskRestore;
 
+       /** This field holds the allocation id once it was assigned 
successfully. */
+       @Nullable
+       private volatile AllocationID assignedAllocationID;
+
        // ------------------------ Accumulators & Metrics 
------------------------
 
        /** Lock for updating the accumulators atomically.
@@ -234,6 +241,11 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                return state;
        }
 
+       @Nullable
+       public AllocationID getAssignedAllocationID() {
+               return assignedAllocationID;
+       }
+
        /**
         * Gets the global modification version of the execution graph when 
this execution was created.
         * 
@@ -271,7 +283,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                                if (state == SCHEDULED || state == CREATED) {
                                        
checkState(!taskManagerLocationFuture.isDone(), "The TaskManagerLocationFuture 
should not be set if we haven't assigned a resource yet.");
                                        
taskManagerLocationFuture.complete(logicalSlot.getTaskManagerLocation());
-
+                                       assignedAllocationID = 
logicalSlot.getAllocationId();
                                        return true;
                                } else {
                                        // free assigned resource and return 
false
@@ -458,8 +470,16 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                                        new ScheduledUnit(this, 
slotSharingGroupId) :
                                        new ScheduledUnit(this, 
slotSharingGroupId, locationConstraint);
 
+                       // try to extract previous allocation ids, if 
applicable, so that we can reschedule to the same slot
+                       ExecutionVertex executionVertex = getVertex();
+                       AllocationID lastAllocation = 
executionVertex.getLatestPriorAllocation();
+
+                       Collection<AllocationID> previousAllocationIDs =
+                               lastAllocation != null ? 
Collections.singletonList(lastAllocation) : Collections.emptyList();
+
                        // calculate the preferred locations
-                       final 
CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture = 
calculatePreferredLocations(locationPreferenceConstraint);
+                       final 
CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture =
+                               
calculatePreferredLocations(locationPreferenceConstraint);
 
                        final SlotRequestId slotRequestId = new SlotRequestId();
 
@@ -470,7 +490,10 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                                                        slotRequestId,
                                                        toSchedule,
                                                        queued,
-                                                       preferredLocations,
+                                                       new SlotProfile(
+                                                               
ResourceProfile.UNKNOWN,
+                                                               
preferredLocations,
+                                                               
previousAllocationIDs),
                                                        allocationTimeout));
 
                        // register call back to cancel slot request in case 
that the execution gets canceled

http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index f13e42c..8b57a7a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import 
org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
@@ -295,17 +296,11 @@ public class ExecutionVertex implements 
AccessExecutionVertex, Archiveable<Archi
                }
        }
 
-       /**
-        * Gets the location where the latest completed/canceled/failed 
execution of the vertex's
-        * task happened.
-        * 
-        * @return The latest prior execution location, or null, if there is 
none, yet.
-        */
-       public TaskManagerLocation getLatestPriorLocation() {
+       public Execution getLatestPriorExecution() {
                synchronized (priorExecutions) {
                        final int size = priorExecutions.size();
                        if (size > 0) {
-                               return priorExecutions.get(size - 
1).getAssignedResourceLocation();
+                               return priorExecutions.get(size - 1);
                        }
                        else {
                                return null;
@@ -313,6 +308,22 @@ public class ExecutionVertex implements 
AccessExecutionVertex, Archiveable<Archi
                }
        }
 
+       /**
+        * Gets the location where the latest completed/canceled/failed 
execution of the vertex's
+        * task happened.
+        * 
+        * @return The latest prior execution location, or null, if there is 
none, yet.
+        */
+       public TaskManagerLocation getLatestPriorLocation() {
+               Execution latestPriorExecution = getLatestPriorExecution();
+               return latestPriorExecution != null ? 
latestPriorExecution.getAssignedResourceLocation() : null;
+       }
+
+       public AllocationID getLatestPriorAllocation() {
+               Execution latestPriorExecution = getLatestPriorExecution();
+               return latestPriorExecution != null ? 
latestPriorExecution.getAssignedAllocationID() : null;
+       }
+
        EvictingBoundedList<Execution> getCopyOfPriorExecutionsList() {
                synchronized (priorExecutions) {
                        return new EvictingBoundedList<>(priorExecutions);

http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index fc79d40..0116fdb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -21,19 +21,20 @@ package org.apache.flink.runtime.jobmanager.scheduler;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceDiedException;
 import org.apache.flink.runtime.instance.InstanceListener;
-import org.apache.flink.runtime.instance.SlotSharingGroupId;
-import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.instance.SharedSlot;
 import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.jobmaster.SlotRequestId;
-import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.instance.SlotSharingGroupAssignment;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
@@ -49,7 +50,6 @@ import javax.annotation.Nullable;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -148,11 +148,11 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener, Sl
                        SlotRequestId slotRequestId,
                        ScheduledUnit task,
                        boolean allowQueued,
-                       Collection<TaskManagerLocation> preferredLocations,
+                       SlotProfile slotProfile,
                        Time allocationTimeout) {
 
                try {
-                       final Object ret = scheduleTask(task, allowQueued, 
preferredLocations);
+                       final Object ret = scheduleTask(task, allowQueued, 
slotProfile.getPreferredLocations());
 
                        if (ret instanceof SimpleSlot) {
                                return 
CompletableFuture.completedFuture((SimpleSlot) ret);

http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java
index 85871c8..fed26c9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java
@@ -18,31 +18,35 @@
 
 package org.apache.flink.runtime.jobmanager.slots;
 
-import org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
+import javax.annotation.Nonnull;
 
 /**
  * A combination of a {@link AllocatedSlot} and a {@link Locality}.
  */
 public class SlotAndLocality {
 
+       @Nonnull
        private final AllocatedSlot slot;
 
+       @Nonnull
        private final Locality locality;
 
-       public SlotAndLocality(AllocatedSlot slot, Locality locality) {
-               this.slot = checkNotNull(slot);
-               this.locality = checkNotNull(locality);
+       public SlotAndLocality(@Nonnull AllocatedSlot slot, @Nonnull Locality 
locality) {
+               this.slot = slot;
+               this.locality = locality;
        }
 
        // 
------------------------------------------------------------------------
 
+       @Nonnull
        public AllocatedSlot getSlot() {
                return slot;
        }
 
+       @Nonnull
        public Locality getLocality() {
                return locality;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index a49e6ed..8a2dd45 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
@@ -303,16 +304,14 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
        public CompletableFuture<LogicalSlot> allocateSlot(
                        SlotRequestId slotRequestId,
                        ScheduledUnit scheduledUnit,
-                       ResourceProfile resourceProfile,
-                       Collection<TaskManagerLocation> locationPreferences,
+                       SlotProfile slotProfile,
                        boolean allowQueuedScheduling,
                        Time timeout) {
 
                return internalAllocateSlot(
                        slotRequestId,
                        scheduledUnit,
-                       resourceProfile,
-                       locationPreferences,
+                       slotProfile,
                        allowQueuedScheduling,
                        timeout);
        }
@@ -320,8 +319,7 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
        private CompletableFuture<LogicalSlot> internalAllocateSlot(
                        SlotRequestId slotRequestId,
                        ScheduledUnit task,
-                       ResourceProfile resourceProfile,
-                       Collection<TaskManagerLocation> locationPreferences,
+                       SlotProfile slotProfile,
                        boolean allowQueuedScheduling,
                        Time allocationTimeout) {
 
@@ -343,16 +341,14 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                                        multiTaskSlotLocality = 
allocateCoLocatedMultiTaskSlot(
                                                task.getCoLocationConstraint(),
                                                multiTaskSlotManager,
-                                               resourceProfile,
-                                               locationPreferences,
+                                               slotProfile,
                                                allowQueuedScheduling,
                                                allocationTimeout);
                                } else {
                                        multiTaskSlotLocality = 
allocateMultiTaskSlot(
                                                task.getJobVertexId(),
                                                multiTaskSlotManager,
-                                               resourceProfile,
-                                               locationPreferences,
+                                               slotProfile,
                                                allowQueuedScheduling,
                                                allocationTimeout);
                                }
@@ -373,8 +369,7 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                        // request an allocated slot to assign a single logical 
slot to
                        CompletableFuture<SlotAndLocality> 
slotAndLocalityFuture = requestAllocatedSlot(
                                slotRequestId,
-                               resourceProfile,
-                               locationPreferences,
+                               slotProfile,
                                allowQueuedScheduling,
                                allocationTimeout);
 
@@ -408,8 +403,7 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
         *
         * @param coLocationConstraint for which to allocate a {@link 
SlotSharingManager.MultiTaskSlot}
         * @param multiTaskSlotManager responsible for the slot sharing group 
for which to allocate the slot
-        * @param resourceProfile specifying the requirements for the requested 
slot
-        * @param locationPreferences containing preferred TaskExecutors on 
which to allocate the slot
+        * @param slotProfile specifying the requirements for the requested slot
         * @param allowQueuedScheduling true if queued scheduling (the returned 
task slot must not be completed yet) is allowed, otherwise false
         * @param allocationTimeout timeout before the slot allocation times out
         * @return A {@link SlotSharingManager.MultiTaskSlotLocality} which 
contains the allocated {@link SlotSharingManager.MultiTaskSlot}
@@ -419,8 +413,7 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
        private SlotSharingManager.MultiTaskSlotLocality 
allocateCoLocatedMultiTaskSlot(
                        CoLocationConstraint coLocationConstraint,
                        SlotSharingManager multiTaskSlotManager,
-                       ResourceProfile resourceProfile,
-                       Collection<TaskManagerLocation> locationPreferences,
+                       SlotProfile slotProfile,
                        boolean allowQueuedScheduling,
                        Time allocationTimeout) throws 
NoResourceAvailableException {
                final SlotRequestId coLocationSlotRequestId = 
coLocationConstraint.getSlotRequestId();
@@ -438,19 +431,18 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                        }
                }
 
-               final Collection<TaskManagerLocation> actualLocationPreferences;
-
                if (coLocationConstraint.isAssigned()) {
-                       actualLocationPreferences = 
Collections.singleton(coLocationConstraint.getLocation());
-               } else {
-                       actualLocationPreferences = locationPreferences;
+                       // refine the preferred locations of the slot profile
+                       slotProfile = new SlotProfile(
+                               slotProfile.getResourceProfile(),
+                               
Collections.singleton(coLocationConstraint.getLocation()),
+                               slotProfile.getPriorAllocations());
                }
 
                // get a new multi task slot
                final SlotSharingManager.MultiTaskSlotLocality 
multiTaskSlotLocality = allocateMultiTaskSlot(
                        coLocationConstraint.getGroupId(), multiTaskSlotManager,
-                       resourceProfile,
-                       actualLocationPreferences,
+                       slotProfile,
                        allowQueuedScheduling,
                        allocationTimeout);
 
@@ -505,8 +497,7 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
         *
         * @param groupId for which to allocate a new {@link 
SlotSharingManager.MultiTaskSlot}
         * @param slotSharingManager responsible for the slot sharing group for 
which to allocate the slot
-        * @param resourceProfile specifying the requirements for the requested 
slot
-        * @param locationPreferences containing preferred TaskExecutors on 
which to allocate the slot
+        * @param slotProfile slot profile that specifies the requirements for 
the slot
         * @param allowQueuedScheduling true if queued scheduling (the returned 
task slot must not be completed yet) is allowed, otherwise false
         * @param allocationTimeout timeout before the slot allocation times out
         * @return A {@link SlotSharingManager.MultiTaskSlotLocality} which 
contains the allocated {@link SlotSharingManager.MultiTaskSlot}
@@ -516,15 +507,14 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
        private SlotSharingManager.MultiTaskSlotLocality allocateMultiTaskSlot(
                        AbstractID groupId,
                        SlotSharingManager slotSharingManager,
-                       ResourceProfile resourceProfile,
-                       Collection<TaskManagerLocation> locationPreferences,
+                       SlotProfile slotProfile,
                        boolean allowQueuedScheduling,
                        Time allocationTimeout) throws 
NoResourceAvailableException {
 
                // check first whether we have a resolved root slot which we 
can use
                SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality 
= slotSharingManager.getResolvedRootSlot(
                        groupId,
-                       locationPreferences);
+                       slotProfile.matcher());
 
                if (multiTaskSlotLocality != null && 
multiTaskSlotLocality.getLocality() == Locality.LOCAL) {
                        return multiTaskSlotLocality;
@@ -534,7 +524,7 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                final SlotRequestId multiTaskSlotRequestId = new 
SlotRequestId();
 
                // check whether we have an allocated slot available which we 
can use to create a new multi task slot in
-               final SlotAndLocality polledSlotAndLocality = 
pollAndAllocateSlot(allocatedSlotRequestId, resourceProfile, 
locationPreferences);
+               final SlotAndLocality polledSlotAndLocality = 
pollAndAllocateSlot(allocatedSlotRequestId, slotProfile);
 
                if (polledSlotAndLocality != null && 
(polledSlotAndLocality.getLocality() == Locality.LOCAL || multiTaskSlotLocality 
== null)) {
 
@@ -571,7 +561,7 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                                // it seems as if we have to request a new slot 
from the resource manager, this is always the last resort!!!
                                final CompletableFuture<AllocatedSlot> 
futureSlot = requestNewAllocatedSlot(
                                        allocatedSlotRequestId,
-                                       resourceProfile,
+                                       slotProfile.getResourceProfile(),
                                        allocationTimeout);
 
                                multiTaskSlotFuture = 
slotSharingManager.createRootSlot(
@@ -613,24 +603,21 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
         * Allocates an allocated slot first by polling from the available 
slots and then requesting a new
         * slot from the ResourceManager if no fitting slot could be found.
         *
-        * @param slotRequestId identifying the slot allocation request
-        * @param resourceProfile which the allocated slot should fulfill
-        * @param locationPreferences for the allocated slot
+        * @param slotProfile slot profile that specifies the requirements for 
the slot
         * @param allowQueuedScheduling true if the slot allocation can be 
completed in the future
         * @param allocationTimeout timeout before the slot allocation times out
         * @return Future containing the allocated simple slot
         */
        private CompletableFuture<SlotAndLocality> requestAllocatedSlot(
                        SlotRequestId slotRequestId,
-                       ResourceProfile resourceProfile,
-                       Collection<TaskManagerLocation> locationPreferences,
+                       SlotProfile slotProfile,
                        boolean allowQueuedScheduling,
                        Time allocationTimeout) {
 
                final CompletableFuture<SlotAndLocality> 
allocatedSlotLocalityFuture;
 
                // (1) do we have a slot available already?
-               SlotAndLocality slotFromPool = 
pollAndAllocateSlot(slotRequestId, resourceProfile, locationPreferences);
+               SlotAndLocality slotFromPool = 
pollAndAllocateSlot(slotRequestId, slotProfile);
 
                if (slotFromPool != null) {
                        allocatedSlotLocalityFuture = 
CompletableFuture.completedFuture(slotFromPool);
@@ -638,7 +625,7 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                        // we have to request a new allocated slot
                        CompletableFuture<AllocatedSlot> allocatedSlotFuture = 
requestNewAllocatedSlot(
                                slotRequestId,
-                               resourceProfile,
+                               slotProfile.getResourceProfile(),
                                allocationTimeout);
 
                        allocatedSlotLocalityFuture = 
allocatedSlotFuture.thenApply((AllocatedSlot allocatedSlot) -> new 
SlotAndLocality(allocatedSlot, Locality.UNKNOWN));
@@ -832,11 +819,8 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
        }
 
        @Nullable
-       private SlotAndLocality pollAndAllocateSlot(
-               SlotRequestId slotRequestId,
-               ResourceProfile resourceProfile,
-               Collection<TaskManagerLocation> locationPreferences) {
-               SlotAndLocality slotFromPool = 
availableSlots.poll(resourceProfile, locationPreferences);
+       private SlotAndLocality pollAndAllocateSlot(SlotRequestId 
slotRequestId, SlotProfile slotProfile) {
+               SlotAndLocality slotFromPool = availableSlots.poll(slotProfile);
 
                if (slotFromPool != null) {
                        allocatedSlots.add(slotRequestId, 
slotFromPool.getSlot());
@@ -1404,63 +1388,34 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                 * Poll a slot which matches the required resource profile. The 
polling tries to satisfy the
                 * location preferences, by TaskManager and by host.
                 *
-                * @param resourceProfile      The required resource profile.
-                * @param locationPreferences  The location preferences, in 
order to be checked.
+                * @param slotProfile slot profile that specifies the 
requirements for the slot
                 *
                 * @return Slot which matches the resource profile, null if we 
can't find a match
                 */
-               SlotAndLocality poll(ResourceProfile resourceProfile, 
Collection<TaskManagerLocation> locationPreferences) {
+               SlotAndLocality poll(SlotProfile slotProfile) {
                        // fast path if no slots are available
                        if (availableSlots.isEmpty()) {
                                return null;
                        }
 
-                       boolean hadLocationPreference = false;
-
-                       if (locationPreferences != null && 
!locationPreferences.isEmpty()) {
-
-                               // first search by TaskManager
-                               for (TaskManagerLocation location : 
locationPreferences) {
-                                       hadLocationPreference = true;
-
-                                       final Set<AllocatedSlot> onTaskManager 
= availableSlotsByTaskManager.get(location.getResourceID());
-                                       if (onTaskManager != null) {
-                                               for (AllocatedSlot candidate : 
onTaskManager) {
-                                                       if 
(candidate.getResourceProfile().isMatching(resourceProfile)) {
-                                                               
remove(candidate.getAllocationId());
-                                                               return new 
SlotAndLocality(candidate, Locality.LOCAL);
-                                                       }
-                                               }
-                                       }
-                               }
-
-                               // now, search by host
-                               for (TaskManagerLocation location : 
locationPreferences) {
-                                       final Set<AllocatedSlot> onHost = 
availableSlotsByHost.get(location.getFQDNHostname());
-                                       if (onHost != null) {
-                                               for (AllocatedSlot candidate : 
onHost) {
-                                                       if 
(candidate.getResourceProfile().isMatching(resourceProfile)) {
-                                                               
remove(candidate.getAllocationId());
-                                                               return new 
SlotAndLocality(candidate, Locality.HOST_LOCAL);
-                                                       }
-                                               }
-                                       }
-                               }
-                       }
+                       SlotProfile.ProfileToSlotContextMatcher matcher = 
slotProfile.matcher();
+                       Collection<SlotAndTimestamp> slotAndTimestamps = 
availableSlots.values();
 
-                       // take any slot
-                       for (SlotAndTimestamp candidate : 
availableSlots.values()) {
-                               final AllocatedSlot slot = candidate.slot();
+                       SlotAndLocality matchingSlotAndLocality = 
matcher.findMatchWithLocality(
+                               slotAndTimestamps.stream(),
+                               SlotAndTimestamp::slot,
+                               (SlotAndTimestamp slot) -> 
slot.slot().getResourceProfile().isMatching(slotProfile.getResourceProfile()),
+                               (SlotAndTimestamp slotAndTimestamp, Locality 
locality) -> {
+                                       AllocatedSlot slot = 
slotAndTimestamp.slot();
+                                       return new SlotAndLocality(slot, 
locality);
+                               });
 
-                               if 
(slot.getResourceProfile().isMatching(resourceProfile)) {
-                                       remove(slot.getAllocationId());
-                                       return new SlotAndLocality(
-                                                       slot, 
hadLocationPreference ? Locality.NON_LOCAL : Locality.UNCONSTRAINED);
-                               }
+                       if (matchingSlotAndLocality != null) {
+                               AllocatedSlot slot = 
matchingSlotAndLocality.getSlot();
+                               remove(slot.getAllocationId());
                        }
 
-                       // nothing available that matches
-                       return null;
+                       return matchingSlotAndLocality;
                }
 
                /**
@@ -1574,14 +1529,13 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                                SlotRequestId slotRequestId,
                                ScheduledUnit task,
                                boolean allowQueued,
-                               Collection<TaskManagerLocation> 
preferredLocations,
+                               SlotProfile slotProfile,
                                Time timeout) {
 
                        CompletableFuture<LogicalSlot> slotFuture = 
gateway.allocateSlot(
                                slotRequestId,
                                task,
-                               ResourceProfile.UNKNOWN,
-                               preferredLocations,
+                               slotProfile,
                                allowQueued,
                                timeout);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
index 7d11681..1aad92a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
@@ -143,8 +144,7 @@ public interface SlotPoolGateway extends 
AllocatedSlotActions, RpcGateway {
         *
         * @param slotRequestId identifying the requested slot
         * @param scheduledUnit for which to allocate slot
-        * @param resourceProfile which the allocated slot must fulfill
-        * @param locationPreferences which define where the allocated slot 
should be placed, this can also be empty
+        * @param slotProfile profile that specifies the requirements for the 
requested slot
         * @param allowQueuedScheduling true if the slot request can be queued 
(e.g. the returned future must not be completed)
         * @param timeout for the operation
         * @return Future which is completed with the allocated {@link 
LogicalSlot}
@@ -152,8 +152,7 @@ public interface SlotPoolGateway extends 
AllocatedSlotActions, RpcGateway {
        CompletableFuture<LogicalSlot> allocateSlot(
                        SlotRequestId slotRequestId,
                        ScheduledUnit scheduledUnit,
-                       ResourceProfile resourceProfile,
-                       Collection<TaskManagerLocation> locationPreferences,
+                       SlotProfile slotProfile,
                        boolean allowQueuedScheduling,
                        @RpcTimeout Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java
index 80b2689..1653138 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java
@@ -19,16 +19,15 @@
 package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 import javax.annotation.Nullable;
 
-import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -57,7 +56,7 @@ public interface SlotProvider {
                SlotRequestId slotRequestId,
                ScheduledUnit task,
                boolean allowQueued,
-               Collection<TaskManagerLocation> preferredLocations,
+               SlotProfile slotProfile,
                Time timeout);
 
        /**
@@ -65,20 +64,20 @@ public interface SlotProvider {
         *
         * @param task The task to allocate the slot for
         * @param allowQueued Whether allow the task be queued if we do not 
have enough resource
-        * @param preferredLocations preferred locations for the slot allocation
+        * @param slotProfile profile of the requested slot
         * @param timeout after which the allocation fails with a timeout 
exception
         * @return The future of the allocation
         */
        default CompletableFuture<LogicalSlot> allocateSlot(
                ScheduledUnit task,
                boolean allowQueued,
-               Collection<TaskManagerLocation> preferredLocations,
+               SlotProfile slotProfile,
                Time timeout) {
                return allocateSlot(
                        new SlotRequestId(),
                        task,
                        allowQueued,
-                       preferredLocations,
+                       slotProfile,
                        timeout);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
index fd6be46..242d645 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
@@ -175,108 +176,23 @@ public class SlotSharingManager {
         * preferred locations is checked.
         *
         * @param groupId which the returned slot must not contain
-        * @param locationPreferences specifying which locations are preferred
+        * @param matcher slot profile matcher to match slot with the profile 
requirements
         * @return the resolved root slot and its locality with respect to the specified 
location preferences
         *              or null if there was no root slot which did not contain 
the given groupId
         */
        @Nullable
-       MultiTaskSlotLocality getResolvedRootSlot(AbstractID groupId, 
Collection<TaskManagerLocation> locationPreferences) {
-               Preconditions.checkNotNull(locationPreferences);
-
-               final MultiTaskSlotLocality multiTaskSlotLocality;
-
-               if (locationPreferences.isEmpty()) {
-                       multiTaskSlotLocality = 
getResolvedRootSlotWithoutLocationPreferences(groupId);
-               } else {
-                       multiTaskSlotLocality = 
getResolvedRootSlotWithLocationPreferences(groupId, locationPreferences);
-               }
-
-               return multiTaskSlotLocality;
-       }
-
-       /**
-        * Gets a resolved root slot which does not yet contain the given 
groupId. The method will try to
-        * find a slot of a TaskManager contained in the collection of 
preferred locations. If there is no such slot
-        * with free capacities available, then the method will look for slots 
of TaskManager which run on the same
-        * machine as the TaskManager in the collection of preferred locations. 
If there is no such slot, then any slot
-        * with free capacities is returned. If there is no such slot, then 
null is returned.
-        *
-        * @param groupId which the returned slot must not contain
-        * @param locationPreferences specifying which locations are preferred
-        * @return the resolved root slot and its locality wrt to the specified 
location preferences
-        *              or null if there was not root slot which did not 
contain the given groupId
-        */
-       @Nullable
-       private MultiTaskSlotLocality 
getResolvedRootSlotWithLocationPreferences(AbstractID groupId, 
Collection<TaskManagerLocation> locationPreferences) {
-               Preconditions.checkNotNull(groupId);
-               Preconditions.checkNotNull(locationPreferences);
-               final Set<String> hostnameSet = new HashSet<>(16);
-               MultiTaskSlot nonLocalMultiTaskSlot = null;
-
+       MultiTaskSlotLocality getResolvedRootSlot(AbstractID groupId, 
SlotProfile.ProfileToSlotContextMatcher matcher) {
                synchronized (lock) {
-                       for (TaskManagerLocation locationPreference : 
locationPreferences) {
-                               final Set<MultiTaskSlot> multiTaskSlots = 
resolvedRootSlots.get(locationPreference);
-
-                               if (multiTaskSlots != null) {
-                                       for (MultiTaskSlot multiTaskSlot : 
multiTaskSlots) {
-                                               if 
(!multiTaskSlot.contains(groupId)) {
-                                                       return 
MultiTaskSlotLocality.of(multiTaskSlot, Locality.LOCAL);
-                                               }
-                                       }
-
-                                       
hostnameSet.add(locationPreference.getHostname());
-                               }
-                       }
-
-                       for (Map.Entry<TaskManagerLocation, Set<MultiTaskSlot>> 
taskManagerLocationSetEntry : resolvedRootSlots.entrySet()) {
-                               if 
(hostnameSet.contains(taskManagerLocationSetEntry.getKey().getHostname())) {
-                                       for (MultiTaskSlot multiTaskSlot : 
taskManagerLocationSetEntry.getValue()) {
-                                               if 
(!multiTaskSlot.contains(groupId)) {
-                                                       return 
MultiTaskSlotLocality.of(multiTaskSlot, Locality.HOST_LOCAL);
-                                               }
-                                       }
-                               } else if (nonLocalMultiTaskSlot == null) {
-                                       for (MultiTaskSlot multiTaskSlot : 
taskManagerLocationSetEntry.getValue()) {
-                                               if 
(!multiTaskSlot.contains(groupId)) {
-                                                       nonLocalMultiTaskSlot = 
multiTaskSlot;
-                                               }
-                                       }
-                               }
-                       }
-               }
-
-               if (nonLocalMultiTaskSlot != null) {
-                       return MultiTaskSlotLocality.of(nonLocalMultiTaskSlot, 
Locality.NON_LOCAL);
-               } else {
-                       return null;
+                       Collection<Set<MultiTaskSlot>> resolvedRootSlotsValues 
= this.resolvedRootSlots.values();
+                       return matcher.findMatchWithLocality(
+                               
resolvedRootSlotsValues.stream().flatMap(Collection::stream),
+                               (MultiTaskSlot multiTaskSlot) -> 
multiTaskSlot.getSlotContextFuture().join(),
+                               (MultiTaskSlot multiTaskSlot) -> 
!multiTaskSlot.contains(groupId),
+                               MultiTaskSlotLocality::of);
                }
        }
 
        /**
-        * Gets a resolved slot which does not yet contain the given groupId 
without any location
-        * preferences.
-        *
-        * @param groupId which the returned slot must not contain
-        * @return the resolved slot or null if there was no root slot with 
free capacities
-        */
-       @Nullable
-       private MultiTaskSlotLocality 
getResolvedRootSlotWithoutLocationPreferences(AbstractID groupId) {
-               Preconditions.checkNotNull(groupId);
-
-               synchronized (lock) {
-                       for (Set<MultiTaskSlot> multiTaskSlots : 
resolvedRootSlots.values()) {
-                               for (MultiTaskSlot multiTaskSlot : 
multiTaskSlots) {
-                                       if (!multiTaskSlot.contains(groupId)) {
-                                               return 
MultiTaskSlotLocality.of(multiTaskSlot, Locality.UNCONSTRAINED);
-                                       }
-                               }
-                       }
-               }
-
-               return null;
-       }
-
-       /**
         * Gets an unresolved slot which does not yet contain the given 
groupId. An unresolved
         * slot is a slot whose underlying allocated slot has not been 
allocated yet.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotProfileTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotProfileTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotProfileTest.java
new file mode 100644
index 0000000..c09d4bb
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotProfileTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.types;
+
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.instance.SimpleSlotContext;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+public class SlotProfileTest {
+
+       private final ResourceProfile resourceProfile = new ResourceProfile(2, 1024);
+
+       private final AllocationID aid1 = new AllocationID();
+       private final AllocationID aid2 = new AllocationID();
+       private final AllocationID aid3 = new AllocationID();
+       private final AllocationID aid4 = new AllocationID();
+       private final AllocationID aidX = new AllocationID();
+
+       private final TaskManagerLocation tml1 = new TaskManagerLocation(new ResourceID("tm-1"), InetAddress.getLoopbackAddress(), 42);
+       private final TaskManagerLocation tml2 = new TaskManagerLocation(new ResourceID("tm-2"), InetAddress.getLoopbackAddress(), 43);
+       private final TaskManagerLocation tml3 = new TaskManagerLocation(new ResourceID("tm-3"), InetAddress.getLoopbackAddress(), 44);
+       private final TaskManagerLocation tml4 = new TaskManagerLocation(new ResourceID("tm-4"), InetAddress.getLoopbackAddress(), 45);
+       private final TaskManagerLocation tmlX = new TaskManagerLocation(new ResourceID("tm-X"), InetAddress.getLoopbackAddress(), 46);
+
+       private final TaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
+
+       private SimpleSlotContext ssc1 = new SimpleSlotContext(aid1, tml1, 1, taskManagerGateway);
+       private SimpleSlotContext ssc2 = new SimpleSlotContext(aid2, tml2, 2, taskManagerGateway);
+       private SimpleSlotContext ssc3 = new SimpleSlotContext(aid3, tml3, 3, taskManagerGateway);
+       private SimpleSlotContext ssc4 = new SimpleSlotContext(aid4, tml4, 4, taskManagerGateway);
+
+       private final Set<SlotContext> candidates = Collections.unmodifiableSet(createCandidates());
+
+       private Set<SlotContext> createCandidates() {
+               Set<SlotContext> candidates = new HashSet<>(4);
+               candidates.add(ssc1);
+               candidates.add(ssc2);
+               candidates.add(ssc3);
+               candidates.add(ssc4);
+               return candidates;
+       }
+
+       @Test
+       public void matchNoRequirements() {
+
+               SlotProfile slotProfile = new SlotProfile(resourceProfile, Collections.emptyList(), Collections.emptyList());
+               SlotContext match = runMatching(slotProfile);
+
+               Assert.assertTrue(candidates.contains(match));
+       }
+
+       @Test
+       public void matchPreferredLocationNotAvailable() {
+
+               SlotProfile slotProfile = new SlotProfile(resourceProfile, Collections.singletonList(tmlX), Collections.emptyList());
+               SlotContext match = runMatching(slotProfile);
+
+               Assert.assertTrue(candidates.contains(match));
+       }
+
+       @Test
+       public void matchPreferredLocation() {
+
+               SlotProfile slotProfile = new SlotProfile(resourceProfile, Collections.singletonList(tml2), Collections.emptyList());
+               SlotContext match = runMatching(slotProfile);
+
+               Assert.assertEquals(ssc2, match);
+
+               slotProfile = new SlotProfile(resourceProfile, Arrays.asList(tmlX, tml4), Collections.emptyList());
+               match = runMatching(slotProfile);
+
+               Assert.assertEquals(ssc4, match);
+       }
+
+       @Test
+       public void matchPreviousAllocationOverridesPreferredLocation() {
+
+               SlotProfile slotProfile = new SlotProfile(resourceProfile, Collections.singletonList(tml2), Collections.singletonList(aid3));
+               SlotContext match = runMatching(slotProfile);
+
+               Assert.assertEquals(ssc3, match);
+
+               slotProfile = new SlotProfile(resourceProfile, Arrays.asList(tmlX, tml1), Arrays.asList(aidX, aid2));
+               match = runMatching(slotProfile);
+
+               Assert.assertEquals(ssc2, match);
+       }
+
+       @Test
+       public void matchPreviousLocationNotAvailable() {
+
+               SlotProfile slotProfile = new SlotProfile(resourceProfile, Collections.singletonList(tml4), Collections.singletonList(aidX));
+               SlotContext match = runMatching(slotProfile);
+
+               Assert.assertEquals(null, match);
+       }
+
+       private SlotContext runMatching(SlotProfile slotProfile) {
+               SlotProfile.ProfileToSlotContextMatcher matcher = slotProfile.matcher();
+               return matcher.findMatchWithLocality(
+                       candidates.stream(),
+                       (candidate) -> candidate,
+                       (candidate) -> true,
+                       (candidate, locality) -> candidate);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index 1b835ad..63b3238 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.executiongraph;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
@@ -44,7 +45,6 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
@@ -80,7 +80,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 
                        CompletableFuture<LogicalSlot> slotFuture1 = CompletableFuture.completedFuture(new TestingLogicalSlot());
                        CompletableFuture<LogicalSlot> slotFuture2 = CompletableFuture.completedFuture(new TestingLogicalSlot());
-                       when(scheduler.allocateSlot(any(SlotRequestId.class), any(ScheduledUnit.class), anyBoolean(), any(Collection.class), any(Time.class))).thenReturn(slotFuture1, slotFuture2);
+                       when(scheduler.allocateSlot(any(SlotRequestId.class), any(ScheduledUnit.class), anyBoolean(), any(SlotProfile.class), any(Time.class))).thenReturn(slotFuture1, slotFuture2);
 
                        TestingRestartStrategy testingRestartStrategy = new TestingRestartStrategy();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index c0d5dc0..51d1827 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.DummyActorGateway;
 import org.apache.flink.runtime.instance.Instance;
@@ -35,7 +36,6 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
 
 import org.junit.Test;
 
-import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
@@ -67,7 +67,7 @@ public class ExecutionVertexSchedulingTest {
                        Scheduler scheduler = mock(Scheduler.class);
                        CompletableFuture<LogicalSlot> future = new CompletableFuture<>();
                        future.complete(slot);
-                       when(scheduler.allocateSlot(any(SlotRequestId.class), any(ScheduledUnit.class), anyBoolean(), any(Collection.class), any(Time.class))).thenReturn(future);
+                       when(scheduler.allocateSlot(any(SlotRequestId.class), any(ScheduledUnit.class), anyBoolean(), any(SlotProfile.class), any(Time.class))).thenReturn(future);
 
                        assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
                        // try to deploy to the slot
@@ -99,7 +99,7 @@ public class ExecutionVertexSchedulingTest {
                        final CompletableFuture<LogicalSlot> future = new CompletableFuture<>();
 
                        Scheduler scheduler = mock(Scheduler.class);
-                       when(scheduler.allocateSlot(any(SlotRequestId.class), any(ScheduledUnit.class), anyBoolean(), any(Collection.class), any(Time.class))).thenReturn(future);
+                       when(scheduler.allocateSlot(any(SlotRequestId.class), any(ScheduledUnit.class), anyBoolean(), any(SlotProfile.class), any(Time.class))).thenReturn(future);
 
                        assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
                        // try to deploy to the slot
@@ -133,7 +133,7 @@ public class ExecutionVertexSchedulingTest {
                        Scheduler scheduler = mock(Scheduler.class);
                        CompletableFuture<LogicalSlot> future = new CompletableFuture<>();
                        future.complete(slot);
-                       when(scheduler.allocateSlot(any(SlotRequestId.class), any(ScheduledUnit.class), anyBoolean(), any(Collection.class), any(Time.class))).thenReturn(future);
+                       when(scheduler.allocateSlot(any(SlotRequestId.class), any(ScheduledUnit.class), anyBoolean(), any(SlotProfile.class), any(Time.class))).thenReturn(future);
 
                        assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
index 2e7ebc9..daaf63b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
@@ -26,11 +27,9 @@ import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 import javax.annotation.Nullable;
 
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -117,7 +116,7 @@ class ProgrammedSlotProvider implements SlotProvider {
                        SlotRequestId slotRequestId,
                        ScheduledUnit task,
                        boolean allowQueued,
-                       Collection<TaskManagerLocation> preferredLocations,
+                       SlotProfile slotProfile,
                        Time allocationTimeout) {
                JobVertexID vertexId = task.getTaskToExecute().getVertex().getJobvertexId();
                int subtask = task.getTaskToExecute().getParallelSubtaskIndex();

http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
index c082e9a..7d11d37 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
@@ -22,19 +22,20 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.instance.SlotSharingGroupId;
-import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.SimpleSlotContext;
 import org.apache.flink.runtime.instance.Slot;
-import org.apache.flink.runtime.jobmaster.SlotRequestId;
-import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.instance.SimpleSlotContext;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotContext;
 import org.apache.flink.runtime.jobmaster.SlotOwner;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.FlinkException;
@@ -44,7 +45,6 @@ import javax.annotation.Nullable;
 
 import java.net.InetAddress;
 import java.util.ArrayDeque;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.concurrent.CompletableFuture;
 
@@ -89,7 +89,7 @@ public class SimpleSlotProvider implements SlotProvider, SlotOwner {
                        SlotRequestId slotRequestId,
                        ScheduledUnit task,
                        boolean allowQueued,
-                       Collection<TaskManagerLocation> preferredLocations,
+                       SlotProfile slotProfile,
                        Time allocationTimeout) {
                final SlotContext slot;
 

Reply via email to