This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 4342636c [FLINK-34311] Do not change min resource requirements when rescaling for adaptive scheduler
4342636c is described below
commit 4342636cdb2c3439389e83cb4fe4366156edfbd7
Author: Gyula Fora <[email protected]>
AuthorDate: Tue Jan 30 15:43:46 2024 +0100
[FLINK-34311] Do not change min resource requirements when rescaling for adaptive scheduler
---
.../kubernetes/operator/service/NativeFlinkService.java | 12 +++++++++---
.../kubernetes/operator/service/NativeFlinkServiceTest.java | 10 +++++++---
2 files changed, 16 insertions(+), 6 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
index 9781a5b0..41083d91 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
@@ -200,9 +200,15 @@ public class NativeFlinkService extends AbstractFlinkService {
var overrideStr = newOverrides.get(jobId);
if (overrideStr != null) {
- // We have an override for the vertex
- int p = Integer.parseInt(overrideStr);
- var newParallelism = new
JobVertexResourceRequirements.Parallelism(p, p);
+ // We set the parallelism upper bound to the target
parallelism, anything higher
+ // would defeat the purpose of scaling down
+ int upperBound = Integer.parseInt(overrideStr);
+ // We only change the lower bound if the new parallelism
went below it. As we
+ // cannot guarantee that new resources can be acquired,
increasing the lower
+ // bound to the target could potentially cause job failure.
+ int lowerBound = Math.min(upperBound,
parallelism.getLowerBound());
+ var newParallelism =
+ new
JobVertexResourceRequirements.Parallelism(lowerBound, upperBound);
// If the requirements changed we mark this as scaling
triggered
if (!parallelism.equals(newParallelism)) {
entry.setValue(new
JobVertexResourceRequirements(newParallelism));
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
index c8c0afb3..693a9019 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
@@ -229,7 +229,9 @@ public class NativeFlinkServiceTest {
var reconStatus = flinkDep.getStatus().getReconciliationStatus();
reconStatus.serializeAndSetLastReconciledSpec(spec, flinkDep);
- appConfig.set(PipelineOptions.PARALLELISM_OVERRIDES,
Map.of(v1.toHexString(), "4"));
+ appConfig.set(
+ PipelineOptions.PARALLELISM_OVERRIDES,
+ Map.of(v1.toHexString(), "4", v2.toHexString(), "1"));
spec.setFlinkConfiguration(appConfig.toMap());
flinkDep.getStatus().getJobStatus().setState("RUNNING");
@@ -256,13 +258,15 @@ public class NativeFlinkServiceTest {
Map.of(
v1,
new JobVertexResourceRequirements(
- new
JobVertexResourceRequirements.Parallelism(4, 4)),
+ new
JobVertexResourceRequirements.Parallelism(1, 4)),
v2,
new JobVertexResourceRequirements(
- new
JobVertexResourceRequirements.Parallelism(2, 2))),
+ new
JobVertexResourceRequirements.Parallelism(1, 1))),
updated.get());
// Baseline
+ appConfig.set(PipelineOptions.PARALLELISM_OVERRIDES,
Map.of(v1.toHexString(), "4"));
+ spec.setFlinkConfiguration(appConfig.toMap());
testScaleConditionDep(
flinkDep, service, d -> {},
FlinkService.ScalingResult.SCALING_TRIGGERED);
testScaleConditionLastSpec(