[FLINK-8781][scheduler] Try to reschedule failed tasks to previous allocation
This closes #5403. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d63bc75f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d63bc75f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d63bc75f Branch: refs/heads/master Commit: d63bc75ffa3ad1b0d4de82cd218b9c4d268b41ab Parents: 647c552 Author: Stefan Richter <[email protected]> Authored: Thu Feb 1 16:02:28 2018 +0100 Committer: Stefan Richter <[email protected]> Committed: Mon Feb 26 15:32:01 2018 +0100 ---------------------------------------------------------------------- .../clusterframework/types/SlotProfile.java | 283 +++++++++++++++++++ .../flink/runtime/executiongraph/Execution.java | 29 +- .../runtime/executiongraph/ExecutionVertex.java | 27 +- .../runtime/jobmanager/scheduler/Scheduler.java | 14 +- .../jobmanager/slots/SlotAndLocality.java | 14 +- .../runtime/jobmaster/slotpool/SlotPool.java | 134 +++------ .../jobmaster/slotpool/SlotPoolGateway.java | 7 +- .../jobmaster/slotpool/SlotProvider.java | 11 +- .../jobmaster/slotpool/SlotSharingManager.java | 102 +------ .../clusterframework/types/SlotProfileTest.java | 133 +++++++++ .../ExecutionGraphMetricsTest.java | 4 +- .../ExecutionVertexSchedulingTest.java | 8 +- .../executiongraph/ProgrammedSlotProvider.java | 5 +- .../utils/SimpleSlotProvider.java | 16 +- .../ScheduleWithCoLocationHintTest.java | 128 +++++---- .../scheduler/SchedulerIsolatedTasksTest.java | 50 ++-- .../scheduler/SchedulerSlotSharingTest.java | 228 +++++++-------- .../jobmanager/scheduler/SchedulerTest.java | 4 +- .../jobmanager/scheduler/SchedulerTestBase.java | 19 +- .../jobmaster/slotpool/AvailableSlotsTest.java | 5 +- .../slotpool/SlotPoolCoLocationTest.java | 10 +- .../jobmaster/slotpool/SlotPoolRpcTest.java | 16 +- .../slotpool/SlotPoolSlotSharingTest.java | 22 +- .../jobmaster/slotpool/SlotPoolTest.java | 39 ++- .../slotpool/SlotSharingManagerTest.java | 12 +- 25 files changed, 833 insertions(+), 487 
deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java new file mode 100644 index 0000000..5b1fa08 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.clusterframework.types; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmaster.SlotContext; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Stream; + +/** + * A slot profile describes the profile of a slot into which a task wants to be scheduled. The profile contains + * attributes such as resource or locality constraints, some of which may be hard or soft. A matcher can be generated + * to filter out candidate slots by matching their {@link SlotContext} against the slot profile and, potentially, + * further requirements. + */ +public class SlotProfile { + + /** Singleton object for a slot profile without any requirements. */ + private static final SlotProfile NO_REQUIREMENTS = noLocality(ResourceProfile.UNKNOWN); + + /** This specifies the desired resource profile for the slot. */ + @Nonnull + private final ResourceProfile resourceProfile; + + /** This specifies the preferred locations for the slot. */ + @Nonnull + private final Collection<TaskManagerLocation> preferredLocations; + + /** This contains desired allocation ids of the slot. 
*/ + @Nonnull + private final Collection<AllocationID> priorAllocations; + + public SlotProfile( + @Nonnull ResourceProfile resourceProfile, + @Nonnull Collection<TaskManagerLocation> preferredLocations, + @Nonnull Collection<AllocationID> priorAllocations) { + + this.resourceProfile = resourceProfile; + this.preferredLocations = preferredLocations; + this.priorAllocations = priorAllocations; + } + + /** + * Returns the desired resource profile for the slot. + */ + @Nonnull + public ResourceProfile getResourceProfile() { + return resourceProfile; + } + + /** + * Returns the preferred locations for the slot. + */ + @Nonnull + public Collection<TaskManagerLocation> getPreferredLocations() { + return preferredLocations; + } + + /** + * Returns the desired allocation ids for the slot. + */ + @Nonnull + public Collection<AllocationID> getPriorAllocations() { + return priorAllocations; + } + + /** + * Returns the matcher for this profile that helps to find slots that fit the profile. + */ + public ProfileToSlotContextMatcher matcher() { + if (priorAllocations.isEmpty()) { + return new LocalityAwareRequirementsToSlotMatcher(preferredLocations); + } else { + return new PreviousAllocationProfileToSlotContextMatcher(priorAllocations); + } + } + + /** + * Classes that implement this interface provide a method to match objects that somehow represent slot candidates + * against the {@link SlotProfile} that produced the matcher object. A matching candidate is transformed into a + * desired result. If the matcher does not find a matching candidate, it returns null. + */ + public interface ProfileToSlotContextMatcher { + + /** + * This method takes the candidate slots, extracts slot contexts from them, filters them by the profile + * requirements and potentially by additional requirements, and produces a result from a match. + * + * @param candidates stream of candidates to match against. + * @param contextExtractor function to extract the {@link SlotContext} from the candidates. 
+ * @param additionalRequirementsFilter predicate to specify additional requirements for each candidate. + * @param resultProducer function to produce a result from a matching candidate input. + * @param <IN> type of the objects against which we match the profile. + * @param <OUT> type of the produced output from a matching object. + * @return the result produced by resultProducer if a matching candidate was found or null otherwise. + */ + @Nullable + <IN, OUT> OUT findMatchWithLocality( + @Nonnull Stream<IN> candidates, + @Nonnull Function<IN, SlotContext> contextExtractor, + @Nonnull Predicate<IN> additionalRequirementsFilter, + @Nonnull BiFunction<IN, Locality, OUT> resultProducer); + } + + /** + * This matcher implementation is used in the presence of prior allocations. Prior allocations are supposed to overrule + * other locality requirements, such as preferred locations. Prior allocations also require strict matching and + * this matcher returns null if it cannot find a candidate for the same prior allocation. The background is that + * this will force the scheduler to request a new slot that is guaranteed to be not the prior location of any + * other subtask, so that subtasks do not steal another subtask's prior allocation in case that the own prior + * allocation is no longer available (e.g. machine failure). This is important to enable local recovery for all + * tasks that can still return to their prior allocation. + */ + @VisibleForTesting + public static class PreviousAllocationProfileToSlotContextMatcher implements ProfileToSlotContextMatcher { + + /** Set of prior allocations. 
*/ + private final HashSet<AllocationID> priorAllocations; + + @VisibleForTesting + PreviousAllocationProfileToSlotContextMatcher(@Nonnull Collection<AllocationID> priorAllocations) { + this.priorAllocations = new HashSet<>(priorAllocations); + Preconditions.checkState( + this.priorAllocations.size() > 0, + "This matcher should only be used if there are prior allocations!"); + } + + public <I, O> O findMatchWithLocality( + @Nonnull Stream<I> candidates, + @Nonnull Function<I, SlotContext> contextExtractor, + @Nonnull Predicate<I> additionalRequirementsFilter, + @Nonnull BiFunction<I, Locality, O> resultProducer) { + + Predicate<I> filterByAllocation = + (candidate) -> priorAllocations.contains(contextExtractor.apply(candidate).getAllocationId()); + + return candidates + .filter(filterByAllocation.and(additionalRequirementsFilter)) + .findFirst() + .map((result) -> resultProducer.apply(result, Locality.LOCAL)) // TODO introduce special locality? + .orElse(null); + } + } + + /** + * This matcher is used whenever no prior allocation was specified in the {@link SlotProfile}. This implementation + * tries to achieve best possible locality if a preferred location is specified in the profile. + */ + @VisibleForTesting + public static class LocalityAwareRequirementsToSlotMatcher implements ProfileToSlotContextMatcher { + + private final Collection<TaskManagerLocation> locationPreferences; + + @VisibleForTesting + public LocalityAwareRequirementsToSlotMatcher(@Nonnull Collection<TaskManagerLocation> locationPreferences) { + this.locationPreferences = new ArrayList<>(locationPreferences); + } + + @Override + public <IN, OUT> OUT findMatchWithLocality( + @Nonnull Stream<IN> candidates, + @Nonnull Function<IN, SlotContext> contextExtractor, + @Nonnull Predicate<IN> additionalRequirementsFilter, + @Nonnull BiFunction<IN, Locality, OUT> resultProducer) { + + // if we have no location preferences, we can only filter by the additional requirements. 
+ if (locationPreferences.isEmpty()) { + return candidates + .filter(additionalRequirementsFilter) + .findFirst() + .map((result) -> resultProducer.apply(result, Locality.UNCONSTRAINED)) + .orElse(null); + } + + // we build up two indexes, one for resource id and one for host names of the preferred locations. + HashSet<ResourceID> preferredResourceIDs = new HashSet<>(locationPreferences.size()); + HashSet<String> preferredFQHostNames = new HashSet<>(locationPreferences.size()); + + for (TaskManagerLocation locationPreference : locationPreferences) { + preferredResourceIDs.add(locationPreference.getResourceID()); + preferredFQHostNames.add(locationPreference.getFQDNHostname()); + } + + Iterator<IN> iterator = candidates.iterator(); + + IN matchByHostName = null; + IN matchByAdditionalRequirements = null; + + while (iterator.hasNext()) { + + IN candidate = iterator.next(); + SlotContext slotContext = contextExtractor.apply(candidate); + + // this if checks if the candidate is a local slot + if (preferredResourceIDs.contains(slotContext.getTaskManagerLocation().getResourceID())) { + if (additionalRequirementsFilter.test(candidate)) { + // we can stop, because we found a match with best possible locality. + return resultProducer.apply(candidate, Locality.LOCAL); + } else { + // next candidate because this failed on the additional requirements. + continue; + } + } + + // this if checks if the candidate is at least host-local, if we did not find another host-local + // candidate before. + if (matchByHostName == null) { + if (preferredFQHostNames.contains(slotContext.getTaskManagerLocation().getFQDNHostname())) { + if (additionalRequirementsFilter.test(candidate)) { + // We remember the candidate, but still continue because there might still be a candidate + // that is local to the desired task manager. + matchByHostName = candidate; + } else { + // next candidate because this failed on the additional requirements. 
+ continue; + } + } + + // this if checks if the candidate at least fulfils the resource requirements, and is only required + // if we did not yet find a valid candidate with better locality. + if (matchByAdditionalRequirements == null + && additionalRequirementsFilter.test(candidate)) { + // Again, we remember but continue in hope for a candidate with better locality. + matchByAdditionalRequirements = candidate; + } + } + } + + // at the end of the iteration, we return the candidate with best possible locality or null. + if (matchByHostName != null) { + return resultProducer.apply(matchByHostName, Locality.HOST_LOCAL); + } else if (matchByAdditionalRequirements != null) { + return resultProducer.apply(matchByAdditionalRequirements, Locality.NON_LOCAL); + } else { + return null; + } + } + } + + /** + * Returns a slot profile that has no requirements. + */ + public static SlotProfile noRequirements() { + return NO_REQUIREMENTS; + } + + /** + * Returns a slot profile for the given resource profile, without any locality requirements. 
+ */ + public static SlotProfile noLocality(ResourceProfile resourceProfile) { + return new SlotProfile(resourceProfile, Collections.emptyList(), Collections.emptyList()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 946f6e4..2fb831a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -26,7 +26,10 @@ import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore; +import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor; @@ -155,6 +158,10 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution @Nullable private volatile JobManagerTaskRestore taskRestore; + /** This field holds the allocation id once it was assigned successfully. */ + @Nullable + private volatile AllocationID assignedAllocationID; + // ------------------------ Accumulators & Metrics ------------------------ /** Lock for updating the accumulators atomically. 
@@ -234,6 +241,11 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution return state; } + @Nullable + public AllocationID getAssignedAllocationID() { + return assignedAllocationID; + } + /** * Gets the global modification version of the execution graph when this execution was created. * @@ -271,7 +283,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution if (state == SCHEDULED || state == CREATED) { checkState(!taskManagerLocationFuture.isDone(), "The TaskManagerLocationFuture should not be set if we haven't assigned a resource yet."); taskManagerLocationFuture.complete(logicalSlot.getTaskManagerLocation()); - + assignedAllocationID = logicalSlot.getAllocationId(); return true; } else { // free assigned resource and return false @@ -458,8 +470,16 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution new ScheduledUnit(this, slotSharingGroupId) : new ScheduledUnit(this, slotSharingGroupId, locationConstraint); + // try to extract previous allocation ids, if applicable, so that we can reschedule to the same slot + ExecutionVertex executionVertex = getVertex(); + AllocationID lastAllocation = executionVertex.getLatestPriorAllocation(); + + Collection<AllocationID> previousAllocationIDs = + lastAllocation != null ? 
Collections.singletonList(lastAllocation) : Collections.emptyList(); + // calculate the preferred locations - final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture = calculatePreferredLocations(locationPreferenceConstraint); + final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture = + calculatePreferredLocations(locationPreferenceConstraint); final SlotRequestId slotRequestId = new SlotRequestId(); @@ -470,7 +490,10 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution slotRequestId, toSchedule, queued, - preferredLocations, + new SlotProfile( + ResourceProfile.UNKNOWN, + preferredLocations, + previousAllocationIDs), allocationTimeout)); // register call back to cancel slot request in case that the execution gets canceled http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index f13e42c..8b57a7a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -26,6 +26,7 @@ import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.blob.PermanentBlobKey; import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore; +import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor; @@ 
-295,17 +296,11 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi } } - /** - * Gets the location where the latest completed/canceled/failed execution of the vertex's - * task happened. - * - * @return The latest prior execution location, or null, if there is none, yet. - */ - public TaskManagerLocation getLatestPriorLocation() { + public Execution getLatestPriorExecution() { synchronized (priorExecutions) { final int size = priorExecutions.size(); if (size > 0) { - return priorExecutions.get(size - 1).getAssignedResourceLocation(); + return priorExecutions.get(size - 1); } else { return null; @@ -313,6 +308,22 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi } } + /** + * Gets the location where the latest completed/canceled/failed execution of the vertex's + * task happened. + * + * @return The latest prior execution location, or null, if there is none, yet. + */ + public TaskManagerLocation getLatestPriorLocation() { + Execution latestPriorExecution = getLatestPriorExecution(); + return latestPriorExecution != null ? latestPriorExecution.getAssignedResourceLocation() : null; + } + + public AllocationID getLatestPriorAllocation() { + Execution latestPriorExecution = getLatestPriorExecution(); + return latestPriorExecution != null ? 
latestPriorExecution.getAssignedAllocationID() : null; + } + EvictingBoundedList<Execution> getCopyOfPriorExecutionsList() { synchronized (priorExecutions) { return new EvictingBoundedList<>(priorExecutions); http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java index fc79d40..0116fdb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java @@ -21,19 +21,20 @@ package org.apache.flink.runtime.jobmanager.scheduler; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.InstanceDiedException; import org.apache.flink.runtime.instance.InstanceListener; -import org.apache.flink.runtime.instance.SlotSharingGroupId; -import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.instance.SharedSlot; import org.apache.flink.runtime.instance.SimpleSlot; -import org.apache.flink.runtime.jobmaster.SlotRequestId; -import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; import org.apache.flink.runtime.instance.SlotSharingGroupAssignment; +import org.apache.flink.runtime.instance.SlotSharingGroupId; import org.apache.flink.runtime.jobgraph.JobVertexID; +import 
org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.ExceptionUtils; @@ -49,7 +50,6 @@ import javax.annotation.Nullable; import java.util.ArrayDeque; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -148,11 +148,11 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl SlotRequestId slotRequestId, ScheduledUnit task, boolean allowQueued, - Collection<TaskManagerLocation> preferredLocations, + SlotProfile slotProfile, Time allocationTimeout) { try { - final Object ret = scheduleTask(task, allowQueued, preferredLocations); + final Object ret = scheduleTask(task, allowQueued, slotProfile.getPreferredLocations()); if (ret instanceof SimpleSlot) { return CompletableFuture.completedFuture((SimpleSlot) ret); http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java index 85871c8..fed26c9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java @@ -18,31 +18,35 @@ package org.apache.flink.runtime.jobmanager.slots; -import org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot; import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import 
org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot; -import static org.apache.flink.util.Preconditions.checkNotNull; +import javax.annotation.Nonnull; /** * A combination of a {@link AllocatedSlot} and a {@link Locality}. */ public class SlotAndLocality { + @Nonnull private final AllocatedSlot slot; + @Nonnull private final Locality locality; - public SlotAndLocality(AllocatedSlot slot, Locality locality) { - this.slot = checkNotNull(slot); - this.locality = checkNotNull(locality); + public SlotAndLocality(@Nonnull AllocatedSlot slot, @Nonnull Locality locality) { + this.slot = slot; + this.locality = locality; } // ------------------------------------------------------------------------ + @Nonnull public AllocatedSlot getSlot() { return slot; } + @Nonnull public Locality getLocality() { return locality; } http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java index a49e6ed..8a2dd45 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.instance.SlotSharingGroupId; @@ -303,16 +304,14 @@ public class 
SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS public CompletableFuture<LogicalSlot> allocateSlot( SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, - ResourceProfile resourceProfile, - Collection<TaskManagerLocation> locationPreferences, + SlotProfile slotProfile, boolean allowQueuedScheduling, Time timeout) { return internalAllocateSlot( slotRequestId, scheduledUnit, - resourceProfile, - locationPreferences, + slotProfile, allowQueuedScheduling, timeout); } @@ -320,8 +319,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS private CompletableFuture<LogicalSlot> internalAllocateSlot( SlotRequestId slotRequestId, ScheduledUnit task, - ResourceProfile resourceProfile, - Collection<TaskManagerLocation> locationPreferences, + SlotProfile slotProfile, boolean allowQueuedScheduling, Time allocationTimeout) { @@ -343,16 +341,14 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS multiTaskSlotLocality = allocateCoLocatedMultiTaskSlot( task.getCoLocationConstraint(), multiTaskSlotManager, - resourceProfile, - locationPreferences, + slotProfile, allowQueuedScheduling, allocationTimeout); } else { multiTaskSlotLocality = allocateMultiTaskSlot( task.getJobVertexId(), multiTaskSlotManager, - resourceProfile, - locationPreferences, + slotProfile, allowQueuedScheduling, allocationTimeout); } @@ -373,8 +369,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS // request an allocated slot to assign a single logical slot to CompletableFuture<SlotAndLocality> slotAndLocalityFuture = requestAllocatedSlot( slotRequestId, - resourceProfile, - locationPreferences, + slotProfile, allowQueuedScheduling, allocationTimeout); @@ -408,8 +403,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS * * @param coLocationConstraint for which to allocate a {@link SlotSharingManager.MultiTaskSlot} * @param multiTaskSlotManager responsible for 
the slot sharing group for which to allocate the slot - * @param resourceProfile specifying the requirements for the requested slot - * @param locationPreferences containing preferred TaskExecutors on which to allocate the slot + * @param slotProfile specifying the requirements for the requested slot * @param allowQueuedScheduling true if queued scheduling (the returned task slot must not be completed yet) is allowed, otherwise false * @param allocationTimeout timeout before the slot allocation times out * @return A {@link SlotSharingManager.MultiTaskSlotLocality} which contains the allocated{@link SlotSharingManager.MultiTaskSlot} @@ -419,8 +413,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS private SlotSharingManager.MultiTaskSlotLocality allocateCoLocatedMultiTaskSlot( CoLocationConstraint coLocationConstraint, SlotSharingManager multiTaskSlotManager, - ResourceProfile resourceProfile, - Collection<TaskManagerLocation> locationPreferences, + SlotProfile slotProfile, boolean allowQueuedScheduling, Time allocationTimeout) throws NoResourceAvailableException { final SlotRequestId coLocationSlotRequestId = coLocationConstraint.getSlotRequestId(); @@ -438,19 +431,18 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS } } - final Collection<TaskManagerLocation> actualLocationPreferences; - if (coLocationConstraint.isAssigned()) { - actualLocationPreferences = Collections.singleton(coLocationConstraint.getLocation()); - } else { - actualLocationPreferences = locationPreferences; + // refine the preferred locations of the slot profile + slotProfile = new SlotProfile( + slotProfile.getResourceProfile(), + Collections.singleton(coLocationConstraint.getLocation()), + slotProfile.getPriorAllocations()); } // get a new multi task slot final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality = allocateMultiTaskSlot( coLocationConstraint.getGroupId(), multiTaskSlotManager, - resourceProfile, - 
actualLocationPreferences, + slotProfile, allowQueuedScheduling, allocationTimeout); @@ -505,8 +497,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS * * @param groupId for which to allocate a new {@link SlotSharingManager.MultiTaskSlot} * @param slotSharingManager responsible for the slot sharing group for which to allocate the slot - * @param resourceProfile specifying the requirements for the requested slot - * @param locationPreferences containing preferred TaskExecutors on which to allocate the slot + * @param slotProfile slot profile that specifies the requirements for the slot * @param allowQueuedScheduling true if queued scheduling (the returned task slot must not be completed yet) is allowed, otherwise false * @param allocationTimeout timeout before the slot allocation times out * @return A {@link SlotSharingManager.MultiTaskSlotLocality} which contains the allocated {@link SlotSharingManager.MultiTaskSlot} @@ -516,15 +507,14 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS private SlotSharingManager.MultiTaskSlotLocality allocateMultiTaskSlot( AbstractID groupId, SlotSharingManager slotSharingManager, - ResourceProfile resourceProfile, - Collection<TaskManagerLocation> locationPreferences, + SlotProfile slotProfile, boolean allowQueuedScheduling, Time allocationTimeout) throws NoResourceAvailableException { // check first whether we have a resolved root slot which we can use SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality = slotSharingManager.getResolvedRootSlot( groupId, - locationPreferences); + slotProfile.matcher()); if (multiTaskSlotLocality != null && multiTaskSlotLocality.getLocality() == Locality.LOCAL) { return multiTaskSlotLocality; @@ -534,7 +524,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS final SlotRequestId multiTaskSlotRequestId = new SlotRequestId(); // check whether we have an allocated slot available which we can 
use to create a new multi task slot in - final SlotAndLocality polledSlotAndLocality = pollAndAllocateSlot(allocatedSlotRequestId, resourceProfile, locationPreferences); + final SlotAndLocality polledSlotAndLocality = pollAndAllocateSlot(allocatedSlotRequestId, slotProfile); if (polledSlotAndLocality != null && (polledSlotAndLocality.getLocality() == Locality.LOCAL || multiTaskSlotLocality == null)) { @@ -571,7 +561,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS // it seems as if we have to request a new slot from the resource manager, this is always the last resort!!! final CompletableFuture<AllocatedSlot> futureSlot = requestNewAllocatedSlot( allocatedSlotRequestId, - resourceProfile, + slotProfile.getResourceProfile(), allocationTimeout); multiTaskSlotFuture = slotSharingManager.createRootSlot( @@ -613,24 +603,21 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS * Allocates an allocated slot first by polling from the available slots and then requesting a new * slot from the ResourceManager if no fitting slot could be found. * - * @param slotRequestId identifying the slot allocation request - * @param resourceProfile which the allocated slot should fulfill - * @param locationPreferences for the allocated slot + * @param slotProfile slot profile that specifies the requirements for the slot * @param allowQueuedScheduling true if the slot allocation can be completed in the future * @param allocationTimeout timeout before the slot allocation times out * @return Future containing the allocated simple slot */ private CompletableFuture<SlotAndLocality> requestAllocatedSlot( SlotRequestId slotRequestId, - ResourceProfile resourceProfile, - Collection<TaskManagerLocation> locationPreferences, + SlotProfile slotProfile, boolean allowQueuedScheduling, Time allocationTimeout) { final CompletableFuture<SlotAndLocality> allocatedSlotLocalityFuture; // (1) do we have a slot available already? 
- SlotAndLocality slotFromPool = pollAndAllocateSlot(slotRequestId, resourceProfile, locationPreferences); + SlotAndLocality slotFromPool = pollAndAllocateSlot(slotRequestId, slotProfile); if (slotFromPool != null) { allocatedSlotLocalityFuture = CompletableFuture.completedFuture(slotFromPool); @@ -638,7 +625,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS // we have to request a new allocated slot CompletableFuture<AllocatedSlot> allocatedSlotFuture = requestNewAllocatedSlot( slotRequestId, - resourceProfile, + slotProfile.getResourceProfile(), allocationTimeout); allocatedSlotLocalityFuture = allocatedSlotFuture.thenApply((AllocatedSlot allocatedSlot) -> new SlotAndLocality(allocatedSlot, Locality.UNKNOWN)); @@ -832,11 +819,8 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS } @Nullable - private SlotAndLocality pollAndAllocateSlot( - SlotRequestId slotRequestId, - ResourceProfile resourceProfile, - Collection<TaskManagerLocation> locationPreferences) { - SlotAndLocality slotFromPool = availableSlots.poll(resourceProfile, locationPreferences); + private SlotAndLocality pollAndAllocateSlot(SlotRequestId slotRequestId, SlotProfile slotProfile) { + SlotAndLocality slotFromPool = availableSlots.poll(slotProfile); if (slotFromPool != null) { allocatedSlots.add(slotRequestId, slotFromPool.getSlot()); @@ -1404,63 +1388,34 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS * Poll a slot which matches the required resource profile. The polling tries to satisfy the * location preferences, by TaskManager and by host. * - * @param resourceProfile The required resource profile. - * @param locationPreferences The location preferences, in order to be checked. 
+ * @param slotProfile slot profile that specifies the requirements for the slot * * @return Slot which matches the resource profile, null if we can't find a match */ - SlotAndLocality poll(ResourceProfile resourceProfile, Collection<TaskManagerLocation> locationPreferences) { + SlotAndLocality poll(SlotProfile slotProfile) { // fast path if no slots are available if (availableSlots.isEmpty()) { return null; } - boolean hadLocationPreference = false; - - if (locationPreferences != null && !locationPreferences.isEmpty()) { - - // first search by TaskManager - for (TaskManagerLocation location : locationPreferences) { - hadLocationPreference = true; - - final Set<AllocatedSlot> onTaskManager = availableSlotsByTaskManager.get(location.getResourceID()); - if (onTaskManager != null) { - for (AllocatedSlot candidate : onTaskManager) { - if (candidate.getResourceProfile().isMatching(resourceProfile)) { - remove(candidate.getAllocationId()); - return new SlotAndLocality(candidate, Locality.LOCAL); - } - } - } - } - - // now, search by host - for (TaskManagerLocation location : locationPreferences) { - final Set<AllocatedSlot> onHost = availableSlotsByHost.get(location.getFQDNHostname()); - if (onHost != null) { - for (AllocatedSlot candidate : onHost) { - if (candidate.getResourceProfile().isMatching(resourceProfile)) { - remove(candidate.getAllocationId()); - return new SlotAndLocality(candidate, Locality.HOST_LOCAL); - } - } - } - } - } + SlotProfile.ProfileToSlotContextMatcher matcher = slotProfile.matcher(); + Collection<SlotAndTimestamp> slotAndTimestamps = availableSlots.values(); - // take any slot - for (SlotAndTimestamp candidate : availableSlots.values()) { - final AllocatedSlot slot = candidate.slot(); + SlotAndLocality matchingSlotAndLocality = matcher.findMatchWithLocality( + slotAndTimestamps.stream(), + SlotAndTimestamp::slot, + (SlotAndTimestamp slot) -> slot.slot().getResourceProfile().isMatching(slotProfile.getResourceProfile()), + (SlotAndTimestamp 
slotAndTimestamp, Locality locality) -> { + AllocatedSlot slot = slotAndTimestamp.slot(); + return new SlotAndLocality(slot, locality); + }); - if (slot.getResourceProfile().isMatching(resourceProfile)) { - remove(slot.getAllocationId()); - return new SlotAndLocality( - slot, hadLocationPreference ? Locality.NON_LOCAL : Locality.UNCONSTRAINED); - } + if (matchingSlotAndLocality != null) { + AllocatedSlot slot = matchingSlotAndLocality.getSlot(); + remove(slot.getAllocationId()); } - // nothing available that matches - return null; + return matchingSlotAndLocality; } /** @@ -1574,14 +1529,13 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS SlotRequestId slotRequestId, ScheduledUnit task, boolean allowQueued, - Collection<TaskManagerLocation> preferredLocations, + SlotProfile slotProfile, Time timeout) { CompletableFuture<LogicalSlot> slotFuture = gateway.allocateSlot( slotRequestId, task, - ResourceProfile.UNKNOWN, - preferredLocations, + slotProfile, allowQueued, timeout); http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java index 7d11681..1aad92a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import 
org.apache.flink.runtime.clusterframework.types.SlotProfile; import org.apache.flink.runtime.instance.SlotSharingGroupId; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; @@ -143,8 +144,7 @@ public interface SlotPoolGateway extends AllocatedSlotActions, RpcGateway { * * @param slotRequestId identifying the requested slot * @param scheduledUnit for which to allocate slot - * @param resourceProfile which the allocated slot must fulfill - * @param locationPreferences which define where the allocated slot should be placed, this can also be empty + * @param slotProfile profile that specifies the requirements for the requested slot * @param allowQueuedScheduling true if the slot request can be queued (e.g. the returned future must not be completed) * @param timeout for the operation * @return Future which is completed with the allocated {@link LogicalSlot} @@ -152,8 +152,7 @@ public interface SlotPoolGateway extends AllocatedSlotActions, RpcGateway { CompletableFuture<LogicalSlot> allocateSlot( SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, - ResourceProfile resourceProfile, - Collection<TaskManagerLocation> locationPreferences, + SlotProfile slotProfile, boolean allowQueuedScheduling, @RpcTimeout Time timeout); } http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java index 80b2689..1653138 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java @@ -19,16 +19,15 @@ package 
org.apache.flink.runtime.jobmaster.slotpool; import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; import org.apache.flink.runtime.instance.SlotSharingGroupId; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.jobmaster.SlotRequestId; import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import javax.annotation.Nullable; -import java.util.Collection; import java.util.concurrent.CompletableFuture; /** @@ -57,7 +56,7 @@ public interface SlotProvider { SlotRequestId slotRequestId, ScheduledUnit task, boolean allowQueued, - Collection<TaskManagerLocation> preferredLocations, + SlotProfile slotProfile, Time timeout); /** @@ -65,20 +64,20 @@ public interface SlotProvider { * * @param task The task to allocate the slot for * @param allowQueued Whether allow the task be queued if we do not have enough resource - * @param preferredLocations preferred locations for the slot allocation + * @param slotProfile profile of the requested slot * @param timeout after which the allocation fails with a timeout exception * @return The future of the allocation */ default CompletableFuture<LogicalSlot> allocateSlot( ScheduledUnit task, boolean allowQueued, - Collection<TaskManagerLocation> preferredLocations, + SlotProfile slotProfile, Time timeout) { return allocateSlot( new SlotRequestId(), task, allowQueued, - preferredLocations, + slotProfile, timeout); } http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java index fd6be46..242d645 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.jobmaster.slotpool; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; import org.apache.flink.runtime.instance.SlotSharingGroupId; import org.apache.flink.runtime.jobmanager.scheduler.Locality; import org.apache.flink.runtime.jobmaster.LogicalSlot; @@ -175,108 +176,23 @@ public class SlotSharingManager { * preferred locations is checked. * * @param groupId which the returned slot must not contain - * @param locationPreferences specifying which locations are preferred + * @param matcher slot profile matcher to match slot with the profile requirements * @return the resolved root slot and its locality wrt to the specified location preferences * or null if there was no root slot which did not contain the given groupId */ @Nullable - MultiTaskSlotLocality getResolvedRootSlot(AbstractID groupId, Collection<TaskManagerLocation> locationPreferences) { - Preconditions.checkNotNull(locationPreferences); - - final MultiTaskSlotLocality multiTaskSlotLocality; - - if (locationPreferences.isEmpty()) { - multiTaskSlotLocality = getResolvedRootSlotWithoutLocationPreferences(groupId); - } else { - multiTaskSlotLocality = getResolvedRootSlotWithLocationPreferences(groupId, locationPreferences); - } - - return multiTaskSlotLocality; - } - - /** - * Gets a resolved root slot which does not yet contain the given groupId. The method will try to - * find a slot of a TaskManager contained in the collection of preferred locations. 
If there is no such slot - * with free capacities available, then the method will look for slots of TaskManager which run on the same - * machine as the TaskManager in the collection of preferred locations. If there is no such slot, then any slot - * with free capacities is returned. If there is no such slot, then null is returned. - * - * @param groupId which the returned slot must not contain - * @param locationPreferences specifying which locations are preferred - * @return the resolved root slot and its locality wrt to the specified location preferences - * or null if there was not root slot which did not contain the given groupId - */ - @Nullable - private MultiTaskSlotLocality getResolvedRootSlotWithLocationPreferences(AbstractID groupId, Collection<TaskManagerLocation> locationPreferences) { - Preconditions.checkNotNull(groupId); - Preconditions.checkNotNull(locationPreferences); - final Set<String> hostnameSet = new HashSet<>(16); - MultiTaskSlot nonLocalMultiTaskSlot = null; - + MultiTaskSlotLocality getResolvedRootSlot(AbstractID groupId, SlotProfile.ProfileToSlotContextMatcher matcher) { synchronized (lock) { - for (TaskManagerLocation locationPreference : locationPreferences) { - final Set<MultiTaskSlot> multiTaskSlots = resolvedRootSlots.get(locationPreference); - - if (multiTaskSlots != null) { - for (MultiTaskSlot multiTaskSlot : multiTaskSlots) { - if (!multiTaskSlot.contains(groupId)) { - return MultiTaskSlotLocality.of(multiTaskSlot, Locality.LOCAL); - } - } - - hostnameSet.add(locationPreference.getHostname()); - } - } - - for (Map.Entry<TaskManagerLocation, Set<MultiTaskSlot>> taskManagerLocationSetEntry : resolvedRootSlots.entrySet()) { - if (hostnameSet.contains(taskManagerLocationSetEntry.getKey().getHostname())) { - for (MultiTaskSlot multiTaskSlot : taskManagerLocationSetEntry.getValue()) { - if (!multiTaskSlot.contains(groupId)) { - return MultiTaskSlotLocality.of(multiTaskSlot, Locality.HOST_LOCAL); - } - } - } else if 
(nonLocalMultiTaskSlot == null) { - for (MultiTaskSlot multiTaskSlot : taskManagerLocationSetEntry.getValue()) { - if (!multiTaskSlot.contains(groupId)) { - nonLocalMultiTaskSlot = multiTaskSlot; - } - } - } - } - } - - if (nonLocalMultiTaskSlot != null) { - return MultiTaskSlotLocality.of(nonLocalMultiTaskSlot, Locality.NON_LOCAL); - } else { - return null; + Collection<Set<MultiTaskSlot>> resolvedRootSlotsValues = this.resolvedRootSlots.values(); + return matcher.findMatchWithLocality( + resolvedRootSlotsValues.stream().flatMap(Collection::stream), + (MultiTaskSlot multiTaskSlot) -> multiTaskSlot.getSlotContextFuture().join(), + (MultiTaskSlot multiTaskSlot) -> !multiTaskSlot.contains(groupId), + MultiTaskSlotLocality::of); } } /** - * Gets a resolved slot which does not yet contain the given groupId without any location - * preferences. - * - * @param groupId which the returned slot must not contain - * @return the resolved slot or null if there was no root slot with free capacities - */ - @Nullable - private MultiTaskSlotLocality getResolvedRootSlotWithoutLocationPreferences(AbstractID groupId) { - Preconditions.checkNotNull(groupId); - - synchronized (lock) { - for (Set<MultiTaskSlot> multiTaskSlots : resolvedRootSlots.values()) { - for (MultiTaskSlot multiTaskSlot : multiTaskSlots) { - if (!multiTaskSlot.contains(groupId)) { - return MultiTaskSlotLocality.of(multiTaskSlot, Locality.UNCONSTRAINED); - } - } - } - } - - return null; - } - - /** * Gets an unresolved slot which does not yet contain the given groupId. An unresolved * slot is a slot whose underlying allocated slot has not been allocated yet. 
* http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotProfileTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotProfileTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotProfileTest.java new file mode 100644 index 0000000..c09d4bb --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotProfileTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.clusterframework.types; + +import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; +import org.apache.flink.runtime.instance.SimpleSlotContext; +import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; +import org.apache.flink.runtime.jobmaster.SlotContext; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; + +import org.junit.Assert; +import org.junit.Test; + +import java.net.InetAddress; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +public class SlotProfileTest { + + private final ResourceProfile resourceProfile = new ResourceProfile(2, 1024); + + private final AllocationID aid1 = new AllocationID(); + private final AllocationID aid2 = new AllocationID(); + private final AllocationID aid3 = new AllocationID(); + private final AllocationID aid4 = new AllocationID(); + private final AllocationID aidX = new AllocationID(); + + private final TaskManagerLocation tml1 = new TaskManagerLocation(new ResourceID("tm-1"), InetAddress.getLoopbackAddress(), 42); + private final TaskManagerLocation tml2 = new TaskManagerLocation(new ResourceID("tm-2"), InetAddress.getLoopbackAddress(), 43); + private final TaskManagerLocation tml3 = new TaskManagerLocation(new ResourceID("tm-3"), InetAddress.getLoopbackAddress(), 44); + private final TaskManagerLocation tml4 = new TaskManagerLocation(new ResourceID("tm-4"), InetAddress.getLoopbackAddress(), 45); + private final TaskManagerLocation tmlX = new TaskManagerLocation(new ResourceID("tm-X"), InetAddress.getLoopbackAddress(), 46); + + private final TaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway(); + + private SimpleSlotContext ssc1 = new SimpleSlotContext(aid1, tml1, 1, taskManagerGateway); + private SimpleSlotContext ssc2 = new SimpleSlotContext(aid2, tml2, 2, taskManagerGateway); + private SimpleSlotContext ssc3 = new SimpleSlotContext(aid3, tml3, 
3, taskManagerGateway); + private SimpleSlotContext ssc4 = new SimpleSlotContext(aid4, tml4, 4, taskManagerGateway); + + private final Set<SlotContext> candidates = Collections.unmodifiableSet(createCandidates()); + + private Set<SlotContext> createCandidates() { + Set<SlotContext> candidates = new HashSet<>(4); + candidates.add(ssc1); + candidates.add(ssc2); + candidates.add(ssc3); + candidates.add(ssc4); + return candidates; + } + + @Test + public void matchNoRequirements() { + + SlotProfile slotProfile = new SlotProfile(resourceProfile, Collections.emptyList(), Collections.emptyList()); + SlotContext match = runMatching(slotProfile); + + Assert.assertTrue(candidates.contains(match)); + } + + @Test + public void matchPreferredLocationNotAvailable() { + + SlotProfile slotProfile = new SlotProfile(resourceProfile, Collections.singletonList(tmlX), Collections.emptyList()); + SlotContext match = runMatching(slotProfile); + + Assert.assertTrue(candidates.contains(match)); + } + + @Test + public void matchPreferredLocation() { + + SlotProfile slotProfile = new SlotProfile(resourceProfile, Collections.singletonList(tml2), Collections.emptyList()); + SlotContext match = runMatching(slotProfile); + + Assert.assertEquals(ssc2, match); + + slotProfile = new SlotProfile(resourceProfile, Arrays.asList(tmlX, tml4), Collections.emptyList()); + match = runMatching(slotProfile); + + Assert.assertEquals(ssc4, match); + } + + @Test + public void matchPreviousAllocationOverridesPreferredLocation() { + + SlotProfile slotProfile = new SlotProfile(resourceProfile, Collections.singletonList(tml2), Collections.singletonList(aid3)); + SlotContext match = runMatching(slotProfile); + + Assert.assertEquals(ssc3, match); + + slotProfile = new SlotProfile(resourceProfile, Arrays.asList(tmlX, tml1), Arrays.asList(aidX, aid2)); + match = runMatching(slotProfile); + + Assert.assertEquals(ssc2, match); + } + + @Test + public void matchPreviousLocationNotAvailable() { + + SlotProfile slotProfile = 
new SlotProfile(resourceProfile, Collections.singletonList(tml4), Collections.singletonList(aidX)); + SlotContext match = runMatching(slotProfile); + + Assert.assertEquals(null, match); + } + + private SlotContext runMatching(SlotProfile slotProfile) { + SlotProfile.ProfileToSlotContextMatcher matcher = slotProfile.matcher(); + return matcher.findMatchWithLocality( + candidates.stream(), + (candidate) -> candidate, + (candidate) -> true, + (candidate, locality) -> candidate); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java index 1b835ad..63b3238 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.SuppressRestartsException; @@ -44,7 +45,6 @@ import org.junit.Test; import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; @@ -80,7 +80,7 @@ public class ExecutionGraphMetricsTest extends TestLogger { CompletableFuture<LogicalSlot> slotFuture1 = 
CompletableFuture.completedFuture(new TestingLogicalSlot()); CompletableFuture<LogicalSlot> slotFuture2 = CompletableFuture.completedFuture(new TestingLogicalSlot()); - when(scheduler.allocateSlot(any(SlotRequestId.class), any(ScheduledUnit.class), anyBoolean(), any(Collection.class), any(Time.class))).thenReturn(slotFuture1, slotFuture2); + when(scheduler.allocateSlot(any(SlotRequestId.class), any(ScheduledUnit.class), anyBoolean(), any(SlotProfile.class), any(Time.class))).thenReturn(slotFuture1, slotFuture2); TestingRestartStrategy testingRestartStrategy = new TestingRestartStrategy(); http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java index c0d5dc0..51d1827 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.instance.DummyActorGateway; import org.apache.flink.runtime.instance.Instance; @@ -35,7 +36,6 @@ import org.apache.flink.runtime.testingUtils.TestingUtils; import org.junit.Test; -import java.util.Collection; import java.util.concurrent.CompletableFuture; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex; @@ -67,7 +67,7 @@ public 
class ExecutionVertexSchedulingTest { Scheduler scheduler = mock(Scheduler.class); CompletableFuture<LogicalSlot> future = new CompletableFuture<>(); future.complete(slot); - when(scheduler.allocateSlot(any(SlotRequestId.class), any(ScheduledUnit.class), anyBoolean(), any(Collection.class), any(Time.class))).thenReturn(future); + when(scheduler.allocateSlot(any(SlotRequestId.class), any(ScheduledUnit.class), anyBoolean(), any(SlotProfile.class), any(Time.class))).thenReturn(future); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); // try to deploy to the slot @@ -99,7 +99,7 @@ public class ExecutionVertexSchedulingTest { final CompletableFuture<LogicalSlot> future = new CompletableFuture<>(); Scheduler scheduler = mock(Scheduler.class); - when(scheduler.allocateSlot(any(SlotRequestId.class), any(ScheduledUnit.class), anyBoolean(), any(Collection.class), any(Time.class))).thenReturn(future); + when(scheduler.allocateSlot(any(SlotRequestId.class), any(ScheduledUnit.class), anyBoolean(), any(SlotProfile.class), any(Time.class))).thenReturn(future); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); // try to deploy to the slot @@ -133,7 +133,7 @@ public class ExecutionVertexSchedulingTest { Scheduler scheduler = mock(Scheduler.class); CompletableFuture<LogicalSlot> future = new CompletableFuture<>(); future.complete(slot); - when(scheduler.allocateSlot(any(SlotRequestId.class), any(ScheduledUnit.class), anyBoolean(), any(Collection.class), any(Time.class))).thenReturn(future); + when(scheduler.allocateSlot(any(SlotRequestId.class), any(ScheduledUnit.class), anyBoolean(), any(SlotProfile.class), any(Time.class))).thenReturn(future); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java index 2e7ebc9..daaf63b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; import org.apache.flink.runtime.instance.SlotSharingGroupId; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; @@ -26,11 +27,9 @@ import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.jobmaster.SlotRequestId; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import javax.annotation.Nullable; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -117,7 +116,7 @@ class ProgrammedSlotProvider implements SlotProvider { SlotRequestId slotRequestId, ScheduledUnit task, boolean allowQueued, - Collection<TaskManagerLocation> preferredLocations, + SlotProfile slotProfile, Time allocationTimeout) { JobVertexID vertexId = task.getTaskToExecute().getVertex().getJobvertexId(); int subtask = task.getTaskToExecute().getParallelSubtaskIndex(); http://git-wip-us.apache.org/repos/asf/flink/blob/d63bc75f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java index c082e9a..7d11d37 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java @@ -22,19 +22,20 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; import org.apache.flink.runtime.concurrent.FutureUtils; -import org.apache.flink.runtime.instance.SlotSharingGroupId; -import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.instance.SimpleSlot; +import org.apache.flink.runtime.instance.SimpleSlotContext; import org.apache.flink.runtime.instance.Slot; -import org.apache.flink.runtime.jobmaster.SlotRequestId; -import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.instance.SlotSharingGroupId; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; -import org.apache.flink.runtime.instance.SimpleSlotContext; +import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; +import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.jobmaster.SlotContext; import org.apache.flink.runtime.jobmaster.SlotOwner; -import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.FlinkException; @@ -44,7 +45,6 @@ 
import javax.annotation.Nullable; import java.net.InetAddress; import java.util.ArrayDeque; -import java.util.Collection; import java.util.HashMap; import java.util.concurrent.CompletableFuture; @@ -89,7 +89,7 @@ public class SimpleSlotProvider implements SlotProvider, SlotOwner { SlotRequestId slotRequestId, ScheduledUnit task, boolean allowQueued, - Collection<TaskManagerLocation> preferredLocations, + SlotProfile slotProfile, Time allocationTimeout) { final SlotContext slot;
