This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 6b3d291f6a5 [FLINK-32623] Return correct vertex resource lower bound
6b3d291f6a5 is described below
commit 6b3d291f6a573fb34a528313e5683d3a48a66771
Author: Gyula Fora <[email protected]>
AuthorDate: Tue Jul 18 15:55:29 2023 +0200
[FLINK-32623] Return correct vertex resource lower bound
---
.../flink/runtime/scheduler/adaptive/AdaptiveScheduler.java | 3 ++-
.../runtime/scheduler/adaptive/AdaptiveSchedulerTest.java | 10 ++++++++++
2 files changed, 12 insertions(+), 1 deletion(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 8d026e31596..ddd6fd5ea95 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -798,7 +798,8 @@ public class AdaptiveScheduler
public JobResourceRequirements requestJobResourceRequirements() {
final JobResourceRequirements.Builder builder = JobResourceRequirements.newBuilder();
for (JobInformation.VertexInformation vertex : jobInformation.getVertices()) {
- builder.setParallelismForJobVertex(vertex.getJobVertexID(), 1, vertex.getParallelism());
+ builder.setParallelismForJobVertex(
+ vertex.getJobVertexID(), vertex.getMinParallelism(), vertex.getParallelism());
}
return builder.build();
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index dfa292c313e..be927fe81bf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -2062,6 +2062,16 @@ public class AdaptiveSchedulerTest {
scheduler.updateJobResourceRequirements(newJobResourceRequirements);
assertThat(scheduler.requestJobResourceRequirements())
.isEqualTo(newJobResourceRequirements);
+
+ final JobResourceRequirements newJobResourceRequirements2 =
+ JobResourceRequirements.newBuilder()
+ .setParallelismForJobVertex(JOB_VERTEX.getID(), 4, 12)
+ .build();
+ assertThat(scheduler.requestJobResourceRequirements())
+ .isNotEqualTo(newJobResourceRequirements2);
+ scheduler.updateJobResourceRequirements(newJobResourceRequirements2);
+ assertThat(scheduler.requestJobResourceRequirements())
+ .isEqualTo(newJobResourceRequirements2);
}
// ---------------------------------------------------------------------------------------------