This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new e905a1b8 [FLINK-33007] Integrate autoscaler config validation into the
general validator flow
e905a1b8 is described below
commit e905a1b84421710d9de5a886ecab10834cc24364
Author: srpraneeth <[email protected]>
AuthorDate: Mon Oct 23 23:47:40 2023 -0700
[FLINK-33007] Integrate autoscaler config validation into the general
validator flow
---
e2e-tests/data/autoscaler.yaml | 6 +
.../operator/validation/DefaultValidator.java | 57 +++++-
.../operator/validation/DefaultValidatorTest.java | 215 +++++++++++++++++++++
3 files changed, 276 insertions(+), 2 deletions(-)
diff --git a/e2e-tests/data/autoscaler.yaml b/e2e-tests/data/autoscaler.yaml
index 6f53bbe9..9deebe92 100644
--- a/e2e-tests/data/autoscaler.yaml
+++ b/e2e-tests/data/autoscaler.yaml
@@ -41,6 +41,12 @@ spec:
job.autoscaler.stabilization.interval: "5s"
job.autoscaler.metrics.window: "1m"
+# Invalid values for testing autoscaler configuration validation (uncomment to trigger errors)
+# kubernetes.operator.job.autoscaler.scale-down.max-factor: "-0.6"
+# kubernetes.operator.job.autoscaler.scale-up.max-factor: "-1.0"
+# kubernetes.operator.job.autoscaler.target.utilization: "-10.7"
+# kubernetes.operator.job.autoscaler.target.utilization.boundary: "-Hundred"
+
jobmanager.scheduler: adaptive
serviceAccount: flink
podTemplate:
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
index bce7680f..37dd169a 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
@@ -17,7 +17,9 @@
package org.apache.flink.kubernetes.operator.validation;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
@@ -60,6 +62,7 @@ import java.util.regex.Pattern;
/** Default validator implementation for {@link FlinkDeployment}. */
public class DefaultValidator implements FlinkResourceValidator {
+
private static final Pattern DEPLOYMENT_NAME_PATTERN =
Pattern.compile("[a-z]([-a-z\\d]{0,43}[a-z\\d])?");
private static final String[] FORBIDDEN_CONF_KEYS =
@@ -102,7 +105,8 @@ public class DefaultValidator implements
FlinkResourceValidator {
validateJmSpec(spec.getJobManager(), effectiveConfig),
validateTmSpec(spec.getTaskManager(), effectiveConfig),
validateSpecChange(deployment, effectiveConfig),
- validateServiceAccount(spec.getServiceAccount()));
+ validateServiceAccount(spec.getServiceAccount()),
+ validateAutoScalerFlinkConfiguration(effectiveConfig));
}
@SafeVarargs
@@ -487,7 +491,8 @@ public class DefaultValidator implements
FlinkResourceValidator {
return firstPresent(
validateNotApplicationCluster(sessionCluster),
validateSessionClusterId(sessionJob, sessionCluster),
- validateJobSpec(sessionJob.getSpec().getJob(), null,
effectiveConfig));
+ validateJobSpec(sessionJob.getSpec().getJob(), null,
effectiveConfig),
+ validateAutoScalerFlinkConfiguration(effectiveConfig));
}
private Optional<String> validateJobNotEmpty(FlinkSessionJob sessionJob) {
@@ -554,4 +559,52 @@ public class DefaultValidator implements
FlinkResourceValidator {
}
return Optional.empty();
}
+
+    /**
+     * Validates the autoscaler-related options in the effective Flink configuration.
+     * Validation is skipped entirely when the autoscaler is disabled or no config is given.
+     *
+     * @param effectiveConfig effective Flink configuration map (may be null)
+     * @return the first validation error found, or empty if the configuration is valid
+     */
+    public static Optional<String> validateAutoScalerFlinkConfiguration(
+            Map<String, String> effectiveConfig) {
+        if (effectiveConfig == null) {
+            return Optional.empty();
+        }
+        Configuration flinkConfiguration = Configuration.fromMap(effectiveConfig);
+        if (!flinkConfiguration.getBoolean(AutoScalerOptions.AUTOSCALER_ENABLED)) {
+            // Autoscaler options are irrelevant when the autoscaler is off.
+            return Optional.empty();
+        }
+        // All four numeric options must be non-negative (no upper bound enforced here).
+        return firstPresent(
+                validateNumber(flinkConfiguration, AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.0d),
+                validateNumber(flinkConfiguration, AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.0d),
+                validateNumber(flinkConfiguration, AutoScalerOptions.TARGET_UTILIZATION, 0.0d),
+                validateNumber(
+                        flinkConfiguration, AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d));
+    }
+
+    /**
+     * Checks that a numeric autoscaler option lies within the inclusive range [min, max].
+     * A null bound means that side of the range is unbounded.
+     *
+     * @param flinkConfiguration configuration to read the option from
+     * @param autoScalerConfig the numeric option to validate
+     * @param min inclusive lower bound, or null for no lower bound
+     * @param max inclusive upper bound, or null for no upper bound
+     * @return a validation error message, or empty if the value is unset or valid
+     */
+    private static <T extends Number> Optional<String> validateNumber(
+            Configuration flinkConfiguration,
+            ConfigOption<T> autoScalerConfig,
+            Double min,
+            Double max) {
+        try {
+            var configValue = flinkConfiguration.get(autoScalerConfig);
+            if (configValue != null) {
+                double value = configValue.doubleValue();
+                if ((min != null && value < min) || (max != null && value > max)) {
+                    return Optional.of(
+                            String.format(
+                                    "The AutoScalerOption %s is invalid, it should be a value within the range [%s, %s]",
+                                    autoScalerConfig.key(),
+                                    min != null ? min.toString() : "-Infinity",
+                                    max != null ? max.toString() : "+Infinity"));
+                }
+            }
+            return Optional.empty();
+        } catch (IllegalArgumentException e) {
+            // Thrown when the configured string cannot be parsed as the option's type.
+            // Include the parse failure detail instead of swallowing the cause entirely.
+            return Optional.of(
+                    String.format(
+                            "Invalid value in the autoscaler config %s: %s",
+                            autoScalerConfig.key(), e.getMessage()));
+        }
+    }
+
+    /**
+     * Convenience overload that validates only a lower bound (no upper bound).
+     */
+    private static <T extends Number> Optional<String> validateNumber(
+            Configuration flinkConfiguration, ConfigOption<T> autoScalerConfig, Double min) {
+        return validateNumber(flinkConfiguration, autoScalerConfig, min, null);
+    }
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
index 506f15d3..8a3a802a 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
@@ -17,7 +17,9 @@
package org.apache.flink.kubernetes.operator.validation;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
@@ -684,4 +686,217 @@ public class DefaultValidatorTest {
}
}
}
+
+    // Default autoscaler configuration must pass validation for a deployment.
+    @Test
+    public void testAutoScalerDeployment() {
+        testAutoScalerConfiguration(flinkConf -> {}).ifPresent(Assertions::fail);
+    }
+
+    // A negative scale-down factor must be rejected.
+    @Test
+    public void testAutoScalerDeploymentWithInvalidNegativeScaleDownFactor() {
+        var result =
+                testAutoScalerConfiguration(
+                        flinkConf ->
+                                flinkConf.put(
+                                        AutoScalerOptions.MAX_SCALE_DOWN_FACTOR.key(), "-0.1"));
+        assertErrorContains(
+                result, getFormattedErrorMessage(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.0d));
+    }
+
+    // A negative scale-up factor must be rejected.
+    @Test
+    public void testAutoScalerDeploymentWithInvalidNegativeScaleUpFactor() {
+        var result =
+                testAutoScalerConfiguration(
+                        flinkConf ->
+                                flinkConf.put(AutoScalerOptions.MAX_SCALE_UP_FACTOR.key(), "-0.1"));
+        assertErrorContains(
+                result, getFormattedErrorMessage(AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.0d));
+    }
+
+    // A negative target utilization must be rejected.
+    @Test
+    public void testAutoScalerDeploymentWithInvalidNegativeUtilization() {
+        var result =
+                testAutoScalerConfiguration(
+                        flinkConf ->
+                                flinkConf.put(AutoScalerOptions.TARGET_UTILIZATION.key(), "-0.6"));
+        assertErrorContains(
+                result, getFormattedErrorMessage(AutoScalerOptions.TARGET_UTILIZATION, 0.0d));
+    }
+
+    // A negative utilization boundary must be rejected.
+    @Test
+    public void testAutoScalerDeploymentWithInvalidNegativeUtilizationBoundary() {
+        var result =
+                testAutoScalerConfiguration(
+                        flinkConf ->
+                                flinkConf.put(
+                                        AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(),
+                                        "-0.6"));
+        assertErrorContains(
+                result,
+                getFormattedErrorMessage(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d));
+    }
+
+    // With the enabled flag removed (defaulting to disabled), invalid values are ignored.
+    @Test
+    public void testNonEnabledAutoScalerDeploymentJob() {
+        var result =
+                testAutoScalerConfiguration(
+                        flinkConf -> {
+                            flinkConf.remove(AutoScalerOptions.AUTOSCALER_ENABLED.key());
+                            flinkConf.put(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR.key(), "-1.6");
+                            flinkConf.put(AutoScalerOptions.MAX_SCALE_UP_FACTOR.key(), "-1.6");
+                            flinkConf.put(AutoScalerOptions.TARGET_UTILIZATION.key(), "-1.6");
+                            flinkConf.put(
+                                    AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(), "-1.6");
+                        });
+        assertErrorNotContains(result);
+    }
+
+    // With the autoscaler explicitly disabled, invalid values are ignored.
+    @Test
+    public void testDisabledEnabledAutoScalerDeploymentJob() {
+        var result =
+                testAutoScalerConfiguration(
+                        flinkConf -> {
+                            flinkConf.put(AutoScalerOptions.AUTOSCALER_ENABLED.key(), "false");
+                            flinkConf.put(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR.key(), "-1.6");
+                            flinkConf.put(AutoScalerOptions.MAX_SCALE_UP_FACTOR.key(), "-1.6");
+                            flinkConf.put(AutoScalerOptions.TARGET_UTILIZATION.key(), "-1.6");
+                            flinkConf.put(
+                                    AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(), "-1.6");
+                        });
+        assertErrorNotContains(result);
+    }
+
+    // Default autoscaler configuration must pass validation for a session job.
+    @Test
+    public void testValidateSessionJob() {
+        testSessionJobAutoScalerConfiguration(flinkConf -> {}).ifPresent(Assertions::fail);
+    }
+
+    // A negative scale-down factor must be rejected for session jobs too.
+    @Test
+    public void testValidateSessionJobWithInvalidNegativeScaleDownFactor() {
+        var result =
+                testSessionJobAutoScalerConfiguration(
+                        flinkConf ->
+                                flinkConf.put(
+                                        AutoScalerOptions.MAX_SCALE_DOWN_FACTOR.key(), "-0.1"));
+        assertErrorContains(
+                result, getFormattedErrorMessage(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.0d));
+    }
+
+    // A negative scale-up factor must be rejected for session jobs too.
+    @Test
+    public void testValidateSessionJobWithInvalidNegativeScaleUpFactor() {
+        var result =
+                testSessionJobAutoScalerConfiguration(
+                        flinkConf ->
+                                flinkConf.put(AutoScalerOptions.MAX_SCALE_UP_FACTOR.key(), "-0.1"));
+        assertErrorContains(
+                result, getFormattedErrorMessage(AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.0d));
+    }
+
+    // A negative target utilization must be rejected for session jobs too.
+    @Test
+    public void testValidateSessionJobWithInvalidNegativeUtilization() {
+        var result =
+                testSessionJobAutoScalerConfiguration(
+                        flinkConf ->
+                                flinkConf.put(AutoScalerOptions.TARGET_UTILIZATION.key(), "-0.6"));
+        assertErrorContains(
+                result, getFormattedErrorMessage(AutoScalerOptions.TARGET_UTILIZATION, 0.0d));
+    }
+
+    // A negative utilization boundary must be rejected for session jobs too.
+    @Test
+    public void testValidateSessionJobWithInvalidNegativeUtilizationBoundary() {
+        var result =
+                testSessionJobAutoScalerConfiguration(
+                        flinkConf ->
+                                flinkConf.put(
+                                        AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(),
+                                        "-0.6"));
+        assertErrorContains(
+                result,
+                getFormattedErrorMessage(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d));
+    }
+
+    // A utilization boundary below the lower bound must be rejected.
+    @Test
+    public void testValidateSessionJobWithInvalidUtilizationBoundary() {
+        var result =
+                testSessionJobAutoScalerConfiguration(
+                        flinkConf ->
+                                flinkConf.put(
+                                        AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(),
+                                        "-1.6"));
+        assertErrorContains(
+                result,
+                getFormattedErrorMessage(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d));
+    }
+
+    // With the autoscaler disabled, invalid values are ignored for session jobs.
+    @Test
+    public void testNonEnabledAutoScalerSessionJob() {
+        var result =
+                testSessionJobAutoScalerConfiguration(
+                        flinkConf -> {
+                            flinkConf.put(AutoScalerOptions.AUTOSCALER_ENABLED.key(), "false");
+                            flinkConf.put(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR.key(), "-1.6");
+                            flinkConf.put(AutoScalerOptions.MAX_SCALE_UP_FACTOR.key(), "-1.6");
+                            flinkConf.put(AutoScalerOptions.TARGET_UTILIZATION.key(), "-1.6");
+                            flinkConf.put(
+                                    AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(), "-1.6");
+                        });
+        assertErrorNotContains(result);
+    }
+
+    /**
+     * Runs session-job validation against the default autoscaler config after applying
+     * the given modifier to the session cluster's Flink configuration.
+     */
+    private Optional<String> testSessionJobAutoScalerConfiguration(
+            Consumer<Map<String, String>> flinkConfigurationModifier) {
+        var sessionCluster = TestUtils.buildSessionCluster();
+        var sessionJob = TestUtils.buildSessionJob();
+        var flinkConfiguration = getDefaultTestAutoScalerFlinkConfigurationMap();
+        flinkConfigurationModifier.accept(flinkConfiguration);
+        sessionCluster.getSpec().setFlinkConfiguration(flinkConfiguration);
+        return validator.validateSessionJob(sessionJob, Optional.of(sessionCluster));
+    }
+
+    /**
+     * Runs deployment validation against the default autoscaler config after applying
+     * the given modifier to the deployment's Flink configuration.
+     */
+    public Optional<String> testAutoScalerConfiguration(
+            Consumer<Map<String, String>> flinkConfigurationModifier) {
+        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+        var flinkConfiguration = getDefaultTestAutoScalerFlinkConfigurationMap();
+        flinkConfigurationModifier.accept(flinkConfiguration);
+        deployment.getSpec().setFlinkConfiguration(flinkConfiguration);
+        return validator.validateDeployment(deployment);
+    }
+
+    // Baseline autoscaler configuration that is expected to pass validation.
+    private Map<String, String> getDefaultTestAutoScalerFlinkConfigurationMap() {
+        Map<String, String> conf = new HashMap<>();
+        conf.put(AutoScalerOptions.AUTOSCALER_ENABLED.key(), "true");
+        conf.put(AutoScalerOptions.MAX_SCALE_UP_FACTOR.key(), "100000.0");
+        conf.put(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR.key(), "0.6");
+        // NOTE(review): "0.1" looks like a numeric value for an option whose name suggests
+        // a boolean (…_DETECTION_ENABLED) — confirm this is intentional.
+        conf.put(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED.key(), "0.1");
+        conf.put(AutoScalerOptions.TARGET_UTILIZATION.key(), "0.7");
+        conf.put(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(), "0.4");
+        return conf;
+    }
+
+    /**
+     * Builds the expected out-of-range error message for a numeric autoscaler option;
+     * must stay in sync with the message produced by DefaultValidator.
+     */
+    private static String getFormattedErrorMessage(
+            ConfigOption<Double> configValue, Double min, Double max) {
+        return String.format(
+                "The AutoScalerOption %s is invalid, it should be a value within the range [%s, %s]",
+                configValue.key(),
+                min != null ? min.toString() : "-Infinity",
+                max != null ? max.toString() : "+Infinity");
+    }
+
+    /** Convenience overload for options with only a lower bound. */
+    private static String getFormattedErrorMessage(ConfigOption<Double> configValue, Double min) {
+        return getFormattedErrorMessage(configValue, min, null);
+    }
+
+    // NOTE: despite the name, this asserts the full error message for an exact match.
+    private static void assertErrorContains(Optional<String> result, String error) {
+        if (result.isEmpty()) {
+            Assertions.fail("Invalid Configuration not caught in the tests");
+        } else {
+            Assertions.assertEquals(error, result.get());
+        }
+    }
+
+    /** Asserts that validation produced no error. */
+    private static void assertErrorNotContains(Optional<String> result) {
+        // The original failure message ("Invalid Configuration not caught in the tests")
+        // was misleading here: this helper fails when an UNEXPECTED error is present.
+        // Report the actual error to make test failures diagnosable.
+        result.ifPresent(error -> Assertions.fail("Unexpected validation error: " + error));
+    }
}