This is an automated email from the ASF dual-hosted git repository.

cancai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 2df7c9003 [ISSUES-3667] KubernetesClusterDescriptor not closed after use, cause the file handle resource leaks (#3668)
2df7c9003 is described below

commit 2df7c90030b3e9b79ccc2e67b08c40382c0c1128
Author: jianjun.xu <[email protected]>
AuthorDate: Tue Apr 16 16:40:47 2024 +0800

    [ISSUES-3667] KubernetesClusterDescriptor not closed after use, cause the file handle resource leaks (#3668)
    
    [ISSUES-3667] KubernetesClusterDescriptor not closed after use, cause the file handle resource leaks
    ---------
    
    Co-authored-by: jianjun.xu <[email protected]>
---
 .../flink/kubernetes/KubernetesRetriever.scala     | 34 +++++++++++-----------
 1 file changed, 17 insertions(+), 17 deletions(-)

diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
index 916096f25..fc876c7c3 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
@@ -18,8 +18,8 @@
 package org.apache.streampark.flink.kubernetes
 
 import org.apache.streampark.common.conf.ConfigKeys
+import org.apache.streampark.common.util.{Logger, Utils}
 import org.apache.streampark.common.util.ImplicitsUtils._
-import org.apache.streampark.common.util.Logger
 import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteModeEnum
 import org.apache.streampark.flink.kubernetes.ingress.IngressController
 import org.apache.streampark.flink.kubernetes.model.ClusterKey
@@ -83,22 +83,22 @@ object KubernetesRetriever extends Logger {
       flinkConfig.setString(KubernetesConfigOptions.NAMESPACE, namespace)
     }
     // retrieve flink cluster client
-    val clientFactory: ClusterClientFactory[String] =
-      clusterClientServiceLoader.getClusterClientFactory(flinkConfig)
-
-    val clusterProvider: KubernetesClusterDescriptor =
-      clientFactory.createClusterDescriptor(flinkConfig).asInstanceOf[KubernetesClusterDescriptor]
-
-    Try {
-      clusterProvider
-        .retrieve(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID))
-        .getClusterClient
-    } match {
-      case Success(v) => Some(v)
-      case Failure(e) =>
-        logError(s"Get flinkClient error, the error is: $e")
-        None
-    }
+    clusterClientServiceLoader
+      .getClusterClientFactory(flinkConfig)
+      .createClusterDescriptor(flinkConfig)
+      .asInstanceOf[KubernetesClusterDescriptor]
+      .autoClose(
+        clusterProvider =>
+          Try {
+            clusterProvider
+              .retrieve(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID))
+              .getClusterClient
+          } match {
+            case Success(v) => Some(v)
+            case Failure(e) =>
+              logError(s"Get flinkClient error, the error is: $e")
+              None
+          })
   }
 
   /**

Reply via email to