tillrohrmann commented on a change in pull request #13181:
URL: https://github.com/apache/flink/pull/13181#discussion_r481089806



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
##########
@@ -150,152 +167,72 @@ private SharedSlot getOrAllocateSharedSlot(
                return sharedSlots
                        .computeIfAbsent(executionSlotSharingGroup, group -> {
                                SlotRequestId physicalSlotRequestId = new 
SlotRequestId();
+                               ResourceProfile physicalSlotResourceProfile = 
getPhysicalSlotResourceProfile(group);
                                CompletableFuture<PhysicalSlot> 
physicalSlotFuture = sharedSlotProfileRetriever
-                                       .getSlotProfileFuture(group)
+                                       .getSlotProfileFuture(group, 
physicalSlotResourceProfile)
                                        .thenCompose(slotProfile -> 
slotProvider.allocatePhysicalSlot(
                                                new 
PhysicalSlotRequest(physicalSlotRequestId, slotProfile, 
slotWillBeOccupiedIndefinitely)))
                                        
.thenApply(PhysicalSlotRequest.Result::getPhysicalSlot);
-                               return new SharedSlot(physicalSlotRequestId, 
group, physicalSlotFuture);
+                               return new SharedSlot(
+                                       physicalSlotRequestId,
+                                       physicalSlotResourceProfile,
+                                       group,
+                                       physicalSlotFuture,
+                                       slotWillBeOccupiedIndefinitely,
+                                       this::releaseSharedSlot);
                        });
        }
 
-       private class SharedSlot implements SlotOwner, PhysicalSlot.Payload {
-               private final SlotRequestId physicalSlotRequestId;
-
-               private final ExecutionSlotSharingGroup 
executionSlotSharingGroup;
-
-               private final CompletableFuture<PhysicalSlot> slotContextFuture;
-
-               private final DualKeyLinkedMap<ExecutionVertexID, 
SlotRequestId, CompletableFuture<SingleLogicalSlot>> requestedLogicalSlots;
-
-               private SharedSlot(
-                               SlotRequestId physicalSlotRequestId,
-                               ExecutionSlotSharingGroup 
executionSlotSharingGroup,
-                               CompletableFuture<PhysicalSlot> 
slotContextFuture) {
-                       this.physicalSlotRequestId = physicalSlotRequestId;
-                       this.executionSlotSharingGroup = 
executionSlotSharingGroup;
-                       this.slotContextFuture = 
slotContextFuture.thenApply(physicalSlot -> {
-                               Preconditions.checkState(
-                                       physicalSlot.tryAssignPayload(this),
-                                       "Unexpected physical slot payload 
assignment failure!");
-                               return physicalSlot;
-                       });
-                       this.requestedLogicalSlots = new 
DualKeyLinkedMap<>(executionSlotSharingGroup.getExecutionVertexIds().size());
-               }
-
-               private CompletableFuture<LogicalSlot> 
allocateLogicalSlot(ExecutionVertexID executionVertexId) {
-                       
Preconditions.checkArgument(executionSlotSharingGroup.getExecutionVertexIds().contains(executionVertexId));
-                       CompletableFuture<SingleLogicalSlot> logicalSlotFuture 
= requestedLogicalSlots.getValueByKeyA(executionVertexId);
-                       if (logicalSlotFuture != null) {
-                               LOG.debug("Request for {} already exists", 
getLogicalSlotString(executionVertexId));
-                       } else {
-                               logicalSlotFuture = 
allocateNonExistentLogicalSlot(executionVertexId);
-                       }
-                       return logicalSlotFuture.thenApply(Function.identity());
+       private void releaseSharedSlot(ExecutionSlotSharingGroup 
executionSlotSharingGroup) {
+               SharedSlot slot = sharedSlots.remove(executionSlotSharingGroup);

Review comment:
       Maybe add an assertion here that the removed `SharedSlot` is empty 
(i.e. it has no requested logical slots assigned).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to