xintongsong commented on a change in pull request #8841: 
[FLINK-12765][coordinator] Bookkeeping of available resources of allocated 
slots in SlotPool
URL: https://github.com/apache/flink/pull/8841#discussion_r297472482
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 ##########
 @@ -347,20 +365,65 @@ private MultiTaskSlot(
                                CompletableFuture<? extends SlotContext> 
slotContextFuture,
                                @Nullable SlotRequestId allocatedSlotRequestId) 
{
                        super(slotRequestId, groupId);
+                       Preconditions.checkNotNull(slotContextFuture);
 
                        this.parent = parent;
-                       this.slotContextFuture = 
Preconditions.checkNotNull(slotContextFuture);
                        this.allocatedSlotRequestId = allocatedSlotRequestId;
 
                        this.children = new HashMap<>(16);
                        this.releasingChildren = false;
 
-                       slotContextFuture.whenComplete(
-                               (SlotContext ignored, Throwable throwable) -> {
-                                       if (throwable != null) {
-                                               release(throwable);
+                       this.requestedResources = ResourceProfile.EMPTY;
+
+                       this.slotContextFuture = 
slotContextFuture.handle((SlotContext slotContext, Throwable throwable) -> {
+                               if (throwable != null) {
+                                       // If the underlying resource request 
fail, currently we fails all the requests to
+                                       // simplify the logic.
+                                       release(throwable);
+                                       throw new 
CompletionException(throwable);
+                               }
+
+                               if (parent == null) {
+                                       ResourceProfile allocated = 
ResourceProfile.EMPTY;
+                                       List<TaskSlot> childrenToEvict = new 
ArrayList<>();
+
+                                       for (TaskSlot slot : children.values()) 
{
+                                               ResourceProfile 
allocatedIfInclude = allocated.merge(slot.getRequestedResources());
+
+                                               if 
(slotContext.getResourceProfile().isMatching(allocatedIfInclude)) {
+                                                       allocated = 
allocatedIfInclude;
+                                               } else {
+                                                       
childrenToEvict.add(slot);
+                                               }
+                                       }
+
+                                       if (LOG.isDebugEnabled()) {
+                                               LOG.debug("Not all requests are 
fulfilled due to over-allocated, number of requests is {}, " +
+                                                                               
"number of evicted requests is {}, underlying allocated is {}, fulfilled is {}, 
" +
+                                                                               
"evicted requests is {},",
+                                                               children.size(),
+                                                               
childrenToEvict.size(),
+                                                               
slotContext.getResourceProfile(),
+                                                               allocated,
+                                                               
childrenToEvict);
                                        }
-                               });
+
+                                       if (childrenToEvict.size() == 
children.size()) {
+                                               // This only happens when we 
request to RM using the resource profile of a task
+                                               // who is belonging to a 
CoLocationGroup. Similar to dealing with the fail of
 
 Review comment:
   It's not clear to me why this only happens for CoLocationGroup.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to