This is an automated email from the ASF dual-hosted git repository.

mbalassi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 81a2d993 [FLINK-33427] Handle new autoscaler config keys like operator 
configs
81a2d993 is described below

commit 81a2d993dffb6b193a582fcc0f08b28e5bb1073d
Author: Gyula Fora <[email protected]>
AuthorDate: Thu Nov 2 18:59:01 2023 +0100

    [FLINK-33427] Handle new autoscaler config keys like operator configs
---
 .../flink/kubernetes/operator/api/spec/AbstractFlinkSpec.java     | 1 +
 .../flink/kubernetes/operator/config/FlinkConfigManager.java      | 4 +++-
 .../flink/kubernetes/operator/config/FlinkConfigManagerTest.java  | 8 ++++++++
 .../flink/kubernetes/operator/reconciler/diff/SpecDiffTest.java   | 5 ++++-
 4 files changed, 16 insertions(+), 2 deletions(-)

diff --git 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/AbstractFlinkSpec.java
 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/AbstractFlinkSpec.java
index 3646f2ef..d3ed1d70 100644
--- 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/AbstractFlinkSpec.java
+++ 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/AbstractFlinkSpec.java
@@ -48,6 +48,7 @@ public abstract class AbstractFlinkSpec implements 
Diffable<AbstractFlinkSpec> {
 
     /** Flink configuration overrides for the Flink deployment or Flink 
session job. */
     @SpecDiff.Config({
+        @SpecDiff.Entry(prefix = "job.autoscaler", type = DiffType.IGNORE),
         @SpecDiff.Entry(prefix = "parallelism.default", type = 
DiffType.IGNORE),
         @SpecDiff.Entry(prefix = "kubernetes.operator", type = 
DiffType.IGNORE),
         @SpecDiff.Entry(
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
index df7d7ee6..0bca875a 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
@@ -19,6 +19,7 @@
 package org.apache.flink.kubernetes.operator.config;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -245,7 +246,8 @@ public class FlinkConfigManager {
             spec.getFlinkConfiguration()
                     .forEach(
                             (k, v) -> {
-                                if (k.startsWith(K8S_OP_CONF_PREFIX)) {
+                                if (k.startsWith(K8S_OP_CONF_PREFIX)
+                                        || 
k.startsWith(AutoScalerOptions.AUTOSCALER_CONF_PREFIX)) {
                                     conf.setString(k, v);
                                 }
                             });
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java
index 47bf78ed..3ec8fa40 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.kubernetes.operator.config;
 
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
@@ -80,6 +81,10 @@ public class FlinkConfigManagerTest {
 
         deployment.getSpec().getFlinkConfiguration().put(testConf.key(), 
"latest");
         deployment.getSpec().getFlinkConfiguration().put(opTestConf.key(), 
"latest");
+        deployment
+                .getSpec()
+                .getFlinkConfiguration()
+                .put(AutoScalerOptions.METRICS_WINDOW.key(), "1234m");
 
         assertEquals(
                 "latest",
@@ -93,6 +98,9 @@ public class FlinkConfigManagerTest {
                         .get(opTestConf));
         assertEquals("reconciled", 
configManager.getObserveConfig(deployment).get(testConf));
         assertEquals("latest", 
configManager.getObserveConfig(deployment).get(opTestConf));
+        assertEquals(
+                Duration.ofMinutes(1234),
+                
configManager.getObserveConfig(deployment).get(AutoScalerOptions.METRICS_WINDOW));
 
         deployment.getSpec().getFlinkConfiguration().put(testConf.key(), 
"stable");
         
reconciliationStatus.serializeAndSetLastReconciledSpec(deployment.getSpec(), 
deployment);
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/SpecDiffTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/SpecDiffTest.java
index 254e6001..b9c84fcc 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/SpecDiffTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/SpecDiffTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.kubernetes.operator.reconciler.diff;
 
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.kubernetes.operator.TestUtils;
@@ -80,12 +81,14 @@ public class SpecDiffTest {
         right.getFlinkConfiguration().put(OPERATOR_RECONCILE_INTERVAL.key(), 
"100 SECONDS");
         
right.getFlinkConfiguration().put(SCOPE_NAMING_KUBERNETES_OPERATOR.key(), 
"foo.bar");
         
right.getFlinkConfiguration().put(CoreOptions.DEFAULT_PARALLELISM.key(), "100");
+        
right.getFlinkConfiguration().put(AutoScalerOptions.METRICS_WINDOW.key(), 
"1234m");
 
         diff = new ReflectiveDiffBuilder<>(KubernetesDeploymentMode.NATIVE, 
left, right).build();
         assertEquals(DiffType.IGNORE, diff.getType());
-        assertEquals(7, diff.getNumDiffs());
+        assertEquals(8, diff.getNumDiffs());
 
         
right.getFlinkConfiguration().remove(SCOPE_NAMING_KUBERNETES_OPERATOR.key());
+        
right.getFlinkConfiguration().remove(AutoScalerOptions.METRICS_WINDOW.key());
 
         diff = new ReflectiveDiffBuilder<>(KubernetesDeploymentMode.NATIVE, 
left, right).build();
         assertEquals(DiffType.IGNORE, diff.getType());

Reply via email to