tillrohrmann commented on a change in pull request #10555: [FLINK-15013] Fix
non local slot selection and root slot resolution
URL: https://github.com/apache/flink/pull/10555#discussion_r357677294
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
##########
@@ -143,41 +144,44 @@ MultiTaskSlot createRootSlot(
SlotRequestId slotRequestId,
CompletableFuture<? extends SlotContext>
slotContextFuture,
SlotRequestId allocatedSlotRequestId) {
+ final CompletableFuture<SlotContext>
slotContextFutureAfterRootSlotResolution = new CompletableFuture<>();
final MultiTaskSlot rootMultiTaskSlot = new MultiTaskSlot(
slotRequestId,
- slotContextFuture,
+ slotContextFutureAfterRootSlotResolution,
allocatedSlotRequestId);
LOG.debug("Create multi task slot [{}] in slot [{}].",
slotRequestId, allocatedSlotRequestId);
allTaskSlots.put(slotRequestId, rootMultiTaskSlot);
-
unresolvedRootSlots.put(slotRequestId, rootMultiTaskSlot);
- // add the root node to the set of resolved root nodes once the
SlotContext future has
- // been completed and we know the slot's TaskManagerLocation
- slotContextFuture.whenComplete(
- (SlotContext slotContext, Throwable throwable) -> {
- if (slotContext != null) {
- final MultiTaskSlot resolvedRootNode =
unresolvedRootSlots.remove(slotRequestId);
+ FutureUtils.forward(
+ slotContextFuture.thenApply(
+ (SlotContext slotContext) -> {
+ // add the root node to the set of
resolved root nodes once the SlotContext future has
+ // been completed and we know the
slot's TaskManagerLocation
+ tryMarkSlotAsResolved(slotRequestId,
slotContext);
+ return slotContext;
+ }),
+ slotContextFutureAfterRootSlotResolution);
- if (resolvedRootNode != null) {
- final AllocationID allocationId
= slotContext.getAllocationId();
- LOG.trace("Fulfill multi task
slot [{}] with slot [{}].", slotRequestId, allocationId);
+ return rootMultiTaskSlot;
+ }
- final Map<AllocationID,
MultiTaskSlot> innerMap = resolvedRootSlots.computeIfAbsent(
-
slotContext.getTaskManagerLocation(),
- taskManagerLocation ->
new HashMap<>(4));
+ private void tryMarkSlotAsResolved(SlotRequestId slotRequestId,
SlotInfo slotInfo) {
+ final MultiTaskSlot resolvedRootNode =
unresolvedRootSlots.remove(slotRequestId);
- MultiTaskSlot previousValue =
innerMap.put(allocationId, resolvedRootNode);
-
Preconditions.checkState(previousValue == null);
- }
- } else {
- rootMultiTaskSlot.release(throwable);
Review comment:
A `MultiTaskSlot` will call `release` on itself if the `slotContextFuture`
is completed exceptionally:
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java#L417.
Hence, I believe that we don't need this error handling here.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services