This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new e905a1b8 [FLINK-33007] Integrate autoscaler config validation into the 
general validator flow
e905a1b8 is described below

commit e905a1b84421710d9de5a886ecab10834cc24364
Author: srpraneeth <[email protected]>
AuthorDate: Mon Oct 23 23:47:40 2023 -0700

    [FLINK-33007] Integrate autoscaler config validation into the general 
validator flow
---
 e2e-tests/data/autoscaler.yaml                     |   6 +
 .../operator/validation/DefaultValidator.java      |  57 +++++-
 .../operator/validation/DefaultValidatorTest.java  | 215 +++++++++++++++++++++
 3 files changed, 276 insertions(+), 2 deletions(-)

diff --git a/e2e-tests/data/autoscaler.yaml b/e2e-tests/data/autoscaler.yaml
index 6f53bbe9..9deebe92 100644
--- a/e2e-tests/data/autoscaler.yaml
+++ b/e2e-tests/data/autoscaler.yaml
@@ -41,6 +41,12 @@ spec:
     job.autoscaler.stabilization.interval: "5s"
     job.autoscaler.metrics.window: "1m"
 
+#    Invalid Validations for testing autoscaler configurations
+#    kubernetes.operator.job.autoscaler.scale-down.max-factor: "-0.6"
+#    kubernetes.operator.job.autoscaler.scale-up.max-factor: "-1.0"
+#    kubernetes.operator.job.autoscaler.target.utilization: "-10.7"
+#    kubernetes.operator.job.autoscaler.target.utilization.boundary: "-Hundred"
+
     jobmanager.scheduler: adaptive
   serviceAccount: flink
   podTemplate:
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
index bce7680f..37dd169a 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
@@ -17,7 +17,9 @@
 
 package org.apache.flink.kubernetes.operator.validation;
 
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.JobManagerOptions;
@@ -60,6 +62,7 @@ import java.util.regex.Pattern;
 
 /** Default validator implementation for {@link FlinkDeployment}. */
 public class DefaultValidator implements FlinkResourceValidator {
+
     private static final Pattern DEPLOYMENT_NAME_PATTERN =
             Pattern.compile("[a-z]([-a-z\\d]{0,43}[a-z\\d])?");
     private static final String[] FORBIDDEN_CONF_KEYS =
@@ -102,7 +105,8 @@ public class DefaultValidator implements 
FlinkResourceValidator {
                 validateJmSpec(spec.getJobManager(), effectiveConfig),
                 validateTmSpec(spec.getTaskManager(), effectiveConfig),
                 validateSpecChange(deployment, effectiveConfig),
-                validateServiceAccount(spec.getServiceAccount()));
+                validateServiceAccount(spec.getServiceAccount()),
+                validateAutoScalerFlinkConfiguration(effectiveConfig));
     }
 
     @SafeVarargs
@@ -487,7 +491,8 @@ public class DefaultValidator implements 
FlinkResourceValidator {
         return firstPresent(
                 validateNotApplicationCluster(sessionCluster),
                 validateSessionClusterId(sessionJob, sessionCluster),
-                validateJobSpec(sessionJob.getSpec().getJob(), null, 
effectiveConfig));
+                validateJobSpec(sessionJob.getSpec().getJob(), null, 
effectiveConfig),
+                validateAutoScalerFlinkConfiguration(effectiveConfig));
     }
 
     private Optional<String> validateJobNotEmpty(FlinkSessionJob sessionJob) {
@@ -554,4 +559,52 @@ public class DefaultValidator implements 
FlinkResourceValidator {
         }
         return Optional.empty();
     }
+
+    public static Optional<String> validateAutoScalerFlinkConfiguration(
+            Map<String, String> effectiveConfig) {
+        if (effectiveConfig == null) {
+            return Optional.empty();
+        }
+        Configuration flinkConfiguration = 
Configuration.fromMap(effectiveConfig);
+        if 
(!flinkConfiguration.getBoolean(AutoScalerOptions.AUTOSCALER_ENABLED)) {
+            return Optional.empty();
+        }
+        return firstPresent(
+                validateNumber(flinkConfiguration, 
AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.0d),
+                validateNumber(flinkConfiguration, 
AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.0d),
+                validateNumber(flinkConfiguration, 
AutoScalerOptions.TARGET_UTILIZATION, 0.0d),
+                validateNumber(
+                        flinkConfiguration, 
AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d));
+    }
+
+    private static <T extends Number> Optional<String> validateNumber(
+            Configuration flinkConfiguration,
+            ConfigOption<T> autoScalerConfig,
+            Double min,
+            Double max) {
+        try {
+            var configValue = flinkConfiguration.get(autoScalerConfig);
+            if (configValue != null) {
+                double value = configValue.doubleValue();
+                if ((min != null && value < min) || (max != null && value > 
max)) {
+                    return Optional.of(
+                            String.format(
+                                    "The AutoScalerOption %s is invalid, it 
should be a value within the range [%s, %s]",
+                                    autoScalerConfig.key(),
+                                    min != null ? min.toString() : "-Infinity",
+                                    max != null ? max.toString() : 
"+Infinity"));
+                }
+            }
+            return Optional.empty();
+        } catch (IllegalArgumentException e) {
+            return Optional.of(
+                    String.format(
+                            "Invalid value in the autoscaler config %s", 
autoScalerConfig.key()));
+        }
+    }
+
+    private static <T extends Number> Optional<String> validateNumber(
+            Configuration flinkConfiguration, ConfigOption<T> 
autoScalerConfig, Double min) {
+        return validateNumber(flinkConfiguration, autoScalerConfig, min, null);
+    }
 }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
