This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.12 in repository https://gitbox.apache.org/repos/asf/flink.git
commit b4155ecd2185b82cff713d4382f5245d661ec353 Author: Andrey Zagrebin <[email protected]> AuthorDate: Tue Dec 8 19:20:50 2020 +0300 [FLINK-19832][coordinator] Do not use the allocator sharedSlots to create a bulk in SlotSharingExecutionSlotAllocator We need to pass the slots map to the createBulk method instead of using the allocator's 'sharedSlots' because if any physical slots have already failed, their shared slots have been removed from the allocator's 'sharedSlots' by failed logical slots. This closes #13879. --- .../apache/flink/runtime/scheduler/SharedSlot.java | 5 +++- .../SlotSharingExecutionSlotAllocator.java | 33 ++++++++++++++-------- 2 files changed, 26 insertions(+), 12 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java index 6123031..e35e0da 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java @@ -110,6 +110,10 @@ class SharedSlot implements SlotOwner, PhysicalSlot.Payload { return physicalSlotResourceProfile; } + public ExecutionSlotSharingGroup getExecutionSlotSharingGroup() { + return executionSlotSharingGroup; + } + CompletableFuture<PhysicalSlot> getSlotContextFuture() { return slotContextFuture; } @@ -123,7 +127,6 @@ class SharedSlot implements SlotOwner, PhysicalSlot.Payload { * @return the logical slot future */ CompletableFuture<LogicalSlot> allocateLogicalSlot(ExecutionVertexID executionVertexId) { - Preconditions.checkState(state == State.ALLOCATED, "SharedSlot (physical request %s) has been released", physicalSlotRequestId); Preconditions.checkArgument( executionSlotSharingGroup.getExecutionVertexIds().contains(executionVertexId), "Trying to allocate a logical slot for execution %s which is not in the ExecutionSlotSharingGroup", diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java index 921e86d..3bf619c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java @@ -123,10 +123,19 @@ class SlotSharingExecutionSlotAllocator implements ExecutionSlotAllocator { Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> executionsByGroup = executionVertexIds .stream() .collect(Collectors.groupingBy(slotSharingStrategy::getExecutionSlotSharingGroup)); + Map<ExecutionSlotSharingGroup, SharedSlot> slots = executionsByGroup + .keySet() + .stream() + .map(group -> getOrAllocateSharedSlot(group, sharedSlotProfileRetriever)) + .collect(Collectors.toMap(SharedSlot::getExecutionSlotSharingGroup, Function.identity())); Map<ExecutionVertexID, SlotExecutionVertexAssignment> assignments = - allocateLogicalSlotsFromSharedSlots(sharedSlotProfileRetriever, executionsByGroup); + allocateLogicalSlotsFromSharedSlots(slots, executionsByGroup); - bulkChecker.schedulePendingRequestBulkTimeoutCheck(createBulk(executionsByGroup), allocationTimeout); + // we need to pass the slots map to the createBulk method instead of using the allocator's 'sharedSlots' + // because if any physical slots have already failed, their shared slots have been removed + // from the allocator's 'sharedSlots' by failed logical slots. + SharingPhysicalSlotRequestBulk bulk = createBulk(slots, executionsByGroup); + bulkChecker.schedulePendingRequestBulkTimeoutCheck(bulk, allocationTimeout); return executionVertexIds.stream().map(assignments::get).collect(Collectors.toList()); } @@ -150,8 +159,8 @@ class SlotSharingExecutionSlotAllocator implements ExecutionSlotAllocator { } } - private Map<ExecutionVertexID, SlotExecutionVertexAssignment> allocateLogicalSlotsFromSharedSlots( - SharedSlotProfileRetriever sharedSlotProfileRetriever, + private static Map<ExecutionVertexID, SlotExecutionVertexAssignment> allocateLogicalSlotsFromSharedSlots( + Map<ExecutionSlotSharingGroup, SharedSlot> slots, Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> executionsByGroup) { Map<ExecutionVertexID, SlotExecutionVertexAssignment> assignments = new HashMap<>(); @@ -159,10 +168,9 @@ class SlotSharingExecutionSlotAllocator implements ExecutionSlotAllocator { for (Map.Entry<ExecutionSlotSharingGroup, List<ExecutionVertexID>> entry : executionsByGroup.entrySet()) { ExecutionSlotSharingGroup group = entry.getKey(); List<ExecutionVertexID> executionIds = entry.getValue(); - SharedSlot sharedSlot = getOrAllocateSharedSlot(group, sharedSlotProfileRetriever); for (ExecutionVertexID executionId : executionIds) { - CompletableFuture<LogicalSlot> logicalSlotFuture = sharedSlot.allocateLogicalSlot(executionId); + CompletableFuture<LogicalSlot> logicalSlotFuture = slots.get(group).allocateLogicalSlot(executionId); SlotExecutionVertexAssignment assignment = new SlotExecutionVertexAssignment(executionId, logicalSlotFuture); assignments.put(executionId, assignment); } @@ -213,27 +221,30 @@ class SlotSharingExecutionSlotAllocator implements ExecutionSlotAllocator { .reduce(ResourceProfile.ZERO, (r, e) -> r.merge(resourceProfileRetriever.apply(e)), ResourceProfile::merge); } - private SharingPhysicalSlotRequestBulk createBulk(Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> executions) { + private SharingPhysicalSlotRequestBulk createBulk( + Map<ExecutionSlotSharingGroup, SharedSlot> slots, + Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> executions) { Map<ExecutionSlotSharingGroup, ResourceProfile> pendingRequests = executions .keySet() .stream() .collect(Collectors.toMap( group -> group, - group -> sharedSlots.get(group).getPhysicalSlotResourceProfile() + group -> slots.get(group).getPhysicalSlotResourceProfile() )); SharingPhysicalSlotRequestBulk bulk = new SharingPhysicalSlotRequestBulk( executions, pendingRequests, this::cancelLogicalSlotRequest); - registerPhysicalSlotRequestBulkCallbacks(executions.keySet(), bulk); + registerPhysicalSlotRequestBulkCallbacks(slots, executions.keySet(), bulk); return bulk; } - private void registerPhysicalSlotRequestBulkCallbacks( + private static void registerPhysicalSlotRequestBulkCallbacks( + Map<ExecutionSlotSharingGroup, SharedSlot> slots, Iterable<ExecutionSlotSharingGroup> executions, SharingPhysicalSlotRequestBulk bulk) { for (ExecutionSlotSharingGroup group : executions) { - CompletableFuture<PhysicalSlot> slotContextFuture = sharedSlots.get(group).getSlotContextFuture(); + CompletableFuture<PhysicalSlot> slotContextFuture = slots.get(group).getSlotContextFuture(); slotContextFuture.thenAccept(physicalSlot -> bulk.markFulfilled(group, physicalSlot.getAllocationId())); slotContextFuture.exceptionally(t -> { // clear the bulk to stop the fulfillability check
