Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5091#discussion_r155768880
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 ---
    @@ -0,0 +1,722 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.jobmaster.slotpool;
    +
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.runtime.jobmaster.LogicalSlot;
    +import org.apache.flink.runtime.jobmaster.SlotContext;
    +import org.apache.flink.runtime.jobmaster.SlotOwner;
    +import org.apache.flink.runtime.jobmaster.SlotRequestId;
    +import org.apache.flink.runtime.instance.SlotSharingGroupId;
    +import org.apache.flink.runtime.jobmanager.scheduler.Locality;
    +import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
    +import org.apache.flink.util.AbstractID;
    +import org.apache.flink.util.FlinkException;
    +import org.apache.flink.util.Preconditions;
    +
    +import javax.annotation.Nullable;
    +
    +import java.util.AbstractCollection;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.concurrent.CompletableFuture;
    +
    +/**
    + * Manager which is responsible for slot sharing. Slot sharing allows to 
run different
    + * tasks in the same slot and to realize co-location constraints.
    + *
    + * <p>The SlotSharingManager allows to create a hierarchy of {@link 
TaskSlot} such that
    + * every {@link TaskSlot} is uniquely identified by a {@link 
SlotRequestId} identifying
    + * the request for the TaskSlot and a {@link AbstractID} identifying the 
task or the
    + * co-location constraint running in this slot.
    + *
    + * <p>The {@link TaskSlot} hierarchy is implemented by {@link 
MultiTaskSlot} and
    + * {@link SingleTaskSlot}. The former class represents inner nodes which 
can contain
    + * a number of other {@link TaskSlot} and the latter class represents the 
leave nodes.
    + * The hierarchy starts with a root {@link MultiTaskSlot} which is a future
    + * {@link SlotContext} assigned. The {@link SlotContext} represents the 
allocated slot
    + * on the TaskExecutor in which all slots of this hierarchy run. A {@link 
MultiTaskSlot}
    + * can be assigned multiple {@link SingleTaskSlot} or {@link 
MultiTaskSlot} if and only if
    + * the task slot does not yet contain another child with the same {@link 
AbstractID} identifying
    + * the actual task or the co-location constraint.
    + *
    + * <p>Normal slot sharing is represented by a root {@link MultiTaskSlot} 
which contains a set
    + * of {@link SingleTaskSlot} on the second layer. Each {@link 
SingleTaskSlot} represents a different
    + * task.
    + *
    + * <p>Co-location constraints are modeled by adding a {@link 
MultiTaskSlot} to the root node. The co-location
    + * constraint is uniquely identified by a {@link AbstractID} such that we 
cannot add a second co-located
    + * {@link MultiTaskSlot} to the same root node. Now all co-located tasks 
will be added to co-located
    + * multi task slot.
    + */
    +public class SlotSharingManager {
    +
    +   private final SlotSharingGroupId slotSharingGroupId;
    +
    +   // needed to release allocated slots after a complete multi task slot 
hierarchy has been released
    +   private final AllocatedSlotActions allocatedSlotActions;
    +
    +   // owner of the slots to which to return them when they are released 
from the outside
    +   private final SlotOwner slotOwner;
    +
    +   private final Map<SlotRequestId, TaskSlot> allTaskSlots;
    +
    +   // Root nodes which have not been completed because the allocated slot 
is still pending
    +   private final Map<SlotRequestId, MultiTaskSlot> unresolvedRootSlots;
    +
    +   // Root nodes which have been completed (the underlying allocated slot 
has been assigned)
    +   private final Map<TaskManagerLocation, Set<MultiTaskSlot>> 
