This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new 4c2c90c1 [FLINK-37166] Use concurrenthashmap in flink config manager 4c2c90c1 is described below commit 4c2c90c16c351e558d2e0ed4655bf9a2654a96e4 Author: Gyula Fora <g_f...@apple.com> AuthorDate: Fri Jan 17 13:58:32 2025 +0100 [FLINK-37166] Use concurrenthashmap in flink config manager --- .../operator/config/FlinkConfigManager.java | 8 ++--- .../operator/config/FlinkConfigManagerTest.java | 41 ++++++++++++++++++++++ 2 files changed, 45 insertions(+), 4 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java index 93d53fbe..730e1684 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java @@ -50,13 +50,13 @@ import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -87,7 +87,7 @@ public class FlinkConfigManager { private final AtomicLong defaultConfigVersion = new AtomicLong(0); private final LoadingCache<Key, Configuration> cache; private final Consumer<Set<String>> namespaceListener; - private volatile Map<FlinkVersion, List<String>> relevantFlinkVersionPrefixes; + private volatile ConcurrentHashMap<FlinkVersion, List<String>> relevantFlinkVersionPrefixes; protected static final Pattern FLINK_VERSION_PATTERN = Pattern.compile( @@ -114,7 +114,7 @@ public class FlinkConfigManager { this.namespaceListener = namespaceListener; Duration cacheTimeout = defaultConfig.get(KubernetesOperatorConfigOptions.OPERATOR_CONFIG_CACHE_TIMEOUT); - this.relevantFlinkVersionPrefixes = new HashMap<>(); + this.relevantFlinkVersionPrefixes = new ConcurrentHashMap<>(); this.cache = CacheBuilder.newBuilder() .maximumSize( @@ -189,7 +189,7 @@ public class FlinkConfigManager { // We clear the cached relevant Flink version prefixes as the base config may include new // version overrides. // This will trigger a regeneration of the prefixes in the next call to getDefaultConfig. - relevantFlinkVersionPrefixes = new HashMap<>(); + relevantFlinkVersionPrefixes = new ConcurrentHashMap<>(); } /** diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java index fcd55157..b6bca056 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java @@ -49,6 +49,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Matcher; import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_WATCHED_NAMESPACES; @@ -380,4 +381,44 @@ public class FlinkConfigManagerTest { assertEquals("v3", observeConfig.get("conf3")); assertEquals("false", observeConfig.get("conf0")); } + + @Test + public void testConcurrentDefaultConfig() throws InterruptedException { + var opConf = new Configuration(); + var configManager = new FlinkConfigManager(opConf); + var completed1 = new AtomicBoolean(); + var completed2 = new AtomicBoolean(); + var completed3 = new AtomicBoolean(); + + var t1 = + new Thread( + () -> { + configManager.getDefaultConfig("ns1", FlinkVersion.v1_18); + completed1.set(true); + }); + var t2 = + new Thread( + () -> { + configManager.getDefaultConfig("ns1", FlinkVersion.v1_18); + completed2.set(true); + }); + var t3 = + new Thread( + () -> { + configManager.getDefaultConfig("ns1", FlinkVersion.v1_18); + completed3.set(true); + }); + + t1.start(); + t2.start(); + t3.start(); + + t1.join(); + t2.join(); + t3.join(); + + assertTrue(completed1.get()); + assertTrue(completed2.get()); + assertTrue(completed3.get()); + } }