This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
     new 66f7b42f [FLINK-36836][Autoscaler] Supports config the upper and lower limits of target utilization (#921)
66f7b42f is described below

commit 66f7b42fb4358a0ac407c498912fcd39aa0be996
Author: big face cat <731030...@qq.com>
AuthorDate: Tue Jan 14 10:24:37 2025 +0800

    [FLINK-36836][Autoscaler] Supports config the upper and lower limits of target utilization (#921)
---
 .../generated/auto_scaler_configuration.html       |  18 +-
 .../apache/flink/autoscaler/JobVertexScaler.java   |   4 +-
 .../flink/autoscaler/ScalingMetricEvaluator.java   |  16 +-
 .../flink/autoscaler/config/AutoScalerOptions.java |  24 ++-
 .../flink/autoscaler/BacklogBasedScalingTest.java  |   5 +-
 .../flink/autoscaler/JobVertexScalerTest.java      |  33 ++--
 .../MetricsCollectionAndEvaluationTest.java        |  10 +-
 .../autoscaler/RecommendedParallelismTest.java     |   5 +-
 .../flink/autoscaler/ScalingExecutorTest.java      | 117 ++++++++++--
 .../autoscaler/ScalingMetricEvaluatorTest.java     |  13 +-
 .../operator/validation/DefaultValidator.java      |  16 +-
 .../operator/validation/DefaultValidatorTest.java  | 201 +++++++++++++++++----
 12 files changed, 369 insertions(+), 93 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
index 09baf217..ab2bbcb2 100644
--- a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
+++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
@@ -201,16 +201,22 @@
             <td>Stabilization period in which no new scaling will be executed</td>
         </tr>
         <tr>
-            <td><h5>job.autoscaler.target.utilization</h5></td>
-            <td style="word-wrap: break-word;">0.7</td>
+            <td><h5>job.autoscaler.utilization.max</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
             <td>Double</td>
-            <td>Target vertex utilization</td>
+            <td>Max vertex utilization</td>
         </tr>
         <tr>
-            <td><h5>job.autoscaler.target.utilization.boundary</h5></td>
-            <td style="word-wrap: break-word;">0.3</td>
+            <td><h5>job.autoscaler.utilization.min</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
             <td>Double</td>
-            <td>Target vertex utilization boundary. 
Scaling won't be performed if the processing capacity is within [target_rate / (target_utilization - boundary), (target_rate / (target_utilization + boundary)]</td> + <td>Min vertex utilization</td> + </tr> + <tr> + <td><h5>job.autoscaler.utilization.target</h5></td> + <td style="word-wrap: break-word;">0.7</td> + <td>Double</td> + <td>Target vertex utilization</td> </tr> <tr> <td><h5>job.autoscaler.vertex.exclude.ids</h5></td> diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java index 40f25c77..41075b7f 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java @@ -49,7 +49,7 @@ import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_ import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALE_DOWN_INTERVAL; import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL; import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE; -import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET; import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM; import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM; import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE; @@ -158,7 +158,7 @@ public class JobVertexScaler<KEY, Context extends JobAutoScalerContext<KEY>> { double targetCapacity = AutoScalerUtils.getTargetProcessingCapacity( - evaluatedMetrics, conf, conf.get(TARGET_UTILIZATION), true, restartTime); + evaluatedMetrics, conf, conf.get(UTILIZATION_TARGET), true, restartTime); if (Double.isNaN(targetCapacity)) { LOG.warn( "Target data rate is not available for {}, cannot compute new parallelism", diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java index 5bbc09a3..58c5dbe4 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java @@ -44,8 +44,10 @@ import java.util.Optional; import java.util.SortedMap; import static org.apache.flink.autoscaler.config.AutoScalerOptions.BACKLOG_PROCESSING_LAG_THRESHOLD; -import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION; import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MAX; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MIN; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET; import static org.apache.flink.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE; import static org.apache.flink.autoscaler.metrics.ScalingMetric.GC_PRESSURE; import static org.apache.flink.autoscaler.metrics.ScalingMetric.HEAP_MAX_USAGE_RATIO; @@ -284,8 +286,8 @@ public class ScalingMetricEvaluator { boolean processingBacklog, Duration restartTime) { - double utilizationBoundary = conf.getDouble(TARGET_UTILIZATION_BOUNDARY); - double 
targetUtilization = conf.get(TARGET_UTILIZATION); + double targetUtilization = conf.get(UTILIZATION_TARGET); + double utilizationBoundary = conf.get(TARGET_UTILIZATION_BOUNDARY); double upperUtilization; double lowerUtilization; @@ -296,8 +298,12 @@ public class ScalingMetricEvaluator { upperUtilization = 1.0; lowerUtilization = 0.0; } else { - upperUtilization = targetUtilization + utilizationBoundary; - lowerUtilization = targetUtilization - utilizationBoundary; + upperUtilization = + conf.getOptional(UTILIZATION_MAX) + .orElse(targetUtilization + utilizationBoundary); + lowerUtilization = + conf.getOptional(UTILIZATION_MIN) + .orElse(targetUtilization - utilizationBoundary); } double scaleUpThreshold = diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java index a5ffb0f9..980db2f4 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java @@ -89,13 +89,17 @@ public class AutoScalerOptions { + "seconds suffix, daily expression's formation is startTime-endTime, such as 9:30:30-10:50:20, when exclude from 9:30:30-10:50:20 in Monday and Thursday " + "we can express it as 9:30:30-10:50:20 && * * * ? * 2,5"); - public static final ConfigOption<Double> TARGET_UTILIZATION = - autoScalerConfig("target.utilization") + public static final ConfigOption<Double> UTILIZATION_TARGET = + autoScalerConfig("utilization.target") .doubleType() .defaultValue(0.7) - .withFallbackKeys(oldOperatorConfigKey("target.utilization")) + .withDeprecatedKeys(autoScalerConfigKey("target.utilization")) + .withFallbackKeys( + oldOperatorConfigKey("utilization.target"), + oldOperatorConfigKey("target.utilization")) .withDescription("Target vertex utilization"); + @Deprecated public static final ConfigOption<Double> TARGET_UTILIZATION_BOUNDARY = autoScalerConfig("target.utilization.boundary") .doubleType() @@ -104,6 +108,20 @@ public class AutoScalerOptions { .withDescription( "Target vertex utilization boundary. 
Scaling won't be performed if the processing capacity is within [target_rate / (target_utilization - boundary), (target_rate / (target_utilization + boundary)]"); + public static final ConfigOption<Double> UTILIZATION_MAX = + autoScalerConfig("utilization.max") + .doubleType() + .noDefaultValue() + .withFallbackKeys(oldOperatorConfigKey("utilization.max")) + .withDescription("Max vertex utilization"); + + public static final ConfigOption<Double> UTILIZATION_MIN = + autoScalerConfig("utilization.min") + .doubleType() + .noDefaultValue() + .withFallbackKeys(oldOperatorConfigKey("utilization.min")) + .withDescription("Min vertex utilization"); + public static final ConfigOption<Duration> SCALE_DOWN_INTERVAL = autoScalerConfig("scale-down.interval") .durationType() diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java index 0aae09c9..4599d634 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java @@ -95,8 +95,9 @@ public class BacklogBasedScalingTest { defaultConf.set(AutoScalerOptions.SCALING_ENABLED, true); defaultConf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.); defaultConf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, (double) Integer.MAX_VALUE); - defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.8); - defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.1); + defaultConf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.8); + defaultConf.set(AutoScalerOptions.UTILIZATION_MAX, 0.9); + defaultConf.set(AutoScalerOptions.UTILIZATION_MIN, 0.7); defaultConf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO); defaultConf.set(AutoScalerOptions.BACKLOG_PROCESSING_LAG_THRESHOLD, Duration.ofSeconds(1)); diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java index 3c557ab3..af01ce34 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java @@ -49,6 +49,7 @@ import static org.apache.flink.autoscaler.JobVertexScaler.INEFFECTIVE_MESSAGE_FO import static org.apache.flink.autoscaler.JobVertexScaler.INEFFECTIVE_SCALING; import static org.apache.flink.autoscaler.JobVertexScaler.SCALE_LIMITED_MESSAGE_FORMAT; import static org.apache.flink.autoscaler.JobVertexScaler.SCALING_LIMITED; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -98,7 +99,7 @@ public class JobVertexScalerTest { @MethodSource("adjustmentInputsProvider") public void testParallelismScaling(Collection<ShipStrategy> inputShipStrategies) { var op = new JobVertexID(); - conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.); + conf.set(UTILIZATION_TARGET, 1.); conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO); var delayedScaleDown = new DelayedScaleDown(); @@ -113,7 +114,7 @@ public class JobVertexScalerTest { restartTime, delayedScaleDown)); - conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8); + conf.set(UTILIZATION_TARGET, .8); assertEquals( 
ParallelismChange.build(8), vertexScaler.computeScaleTargetParallelism( @@ -125,7 +126,7 @@ public class JobVertexScalerTest { restartTime, delayedScaleDown)); - conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8); + conf.set(UTILIZATION_TARGET, .8); assertEquals( ParallelismChange.noChange(), vertexScaler.computeScaleTargetParallelism( @@ -137,7 +138,7 @@ public class JobVertexScalerTest { restartTime, delayedScaleDown)); - conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8); + conf.set(UTILIZATION_TARGET, .8); assertEquals( ParallelismChange.build(8), vertexScaler.computeScaleTargetParallelism( @@ -160,7 +161,7 @@ public class JobVertexScalerTest { restartTime, delayedScaleDown)); - conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.5); + conf.set(UTILIZATION_TARGET, 0.5); assertEquals( ParallelismChange.build(10), vertexScaler.computeScaleTargetParallelism( @@ -172,7 +173,7 @@ public class JobVertexScalerTest { restartTime, delayedScaleDown)); - conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6); + conf.set(UTILIZATION_TARGET, 0.6); assertEquals( ParallelismChange.build(4), vertexScaler.computeScaleTargetParallelism( @@ -184,7 +185,7 @@ public class JobVertexScalerTest { restartTime, delayedScaleDown)); - conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.); + conf.set(UTILIZATION_TARGET, 1.); conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.5); assertEquals( ParallelismChange.build(5), @@ -209,7 +210,7 @@ public class JobVertexScalerTest { restartTime, delayedScaleDown)); - conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.); + conf.set(UTILIZATION_TARGET, 1.); conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.5); assertEquals( ParallelismChange.build(15), @@ -558,7 +559,7 @@ public class JobVertexScalerTest { @Test public void testMaxParallelismLimitIsUsed() { conf.setInteger(AutoScalerOptions.VERTEX_MAX_PARALLELISM, 10); - conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.); + conf.set(UTILIZATION_TARGET, 1.); var delayedScaleDown = new DelayedScaleDown(); assertEquals( @@ -587,7 +588,7 @@ public class JobVertexScalerTest { @Test public void testDisableScaleDownInterval() { - conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.); + conf.set(UTILIZATION_TARGET, 1.); conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofMinutes(0)); var delayedScaleDown = new DelayedScaleDown(); @@ -597,7 +598,7 @@ public class JobVertexScalerTest { @Test public void testScaleDownAfterInterval() { - conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.); + conf.set(UTILIZATION_TARGET, 1.); conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofMinutes(1)); var instant = Instant.now(); @@ -629,7 +630,7 @@ public class JobVertexScalerTest { @Test public void testImmediateScaleUpWithinScaleDownInterval() { - conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.); + conf.set(UTILIZATION_TARGET, 1.); conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofMinutes(1)); var instant = Instant.now(); @@ -655,7 +656,7 @@ public class JobVertexScalerTest { @Test public void testCancelDelayedScaleDownAfterNewParallelismIsSame() { - conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.); + conf.set(UTILIZATION_TARGET, 1.); conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofMinutes(1)); var instant = Instant.now(); @@ -701,7 +702,7 @@ public class JobVertexScalerTest { public void testIneffectiveScalingDetection() { var op = new JobVertexID(); conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, true); - conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.); + conf.set(UTILIZATION_TARGET, 1.); 
conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO); var evaluated = evaluated(5, 100, 50); @@ -826,7 +827,7 @@ public class JobVertexScalerTest { public void testSendingIneffectiveScalingEvents(Collection<ShipStrategy> inputShipStrategies) { var jobVertexID = new JobVertexID(); conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, true); - conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.0); + conf.set(UTILIZATION_TARGET, 1.0); conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO); var evaluated = evaluated(5, 100, 50); @@ -1082,7 +1083,7 @@ public class JobVertexScalerTest { @Test public void testSendingScalingLimitedEvents() { var jobVertexID = new JobVertexID(); - conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.0); + conf.set(UTILIZATION_TARGET, 1.0); conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO); conf.set(AutoScalerOptions.SCALING_EVENT_INTERVAL, Duration.ZERO); var evaluated = evaluated(10, 200, 100); diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java index 102586a3..ad2f5c72 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java @@ -123,8 +123,9 @@ public class MetricsCollectionAndEvaluationTest { @Test public void testEndToEnd() throws Exception { var conf = context.getConfiguration(); - conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.); - conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.); + conf.set(AutoScalerOptions.UTILIZATION_TARGET, 1.); + conf.set(AutoScalerOptions.UTILIZATION_MAX, 1.); + conf.set(AutoScalerOptions.UTILIZATION_MIN, 1.); setDefaultMetrics(metricsCollector); @@ -344,8 +345,9 @@ public class MetricsCollectionAndEvaluationTest { @Test public void testClearHistoryOnTopoChange() throws Exception { var conf = context.getConfiguration(); - conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.); - conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.); + conf.set(AutoScalerOptions.UTILIZATION_TARGET, 1.); + conf.set(AutoScalerOptions.UTILIZATION_MIN, 1.); + conf.set(AutoScalerOptions.UTILIZATION_MAX, 1.); setDefaultMetrics(metricsCollector); diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java index 4b35d72a..8637e126 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java @@ -86,8 +86,9 @@ public class RecommendedParallelismTest { defaultConf.set(AutoScalerOptions.SCALING_ENABLED, true); defaultConf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.); defaultConf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, (double) Integer.MAX_VALUE); - defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.8); - defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.1); + defaultConf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.8); + defaultConf.set(AutoScalerOptions.UTILIZATION_MAX, 0.9); + defaultConf.set(AutoScalerOptions.UTILIZATION_MIN, 0.7); defaultConf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO); autoscaler = diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java index d3fc1674..a3bc6746 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java @@ -128,15 +128,17 @@ public class ScalingExecutorTest { var op1 = new JobVertexID(); - conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6); - conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.); + conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.6); + conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.6); + conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.6); var evaluated = Map.of(op1, evaluated(1, 70, 100)); assertFalse( ScalingExecutor.allChangedVerticesWithinUtilizationTarget( evaluated, evaluated.keySet())); - conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.2); + conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.8); + conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.4); evaluated = Map.of(op1, evaluated(1, 70, 100)); assertTrue( ScalingExecutor.allChangedVerticesWithinUtilizationTarget( @@ -178,8 +180,9 @@ public class ScalingExecutorTest { var op2 = new JobVertexID(); // All vertices are optional - conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6); - conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.); + conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.6); + conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.6); + conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.6); var evaluated = Map.of( @@ -194,7 +197,8 @@ public class ScalingExecutorTest { // One vertex is required, and it's within the range. // The op2 is optional, so it shouldn't affect the scaling even if it is out of range, - conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.1); + conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.8); + conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.6); evaluated = Map.of( op1, evaluated(1, 65, 100), @@ -206,15 +210,17 @@ public class ScalingExecutorTest { @Test public void testNoScaleDownOnZeroLowerUtilizationBoundary() throws Exception { var conf = context.getConfiguration(); - // Target utilization and boundary are identical + // Utilization min max is set from 0 to 1 // which will set the scale down boundary to infinity - conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6); - conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.6); + conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.6); + conf.set(AutoScalerOptions.UTILIZATION_MAX, 1.2); + conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.); var vertex = new JobVertexID(); int parallelism = 100; int expectedParallelism = 1; int targetRate = 1000; + // Intentionally also set the true processing rate to infinity // to test the boundaries of the scaling condition. 
double trueProcessingRate = Double.POSITIVE_INFINITY; @@ -249,6 +255,90 @@ public class ScalingExecutorTest { new DelayedScaleDown())); } + @Test + public void testUtilizationBoundariesAndUtilizationMinMaxCompatibility() { + var conf = context.getConfiguration(); + conf.set(AutoScalerOptions.RESTART_TIME, Duration.ZERO); + conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO); + var op1 = new JobVertexID(); + var op2 = new JobVertexID(); + + // All vertices are optional + conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.6); + conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.1); + var evaluated = + Map.of( + op1, evaluated(1, 70, 100), + op2, evaluated(1, 85, 100)); + + // target boundary 0.1, target 0.6, max 0.7, min 0.5 + boolean boundaryOp1 = + ScalingExecutor.allChangedVerticesWithinUtilizationTarget(evaluated, Set.of(op1)); + boolean boundaryOp2 = + ScalingExecutor.allChangedVerticesWithinUtilizationTarget(evaluated, Set.of(op2)); + + // Remove target boundary and use min max, should get the same result + conf.removeConfig(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY); + conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.7); + conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.5); + boolean minMaxOp1 = + ScalingExecutor.allChangedVerticesWithinUtilizationTarget(evaluated, Set.of(op1)); + boolean minMaxOp2 = + ScalingExecutor.allChangedVerticesWithinUtilizationTarget(evaluated, Set.of(op2)); + assertEquals(boundaryOp1, minMaxOp1); + assertEquals(boundaryOp2, minMaxOp2); + + // When the target boundary parameter is used, + // but the min max parameter is also set, + // the min max parameter shall prevail. + conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 1.); + conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.7); + conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.3); + evaluated = + Map.of( + op1, evaluated(2, 150, 100), + op2, evaluated(1, 85, 100)); + assertFalse( + ScalingExecutor.allChangedVerticesWithinUtilizationTarget( + evaluated, evaluated.keySet())); + + // When the target boundary parameter is used, + // but the max parameter is also set, + conf.removeConfig(AutoScalerOptions.UTILIZATION_MIN); + conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 1.); + conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.5); + conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.6); + + evaluated = + Map.of( + op1, evaluated(2, 100, 99999), + op2, evaluated(1, 80, 99999)); + assertTrue( + ScalingExecutor.allChangedVerticesWithinUtilizationTarget( + evaluated, evaluated.keySet())); + + evaluated = Map.of(op2, evaluated(1, 85, 100)); + assertFalse( + ScalingExecutor.allChangedVerticesWithinUtilizationTarget( + evaluated, evaluated.keySet())); + + conf.removeConfig(AutoScalerOptions.UTILIZATION_MAX); + conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.3); + + evaluated = + Map.of( + op1, evaluated(2, 80, 81), + op2, evaluated(1, 100, 101)); + assertTrue( + ScalingExecutor.allChangedVerticesWithinUtilizationTarget( + evaluated, evaluated.keySet())); + + evaluated = Map.of(op1, evaluated(1, 80, 79)); + assertFalse( + ScalingExecutor.allChangedVerticesWithinUtilizationTarget( + evaluated, evaluated.keySet())); + } + @Test public void testVertexesExclusionForScaling() throws Exception { var sourceHexString = "0bfd135746ac8efb3cce668b12e16d3a"; @@ -266,7 +356,7 @@ public class ScalingExecutorTest { var conf = context.getConfiguration(); conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofSeconds(0)); - conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8); + 
conf.set(AutoScalerOptions.UTILIZATION_TARGET, .8); var metrics = new EvaluatedMetrics( Map.of( @@ -720,7 +810,7 @@ public class ScalingExecutorTest { null)); var conf = context.getConfiguration(); - conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.d); + conf.set(AutoScalerOptions.UTILIZATION_TARGET, 1.d); // The expected new parallelism is 7 without adjustment by max parallelism. var metrics = @@ -774,8 +864,9 @@ public class ScalingExecutorTest { conf.setString("taskmanager.numberOfTaskSlots", "2"); cpuQuota.ifPresent(v -> conf.set(AutoScalerOptions.CPU_QUOTA, v)); memoryQuota.ifPresent(v -> conf.set(AutoScalerOptions.MEMORY_QUOTA, MemorySize.parse(v))); - conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6); - conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.); + conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.6); + conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.6); + conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.6); testQuotaReached(slotSharingGroupId1, slotSharingGroupId2, quotaReached, ctx); } diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java index 969d70ee..03e8fce5 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java @@ -41,8 +41,7 @@ import java.util.TreeMap; import static org.apache.flink.autoscaler.config.AutoScalerOptions.CATCH_UP_DURATION; import static org.apache.flink.autoscaler.config.AutoScalerOptions.PREFER_TRACKED_RESTART_TIME; import static org.apache.flink.autoscaler.config.AutoScalerOptions.RESTART_TIME; -import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION; -import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET; import static org.apache.flink.autoscaler.topology.ShipStrategy.REBALANCE; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -265,8 +264,9 @@ public class ScalingMetricEvaluatorTest { public void testUtilizationBoundaryComputation() { var conf = new Configuration(); - conf.set(TARGET_UTILIZATION, 0.8); - conf.set(TARGET_UTILIZATION_BOUNDARY, 0.1); + conf.set(UTILIZATION_TARGET, 0.8); + conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.9); + conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.7); conf.set(RESTART_TIME, Duration.ofSeconds(1)); conf.set(CATCH_UP_DURATION, Duration.ZERO); @@ -287,8 +287,9 @@ public class ScalingMetricEvaluatorTest { public void testUtilizationBoundaryComputationWithRestartTimesTracking() { var conf = new Configuration(); - conf.set(TARGET_UTILIZATION, 0.8); - conf.set(TARGET_UTILIZATION_BOUNDARY, 0.1); + conf.set(UTILIZATION_TARGET, 0.8); + conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.9); + conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.7); conf.set(RESTART_TIME, Duration.ofMinutes(10)); conf.set(CATCH_UP_DURATION, Duration.ZERO); conf.set(PREFER_TRACKED_RESTART_TIME, true); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java index fad4553d..e2b5db85 100644 --- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java @@ -65,6 +65,10 @@ import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MAX; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MIN; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET; + /** Default validator implementation for {@link FlinkDeployment}. */ public class DefaultValidator implements FlinkResourceValidator { @@ -605,9 +609,19 @@ public class DefaultValidator implements FlinkResourceValidator { return firstPresent( validateNumber(flinkConfiguration, AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.0d), validateNumber(flinkConfiguration, AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.0d), - validateNumber(flinkConfiguration, AutoScalerOptions.TARGET_UTILIZATION, 0.0d), + validateNumber(flinkConfiguration, UTILIZATION_TARGET, 0.0d, 1.0d), validateNumber( flinkConfiguration, AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d), + validateNumber( + flinkConfiguration, + UTILIZATION_MAX, + flinkConfiguration.get(UTILIZATION_TARGET), + 1.0d), + validateNumber( + flinkConfiguration, + UTILIZATION_MIN, + 0.0d, + flinkConfiguration.get(UTILIZATION_TARGET)), CalendarUtils.validateExcludedPeriods(flinkConfiguration)); } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java index 5bf5033a..8caa24eb 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java @@ -802,22 +802,34 @@ public class DefaultValidatorTest { var result = testAutoScalerConfiguration( flinkConf -> - flinkConf.put(AutoScalerOptions.TARGET_UTILIZATION.key(), "-0.6")); + flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "-0.6")); assertErrorContains( - result, getFormattedErrorMessage(AutoScalerOptions.TARGET_UTILIZATION, 0.0d)); + result, getFormattedErrorMessage(AutoScalerOptions.UTILIZATION_TARGET, 0.0d, 1.0d)); } @Test public void testAutoScalerDeploymentWithInvalidNegativeUtilizationBoundary() { - var result = + var resultMaxUtilization = testAutoScalerConfiguration( flinkConf -> - flinkConf.put( - AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(), - "-0.6")); + flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "-0.6")); + assertErrorContains( + resultMaxUtilization, + getFormattedErrorMessage( + AutoScalerOptions.UTILIZATION_MAX, + AutoScalerOptions.UTILIZATION_TARGET.defaultValue(), + 1.0)); + + var resultMinUtilization = + testAutoScalerConfiguration( + flinkConf -> + flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "-0.6")); assertErrorContains( - result, - getFormattedErrorMessage(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d)); + resultMinUtilization, + getFormattedErrorMessage( + AutoScalerOptions.UTILIZATION_MIN, + 0.0d, + AutoScalerOptions.UTILIZATION_TARGET.defaultValue())); } @Test @@ -838,9 +850,9 @@ public class DefaultValidatorTest { flinkConf.remove(AutoScalerOptions.AUTOSCALER_ENABLED.key()); 
flinkConf.put(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR.key(), "-1.6"); flinkConf.put(AutoScalerOptions.MAX_SCALE_UP_FACTOR.key(), "-1.6"); - flinkConf.put(AutoScalerOptions.TARGET_UTILIZATION.key(), "-1.6"); - flinkConf.put( - AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(), "-1.6"); + flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "-1.6"); + flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "-1.6"); + flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "-1.6"); }); assertErrorNotContains(result); } @@ -853,9 +865,9 @@ public class DefaultValidatorTest { flinkConf.put(AutoScalerOptions.AUTOSCALER_ENABLED.key(), "false"); flinkConf.put(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR.key(), "-1.6"); flinkConf.put(AutoScalerOptions.MAX_SCALE_UP_FACTOR.key(), "-1.6"); - flinkConf.put(AutoScalerOptions.TARGET_UTILIZATION.key(), "-1.6"); - flinkConf.put( - AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(), "-1.6"); + flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "-1.6"); + flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "-1.6"); + flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "-1.6"); }); assertErrorNotContains(result); } @@ -891,35 +903,59 @@ public class DefaultValidatorTest { var result = testSessionJobAutoScalerConfiguration( flinkConf -> - flinkConf.put(AutoScalerOptions.TARGET_UTILIZATION.key(), "-0.6")); + flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "-0.6")); assertErrorContains( - result, getFormattedErrorMessage(AutoScalerOptions.TARGET_UTILIZATION, 0.0d)); + result, getFormattedErrorMessage(AutoScalerOptions.UTILIZATION_TARGET, 0.0d, 1.0)); } @Test public void testValidateSessionJobWithInvalidNegativeUtilizationBoundary() { - var result = + var resultMaxUtilization = testSessionJobAutoScalerConfiguration( flinkConf -> - flinkConf.put( - AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(), - "-0.6")); + flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "-0.6")); assertErrorContains( - result, - getFormattedErrorMessage(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d)); + resultMaxUtilization, + getFormattedErrorMessage( + AutoScalerOptions.UTILIZATION_MAX, + AutoScalerOptions.UTILIZATION_TARGET.defaultValue(), + 1.0d)); + + var resultMinUtilization = + testSessionJobAutoScalerConfiguration( + flinkConf -> + flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "-0.6")); + assertErrorContains( + resultMinUtilization, + getFormattedErrorMessage( + AutoScalerOptions.UTILIZATION_MIN, + 0.0d, + AutoScalerOptions.UTILIZATION_TARGET.defaultValue())); } @Test public void testValidateSessionJobWithInvalidUtilizationBoundary() { - var result = + var resultMaxUtilization = testSessionJobAutoScalerConfiguration( flinkConf -> - flinkConf.put( - AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(), - "-1.6")); + flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "-0.6")); + assertErrorContains( + resultMaxUtilization, + getFormattedErrorMessage( + AutoScalerOptions.UTILIZATION_MAX, + AutoScalerOptions.UTILIZATION_TARGET.defaultValue(), + 1.0d)); + + var resultMinUtilization = + testSessionJobAutoScalerConfiguration( + flinkConf -> + flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "-0.6")); assertErrorContains( - result, - getFormattedErrorMessage(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d)); + resultMinUtilization, + getFormattedErrorMessage( + AutoScalerOptions.UTILIZATION_MIN, + 0.0, + AutoScalerOptions.UTILIZATION_TARGET.defaultValue())); } @Test @@ -940,13 +976,100 @@ public class DefaultValidatorTest { 
flinkConf.put(AutoScalerOptions.AUTOSCALER_ENABLED.key(), "false"); flinkConf.put(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR.key(), "-1.6"); flinkConf.put(AutoScalerOptions.MAX_SCALE_UP_FACTOR.key(), "-1.6"); - flinkConf.put(AutoScalerOptions.TARGET_UTILIZATION.key(), "-1.6"); - flinkConf.put( - AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(), "-1.6"); + flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "-1.6"); + flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "-1.6"); + flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "-1.6"); }); assertErrorNotContains(result); } + @Test + public void testAutoScalerUtilizationConfiguration() { + var deploymentResult = + testAutoScalerConfiguration( + flinkConf -> { + flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "0.3"); + flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "0.5"); + flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "0.4"); + }); + assertErrorContains( + deploymentResult, + getFormattedErrorMessage(AutoScalerOptions.UTILIZATION_MAX, 0.5, 1.0)); + + deploymentResult = + testAutoScalerConfiguration( + flinkConf -> { + flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "0.8"); + }); + assertErrorContains( + deploymentResult, + getFormattedErrorMessage( + AutoScalerOptions.UTILIZATION_MIN, + 0.0, + AutoScalerOptions.UTILIZATION_TARGET.defaultValue())); + + deploymentResult = + testAutoScalerConfiguration( + flinkConf -> { + flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "1.5"); + }); + + assertErrorContains( + deploymentResult, + getFormattedErrorMessage(AutoScalerOptions.UTILIZATION_TARGET, 0.0, 1.0)); + + deploymentResult = + testAutoScalerConfiguration( + flinkConf -> { + flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "0.2"); + flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "0.5"); + flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "0.6"); + }); + assertErrorNotContains(deploymentResult); + + var sessionResult = + testSessionJobAutoScalerConfiguration( + flinkConf -> { + flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "0.3"); + flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "0.5"); + flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "0.4"); + }); + assertErrorContains( + sessionResult, + getFormattedErrorMessage(AutoScalerOptions.UTILIZATION_MAX, 0.5, 1.0)); + + sessionResult = + testSessionJobAutoScalerConfiguration( + flinkConf -> { + flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "0.6"); + }); + assertErrorContains( + sessionResult, + getFormattedErrorMessage( + AutoScalerOptions.UTILIZATION_MAX, + AutoScalerOptions.UTILIZATION_TARGET.defaultValue(), + 1.0)); + + sessionResult = + testSessionJobAutoScalerConfiguration( + flinkConf -> { + flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "1.5"); + }); + + assertErrorContains( + sessionResult, + getFormattedErrorMessage(AutoScalerOptions.UTILIZATION_TARGET, 0.0, 1.0)); + + sessionResult = + testSessionJobAutoScalerConfiguration( + flinkConf -> { + flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "0.2"); + flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "0.5"); + flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "0.6"); + }); + assertErrorNotContains(sessionResult); + } + private Optional<String> testSessionJobAutoScalerConfiguration( Consumer<Map<String, String>> flinkConfigurationModifier) { var sessionCluster = TestUtils.buildSessionCluster(); @@ -972,8 +1095,9 @@ public class DefaultValidatorTest { 
conf.put(AutoScalerOptions.MAX_SCALE_UP_FACTOR.key(), "100000.0"); conf.put(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR.key(), "0.6"); conf.put(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED.key(), "0.1"); - conf.put(AutoScalerOptions.TARGET_UTILIZATION.key(), "0.7"); - conf.put(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(), "0.4"); + conf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "0.7"); + conf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "1.0"); + conf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "0.3"); return conf; } @@ -986,6 +1110,17 @@ public class DefaultValidatorTest { max != null ? max.toString() : "+Infinity"); } + private static String getFormattedNumberOrderErrorMessage( + ConfigOption<Double> configValueLeft, ConfigOption<Double> configValueRight) { + return String.format( + "The AutoScalerOption %s or %s is invalid, %s must be less than or equal to the value of " + + "%s", + configValueLeft.key(), + configValueRight.key(), + configValueLeft.key(), + configValueRight.key()); + } + private static String getFormattedErrorMessage(ConfigOption<Double> configValue, Double min) { return getFormattedErrorMessage(configValue, min, null); }
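
For readers of this commit, the behavioral core is the ScalingMetricEvaluator hunk: when job.autoscaler.utilization.max / job.autoscaler.utilization.min are configured they define the scaling bounds directly, otherwise the bounds fall back to utilization.target plus or minus the deprecated job.autoscaler.target.utilization.boundary. The standalone Java sketch below is illustrative only (the class name and hard-coded values are not part of the operator) and mirrors that precedence, ignoring the backlog-processing special case that is also visible in the hunk:

    import java.util.Optional;

    /**
     * Minimal standalone sketch mirroring the option precedence introduced by this commit.
     * Not operator code; values are hard-coded for illustration.
     */
    public class UtilizationBoundsSketch {

        static double[] resolveBounds(
                double utilizationTarget,          // job.autoscaler.utilization.target (default 0.7)
                double utilizationBoundary,        // deprecated job.autoscaler.target.utilization.boundary (default 0.3)
                Optional<Double> utilizationMax,   // job.autoscaler.utilization.max (no default)
                Optional<Double> utilizationMin) { // job.autoscaler.utilization.min (no default)

            // The new options win when present; otherwise fall back to target +/- boundary,
            // so deployments that only set the boundary keep their current behavior.
            double upper = utilizationMax.orElse(utilizationTarget + utilizationBoundary);
            double lower = utilizationMin.orElse(utilizationTarget - utilizationBoundary);
            return new double[] {lower, upper};
        }

        public static void main(String[] args) {
            // Only the legacy boundary is set: bounds are 0.7 +/- 0.3, i.e. [0.4, 1.0].
            double[] legacy = resolveBounds(0.7, 0.3, Optional.empty(), Optional.empty());
            System.out.printf("legacy boundary  -> lower=%.1f upper=%.1f%n", legacy[0], legacy[1]);

            // Explicit min/max are set: they override the boundary entirely, i.e. [0.6, 0.8].
            double[] explicit = resolveBounds(0.7, 0.3, Optional.of(0.8), Optional.of(0.6));
            System.out.printf("explicit min/max -> lower=%.1f upper=%.1f%n", explicit[0], explicit[1]);
        }
    }

Per the DefaultValidator change above, job.autoscaler.utilization.target must lie in [0.0, 1.0], job.autoscaler.utilization.max in [utilization.target, 1.0], and job.autoscaler.utilization.min in [0.0, utilization.target].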