This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new b7eacf8ee [Improve] Improve streampark-flink-client-core module base on Naming Style and add log (#3231)
b7eacf8ee is described below

commit b7eacf8ee9332e18d9558599d411aeb3c34130e0
Author: caicancai <[email protected]>
AuthorDate: Tue Oct 10 20:05:58 2023 +0800

    [Improve] Improve streampark-flink-client-core module base on Naming Style and add log (#3231)
---
 .../flink/client/FlinkClientEndpoint.scala         | 56 ++++++++++++----------
 .../client/impl/KubernetesSessionClientV2.scala    | 16 ++++++-
 .../flink/client/trait/FlinkClientTrait.scala      |  2 +-
 .../client/trait/KubernetesClientV2Trait.scala     | 12 ++---
 .../client/trait/KubernetesNativeClientTrait.scala |  9 ++--
 5 files changed, 58 insertions(+), 37 deletions(-)
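
For context, a minimal, self-contained sketch (hypothetical simplified types, not the actual StreamPark API) of the naming-style pattern this commit applies in FlinkClientEndpoint: descriptive parameter names such as submitRequest instead of the generic request, with dispatch on execution mode and an explicit failure for unsupported modes.

object NamingStyleSketch {

  sealed trait ExecutionMode
  case object YarnSession extends ExecutionMode
  case object KubernetesSession extends ExecutionMode

  final case class SubmitRequest(executionMode: ExecutionMode, jobName: String)
  final case class SubmitResponse(clusterId: String)

  trait Client {
    def submit(submitRequest: SubmitRequest): SubmitResponse
  }

  // Only the YARN session mode is registered here; anything else falls through
  // to the UnsupportedOperationException branch below.
  private val clients: Map[ExecutionMode, Client] = Map(
    YarnSession -> new Client {
      override def submit(submitRequest: SubmitRequest): SubmitResponse =
        SubmitResponse(s"yarn-cluster-for-${submitRequest.jobName}")
    }
  )

  // Same shape as the renamed method: the parameter name says what kind of request it is.
  def submit(submitRequest: SubmitRequest): SubmitResponse =
    clients.get(submitRequest.executionMode) match {
      case Some(client) => client.submit(submitRequest)
      case None =>
        throw new UnsupportedOperationException(
          s"Unsupported ${submitRequest.executionMode} submit")
    }

  def main(args: Array[String]): Unit =
    println(submit(SubmitRequest(YarnSession, "wordcount")))
}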

diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientEndpoint.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientEndpoint.scala
index 752bdf045..609d5cf66 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientEndpoint.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientEndpoint.scala
@@ -42,59 +42,65 @@ object FlinkClientEndpoint {
     }
   )
 
-  def submit(request: SubmitRequest): SubmitResponse = {
-    clients.get(request.executionMode) match {
-      case Some(client) => client.submit(request)
+  def submit(submitRequest: SubmitRequest): SubmitResponse = {
+    clients.get(submitRequest.executionMode) match {
+      case Some(client) => client.submit(submitRequest)
       case _ =>
-        throw new UnsupportedOperationException(s"Unsupported ${request.executionMode} submit ")
+        throw new UnsupportedOperationException(
+          s"Unsupported ${submitRequest.executionMode} submit ")
     }
   }
 
-  def cancel(request: CancelRequest): CancelResponse = {
-    clients.get(request.executionMode) match {
-      case Some(client) => client.cancel(request)
+  def cancel(cancelRequest: CancelRequest): CancelResponse = {
+    clients.get(cancelRequest.executionMode) match {
+      case Some(client) => client.cancel(cancelRequest)
       case _ =>
-        throw new UnsupportedOperationException(s"Unsupported ${request.executionMode} cancel ")
+        throw new UnsupportedOperationException(
+          s"Unsupported ${cancelRequest.executionMode} cancel ")
     }
   }
 
-  def triggerSavepoint(request: TriggerSavepointRequest): SavepointResponse = {
-    clients.get(request.executionMode) match {
-      case Some(client) => client.triggerSavepoint(request)
+  def triggerSavepoint(savepointRequest: TriggerSavepointRequest): SavepointResponse = {
+    clients.get(savepointRequest.executionMode) match {
+      case Some(client) => client.triggerSavepoint(savepointRequest)
       case _ =>
         throw new UnsupportedOperationException(
-          s"Unsupported ${request.executionMode} triggerSavepoint ")
+          s"Unsupported ${savepointRequest.executionMode} triggerSavepoint ")
     }
   }
 
-  def deploy(request: DeployRequest): DeployResponse = {
-    request.executionMode match {
-      case YARN_SESSION => YarnSessionClient.deploy(request)
-      case KUBERNETES_NATIVE_SESSION => KubernetesNativeSessionClient.deploy(request)
+  def deploy(deployRequest: DeployRequest): DeployResponse = {
+    deployRequest.executionMode match {
+      case YARN_SESSION => YarnSessionClient.deploy(deployRequest)
+      case KUBERNETES_NATIVE_SESSION =>
+        K8sFlinkConfig.isV2Enabled match {
+          case true => KubernetesSessionClientV2.deploy(deployRequest)
+          case _ => KubernetesNativeSessionClient.deploy(deployRequest)
+        }
       case _ =>
         throw new UnsupportedOperationException(
-          s"Unsupported ${request.executionMode} deploy cluster ")
+          s"Unsupported ${deployRequest.executionMode} deploy cluster ")
     }
   }
 
-  def shutdown(request: ShutDownRequest): ShutDownResponse = {
-    request.executionMode match {
-      case YARN_SESSION => YarnSessionClient.shutdown(request)
+  def shutdown(shutDownRequest: ShutDownRequest): ShutDownResponse = {
+    shutDownRequest.executionMode match {
+      case YARN_SESSION => YarnSessionClient.shutdown(shutDownRequest)
       case KUBERNETES_NATIVE_SESSION =>
         K8sFlinkConfig.isV2Enabled match {
-          case true => KubernetesSessionClientV2.shutdown(request)
-          case _ => KubernetesNativeSessionClient.shutdown(request)
+          case true => KubernetesSessionClientV2.shutdown(shutDownRequest)
+          case _ => KubernetesNativeSessionClient.shutdown(shutDownRequest)
         }
       case KUBERNETES_NATIVE_APPLICATION =>
         K8sFlinkConfig.isV2Enabled match {
-          case true => KubernetesApplicationClientV2.shutdown(request)
+          case true => KubernetesApplicationClientV2.shutdown(shutDownRequest)
           case _ =>
             throw new UnsupportedOperationException(
-              s"Unsupported ${request.executionMode} shutdown application ")
+              s"Unsupported ${shutDownRequest.executionMode} shutdown 
application ")
         }
       case _ =>
         throw new UnsupportedOperationException(
-          s"Unsupported ${request.executionMode} shutdown cluster ")
+          s"Unsupported ${shutDownRequest.executionMode} shutdown cluster ")
     }
   }
 
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
index c40fba899..de4dbe6cf 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobgraph.SavepointConfigOptions
 import org.apache.flink.v1beta1.FlinkDeploymentSpec.FlinkVersion
 import zio.ZIO
 
+import scala.collection.convert.ImplicitConversions.`map AsScala`
 import scala.collection.mutable
 import scala.jdk.CollectionConverters.mapAsScalaMapConverter
 import scala.util.{Failure, Success, Try}
@@ -125,7 +126,20 @@ object KubernetesSessionClientV2 extends KubernetesClientV2Trait with Logger {
 
   @throws[Throwable]
   def deploy(deployRequest: DeployRequest): DeployResponse = {
-
+    logInfo(
+      s"""
+         |--------------------------------------- kubernetes session start ---------------------------------------
+         |    userFlinkHome    : ${deployRequest.flinkVersion.flinkHome}
+         |    flinkVersion     : ${deployRequest.flinkVersion.version}
+         |    execMode         : ${deployRequest.executionMode.name()}
+         |    clusterId        : ${deployRequest.clusterId}
+         |    namespace        : ${deployRequest.k8sDeployParam.kubernetesNamespace}
+         |    exposedType      : ${deployRequest.k8sDeployParam.flinkRestExposedType}
+         |    serviceAccount   : ${deployRequest.k8sDeployParam.serviceAccount}
+         |    flinkImage       : ${deployRequest.k8sDeployParam.flinkImage}
+         |    properties       : ${deployRequest.properties.mkString(" ")}
+         |-------------------------------------------------------------------------------------------
+         |""".stripMargin)
     val richMsg: String => String = s"[flink-submit][appId=${deployRequest.id}] " + _
 
     val flinkConfig =
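
As a side note on the log banner added above: it relies on Scala's stripMargin, which removes everything up to and including the leading | on each line of the interpolated string. A tiny standalone sketch (println standing in for StreamPark's logInfo, values hypothetical):

object BannerSketch {
  def main(args: Array[String]): Unit = {
    val flinkHome = "/opt/flink-1.17.1" // hypothetical value
    val clusterId = "session-demo"      // hypothetical value
    // stripMargin strips the leading whitespace and '|' markers, so the banner
    // is readable both in the source code and in the emitted log output.
    println(
      s"""
         |------------- kubernetes session start -------------
         |    userFlinkHome : $flinkHome
         |    clusterId     : $clusterId
         |-----------------------------------------------------
         |""".stripMargin)
  }
}
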
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 092488c47..7fb6aa148 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -209,7 +209,7 @@ trait FlinkClientTrait extends Logger {
 
   @throws[Exception]
   def doTriggerSavepoint(
-      request: TriggerSavepointRequest,
+      savepointRequest: TriggerSavepointRequest,
       flinkConf: Configuration): SavepointResponse
 
   @throws[Exception]
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesClientV2Trait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesClientV2Trait.scala
index 17949cb37..e853acdd1 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesClientV2Trait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesClientV2Trait.scala
@@ -139,20 +139,20 @@ trait KubernetesClientV2Trait extends FlinkClientTrait {
 
   @throws[Exception]
   override def doTriggerSavepoint(
-      triggerSavepointRequest: TriggerSavepointRequest,
+      savepointRequest: TriggerSavepointRequest,
       flinkConf: Configuration): SavepointResponse = {
 
     val savepointDef = JobSavepointDef(
-      savepointPath = Option(triggerSavepointRequest.savepointPath),
-      formatType = Option(triggerSavepointRequest.nativeFormat)
+      savepointPath = Option(savepointRequest.savepointPath),
+      formatType = Option(savepointRequest.nativeFormat)
         .map(if (_) JobSavepointDef.NATIVE_FORMAT else JobSavepointDef.CANONICAL_FORMAT)
     )
 
     def richMsg: String => String =
-      s"[flink-trigger-savepoint][appId=${triggerSavepointRequest.id}] " + _
+      s"[flink-trigger-savepoint][appId=${savepointRequest.id}] " + _
 
     FlinkK8sOperator
-      .triggerJobSavepoint(triggerSavepointRequest.id, savepointDef)
+      .triggerJobSavepoint(savepointRequest.id, savepointDef)
       .flatMap {
         result =>
           if (result.isFailed) ZIO.fail(TriggerJobSavepointFail(result.failureCause.get))
@@ -165,7 +165,7 @@ trait KubernetesClientV2Trait extends FlinkClientTrait {
       case Failure(err) =>
         logError(
           richMsg(
-            s"Trigger flink job savepoint failed in 
${triggerSavepointRequest.executionMode.getName}_V2 mode!"),
+            s"Trigger flink job savepoint failed in 
${savepointRequest.executionMode.getName}_V2 mode!"),
           err)
         throw err
     }
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
index 712fd4bc8..49958a20c 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
@@ -129,15 +129,16 @@ trait KubernetesNativeClientTrait extends FlinkClientTrait {
 
   @throws[Exception]
   override def doTriggerSavepoint(
-      request: TriggerSavepointRequest,
+      savepointRequest: TriggerSavepointRequest,
       flinkConfig: Configuration): SavepointResponse = {
     executeClientAction(
-      request,
+      savepointRequest,
       flinkConfig,
       (jobId, clusterClient) => {
-        val actionResult = super.triggerSavepoint(request, jobId, clusterClient)
+        val actionResult = super.triggerSavepoint(savepointRequest, jobId, clusterClient)
         SavepointResponse(actionResult)
-      })
+      }
+    )
   }
 
   // noinspection DuplicatedCode
