This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 436b9f320 Naming standardization (#3177)
436b9f320 is described below

commit 436b9f320184dbc85320d6a45b26bda84ddce855
Author: caicancai <[email protected]>
AuthorDate: Sun Sep 24 16:02:31 2023 +0800

    Naming standardization (#3177)
---
 .../impl/KubernetesNativeApplicationClient.scala   |  4 +--
 .../impl/KubernetesNativeSessionClient.scala       |  4 +--
 .../client/trait/KubernetesClientV2Trait.scala     | 32 ++++++++++++----------
 3 files changed, 21 insertions(+), 19 deletions(-)

diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
index ea4905d9c..209a21045 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
@@ -99,9 +99,9 @@ object KubernetesNativeApplicationClient extends 
KubernetesNativeClientTrait {
   }
 
   override def doTriggerSavepoint(
-      request: TriggerSavepointRequest,
+      triggerSavepointRequest: TriggerSavepointRequest,
       flinkConf: Configuration): SavepointResponse = {
     flinkConf.safeSet(DeploymentOptions.TARGET, 
ExecutionMode.KUBERNETES_NATIVE_APPLICATION.getName)
-    super.doTriggerSavepoint(request, flinkConf)
+    super.doTriggerSavepoint(triggerSavepointRequest, flinkConf)
   }
 }
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
index c4c9c6e41..a63e91298 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
@@ -250,9 +250,9 @@ object KubernetesNativeSessionClient extends 
KubernetesNativeClientTrait with Lo
   }
 
   override def doTriggerSavepoint(
-      request: TriggerSavepointRequest,
+      triggerSavepointRequest: TriggerSavepointRequest,
       flinkConfig: Configuration): SavepointResponse = {
     flinkConfig.safeSet(DeploymentOptions.TARGET, 
ExecutionMode.KUBERNETES_NATIVE_SESSION.getName)
-    super.doTriggerSavepoint(request, flinkConfig)
+    super.doTriggerSavepoint(triggerSavepointRequest, flinkConfig)
   }
 }
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesClientV2Trait.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesClientV2Trait.scala
index 75b979246..eebcaec2e 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesClientV2Trait.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesClientV2Trait.scala
@@ -99,23 +99,23 @@ trait KubernetesClientV2Trait extends FlinkClientTrait {
   }
 
   @throws[Exception]
-  override def doCancel(request: CancelRequest, flinkConf: Configuration): 
CancelResponse = {
+  override def doCancel(cancelRequest: CancelRequest, flinkConf: 
Configuration): CancelResponse = {
     val effect =
-      if (!request.withSavepoint) {
+      if (!cancelRequest.withSavepoint) {
         // cancel job
         FlinkK8sOperator
-          .cancelJob(request.id)
+          .cancelJob(cancelRequest.id)
           .as(CancelResponse(null))
       } else {
         // stop job with savepoint
         val savepointDef = JobSavepointDef(
-          drain = Option(request.withDrain).getOrElse(false),
-          savepointPath = Option(request.savepointPath),
-          formatType = Option(request.nativeFormat)
+          drain = Option(cancelRequest.withDrain).getOrElse(false),
+          savepointPath = Option(cancelRequest.savepointPath),
+          formatType = Option(cancelRequest.nativeFormat)
             .map(if (_) JobSavepointDef.NATIVE_FORMAT else 
JobSavepointDef.CANONICAL_FORMAT)
         )
         FlinkK8sOperator
-          .stopJob(request.id, savepointDef)
+          .stopJob(cancelRequest.id, savepointDef)
           .flatMap {
             result =>
               if (result.isFailed) 
ZIO.fail(StopJobFail(result.failureCause.get))
@@ -123,7 +123,7 @@ trait KubernetesClientV2Trait extends FlinkClientTrait {
           }
       }
 
-    def richMsg: String => String = s"[flink-cancel][appId=${request.id}] " + _
+    def richMsg: String => String = 
s"[flink-cancel][appId=${cancelRequest.id}] " + _
 
     effect.runIOAsTry match {
       case Success(rsp) =>
@@ -131,7 +131,7 @@ trait KubernetesClientV2Trait extends FlinkClientTrait {
         rsp
       case Failure(err) =>
         logError(
-          richMsg(s"Cancel flink job fail in 
${request.executionMode.getName}_V2 mode!"),
+          richMsg(s"Cancel flink job fail in 
${cancelRequest.executionMode.getName}_V2 mode!"),
           err)
         throw err
     }
@@ -139,19 +139,20 @@ trait KubernetesClientV2Trait extends FlinkClientTrait {
 
   @throws[Exception]
   override def doTriggerSavepoint(
-      request: TriggerSavepointRequest,
+      triggerSavepointRequest: TriggerSavepointRequest,
       flinkConf: Configuration): SavepointResponse = {
 
     val savepointDef = JobSavepointDef(
-      savepointPath = Option(request.savepointPath),
-      formatType = Option(request.nativeFormat)
+      savepointPath = Option(triggerSavepointRequest.savepointPath),
+      formatType = Option(triggerSavepointRequest.nativeFormat)
         .map(if (_) JobSavepointDef.NATIVE_FORMAT else 
JobSavepointDef.CANONICAL_FORMAT)
     )
 
-    def richMsg: String => String = 
s"[flink-trigger-savepoint][appId=${request.id}] " + _
+    def richMsg: String => String =
+      s"[flink-trigger-savepoint][appId=${triggerSavepointRequest.id}] " + _
 
     FlinkK8sOperator
-      .triggerJobSavepoint(request.id, savepointDef)
+      .triggerJobSavepoint(triggerSavepointRequest.id, savepointDef)
       .flatMap {
         result =>
           if (result.isFailed) 
ZIO.fail(TriggerJobSavepointFail(result.failureCause.get))
@@ -163,7 +164,8 @@ trait KubernetesClientV2Trait extends FlinkClientTrait {
         rsp
       case Failure(err) =>
         logError(
-          richMsg(s"Cancel flink job fail in 
${request.executionMode.getName}_V2 mode!"),
+          richMsg(
+            s"Cancel flink job fail in 
${triggerSavepointRequest.executionMode.getName}_V2 mode!"),
           err)
         throw err
     }

Reply via email to