This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 66f7b42f [FLINK-36836][Autoscaler] Supports config the upper and lower 
limits of target utilization (#921)
66f7b42f is described below

commit 66f7b42fb4358a0ac407c498912fcd39aa0be996
Author: big face cat <731030...@qq.com>
AuthorDate: Tue Jan 14 10:24:37 2025 +0800

    [FLINK-36836][Autoscaler] Supports config the upper and lower limits of 
target utilization (#921)
---
 .../generated/auto_scaler_configuration.html       |  18 +-
 .../apache/flink/autoscaler/JobVertexScaler.java   |   4 +-
 .../flink/autoscaler/ScalingMetricEvaluator.java   |  16 +-
 .../flink/autoscaler/config/AutoScalerOptions.java |  24 ++-
 .../flink/autoscaler/BacklogBasedScalingTest.java  |   5 +-
 .../flink/autoscaler/JobVertexScalerTest.java      |  33 ++--
 .../MetricsCollectionAndEvaluationTest.java        |  10 +-
 .../autoscaler/RecommendedParallelismTest.java     |   5 +-
 .../flink/autoscaler/ScalingExecutorTest.java      | 117 ++++++++++--
 .../autoscaler/ScalingMetricEvaluatorTest.java     |  13 +-
 .../operator/validation/DefaultValidator.java      |  16 +-
 .../operator/validation/DefaultValidatorTest.java  | 201 +++++++++++++++++----
 12 files changed, 369 insertions(+), 93 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html 
b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
index 09baf217..ab2bbcb2 100644
--- a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
+++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
@@ -201,16 +201,22 @@
             <td>Stabilization period in which no new scaling will be 
executed</td>
         </tr>
         <tr>
-            <td><h5>job.autoscaler.target.utilization</h5></td>
-            <td style="word-wrap: break-word;">0.7</td>
+            <td><h5>job.autoscaler.utilization.max</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
             <td>Double</td>
-            <td>Target vertex utilization</td>
+            <td>Max vertex utilization</td>
         </tr>
         <tr>
-            <td><h5>job.autoscaler.target.utilization.boundary</h5></td>
-            <td style="word-wrap: break-word;">0.3</td>
+            <td><h5>job.autoscaler.utilization.min</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
             <td>Double</td>
-            <td>Target vertex utilization boundary. Scaling won't be performed 
if the processing capacity is within [target_rate / (target_utilization - 
boundary), (target_rate / (target_utilization + boundary)]</td>
+            <td>Min vertex utilization</td>
+        </tr>
+        <tr>
+            <td><h5>job.autoscaler.utilization.target</h5></td>
+            <td style="word-wrap: break-word;">0.7</td>
+            <td>Double</td>
+            <td>Target vertex utilization</td>
         </tr>
         <tr>
             <td><h5>job.autoscaler.vertex.exclude.ids</h5></td>
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
index 40f25c77..41075b7f 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
@@ -49,7 +49,7 @@ import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_
 import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.SCALE_DOWN_INTERVAL;
 import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
 import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE;
-import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET;
 import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
 import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
 import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
@@ -158,7 +158,7 @@ public class JobVertexScaler<KEY, Context extends 
JobAutoScalerContext<KEY>> {
 
         double targetCapacity =
                 AutoScalerUtils.getTargetProcessingCapacity(
-                        evaluatedMetrics, conf, conf.get(TARGET_UTILIZATION), 
true, restartTime);
+                        evaluatedMetrics, conf, conf.get(UTILIZATION_TARGET), 
true, restartTime);
         if (Double.isNaN(targetCapacity)) {
             LOG.warn(
                     "Target data rate is not available for {}, cannot compute 
new parallelism",
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
index 5bbc09a3..58c5dbe4 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
@@ -44,8 +44,10 @@ import java.util.Optional;
 import java.util.SortedMap;
 
 import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.BACKLOG_PROCESSING_LAG_THRESHOLD;
-import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
 import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY;
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MAX;
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MIN;
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET;
 import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.GC_PRESSURE;
 import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.HEAP_MAX_USAGE_RATIO;
@@ -284,8 +286,8 @@ public class ScalingMetricEvaluator {
             boolean processingBacklog,
             Duration restartTime) {
 
-        double utilizationBoundary = 
conf.getDouble(TARGET_UTILIZATION_BOUNDARY);
-        double targetUtilization = conf.get(TARGET_UTILIZATION);
+        double targetUtilization = conf.get(UTILIZATION_TARGET);
+        double utilizationBoundary = conf.get(TARGET_UTILIZATION_BOUNDARY);
 
         double upperUtilization;
         double lowerUtilization;
@@ -296,8 +298,12 @@ public class ScalingMetricEvaluator {
             upperUtilization = 1.0;
             lowerUtilization = 0.0;
         } else {
-            upperUtilization = targetUtilization + utilizationBoundary;
-            lowerUtilization = targetUtilization - utilizationBoundary;
+            upperUtilization =
+                    conf.getOptional(UTILIZATION_MAX)
+                            .orElse(targetUtilization + utilizationBoundary);
+            lowerUtilization =
+                    conf.getOptional(UTILIZATION_MIN)
+                            .orElse(targetUtilization - utilizationBoundary);
         }
 
         double scaleUpThreshold =
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
index a5ffb0f9..980db2f4 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
@@ -89,13 +89,17 @@ public class AutoScalerOptions {
                                     + "seconds suffix, daily expression's 
formation is startTime-endTime, such as 9:30:30-10:50:20, when exclude from 
9:30:30-10:50:20 in Monday and Thursday "
                                     + "we can express it as 9:30:30-10:50:20 
&& * * * ? * 2,5");
 
-    public static final ConfigOption<Double> TARGET_UTILIZATION =
-            autoScalerConfig("target.utilization")
+    public static final ConfigOption<Double> UTILIZATION_TARGET =
+            autoScalerConfig("utilization.target")
                     .doubleType()
                     .defaultValue(0.7)
-                    
.withFallbackKeys(oldOperatorConfigKey("target.utilization"))
+                    
.withDeprecatedKeys(autoScalerConfigKey("target.utilization"))
+                    .withFallbackKeys(
+                            oldOperatorConfigKey("utilization.target"),
+                            oldOperatorConfigKey("target.utilization"))
                     .withDescription("Target vertex utilization");
 
+    @Deprecated
     public static final ConfigOption<Double> TARGET_UTILIZATION_BOUNDARY =
             autoScalerConfig("target.utilization.boundary")
                     .doubleType()
@@ -104,6 +108,20 @@ public class AutoScalerOptions {
                     .withDescription(
                             "Target vertex utilization boundary. Scaling won't 
be performed if the processing capacity is within [target_rate / 
(target_utilization - boundary), (target_rate / (target_utilization + 
boundary)]");
 
+    public static final ConfigOption<Double> UTILIZATION_MAX =
+            autoScalerConfig("utilization.max")
+                    .doubleType()
+                    .noDefaultValue()
+                    .withFallbackKeys(oldOperatorConfigKey("utilization.max"))
+                    .withDescription("Max vertex utilization");
+
+    public static final ConfigOption<Double> UTILIZATION_MIN =
+            autoScalerConfig("utilization.min")
+                    .doubleType()
+                    .noDefaultValue()
+                    .withFallbackKeys(oldOperatorConfigKey("utilization.min"))
+                    .withDescription("Min vertex utilization");
+
     public static final ConfigOption<Duration> SCALE_DOWN_INTERVAL =
             autoScalerConfig("scale-down.interval")
                     .durationType()
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
index 0aae09c9..4599d634 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
@@ -95,8 +95,9 @@ public class BacklogBasedScalingTest {
         defaultConf.set(AutoScalerOptions.SCALING_ENABLED, true);
         defaultConf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
         defaultConf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, (double) 
Integer.MAX_VALUE);
-        defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.8);
-        defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.1);
+        defaultConf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.8);
+        defaultConf.set(AutoScalerOptions.UTILIZATION_MAX, 0.9);
+        defaultConf.set(AutoScalerOptions.UTILIZATION_MIN, 0.7);
         defaultConf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
         defaultConf.set(AutoScalerOptions.BACKLOG_PROCESSING_LAG_THRESHOLD, 
Duration.ofSeconds(1));
 
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
index 3c557ab3..af01ce34 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
@@ -49,6 +49,7 @@ import static 
org.apache.flink.autoscaler.JobVertexScaler.INEFFECTIVE_MESSAGE_FO
 import static org.apache.flink.autoscaler.JobVertexScaler.INEFFECTIVE_SCALING;
 import static 
org.apache.flink.autoscaler.JobVertexScaler.SCALE_LIMITED_MESSAGE_FORMAT;
 import static org.apache.flink.autoscaler.JobVertexScaler.SCALING_LIMITED;
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -98,7 +99,7 @@ public class JobVertexScalerTest {
     @MethodSource("adjustmentInputsProvider")
     public void testParallelismScaling(Collection<ShipStrategy> 
inputShipStrategies) {
         var op = new JobVertexID();
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+        conf.set(UTILIZATION_TARGET, 1.);
         conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
         var delayedScaleDown = new DelayedScaleDown();
 
@@ -113,7 +114,7 @@ public class JobVertexScalerTest {
                         restartTime,
                         delayedScaleDown));
 
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
+        conf.set(UTILIZATION_TARGET, .8);
         assertEquals(
                 ParallelismChange.build(8),
                 vertexScaler.computeScaleTargetParallelism(
@@ -125,7 +126,7 @@ public class JobVertexScalerTest {
                         restartTime,
                         delayedScaleDown));
 
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
+        conf.set(UTILIZATION_TARGET, .8);
         assertEquals(
                 ParallelismChange.noChange(),
                 vertexScaler.computeScaleTargetParallelism(
@@ -137,7 +138,7 @@ public class JobVertexScalerTest {
                         restartTime,
                         delayedScaleDown));
 
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
+        conf.set(UTILIZATION_TARGET, .8);
         assertEquals(
                 ParallelismChange.build(8),
                 vertexScaler.computeScaleTargetParallelism(
@@ -160,7 +161,7 @@ public class JobVertexScalerTest {
                         restartTime,
                         delayedScaleDown));
 
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.5);
+        conf.set(UTILIZATION_TARGET, 0.5);
         assertEquals(
                 ParallelismChange.build(10),
                 vertexScaler.computeScaleTargetParallelism(
@@ -172,7 +173,7 @@ public class JobVertexScalerTest {
                         restartTime,
                         delayedScaleDown));
 
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6);
+        conf.set(UTILIZATION_TARGET, 0.6);
         assertEquals(
                 ParallelismChange.build(4),
                 vertexScaler.computeScaleTargetParallelism(
@@ -184,7 +185,7 @@ public class JobVertexScalerTest {
                         restartTime,
                         delayedScaleDown));
 
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+        conf.set(UTILIZATION_TARGET, 1.);
         conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.5);
         assertEquals(
                 ParallelismChange.build(5),
@@ -209,7 +210,7 @@ public class JobVertexScalerTest {
                         restartTime,
                         delayedScaleDown));
 
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+        conf.set(UTILIZATION_TARGET, 1.);
         conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.5);
         assertEquals(
                 ParallelismChange.build(15),
@@ -558,7 +559,7 @@ public class JobVertexScalerTest {
     @Test
     public void testMaxParallelismLimitIsUsed() {
         conf.setInteger(AutoScalerOptions.VERTEX_MAX_PARALLELISM, 10);
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+        conf.set(UTILIZATION_TARGET, 1.);
         var delayedScaleDown = new DelayedScaleDown();
 
         assertEquals(
@@ -587,7 +588,7 @@ public class JobVertexScalerTest {
 
     @Test
     public void testDisableScaleDownInterval() {
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+        conf.set(UTILIZATION_TARGET, 1.);
         conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofMinutes(0));
 
         var delayedScaleDown = new DelayedScaleDown();
@@ -597,7 +598,7 @@ public class JobVertexScalerTest {
 
     @Test
     public void testScaleDownAfterInterval() {
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+        conf.set(UTILIZATION_TARGET, 1.);
         conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofMinutes(1));
         var instant = Instant.now();
 
@@ -629,7 +630,7 @@ public class JobVertexScalerTest {
 
     @Test
     public void testImmediateScaleUpWithinScaleDownInterval() {
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+        conf.set(UTILIZATION_TARGET, 1.);
         conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofMinutes(1));
         var instant = Instant.now();
 
@@ -655,7 +656,7 @@ public class JobVertexScalerTest {
 
     @Test
     public void testCancelDelayedScaleDownAfterNewParallelismIsSame() {
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+        conf.set(UTILIZATION_TARGET, 1.);
         conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofMinutes(1));
         var instant = Instant.now();
 
@@ -701,7 +702,7 @@ public class JobVertexScalerTest {
     public void testIneffectiveScalingDetection() {
         var op = new JobVertexID();
         conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, 
true);
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+        conf.set(UTILIZATION_TARGET, 1.);
         conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
 
         var evaluated = evaluated(5, 100, 50);
@@ -826,7 +827,7 @@ public class JobVertexScalerTest {
     public void testSendingIneffectiveScalingEvents(Collection<ShipStrategy> 
inputShipStrategies) {
         var jobVertexID = new JobVertexID();
         conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, 
true);
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.0);
+        conf.set(UTILIZATION_TARGET, 1.0);
         conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
 
         var evaluated = evaluated(5, 100, 50);
@@ -1082,7 +1083,7 @@ public class JobVertexScalerTest {
     @Test
     public void testSendingScalingLimitedEvents() {
         var jobVertexID = new JobVertexID();
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.0);
+        conf.set(UTILIZATION_TARGET, 1.0);
         conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
         conf.set(AutoScalerOptions.SCALING_EVENT_INTERVAL, Duration.ZERO);
         var evaluated = evaluated(10, 200, 100);
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
index 102586a3..ad2f5c72 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -123,8 +123,9 @@ public class MetricsCollectionAndEvaluationTest {
     @Test
     public void testEndToEnd() throws Exception {
         var conf = context.getConfiguration();
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.);
+        conf.set(AutoScalerOptions.UTILIZATION_TARGET, 1.);
+        conf.set(AutoScalerOptions.UTILIZATION_MAX, 1.);
+        conf.set(AutoScalerOptions.UTILIZATION_MIN, 1.);
 
         setDefaultMetrics(metricsCollector);
 
@@ -344,8 +345,9 @@ public class MetricsCollectionAndEvaluationTest {
     @Test
     public void testClearHistoryOnTopoChange() throws Exception {
         var conf = context.getConfiguration();
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.);
+        conf.set(AutoScalerOptions.UTILIZATION_TARGET, 1.);
+        conf.set(AutoScalerOptions.UTILIZATION_MIN, 1.);
+        conf.set(AutoScalerOptions.UTILIZATION_MAX, 1.);
 
         setDefaultMetrics(metricsCollector);
 
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
index 4b35d72a..8637e126 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
@@ -86,8 +86,9 @@ public class RecommendedParallelismTest {
         defaultConf.set(AutoScalerOptions.SCALING_ENABLED, true);
         defaultConf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
         defaultConf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, (double) 
Integer.MAX_VALUE);
-        defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.8);
-        defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.1);
+        defaultConf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.8);
+        defaultConf.set(AutoScalerOptions.UTILIZATION_MAX, 0.9);
+        defaultConf.set(AutoScalerOptions.UTILIZATION_MIN, 0.7);
         defaultConf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
 
         autoscaler =
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
index d3fc1674..a3bc6746 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
@@ -128,15 +128,17 @@ public class ScalingExecutorTest {
 
         var op1 = new JobVertexID();
 
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6);
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.);
+        conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.6);
+        conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.6);
+        conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.6);
 
         var evaluated = Map.of(op1, evaluated(1, 70, 100));
         assertFalse(
                 ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
                         evaluated, evaluated.keySet()));
 
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.2);
+        conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.8);
+        conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.4);
         evaluated = Map.of(op1, evaluated(1, 70, 100));
         assertTrue(
                 ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
@@ -178,8 +180,9 @@ public class ScalingExecutorTest {
         var op2 = new JobVertexID();
 
         // All vertices are optional
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6);
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.);
+        conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.6);
+        conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.6);
+        conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.6);
 
         var evaluated =
                 Map.of(
@@ -194,7 +197,8 @@ public class ScalingExecutorTest {
 
         // One vertex is required, and it's within the range.
         // The op2 is optional, so it shouldn't affect the scaling even if it 
is out of range,
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.1);
+        conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.8);
+        conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.6);
         evaluated =
                 Map.of(
                         op1, evaluated(1, 65, 100),
@@ -206,15 +210,17 @@ public class ScalingExecutorTest {
     @Test
     public void testNoScaleDownOnZeroLowerUtilizationBoundary() throws 
Exception {
         var conf = context.getConfiguration();
-        // Target utilization and boundary are identical
+        // Utilization min max is set from 0 to 1
         // which will set the scale down boundary to infinity
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6);
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.6);
+        conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.6);
+        conf.set(AutoScalerOptions.UTILIZATION_MAX, 1.2);
+        conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.);
 
         var vertex = new JobVertexID();
         int parallelism = 100;
         int expectedParallelism = 1;
         int targetRate = 1000;
+
         // Intentionally also set the true processing rate to infinity
         // to test the boundaries of the scaling condition.
         double trueProcessingRate = Double.POSITIVE_INFINITY;
@@ -249,6 +255,90 @@ public class ScalingExecutorTest {
                         new DelayedScaleDown()));
     }
 
+    @Test
+    public void testUtilizationBoundariesAndUtilizationMinMaxCompatibility() {
+        var conf = context.getConfiguration();
+        conf.set(AutoScalerOptions.RESTART_TIME, Duration.ZERO);
+        conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
+        var op1 = new JobVertexID();
+        var op2 = new JobVertexID();
+
+        // All vertices are optional
+        conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.6);
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.1);
+        var evaluated =
+                Map.of(
+                        op1, evaluated(1, 70, 100),
+                        op2, evaluated(1, 85, 100));
+
+        // target boundary 0.1, target 0.6, max 0.7, min 0.5
+        boolean boundaryOp1 =
+                
ScalingExecutor.allChangedVerticesWithinUtilizationTarget(evaluated, 
Set.of(op1));
+        boolean boundaryOp2 =
+                
ScalingExecutor.allChangedVerticesWithinUtilizationTarget(evaluated, 
Set.of(op2));
+
+        // Remove target boundary and use min max, should get the same result
+        conf.removeConfig(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY);
+        conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.7);
+        conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.5);
+        boolean minMaxOp1 =
+                
ScalingExecutor.allChangedVerticesWithinUtilizationTarget(evaluated, 
Set.of(op1));
+        boolean minMaxOp2 =
+                
ScalingExecutor.allChangedVerticesWithinUtilizationTarget(evaluated, 
Set.of(op2));
+        assertEquals(boundaryOp1, minMaxOp1);
+        assertEquals(boundaryOp2, minMaxOp2);
+
+        // When the target boundary parameter is used,
+        // but the min max parameter is also set,
+        // the min max parameter shall prevail.
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 1.);
+        conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.7);
+        conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.3);
+        evaluated =
+                Map.of(
+                        op1, evaluated(2, 150, 100),
+                        op2, evaluated(1, 85, 100));
+        assertFalse(
+                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
+                        evaluated, evaluated.keySet()));
+
+        // When the target boundary parameter is used,
+        // but the max parameter is also set,
+        conf.removeConfig(AutoScalerOptions.UTILIZATION_MIN);
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 1.);
+        conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.5);
+        conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.6);
+
+        evaluated =
+                Map.of(
+                        op1, evaluated(2, 100, 99999),
+                        op2, evaluated(1, 80, 99999));
+        assertTrue(
+                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
+                        evaluated, evaluated.keySet()));
+
+        evaluated = Map.of(op2, evaluated(1, 85, 100));
+        assertFalse(
+                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
+                        evaluated, evaluated.keySet()));
+
+        conf.removeConfig(AutoScalerOptions.UTILIZATION_MAX);
+        conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.3);
+
+        evaluated =
+                Map.of(
+                        op1, evaluated(2, 80, 81),
+                        op2, evaluated(1, 100, 101));
+        assertTrue(
+                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
+                        evaluated, evaluated.keySet()));
+
+        evaluated = Map.of(op1, evaluated(1, 80, 79));
+        assertFalse(
+                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
+                        evaluated, evaluated.keySet()));
+    }
+
     @Test
     public void testVertexesExclusionForScaling() throws Exception {
         var sourceHexString = "0bfd135746ac8efb3cce668b12e16d3a";
@@ -266,7 +356,7 @@ public class ScalingExecutorTest {
 
         var conf = context.getConfiguration();
         conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofSeconds(0));
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
+        conf.set(AutoScalerOptions.UTILIZATION_TARGET, .8);
         var metrics =
                 new EvaluatedMetrics(
                         Map.of(
@@ -720,7 +810,7 @@ public class ScalingExecutorTest {
                                 null));
 
         var conf = context.getConfiguration();
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.d);
+        conf.set(AutoScalerOptions.UTILIZATION_TARGET, 1.d);
 
         // The expected new parallelism is 7 without adjustment by max 
parallelism.
         var metrics =
@@ -774,8 +864,9 @@ public class ScalingExecutorTest {
         conf.setString("taskmanager.numberOfTaskSlots", "2");
         cpuQuota.ifPresent(v -> conf.set(AutoScalerOptions.CPU_QUOTA, v));
         memoryQuota.ifPresent(v -> conf.set(AutoScalerOptions.MEMORY_QUOTA, 
MemorySize.parse(v)));
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6);
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.);
+        conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.6);
+        conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.6);
+        conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.6);
 
         testQuotaReached(slotSharingGroupId1, slotSharingGroupId2, 
quotaReached, ctx);
     }
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
index 969d70ee..03e8fce5 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
@@ -41,8 +41,7 @@ import java.util.TreeMap;
 import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.CATCH_UP_DURATION;
 import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.PREFER_TRACKED_RESTART_TIME;
 import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.RESTART_TIME;
