This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 505da97b [FLINK-32134] Autoscaler min/max parallelism configs should
respect current parallelism
505da97b is described below
commit 505da97b73798f5f8134f56354ef6e205bddb4e9
Author: Gyula Fora <[email protected]>
AuthorDate: Fri May 19 15:14:41 2023 +0200
[FLINK-32134] Autoscaler min/max parallelism configs should respect current
parallelism
---
.../operator/autoscaler/JobVertexScaler.java | 4 ++--
.../operator/autoscaler/JobVertexScalerTest.java | 20 ++++++++++++++++++++
2 files changed, 22 insertions(+), 2 deletions(-)
diff --git
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
index 90d46a87..f6c6a884 100644
---
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
+++
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
@@ -120,8 +120,8 @@ public class JobVertexScaler {
currentParallelism,
(int)
evaluatedMetrics.get(MAX_PARALLELISM).getCurrent(),
scaleFactor,
- conf.getInteger(VERTEX_MIN_PARALLELISM),
- conf.getInteger(VERTEX_MAX_PARALLELISM));
+ Math.min(currentParallelism,
conf.getInteger(VERTEX_MIN_PARALLELISM)),
+ Math.max(currentParallelism,
conf.getInteger(VERTEX_MAX_PARALLELISM)));
if (newParallelism == currentParallelism
|| blockScalingBasedOnPastActions(
diff --git
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
index 0548229e..fb4655df 100644
---
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
+++
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
@@ -187,6 +187,16 @@ public class JobVertexScalerTest {
new JobVertexID(),
evaluated(10, 100, 500),
Collections.emptySortedMap()));
+
+ // Make sure we respect current parallelism in case it's lower
+ assertEquals(
+ 4,
+ vertexScaler.computeScaleTargetParallelism(
+ flinkDep,
+ conf,
+ new JobVertexID(),
+ evaluated(4, 100, 500),
+ Collections.emptySortedMap()));
}
@Test
@@ -201,6 +211,16 @@ public class JobVertexScalerTest {
new JobVertexID(),
evaluated(10, 500, 100),
Collections.emptySortedMap()));
+
+ // Make sure we respect current parallelism in case it's higher
+ assertEquals(
+ 12,
+ vertexScaler.computeScaleTargetParallelism(
+ flinkDep,
+ conf,
+ new JobVertexID(),
+ evaluated(12, 500, 100),
+ Collections.emptySortedMap()));
}
@Test