This is an automated email from the ASF dual-hosted git repository.
cancai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 2df7c9003 [ISSUES-3667] KubernetesClusterDescriptor is not closed after
use, causing file handle resource leaks (#3668)
2df7c9003 is described below
commit 2df7c90030b3e9b79ccc2e67b08c40382c0c1128
Author: jianjun.xu <[email protected]>
AuthorDate: Tue Apr 16 16:40:47 2024 +0800
[ISSUES-3667] KubernetesClusterDescriptor is not closed after use, causing
file handle resource leaks (#3668)
[ISSUES-3667] KubernetesClusterDescriptor is not closed after use, causing
file handle resource leaks
---------
Co-authored-by: jianjun.xu <[email protected]>
---
.../flink/kubernetes/KubernetesRetriever.scala | 34 +++++++++++-----------
1 file changed, 17 insertions(+), 17 deletions(-)
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
index 916096f25..fc876c7c3 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
@@ -18,8 +18,8 @@
package org.apache.streampark.flink.kubernetes
import org.apache.streampark.common.conf.ConfigKeys
+import org.apache.streampark.common.util.{Logger, Utils}
import org.apache.streampark.common.util.ImplicitsUtils._
-import org.apache.streampark.common.util.Logger
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteModeEnum
import org.apache.streampark.flink.kubernetes.ingress.IngressController
import org.apache.streampark.flink.kubernetes.model.ClusterKey
@@ -83,22 +83,22 @@ object KubernetesRetriever extends Logger {
flinkConfig.setString(KubernetesConfigOptions.NAMESPACE, namespace)
}
// retrieve flink cluster client
- val clientFactory: ClusterClientFactory[String] =
- clusterClientServiceLoader.getClusterClientFactory(flinkConfig)
-
- val clusterProvider: KubernetesClusterDescriptor =
- clientFactory.createClusterDescriptor(flinkConfig).asInstanceOf[KubernetesClusterDescriptor]
-
- Try {
- clusterProvider
- .retrieve(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID))
- .getClusterClient
- } match {
- case Success(v) => Some(v)
- case Failure(e) =>
- logError(s"Get flinkClient error, the error is: $e")
- None
- }
+ clusterClientServiceLoader
+ .getClusterClientFactory(flinkConfig)
+ .createClusterDescriptor(flinkConfig)
+ .asInstanceOf[KubernetesClusterDescriptor]
+ .autoClose(
+ clusterProvider =>
+ Try {
+ clusterProvider
+ .retrieve(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID))
+ .getClusterClient
+ } match {
+ case Success(v) => Some(v)
+ case Failure(e) =>
+ logError(s"Get flinkClient error, the error is: $e")
+ None
+ })
}
/**