This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new babb49482 [Improve] Improve streampark-flink-client-core module base on [Naming Style] (#3228)
babb49482 is described below

commit babb4948287774de4b891716e3c7d35eedf6db26
Author: caicancai <[email protected]>
AuthorDate: Tue Oct 10 12:00:50 2023 +0800

    [Improve] Improve streampark-flink-client-core module base on [Naming Style] (#3228)
---
 .../flink/client/impl/KubernetesSessionClientV2.scala   | 17 ++++++++---------
 1 file changed, 8 insertions(+), 9 deletions(-)

diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
index 3cbfbdd2e..c40fba899 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
@@ -176,15 +176,15 @@ object KubernetesSessionClientV2 extends KubernetesClientV2Trait with Logger {
   }
 
   private def genFlinkDeployDef(
-      deployRequest: DeployRequest,
+      deployReq: DeployRequest,
       originFlinkConfig: Configuration): Either[FailureMessage, FlinkDeploymentDef] = {
     val flinkConfObj = originFlinkConfig.clone()
     val flinkConfMap = originFlinkConfig.toMap.asScala.toMap
 
-    val namespace = Option(deployRequest.k8sDeployParam.kubernetesNamespace)
+    val namespace = Option(deployReq.k8sDeployParam.kubernetesNamespace)
       .getOrElse("default")
 
-    val name = Option(deployRequest.k8sDeployParam.clusterId)
+    val name = Option(deployReq.k8sDeployParam.clusterId)
       .filter(str => StringUtils.isNotBlank(str))
       .getOrElse(return Left("Kubernetes CR name should not be empty"))
 
@@ -192,18 +192,17 @@ object KubernetesSessionClientV2 extends KubernetesClientV2Trait with Logger {
       .getOption(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY)
       .map(_.toString)
 
-    val image = Option(deployRequest.k8sDeployParam.flinkImage)
+    val image = Option(deployReq.k8sDeployParam.flinkImage)
       .filter(str => StringUtils.isNotBlank(str))
       .getOrElse(return Left("Flink base image should not be empty"))
 
-    val serviceAccount = Option(deployRequest.k8sDeployParam.serviceAccount)
+    val serviceAccount = Option(deployReq.k8sDeployParam.serviceAccount)
       .getOrElse(FlinkDeploymentDef.DEFAULT_SERVICE_ACCOUNT)
 
-    val flinkVersion = Option(deployRequest.flinkVersion.majorVersion)
+    val flinkVersion = Option(deployReq.flinkVersion.majorVersion)
       .map(majorVer => "V" + majorVer.replace(".", "_"))
       .flatMap(v => FlinkVersion.values().find(_.name() == v))
-      .getOrElse(
-        return Left(s"Unsupported Flink version:${deployRequest.flinkVersion.majorVersion}"))
+      .getOrElse(return Left(s"Unsupported Flink version:${deployReq.flinkVersion.majorVersion}"))
 
     val jobManager = {
       val cpu = flinkConfMap
@@ -255,7 +254,7 @@ object KubernetesSessionClientV2 extends KubernetesClientV2Trait with Logger {
         .removeKey(KUBERNETES_TM_CPU_KEY)
         .removeKey(KUBERNETES_JM_CPU_AMOUNT_KEY)
         .removeKey(KUBERNETES_JM_CPU_KEY)
-      Option(deployRequest.k8sDeployParam.flinkRestExposedType).foreach {
+      Option(deployReq.k8sDeployParam.flinkRestExposedType).foreach {
         exposedType => result += KUBERNETES_REST_SERVICE_EXPORTED_TYPE_KEY -> exposedType.getName
       }
       result.toMap

Reply via email to