This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 34edcc45 [FLINK-39826][autoscaler] Strengthen autoscaler configuration validation (#1129)
34edcc45 is described below

commit 34edcc45851d523e97edb6f5ccc0ceb945a32dd7
Author: Dennis-Mircea Ciupitu <[email protected]>
AuthorDate: Thu Jun 4 12:25:28 2026 +0300

    [FLINK-39826][autoscaler] Strengthen autoscaler configuration validation (#1129)
---
 .../autoscaler/validation/AutoscalerValidator.java |  61 ++++++--
 .../validation/AutoscalerValidatorTest.java        | 159 +++++++++++++++++++++
 .../operator/validation/DefaultValidator.java      |  28 +++-
 .../operator/validation/DefaultValidatorTest.java  | 101 ++++++++-----
 4 files changed, 300 insertions(+), 49 deletions(-)

diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/validation/AutoscalerValidator.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/validation/AutoscalerValidator.java
index 185d6a20..265449f5 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/validation/AutoscalerValidator.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/validation/AutoscalerValidator.java
@@ -25,11 +25,6 @@ import org.apache.flink.configuration.Configuration;
 
 import java.util.Optional;
 
-import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.OBSERVED_SCALABILITY_COEFFICIENT_MIN;
-import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MAX;
-import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MIN;
-import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET;
-
 /** Validator for Autoscaler. */
 public class AutoscalerValidator {
 
@@ -41,17 +36,46 @@ public class AutoscalerValidator {
      */
     public Optional<String> validateAutoscalerOptions(Configuration flinkConf) 
{
 
-        if (!flinkConf.getBoolean(AutoScalerOptions.AUTOSCALER_ENABLED)) {
+        if (!flinkConf.get(AutoScalerOptions.AUTOSCALER_ENABLED)) {
             return Optional.empty();
         }
         return firstPresent(
                 validateNumber(flinkConf, 
AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.0d),
                 validateNumber(flinkConf, 
AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.0d),
-                validateNumber(flinkConf, UTILIZATION_TARGET, 0.0d, 1.0d),
+                validateNumber(flinkConf, 
AutoScalerOptions.UTILIZATION_TARGET, 0.0d, 1.0d),
                 validateNumber(flinkConf, 
AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d),
-                validateNumber(flinkConf, UTILIZATION_MAX, 
flinkConf.get(UTILIZATION_TARGET), 1.0d),
-                validateNumber(flinkConf, UTILIZATION_MIN, 0.0d, 
flinkConf.get(UTILIZATION_TARGET)),
-                validateNumber(flinkConf, 
OBSERVED_SCALABILITY_COEFFICIENT_MIN, 0.01d, 1d),
+                validateNumber(
+                        flinkConf,
+                        AutoScalerOptions.UTILIZATION_MAX,
+                        flinkConf.get(AutoScalerOptions.UTILIZATION_TARGET),
+                        1.0d),
+                validateNumber(
+                        flinkConf,
+                        AutoScalerOptions.UTILIZATION_MIN,
+                        0.0d,
+                        flinkConf.get(AutoScalerOptions.UTILIZATION_TARGET)),
+                validateNumber(flinkConf, 
AutoScalerOptions.GC_PRESSURE_THRESHOLD, 0.0d, 1.0d),
+                validateNumber(flinkConf, 
AutoScalerOptions.HEAP_USAGE_THRESHOLD, 0.0d, 1.0d),
+                // The following options only take effect when their feature 
is enabled, so they are
+                // only validated in that case.
+                validateNumberIfEnabled(
+                        flinkConf,
+                        AutoScalerOptions.OBSERVED_SCALABILITY_ENABLED,
+                        AutoScalerOptions.OBSERVED_SCALABILITY_COEFFICIENT_MIN,
+                        0.01d,
+                        1d),
+                validateNumberIfEnabled(
+                        flinkConf,
+                        
AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED,
+                        AutoScalerOptions.SCALING_EFFECTIVENESS_THRESHOLD,
+                        0.0d,
+                        1.0d),
+                validateNumberIfEnabled(
+                        flinkConf,
+                        AutoScalerOptions.MEMORY_TUNING_ENABLED,
+                        AutoScalerOptions.MEMORY_TUNING_OVERHEAD,
+                        0.0d,
+                        1.0d),
                 CalendarUtils.validateExcludedPeriods(flinkConf));
     }
 
@@ -91,6 +115,23 @@ public class AutoscalerValidator {
         }
     }
 
+    /**
+     * Validates a numeric option only when the feature it belongs to is 
enabled. When the feature
+     * is disabled the option has no effect, so an out-of-range value is 
harmless and is not
+     * reported.
+     */
+    private static <T extends Number> Optional<String> validateNumberIfEnabled(
+            Configuration flinkConfiguration,
+            ConfigOption<Boolean> enabledConfig,
+            ConfigOption<T> autoScalerConfig,
+            Double min,
+            Double max) {
+        if (!flinkConfiguration.get(enabledConfig)) {
+            return Optional.empty();
+        }
+        return validateNumber(flinkConfiguration, autoScalerConfig, min, max);
+    }
+
     private static <T extends Number> Optional<String> validateNumber(
             Configuration flinkConfiguration, ConfigOption<T> 
autoScalerConfig, Double min) {
         return validateNumber(flinkConfiguration, autoScalerConfig, min, null);
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/validation/AutoscalerValidatorTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/validation/AutoscalerValidatorTest.java
new file mode 100644
index 00000000..c5c3a328
--- /dev/null
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/validation/AutoscalerValidatorTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.validation;
+
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import javax.annotation.Nullable;
+
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link AutoscalerValidator}. */
+class AutoscalerValidatorTest {
+
+    private final AutoscalerValidator validator = new AutoscalerValidator();
+
+    private static Configuration enabledConf() {
+        var conf = new Configuration();
+        conf.set(AutoScalerOptions.AUTOSCALER_ENABLED, true);
+        return conf;
+    }
+
+    @Test
+    void testDisabledAutoscalerSkipsValidation() {
+        // Out-of-range value is ignored when the autoscaler is disabled.
+        var conf = new Configuration();
+        conf.set(AutoScalerOptions.AUTOSCALER_ENABLED, false);
+        conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_THRESHOLD, 2.0d);
+
+        assertThat(validator.validateAutoscalerOptions(conf)).isEmpty();
+    }
+
+    @Test
+    void testDefaultsAreValid() {
+        
assertThat(validator.validateAutoscalerOptions(enabledConf())).isEmpty();
+    }
+
+    /**
+     * Numeric options together with the lower/upper bound enforced and, where 
applicable, the
+     * feature flag that must be enabled for the option to be validated.
+     */
+    static Stream<Arguments> boundedOptions() {
+        return Stream.of(
+                // Always validated when the autoscaler is enabled.
+                Arguments.of(AutoScalerOptions.GC_PRESSURE_THRESHOLD, null, 
0.0d, 1.0d),
+                Arguments.of(AutoScalerOptions.HEAP_USAGE_THRESHOLD, null, 
0.0d, 1.0d),
+                // Only validated when their feature is enabled.
+                Arguments.of(
+                        AutoScalerOptions.SCALING_EFFECTIVENESS_THRESHOLD,
+                        
AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED,
+                        0.0d,
+                        1.0d),
+                Arguments.of(
+                        AutoScalerOptions.MEMORY_TUNING_OVERHEAD,
+                        AutoScalerOptions.MEMORY_TUNING_ENABLED,
+                        0.0d,
+                        1.0d),
+                Arguments.of(
+                        AutoScalerOptions.OBSERVED_SCALABILITY_COEFFICIENT_MIN,
+                        AutoScalerOptions.OBSERVED_SCALABILITY_ENABLED,
+                        0.01d,
+                        1.0d));
+    }
+
+    private static Configuration confWithFeatureEnabled(
+            @Nullable ConfigOption<Boolean> enableFlag) {
+        var conf = enabledConf();
+        if (enableFlag != null) {
+            conf.set(enableFlag, true);
+        }
+        return conf;
+    }
+
+    @ParameterizedTest
+    @MethodSource("boundedOptions")
+    void testValueAboveMaxIsRejected(
+            ConfigOption<Double> option,
+            @Nullable ConfigOption<Boolean> enableFlag,
+            double min,
+            double max) {
+        var conf = confWithFeatureEnabled(enableFlag);
+        conf.set(option, max + 0.0001d);
+
+        assertThat(validator.validateAutoscalerOptions(conf))
+                .hasValueSatisfying(error -> 
assertThat(error).contains(option.key()));
+    }
+
+    @ParameterizedTest
+    @MethodSource("boundedOptions")
+    void testValueBelowMinIsRejected(
+            ConfigOption<Double> option,
+            @Nullable ConfigOption<Boolean> enableFlag,
+            double min,
+            double max) {
+        var conf = confWithFeatureEnabled(enableFlag);
+        conf.set(option, min - 0.0001d);
+
+        assertThat(validator.validateAutoscalerOptions(conf))
+                .hasValueSatisfying(error -> 
assertThat(error).contains(option.key()));
+    }
+
+    @ParameterizedTest
+    @MethodSource("boundedOptions")
+    void testBoundaryValuesAreAccepted(
+            ConfigOption<Double> option,
+            @Nullable ConfigOption<Boolean> enableFlag,
+            double min,
+            double max) {
+        var lower = confWithFeatureEnabled(enableFlag);
+        lower.set(option, min);
+        assertThat(validator.validateAutoscalerOptions(lower)).isEmpty();
+
+        var upper = confWithFeatureEnabled(enableFlag);
+        upper.set(option, max);
+        assertThat(validator.validateAutoscalerOptions(upper)).isEmpty();
+    }
+
+    @ParameterizedTest
+    @MethodSource("boundedOptions")
+    void testOutOfRangeIgnoredWhenFeatureDisabled(
+            ConfigOption<Double> option,
+            @Nullable ConfigOption<Boolean> enableFlag,
+            double min,
+            double max) {
+        if (enableFlag == null) {
+            // Not a feature-gated option, nothing to assert here.
+            return;
+        }
+        var conf = enabledConf();
+        // Feature flag left disabled on purpose.
+        conf.set(option, max + 1.0d);
+
+        assertThat(validator.validateAutoscalerOptions(conf)).isEmpty();
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
index d0cef03a..046bdac2 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
@@ -18,6 +18,7 @@
 package org.apache.flink.kubernetes.operator.validation;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.autoscaler.validation.AutoscalerValidator;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
@@ -675,6 +676,31 @@ public class DefaultValidator implements 
FlinkResourceValidator {
             return Optional.empty();
         }
         Configuration flinkConfiguration = 
Configuration.fromMap(effectiveConfig);
-        return 
autoscalerValidator.validateAutoscalerOptions(flinkConfiguration);
+        return firstPresent(
+                
autoscalerValidator.validateAutoscalerOptions(flinkConfiguration),
+                validateMetricsWindow(flinkConfiguration));
+    }
+
+    private Optional<String> validateMetricsWindow(Configuration conf) {
+        if (!conf.get(AutoScalerOptions.AUTOSCALER_ENABLED)) {
+            return Optional.empty();
+        }
+        // The autoscaler collects one metric sample per reconcile loop and 
requires at least two
+        // samples within the metric window to evaluate scaling. If the window 
is smaller than the
+        // reconcile interval, the window is trimmed down to a single sample 
on every loop and
+        // autoscaling is never applied.
+        var metricsWindow = conf.get(AutoScalerOptions.METRICS_WINDOW);
+        var reconcileInterval =
+                
conf.get(KubernetesOperatorConfigOptions.OPERATOR_RECONCILE_INTERVAL);
+        if (metricsWindow.compareTo(reconcileInterval) < 0) {
+            return Optional.of(
+                    String.format(
+                            "The autoscaler metric window (%s=%s) must not be 
smaller than the operator reconcile interval (%s=%s), otherwise fewer than two 
metric samples are retained and autoscaling is never applied.",
+                            AutoScalerOptions.METRICS_WINDOW.key(),
+                            metricsWindow,
+                            
KubernetesOperatorConfigOptions.OPERATOR_RECONCILE_INTERVAL.key(),
+                            reconcileInterval));
+        }
+        return Optional.empty();
     }
 }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
index 25082625..77d073b1 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
@@ -487,15 +487,11 @@ public class DefaultValidatorTest {
                 });
 
         testError(
-                dep -> {
-                    
dep.getSpec().getJobManager().getResource().setEphemeralStorage("abc");
-                },
+                dep -> 
dep.getSpec().getJobManager().getResource().setEphemeralStorage("abc"),
                 "JobManager resource ephemeral storage parse error: Character 
a is neither a decimal digit number, decimal point, nor \"e\" notation 
exponential mark.");
 
         testError(
-                dep -> {
-                    
dep.getSpec().getTaskManager().getResource().setEphemeralStorage("abc");
-                },
+                dep -> 
dep.getSpec().getTaskManager().getResource().setEphemeralStorage("abc"),
                 "TaskManager resource ephemeral storage parse error: Character 
a is neither a decimal digit number, decimal point, nor \"e\" notation 
exponential mark.");
     }
 
@@ -854,9 +850,7 @@ public class DefaultValidatorTest {
                                             
CheckpointingOptions.CHECKPOINTS_DIRECTORY.key(),
                                                     "test-checkpoint-dir"));
                 },
-                flinkDeployment -> {
-                    flinkDeployment.getSpec().setFlinkConfiguration(Map.of());
-                },
+                flinkDeployment -> 
flinkDeployment.getSpec().setFlinkConfiguration(Map.of()),
                 null);
     }
 
