This is an automated email from the ASF dual-hosted git repository.

panyuepeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new ba269cbbf [ISSUE-3299][Improve] Improve streampark-flink-client module 
base on [3.1 Naming Style] (#3300)
ba269cbbf is described below

commit ba269cbbf0eb5655efced43dc00fd6a2aac65327
Author: caicancai <[email protected]>
AuthorDate: Mon Oct 30 16:31:50 2023 +0800

    [ISSUE-3299][Improve] Improve streampark-flink-client module base on [3.1 
Naming Style] (#3300)
---
 .../streampark/flink/client/impl/LocalClient.scala  |  4 ++--
 .../streampark/flink/client/impl/RemoteClient.scala | 14 ++++++++------
 .../flink/client/impl/YarnSessionClient.scala       | 21 +++++++++++----------
 .../flink/client/trait/YarnClientTrait.scala        | 14 +++++++-------
 4 files changed, 28 insertions(+), 25 deletions(-)

diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/LocalClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/LocalClient.scala
index 6baf9d375..5fb663685 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/LocalClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/LocalClient.scala
@@ -67,9 +67,9 @@ object LocalClient extends FlinkClientTrait {
   }
 
   override def doTriggerSavepoint(
-      request: TriggerSavepointRequest,
+      savepointRequest: TriggerSavepointRequest,
       flinkConfig: Configuration): SavepointResponse = {
-    RemoteClient.doTriggerSavepoint(request, flinkConfig)
+    RemoteClient.doTriggerSavepoint(savepointRequest, flinkConfig)
   }
 
   override def doCancel(
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala
index 8e5c6a5c0..fe4fe454a 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala
@@ -48,12 +48,14 @@ object RemoteClient extends FlinkClientTrait {
 
   }
 
-  override def doCancel(request: CancelRequest, flinkConfig: Configuration): 
CancelResponse = {
+  override def doCancel(
+      cancelRequest: CancelRequest,
+      flinkConfig: Configuration): CancelResponse = {
     executeClientAction(
-      request,
+      cancelRequest,
       flinkConfig,
       (jobID, clusterClient) => {
-        CancelResponse(super.cancelJob(request, jobID, clusterClient))
+        CancelResponse(super.cancelJob(cancelRequest, jobID, clusterClient))
       })
   }
 
@@ -90,13 +92,13 @@ object RemoteClient extends FlinkClientTrait {
   }
 
   override def doTriggerSavepoint(
-      request: TriggerSavepointRequest,
+      savepointRequest: TriggerSavepointRequest,
       flinkConfig: Configuration): SavepointResponse = {
     executeClientAction(
-      request,
+      savepointRequest,
       flinkConfig,
       (jobID, clusterClient) => {
-        SavepointResponse(super.triggerSavepoint(request, jobID, 
clusterClient))
+        SavepointResponse(super.triggerSavepoint(savepointRequest, jobID, 
clusterClient))
       })
   }
 
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index 89260632c..c91fc45f0 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -134,11 +134,11 @@ object YarnSessionClient extends YarnClientTrait {
   }
 
   private[this] def executeClientAction[O, R <: SavepointRequestTrait](
-      request: R,
+      savepointRequestTrait: R,
       flinkConfig: Configuration,
       actFunc: (JobID, ClusterClient[_]) => O): O = {
     flinkConfig
-      .safeSet(YarnConfigOptions.APPLICATION_ID, request.clusterId)
+      .safeSet(YarnConfigOptions.APPLICATION_ID, 
savepointRequestTrait.clusterId)
       .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
     logInfo(s"""
                
|------------------------------------------------------------------
@@ -152,10 +152,10 @@ object YarnSessionClient extends YarnClientTrait {
       val yarnClusterDescriptor = getYarnSessionClusterDescriptor(flinkConfig)
       clusterDescriptor = yarnClusterDescriptor._2
       client = 
clusterDescriptor.retrieve(yarnClusterDescriptor._1).getClusterClient
-      actFunc(JobID.fromHexString(request.jobId), client)
+      actFunc(JobID.fromHexString(savepointRequestTrait.jobId), client)
     } catch {
       case e: Exception =>
-        logError(s"${request.getClass.getSimpleName} for flink yarn session 
job fail")
+        logError(s"${savepointRequestTrait.getClass.getSimpleName} for flink 
yarn session job fail")
         e.printStackTrace()
         throw e
     } finally {
@@ -176,15 +176,16 @@ object YarnSessionClient extends YarnClientTrait {
   }
 
   override def doTriggerSavepoint(
-      request: TriggerSavepointRequest,
+      savepointRequest: TriggerSavepointRequest,
       flinkConfig: Configuration): SavepointResponse = {
     executeClientAction(
-      request,
+      savepointRequest,
       flinkConfig,
       (jobID, clusterClient) => {
-        val actionResult = super.triggerSavepoint(request, jobID, 
clusterClient)
+        val actionResult = super.triggerSavepoint(savepointRequest, jobID, 
clusterClient)
         SavepointResponse(actionResult)
-      })
+      }
+    )
   }
 
   def deploy(deployRequest: DeployRequest): DeployResponse = {
@@ -195,8 +196,8 @@ object YarnSessionClient extends YarnClientTrait {
          |    flinkVersion     : ${deployRequest.flinkVersion.version}
          |    execMode         : ${deployRequest.executionMode.name()}
          |    clusterId        : ${deployRequest.clusterId}
-         |    properties       : ${deployRequest.properties.mkString(" ")}
-         
|-------------------------------------------------------------------------------------------
+         |    properties       : ${deployRequest.properties.mkString(",")}
+         
|-------------------------------------------------------------------------------------------------------
          |""".stripMargin)
     var clusterDescriptor: YarnClusterDescriptor = null
     var client: ClusterClient[ApplicationId] = null
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
index 456b260c2..4e58a50f9 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
@@ -40,11 +40,11 @@ import scala.util.Try
 trait YarnClientTrait extends FlinkClientTrait {
 
   private[this] def executeClientAction[R <: SavepointRequestTrait, O](
-      request: R,
+      savepointRequestTrait: R,
       flinkConf: Configuration,
       actionFunc: (JobID, ClusterClient[_]) => O): O = {
 
-    flinkConf.safeSet(YarnConfigOptions.APPLICATION_ID, request.clusterId)
+    flinkConf.safeSet(YarnConfigOptions.APPLICATION_ID, 
savepointRequestTrait.clusterId)
     val clusterClientFactory = new YarnClusterClientFactory
     val applicationId = clusterClientFactory.getClusterId(flinkConf)
     if (applicationId == null) {
@@ -57,22 +57,22 @@ trait YarnClientTrait extends FlinkClientTrait {
       .getClusterClient
       .autoClose(
         client =>
-          Try(actionFunc(getJobID(request.jobId), client)).recover {
+          Try(actionFunc(getJobID(savepointRequestTrait.jobId), 
client)).recover {
             case e =>
               throw new FlinkException(
-                s"[StreamPark] Do ${request.getClass.getSimpleName} for the 
job ${request.jobId} failed. " +
+                s"[StreamPark] Do 
${savepointRequestTrait.getClass.getSimpleName} for the job 
${savepointRequestTrait.jobId} failed. " +
                   s"detail: ${ExceptionUtils.stringifyException(e)}");
           }.get)
   }
 
   override def doTriggerSavepoint(
-      request: TriggerSavepointRequest,
+      savepointRequest: TriggerSavepointRequest,
       flinkConf: Configuration): SavepointResponse = {
     executeClientAction(
-      request,
+      savepointRequest,
       flinkConf,
       (jid, client) => {
-        SavepointResponse(super.triggerSavepoint(request, jid, client))
+        SavepointResponse(super.triggerSavepoint(savepointRequest, jid, 
client))
       })
   }
 

Reply via email to