tillrohrmann commented on a change in pull request #15480:
URL: https://github.com/apache/flink/pull/15480#discussion_r607705598
##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClientFactory.java
##########
@@ -92,13 +94,32 @@ public FlinkKubeClient fromConfiguration(Configuration flinkConfig, String useCa
         LOG.debug("Setting namespace of Kubernetes client to {}", namespace);
         config.setNamespace(namespace);
+        // This could be removed after we bump the fabric8 Kubernetes client version to 4.13.0+ or
+        // use a shared connection for all ConfigMap watches. See FLINK-22006 for more
+        // information.
+        trySetMaxConcurrentRequest(config);
+
         final NamespacedKubernetesClient client = new DefaultKubernetesClient(config);
         final int poolSize =
                 flinkConfig.get(KubernetesConfigOptions.KUBERNETES_CLIENT_IO_EXECUTOR_POOL_SIZE);
         return new Fabric8FlinkKubeClient(
                 flinkConfig, client, createThreadPoolForAsyncIO(poolSize, useCase));
     }
+    @VisibleForTesting
+    static void trySetMaxConcurrentRequest(Config config) {
+        final String configuredMaxConcurrentRequests =
+                Utils.getSystemPropertyOrEnvVar(
+                        Config.KUBERNETES_MAX_CONCURRENT_REQUESTS,
+                        String.valueOf(Config.DEFAULT_MAX_CONCURRENT_REQUESTS));
Review comment:
I am wondering whether we should instead make this configurable via a
Flink configuration option. That way it would also be documented for our users
and easier to use.
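
To make the suggestion concrete, here is a rough sketch of the lookup order I have in mind: an explicit Flink option wins, and the system property or environment variable remains as a fallback. It is plain Java with maps standing in for the real configuration sources, so it is not tied to the Flink or fabric8 APIs. The option key `kubernetes.client.max-concurrent-requests` is hypothetical, and the fallback names and the default of 64 are assumptions about what fabric8 uses and should be checked against the client version in use.

```java
import java.util.Map;
import java.util.Optional;

/** Illustrative sketch only; not the actual Flink or fabric8 implementation. */
public class MaxConcurrentRequestsSketch {

    // Hypothetical Flink option key; the real name would be decided in the PR.
    static final String FLINK_OPTION_KEY = "kubernetes.client.max-concurrent-requests";

    // Assumed fallback names, mirroring what the fabric8 client is believed to read.
    static final String SYSTEM_PROPERTY_KEY = "kubernetes.max.concurrent.requests";
    static final String ENV_VAR_KEY = "KUBERNETES_MAX_CONCURRENT_REQUESTS";

    // Assumed default; verify against the fabric8 version in use.
    static final int DEFAULT_MAX_CONCURRENT_REQUESTS = 64;

    /**
     * Resolves the limit with this precedence: Flink option, then system
     * property, then environment variable, then the default.
     */
    static int resolve(
            Map<String, String> flinkConfig,
            Map<String, String> systemProperties,
            Map<String, String> envVars) {
        return Optional.ofNullable(flinkConfig.get(FLINK_OPTION_KEY))
                .or(() -> Optional.ofNullable(systemProperties.get(SYSTEM_PROPERTY_KEY)))
                .or(() -> Optional.ofNullable(envVars.get(ENV_VAR_KEY)))
                .map(String::trim)
                .map(Integer::parseInt)
                .orElse(DEFAULT_MAX_CONCURRENT_REQUESTS);
    }

    public static void main(String[] args) {
        // The Flink option takes precedence over the environment variable.
        System.out.println(
                resolve(Map.of(FLINK_OPTION_KEY, "128"), Map.of(), Map.of(ENV_VAR_KEY, "32")));
        // Nothing is configured, so the assumed default is used.
        System.out.println(resolve(Map.of(), Map.of(), Map.of()));
    }
}
```

Keeping the system property and environment variable as fallbacks would preserve the behaviour of the current patch for anyone already relying on it, while the Flink option gives users a documented knob.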