This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
     new 31ab87e63 [Improve] k8s session cluster deploy and shutdown improvement
31ab87e63 is described below

commit 31ab87e6334eb7b8f2c77d37618814c22a30a496
Author: benjobs <[email protected]>
AuthorDate: Thu Jan 11 08:46:33 2024 +0800

    [Improve] k8s session cluster deploy and shutdown improvement
---
 .../impl/KubernetesNativeSessionClient.scala       | 89 ++++++++++------------
 1 file changed, 41 insertions(+), 48 deletions(-)

diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
index 0aee25bbe..0cffc30ba 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
@@ -35,7 +35,7 @@ import org.apache.flink.configuration._
 import org.apache.flink.kubernetes.KubernetesClusterDescriptor
 import org.apache.flink.kubernetes.configuration.{KubernetesConfigOptions, 
KubernetesDeploymentTarget}
 import 
org.apache.flink.kubernetes.configuration.KubernetesConfigOptions.ServiceExposedType
-import org.apache.flink.kubernetes.kubeclient.{FlinkKubeClient, 
FlinkKubeClientFactory}
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClientFactory
 
 import java.io.File
 
@@ -152,43 +152,27 @@ object KubernetesNativeSessionClient extends 
KubernetesNativeClientTrait with Lo
          |    properties       : ${deployRequest.properties.mkString(" ")}
          
|-------------------------------------------------------------------------------------------
          |""".stripMargin)
+
+    val flinkConfig = getFlinkK8sConfig(deployRequest)
+    val kubeClient = 
FlinkKubeClientFactory.getInstance.fromConfiguration(flinkConfig, "client")
+
     var clusterDescriptor: KubernetesClusterDescriptor = null
     var client: ClusterClient[String] = null
-    var kubeClient: FlinkKubeClient = null
-    try {
-      val flinkConfig =
-        extractConfiguration(deployRequest.flinkVersion.flinkHome, 
deployRequest.properties)
-      flinkConfig
-        .safeSet(DeploymentOptions.TARGET, 
KubernetesDeploymentTarget.SESSION.getName)
-        .safeSet(KubernetesConfigOptions.NAMESPACE, 
deployRequest.kubernetesNamespace)
-        .safeSet(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT, 
deployRequest.serviceAccount)
-        .safeSet(
-          KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,
-          
ServiceExposedType.valueOf(deployRequest.flinkRestExposedType.getName))
-        .safeSet(KubernetesConfigOptions.CLUSTER_ID, deployRequest.clusterId)
-        .safeSet(KubernetesConfigOptions.CONTAINER_IMAGE, 
deployRequest.flinkImage)
-        .safeSet(
-          KubernetesConfigOptions.KUBE_CONFIG_FILE,
-          getDefaultKubernetesConf(deployRequest.kubeConf))
-        .safeSet(
-          DeploymentOptionsInternal.CONF_DIR,
-          s"${deployRequest.flinkVersion.flinkHome}/conf")
 
+    try {
       val kubernetesClusterDescriptor = 
getK8sClusterDescriptorAndSpecification(flinkConfig)
       clusterDescriptor = kubernetesClusterDescriptor._1
-      kubeClient = 
FlinkKubeClientFactory.getInstance.fromConfiguration(flinkConfig, "client")
+
       val kubeClientWrapper = new FlinkKubernetesClient(kubeClient)
+      val kubeService = kubeClientWrapper.getService(deployRequest.clusterId)
 
-      if (
-        deployRequest.clusterId != null && kubeClientWrapper
-          .getService(deployRequest.clusterId)
-          .isPresent
-      ) {
+      if (kubeService.isPresent) {
         client = 
clusterDescriptor.retrieve(deployRequest.clusterId).getClusterClient
       } else {
         client =
           
clusterDescriptor.deploySessionCluster(kubernetesClusterDescriptor._2).getClusterClient
       }
+
       if (client.getWebInterfaceURL != null) {
         DeployResponse(client.getWebInterfaceURL, client.getClusterId)
       } else {
@@ -220,29 +204,22 @@ object KubernetesNativeSessionClient extends 
KubernetesNativeClientTrait with Lo
          |    properties        : ${shutDownRequest.properties.mkString(" ")}
          
|-------------------------------------------------------------------------------------------
          |""".stripMargin)
