This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new b71e094fe The YarnClientTrait add clusterClient close (#2906)
b71e094fe is described below

commit b71e094fed373116949d471d0541d5a3182d199d
Author: ChengJie1053 <[email protected]>
AuthorDate: Sat Jul 29 15:10:01 2023 +0800

    The YarnClientTrait add clusterClient close (#2906)
    
    * The YarnClientTrait add clusterClient close
    
    * Optimized code
---
 .../flink/client/trait/YarnClientTrait.scala         | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
index fa88df7dd..0ce7cb9c7 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
@@ -42,8 +42,8 @@ trait YarnClientTrait extends FlinkClientTrait {
       request: R,
       flinkConf: Configuration,
       actionFunc: (JobID, ClusterClient[_]) => O): O = {
-    val jobID = getJobID(request.jobId)
-    val clusterClient = {
+
+    Utils.using {
       flinkConf.safeSet(YarnConfigOptions.APPLICATION_ID, request.clusterId)
       val clusterClientFactory = new YarnClusterClientFactory
       val applicationId = clusterClientFactory.getClusterId(flinkConf)
@@ -53,15 +53,15 @@ trait YarnClientTrait extends FlinkClientTrait {
       }
       val clusterDescriptor = 
clusterClientFactory.createClusterDescriptor(flinkConf)
       clusterDescriptor.retrieve(applicationId).getClusterClient
+    } {
+      client =>
+        Try(actionFunc(getJobID(request.jobId), client)).recover {
+          case e =>
+            throw new FlinkException(
+              s"[StreamPark] Do ${request.getClass.getSimpleName} for the job 
${request.jobId} failed. " +
+                s"detail: ${Utils.stringifyException(e)}");
+        }.get
     }
-    Try {
-      actionFunc(jobID, clusterClient)
-    }.recover {
-      case e =>
-        throw new FlinkException(
-          s"[StreamPark] Do ${request.getClass.getSimpleName} for the job 
${request.jobId} failed. " +
-            s"detail: ${Utils.stringifyException(e)}");
-    }.get
   }
 
   override def doTriggerSavepoint(

Reply via email to