gaoyunhaii commented on a change in pull request #8841:
[FLINK-12765][coordinator] Bookkeeping of available resources of allocated
slots in SlotPool
URL: https://github.com/apache/flink/pull/8841#discussion_r298434963
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
##########
@@ -347,20 +365,65 @@ private MultiTaskSlot(
CompletableFuture<? extends SlotContext>
slotContextFuture,
@Nullable SlotRequestId allocatedSlotRequestId)
{
super(slotRequestId, groupId);
+ Preconditions.checkNotNull(slotContextFuture);
this.parent = parent;
- this.slotContextFuture =
Preconditions.checkNotNull(slotContextFuture);
this.allocatedSlotRequestId = allocatedSlotRequestId;
this.children = new HashMap<>(16);
this.releasingChildren = false;
- slotContextFuture.whenComplete(
- (SlotContext ignored, Throwable throwable) -> {
- if (throwable != null) {
- release(throwable);
+ this.requestedResources = ResourceProfile.EMPTY;
+
+ this.slotContextFuture =
slotContextFuture.handle((SlotContext slotContext, Throwable throwable) -> {
+ if (throwable != null) {
+ // If the underlying resource request
fail, currently we fails all the requests to
+ // simplify the logic.
+ release(throwable);
+ throw new
CompletionException(throwable);
+ }
+
+ if (parent == null) {
+ ResourceProfile allocated =
ResourceProfile.EMPTY;
+ List<TaskSlot> childrenToEvict = new
ArrayList<>();
+
+ for (TaskSlot slot : children.values())
{
+ ResourceProfile
allocatedIfInclude = allocated.merge(slot.getRequestedResources());
+
+ if
(slotContext.getResourceProfile().isMatching(allocatedIfInclude)) {
+ allocated =
allocatedIfInclude;
+ } else {
+
childrenToEvict.add(slot);
+ }
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Not all requests are
fulfilled due to over-allocated, number of requests is {}, " +
+
"number of evicted requests is {}, underlying allocated is {}, fulfilled is {},
" +
+
"evicted requests is {},",
+ children.size(),
+
childrenToEvict.size(),
+
slotContext.getResourceProfile(),
+ allocated,
+
childrenToEvict);
}
- });
+
+ if (childrenToEvict.size() ==
children.size()) {
+ // This only happens when we
request to RM using the resource profile of a task
+ // who is belonging to a
CoLocationGroup. Similar to dealing with the fail of
Review comment:
This is because the RM always returns a slot whose resources are at least as large as the
requested ones. Without co-location, therefore, at least the request that triggered the
allocation from the RM should always be fulfilled.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services