-import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
-import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY;
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET;
 import static org.apache.flink.autoscaler.topology.ShipStrategy.REBALANCE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -265,8 +264,9 @@ public class ScalingMetricEvaluatorTest {
     public void testUtilizationBoundaryComputation() {
 
         var conf = new Configuration();
-        conf.set(TARGET_UTILIZATION, 0.8);
-        conf.set(TARGET_UTILIZATION_BOUNDARY, 0.1);
+        conf.set(UTILIZATION_TARGET, 0.8);
+        conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.9);
+        conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.7);
         conf.set(RESTART_TIME, Duration.ofSeconds(1));
         conf.set(CATCH_UP_DURATION, Duration.ZERO);
 
@@ -287,8 +287,9 @@ public class ScalingMetricEvaluatorTest {
     public void testUtilizationBoundaryComputationWithRestartTimesTracking() {
 
         var conf = new Configuration();
-        conf.set(TARGET_UTILIZATION, 0.8);
-        conf.set(TARGET_UTILIZATION_BOUNDARY, 0.1);
+        conf.set(UTILIZATION_TARGET, 0.8);
+        conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.9);
+        conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.7);
         conf.set(RESTART_TIME, Duration.ofMinutes(10));
         conf.set(CATCH_UP_DURATION, Duration.ZERO);
         conf.set(PREFER_TRACKED_RESTART_TIME, true);
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
index fad4553d..e2b5db85 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
@@ -65,6 +65,10 @@ import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MAX;
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MIN;
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET;
+
 /** Default validator implementation for {@link FlinkDeployment}. */
 public class DefaultValidator implements FlinkResourceValidator {
 
@@ -605,9 +609,19 @@ public class DefaultValidator implements 
FlinkResourceValidator {
         return firstPresent(
                 validateNumber(flinkConfiguration, 
AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.0d),
                 validateNumber(flinkConfiguration, 
AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.0d),
-                validateNumber(flinkConfiguration, 
AutoScalerOptions.TARGET_UTILIZATION, 0.0d),
+                validateNumber(flinkConfiguration, UTILIZATION_TARGET, 0.0d, 
1.0d),
                 validateNumber(
                         flinkConfiguration, 
AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d),
+                validateNumber(
+                        flinkConfiguration,
+                        UTILIZATION_MAX,
+                        flinkConfiguration.get(UTILIZATION_TARGET),
+                        1.0d),
+                validateNumber(
+                        flinkConfiguration,
+                        UTILIZATION_MIN,
+                        0.0d,
+                        flinkConfiguration.get(UTILIZATION_TARGET)),
                 CalendarUtils.validateExcludedPeriods(flinkConfiguration));
     }
 
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
index 5bf5033a..8caa24eb 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
@@ -802,22 +802,34 @@ public class DefaultValidatorTest {
         var result =
                 testAutoScalerConfiguration(
                         flinkConf ->
-                                
flinkConf.put(AutoScalerOptions.TARGET_UTILIZATION.key(), "-0.6"));
+                                
flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "-0.6"));
         assertErrorContains(
-                result, 
getFormattedErrorMessage(AutoScalerOptions.TARGET_UTILIZATION, 0.0d));
+                result, 
getFormattedErrorMessage(AutoScalerOptions.UTILIZATION_TARGET, 0.0d, 1.0d));
     }
 
     @Test
     public void 
