This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new b18bdb47 [hotfix] Do not leak autoscaler configs to jobs
b18bdb47 is described below

commit b18bdb47fd18e6f51b79bc1321f7917d95021c04
Author: Gyula Fora <g_f...@apple.com>
AuthorDate: Fri Mar 7 14:40:05 2025 +0100

    [hotfix] Do not leak autoscaler configs to jobs
---
 .../kubernetes/operator/service/AbstractFlinkService.java     |  4 +++-
 .../kubernetes/operator/service/AbstractFlinkServiceTest.java | 11 ++++++++---
 2 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index 48a70b3a..733ebd3a 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.autoscaler.utils.JobStatusUtils;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.CheckpointingOptions;
@@ -984,7 +985,8 @@ public abstract class AbstractFlinkService implements FlinkService {
         config.toMap()
                 .forEach(
                         (k, v) -> {
-                            if (!k.startsWith(K8S_OP_CONF_PREFIX)) {
+                            if (!k.startsWith(K8S_OP_CONF_PREFIX)
+                                    && !k.startsWith(AutoScalerOptions.AUTOSCALER_CONF_PREFIX)) {
                                 newConfig.setString(k, v);
                             }
                         });
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
index 9082ff35..a57d60a8 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
@@ -987,10 +987,15 @@ public class AbstractFlinkServiceTest {
 
     @Test
     public void removeOperatorConfigTest() {
-        var key = "kubernetes.operator.meyKey";
-        var deployConfig = Configuration.fromMap(Map.of("kubernetes.operator.meyKey", "v"));
+        var opKey1 = "kubernetes.operator.meyKey";
+        var opKey2 = "job.autoscaler.";
+        var regularKey = "k";
+        var deployConfig =
+                Configuration.fromMap(Map.of(opKey1, "v", opKey2, "v", regularKey, "v1"));
         var newConf = AbstractFlinkService.removeOperatorConfigs(deployConfig);
-        assertFalse(newConf.containsKey(key));
+        assertFalse(newConf.containsKey(opKey1));
+        assertFalse(newConf.containsKey(opKey2));
+        assertTrue(newConf.containsKey(regularKey));
     }
 
     @Test

Reply via email to