This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 4342636c [FLINK-34311] Do not change min resource requirements when rescaling for adaptive scheduler
4342636c is described below

commit 4342636cdb2c3439389e83cb4fe4366156edfbd7
Author: Gyula Fora <[email protected]>
AuthorDate: Tue Jan 30 15:43:46 2024 +0100

    [FLINK-34311] Do not change min resource requirements when rescaling for adaptive scheduler
---
 .../kubernetes/operator/service/NativeFlinkService.java      | 12 +++++++++---
 .../kubernetes/operator/service/NativeFlinkServiceTest.java  | 10 +++++++---
 2 files changed, 16 insertions(+), 6 deletions(-)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
index 9781a5b0..41083d91 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
@@ -200,9 +200,15 @@ public class NativeFlinkService extends 
AbstractFlinkService {
                 var overrideStr = newOverrides.get(jobId);
 
                 if (overrideStr != null) {
-                    // We have an override for the vertex
-                    int p = Integer.parseInt(overrideStr);
-                    var newParallelism = new 
JobVertexResourceRequirements.Parallelism(p, p);
+                    // We set the parallelism upper bound to the target 
parallelism, anything higher
+                    // would defeat the purpose of scaling down
+                    int upperBound = Integer.parseInt(overrideStr);
+                    // We only change the lower bound if the new parallelism 
went below it. As we
+                    // cannot guarantee that new resources can be acquired, 
increasing the lower
+                    // bound to the target could potentially cause job failure.
+                    int lowerBound = Math.min(upperBound, 
parallelism.getLowerBound());
+                    var newParallelism =
+                            new 
JobVertexResourceRequirements.Parallelism(lowerBound, upperBound);
                     // If the requirements changed we mark this as scaling 
triggered
                     if (!parallelism.equals(newParallelism)) {
                         entry.setValue(new 
JobVertexResourceRequirements(newParallelism));
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
index c8c0afb3..693a9019 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
@@ -229,7 +229,9 @@ public class NativeFlinkServiceTest {
         var reconStatus = flinkDep.getStatus().getReconciliationStatus();
         reconStatus.serializeAndSetLastReconciledSpec(spec, flinkDep);
 
-        appConfig.set(PipelineOptions.PARALLELISM_OVERRIDES, 
Map.of(v1.toHexString(), "4"));
+        appConfig.set(
+                PipelineOptions.PARALLELISM_OVERRIDES,
+                Map.of(v1.toHexString(), "4", v2.toHexString(), "1"));
         spec.setFlinkConfiguration(appConfig.toMap());
 
         flinkDep.getStatus().getJobStatus().setState("RUNNING");
@@ -256,13 +258,15 @@ public class NativeFlinkServiceTest {
                 Map.of(
                         v1,
                                 new JobVertexResourceRequirements(
-                                        new 
JobVertexResourceRequirements.Parallelism(4, 4)),
+                                        new 
JobVertexResourceRequirements.Parallelism(1, 4)),
                         v2,
                                 new JobVertexResourceRequirements(
-                                        new 
JobVertexResourceRequirements.Parallelism(2, 2))),
+                                        new 
JobVertexResourceRequirements.Parallelism(1, 1))),
                 updated.get());
 
         // Baseline
+        appConfig.set(PipelineOptions.PARALLELISM_OVERRIDES, 
Map.of(v1.toHexString(), "4"));
+        spec.setFlinkConfiguration(appConfig.toMap());
         testScaleConditionDep(
                 flinkDep, service, d -> {}, 
FlinkService.ScalingResult.SCALING_TRIGGERED);
         testScaleConditionLastSpec(

Reply via email to