wangyang0918 commented on a change in pull request #15385:
URL: https://github.com/apache/flink/pull/15385#discussion_r602718973



##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClientFactory.java
##########
@@ -83,11 +89,19 @@ public FlinkKubeClient fromConfiguration(Configuration 
flinkConfig) {
         LOG.debug("Setting namespace of Kubernetes client to {}", namespace);
         config.setNamespace(namespace);
 
+        final Tuple2<String, String> cacheKey = 
Tuple2.of(config.getMasterUrl(), namespace);
+        if (CACHE.containsKey(cacheKey)) {
+            return CACHE.get(cacheKey);
+        }
+
         final NamespacedKubernetesClient client = new 
DefaultKubernetesClient(config);
         final int poolSize =
                 
flinkConfig.get(KubernetesConfigOptions.KUBERNETES_CLIENT_IO_EXECUTOR_POOL_SIZE);
-        return new Fabric8FlinkKubeClient(
-                flinkConfig, client, () -> 
createThreadPoolForAsyncIO(poolSize));
+        final FlinkKubeClient flinkKubeClient =
+                new Fabric8FlinkKubeClient(
+                        flinkConfig, client, () -> 
createThreadPoolForAsyncIO(poolSize));
+        CACHE.put(cacheKey, flinkKubeClient);
+        return flinkKubeClient;

Review comment:
       Thanks for your timely comments. Actually, I am still refining this PR 
about the second commit. The changes are not only about moving the `CACHE` from 
`FlinkKubeClientFactory`(of cause it is not a good place) to 
`Fabric8FlinkKubeClient`, but also about the concurrent issues. 
   
   I just forgot to convert it the draft. That's why I didn't ping you for the 
review :). 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to