-    var kubeClient: FlinkKubeClient = null
+
+    val flinkConfig = this.getFlinkK8sConfig(shutDownRequest)
+    val kubeClient = 
FlinkKubeClientFactory.getInstance.fromConfiguration(flinkConfig, "client")
     try {
-      val flinkConfig = 
getFlinkDefaultConfiguration(shutDownRequest.flinkVersion.flinkHome)
-      shutDownRequest.properties.foreach(
-        p => {
-          if (p._2 != null) {
-            flinkConfig.setString(p._1, s"${p._2}")
-          }
-        })
-      flinkConfig
-        .safeSet(DeploymentOptions.TARGET, 
KubernetesDeploymentTarget.SESSION.getName)
-        .safeSet(KubernetesConfigOptions.NAMESPACE, 
shutDownRequest.kubernetesNamespace)
-        .safeSet(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT, 
shutDownRequest.serviceAccount)
-        .safeSet(
-          KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,
-          
ServiceExposedType.valueOf(shutDownRequest.flinkRestExposedType.getName))
-        .safeSet(KubernetesConfigOptions.CLUSTER_ID, shutDownRequest.clusterId)
-        .safeSet(KubernetesConfigOptions.CONTAINER_IMAGE, 
shutDownRequest.flinkImage)
-        .safeSet(
-          KubernetesConfigOptions.KUBE_CONFIG_FILE,
-          getDefaultKubernetesConf(shutDownRequest.kubeConf))
-      kubeClient = 
FlinkKubeClientFactory.getInstance.fromConfiguration(flinkConfig, "client")
-      kubeClient.stopAndCleanupCluster(shutDownRequest.clusterId)
+      val kubeClientWrapper = new FlinkKubernetesClient(kubeClient)
+      val kubeService = kubeClientWrapper.getService(deployRequest.clusterId)
+      if (kubeService.isPresent) {
+        kubeClient.stopAndCleanupCluster(shutDownRequest.clusterId)
+      } else {
+        val kubernetesClusterDescriptor = 
getK8sClusterDescriptorAndSpecification(flinkConfig)
+        val clusterDescriptor = kubernetesClusterDescriptor._1
+        val client = 
clusterDescriptor.retrieve(deployRequest.clusterId).getClusterClient
+        if (client != null) {
+          client.shutDownCluster()
+        }
+      }
       KubernetesDeploymentHelper.delete(
         shutDownRequest.kubernetesNamespace,
         shutDownRequest.clusterId)
@@ -256,6 +233,22 @@ object KubernetesNativeSessionClient extends 
KubernetesNativeClientTrait with Lo
     }
   }
 
+  private[this] def getFlinkK8sConfig(deployRequest: KubernetesDeployRequest): 
Configuration = {
+    extractConfiguration(deployRequest.flinkVersion.flinkHome, 
deployRequest.properties)
+      .safeSet(DeploymentOptions.TARGET, 
KubernetesDeploymentTarget.SESSION.getName)
+      .safeSet(KubernetesConfigOptions.NAMESPACE, 
deployRequest.kubernetesNamespace)
+      .safeSet(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT, 
deployRequest.serviceAccount)
+      .safeSet(KubernetesConfigOptions.CLUSTER_ID, deployRequest.clusterId)
+      .safeSet(KubernetesConfigOptions.CONTAINER_IMAGE, 
deployRequest.flinkImage)
+      .safeSet(
+        KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,
+        ServiceExposedType.valueOf(deployRequest.flinkRestExposedType.getName))
+      .safeSet(
+        KubernetesConfigOptions.KUBE_CONFIG_FILE,
+        getDefaultKubernetesConf(deployRequest.kubeConf))
+      .safeSet(DeploymentOptionsInternal.CONF_DIR, 
s"${deployRequest.flinkVersion.flinkHome}/conf")
+  }
+
   override def doTriggerSavepoint(
       request: TriggerSavepointRequest,
       flinkConfig: Configuration): SavepointResponse = {

Reply via email to