This is an automated email from the ASF dual-hosted git repository.
mbalassi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 81a2d993 [FLINK-33427] Handle new autoscaler config keys like operator configs
81a2d993 is described below
commit 81a2d993dffb6b193a582fcc0f08b28e5bb1073d
Author: Gyula Fora <[email protected]>
AuthorDate: Thu Nov 2 18:59:01 2023 +0100
[FLINK-33427] Handle new autoscaler config keys like operator configs
---
.../flink/kubernetes/operator/api/spec/AbstractFlinkSpec.java | 1 +
.../flink/kubernetes/operator/config/FlinkConfigManager.java | 4 +++-
.../flink/kubernetes/operator/config/FlinkConfigManagerTest.java | 8 ++++++++
.../flink/kubernetes/operator/reconciler/diff/SpecDiffTest.java | 5 ++++-
4 files changed, 16 insertions(+), 2 deletions(-)
diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/AbstractFlinkSpec.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/AbstractFlinkSpec.java
index 3646f2ef..d3ed1d70 100644
--- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/AbstractFlinkSpec.java
+++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/AbstractFlinkSpec.java
@@ -48,6 +48,7 @@ public abstract class AbstractFlinkSpec implements Diffable<AbstractFlinkSpec> {
/** Flink configuration overrides for the Flink deployment or Flink
session job. */
@SpecDiff.Config({
+ @SpecDiff.Entry(prefix = "job.autoscaler", type = DiffType.IGNORE),
@SpecDiff.Entry(prefix = "parallelism.default", type =
DiffType.IGNORE),
@SpecDiff.Entry(prefix = "kubernetes.operator", type =
DiffType.IGNORE),
@SpecDiff.Entry(
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
index df7d7ee6..0bca875a 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
@@ -19,6 +19,7 @@
package org.apache.flink.kubernetes.operator.config;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
@@ -245,7 +246,8 @@ public class FlinkConfigManager {
spec.getFlinkConfiguration()
.forEach(
(k, v) -> {
- if (k.startsWith(K8S_OP_CONF_PREFIX)) {
+ if (k.startsWith(K8S_OP_CONF_PREFIX)
+ || k.startsWith(AutoScalerOptions.AUTOSCALER_CONF_PREFIX)) {
conf.setString(k, v);
}
});
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java
index 47bf78ed..3ec8fa40 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java
@@ -17,6 +17,7 @@
package org.apache.flink.kubernetes.operator.config;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
@@ -80,6 +81,10 @@ public class FlinkConfigManagerTest {
deployment.getSpec().getFlinkConfiguration().put(testConf.key(),
"latest");
deployment.getSpec().getFlinkConfiguration().put(opTestConf.key(),
"latest");
+ deployment
+ .getSpec()
+ .getFlinkConfiguration()
+ .put(AutoScalerOptions.METRICS_WINDOW.key(), "1234m");
assertEquals(
"latest",
@@ -93,6 +98,9 @@ public class FlinkConfigManagerTest {
.get(opTestConf));
assertEquals("reconciled",
configManager.getObserveConfig(deployment).get(testConf));
assertEquals("latest",
configManager.getObserveConfig(deployment).get(opTestConf));
+ assertEquals(
+ Duration.ofMinutes(1234),
+ configManager.getObserveConfig(deployment).get(AutoScalerOptions.METRICS_WINDOW));
deployment.getSpec().getFlinkConfiguration().put(testConf.key(),
"stable");
reconciliationStatus.serializeAndSetLastReconciledSpec(deployment.getSpec(),
deployment);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/SpecDiffTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/SpecDiffTest.java
index 254e6001..b9c84fcc 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/SpecDiffTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/SpecDiffTest.java
@@ -17,6 +17,7 @@
package org.apache.flink.kubernetes.operator.reconciler.diff;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.kubernetes.operator.TestUtils;
@@ -80,12 +81,14 @@ public class SpecDiffTest {
right.getFlinkConfiguration().put(OPERATOR_RECONCILE_INTERVAL.key(),
"100 SECONDS");
right.getFlinkConfiguration().put(SCOPE_NAMING_KUBERNETES_OPERATOR.key(),
"foo.bar");
right.getFlinkConfiguration().put(CoreOptions.DEFAULT_PARALLELISM.key(), "100");
+ right.getFlinkConfiguration().put(AutoScalerOptions.METRICS_WINDOW.key(), "1234m");
diff = new ReflectiveDiffBuilder<>(KubernetesDeploymentMode.NATIVE,
left, right).build();
assertEquals(DiffType.IGNORE, diff.getType());
- assertEquals(7, diff.getNumDiffs());
+ assertEquals(8, diff.getNumDiffs());
right.getFlinkConfiguration().remove(SCOPE_NAMING_KUBERNETES_OPERATOR.key());
+ right.getFlinkConfiguration().remove(AutoScalerOptions.METRICS_WINDOW.key());
diff = new ReflectiveDiffBuilder<>(KubernetesDeploymentMode.NATIVE,
left, right).build();
assertEquals(DiffType.IGNORE, diff.getType());