resolvedRootSlots;
    +
    +   // Internal class to iterate over all resolved root slots
    +   private ResolvedRootSlotValues resolvedMultiTaskSlotValues;
    +
    +   public SlotSharingManager(
    +                   SlotSharingGroupId slotSharingGroupId,
    +                   AllocatedSlotActions allocatedSlotActions,
    +                   SlotOwner slotOwner) {
    +           this.slotSharingGroupId = 
Preconditions.checkNotNull(slotSharingGroupId);
    +           this.allocatedSlotActions = 
Preconditions.checkNotNull(allocatedSlotActions);
    +           this.slotOwner = Preconditions.checkNotNull(slotOwner);
    +
    +           allTaskSlots = new HashMap<>(16);
    +           unresolvedRootSlots = new HashMap<>(16);
    +           resolvedRootSlots = new HashMap<>(16);
    +
    +           resolvedMultiTaskSlotValues = null;
    +   }
    +
    +   public boolean isEmpty() {
    +           return allTaskSlots.isEmpty();
    +   }
    +
    +   public boolean contains(SlotRequestId slotRequestId) {
    +           return allTaskSlots.containsKey(slotRequestId);
    +   }
    +
    +   @Nullable
    +   public TaskSlot getTaskSlot(SlotRequestId slotRequestId) {
    +           return allTaskSlots.get(slotRequestId);
    +   }
    +
    +   /**
    +    * Creates a new root slot with the given {@link SlotRequestId}, {@link 
SlotContext} future and
    +    * the {@link SlotRequestId} of the allocated slot.
    +    *
    +    * @param slotRequestId of the root slot
    +    * @param slotContextFuture with which we create the root slot
    +    * @param allocatedSlotRequestId slot request id of the underlying 
allocated slot which can be used
    +    *                               to cancel the pending slot request or 
release the allocated slot
    +    * @return New root slot
    +    */
    +   public MultiTaskSlot createRootSlot(
    +                   SlotRequestId slotRequestId,
    +                   CompletableFuture<SlotContext> slotContextFuture,
    +                   SlotRequestId allocatedSlotRequestId) {
    +           final MultiTaskSlot rootMultiTaskSlot = new MultiTaskSlot(
    +                   slotRequestId,
    +                   slotContextFuture,
    +                   allocatedSlotRequestId);
    +
    +           allTaskSlots.put(slotRequestId, rootMultiTaskSlot);
    +           unresolvedRootSlots.put(slotRequestId, rootMultiTaskSlot);
    +
    +           // add the root node to the set of resolved root nodes once the 
SlotContext future has been completed
    +           // and we know the slot's TaskManagerLocation
    +           slotContextFuture.whenComplete(
    +                   (SlotContext slotContext, Throwable throwable) -> {
    +                           if (slotContext != null) {
    +                                   final MultiTaskSlot resolvedRootNode = 
unresolvedRootSlots.remove(slotRequestId);
    +
    +                                   if (resolvedRootNode != null) {
    +                                           final Set<MultiTaskSlot> 
innerCollection = resolvedRootSlots.computeIfAbsent(
    +                                                   
slotContext.getTaskManagerLocation(),
    +                                                   taskManagerLocation -> 
new HashSet<>(4));
    +
    +                                           
innerCollection.add(resolvedRootNode);
    +                                   }
    +                           } else {
    +                                   rootMultiTaskSlot.release(throwable);
    +                           }
    +                   });
    +
    +           return rootMultiTaskSlot;
    +   }
    +
    +   /**
    +    * Gets a resolved root slot which does not yet contain the given 
groupId. First the given set of
    +    * preferred locations is checked.
    +    *
    +    * @param groupId which the returned slot must not contain
    +    * @param locationPreferences specifying which locations are preferred
    +    * @return the resolved root slot and its locality wrt to the specified 
location preferences
    +    *              or null if there was no root slot which did not contain 
the given groupId
    +    */
    +   @Nullable
    +   public MultiTaskSlotLocality getResolvedRootSlot(AbstractID groupId, 
Collection<TaskManagerLocation> locationPreferences) {
    +           Preconditions.checkNotNull(locationPreferences);
    +
    +           if (locationPreferences.isEmpty()) {
    +                   return 
getResolvedRootSlotWithoutLocationPreferences(groupId);
    +           } else {
    +                   return 
getResolvedRootSlotWithLocationPreferences(groupId, locationPreferences);
    +           }
    +   }
    +
    +   /**
    +    * Gets a resolved root slot which does not yet contain the given 
groupId. The method will try to
    +    * find a slot of a TaskManager contained in the collection of 
preferred locations. If there is no such slot
    +    * with free capacities available, then the method will look for slots 
of TaskManager which run on the same
    +    * machine as the TaskManager in the collection of preferred locations. 
If there is no such slot, then any slot
    +    * with free capacities is returned. If there is no such slot, then 
null is returned.
    +    *
    +    * @param groupId which the returned slot must not contain
    +    * @param locationPreferences specifying which locations are preferred
    +    * @return the resolved root slot and its locality wrt to the specified 
location preferences
    +    *              or null if there was not root slot which did not 
contain the given groupId
    +    */
    +   @Nullable
    +   private MultiTaskSlotLocality 