@@ -963,17 +957,34 @@ public class DefaultValidatorTest {
     public void testAutoScalerDeploymentWithInvalidScalingCoefficientMin() {
         var result =
                 testAutoScalerConfiguration(
-                        flinkConf ->
-                                flinkConf.put(
-                                        
AutoScalerOptions.OBSERVED_SCALABILITY_COEFFICIENT_MIN
-                                                .key(),
-                                        "1.2"));
+                        flinkConf -> {
+                            // The coefficient is only validated when observed 
scalability is on.
+                            flinkConf.put(
+                                    
AutoScalerOptions.OBSERVED_SCALABILITY_ENABLED.key(), "true");
+                            flinkConf.put(
+                                    
AutoScalerOptions.OBSERVED_SCALABILITY_COEFFICIENT_MIN.key(),
+                                    "1.2");
+                        });
         assertErrorContains(
                 result,
                 getFormattedErrorMessage(
                         
AutoScalerOptions.OBSERVED_SCALABILITY_COEFFICIENT_MIN, 0.01d, 1d));
     }
 
+    @Test
+    public void 
testInvalidScalingCoefficientMinIgnoredWhenObservedScalabilityDisabled() {
+        var result =
+                testAutoScalerConfiguration(
+                        flinkConf -> {
+                            flinkConf.put(
+                                    
AutoScalerOptions.OBSERVED_SCALABILITY_ENABLED.key(), "false");
+                            flinkConf.put(
+                                    
AutoScalerOptions.OBSERVED_SCALABILITY_COEFFICIENT_MIN.key(),
+                                    "1.2");
+                        });
+        assertErrorNotContains(result);
+    }
+
     @Test
     public void testNonEnabledAutoScalerDeploymentJob() {
         var result =
@@ -1009,6 +1020,37 @@ public class DefaultValidatorTest {
         testSessionJobAutoScalerConfiguration(flinkConf -> 
{}).ifPresent(Assertions::fail);
     }
 
+    @Test
+    public void testMetricsWindowSmallerThanReconcileIntervalIsRejected() {
+        // The default reconcile interval is 60s. A smaller metric window 
means fewer than two
+        // samples are retained per loop and autoscaling never runs, so it 
must be rejected.
+        var result =
+                testAutoScalerConfiguration(
+                        flinkConf -> 
flinkConf.put(AutoScalerOptions.METRICS_WINDOW.key(), "30 s"));
+        Assertions.assertTrue(result.isPresent());
+        
Assertions.assertTrue(result.get().contains(AutoScalerOptions.METRICS_WINDOW.key()));
+    }
+
+    @Test
+    public void testMetricsWindowLargerThanReconcileIntervalIsAccepted() {
+        var result =
+                testAutoScalerConfiguration(
+                        flinkConf ->
+                                
flinkConf.put(AutoScalerOptions.METRICS_WINDOW.key(), "5 min"));
+        assertErrorNotContains(result);
+    }
+
+    @Test
+    public void testSmallMetricsWindowIgnoredWhenAutoscalerDisabled() {
+        var result =
+                testAutoScalerConfiguration(
+                        flinkConf -> {
+                            
flinkConf.put(AutoScalerOptions.AUTOSCALER_ENABLED.key(), "false");
+                            
flinkConf.put(AutoScalerOptions.METRICS_WINDOW.key(), "1 s");
+                        });
+        assertErrorNotContains(result);
+    }
+
     @Test
     public void testValidateSessionJobWithInvalidNegativeScaleDownFactor() {
         var result =
@@ -1130,9 +1172,7 @@ public class DefaultValidatorTest {
 
         deploymentResult =
                 testAutoScalerConfiguration(
-                        flinkConf -> {
-                            
flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "0.8");
-                        });
+                        flinkConf -> 
flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "0.8"));
         assertErrorContains(
                 deploymentResult,
                 getFormattedErrorMessage(
@@ -1142,9 +1182,8 @@ public class DefaultValidatorTest {
 
         deploymentResult =
                 testAutoScalerConfiguration(
-                        flinkConf -> {
-                            
flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "1.5");
-                        });
+                        flinkConf ->
+                                
flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "1.5"));
 
         assertErrorContains(
                 deploymentResult,
@@ -1172,9 +1211,7 @@ public class DefaultValidatorTest {
 
         sessionResult =
                 testSessionJobAutoScalerConfiguration(
-                        flinkConf -> {
-                            
flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "0.6");
-                        });
+                        flinkConf -> 
flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "0.6"));
         assertErrorContains(
                 sessionResult,
                 getFormattedErrorMessage(
@@ -1184,9 +1221,8 @@ public class DefaultValidatorTest {
 
         sessionResult =
                 testSessionJobAutoScalerConfiguration(
-                        flinkConf -> {
-                            
flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "1.5");
-                        });
+                        flinkConf ->
+                                
flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "1.5"));
 
         assertErrorContains(
                 sessionResult,
@@ -1226,7 +1262,7 @@ public class DefaultValidatorTest {
         conf.put(AutoScalerOptions.AUTOSCALER_ENABLED.key(), "true");
         conf.put(AutoScalerOptions.MAX_SCALE_UP_FACTOR.key(), "100000.0");
         conf.put(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR.key(), "0.6");
-        
conf.put(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED.key(), 
"0.1");
+        conf.put(AutoScalerOptions.SCALING_EFFECTIVENESS_THRESHOLD.key(), 
"0.1");
         conf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "0.7");
         conf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "1.0");
         conf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "0.3");
@@ -1242,17 +1278,6 @@ public class DefaultValidatorTest {
                 max != null ? max.toString() : "+Infinity");
     }
 
-    private static String getFormattedNumberOrderErrorMessage(
-            ConfigOption<Double> configValueLeft, ConfigOption<Double> 
configValueRight) {
-        return String.format(
-                "The AutoScalerOption %s or %s is invalid, %s must be less 
than or equal to the value of "
-                        + "%s",
-                configValueLeft.key(),
-                configValueRight.key(),
-                configValueLeft.key(),
-                configValueRight.key());
-    }
-
     private static String getFormattedErrorMessage(ConfigOption<Double> 
configValue, Double min) {
         return getFormattedErrorMessage(configValue, min, null);
     }

Reply via email to