[ https://issues.apache.org/jira/browse/FLINK-7956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16281884#comment-16281884 ]

ASF GitHub Bot commented on FLINK-7956:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5091#discussion_r155528224
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java ---
    @@ -0,0 +1,722 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.jobmaster.slotpool;
    +
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.runtime.jobmaster.LogicalSlot;
    +import org.apache.flink.runtime.jobmaster.SlotContext;
    +import org.apache.flink.runtime.jobmaster.SlotOwner;
    +import org.apache.flink.runtime.jobmaster.SlotRequestId;
    +import org.apache.flink.runtime.instance.SlotSharingGroupId;
    +import org.apache.flink.runtime.jobmanager.scheduler.Locality;
    +import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
    +import org.apache.flink.util.AbstractID;
    +import org.apache.flink.util.FlinkException;
    +import org.apache.flink.util.Preconditions;
    +
    +import javax.annotation.Nullable;
    +
    +import java.util.AbstractCollection;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.concurrent.CompletableFuture;
    +
    +/**
    + * Manager which is responsible for slot sharing. Slot sharing allows to run different
    + * tasks in the same slot and to realize co-location constraints.
    + *
    + * <p>The SlotSharingManager allows to create a hierarchy of {@link TaskSlot} such that
    + * every {@link TaskSlot} is uniquely identified by a {@link SlotRequestId} identifying
    + * the request for the TaskSlot and a {@link AbstractID} identifying the task or the
    + * co-location constraint running in this slot.
    + *
    + * <p>The {@link TaskSlot} hierarchy is implemented by {@link MultiTaskSlot} and
    + * {@link SingleTaskSlot}. The former class represents inner nodes which can contain
    + * a number of other {@link TaskSlot} and the latter class represents the leave nodes.
    + * The hierarchy starts with a root {@link MultiTaskSlot} which is a future
    + * {@link SlotContext} assigned. The {@link SlotContext} represents the allocated slot
    + * on the TaskExecutor in which all slots of this hierarchy run. A {@link MultiTaskSlot}
    + * can be assigned multiple {@link SingleTaskSlot} or {@link MultiTaskSlot} if and only if
    + * the task slot does not yet contain another child with the same {@link AbstractID} identifying
    + * the actual task or the co-location constraint.
    + *
    + * <p>Normal slot sharing is represented by a root {@link MultiTaskSlot} which contains a set
    + * of {@link SingleTaskSlot} on the second layer. Each {@link SingleTaskSlot} represents a different
    + * task.
    + *
    + * <p>Co-location constraints are modeled by adding a {@link MultiTaskSlot} to the root node. The co-location
    + * constraint is uniquely identified by a {@link AbstractID} such that we cannot add a second co-located
    + * {@link MultiTaskSlot} to the same root node. Now all co-located tasks will be added to co-located
    + * multi task slot.
    + */
    +public class SlotSharingManager {
    +
    +   private final SlotSharingGroupId slotSharingGroupId;
    +
    +   // needed to release allocated slots after a complete multi task slot hierarchy has been released
    +   private final AllocatedSlotActions allocatedSlotActions;
    +
    +   // owner of the slots to which to return them when they are released from the outside
    +   private final SlotOwner slotOwner;
    +
    +   private final Map<SlotRequestId, TaskSlot> allTaskSlots;
    +
    +   // Root nodes which have not been completed because the allocated slot is still pending
    +   private final Map<SlotRequestId, MultiTaskSlot> unresolvedRootSlots;
    +
    +   // Root nodes which have been completed (the underlying allocated slot has been assigned)
    +   private final Map<TaskManagerLocation, Set<MultiTaskSlot>> resolvedRootSlots;
    +
    +   // Internal class to iterate over all resolved root slots
    +   private ResolvedRootSlotValues resolvedMultiTaskSlotValues;
    +
    +   public SlotSharingManager(
    +                   SlotSharingGroupId slotSharingGroupId,
    +                   AllocatedSlotActions allocatedSlotActions,
    +                   SlotOwner slotOwner) {
    +           this.slotSharingGroupId = Preconditions.checkNotNull(slotSharingGroupId);
    +           this.allocatedSlotActions = Preconditions.checkNotNull(allocatedSlotActions);
    +           this.slotOwner = Preconditions.checkNotNull(slotOwner);
    +
    +           allTaskSlots = new HashMap<>(16);
    +           unresolvedRootSlots = new HashMap<>(16);
    +           resolvedRootSlots = new HashMap<>(16);
    +
    +           resolvedMultiTaskSlotValues = null;
    +   }
    +
    +   public boolean isEmpty() {
    +           return allTaskSlots.isEmpty();
    +   }
    +
    +   public boolean contains(SlotRequestId slotRequestId) {
    +           return allTaskSlots.containsKey(slotRequestId);
    +   }
    +
    +   @Nullable
    +   public TaskSlot getTaskSlot(SlotRequestId slotRequestId) {
    +           return allTaskSlots.get(slotRequestId);
    +   }
    +
    +   /**
    +    * Creates a new root slot with the given {@link SlotRequestId}, {@link SlotContext} future and
    +    * the {@link SlotRequestId} of the allocated slot.
    +    *
    +    * @param slotRequestId of the root slot
    +    * @param slotContextFuture with which we create the root slot
    +    * @param allocatedSlotRequestId slot request id of the underlying allocated slot which can be used
    +    *                               to cancel the pending slot request or release the allocated slot
    +    * @return New root slot
    +    */
    +   public MultiTaskSlot createRootSlot(
    +                   SlotRequestId slotRequestId,
    +                   CompletableFuture<SlotContext> slotContextFuture,
    +                   SlotRequestId allocatedSlotRequestId) {
    +           final MultiTaskSlot rootMultiTaskSlot = new MultiTaskSlot(
    +                   slotRequestId,
    +                   slotContextFuture,
    +                   allocatedSlotRequestId);
    +
    +           allTaskSlots.put(slotRequestId, rootMultiTaskSlot);
    +           unresolvedRootSlots.put(slotRequestId, rootMultiTaskSlot);
    +
    +           // add the root node to the set of resolved root nodes once the SlotContext future has been completed
    +           // and we know the slot's TaskManagerLocation
    +           slotContextFuture.whenComplete(
    +                   (SlotContext slotContext, Throwable throwable) -> {
    +                           if (slotContext != null) {
    +                                   final MultiTaskSlot resolvedRootNode = unresolvedRootSlots.remove(slotRequestId);
    +
    +                                   if (resolvedRootNode != null) {
    +                                           final Set<MultiTaskSlot> innerCollection = resolvedRootSlots.computeIfAbsent(
    --- End diff --
    
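    In theory, the signature of this method allows concurrent modifications of
    `resolvedRootSlots`: the `whenComplete` callback can run in whichever thread
    completes `slotContextFuture`. A comment explaining why this cannot happen
    would be nice, perhaps together with `@GuardedBy`.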


> Add support for scheduling with slot sharing
> --------------------------------------------
>
>                 Key: FLINK-7956
>                 URL: https://issues.apache.org/jira/browse/FLINK-7956
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Scheduler
>    Affects Versions: 1.4.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>              Labels: flip-6
>
> In order to reach feature equivalence with the old code base, we should add 
> support for scheduling with slot sharing to the {{SlotPool}}. This will also 
> allow us to run all the IT cases based on the {{AbstractTestBase}} on the 
> Flip-6 {{MiniCluster}}.