testAutoScalerDeploymentWithInvalidNegativeUtilizationBoundary() {
-        var result =
+        var resultMaxUtilization =
                 testAutoScalerConfiguration(
                         flinkConf ->
-                                flinkConf.put(
-                                        
AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(),
-                                        "-0.6"));
+                                
flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "-0.6"));
+        assertErrorContains(
+                resultMaxUtilization,
+                getFormattedErrorMessage(
+                        AutoScalerOptions.UTILIZATION_MAX,
+                        AutoScalerOptions.UTILIZATION_TARGET.defaultValue(),
+                        1.0));
+
+        var resultMinUtilization =
+                testAutoScalerConfiguration(
+                        flinkConf ->
+                                
flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "-0.6"));
         assertErrorContains(
-                result,
-                
getFormattedErrorMessage(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d));
+                resultMinUtilization,
+                getFormattedErrorMessage(
+                        AutoScalerOptions.UTILIZATION_MIN,
+                        0.0d,
+                        AutoScalerOptions.UTILIZATION_TARGET.defaultValue()));
     }
 
     @Test
@@ -838,9 +850,9 @@ public class DefaultValidatorTest {
                             
flinkConf.remove(AutoScalerOptions.AUTOSCALER_ENABLED.key());
                             
flinkConf.put(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR.key(), "-1.6");
                             
flinkConf.put(AutoScalerOptions.MAX_SCALE_UP_FACTOR.key(), "-1.6");
-                            
flinkConf.put(AutoScalerOptions.TARGET_UTILIZATION.key(), "-1.6");
-                            flinkConf.put(
-                                    
AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(), "-1.6");
+                            
flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "-1.6");
+                            
flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "-1.6");
+                            
flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "-1.6");
                         });
         assertErrorNotContains(result);
     }
