This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b4155ecd2185b82cff713d4382f5245d661ec353
Author: Andrey Zagrebin <[email protected]>
AuthorDate: Tue Dec 8 19:20:50 2020 +0300

    [FLINK-19832][coordinator] Do not use the allocator's sharedSlots to create a bulk in SlotSharingExecutionSlotAllocator
    
    We need to pass the slots map to the createBulk method instead of using the allocator's 'sharedSlots',
    because if any physical slots have already failed, their shared slots have already been removed
    from the allocator's 'sharedSlots' by the failed logical slots.
    
    This closes #13879.
---
 .../apache/flink/runtime/scheduler/SharedSlot.java |  5 +++-
 .../SlotSharingExecutionSlotAllocator.java         | 33 ++++++++++++++--------
 2 files changed, 26 insertions(+), 12 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java
index 6123031..e35e0da 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java
@@ -110,6 +110,10 @@ class SharedSlot implements SlotOwner, 
PhysicalSlot.Payload {
                return physicalSlotResourceProfile;
        }
 
+       public ExecutionSlotSharingGroup getExecutionSlotSharingGroup() {
+               return executionSlotSharingGroup;
+       }
+
        CompletableFuture<PhysicalSlot> getSlotContextFuture() {
                return slotContextFuture;
        }
@@ -123,7 +127,6 @@ class SharedSlot implements SlotOwner, PhysicalSlot.Payload 
{
         * @return the logical slot future
         */
        CompletableFuture<LogicalSlot> allocateLogicalSlot(ExecutionVertexID 
executionVertexId) {
-               Preconditions.checkState(state == State.ALLOCATED, "SharedSlot 
(physical request %s) has been released", physicalSlotRequestId);
                Preconditions.checkArgument(
                        
executionSlotSharingGroup.getExecutionVertexIds().contains(executionVertexId),
                        "Trying to allocate a logical slot for execution %s 
which is not in the ExecutionSlotSharingGroup",
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
index 921e86d..3bf619c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
@@ -123,10 +123,19 @@ class SlotSharingExecutionSlotAllocator implements 
ExecutionSlotAllocator {
                Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> 
executionsByGroup = executionVertexIds
                        .stream()
                        
.collect(Collectors.groupingBy(slotSharingStrategy::getExecutionSlotSharingGroup));
+               Map<ExecutionSlotSharingGroup, SharedSlot> slots = 
executionsByGroup
+                       .keySet()
+                       .stream()
+                       .map(group -> getOrAllocateSharedSlot(group, 
sharedSlotProfileRetriever))
+                       
.collect(Collectors.toMap(SharedSlot::getExecutionSlotSharingGroup, 
Function.identity()));
                Map<ExecutionVertexID, SlotExecutionVertexAssignment> 
assignments =
-                       
allocateLogicalSlotsFromSharedSlots(sharedSlotProfileRetriever, 
executionsByGroup);
+                       allocateLogicalSlotsFromSharedSlots(slots, 
executionsByGroup);
 
-               
bulkChecker.schedulePendingRequestBulkTimeoutCheck(createBulk(executionsByGroup),
 allocationTimeout);
+               // we need to pass the slots map to the createBulk method 
instead of using the allocator's 'sharedSlots'
+               // because if any physical slots have already failed, their 
shared slots have been removed
+               // from the allocator's 'sharedSlots' by failed logical slots.
+               SharingPhysicalSlotRequestBulk bulk = createBulk(slots, 
executionsByGroup);
+               bulkChecker.schedulePendingRequestBulkTimeoutCheck(bulk, 
allocationTimeout);
 
                return 
executionVertexIds.stream().map(assignments::get).collect(Collectors.toList());
        }
@@ -150,8 +159,8 @@ class SlotSharingExecutionSlotAllocator implements 
ExecutionSlotAllocator {
                }
        }
 
-       private Map<ExecutionVertexID, SlotExecutionVertexAssignment> 
allocateLogicalSlotsFromSharedSlots(
-                       SharedSlotProfileRetriever sharedSlotProfileRetriever,
+       private static Map<ExecutionVertexID, SlotExecutionVertexAssignment> 
allocateLogicalSlotsFromSharedSlots(
+                       Map<ExecutionSlotSharingGroup, SharedSlot> slots,
                        Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> 
executionsByGroup) {
 
                Map<ExecutionVertexID, SlotExecutionVertexAssignment> 
assignments = new HashMap<>();
@@ -159,10 +168,9 @@ class SlotSharingExecutionSlotAllocator implements 
ExecutionSlotAllocator {
                for (Map.Entry<ExecutionSlotSharingGroup, 
List<ExecutionVertexID>> entry : executionsByGroup.entrySet()) {
                        ExecutionSlotSharingGroup group = entry.getKey();
                        List<ExecutionVertexID> executionIds = entry.getValue();
-                       SharedSlot sharedSlot = getOrAllocateSharedSlot(group, 
sharedSlotProfileRetriever);
 
                        for (ExecutionVertexID executionId : executionIds) {
-                               CompletableFuture<LogicalSlot> 
logicalSlotFuture = sharedSlot.allocateLogicalSlot(executionId);
+                               CompletableFuture<LogicalSlot> 
logicalSlotFuture = slots.get(group).allocateLogicalSlot(executionId);
                                SlotExecutionVertexAssignment assignment = new 
SlotExecutionVertexAssignment(executionId, logicalSlotFuture);
                                assignments.put(executionId, assignment);
                        }
@@ -213,27 +221,30 @@ class SlotSharingExecutionSlotAllocator implements 
ExecutionSlotAllocator {
                        .reduce(ResourceProfile.ZERO, (r, e) -> 
r.merge(resourceProfileRetriever.apply(e)), ResourceProfile::merge);
        }
 
-       private SharingPhysicalSlotRequestBulk 
createBulk(Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> executions) {
+       private SharingPhysicalSlotRequestBulk createBulk(
+                       Map<ExecutionSlotSharingGroup, SharedSlot> slots,
+                       Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> 
executions) {
                Map<ExecutionSlotSharingGroup, ResourceProfile> pendingRequests 
= executions
                        .keySet()
                        .stream()
                        .collect(Collectors.toMap(
                                group -> group,
-                               group -> 
sharedSlots.get(group).getPhysicalSlotResourceProfile()
+                               group -> 
slots.get(group).getPhysicalSlotResourceProfile()
                        ));
                SharingPhysicalSlotRequestBulk bulk = new 
SharingPhysicalSlotRequestBulk(
                        executions,
                        pendingRequests,
                        this::cancelLogicalSlotRequest);
-               registerPhysicalSlotRequestBulkCallbacks(executions.keySet(), 
bulk);
+               registerPhysicalSlotRequestBulkCallbacks(slots, 
executions.keySet(), bulk);
                return bulk;
        }
 
-       private void registerPhysicalSlotRequestBulkCallbacks(
+       private static void registerPhysicalSlotRequestBulkCallbacks(
+                       Map<ExecutionSlotSharingGroup, SharedSlot> slots,
                        Iterable<ExecutionSlotSharingGroup> executions,
                        SharingPhysicalSlotRequestBulk bulk) {
                for (ExecutionSlotSharingGroup group : executions) {
-                       CompletableFuture<PhysicalSlot> slotContextFuture = 
sharedSlots.get(group).getSlotContextFuture();
+                       CompletableFuture<PhysicalSlot> slotContextFuture = 
slots.get(group).getSlotContextFuture();
                        slotContextFuture.thenAccept(physicalSlot -> 
bulk.markFulfilled(group, physicalSlot.getAllocationId()));
                        slotContextFuture.exceptionally(t -> {
                                // clear the bulk to stop the fulfillability 
check

Reply via email to