Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5091#discussion_r156891235
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ---
    @@ -266,104 +279,367 @@ public void disconnectResourceManager() {
        // 
------------------------------------------------------------------------
     
        @Override
    -   public CompletableFuture<SimpleSlot> allocateSlot(
    -                   SlotRequestID requestId,
    -                   ScheduledUnit task,
    -                   ResourceProfile resources,
    -                   Iterable<TaskManagerLocation> locationPreferences,
    +   public CompletableFuture<LogicalSlot> allocateSlot(
    +                   SlotRequestId slotRequestId,
    +                   ScheduledUnit scheduledUnit,
    +                   ResourceProfile resourceProfile,
    +                   Collection<TaskManagerLocation> locationPreferences,
    +                   boolean allowQueuedScheduling,
                        Time timeout) {
     
    -           return internalAllocateSlot(requestId, task, resources, 
locationPreferences);
    +           return internalAllocateSlot(
    +                   slotRequestId,
    +                   scheduledUnit,
    +                   resourceProfile,
    +                   locationPreferences,
    +                   allowQueuedScheduling);
        }
     
    -   @Override
    -   public void returnAllocatedSlot(Slot slot) {
    -           internalReturnAllocatedSlot(slot);
    +   private CompletableFuture<LogicalSlot> internalAllocateSlot(
    +                   SlotRequestId slotRequestId,
    +                   ScheduledUnit task,
    +                   ResourceProfile resourceProfile,
    +                   Collection<TaskManagerLocation> locationPreferences,
    +                   boolean allowQueuedScheduling) {
    +
    +           final SlotSharingGroupId slotSharingGroupId = 
task.getSlotSharingGroupId();
    +
    +           if (slotSharingGroupId != null) {
    +                   // allocate slot with slot sharing
    +                   final SlotSharingManager multiTaskSlotManager = 
slotSharingManagers.computeIfAbsent(
    +                           slotSharingGroupId,
    +                           id -> new SlotSharingManager(
    +                                   id,
    +                                   this,
    +                                   providerAndOwner));
    +
    +                   final SlotSharingManager.MultiTaskSlotLocality 
multiTaskSlotFuture;
    +
    +                   try {
    +                           if (task.getCoLocationConstraint() != null) {
    +                                   multiTaskSlotFuture = 
allocateCoLocatedMultiTaskSlot(
    +                                           task.getCoLocationConstraint(),
    +                                           multiTaskSlotManager,
    +                                           resourceProfile,
    +                                           locationPreferences,
    +                                           allowQueuedScheduling);
    +                           } else {
    +                                   multiTaskSlotFuture = 
allocateMultiTaskSlot(
    +                                           task.getJobVertexId(), 
multiTaskSlotManager,
    +                                           resourceProfile,
    +                                           locationPreferences,
    +                                           allowQueuedScheduling);
    +                           }
    +                   } catch (NoResourceAvailableException 
noResourceException) {
    +                           return 
FutureUtils.completedExceptionally(noResourceException);
    +                   }
    +
    +                   // sanity check
    +                   
Preconditions.checkState(!multiTaskSlotFuture.getMultiTaskSlot().contains(task.getJobVertexId()));
    +
    +                   final SlotSharingManager.SingleTaskSlot leave = 
multiTaskSlotFuture.getMultiTaskSlot().allocateSingleTaskSlot(
    --- End diff --
    
    jup, we wouldn't want the single task slot to leave after all.


---

Reply via email to