Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5403#discussion_r170260253 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java --- @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.types; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmaster.SlotContext; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Stream; + +/** + * A slot profile describes the profile of a slot into which a task wants to be scheduled. The profile contains + * attributes such as resource or locality constraints, some of which may be hard or soft. A matcher can be generated + * to filter out candidate slots by matching their {@link SlotContext} against the slot profile and, potentially, + * further requirements. + */ +public class SlotProfile { + + /** Singleton object for a slot profile without any requirements. */ + private static final SlotProfile NO_REQUIREMENTS = noLocality(ResourceProfile.UNKNOWN); + + /** This specifies the desired resource profile for the slot. */ + @Nonnull + private final ResourceProfile resourceProfile; + + /** This specifies the preferred locations for the slot. */ + @Nonnull + private final Collection<TaskManagerLocation> preferredLocations; + + /** This contains desired allocation ids of the slot. */ + @Nonnull + private final Collection<AllocationID> priorAllocations; + + public SlotProfile( + @Nonnull ResourceProfile resourceProfile, + @Nonnull Collection<TaskManagerLocation> preferredLocations, + @Nonnull Collection<AllocationID> priorAllocations) { + this.resourceProfile = resourceProfile; + this.preferredLocations = preferredLocations; + this.priorAllocations = priorAllocations; + } + + /** + * Returns the desired resource profile for the slot. + */ + @Nonnull + public ResourceProfile getResourceProfile() { + return resourceProfile; + } + + /** + * Returns the preferred locations for the slot. + */ + @Nonnull + public Collection<TaskManagerLocation> getPreferredLocations() { + return preferredLocations; + } + + /** + * Returns the desired allocation ids for the slot. + */ + @Nonnull + public Collection<AllocationID> getPriorAllocations() { + return priorAllocations; + } + + public ProfileToSlotContextMatcher matcher() { + if (priorAllocations.isEmpty()) { + return new LocalityAwareRequirementsToSlotMatcher(preferredLocations); + } else { + return new PreviousAllocationProfileToSlotContextMatcher(priorAllocations); + } + } + + /** + * Classes that implement this interface provide a method to match objects to somehow represent slot candidates + * against the {@link SlotProfile} that produced the matcher object. A matching candidate is transformed into a + * desired result. If the matcher does not find a matching candidate, it returns null. + */ + public interface ProfileToSlotContextMatcher { + + /** + * @param candidates stream of candidates to match against. + * @param contextExtractor function to extract the {@link SlotContext} from the candidates. + * @param additionalRequirementsFilter predicate to specify additional requirements for each candidate. + * @param resultProducer function to produce a result from a matching candidate input. + * @param <IN> type of the objects against we match the profile. + * @param <OUT> type of the produced output from a matching object. + * @return the result produced by resultProducer if a matching candidate was found or null otherwise. + */ + @Nullable + <IN, OUT> OUT findMatchWithLocality( + @Nonnull Stream<IN> candidates, + @Nonnull Function<IN, SlotContext> contextExtractor, + @Nonnull Predicate<IN> additionalRequirementsFilter, + @Nonnull BiFunction<IN, Locality, OUT> resultProducer); + } + + /** + * This matcher implementation is the presence of prior allocations. Prior allocations are supposed to overrule + * other locality requirements, such as preferred locations. Prior allocations also require strict matching and + * this matcher returns null if it cannot find a candidate for the same prior allocation. The background is that + * this will force the scheduler tor request a new slot that is guaranteed to be not the prior location of any + * other subtask, so that subtasks do not steal another subtasks prior allocation in case that the own prior + * allocation is no longer available (e.g. machine failure). This is important to enable local recovery for all + * tasks that can still return to their prior allocation. + */ + @VisibleForTesting + public static class PreviousAllocationProfileToSlotContextMatcher implements ProfileToSlotContextMatcher { + + /** Set of prior allocations. */ + private final HashSet<AllocationID> priorAllocations; + + @VisibleForTesting + PreviousAllocationProfileToSlotContextMatcher(Collection<AllocationID> priorAllocations) { + this.priorAllocations = new HashSet<>(priorAllocations); + Preconditions.checkState( + this.priorAllocations.size() > 0, + "This matcher should only be used if there are prior allocations!"); + } + + public <I, O> O findMatchWithLocality( + @Nonnull Stream<I> candidates, + @Nonnull Function<I, SlotContext> contextExtractor, + @Nonnull Predicate<I> additionalRequirementsFilter, + @Nonnull BiFunction<I, Locality, O> resultProducer) { + + Predicate<I> filterByAllocation = + (candidate) -> priorAllocations.contains(contextExtractor.apply(candidate).getAllocationId()); + + return candidates + .filter(filterByAllocation.and(additionalRequirementsFilter)) + .findFirst() + .map((result) -> resultProducer.apply(result, Locality.LOCAL)) // TODO introduce special locality? + .orElse(null); + } + } + + /** + * This matcher is used whenever no prior allocation was specified in the {@link SlotProfile}. This implementation + * tries to achieve best possible locality if a preferred location is specified in the profile. + */ + @VisibleForTesting + public static class LocalityAwareRequirementsToSlotMatcher implements ProfileToSlotContextMatcher { + + private final Collection<TaskManagerLocation> locationPreferences; + + @VisibleForTesting + public LocalityAwareRequirementsToSlotMatcher(Collection<TaskManagerLocation> locationPreferences) { + this.locationPreferences = locationPreferences; + } + + @Override + public <IN, OUT> OUT findMatchWithLocality( + @Nonnull Stream<IN> candidates, + @Nonnull Function<IN, SlotContext> contextExtractor, --- End diff -- The extractor could return `TaskManagerLocation` to make it a bit more specific.
---