This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch k8s-shutdown
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/k8s-shutdown by this push:
     new 7724a758b [Improve] k8s session cluster shutdown bug fixed.
7724a758b is described below

commit 7724a758b7c114fa83013b7127ef13c4de3cd510
Author: benjobs <[email protected]>
AuthorDate: Tue Jan 9 23:31:28 2024 +0800

    [Improve] k8s session cluster shutdown bug fixed.
---
 .../core/service/impl/ApplicationServiceImpl.java  | 12 +-----
 .../impl/KubernetesNativeApplicationClient.scala   |  5 ++-
 .../impl/KubernetesNativeSessionClient.scala       | 46 +++++++++++++---------
 3 files changed, 32 insertions(+), 31 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index c58fcd5f2..e69189a14 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1147,7 +1147,6 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     if (isKubernetesApp(application)) {
       KubernetesDeploymentHelper.watchPodTerminatedLog(
           application.getK8sNamespace(), application.getJobName(), 
application.getJobId());
-      KubernetesDeploymentHelper.delete(application.getK8sNamespace(), 
application.getJobName());
     }
     if (startFuture != null) {
       startFuture.cancel(true);
@@ -1355,7 +1354,6 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
               }
               // re-tracking flink job on kubernetes and logging exception
               if (isKubernetesApp(application)) {
-                KubernetesDeploymentHelper.delete(trackId.namespace(), 
trackId.clusterId());
                 k8SFlinkTrackMonitor.unWatching(trackId);
               } else {
                 FlinkRESTAPIWatcher.unWatching(application.getId());
@@ -1382,7 +1380,6 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
           }
 
           if (isKubernetesApp(application)) {
-            KubernetesDeploymentHelper.delete(trackId.namespace(), 
trackId.clusterId());
             k8SFlinkTrackMonitor.unWatching(trackId);
           }
         });
@@ -1560,13 +1557,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
       extraParameter.put(ConfigConst.KEY_FLINK_SQL(null), flinkSql.getSql());
     }
 
