This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6cd71506fe474eca3b02fbb064912fba9f242b94 Author: Rui Fan <[email protected]> AuthorDate: Mon Jan 1 11:59:54 2024 +0800 [FLINK-33960][Scheduler] Fix the bug that Adaptive Scheduler doesn't respect the lowerBound when one flink job has more than 1 tasks --- .../allocator/SlotSharingSlotAllocator.java | 34 ++++++++++++---------- .../allocator/SlotSharingSlotAllocatorTest.java | 21 +++++++++++++ 2 files changed, 40 insertions(+), 15 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java index 9849a923d77..8880bd2471e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java @@ -90,7 +90,7 @@ public class SlotSharingSlotAllocator implements SlotAllocator { final int minimumRequiredSlots = slotSharingGroupMetaInfo.values().stream() - .map(SlotSharingGroupMetaInfo::getMinLowerBound) + .map(SlotSharingGroupMetaInfo::getMaxLowerBound) .reduce(0, Integer::sum); if (minimumRequiredSlots > freeSlots.size()) { @@ -164,7 +164,7 @@ public class SlotSharingSlotAllocator implements SlotAllocator { for (SlotSharingGroupId slotSharingGroup : sortSlotSharingGroupsByHighestParallelismRange(slotSharingGroupMetaInfo)) { final int minParallelism = - slotSharingGroupMetaInfo.get(slotSharingGroup).getMinLowerBound(); + slotSharingGroupMetaInfo.get(slotSharingGroup).getMaxLowerBound(); // if we reached this point we know we have more slots than we need to fulfill the // minimum requirements for each slot sharing group. @@ -302,26 +302,29 @@ public class SlotSharingSlotAllocator implements SlotAllocator { private static class SlotSharingGroupMetaInfo { private final int minLowerBound; + private final int maxLowerBound; private final int maxUpperBound; - private final int maxLowerUpperBoundRange; - private SlotSharingGroupMetaInfo( - int minLowerBound, int maxUpperBound, int maxLowerUpperBoundRange) { + private SlotSharingGroupMetaInfo(int minLowerBound, int maxLowerBound, int maxUpperBound) { this.minLowerBound = minLowerBound; + this.maxLowerBound = maxLowerBound; this.maxUpperBound = maxUpperBound; - this.maxLowerUpperBoundRange = maxLowerUpperBoundRange; } public int getMinLowerBound() { return minLowerBound; } + public int getMaxLowerBound() { + return maxLowerBound; + } + public int getMaxUpperBound() { return maxUpperBound; } public int getMaxLowerUpperBoundRange() { - return maxLowerUpperBoundRange; + return maxUpperBound - maxLowerBound; } public static Map<SlotSharingGroupId, SlotSharingGroupMetaInfo> from( @@ -332,18 +335,19 @@ public class SlotSharingSlotAllocator implements SlotAllocator { vertexInformation -> new SlotSharingGroupMetaInfo( vertexInformation.getMinParallelism(), - vertexInformation.getParallelism(), - vertexInformation.getParallelism() - - vertexInformation.getMinParallelism()), + vertexInformation.getMinParallelism(), + vertexInformation.getParallelism()), (metaInfo1, metaInfo2) -> new SlotSharingGroupMetaInfo( - Math.min(metaInfo1.getMinLowerBound(), metaInfo2.minLowerBound), + Math.min( + metaInfo1.getMinLowerBound(), + metaInfo2.getMinLowerBound()), Math.max( - metaInfo1.getMaxUpperBound(), - metaInfo2.getMaxUpperBound()), + metaInfo1.getMaxLowerBound(), + metaInfo2.getMaxLowerBound()), Math.max( - metaInfo1.getMaxLowerUpperBoundRange(), - metaInfo2.getMaxLowerUpperBoundRange()))); + metaInfo1.getMaxUpperBound(), + metaInfo2.getMaxUpperBound()))); } private static <T> Map<SlotSharingGroupId, T> getPerSlotSharingGroups( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java index 82e38a269c3..e6409553aa6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java @@ -229,6 +229,27 @@ class SlotSharingSlotAllocatorTest { assertThat(vertexParallelism).isNotPresent(); } + @Test + void testDetermineParallelismWithLowerBoundsInsufficientSlotsForPartialVertices() { + final SlotSharingSlotAllocator slotAllocator = + SlotSharingSlotAllocator.createSlotSharingSlotAllocator( + TEST_RESERVE_SLOT_FUNCTION, + TEST_FREE_SLOT_FUNCTION, + TEST_IS_SLOT_FREE_FUNCTION); + SlotSharingGroup slotSharingGroup = new SlotSharingGroup(); + final JobInformation.VertexInformation vertex1 = + new TestVertexInformation(new JobVertexID(), 2, 2, slotSharingGroup); + final JobInformation.VertexInformation vertex2 = + new TestVertexInformation(new JobVertexID(), 8, 8, slotSharingGroup); + + final JobInformation jobInformation = + new TestJobInformation(Arrays.asList(vertex1, vertex2)); + final Optional<VertexParallelism> vertexParallelism = + slotAllocator.determineParallelism(jobInformation, getSlots(5)); + + assertThat(vertexParallelism).isNotPresent(); + } + @Test void testDetermineParallelismWithAllEqualLowerUpperBoundFreSlots() { final SlotSharingSlotAllocator slotAllocator =