@@ -853,9 +865,9 @@ public class DefaultValidatorTest {
                             
flinkConf.put(AutoScalerOptions.AUTOSCALER_ENABLED.key(), "false");
                             
flinkConf.put(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR.key(), "-1.6");
                             
flinkConf.put(AutoScalerOptions.MAX_SCALE_UP_FACTOR.key(), "-1.6");
-                            
flinkConf.put(AutoScalerOptions.TARGET_UTILIZATION.key(), "-1.6");
-                            flinkConf.put(
-                                    
AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(), "-1.6");
+                            
flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "-1.6");
+                            
flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "-1.6");
+                            
flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "-1.6");
                         });
         assertErrorNotContains(result);
     }
@@ -891,35 +903,59 @@ public class DefaultValidatorTest {
         var result =
                 testSessionJobAutoScalerConfiguration(
                         flinkConf ->
-                                
flinkConf.put(AutoScalerOptions.TARGET_UTILIZATION.key(), "-0.6"));
+                                
flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "-0.6"));
         assertErrorContains(
-                result, 
getFormattedErrorMessage(AutoScalerOptions.TARGET_UTILIZATION, 0.0d));
+                result, 
getFormattedErrorMessage(AutoScalerOptions.UTILIZATION_TARGET, 0.0d, 1.0));
     }
 
     @Test
     public void testValidateSessionJobWithInvalidNegativeUtilizationBoundary() 
{
-        var result =
+        var resultMaxUtilization =
                 testSessionJobAutoScalerConfiguration(
                         flinkConf ->
-                                flinkConf.put(
-                                        
AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(),
-                                        "-0.6"));
+                                
flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "-0.6"));
         assertErrorContains(
-                result,
-                
getFormattedErrorMessage(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d));
+                resultMaxUtilization,
+                getFormattedErrorMessage(
+                        AutoScalerOptions.UTILIZATION_MAX,
+                        AutoScalerOptions.UTILIZATION_TARGET.defaultValue(),
+                        1.0d));
+
+        var resultMinUtilization =
+                testSessionJobAutoScalerConfiguration(
+                        flinkConf ->
+                                
flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "-0.6"));
+        assertErrorContains(
+                resultMinUtilization,
+                getFormattedErrorMessage(
+                        AutoScalerOptions.UTILIZATION_MIN,
+                        0.0d,
+                        AutoScalerOptions.UTILIZATION_TARGET.defaultValue()));
     }
 
     @Test
     public void testValidateSessionJobWithInvalidUtilizationBoundary() {
-        var result =
+        var resultMaxUtilization =
                 testSessionJobAutoScalerConfiguration(
                         flinkConf ->
-                                flinkConf.put(
-                                        
AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(),
-                                        "-1.6"));
+                                
flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "-0.6"));
+        assertErrorContains(
+                resultMaxUtilization,
+                getFormattedErrorMessage(
+                        AutoScalerOptions.UTILIZATION_MAX,
+                        AutoScalerOptions.UTILIZATION_TARGET.defaultValue(),
+                        1.0d));
+
+        var resultMinUtilization =
+                testSessionJobAutoScalerConfiguration(
+                        flinkConf ->
+                                
flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "-0.6"));
         assertErrorContains(
-                result,
-                
getFormattedErrorMessage(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d));
+                resultMinUtilization,
+                getFormattedErrorMessage(
+                        AutoScalerOptions.UTILIZATION_MIN,
+                        0.0,
+                        AutoScalerOptions.UTILIZATION_TARGET.defaultValue()));
     }
 
     @Test