index 506f15d3..8a3a802a 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
@@ -17,7 +17,9 @@
 
 package org.apache.flink.kubernetes.operator.validation;
 
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
@@ -684,4 +686,217 @@ public class DefaultValidatorTest {
             }
         }
     }
+
+    @Test
+    public void testAutoScalerDeployment() {
+        testAutoScalerConfiguration(flinkConf -> 
{}).ifPresent(Assertions::fail);
+    }
+
+    @Test
+    public void testAutoScalerDeploymentWithInvalidNegativeScaleDownFactor() {
+        var result =
+                testAutoScalerConfiguration(
+                        flinkConf ->
+                                flinkConf.put(
+                                        
AutoScalerOptions.MAX_SCALE_DOWN_FACTOR.key(), "-0.1"));
+        assertErrorContains(
+                result, 
getFormattedErrorMessage(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.0d));
+    }
+
+    @Test
+    public void testAutoScalerDeploymentWithInvalidNegativeScaleUpFactor() {
+        var result =
+                testAutoScalerConfiguration(
+                        flinkConf ->
+                                
flinkConf.put(AutoScalerOptions.MAX_SCALE_UP_FACTOR.key(), "-0.1"));
+        assertErrorContains(
+                result, 
getFormattedErrorMessage(AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.0d));
+    }
+
+    @Test
+    public void testAutoScalerDeploymentWithInvalidNegativeUtilization() {
+        var result =
+                testAutoScalerConfiguration(
+                        flinkConf ->
+                                
flinkConf.put(AutoScalerOptions.TARGET_UTILIZATION.key(), "-0.6"));
+        assertErrorContains(
+                result, 
getFormattedErrorMessage(AutoScalerOptions.TARGET_UTILIZATION, 0.0d));
+    }
+
+    @Test
+    public void 
testAutoScalerDeploymentWithInvalidNegativeUtilizationBoundary() {
+        var result =
+                testAutoScalerConfiguration(
+                        flinkConf ->
+                                flinkConf.put(
+                                        
AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(),
+                                        "-0.6"));
+        assertErrorContains(
+                result,
+                
getFormattedErrorMessage(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d));
+    }
+
+    @Test
+    public void testNonEnabledAutoScalerDeploymentJob() {
+        var result =
+                testAutoScalerConfiguration(
+                        flinkConf -> {
+                            
flinkConf.remove(AutoScalerOptions.AUTOSCALER_ENABLED.key());
+                            
flinkConf.put(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR.key(), "-1.6");
+                            
flinkConf.put(AutoScalerOptions.MAX_SCALE_UP_FACTOR.key(), "-1.6");
+                            
flinkConf.put(AutoScalerOptions.TARGET_UTILIZATION.key(), "-1.6");
+                            flinkConf.put(
+                                    
AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(), "-1.6");
+                        });
+        assertErrorNotContains(result);
+    }
+
+    @Test
+    public void testDisabledEnabledAutoScalerDeploymentJob() {
+        var result =
+                testAutoScalerConfiguration(
+                        flinkConf -> {
+                            
flinkConf.put(AutoScalerOptions.AUTOSCALER_ENABLED.key(), "false");
+                            
flinkConf.put(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR.key(), "-1.6");
+                            
flinkConf.put(AutoScalerOptions.MAX_SCALE_UP_FACTOR.key(), "-1.6");
+                            
flinkConf.put(AutoScalerOptions.TARGET_UTILIZATION.key(), "-1.6");
+                            flinkConf.put(
+                                    
AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(), "-1.6");
+                        });
+        assertErrorNotContains(result);
+    }
+
+    @Test
+    public void testValidateSessionJob() {
+        testSessionJobAutoScalerConfiguration(flinkConf -> 
{}).ifPresent(Assertions::fail);
+    }
+
+    @Test
+    public void testValidateSessionJobWithInvalidNegativeScaleDownFactor() {
+        var result =
+                testSessionJobAutoScalerConfiguration(
+                        flinkConf ->
+                                flinkConf.put(
+                                        
AutoScalerOptions.MAX_SCALE_DOWN_FACTOR.key(), "-0.1"));
+        assertErrorContains(
+                result, 
getFormattedErrorMessage(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.0d));
+    }
+
+    @Test
+    public void testValidateSessionJobWithInvalidNegativeScaleUpFactor() {
+        var result =
+                testSessionJobAutoScalerConfiguration(
+                        flinkConf ->
+                                
flinkConf.put(AutoScalerOptions.MAX_SCALE_UP_FACTOR.key(), "-0.1"));
+        assertErrorContains(
+                result, 
getFormattedErrorMessage(AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.0d));
+    }
+
+    @Test
+    public void testValidateSessionJobWithInvalidNegativeUtilization() {
+        var result =
+                testSessionJobAutoScalerConfiguration(
+                        flinkConf ->
+                                
flinkConf.put(AutoScalerOptions.TARGET_UTILIZATION.key(), "-0.6"));
+        assertErrorContains(
+                result, 
getFormattedErrorMessage(AutoScalerOptions.TARGET_UTILIZATION, 0.0d));
+    }
+
+    @Test
+    public void testValidateSessionJobWithInvalidNegativeUtilizationBoundary() 
{
+        var result =
+                testSessionJobAutoScalerConfiguration(
+                        flinkConf ->
+                                flinkConf.put(
+                                        
AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(),
+                                        "-0.6"));
+        assertErrorContains(
+                result,
+                
getFormattedErrorMessage(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d));
+    }
+
+    @Test
+    public void testValidateSessionJobWithInvalidUtilizationBoundary() {
+        var result =
+                testSessionJobAutoScalerConfiguration(
+                        flinkConf ->
+                                flinkConf.put(
+                                        
AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(),
+                                        "-1.6"));
+        assertErrorContains(
+                result,
+                
getFormattedErrorMessage(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d));
+    }
+
+    @Test
+    public void testNonEnabledAutoScalerSessionJob() {
+        var result =
+                testSessionJobAutoScalerConfiguration(
+                        flinkConf -> {
+                            
flinkConf.put(AutoScalerOptions.AUTOSCALER_ENABLED.key(), "false");
+                            
flinkConf.put(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR.key(), "-1.6");
+                            
flinkConf.put(AutoScalerOptions.MAX_SCALE_UP_FACTOR.key(), "-1.6");
+                            
flinkConf.put(AutoScalerOptions.TARGET_UTILIZATION.key(), "-1.6");
+                            flinkConf.put(
+                                    
AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(), "-1.6");
+                        });
+        assertErrorNotContains(result);
+    }
+
+    private Optional<String> testSessionJobAutoScalerConfiguration(
+            Consumer<Map<String, String>> flinkConfigurationModifier) {
+        var sessionCluster = TestUtils.buildSessionCluster();
+        var sessionJob = TestUtils.buildSessionJob();
+        var flinkConfiguration = 
getDefaultTestAutoScalerFlinkConfigurationMap();
+        flinkConfigurationModifier.accept(flinkConfiguration);
+        sessionCluster.getSpec().setFlinkConfiguration(flinkConfiguration);
+        return validator.validateSessionJob(sessionJob, 
Optional.of(sessionCluster));
+    }
+
+    public Optional<String> testAutoScalerConfiguration(
+            Consumer<Map<String, String>> flinkConfigurationModifier) {
+        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+        var flinkConfiguration = 
getDefaultTestAutoScalerFlinkConfigurationMap();
+        flinkConfigurationModifier.accept(flinkConfiguration);
+        deployment.getSpec().setFlinkConfiguration(flinkConfiguration);
+        return validator.validateDeployment(deployment);
+    }
+
+    private Map<String, String> 
getDefaultTestAutoScalerFlinkConfigurationMap() {
+        Map<String, String> conf = new HashMap<>();
+        conf.put(AutoScalerOptions.AUTOSCALER_ENABLED.key(), "true");
+        conf.put(AutoScalerOptions.MAX_SCALE_UP_FACTOR.key(), "100000.0");
+        conf.put(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR.key(), "0.6");
+        
conf.put(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED.key(), 
"0.1");
+        conf.put(AutoScalerOptions.TARGET_UTILIZATION.key(), "0.7");
+        conf.put(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(), "0.4");
+        return conf;
+    }
+
+    private static String getFormattedErrorMessage(
+            ConfigOption<Double> configValue, Double min, Double max) {
+        return String.format(
+                "The AutoScalerOption %s is invalid, it should be a value 
within the range [%s, %s]",
+                configValue.key(),
+                min != null ? min.toString() : "-Infinity",
+                max != null ? max.toString() : "+Infinity");
+    }
+
+    private static String getFormattedErrorMessage(ConfigOption<Double> 
configValue, Double min) {
+        return getFormattedErrorMessage(configValue, min, null);
+    }
+
+    private static void assertErrorContains(Optional<String> result, String 
error) {
+        if (result.isEmpty()) {
+            Assertions.fail("Invalid Configuration not caught in the tests");
+        } else {
+            Assertions.assertEquals(error, result.get());
+        }
+    }
+
+    private static void assertErrorNotContains(Optional<String> result) {
+        if (result.isPresent()) {
+            Assertions.fail("Invalid Configuration not caught in the tests");
+        }
+    }
 }

Reply via email to