This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 86a0d39 [FLINK-27303] Improve config cache settings + add cleanup
86a0d39 is described below
commit 86a0d396866cbd0999b599a1e2088ca765e81bae
Author: Gyula Fora <[email protected]>
AuthorDate: Tue May 3 14:13:06 2022 +0200
[FLINK-27303] Improve config cache settings + add cleanup
---
.../kubernetes_operator_config_configuration.html | 12 ++++++++++
.../operator/config/FlinkConfigManager.java | 27 ++++++++++++++--------
.../config/KubernetesOperatorConfigOptions.java | 12 ++++++++++
3 files changed, 42 insertions(+), 9 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
index 61bef00..678a649 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
@@ -8,6 +8,18 @@
</tr>
</thead>
<tbody>
+ <tr>
+ <td><h5>kubernetes.operator.config.cache.size</h5></td>
+ <td style="word-wrap: break-word;">1000</td>
+ <td>Integer</td>
+ <td>Max config cache size.</td>
+ </tr>
+ <tr>
+ <td><h5>kubernetes.operator.config.cache.timeout</h5></td>
+ <td style="word-wrap: break-word;">10 min</td>
+ <td>Duration</td>
+ <td>Expiration time for cached configs.</td>
+ </tr>
<tr>
<td><h5>kubernetes.operator.deployment.readiness.timeout</h5></td>
<td style="word-wrap: break-word;">1 min</td>
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
index da97fef..dfcfc9e 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
@@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -55,9 +56,6 @@ public class FlinkConfigManager {
private static final Logger LOG = LoggerFactory.getLogger(FlinkConfigManager.class);
private static final ObjectMapper objectMapper = new ObjectMapper();
- private static final int MAX_CACHE_SIZE = 1000;
- private static final Duration CACHE_TIMEOUT = Duration.ofMinutes(30);
-
private volatile Configuration defaultConfig;
private volatile FlinkOperatorConfiguration operatorConfiguration;
private final AtomicLong defaultConfigVersion = new AtomicLong(0);
@@ -70,10 +68,14 @@ public class FlinkConfigManager {
}
public FlinkConfigManager(Configuration defaultConfig) {
+ Duration cacheTimeout =
+ defaultConfig.get(KubernetesOperatorConfigOptions.OPERATOR_CONFIG_CACHE_TIMEOUT);
this.cache =
CacheBuilder.newBuilder()
- .maximumSize(MAX_CACHE_SIZE)
- .expireAfterAccess(CACHE_TIMEOUT)
+ .maximumSize(
+ defaultConfig.get(
+ KubernetesOperatorConfigOptions.OPERATOR_CONFIG_CACHE_SIZE))
+ .expireAfterAccess(cacheTimeout)
.removalListener(
removalNotification ->
FlinkConfigBuilder.cleanupTmpFiles(
@@ -87,8 +89,15 @@ public class FlinkConfigManager {
});
updateDefaultConfig(defaultConfig);
+ ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+ executorService.scheduleWithFixedDelay(
+ cache::cleanUp,
+ cacheTimeout.toMillis(),
+ cacheTimeout.toMillis(),
+ TimeUnit.MILLISECONDS);
+
if (defaultConfig.getBoolean(OPERATOR_DYNAMIC_CONFIG_ENABLED)) {
- scheduleConfigWatcher();
+ scheduleConfigWatcher(executorService);
}
}
@@ -151,11 +160,11 @@ public class FlinkConfigManager {
}
}
- private void scheduleConfigWatcher() {
+ private void scheduleConfigWatcher(ScheduledExecutorService executorService) {
var checkInterval = defaultConfig.get(OPERATOR_DYNAMIC_CONFIG_CHECK_INTERVAL);
var millis = checkInterval.toMillis();
- Executors.newSingleThreadScheduledExecutor()
- .scheduleAtFixedRate(new ConfigUpdater(), millis, millis, TimeUnit.MILLISECONDS);
+ executorService.scheduleAtFixedRate(
+ new ConfigUpdater(), millis, millis, TimeUnit.MILLISECONDS);
LOG.info("Enabled dynamic config updates, checking config changes every {}", checkInterval);
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
index cc6ef7e..cc5e883 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
@@ -122,4 +122,16 @@ public class KubernetesOperatorConfigOptions {
.durationType()
.defaultValue(Duration.ofMinutes(5))
.withDescription("Time interval for checking config changes.");
+
+ public static final ConfigOption<Duration> OPERATOR_CONFIG_CACHE_TIMEOUT =
+ ConfigOptions.key("kubernetes.operator.config.cache.timeout")
+ .durationType()
+ .defaultValue(Duration.ofMinutes(10))
+ .withDescription("Expiration time for cached configs.");
+
+ public static final ConfigOption<Integer> OPERATOR_CONFIG_CACHE_SIZE =
+ ConfigOptions.key("kubernetes.operator.config.cache.size")
+ .intType()
+ .defaultValue(1000)
+ .withDescription("Max config cache size.");
}