This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch k8s_deploy_shutdown
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit 7bca3dcf8b6583e3c294775e4692ef0e33c9316e
Author: benjobs <[email protected]>
AuthorDate: Thu Jan 11 19:37:25 2024 +0800

    [Improve] k8s application|session start|stop improvement
---
 .../impl/KubernetesNativeApplicationClient.scala   | 13 ++----
 .../impl/KubernetesNativeSessionClient.scala       | 38 ++++++----------
 .../flink/client/impl/YarnApplicationClient.scala  |  8 +++-
 .../flink/client/impl/YarnPerJobClient.scala       | 14 +++---
 .../flink/client/impl/YarnSessionClient.scala      | 50 +++-------------------
 .../flink/client/trait/FlinkClientTrait.scala      |  6 +--
 .../client/trait/KubernetesNativeClientTrait.scala | 15 ++++---
 .../flink/client/trait/YarnClientTrait.scala       | 16 ++++---
 .../helper/KubernetesDeploymentHelper.scala        | 25 ++++-------
 9 files changed, 64 insertions(+), 121 deletions(-)

diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
index 0d8f27a42..c608687f2 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
@@ -21,7 +21,6 @@ import org.apache.streampark.common.enums.ExecutionMode
 import org.apache.streampark.common.util.Utils
 import org.apache.streampark.flink.client.`trait`.KubernetesNativeClientTrait
 import org.apache.streampark.flink.client.bean._
-import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper
 import org.apache.streampark.flink.packer.pipeline.DockerImageBuildResponse
 
 import com.google.common.collect.Lists
@@ -89,15 +88,9 @@ object KubernetesNativeApplicationClient extends KubernetesNativeClientTrait {
     }
   }
 
-  override def doCancel(
-      cancelRequest: CancelRequest,
-      flinkConfig: Configuration): CancelResponse = {
-    flinkConfig.safeSet(
-      DeploymentOptions.TARGET,
-      ExecutionMode.KUBERNETES_NATIVE_APPLICATION.getName)
-    val resp = super.doCancel(cancelRequest, flinkConfig)
-    KubernetesDeploymentHelper.delete(cancelRequest.kubernetesNamespace, cancelRequest.clusterId)
-    resp
+  override def doCancel(cancelRequest: CancelRequest, flinkConf: Configuration): CancelResponse = {
+    flinkConf.safeSet(DeploymentOptions.TARGET, ExecutionMode.KUBERNETES_NATIVE_APPLICATION.getName)
+    super.doCancel(cancelRequest, flinkConf)
   }
 
   override def doTriggerSavepoint(
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
index 0cffc30ba..dce9a7865 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
@@ -25,7 +25,6 @@ import org.apache.streampark.flink.client.tool.FlinkSessionSubmitHelper
 import org.apache.streampark.flink.core.FlinkKubernetesClient
 import org.apache.streampark.flink.kubernetes.KubernetesRetriever
 import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
-import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper
 import org.apache.streampark.flink.kubernetes.model.ClusterKey
 
 import io.fabric8.kubernetes.api.model.{Config => _}
@@ -128,11 +127,9 @@ object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Lo
     }
   }
 
