tillrohrmann commented on a change in pull request #13879:
URL: https://github.com/apache/flink/pull/13879#discussion_r536139471



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
##########
@@ -160,38 +162,58 @@ private void cancelLogicalSlotRequest(ExecutionVertexID 
executionVertexId, Throw
                        ExecutionSlotSharingGroup group = entry.getKey();
                        List<ExecutionVertexID> executionIds = entry.getValue();
                        SharedSlot sharedSlot = getOrAllocateSharedSlot(group, 
sharedSlotProfileRetriever);
-
-                       for (ExecutionVertexID executionId : executionIds) {
-                               CompletableFuture<LogicalSlot> 
logicalSlotFuture = sharedSlot.allocateLogicalSlot(executionId);
-                               SlotExecutionVertexAssignment assignment = new 
SlotExecutionVertexAssignment(executionId, logicalSlotFuture);
-                               assignments.put(executionId, assignment);
-                       }
+                       allocateLogicalSlotsFromSharedSlot(assignments, 
executionIds, sharedSlot);
                }
 
                return assignments;
        }
 
        private SharedSlot getOrAllocateSharedSlot(
-                       ExecutionSlotSharingGroup executionSlotSharingGroup,
+                       ExecutionSlotSharingGroup slotSharingGroup,
                        SharedSlotProfileRetriever sharedSlotProfileRetriever) {
-               return sharedSlots
-                       .computeIfAbsent(executionSlotSharingGroup, group -> {
-                               SlotRequestId physicalSlotRequestId = new 
SlotRequestId();
-                               ResourceProfile physicalSlotResourceProfile = 
getPhysicalSlotResourceProfile(group);
-                               SlotProfile slotProfile = 
sharedSlotProfileRetriever.getSlotProfile(group, physicalSlotResourceProfile);
-                               PhysicalSlotRequest physicalSlotRequest =
-                                       new 
PhysicalSlotRequest(physicalSlotRequestId, slotProfile, 
slotWillBeOccupiedIndefinitely);
-                               CompletableFuture<PhysicalSlot> 
physicalSlotFuture = slotProvider
+               SharedSlot sharedSlot = sharedSlots.get(slotSharingGroup);
+               if (sharedSlot == null) {
+                       SlotRequestId physicalSlotRequestId = new 
SlotRequestId();
+                       ResourceProfile physicalSlotResourceProfile =
+                                       
getPhysicalSlotResourceProfile(slotSharingGroup);
+                       SlotProfile slotProfile = sharedSlotProfileRetriever
+                                       .getSlotProfile(slotSharingGroup, 
physicalSlotResourceProfile);
+                       PhysicalSlotRequest physicalSlotRequest = new 
PhysicalSlotRequest(
+                                       physicalSlotRequestId,
+                                       slotProfile,
+                                       slotWillBeOccupiedIndefinitely);
+                       CompletableFuture<PhysicalSlot> physicalSlotFuture = 
slotProvider
                                        
.allocatePhysicalSlot(physicalSlotRequest)
                                        
.thenApply(PhysicalSlotRequest.Result::getPhysicalSlot);
-                               return new SharedSlot(
+                       sharedSlot = new SharedSlot(
                                        physicalSlotRequestId,
                                        physicalSlotResourceProfile,
-                                       group,
+                                       slotSharingGroup,
                                        physicalSlotFuture,
                                        slotWillBeOccupiedIndefinitely,
                                        this::releaseSharedSlot);
-                       });
+                       if (!physicalSlotFuture.isCompletedExceptionally()) {
+                               sharedSlots.put(slotSharingGroup, sharedSlot);
+                       }
+               }
+               return sharedSlot;
+       }
+
+       private static void allocateLogicalSlotsFromSharedSlot(
+                       Map<ExecutionVertexID, SlotExecutionVertexAssignment> 
assignments,
+                       Iterable<ExecutionVertexID> executionIds,
+                       SharedSlot sharedSlot) {
+               for (ExecutionVertexID executionId : executionIds) {
+                       boolean physicalSlotHasAlreadyFailed = sharedSlot
+                                       .getSlotContextFuture()
+                                       .isCompletedExceptionally();

Review comment:
       We should be able to move this check out of the for loop, since it does not depend on `executionId`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
##########
@@ -160,38 +162,58 @@ private void cancelLogicalSlotRequest(ExecutionVertexID 
executionVertexId, Throw
                        ExecutionSlotSharingGroup group = entry.getKey();
                        List<ExecutionVertexID> executionIds = entry.getValue();
                        SharedSlot sharedSlot = getOrAllocateSharedSlot(group, 
sharedSlotProfileRetriever);
-
-                       for (ExecutionVertexID executionId : executionIds) {
-                               CompletableFuture<LogicalSlot> 
logicalSlotFuture = sharedSlot.allocateLogicalSlot(executionId);
-                               SlotExecutionVertexAssignment assignment = new 
SlotExecutionVertexAssignment(executionId, logicalSlotFuture);
-                               assignments.put(executionId, assignment);
-                       }
+                       allocateLogicalSlotsFromSharedSlot(assignments, 
executionIds, sharedSlot);
                }
 
                return assignments;
        }
 
        private SharedSlot getOrAllocateSharedSlot(
-                       ExecutionSlotSharingGroup executionSlotSharingGroup,
+                       ExecutionSlotSharingGroup slotSharingGroup,
                        SharedSlotProfileRetriever sharedSlotProfileRetriever) {
-               return sharedSlots
-                       .computeIfAbsent(executionSlotSharingGroup, group -> {
-                               SlotRequestId physicalSlotRequestId = new 
SlotRequestId();
-                               ResourceProfile physicalSlotResourceProfile = 
getPhysicalSlotResourceProfile(group);
-                               SlotProfile slotProfile = 
sharedSlotProfileRetriever.getSlotProfile(group, physicalSlotResourceProfile);
-                               PhysicalSlotRequest physicalSlotRequest =
-                                       new 
PhysicalSlotRequest(physicalSlotRequestId, slotProfile, 
slotWillBeOccupiedIndefinitely);
-                               CompletableFuture<PhysicalSlot> 
physicalSlotFuture = slotProvider
+               SharedSlot sharedSlot = sharedSlots.get(slotSharingGroup);
+               if (sharedSlot == null) {
+                       SlotRequestId physicalSlotRequestId = new 
SlotRequestId();
+                       ResourceProfile physicalSlotResourceProfile =
+                                       
getPhysicalSlotResourceProfile(slotSharingGroup);
+                       SlotProfile slotProfile = sharedSlotProfileRetriever
+                                       .getSlotProfile(slotSharingGroup, 
physicalSlotResourceProfile);
+                       PhysicalSlotRequest physicalSlotRequest = new 
PhysicalSlotRequest(
+                                       physicalSlotRequestId,
+                                       slotProfile,
+                                       slotWillBeOccupiedIndefinitely);
+                       CompletableFuture<PhysicalSlot> physicalSlotFuture = 
slotProvider
                                        
.allocatePhysicalSlot(physicalSlotRequest)
                                        
.thenApply(PhysicalSlotRequest.Result::getPhysicalSlot);
-                               return new SharedSlot(
+                       sharedSlot = new SharedSlot(
                                        physicalSlotRequestId,
                                        physicalSlotResourceProfile,
-                                       group,
+                                       slotSharingGroup,
                                        physicalSlotFuture,
                                        slotWillBeOccupiedIndefinitely,
                                        this::releaseSharedSlot);
-                       });
+                       if (!physicalSlotFuture.isCompletedExceptionally()) {
+                               sharedSlots.put(slotSharingGroup, sharedSlot);
+                       }
+               }
+               return sharedSlot;
+       }
+
+       private static void allocateLogicalSlotsFromSharedSlot(
+                       Map<ExecutionVertexID, SlotExecutionVertexAssignment> 
assignments,
+                       Iterable<ExecutionVertexID> executionIds,
+                       SharedSlot sharedSlot) {
+               for (ExecutionVertexID executionId : executionIds) {
+                       boolean physicalSlotHasAlreadyFailed = sharedSlot
+                                       .getSlotContextFuture()
+                                       .isCompletedExceptionally();
+                       CompletableFuture<LogicalSlot> logicalSlotFuture = 
physicalSlotHasAlreadyFailed ?
+                                       
sharedSlot.getSlotContextFuture().thenApply(stub -> null) :
+                                       
sharedSlot.allocateLogicalSlot(executionId);
+                       SlotExecutionVertexAssignment assignment =
+                                       new 
SlotExecutionVertexAssignment(executionId, logicalSlotFuture);
+                       assignments.put(executionId, assignment);

Review comment:
       I am not a huge fan of returning values through an output parameter. I think 
returning the assignments from this method would work here as well.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
##########
@@ -213,20 +235,24 @@ private ResourceProfile 
getPhysicalSlotResourceProfile(ExecutionSlotSharingGroup
                        .reduce(ResourceProfile.ZERO, (r, e) -> 
r.merge(resourceProfileRetriever.apply(e)), ResourceProfile::merge);
        }
 
-       private SharingPhysicalSlotRequestBulk 
createBulk(Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> executions) {
-               Map<ExecutionSlotSharingGroup, ResourceProfile> pendingRequests 
= executions
-                       .keySet()
-                       .stream()
-                       .collect(Collectors.toMap(
-                               group -> group,
-                               group -> 
sharedSlots.get(group).getPhysicalSlotResourceProfile()
-                       ));
+       private Optional<SharingPhysicalSlotRequestBulk> createBulk(
+                       Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> 
executions) {
+               Map<ExecutionSlotSharingGroup, ResourceProfile> pendingRequests 
= new HashMap<>();
+               for (ExecutionSlotSharingGroup group : executions.keySet()) {
+                       SharedSlot sharedSlot = sharedSlots.get(group);
+                       if (sharedSlot == null || 
sharedSlot.getSlotContextFuture().isCompletedExceptionally()) {
+                               // there is no shared slot for this group or 
its physical slot has already failed
+                               // hence there is no point to track the whole 
bulk
+                               return Optional.empty();
+                       }

Review comment:
       This is also special-case handling. What exactly makes case 1), a physical 
slot future that has already failed, different from case 2), a physical slot 
future that fails just after this method completes?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
##########
@@ -160,38 +162,58 @@ private void cancelLogicalSlotRequest(ExecutionVertexID 
executionVertexId, Throw
                        ExecutionSlotSharingGroup group = entry.getKey();
                        List<ExecutionVertexID> executionIds = entry.getValue();
                        SharedSlot sharedSlot = getOrAllocateSharedSlot(group, 
sharedSlotProfileRetriever);
-
-                       for (ExecutionVertexID executionId : executionIds) {
-                               CompletableFuture<LogicalSlot> 
logicalSlotFuture = sharedSlot.allocateLogicalSlot(executionId);
-                               SlotExecutionVertexAssignment assignment = new 
SlotExecutionVertexAssignment(executionId, logicalSlotFuture);
-                               assignments.put(executionId, assignment);
-                       }
+                       allocateLogicalSlotsFromSharedSlot(assignments, 
executionIds, sharedSlot);
                }
 
                return assignments;
        }
 
        private SharedSlot getOrAllocateSharedSlot(
-                       ExecutionSlotSharingGroup executionSlotSharingGroup,
+                       ExecutionSlotSharingGroup slotSharingGroup,
                        SharedSlotProfileRetriever sharedSlotProfileRetriever) {
-               return sharedSlots
-                       .computeIfAbsent(executionSlotSharingGroup, group -> {
-                               SlotRequestId physicalSlotRequestId = new 
SlotRequestId();
-                               ResourceProfile physicalSlotResourceProfile = 
getPhysicalSlotResourceProfile(group);
-                               SlotProfile slotProfile = 
sharedSlotProfileRetriever.getSlotProfile(group, physicalSlotResourceProfile);
-                               PhysicalSlotRequest physicalSlotRequest =
-                                       new 
PhysicalSlotRequest(physicalSlotRequestId, slotProfile, 
slotWillBeOccupiedIndefinitely);
-                               CompletableFuture<PhysicalSlot> 
physicalSlotFuture = slotProvider
+               SharedSlot sharedSlot = sharedSlots.get(slotSharingGroup);
+               if (sharedSlot == null) {
+                       SlotRequestId physicalSlotRequestId = new 
SlotRequestId();
+                       ResourceProfile physicalSlotResourceProfile =
+                                       
getPhysicalSlotResourceProfile(slotSharingGroup);
+                       SlotProfile slotProfile = sharedSlotProfileRetriever
+                                       .getSlotProfile(slotSharingGroup, 
physicalSlotResourceProfile);
+                       PhysicalSlotRequest physicalSlotRequest = new 
PhysicalSlotRequest(
+                                       physicalSlotRequestId,
+                                       slotProfile,
+                                       slotWillBeOccupiedIndefinitely);
+                       CompletableFuture<PhysicalSlot> physicalSlotFuture = 
slotProvider
                                        
.allocatePhysicalSlot(physicalSlotRequest)
                                        
.thenApply(PhysicalSlotRequest.Result::getPhysicalSlot);
-                               return new SharedSlot(
+                       sharedSlot = new SharedSlot(
                                        physicalSlotRequestId,
                                        physicalSlotResourceProfile,
-                                       group,
+                                       slotSharingGroup,
                                        physicalSlotFuture,
                                        slotWillBeOccupiedIndefinitely,
                                        this::releaseSharedSlot);
-                       });
+                       if (!physicalSlotFuture.isCompletedExceptionally()) {
+                               sharedSlots.put(slotSharingGroup, sharedSlot);
+                       }

Review comment:
       Hmm, this looks a bit like special-case handling. I guess it would be a 
bit nicer if a future that is completed exceptionally right away and one that 
is completed exceptionally at a later point were handled the same way. What 
exactly is the difference between the `physicalSlotFuture` being completed 
exceptionally right away and the `physicalSlotFuture` being completed 
exceptionally just after the `SharedSlot` has been created?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
##########
@@ -160,38 +162,58 @@ private void cancelLogicalSlotRequest(ExecutionVertexID 
executionVertexId, Throw
                        ExecutionSlotSharingGroup group = entry.getKey();
                        List<ExecutionVertexID> executionIds = entry.getValue();
                        SharedSlot sharedSlot = getOrAllocateSharedSlot(group, 
sharedSlotProfileRetriever);
-
-                       for (ExecutionVertexID executionId : executionIds) {
-                               CompletableFuture<LogicalSlot> 
logicalSlotFuture = sharedSlot.allocateLogicalSlot(executionId);
-                               SlotExecutionVertexAssignment assignment = new 
SlotExecutionVertexAssignment(executionId, logicalSlotFuture);
-                               assignments.put(executionId, assignment);
-                       }
+                       allocateLogicalSlotsFromSharedSlot(assignments, 
executionIds, sharedSlot);
                }
 
                return assignments;
        }
 
        private SharedSlot getOrAllocateSharedSlot(
-                       ExecutionSlotSharingGroup executionSlotSharingGroup,
+                       ExecutionSlotSharingGroup slotSharingGroup,
                        SharedSlotProfileRetriever sharedSlotProfileRetriever) {
-               return sharedSlots
-                       .computeIfAbsent(executionSlotSharingGroup, group -> {
-                               SlotRequestId physicalSlotRequestId = new 
SlotRequestId();
-                               ResourceProfile physicalSlotResourceProfile = 
getPhysicalSlotResourceProfile(group);
-                               SlotProfile slotProfile = 
sharedSlotProfileRetriever.getSlotProfile(group, physicalSlotResourceProfile);
-                               PhysicalSlotRequest physicalSlotRequest =
-                                       new 
PhysicalSlotRequest(physicalSlotRequestId, slotProfile, 
slotWillBeOccupiedIndefinitely);
-                               CompletableFuture<PhysicalSlot> 
physicalSlotFuture = slotProvider
+               SharedSlot sharedSlot = sharedSlots.get(slotSharingGroup);
+               if (sharedSlot == null) {
+                       SlotRequestId physicalSlotRequestId = new 
SlotRequestId();
+                       ResourceProfile physicalSlotResourceProfile =
+                                       
getPhysicalSlotResourceProfile(slotSharingGroup);
+                       SlotProfile slotProfile = sharedSlotProfileRetriever
+                                       .getSlotProfile(slotSharingGroup, 
physicalSlotResourceProfile);
+                       PhysicalSlotRequest physicalSlotRequest = new 
PhysicalSlotRequest(
+                                       physicalSlotRequestId,
+                                       slotProfile,
+                                       slotWillBeOccupiedIndefinitely);
+                       CompletableFuture<PhysicalSlot> physicalSlotFuture = 
slotProvider
                                        
.allocatePhysicalSlot(physicalSlotRequest)
                                        
.thenApply(PhysicalSlotRequest.Result::getPhysicalSlot);
-                               return new SharedSlot(
+                       sharedSlot = new SharedSlot(
                                        physicalSlotRequestId,
                                        physicalSlotResourceProfile,
-                                       group,
+                                       slotSharingGroup,
                                        physicalSlotFuture,
                                        slotWillBeOccupiedIndefinitely,
                                        this::releaseSharedSlot);
-                       });
+                       if (!physicalSlotFuture.isCompletedExceptionally()) {
+                               sharedSlots.put(slotSharingGroup, sharedSlot);
+                       }
+               }
+               return sharedSlot;
+       }
+
+       private static void allocateLogicalSlotsFromSharedSlot(
+                       Map<ExecutionVertexID, SlotExecutionVertexAssignment> 
assignments,
+                       Iterable<ExecutionVertexID> executionIds,
+                       SharedSlot sharedSlot) {
+               for (ExecutionVertexID executionId : executionIds) {
+                       boolean physicalSlotHasAlreadyFailed = sharedSlot
+                                       .getSlotContextFuture()
+                                       .isCompletedExceptionally();
+                       CompletableFuture<LogicalSlot> logicalSlotFuture = 
physicalSlotHasAlreadyFailed ?
+                                       
sharedSlot.getSlotContextFuture().thenApply(stub -> null) :
+                                       
sharedSlot.allocateLogicalSlot(executionId);

Review comment:
       Can't `SharedSlot.allocateLogicalSlot` return a failed future if the 
internal future has already failed? That way, we wouldn't need the special 
casing here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to