This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6cd71506fe474eca3b02fbb064912fba9f242b94
Author: Rui Fan <[email protected]>
AuthorDate: Mon Jan 1 11:59:54 2024 +0800

    [FLINK-33960][Scheduler] Fix the bug that Adaptive Scheduler doesn't 
respect the lowerBound when one Flink job has more than one task
---
 .../allocator/SlotSharingSlotAllocator.java        | 34 ++++++++++++----------
 .../allocator/SlotSharingSlotAllocatorTest.java    | 21 +++++++++++++
 2 files changed, 40 insertions(+), 15 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
index 9849a923d77..8880bd2471e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
@@ -90,7 +90,7 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
 
         final int minimumRequiredSlots =
                 slotSharingGroupMetaInfo.values().stream()
-                        .map(SlotSharingGroupMetaInfo::getMinLowerBound)
+                        .map(SlotSharingGroupMetaInfo::getMaxLowerBound)
                         .reduce(0, Integer::sum);
 
         if (minimumRequiredSlots > freeSlots.size()) {
@@ -164,7 +164,7 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
         for (SlotSharingGroupId slotSharingGroup :
                 
sortSlotSharingGroupsByHighestParallelismRange(slotSharingGroupMetaInfo)) {
             final int minParallelism =
-                    
slotSharingGroupMetaInfo.get(slotSharingGroup).getMinLowerBound();
+                    
slotSharingGroupMetaInfo.get(slotSharingGroup).getMaxLowerBound();
 
             // if we reached this point we know we have more slots than we 
need to fulfill the
             // minimum requirements for each slot sharing group.
@@ -302,26 +302,29 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
     private static class SlotSharingGroupMetaInfo {
 
         private final int minLowerBound;
+        private final int maxLowerBound;
         private final int maxUpperBound;
-        private final int maxLowerUpperBoundRange;
 
-        private SlotSharingGroupMetaInfo(
-                int minLowerBound, int maxUpperBound, int 
maxLowerUpperBoundRange) {
+        private SlotSharingGroupMetaInfo(int minLowerBound, int maxLowerBound, 
int maxUpperBound) {
             this.minLowerBound = minLowerBound;
+            this.maxLowerBound = maxLowerBound;
             this.maxUpperBound = maxUpperBound;
-            this.maxLowerUpperBoundRange = maxLowerUpperBoundRange;
         }
 
         public int getMinLowerBound() {
             return minLowerBound;
         }
 
+        public int getMaxLowerBound() {
+            return maxLowerBound;
+        }
+
         public int getMaxUpperBound() {
             return maxUpperBound;
         }
 
         public int getMaxLowerUpperBoundRange() {
-            return maxLowerUpperBoundRange;
+            return maxUpperBound - maxLowerBound;
         }
 
         public static Map<SlotSharingGroupId, SlotSharingGroupMetaInfo> from(
@@ -332,18 +335,19 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
                     vertexInformation ->
                             new SlotSharingGroupMetaInfo(
                                     vertexInformation.getMinParallelism(),
-                                    vertexInformation.getParallelism(),
-                                    vertexInformation.getParallelism()
-                                            - 
vertexInformation.getMinParallelism()),
+                                    vertexInformation.getMinParallelism(),
+                                    vertexInformation.getParallelism()),
                     (metaInfo1, metaInfo2) ->
                             new SlotSharingGroupMetaInfo(
-                                    Math.min(metaInfo1.getMinLowerBound(), 
metaInfo2.minLowerBound),
+                                    Math.min(
+                                            metaInfo1.getMinLowerBound(),
+                                            metaInfo2.getMinLowerBound()),
                                     Math.max(
-                                            metaInfo1.getMaxUpperBound(),
-                                            metaInfo2.getMaxUpperBound()),
+                                            metaInfo1.getMaxLowerBound(),
+                                            metaInfo2.getMaxLowerBound()),
                                     Math.max(
-                                            
metaInfo1.getMaxLowerUpperBoundRange(),
-                                            
metaInfo2.getMaxLowerUpperBoundRange())));
+                                            metaInfo1.getMaxUpperBound(),
+                                            metaInfo2.getMaxUpperBound())));
         }
 
         private static <T> Map<SlotSharingGroupId, T> getPerSlotSharingGroups(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
index 82e38a269c3..e6409553aa6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
@@ -229,6 +229,27 @@ class SlotSharingSlotAllocatorTest {
         assertThat(vertexParallelism).isNotPresent();
     }
 
+    @Test
+    void 
testDetermineParallelismWithLowerBoundsInsufficientSlotsForPartialVertices() {
+        final SlotSharingSlotAllocator slotAllocator =
+                SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
+                        TEST_RESERVE_SLOT_FUNCTION,
+                        TEST_FREE_SLOT_FUNCTION,
+                        TEST_IS_SLOT_FREE_FUNCTION);
+        SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
+        final JobInformation.VertexInformation vertex1 =
+                new TestVertexInformation(new JobVertexID(), 2, 2, 
slotSharingGroup);
+        final JobInformation.VertexInformation vertex2 =
+                new TestVertexInformation(new JobVertexID(), 8, 8, 
slotSharingGroup);
+
+        final JobInformation jobInformation =
+                new TestJobInformation(Arrays.asList(vertex1, vertex2));
+        final Optional<VertexParallelism> vertexParallelism =
+                slotAllocator.determineParallelism(jobInformation, 
getSlots(5));
+
+        assertThat(vertexParallelism).isNotPresent();
+    }
+
     @Test
     void testDetermineParallelismWithAllEqualLowerUpperBoundFreSlots() {
         final SlotSharingSlotAllocator slotAllocator =

Reply via email to