tillrohrmann commented on a change in pull request #13181:
URL: https://github.com/apache/flink/pull/13181#discussion_r481089806
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
##########
@@ -150,152 +167,72 @@ private SharedSlot getOrAllocateSharedSlot(
return sharedSlots
.computeIfAbsent(executionSlotSharingGroup, group -> {
SlotRequestId physicalSlotRequestId = new
SlotRequestId();
+ ResourceProfile physicalSlotResourceProfile =
getPhysicalSlotResourceProfile(group);
CompletableFuture<PhysicalSlot>
physicalSlotFuture = sharedSlotProfileRetriever
- .getSlotProfileFuture(group)
+ .getSlotProfileFuture(group,
physicalSlotResourceProfile)
.thenCompose(slotProfile ->
slotProvider.allocatePhysicalSlot(
new
PhysicalSlotRequest(physicalSlotRequestId, slotProfile,
slotWillBeOccupiedIndefinitely)))
.thenApply(PhysicalSlotRequest.Result::getPhysicalSlot);
- return new SharedSlot(physicalSlotRequestId,
group, physicalSlotFuture);
+ return new SharedSlot(
+ physicalSlotRequestId,
+ physicalSlotResourceProfile,
+ group,
+ physicalSlotFuture,
+ slotWillBeOccupiedIndefinitely,
+ this::releaseSharedSlot);
});
}
- private class SharedSlot implements SlotOwner, PhysicalSlot.Payload {
- private final SlotRequestId physicalSlotRequestId;
-
- private final ExecutionSlotSharingGroup
executionSlotSharingGroup;
-
- private final CompletableFuture<PhysicalSlot> slotContextFuture;
-
- private final DualKeyLinkedMap<ExecutionVertexID,
SlotRequestId, CompletableFuture<SingleLogicalSlot>> requestedLogicalSlots;
-
- private SharedSlot(
- SlotRequestId physicalSlotRequestId,
- ExecutionSlotSharingGroup
executionSlotSharingGroup,
- CompletableFuture<PhysicalSlot>
slotContextFuture) {
- this.physicalSlotRequestId = physicalSlotRequestId;
- this.executionSlotSharingGroup =
executionSlotSharingGroup;
- this.slotContextFuture =
slotContextFuture.thenApply(physicalSlot -> {
- Preconditions.checkState(
- physicalSlot.tryAssignPayload(this),
- "Unexpected physical slot payload
assignment failure!");
- return physicalSlot;
- });
- this.requestedLogicalSlots = new
DualKeyLinkedMap<>(executionSlotSharingGroup.getExecutionVertexIds().size());
- }
-
- private CompletableFuture<LogicalSlot>
allocateLogicalSlot(ExecutionVertexID executionVertexId) {
-
Preconditions.checkArgument(executionSlotSharingGroup.getExecutionVertexIds().contains(executionVertexId));
- CompletableFuture<SingleLogicalSlot> logicalSlotFuture
= requestedLogicalSlots.getValueByKeyA(executionVertexId);
- if (logicalSlotFuture != null) {
- LOG.debug("Request for {} already exists",
getLogicalSlotString(executionVertexId));
- } else {
- logicalSlotFuture =
allocateNonExistentLogicalSlot(executionVertexId);
- }
- return logicalSlotFuture.thenApply(Function.identity());
+ private void releaseSharedSlot(ExecutionSlotSharingGroup
executionSlotSharingGroup) {
+ SharedSlot slot = sharedSlots.remove(executionSlotSharingGroup);
Review comment:
Maybe add an assertion that the `SharedSlot` is empty (has no requested
logical slots assigned).
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]