getResolvedRootSlotWithLocationPreferences(AbstractID groupId, 
Collection<TaskManagerLocation> locationPreferences) {
    +           Preconditions.checkNotNull(groupId);
    +           Preconditions.checkNotNull(locationPreferences);
    +           final Set<String> hostnameSet = new HashSet<>();
    +
    +           for (TaskManagerLocation locationPreference : 
locationPreferences) {
    +                   final Set<MultiTaskSlot> multiTaskSlots = 
resolvedRootSlots.get(locationPreference);
    +
    +                   if (multiTaskSlots != null) {
    +                           for (MultiTaskSlot multiTaskSlot : 
multiTaskSlots) {
    +                                   if (!multiTaskSlot.contains(groupId)) {
    +                                           return 
MultiTaskSlotLocality.of(multiTaskSlot, Locality.LOCAL);
    +                                   }
    +                           }
    +
    +                           
hostnameSet.add(locationPreference.getHostname());
    +                   }
    +           }
    +
    +           MultiTaskSlot nonLocalMultiTaskSlot = null;
    +
    +           for (Map.Entry<TaskManagerLocation, Set<MultiTaskSlot>> 
taskManagerLocationSetEntry : resolvedRootSlots.entrySet()) {
    +                   if 
(hostnameSet.contains(taskManagerLocationSetEntry.getKey().getHostname())) {
    +                           for (MultiTaskSlot multiTaskSlot : 
taskManagerLocationSetEntry.getValue()) {
    +                                   if (!multiTaskSlot.contains(groupId)) {
    +                                           return 
MultiTaskSlotLocality.of(multiTaskSlot, Locality.HOST_LOCAL);
    +                                   }
    +                           }
    +                   } else if (nonLocalMultiTaskSlot == null) {
    +                           for (MultiTaskSlot multiTaskSlot : 
taskManagerLocationSetEntry.getValue()) {
    +                                   if (!multiTaskSlot.contains(groupId)) {
    +                                           nonLocalMultiTaskSlot = 
multiTaskSlot;
    +                                   }
    +                           }
    +                   }
    +           }
    +
    +           if (nonLocalMultiTaskSlot != null) {
    +                   return MultiTaskSlotLocality.of(nonLocalMultiTaskSlot, 
Locality.NON_LOCAL);
    +           } else {
    +                   return null;
    +           }
    +   }
    +
    +   /**
    +    * Gets a resolved slot which does not yet contain the given groupId 
without any location
    +    * preferences.
    +    *
    +    * @param groupId which the returned slot must not contain
    +    * @return the resolved slot or null if there was no root slot with 
free capacities
    +    */
    +   @Nullable
    +   private MultiTaskSlotLocality 
