This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new b71e094fe The YarnClientTrait add clusterClient close (#2906)
b71e094fe is described below
commit b71e094fed373116949d471d0541d5a3182d199d
Author: ChengJie1053 <[email protected]>
AuthorDate: Sat Jul 29 15:10:01 2023 +0800
The YarnClientTrait add clusterClient close (#2906)
* The YarnClientTrait add clusterClient close
* Optimized code
---
.../flink/client/trait/YarnClientTrait.scala | 20 ++++++++++----------
1 file changed, 10 insertions(+), 10 deletions(-)
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
index fa88df7dd..0ce7cb9c7 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
@@ -42,8 +42,8 @@ trait YarnClientTrait extends FlinkClientTrait {
request: R,
flinkConf: Configuration,
actionFunc: (JobID, ClusterClient[_]) => O): O = {
- val jobID = getJobID(request.jobId)
- val clusterClient = {
+
+ Utils.using {
flinkConf.safeSet(YarnConfigOptions.APPLICATION_ID, request.clusterId)
val clusterClientFactory = new YarnClusterClientFactory
val applicationId = clusterClientFactory.getClusterId(flinkConf)
@@ -53,15 +53,15 @@ trait YarnClientTrait extends FlinkClientTrait {
}
val clusterDescriptor =
clusterClientFactory.createClusterDescriptor(flinkConf)
clusterDescriptor.retrieve(applicationId).getClusterClient
+ } {
+ client =>
+ Try(actionFunc(getJobID(request.jobId), client)).recover {
+ case e =>
+ throw new FlinkException(
+ s"[StreamPark] Do ${request.getClass.getSimpleName} for the job ${request.jobId} failed. " +
+ s"detail: ${Utils.stringifyException(e)}");
+ }.get
}
- Try {
- actionFunc(jobID, clusterClient)
- }.recover {
- case e =>
- throw new FlinkException(
- s"[StreamPark] Do ${request.getClass.getSimpleName} for the job ${request.jobId} failed. " +
- s"detail: ${Utils.stringifyException(e)}");
- }.get
}
override def doTriggerSavepoint(