-  override def doCancel(
-      cancelRequest: CancelRequest,
-      flinkConfig: Configuration): CancelResponse = {
-    flinkConfig.safeSet(DeploymentOptions.TARGET, ExecutionMode.KUBERNETES_NATIVE_SESSION.getName)
-    super.doCancel(cancelRequest, flinkConfig)
+  override def doCancel(cancelRequest: CancelRequest, flinkConf: Configuration): CancelResponse = {
+    flinkConf.safeSet(DeploymentOptions.TARGET, ExecutionMode.KUBERNETES_NATIVE_SESSION.getName)
+    super.doCancel(cancelRequest, flinkConf)
   }
 
   @throws[Exception]
@@ -206,30 +203,19 @@ object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Lo
          |""".stripMargin)
 
     val flinkConfig = this.getFlinkK8sConfig(shutDownRequest)
-    val kubeClient = FlinkKubeClientFactory.getInstance.fromConfiguration(flinkConfig, "client")
+    val clusterDescriptor = getK8sClusterDescriptor(flinkConfig)
+    val client = clusterDescriptor
+      .retrieve(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID))
+      .getClusterClient
     try {
-      val kubeClientWrapper = new FlinkKubernetesClient(kubeClient)
-      val kubeService = kubeClientWrapper.getService(deployRequest.clusterId)
-      if (kubeService.isPresent) {
-        kubeClient.stopAndCleanupCluster(shutDownRequest.clusterId)
-      } else {
-        val kubernetesClusterDescriptor = getK8sClusterDescriptorAndSpecification(flinkConfig)
-        val clusterDescriptor = kubernetesClusterDescriptor._1
-        val client = clusterDescriptor.retrieve(deployRequest.clusterId).getClusterClient
-        if (client != null) {
-          client.shutDownCluster()
-        }
-      }
-      KubernetesDeploymentHelper.delete(
-        shutDownRequest.kubernetesNamespace,
-        shutDownRequest.clusterId)
+      client.shutDownCluster()
       ShutDownResponse()
     } catch {
       case e: Exception =>
         logError(s"shutdown flink session fail in ${shutDownRequest.executionMode} mode")
         throw e
     } finally {
-      Utils.close(kubeClient)
+      Utils.close(client)
     }
   }
 
@@ -251,8 +237,8 @@ object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Lo
 
   override def doTriggerSavepoint(
       request: TriggerSavepointRequest,
-      flinkConfig: Configuration): SavepointResponse = {
-    flinkConfig.safeSet(DeploymentOptions.TARGET, ExecutionMode.KUBERNETES_NATIVE_SESSION.getName)
-    super.doTriggerSavepoint(request, flinkConfig)
+      flinkConf: Configuration): SavepointResponse = {
+    flinkConf.safeSet(DeploymentOptions.TARGET, ExecutionMode.KUBERNETES_NATIVE_SESSION.getName)
+    super.doTriggerSavepoint(request, flinkConf)
   }
 }
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index 1f4fb0ed3..8601c70a3 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -30,7 +30,7 @@ import org.apache.flink.client.program.ClusterClient
 import org.apache.flink.configuration._
 import org.apache.flink.runtime.security.{SecurityConfiguration, SecurityUtils}
 import org.apache.flink.runtime.util.HadoopUtils
-import org.apache.flink.yarn.configuration.YarnConfigOptions
+import org.apache.flink.yarn.configuration.{YarnConfigOptions, YarnDeploymentTarget}
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api.records.ApplicationId
 
@@ -135,4 +135,10 @@ object YarnApplicationClient extends YarnClientTrait {
     })
   }
 
+  override def doCancel(cancelRequest: CancelRequest, flinkConf: Configuration): CancelResponse = {
+    flinkConf
+      .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.APPLICATION.getName)
+    super.doCancel(cancelRequest, flinkConf)
+  }
+
 }
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
index 244737d08..6ccf7001f 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
@@ -25,7 +25,7 @@ import org.apache.streampark.flink.util.FlinkUtils
 import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader
 import org.apache.flink.client.program.{ClusterClient, PackagedProgram}
 import org.apache.flink.configuration.{Configuration, DeploymentOptions}
-import org.apache.flink.yarn.{YarnClusterClientFactory, YarnClusterDescriptor}
+import org.apache.flink.yarn.YarnClusterDescriptor
 import org.apache.flink.yarn.configuration.YarnDeploymentTarget
 import org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint
 import org.apache.hadoop.fs.{Path => HadoopPath}
@@ -120,14 +120,10 @@ object YarnPerJobClient extends YarnClientTrait {
     }
   }
 
-  override def doCancel(
-      cancelRequest: CancelRequest,
-      flinkConfig: Configuration): CancelResponse = {
-    val response = super.doCancel(cancelRequest, flinkConfig)
-    val clusterClientFactory = new YarnClusterClientFactory
-    val clusterDescriptor = clusterClientFactory.createClusterDescriptor(flinkConfig)
-    clusterDescriptor.killCluster(ApplicationId.fromString(cancelRequest.clusterId))
-    response
+  override def doCancel(cancelRequest: CancelRequest, flinkConf: Configuration): CancelResponse = {
+    flinkConf
+      .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.PER_JOB.getName)
+    super.doCancel(cancelRequest, flinkConf)
   }
 
 }
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index 91eb1d92d..0032e8eca 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -135,58 +135,20 @@ object YarnSessionClient extends YarnClientTrait {
     }
   }
 
-  private[this] def executeClientAction[O, R <: SavepointRequestTrait](
-      request: R,
-      flinkConfig: Configuration,
-      actFunc: (JobID, ClusterClient[_]) => O): O = {
-    flinkConfig
-      .safeSet(YarnConfigOptions.APPLICATION_ID, request.clusterId)
-      .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
-    logInfo(s"""
-               |------------------------------------------------------------------
-               |Effective submit configuration: $flinkConfig
-               |------------------------------------------------------------------
-               |""".stripMargin)
-
-    var clusterDescriptor: YarnClusterDescriptor = null
-    var client: ClusterClient[ApplicationId] = null
-    try {
-      val yarnClusterDescriptor = getYarnSessionClusterDescriptor(flinkConfig)
-      clusterDescriptor = yarnClusterDescriptor._2
-      client = clusterDescriptor.retrieve(yarnClusterDescriptor._1).getClusterClient
-      actFunc(JobID.fromHexString(request.jobId), client)
-    } catch {
-      case e: Exception =>
-        logError(s"${request.getClass.getSimpleName} for flink yarn session job fail")
-        e.printStackTrace()
-        throw e
-    } finally {
-      Utils.close(client, clusterDescriptor)
-    }
-  }
-
   override def doCancel(
       cancelRequest: CancelRequest,
       flinkConfig: Configuration): CancelResponse = {
-    executeClientAction(
-      cancelRequest,
-      flinkConfig,
-      (jobID, clusterClient) => {
-        val actionResult = super.cancelJob(cancelRequest, jobID, clusterClient)
-        CancelResponse(actionResult)
-      })
+    flinkConfig
+      .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
+    super.doCancel(cancelRequest, flinkConfig)
   }
 
   override def doTriggerSavepoint(
       request: TriggerSavepointRequest,
       flinkConfig: Configuration): SavepointResponse = {
-    executeClientAction(
-      request,
-      flinkConfig,
-      (jobID, clusterClient) => {
-        val actionResult = super.triggerSavepoint(request, jobID, clusterClient)
-        SavepointResponse(actionResult)
-      })
+    flinkConfig
+      .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
+    super.doTriggerSavepoint(request, flinkConfig)
   }
 
   def deploy(deployRequest: DeployRequest): DeployResponse = {
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 0ef862883..5f536a3e9 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -470,10 +470,10 @@ trait FlinkClientTrait extends Logger {
     val savePointDir: String = tryGetSavepointPathIfNeed(cancelRequest)
 
     val clientWrapper = new FlinkClusterClient(client)
+    val withSavepoint = Try(cancelRequest.withSavepoint).getOrElse(false)
+    val withDrain = Try(cancelRequest.withDrain).getOrElse(false)
 
-    (
-      Try(cancelRequest.withSavepoint).getOrElse(false),
-      Try(cancelRequest.withDrain).getOrElse(false)) match {
+    (withSavepoint, withDrain) match {
       case (false, false) =>
         client.cancel(jobID).get()
         null
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
index 2dd756b8f..bf3b1767b 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
@@ -85,10 +85,14 @@ trait KubernetesNativeClientTrait extends FlinkClientTrait {
     executeClientAction(
       cancelRequest,
       flinkConfig,
-      (jobId, clusterClient) => {
-        val actionResult = super.cancelJob(cancelRequest, jobId, clusterClient)
-        CancelResponse(actionResult)
-      })
+      (jobId, client) => {
+        val resp = super.cancelJob(cancelRequest, jobId, client)
+        if (cancelRequest.executionMode == ExecutionMode.KUBERNETES_NATIVE_APPLICATION) {
+          client.shutDownCluster()
+        }
+        CancelResponse(resp)
+      }
+    )
   }
 
   private[this] def executeClientAction[O, R <: SavepointRequestTrait](
@@ -157,8 +161,7 @@ trait KubernetesNativeClientTrait extends FlinkClientTrait {
 
   def getK8sClusterDescriptor(flinkConfig: Configuration): KubernetesClusterDescriptor = {
     val clientFactory = new KubernetesClusterClientFactory()
-    val clusterDescriptor = clientFactory.createClusterDescriptor(flinkConfig)
-    clusterDescriptor
+    clientFactory.createClusterDescriptor(flinkConfig)
   }
 
   protected def flinkConfIdentifierInfo(@Nonnull conf: Configuration): String =
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
index 32991b109..445f45e7f 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.flink.client.`trait`
 
+import org.apache.streampark.common.enums.ExecutionMode
 import org.apache.streampark.common.util.Utils
 import org.apache.streampark.flink.client.bean._
 
@@ -77,8 +78,8 @@ trait YarnClientTrait extends FlinkClientTrait {
     executeClientAction(
       request,
       flinkConf,
-      (jid, client) => {
-        SavepointResponse(super.triggerSavepoint(request, jid, client))
+      (jobID, client) => {
+        SavepointResponse(super.triggerSavepoint(request, jobID, client))
       })
   }
 
@@ -86,9 +87,14 @@ trait YarnClientTrait extends FlinkClientTrait {
     executeClientAction(
       cancelRequest,
       flinkConf,
-      (jid, client) => {
-        CancelResponse(super.cancelJob(cancelRequest, jid, client))
-      })
+      (jobId, client) => {
+        val resp = super.cancelJob(cancelRequest, jobId, client)
+        if (cancelRequest.executionMode == ExecutionMode.YARN_PER_JOB) {
+          client.shutDownCluster()
+        }
+        CancelResponse(resp)
+      }
+    )
   }
 
   private lazy val deployInternalMethod: Method = {
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
index 590cd806d..e28922002 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
@@ -68,30 +68,21 @@ object KubernetesDeploymentHelper extends Logger {
     }.getOrElse(true)
   }
 
-  private[this] def deleteDeployment(nameSpace: String, deploymentName: String): Boolean = {
+  private[this] def deleteDeployment(nameSpace: String, deploymentName: String): Unit = {
     using(KubernetesRetriever.newK8sClient()) {
       client =>
-        Try {
-          val r = client.apps.deployments
-            .inNamespace(nameSpace)
-            .withName(deploymentName)
-            .delete
-          Boolean.unbox(r)
-        }.getOrElse(false)
+        val map = client.apps.deployments.inNamespace(nameSpace)
+        map.withLabel("app", deploymentName).delete
+        map.withName(deploymentName).delete
     }
   }
 
-  private[this] def deleteConfigMap(nameSpace: String, deploymentName: String): Boolean = {
+  private[this] def deleteConfigMap(nameSpace: String, deploymentName: String): Unit = {
     using(KubernetesRetriever.newK8sClient()) {
       client =>
-        Try {
-          val r = client
-            .configMaps()
-            .inNamespace(nameSpace)
-            .withLabel("app", deploymentName)
-            .delete
-          Boolean.unbox(r)
-        }.getOrElse(false)
+        val map = client.configMaps().inNamespace(nameSpace)
+        map.withLabel("app", deploymentName).delete
+        map.withName(deploymentName).delete
     }
   }
 

Reply via email to