getResolvedRootSlotWithoutLocationPreferences(AbstractID groupId) {
    +           Preconditions.checkNotNull(groupId);
    +
    +           for (Set<MultiTaskSlot> multiTaskSlots : 
resolvedRootSlots.values()) {
    +                   for (MultiTaskSlot multiTaskSlot : multiTaskSlots) {
    +                           if (!multiTaskSlot.contains(groupId)) {
    +                                   return 
MultiTaskSlotLocality.of(multiTaskSlot, Locality.UNCONSTRAINED);
    +                           }
    +                   }
    +           }
    +
    +           return null;
    +   }
    +
    +   /**
    +    * Gets an unresolved slot which does not yet contain the given 
groupId. An unresolved
    +    * slot is a slot whose underlying allocated slot has not been 
allocated yet.
    +    *
    +    * @param groupId which the returned slot must not contain
    +    * @return the unresolved slot or null if there was no root slot with 
free capacities
    +    */
    +   @Nullable
    +   public MultiTaskSlot getUnresolvedRootSlot(AbstractID groupId) {
    +           for (MultiTaskSlot multiTaskSlot : 
unresolvedRootSlots.values()) {
    +                   if (!multiTaskSlot.contains(groupId)) {
    +                           return multiTaskSlot;
    +                   }
    +           }
    +
    +           return null;
    +   }
    +
    +   // 
------------------------------------------------------------------------
    +   // Inner classes: TaskSlot hierarchy and helper classes
    +   // 
------------------------------------------------------------------------
    +
    +   /**
    +    * Helper class which contains a {@link MultiTaskSlot} and its {@link 
Locality}.
    +    */
    +   public static final class MultiTaskSlotLocality {
    +           private final MultiTaskSlot multiTaskSlot;
    +
    +           private final Locality locality;
    +
    +           public MultiTaskSlotLocality(MultiTaskSlot multiTaskSlot, 
Locality locality) {
    +                   this.multiTaskSlot = 
Preconditions.checkNotNull(multiTaskSlot);
    +                   this.locality = Preconditions.checkNotNull(locality);
    +           }
    +
    +           public MultiTaskSlot getMultiTaskSlot() {
    +                   return multiTaskSlot;
    +           }
    +
    +           public Locality getLocality() {
    +                   return locality;
    +           }
    +
    +           public static MultiTaskSlotLocality of(MultiTaskSlot 
multiTaskSlot, Locality locality) {
    +                   return new MultiTaskSlotLocality(multiTaskSlot, 
locality);
    +           }
    +   }
    +
    +   /**
    +    * Base class for all task slots.
    +    */
    +   public abstract static class TaskSlot {
    +           // every TaskSlot has an associated slot request id
    +           private final SlotRequestId slotRequestId;
    +
    +           // all task slots except for the root slots have a group id 
assigned
    +           @Nullable
    +           private final AbstractID groupId;
    +
    +           protected TaskSlot(SlotRequestId slotRequestId, @Nullable 
AbstractID groupId) {
    +                   this.slotRequestId = 
Preconditions.checkNotNull(slotRequestId);
    +                   this.groupId = groupId;
    +           }
    +
    +           public SlotRequestId getSlotRequestId() {
    +                   return slotRequestId;
    +           }
    +
    +           @Nullable
    +           public AbstractID getGroupId() {
    +                   return groupId;
    +           }
    +
    +           /**
    +            * Check whether the task slot contains the given groupId.
    +            *
    +            * @param groupId which to check whether it is contained
    +            * @return true if the task slot contains the given groupId, 
otherwise false
    +            */
    +           public boolean contains(AbstractID groupId) {
    +                   return Objects.equals(this.groupId, groupId);
    +           }
    +
    +           /**
    +            * Release the task slot.
    +            *
    +            * @param cause for the release
    +            * @return true if the slot could be released, otherwise false
    +            */
    +           public abstract boolean release(Throwable cause);
    +   }
    +
    +   /**
    +    * {@link TaskSlot} implementation which can have multiple other task 
slots assigned as children.
    +    */
    +   public final class MultiTaskSlot extends TaskSlot implements 
