tillrohrmann commented on a change in pull request #10555: [FLINK-15013] Fix
non local slot selection and root slot resolution
URL: https://github.com/apache/flink/pull/10555#discussion_r357673281
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
##########
@@ -143,41 +144,44 @@ MultiTaskSlot createRootSlot(
SlotRequestId slotRequestId,
CompletableFuture<? extends SlotContext>
slotContextFuture,
SlotRequestId allocatedSlotRequestId) {
+ final CompletableFuture<SlotContext>
slotContextFutureAfterRootSlotResolution = new CompletableFuture<>();
final MultiTaskSlot rootMultiTaskSlot = new MultiTaskSlot(
slotRequestId,
- slotContextFuture,
+ slotContextFutureAfterRootSlotResolution,
allocatedSlotRequestId);
LOG.debug("Create multi task slot [{}] in slot [{}].",
slotRequestId, allocatedSlotRequestId);
allTaskSlots.put(slotRequestId, rootMultiTaskSlot);
-
unresolvedRootSlots.put(slotRequestId, rootMultiTaskSlot);
- // add the root node to the set of resolved root nodes once the
SlotContext future has
- // been completed and we know the slot's TaskManagerLocation
- slotContextFuture.whenComplete(
- (SlotContext slotContext, Throwable throwable) -> {
- if (slotContext != null) {
- final MultiTaskSlot resolvedRootNode =
unresolvedRootSlots.remove(slotRequestId);
+ FutureUtils.forward(
+ slotContextFuture.thenApply(
+ (SlotContext slotContext) -> {
+ // add the root node to the set of
resolved root nodes once the SlotContext future has
+ // been completed and we know the
slot's TaskManagerLocation
+ tryMarkSlotAsResolved(slotRequestId,
slotContext);
+ return slotContext;
+ }),
+ slotContextFutureAfterRootSlotResolution);
- if (resolvedRootNode != null) {
- final AllocationID allocationId
= slotContext.getAllocationId();
- LOG.trace("Fulfill multi task
slot [{}] with slot [{}].", slotRequestId, allocationId);
+ return rootMultiTaskSlot;
+ }
- final Map<AllocationID,
MultiTaskSlot> innerMap = resolvedRootSlots.computeIfAbsent(
-
slotContext.getTaskManagerLocation(),
- taskManagerLocation ->
new HashMap<>(4));
+ private void tryMarkSlotAsResolved(SlotRequestId slotRequestId,
SlotInfo slotInfo) {
+ final MultiTaskSlot resolvedRootNode =
unresolvedRootSlots.remove(slotRequestId);
- MultiTaskSlot previousValue =
innerMap.put(allocationId, resolvedRootNode);
-
Preconditions.checkState(previousValue == null);
- }
- } else {
- rootMultiTaskSlot.release(throwable);
- }
- });
+ if (resolvedRootNode != null) {
+ final AllocationID allocationId =
slotInfo.getAllocationId();
+ LOG.trace("Fulfill multi task slot [{}] with slot
[{}].", slotRequestId, allocationId);
- return rootMultiTaskSlot;
+ final Map<AllocationID, MultiTaskSlot> innerMap =
resolvedRootSlots.computeIfAbsent(
+ slotInfo.getTaskManagerLocation(),
+ taskManagerLocation -> new HashMap<>(4));
Review comment:
I think there is no benefit, and according to the coding guidelines this
should be avoided. I will add another commit to remove it.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services