This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 4c2c90c1 [FLINK-37166] Use concurrenthashmap in flink config manager
4c2c90c1 is described below

commit 4c2c90c16c351e558d2e0ed4655bf9a2654a96e4
Author: Gyula Fora <g_f...@apple.com>
AuthorDate: Fri Jan 17 13:58:32 2025 +0100

    [FLINK-37166] Use concurrenthashmap in flink config manager
---
 .../operator/config/FlinkConfigManager.java        |  8 ++---
 .../operator/config/FlinkConfigManagerTest.java    | 41 ++++++++++++++++++++++
 2 files changed, 45 insertions(+), 4 deletions(-)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
index 93d53fbe..730e1684 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
@@ -50,13 +50,13 @@ import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -87,7 +87,7 @@ public class FlinkConfigManager {
     private final AtomicLong defaultConfigVersion = new AtomicLong(0);
     private final LoadingCache<Key, Configuration> cache;
     private final Consumer<Set<String>> namespaceListener;
-    private volatile Map<FlinkVersion, List<String>> 
relevantFlinkVersionPrefixes;
+    private volatile ConcurrentHashMap<FlinkVersion, List<String>> 
relevantFlinkVersionPrefixes;
 
     protected static final Pattern FLINK_VERSION_PATTERN =
             Pattern.compile(
@@ -114,7 +114,7 @@ public class FlinkConfigManager {
         this.namespaceListener = namespaceListener;
         Duration cacheTimeout =
                 
defaultConfig.get(KubernetesOperatorConfigOptions.OPERATOR_CONFIG_CACHE_TIMEOUT);
-        this.relevantFlinkVersionPrefixes = new HashMap<>();
+        this.relevantFlinkVersionPrefixes = new ConcurrentHashMap<>();
         this.cache =
                 CacheBuilder.newBuilder()
                         .maximumSize(
@@ -189,7 +189,7 @@ public class FlinkConfigManager {
         // We clear the cached relevant Flink version prefixes as the base 
config may include new
         // version overrides.
         // This will trigger a regeneration of the prefixes in the next call 
to getDefaultConfig.
-        relevantFlinkVersionPrefixes = new HashMap<>();
+        relevantFlinkVersionPrefixes = new ConcurrentHashMap<>();
     }
 
     /**
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java
index fcd55157..b6bca056 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java
@@ -49,6 +49,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Matcher;
 
 import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_WATCHED_NAMESPACES;
@@ -380,4 +381,44 @@ public class FlinkConfigManagerTest {
         assertEquals("v3", observeConfig.get("conf3"));
         assertEquals("false", observeConfig.get("conf0"));
     }
+
+    @Test
+    public void testConcurrentDefaultConfig() throws InterruptedException {
+        var opConf = new Configuration();
+        var configManager = new FlinkConfigManager(opConf);
+        var completed1 = new AtomicBoolean();
+        var completed2 = new AtomicBoolean();
+        var completed3 = new AtomicBoolean();
+
+        var t1 =
+                new Thread(
+                        () -> {
+                            configManager.getDefaultConfig("ns1", 
FlinkVersion.v1_18);
+                            completed1.set(true);
+                        });
+        var t2 =
+                new Thread(
+                        () -> {
+                            configManager.getDefaultConfig("ns1", 
FlinkVersion.v1_18);
+                            completed2.set(true);
+                        });
+        var t3 =
+                new Thread(
+                        () -> {
+                            configManager.getDefaultConfig("ns1", 
FlinkVersion.v1_18);
+                            completed3.set(true);
+                        });
+
+        t1.start();
+        t2.start();
+        t3.start();
+
+        t1.join();
+        t2.join();
+        t3.join();
+
+        assertTrue(completed1.get());
+        assertTrue(completed2.get());
+        assertTrue(completed3.get());
+    }
 }

Reply via email to