AllocatedSlot.Payload {
    +
    +           private final Map<AbstractID, TaskSlot> children;
    +
    +           // the root node has its parent set to null
    +           @Nullable
    +           private final MultiTaskSlot parent;
    +
    +           // underlying allocated slot
    +           private final CompletableFuture<SlotContext> slotContextFuture;
    +
    +           // slot request id of the allocated slot
    +           @Nullable
    +           private final SlotRequestId allocatedSlotRequestId;
    +
    +           // true if we are currently releasing our children
    +           private boolean releasingChildren;
    +
    +           private MultiTaskSlot(
    +                           SlotRequestId slotRequestId,
    +                           AbstractID groupId,
    +                           MultiTaskSlot parent) {
    +                   this(
    +                           slotRequestId,
    +                           groupId,
    +                           Preconditions.checkNotNull(parent),
    +                           parent.getSlotContextFuture(),
    +                           null);
    +           }
    +
    +           private MultiTaskSlot(
    +                           SlotRequestId slotRequestId,
    +                           CompletableFuture<SlotContext> 
slotContextFuture,
    +                           SlotRequestId allocatedSlotRequestId) {
    +                   this(
    +                           slotRequestId,
    +                           null,
    +                           null,
    +                           slotContextFuture,
    +                           allocatedSlotRequestId);
    +           }
    +
    +           private MultiTaskSlot(
    +                           SlotRequestId slotRequestId,
    +                           @Nullable AbstractID groupId,
    +                           MultiTaskSlot parent,
    +                           CompletableFuture<SlotContext> 
slotContextFuture,
    +                           SlotRequestId allocatedSlotRequestId) {
    +                   super(slotRequestId, groupId);
    +
    +                   this.parent = parent;
    +                   this.slotContextFuture = 
Preconditions.checkNotNull(slotContextFuture);
    +                   this.allocatedSlotRequestId = allocatedSlotRequestId;
    +
    +                   this.children = new HashMap<>(16);
    +                   this.releasingChildren = false;
    +
    +                   slotContextFuture.whenComplete(
    +                           (SlotContext ignored, Throwable throwable) -> {
    +                                   if (throwable != null) {
    +                                           release(throwable);
    +                                   }
    +                           });
    +           }
    +
    +           public CompletableFuture<SlotContext> getSlotContextFuture() {
    +                   return slotContextFuture;
    +           }
    +
    +           /**
    +            * Allocates a {@link MultiTaskSlot} and registers it under the 
given groupId at
    +            * this {@link MultiTaskSlot}.
    +            *
    +            * @param slotRequestId of the new multi task slot
    +            * @param groupId under which the new multi task slot is 
registered
    +            * @return the newly allocated {@link MultiTaskSlot}
    +            */
    +           MultiTaskSlot allocateMultiTaskSlot(SlotRequestId 
slotRequestId, AbstractID groupId) {
    +                   Preconditions.checkState(!super.contains(groupId));
    +
    +                   final MultiTaskSlot inner = new MultiTaskSlot(
    +                           slotRequestId,
    +                           groupId,
    +                           this);
    +
    +                   children.put(groupId, inner);
    +
    +                   // register the newly allocated slot also at the 
SlotSharingManager
    +                   allTaskSlots.put(slotRequestId, inner);
    +
    +                   return inner;
    +           }
    +
    +           /**
    +            * Allocates a {@link SingleTaskSlot} and registeres it under 
the given groupId at
    +            * this {@link MultiTaskSlot}.
    +            *
    +            * @param slotRequestId of the new single task slot
    +            * @param groupId under which the new single task slot is 
registered
    +            * @param locality of the allocation
    +            * @return the newly allocated {@link SingleTaskSlot}
    +            */
    +           SingleTaskSlot allocateSingleTaskSlot(
    +                           SlotRequestId slotRequestId,
    +                           AbstractID groupId,
    +                           Locality locality) {
    +                   Preconditions.checkState(!super.contains(groupId));
    +
    +                   final SingleTaskSlot leave = new SingleTaskSlot(
    --- End diff --
    
    nit: *leaf*


---

Reply via email to