-    TrackId trackId;
-    if (isKubernetesApp(application)) {
-      trackId = toTrackId(application);
-      KubernetesDeploymentHelper.delete(trackId.namespace(), 
trackId.clusterId());
-    } else {
-      trackId = null;
-    }
+    TrackId trackId = isKubernetesApp(application) ? toTrackId(application) : 
null;
 
     KubernetesSubmitParam kubernetesSubmitParam =
         new KubernetesSubmitParam(
@@ -1764,7 +1755,6 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     // re-tracking flink job on kubernetes and logging exception
     if (isKubernetesApp(application)) {
       TrackId id = toTrackId(application);
-      KubernetesDeploymentHelper.delete(id.namespace(), id.clusterId());
       k8SFlinkTrackMonitor.doWatching(id);
     } else {
       FlinkRESTAPIWatcher.unWatching(application.getId());
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
index be57adb41..0d8f27a42 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
@@ -21,6 +21,7 @@ import org.apache.streampark.common.enums.ExecutionMode
 import org.apache.streampark.common.util.Utils
 import org.apache.streampark.flink.client.`trait`.KubernetesNativeClientTrait
 import org.apache.streampark.flink.client.bean._
+import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper
 import org.apache.streampark.flink.packer.pipeline.DockerImageBuildResponse
 
 import com.google.common.collect.Lists
@@ -94,7 +95,9 @@ object KubernetesNativeApplicationClient extends 
KubernetesNativeClientTrait {
     flinkConfig.safeSet(
       DeploymentOptions.TARGET,
       ExecutionMode.KUBERNETES_NATIVE_APPLICATION.getName)
-    super.doCancel(cancelRequest, flinkConfig)
+    val resp = super.doCancel(cancelRequest, flinkConfig)
+    KubernetesDeploymentHelper.delete(cancelRequest.kubernetesNamespace, 
cancelRequest.clusterId)
+    resp
   }
 
   override def doTriggerSavepoint(
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
index 3f2e99bde..0aee25bbe 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
@@ -135,11 +135,12 @@ object KubernetesNativeSessionClient extends 
KubernetesNativeClientTrait with Lo
     super.doCancel(cancelRequest, flinkConfig)
   }
 
+  @throws[Exception]
   def deploy(deployReq: DeployRequest): DeployResponse = {
     val deployRequest = deployReq.asInstanceOf[KubernetesDeployRequest]
     logInfo(
       s"""
-         |--------------------------------------- kubernetes session start 
---------------------------------------
+         |--------------------------------------- kubernetes session cluster 
start ---------------------------------------
          |    userFlinkHome    : ${deployRequest.flinkVersion.flinkHome}
          |    flinkVersion     : ${deployRequest.flinkVersion.version}
          |    execMode         : ${deployRequest.executionMode.name()}
@@ -196,24 +197,38 @@ object KubernetesNativeSessionClient extends 
KubernetesNativeClientTrait with Lo
     } catch {
       case e: Exception =>
         logError(s"start flink session fail in ${deployRequest.executionMode} 
mode")
-        e.printStackTrace()
         throw e
     } finally {
       Utils.close(client, clusterDescriptor, kubeClient)
     }
   }
 
+  @throws[Exception]
   def shutdown(deployRequest: DeployRequest): ShutDownResponse = {
     val shutDownRequest = deployRequest.asInstanceOf[KubernetesDeployRequest]
+    logInfo(
+      s"""
+         |--------------------------------------- kubernetes session cluster 
shutdown ---------------------------------------
+         |    userFlinkHome     : ${shutDownRequest.flinkVersion.version}
+         |    namespace         : ${shutDownRequest.kubernetesNamespace}
+         |    clusterId         : ${shutDownRequest.clusterId}
+         |    execMode          : ${shutDownRequest.executionMode.getName}
+         |    flinkImage        : ${shutDownRequest.flinkImage}
+         |    exposedType       : 
${shutDownRequest.flinkRestExposedType.getName}
+         |    kubeConf          : ${shutDownRequest.kubeConf}
+         |    serviceAccount    : ${shutDownRequest.serviceAccount}
+         |    properties        : ${shutDownRequest.properties.mkString(" ")}
+         
|-------------------------------------------------------------------------------------------
+         |""".stripMargin)
     var kubeClient: FlinkKubeClient = null
     try {
       val flinkConfig = 
getFlinkDefaultConfiguration(shutDownRequest.flinkVersion.flinkHome)
       shutDownRequest.properties.foreach(
-        m =>
-          m._2 match {
-            case v if v != null => flinkConfig.setString(m._1, m._2.toString)
-            case _ =>
-          })
+        p => {
+          if (p._2 != null) {
+            flinkConfig.setString(p._1, s"${p._2}")
+          }
+        })
       flinkConfig
         .safeSet(DeploymentOptions.TARGET, 
KubernetesDeploymentTarget.SESSION.getName)
         .safeSet(KubernetesConfigOptions.NAMESPACE, 
shutDownRequest.kubernetesNamespace)
@@ -227,21 +242,14 @@ object KubernetesNativeSessionClient extends 
KubernetesNativeClientTrait with Lo
           KubernetesConfigOptions.KUBE_CONFIG_FILE,
           getDefaultKubernetesConf(shutDownRequest.kubeConf))
       kubeClient = 
FlinkKubeClientFactory.getInstance.fromConfiguration(flinkConfig, "client")
-      val flinkKubeClient = new FlinkKubernetesClient(kubeClient)
-      val k8sService = flinkKubeClient.getService(shutDownRequest.clusterId)
-      if (k8sService.isPresent) {
-        kubeClient.stopAndCleanupCluster(shutDownRequest.clusterId)
-        KubernetesDeploymentHelper.delete(
-          shutDownRequest.kubernetesNamespace,
-          shutDownRequest.clusterId)
-        ShutDownResponse()
-      } else {
-        null
-      }
+      kubeClient.stopAndCleanupCluster(shutDownRequest.clusterId)
+      KubernetesDeploymentHelper.delete(
+        shutDownRequest.kubernetesNamespace,
+        shutDownRequest.clusterId)
+      ShutDownResponse()
     } catch {
       case e: Exception =>
         logError(s"shutdown flink session fail in 
${shutDownRequest.executionMode} mode")
-        e.printStackTrace()
         throw e
     } finally {
       Utils.close(kubeClient)

Reply via email to