[ 
https://issues.apache.org/jira/browse/FLINK-7956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16290606#comment-16290606
 ] 

ASF GitHub Bot commented on FLINK-7956:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5091#discussion_r156891235
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ---
    @@ -266,104 +279,367 @@ public void disconnectResourceManager() {
        // 
------------------------------------------------------------------------
     
        @Override
    -   public CompletableFuture<SimpleSlot> allocateSlot(
    -                   SlotRequestID requestId,
    -                   ScheduledUnit task,
    -                   ResourceProfile resources,
    -                   Iterable<TaskManagerLocation> locationPreferences,
    +   public CompletableFuture<LogicalSlot> allocateSlot(
    +                   SlotRequestId slotRequestId,
    +                   ScheduledUnit scheduledUnit,
    +                   ResourceProfile resourceProfile,
    +                   Collection<TaskManagerLocation> locationPreferences,
    +                   boolean allowQueuedScheduling,
                        Time timeout) {
     
    -           return internalAllocateSlot(requestId, task, resources, 
locationPreferences);
    +           return internalAllocateSlot(
    +                   slotRequestId,
    +                   scheduledUnit,
    +                   resourceProfile,
    +                   locationPreferences,
    +                   allowQueuedScheduling);
        }
     
    -   @Override
    -   public void returnAllocatedSlot(Slot slot) {
    -           internalReturnAllocatedSlot(slot);
    +   private CompletableFuture<LogicalSlot> internalAllocateSlot(
    +                   SlotRequestId slotRequestId,
    +                   ScheduledUnit task,
    +                   ResourceProfile resourceProfile,
    +                   Collection<TaskManagerLocation> locationPreferences,
    +                   boolean allowQueuedScheduling) {
    +
    +           final SlotSharingGroupId slotSharingGroupId = 
task.getSlotSharingGroupId();
    +
    +           if (slotSharingGroupId != null) {
    +                   // allocate slot with slot sharing
    +                   final SlotSharingManager multiTaskSlotManager = 
slotSharingManagers.computeIfAbsent(
    +                           slotSharingGroupId,
    +                           id -> new SlotSharingManager(
    +                                   id,
    +                                   this,
    +                                   providerAndOwner));
    +
    +                   final SlotSharingManager.MultiTaskSlotLocality 
multiTaskSlotFuture;
    +
    +                   try {
    +                           if (task.getCoLocationConstraint() != null) {
    +                                   multiTaskSlotFuture = 
allocateCoLocatedMultiTaskSlot(
    +                                           task.getCoLocationConstraint(),
    +                                           multiTaskSlotManager,
    +                                           resourceProfile,
    +                                           locationPreferences,
    +                                           allowQueuedScheduling);
    +                           } else {
    +                                   multiTaskSlotFuture = 
allocateMultiTaskSlot(
    +                                           task.getJobVertexId(), 
multiTaskSlotManager,
    +                                           resourceProfile,
    +                                           locationPreferences,
    +                                           allowQueuedScheduling);
    +                           }
    +                   } catch (NoResourceAvailableException 
noResourceException) {
    +                           return 
FutureUtils.completedExceptionally(noResourceException);
    +                   }
    +
    +                   // sanity check
    +                   
Preconditions.checkState(!multiTaskSlotFuture.getMultiTaskSlot().contains(task.getJobVertexId()));
    +
    +                   final SlotSharingManager.SingleTaskSlot leave = 
multiTaskSlotFuture.getMultiTaskSlot().allocateSingleTaskSlot(
    --- End diff --
    
    jup, we wouldn't want the single task slot to leave after all.


> Add support for scheduling with slot sharing
> --------------------------------------------
>
>                 Key: FLINK-7956
>                 URL: https://issues.apache.org/jira/browse/FLINK-7956
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Scheduler
>    Affects Versions: 1.4.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>              Labels: flip-6
>
> In order to reach feature equivalence with the old code base, we should add 
> support for scheduling with slot sharing to the {{SlotPool}}. This will also 
> allow us to run all the IT cases based on the {{AbstractTestBase}} on the 
> Flip-6 {{MiniCluster}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to