@@ -940,13 +976,100 @@ public class DefaultValidatorTest {
                             
flinkConf.put(AutoScalerOptions.AUTOSCALER_ENABLED.key(), "false");
                             
flinkConf.put(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR.key(), "-1.6");
                             
flinkConf.put(AutoScalerOptions.MAX_SCALE_UP_FACTOR.key(), "-1.6");
-                            
flinkConf.put(AutoScalerOptions.TARGET_UTILIZATION.key(), "-1.6");
-                            flinkConf.put(
-                                    
AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(), "-1.6");
+                            
flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "-1.6");
+                            
flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "-1.6");
+                            
flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "-1.6");
                         });
         assertErrorNotContains(result);
     }
 
+    @Test
+    public void testAutoScalerUtilizationConfiguration() {
+        var deploymentResult =
+                testAutoScalerConfiguration(
+                        flinkConf -> {
+                            
flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "0.3");
+                            
flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "0.5");
+                            
flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "0.4");
+                        });
+        assertErrorContains(
+                deploymentResult,
+                getFormattedErrorMessage(AutoScalerOptions.UTILIZATION_MAX, 
0.5, 1.0));
+
+        deploymentResult =
+                testAutoScalerConfiguration(
+                        flinkConf -> {
+                            
flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "0.8");
+                        });
+        assertErrorContains(
+                deploymentResult,
+                getFormattedErrorMessage(
+                        AutoScalerOptions.UTILIZATION_MIN,
+                        0.0,
+                        AutoScalerOptions.UTILIZATION_TARGET.defaultValue()));
+
+        deploymentResult =
+                testAutoScalerConfiguration(
+                        flinkConf -> {
+                            
flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "1.5");
+                        });
+
+        assertErrorContains(
+                deploymentResult,
+                getFormattedErrorMessage(AutoScalerOptions.UTILIZATION_TARGET, 
0.0, 1.0));
+
+        deploymentResult =
+                testAutoScalerConfiguration(
+                        flinkConf -> {
+                            
flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "0.2");
+                            
flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "0.5");
+                            
flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "0.6");
+                        });
+        assertErrorNotContains(deploymentResult);
+
+        var sessionResult =
+                testSessionJobAutoScalerConfiguration(
+                        flinkConf -> {
+                            
flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "0.3");
+                            
flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "0.5");
+                            
flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "0.4");
+                        });
+        assertErrorContains(
+                sessionResult,
+                getFormattedErrorMessage(AutoScalerOptions.UTILIZATION_MAX, 
0.5, 1.0));
+
+        sessionResult =
+                testSessionJobAutoScalerConfiguration(
+                        flinkConf -> {
+                            
flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "0.6");
+                        });
+        assertErrorContains(
+                sessionResult,
+                getFormattedErrorMessage(
+                        AutoScalerOptions.UTILIZATION_MAX,
+                        AutoScalerOptions.UTILIZATION_TARGET.defaultValue(),
+                        1.0));
+
+        sessionResult =
+                testSessionJobAutoScalerConfiguration(
+                        flinkConf -> {
+                            
flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "1.5");
+                        });
+
+        assertErrorContains(
+                sessionResult,
+                getFormattedErrorMessage(AutoScalerOptions.UTILIZATION_TARGET, 
0.0, 1.0));
+
+        sessionResult =
+                testSessionJobAutoScalerConfiguration(
+                        flinkConf -> {
+                            
flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "0.2");
+                            
flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "0.5");
+                            
flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "0.6");
+                        });
+        assertErrorNotContains(sessionResult);
+    }
+
     private Optional<String> testSessionJobAutoScalerConfiguration(
             Consumer<Map<String, String>> flinkConfigurationModifier) {
         var sessionCluster = TestUtils.buildSessionCluster();
@@ -972,8 +1095,9 @@ public class DefaultValidatorTest {
         conf.put(AutoScalerOptions.MAX_SCALE_UP_FACTOR.key(), "100000.0");
         conf.put(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR.key(), "0.6");
         
conf.put(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED.key(), 
"0.1");
-        conf.put(AutoScalerOptions.TARGET_UTILIZATION.key(), "0.7");
-        conf.put(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(), "0.4");
+        conf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "0.7");
+        conf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "1.0");
+        conf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "0.3");
         return conf;
     }
 
@@ -986,6 +1110,17 @@ public class DefaultValidatorTest {
                 max != null ? max.toString() : "+Infinity");
     }
 
+    private static String getFormattedNumberOrderErrorMessage(
+            ConfigOption<Double> configValueLeft, ConfigOption<Double> 
configValueRight) {
+        return String.format(
+                "The AutoScalerOption %s or %s is invalid, %s must be less 
than or equal to the value of "
+                        + "%s",
+                configValueLeft.key(),
+                configValueRight.key(),
+                configValueLeft.key(),
+                configValueRight.key());
+    }
+
     private static String getFormattedErrorMessage(ConfigOption<Double> 
configValue, Double min) {
         return getFormattedErrorMessage(configValue, min, null);
     }

Reply via email to