This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b3d291f6a5 [FLINK-32623] Return correct vertex resource lower bound
6b3d291f6a5 is described below

commit 6b3d291f6a573fb34a528313e5683d3a48a66771
Author: Gyula Fora <[email protected]>
AuthorDate: Tue Jul 18 15:55:29 2023 +0200

    [FLINK-32623] Return correct vertex resource lower bound
---
 .../flink/runtime/scheduler/adaptive/AdaptiveScheduler.java    |  3 ++-
 .../runtime/scheduler/adaptive/AdaptiveSchedulerTest.java      | 10 ++++++++++
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 8d026e31596..ddd6fd5ea95 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -798,7 +798,8 @@ public class AdaptiveScheduler
     public JobResourceRequirements requestJobResourceRequirements() {
         final JobResourceRequirements.Builder builder = 
JobResourceRequirements.newBuilder();
         for (JobInformation.VertexInformation vertex : 
jobInformation.getVertices()) {
-            builder.setParallelismForJobVertex(vertex.getJobVertexID(), 1, 
vertex.getParallelism());
+            builder.setParallelismForJobVertex(
+                    vertex.getJobVertexID(), vertex.getMinParallelism(), 
vertex.getParallelism());
         }
         return builder.build();
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index dfa292c313e..be927fe81bf 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -2062,6 +2062,16 @@ public class AdaptiveSchedulerTest {
         scheduler.updateJobResourceRequirements(newJobResourceRequirements);
         assertThat(scheduler.requestJobResourceRequirements())
                 .isEqualTo(newJobResourceRequirements);
+
+        final JobResourceRequirements newJobResourceRequirements2 =
+                JobResourceRequirements.newBuilder()
+                        .setParallelismForJobVertex(JOB_VERTEX.getID(), 4, 12)
+                        .build();
+        assertThat(scheduler.requestJobResourceRequirements())
+                .isNotEqualTo(newJobResourceRequirements2);
+        scheduler.updateJobResourceRequirements(newJobResourceRequirements2);
+        assertThat(scheduler.requestJobResourceRequirements())
+                .isEqualTo(newJobResourceRequirements2);
     }
 
     // 
---------------------------------------------------------------------------------------------

Reply via email to