This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
     new 2006d9e6b [Improve] flink k8s session cluster shutdown bug fixed 
(#3476)
2006d9e6b is described below

commit 2006d9e6b98a1b14367af60999a5efe01e11956a
Author: benjobs <[email protected]>
AuthorDate: Tue Jan 9 23:37:45 2024 +0800

    [Improve] flink k8s session cluster shutdown bug fixed (#3476)
    
    * [Imprpve] k8s cluster shutdown bug fixed.
    
    * [hotfix][ci] Exclude twitter website from dead link check
    
    ---------
    
    Co-authored-by: benjobs <[email protected]>
---
 .dlc.json                                          |   3 +
 .../core/controller/FlinkClusterController.java    |  12 +--
 .../console/core/service/FlinkClusterService.java  |   6 +-
 .../core/service/impl/ApplicationServiceImpl.java  |  12 +--
 .../core/service/impl/FlinkClusterServiceImpl.java | 100 ++++++++-------------
 .../streampark/flink/client/FlinkClient.scala      |   6 +-
 .../flink/client/bean/DeployRequest.scala          |  47 +++++++---
 .../flink/client/bean/ShutDownRequest.scala        |  32 -------
 .../flink/client/FlinkClientHandler.scala          |   2 +-
 .../impl/KubernetesNativeApplicationClient.scala   |   5 +-
 .../impl/KubernetesNativeSessionClient.scala       |  93 ++++++++++---------
 .../flink/client/impl/YarnSessionClient.scala      |   2 +-
 12 files changed, 141 insertions(+), 179 deletions(-)

diff --git a/.dlc.json b/.dlc.json
index e80f2c3f0..99c96f25f 100644
--- a/.dlc.json
+++ b/.dlc.json
@@ -17,6 +17,9 @@
     },
     {
       "pattern": "^https://opencollective.com";
+    },
+    {
+      "pattern": "^https://twitter.com*";
     }
   ],
   "timeout": "30s",
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
index 148cd554b..2b01e2a54 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
@@ -91,22 +91,22 @@ public class FlinkClusterController {
 
   @Operation(summary = "Start flink cluster")
   @PostMapping("start")
-  public RestResponse start(FlinkCluster cluster) {
-    flinkClusterService.start(cluster);
+  public RestResponse start(Long id) {
+    flinkClusterService.start(id);
     return RestResponse.success();
   }
 
   @Operation(summary = "Shutdown flink cluster")
   @PostMapping("shutdown")
-  public RestResponse shutdown(FlinkCluster cluster) {
-    flinkClusterService.shutdown(cluster);
+  public RestResponse shutdown(Long id) {
+    flinkClusterService.shutdown(id);
     return RestResponse.success();
   }
 
   @Operation(summary = "Delete flink cluster")
   @PostMapping("delete")
-  public RestResponse delete(FlinkCluster cluster) {
-    flinkClusterService.delete(cluster);
+  public RestResponse delete(Long id) {
+    flinkClusterService.delete(id);
     return RestResponse.success();
   }
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
index d149152af..e9d0397ef 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
@@ -32,13 +32,13 @@ public interface FlinkClusterService extends 
IService<FlinkCluster> {
 
   Boolean create(FlinkCluster flinkCluster);
 
-  void delete(FlinkCluster flinkCluster);
+  void delete(Long id);
 
   void update(FlinkCluster flinkCluster);
 
-  void start(FlinkCluster flinkCluster);
+  void start(Long id);
 
-  void shutdown(FlinkCluster flinkCluster);
+  void shutdown(Long id);
 
   Boolean existsByClusterId(String clusterId, Long id);
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 605c7fe0e..ed55c27cc 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1147,7 +1147,6 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     if (isKubernetesApp(application)) {
       KubernetesDeploymentHelper.watchPodTerminatedLog(
           application.getK8sNamespace(), application.getJobName(), 
application.getJobId());
-      KubernetesDeploymentHelper.delete(application.getK8sNamespace(), 
application.getJobName());
     }
     if (startFuture != null) {
       startFuture.cancel(true);
@@ -1355,7 +1354,6 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
               }
               // re-tracking flink job on kubernetes and logging exception
               if (isKubernetesApp(application)) {
-                KubernetesDeploymentHelper.delete(trackId.namespace(), 
trackId.clusterId());
                 k8SFlinkTrackMonitor.unWatching(trackId);
               } else {
                 FlinkRESTAPIWatcher.unWatching(application.getId());
@@ -1382,7 +1380,6 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
           }
 
           if (isKubernetesApp(application)) {
-            KubernetesDeploymentHelper.delete(trackId.namespace(), 
trackId.clusterId());
             k8SFlinkTrackMonitor.unWatching(trackId);
           }
         });
@@ -1560,13 +1557,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
       extraParameter.put(ConfigConst.KEY_FLINK_SQL(null), flinkSql.getSql());
     }
 
-    TrackId trackId;
-    if (isKubernetesApp(application)) {
-      trackId = toTrackId(application);
-      KubernetesDeploymentHelper.delete(trackId.namespace(), 
trackId.clusterId());
-    } else {
-      trackId = null;
-    }
+    TrackId trackId = isKubernetesApp(application) ? toTrackId(application) : 
null;
 
     KubernetesSubmitParam kubernetesSubmitParam =
         new KubernetesSubmitParam(
@@ -1764,7 +1755,6 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     // re-tracking flink job on kubernetes and logging exception
     if (isKubernetesApp(application)) {
       TrackId id = toTrackId(application);
-      KubernetesDeploymentHelper.delete(id.namespace(), id.clusterId());
       k8SFlinkTrackMonitor.doWatching(id);
     } else {
       FlinkRESTAPIWatcher.unWatching(application.getId());
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index fde4c00e6..ef1f4a283 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -35,8 +35,7 @@ import 
org.apache.streampark.console.core.service.YarnQueueService;
 import org.apache.streampark.flink.client.FlinkClient;
 import org.apache.streampark.flink.client.bean.DeployRequest;
 import org.apache.streampark.flink.client.bean.DeployResponse;
-import org.apache.streampark.flink.client.bean.KubernetesDeployParam;
-import org.apache.streampark.flink.client.bean.ShutDownRequest;
+import org.apache.streampark.flink.client.bean.KubernetesDeployRequest;
 import org.apache.streampark.flink.client.bean.ShutDownResponse;
 
 import org.apache.commons.lang3.StringUtils;
@@ -149,37 +148,12 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
 
   @Override
   @Transactional(rollbackFor = {Exception.class})
-  public void start(FlinkCluster cluster) {
-    FlinkCluster flinkCluster = getById(cluster.getId());
+  public void start(Long id) {
+    FlinkCluster flinkCluster = getById(id);
     try {
       ExecutionMode executionModeEnum = flinkCluster.getExecutionModeEnum();
-      KubernetesDeployParam kubernetesDeployParam = null;
-      switch (executionModeEnum) {
-        case YARN_SESSION:
-          break;
-        case KUBERNETES_NATIVE_SESSION:
-          kubernetesDeployParam =
-              new KubernetesDeployParam(
-                  flinkCluster.getClusterId(),
-                  flinkCluster.getK8sNamespace(),
-                  flinkCluster.getK8sConf(),
-                  flinkCluster.getServiceAccount(),
-                  flinkCluster.getFlinkImage(),
-                  flinkCluster.getK8sRestExposedTypeEnum());
-          break;
-        default:
-          throw new ApiAlertException(
-              "the ExecutionModeEnum " + executionModeEnum.getName() + "can't 
start!");
-      }
-      FlinkEnv flinkEnv = flinkEnvService.getById(flinkCluster.getVersionId());
-      DeployRequest deployRequest =
-          new DeployRequest(
-              flinkEnv.getFlinkVersion(),
-              executionModeEnum,
-              flinkCluster.getProperties(),
-              flinkCluster.getClusterId(),
-              kubernetesDeployParam);
-      log.info("deploy cluster request " + deployRequest);
+      DeployRequest deployRequest = getDeployRequest(flinkCluster);
+      log.info("deploy cluster request: " + deployRequest);
       Future<DeployResponse> future =
           executorService.submit(() -> FlinkClient.deploy(deployRequest));
       DeployResponse deployResponse = future.get(60, TimeUnit.SECONDS);
@@ -208,6 +182,33 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
     }
   }
 
+  private DeployRequest getDeployRequest(FlinkCluster flinkCluster) {
+    ExecutionMode executionModeEnum = flinkCluster.getExecutionModeEnum();
+    FlinkEnv flinkEnv = flinkEnvService.getById(flinkCluster.getVersionId());
+    switch (executionModeEnum) {
+      case YARN_SESSION:
+        return DeployRequest.apply(
+            flinkEnv.getFlinkVersion(),
+            executionModeEnum,
+            flinkCluster.getProperties(),
+            flinkCluster.getClusterId());
+      case KUBERNETES_NATIVE_SESSION:
+        return KubernetesDeployRequest.apply(
+            flinkEnv.getFlinkVersion(),
+            executionModeEnum,
+            flinkCluster.getProperties(),
+            flinkCluster.getClusterId(),
+            flinkCluster.getK8sNamespace(),
+            flinkCluster.getK8sConf(),
+            flinkCluster.getServiceAccount(),
+            flinkCluster.getFlinkImage(),
+            flinkCluster.getK8sRestExposedTypeEnum());
+      default:
+        throw new ApiAlertException(
+            "the ExecutionModeEnum " + executionModeEnum.getName() + "can't 
start!");
+    }
+  }
+
   @Override
   public void update(FlinkCluster cluster) {
     FlinkCluster flinkCluster = getById(cluster.getId());
@@ -242,29 +243,11 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
   }
 
   @Override
-  public void shutdown(FlinkCluster cluster) {
-    FlinkCluster flinkCluster = this.getById(cluster.getId());
+  public void shutdown(Long id) {
+    FlinkCluster flinkCluster = this.getById(id);
     // 1) check mode
     ExecutionMode executionModeEnum = flinkCluster.getExecutionModeEnum();
     String clusterId = flinkCluster.getClusterId();
-    KubernetesDeployParam kubernetesDeployParam = null;
-    switch (executionModeEnum) {
-      case YARN_SESSION:
-        break;
-      case KUBERNETES_NATIVE_SESSION:
-        kubernetesDeployParam =
-            new KubernetesDeployParam(
-                flinkCluster.getClusterId(),
-                flinkCluster.getK8sNamespace(),
-                flinkCluster.getK8sConf(),
-                flinkCluster.getServiceAccount(),
-                flinkCluster.getFlinkImage(),
-                flinkCluster.getK8sRestExposedTypeEnum());
-        break;
-      default:
-        throw new ApiAlertException(
-            "the ExecutionModeEnum " + executionModeEnum.getName() + "can't 
shutdown!");
-    }
     if (StringUtils.isBlank(clusterId)) {
       throw new ApiAlertException("the clusterId can not be empty!");
     }
@@ -291,18 +274,10 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
     }
 
     // 4) shutdown
-    FlinkEnv flinkEnv = flinkEnvService.getById(flinkCluster.getVersionId());
-    ShutDownRequest stopRequest =
-        new ShutDownRequest(
-            flinkEnv.getFlinkVersion(),
-            executionModeEnum,
-            flinkCluster.getProperties(),
-            clusterId,
-            kubernetesDeployParam);
-
+    DeployRequest deployRequest = getDeployRequest(flinkCluster);
     try {
       Future<ShutDownResponse> future =
-          executorService.submit(() -> FlinkClient.shutdown(stopRequest));
+          executorService.submit(() -> FlinkClient.shutdown(deployRequest));
       ShutDownResponse shutDownResponse = future.get(60, TimeUnit.SECONDS);
       if (shutDownResponse != null) {
         flinkCluster.setAddress(null);
@@ -350,8 +325,7 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
   }
 
   @Override
-  public void delete(FlinkCluster cluster) {
-    Long id = cluster.getId();
+  public void delete(Long id) {
     FlinkCluster flinkCluster = getById(id);
     if (flinkCluster == null) {
       throw new ApiAlertException("flink cluster not exist, please check.");
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala
index c2ea9ba48..3c7c65dcb 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala
@@ -40,7 +40,7 @@ object FlinkClient extends Logger {
     "org.apache.streampark.flink.client.bean.CancelRequest" -> "cancel"
 
   private[this] val SHUTDOWN_REQUEST =
-    "org.apache.streampark.flink.client.bean.ShutDownRequest" -> "shutdown"
+    "org.apache.streampark.flink.client.bean.DeployRequest" -> "shutdown"
 
   private[this] val SAVEPOINT_REQUEST =
     "org.apache.streampark.flink.client.bean.TriggerSavepointRequest" -> 
"triggerSavepoint"
@@ -57,8 +57,8 @@ object FlinkClient extends Logger {
     proxy[DeployResponse](deployRequest, deployRequest.flinkVersion, 
DEPLOY_REQUEST)
   }
 
-  def shutdown(shutDownRequest: ShutDownRequest): ShutDownResponse = {
-    proxy[ShutDownResponse](shutDownRequest, shutDownRequest.flinkVersion, 
SHUTDOWN_REQUEST)
+  def shutdown(deployRequest: DeployRequest): ShutDownResponse = {
+    proxy[ShutDownResponse](deployRequest, deployRequest.flinkVersion, 
SHUTDOWN_REQUEST)
   }
 
   def triggerSavepoint(savepointRequest: TriggerSavepointRequest): 
SavepointResponse = {
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployRequest.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployRequest.scala
index db7bbcd3d..81b36a17b 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployRequest.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployRequest.scala
@@ -24,8 +24,6 @@ import org.apache.streampark.flink.util.FlinkUtils
 import org.apache.commons.io.FileUtils
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
 
-import javax.annotation.Nullable
-
 import java.io.File
 import java.util.{Map => JavaMap}
 
@@ -33,8 +31,7 @@ case class DeployRequest(
     flinkVersion: FlinkVersion,
     executionMode: ExecutionMode,
     properties: JavaMap[String, Any],
-    clusterId: String,
-    @Nullable k8sDeployParam: KubernetesDeployParam) {
+    clusterId: String) {
 
   private[client] lazy val hdfsWorkspace = {
 
@@ -63,10 +60,38 @@ case class DeployRequest(
   }
 }
 
-case class KubernetesDeployParam(
-    clusterId: String,
-    kubernetesNamespace: String = 
KubernetesConfigOptions.NAMESPACE.defaultValue(),
-    kubeConf: String = "~/.kube/config",
-    serviceAccount: String = 
KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT.defaultValue(),
-    flinkImage: String = 
KubernetesConfigOptions.CONTAINER_IMAGE.defaultValue(),
-    @Nullable flinkRestExposedType: FlinkK8sRestExposedType = 
FlinkK8sRestExposedType.CLUSTER_IP)
+class KubernetesDeployRequest(
+    override val flinkVersion: FlinkVersion,
+    override val executionMode: ExecutionMode,
+    override val properties: JavaMap[String, Any],
+    override val clusterId: String,
+    val kubernetesNamespace: String = 
KubernetesConfigOptions.NAMESPACE.defaultValue(),
+    val kubeConf: String = "~/.kube/config",
+    val serviceAccount: String = 
KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT.defaultValue(),
+    val flinkImage: String = 
KubernetesConfigOptions.CONTAINER_IMAGE.defaultValue(),
+    val flinkRestExposedType: FlinkK8sRestExposedType = 
FlinkK8sRestExposedType.CLUSTER_IP)
+  extends DeployRequest(flinkVersion, executionMode, properties, clusterId)
+
+object KubernetesDeployRequest {
+  def apply(
+      flinkVersion: FlinkVersion,
+      executionMode: ExecutionMode,
+      properties: JavaMap[String, Any],
+      clusterId: String,
+      kubernetesNamespace: String,
+      kubeConf: String,
+      serviceAccount: String,
+      flinkImage: String,
+      flinkRestExposedType: FlinkK8sRestExposedType): KubernetesDeployRequest 
= {
+    new KubernetesDeployRequest(
+      flinkVersion,
+      executionMode,
+      properties,
+      clusterId,
+      kubernetesNamespace,
+      kubeConf,
+      serviceAccount,
+      flinkImage,
+      flinkRestExposedType)
+  }
+}
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/ShutDownRequest.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/ShutDownRequest.scala
deleted file mode 100644
index a6b75bbdf..000000000
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/ShutDownRequest.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.flink.client.bean
-
-import org.apache.streampark.common.conf.FlinkVersion
-import org.apache.streampark.common.enums.ExecutionMode
-
-import javax.annotation.Nullable
-
-import java.util.{Map => JavaMap}
-
-case class ShutDownRequest(
-    flinkVersion: FlinkVersion,
-    executionMode: ExecutionMode,
-    @Nullable properties: JavaMap[String, Any],
-    clusterId: String,
-    @Nullable kubernetesDeployParam: KubernetesDeployParam)
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala
index e85ceb6ee..c6fa402e9 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala
@@ -70,7 +70,7 @@ object FlinkClientHandler {
     }
   }
 
-  def shutdown(request: ShutDownRequest): ShutDownResponse = {
+  def shutdown(request: DeployRequest): ShutDownResponse = {
     request.executionMode match {
       case YARN_SESSION => YarnSessionClient.shutdown(request)
       case KUBERNETES_NATIVE_SESSION => 
KubernetesNativeSessionClient.shutdown(request)
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
index be57adb41..0d8f27a42 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
@@ -21,6 +21,7 @@ import org.apache.streampark.common.enums.ExecutionMode
 import org.apache.streampark.common.util.Utils
 import org.apache.streampark.flink.client.`trait`.KubernetesNativeClientTrait
 import org.apache.streampark.flink.client.bean._
+import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper
 import org.apache.streampark.flink.packer.pipeline.DockerImageBuildResponse
 
 import com.google.common.collect.Lists
@@ -94,7 +95,9 @@ object KubernetesNativeApplicationClient extends 
KubernetesNativeClientTrait {
     flinkConfig.safeSet(
       DeploymentOptions.TARGET,
       ExecutionMode.KUBERNETES_NATIVE_APPLICATION.getName)
-    super.doCancel(cancelRequest, flinkConfig)
+    val resp = super.doCancel(cancelRequest, flinkConfig)
+    KubernetesDeploymentHelper.delete(cancelRequest.kubernetesNamespace, 
cancelRequest.clusterId)
+    resp
   }
 
   override def doTriggerSavepoint(
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
index 621b7ac8a..0aee25bbe 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
@@ -25,6 +25,7 @@ import 
org.apache.streampark.flink.client.tool.FlinkSessionSubmitHelper
 import org.apache.streampark.flink.core.FlinkKubernetesClient
 import org.apache.streampark.flink.kubernetes.KubernetesRetriever
 import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
+import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper
 import org.apache.streampark.flink.kubernetes.model.ClusterKey
 
 import io.fabric8.kubernetes.api.model.{Config => _}
@@ -134,18 +135,20 @@ object KubernetesNativeSessionClient extends 
KubernetesNativeClientTrait with Lo
     super.doCancel(cancelRequest, flinkConfig)
   }
 
-  def deploy(deployRequest: DeployRequest): DeployResponse = {
+  @throws[Exception]
+  def deploy(deployReq: DeployRequest): DeployResponse = {
+    val deployRequest = deployReq.asInstanceOf[KubernetesDeployRequest]
     logInfo(
       s"""
-         |--------------------------------------- kubernetes session start 
---------------------------------------
+         |--------------------------------------- kubernetes session cluster 
start ---------------------------------------
          |    userFlinkHome    : ${deployRequest.flinkVersion.flinkHome}
          |    flinkVersion     : ${deployRequest.flinkVersion.version}
          |    execMode         : ${deployRequest.executionMode.name()}
          |    clusterId        : ${deployRequest.clusterId}
-         |    namespace        : 
${deployRequest.k8sDeployParam.kubernetesNamespace}
-         |    exposedType      : 
${deployRequest.k8sDeployParam.flinkRestExposedType}
-         |    serviceAccount   : ${deployRequest.k8sDeployParam.serviceAccount}
-         |    flinkImage       : ${deployRequest.k8sDeployParam.flinkImage}
+         |    namespace        : ${deployRequest.kubernetesNamespace}
+         |    exposedType      : ${deployRequest.flinkRestExposedType}
+         |    serviceAccount   : ${deployRequest.serviceAccount}
+         |    flinkImage       : ${deployRequest.flinkImage}
          |    properties       : ${deployRequest.properties.mkString(" ")}
          
|-------------------------------------------------------------------------------------------
          |""".stripMargin)
@@ -157,20 +160,16 @@ object KubernetesNativeSessionClient extends 
KubernetesNativeClientTrait with Lo
         extractConfiguration(deployRequest.flinkVersion.flinkHome, 
deployRequest.properties)
       flinkConfig
         .safeSet(DeploymentOptions.TARGET, 
KubernetesDeploymentTarget.SESSION.getName)
-        .safeSet(
-          KubernetesConfigOptions.NAMESPACE,
-          deployRequest.k8sDeployParam.kubernetesNamespace)
-        .safeSet(
-          KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT,
-          deployRequest.k8sDeployParam.serviceAccount)
+        .safeSet(KubernetesConfigOptions.NAMESPACE, 
deployRequest.kubernetesNamespace)
+        .safeSet(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT, 
deployRequest.serviceAccount)
         .safeSet(
           KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,
-          
ServiceExposedType.valueOf(deployRequest.k8sDeployParam.flinkRestExposedType.getName))
+          
ServiceExposedType.valueOf(deployRequest.flinkRestExposedType.getName))
         .safeSet(KubernetesConfigOptions.CLUSTER_ID, deployRequest.clusterId)
-        .safeSet(KubernetesConfigOptions.CONTAINER_IMAGE, 
deployRequest.k8sDeployParam.flinkImage)
+        .safeSet(KubernetesConfigOptions.CONTAINER_IMAGE, 
deployRequest.flinkImage)
         .safeSet(
           KubernetesConfigOptions.KUBE_CONFIG_FILE,
-          getDefaultKubernetesConf(deployRequest.k8sDeployParam.kubeConf))
+          getDefaultKubernetesConf(deployRequest.kubeConf))
         .safeSet(
           DeploymentOptionsInternal.CONF_DIR,
           s"${deployRequest.flinkVersion.flinkHome}/conf")
@@ -198,59 +197,59 @@ object KubernetesNativeSessionClient extends 
KubernetesNativeClientTrait with Lo
     } catch {
       case e: Exception =>
         logError(s"start flink session fail in ${deployRequest.executionMode} 
mode")
-        e.printStackTrace()
         throw e
     } finally {
       Utils.close(client, clusterDescriptor, kubeClient)
     }
   }
 
-  def shutdown(shutDownRequest: ShutDownRequest): ShutDownResponse = {
+  @throws[Exception]
+  def shutdown(deployRequest: DeployRequest): ShutDownResponse = {
+    val shutDownRequest = deployRequest.asInstanceOf[KubernetesDeployRequest]
+    logInfo(
+      s"""
+         |--------------------------------------- kubernetes session cluster 
shutdown ---------------------------------------
+         |    userFlinkHome     : ${shutDownRequest.flinkVersion.version}
+         |    namespace         : ${shutDownRequest.kubernetesNamespace}
+         |    clusterId         : ${shutDownRequest.clusterId}
+         |    execMode          : ${shutDownRequest.executionMode.getName}
+         |    flinkImage        : ${shutDownRequest.flinkImage}
+         |    exposedType       : 
${shutDownRequest.flinkRestExposedType.getName}
+         |    kubeConf          : ${shutDownRequest.kubeConf}
+         |    serviceAccount    : ${shutDownRequest.serviceAccount}
+         |    properties        : ${shutDownRequest.properties.mkString(" ")}
+         
|-------------------------------------------------------------------------------------------
+         |""".stripMargin)
     var kubeClient: FlinkKubeClient = null
     try {
       val flinkConfig = 
getFlinkDefaultConfiguration(shutDownRequest.flinkVersion.flinkHome)
       shutDownRequest.properties.foreach(
-        m =>
-          m._2 match {
-            case v if v != null => flinkConfig.setString(m._1, m._2.toString)
-            case _ =>
-          })
+        p => {
+          if (p._2 != null) {
+            flinkConfig.setString(p._1, s"${p._2}")
+          }
+        })
       flinkConfig
         .safeSet(DeploymentOptions.TARGET, 
KubernetesDeploymentTarget.SESSION.getName)
-        .safeSet(
-          KubernetesConfigOptions.NAMESPACE,
-          shutDownRequest.kubernetesDeployParam.kubernetesNamespace)
-        .safeSet(
-          KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT,
-          shutDownRequest.kubernetesDeployParam.serviceAccount)
+        .safeSet(KubernetesConfigOptions.NAMESPACE, 
shutDownRequest.kubernetesNamespace)
+        .safeSet(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT, 
shutDownRequest.serviceAccount)
         .safeSet(
           KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,
-          ServiceExposedType.valueOf(
-            
shutDownRequest.kubernetesDeployParam.flinkRestExposedType.getName))
+          
ServiceExposedType.valueOf(shutDownRequest.flinkRestExposedType.getName))
         .safeSet(KubernetesConfigOptions.CLUSTER_ID, shutDownRequest.clusterId)
-        .safeSet(
-          KubernetesConfigOptions.CONTAINER_IMAGE,
-          shutDownRequest.kubernetesDeployParam.flinkImage)
+        .safeSet(KubernetesConfigOptions.CONTAINER_IMAGE, 
shutDownRequest.flinkImage)
         .safeSet(
           KubernetesConfigOptions.KUBE_CONFIG_FILE,
-          
getDefaultKubernetesConf(shutDownRequest.kubernetesDeployParam.kubeConf))
+          getDefaultKubernetesConf(shutDownRequest.kubeConf))
       kubeClient = 
FlinkKubeClientFactory.getInstance.fromConfiguration(flinkConfig, "client")
-      val kubeClientWrapper = new FlinkKubernetesClient(kubeClient)
-
-      if (
-        shutDownRequest.clusterId != null && kubeClientWrapper
-          .getService(shutDownRequest.clusterId)
-          .isPresent
-      ) {
-        kubeClient.stopAndCleanupCluster(shutDownRequest.clusterId)
-        ShutDownResponse()
-      } else {
-        null
-      }
+      kubeClient.stopAndCleanupCluster(shutDownRequest.clusterId)
+      KubernetesDeploymentHelper.delete(
+        shutDownRequest.kubernetesNamespace,
+        shutDownRequest.clusterId)
+      ShutDownResponse()
     } catch {
       case e: Exception =>
         logError(s"shutdown flink session fail in 
${shutDownRequest.executionMode} mode")
-        e.printStackTrace()
         throw e
     } finally {
       Utils.close(kubeClient)
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index d2a98b740..91eb1d92d 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -245,7 +245,7 @@ object YarnSessionClient extends YarnClientTrait {
     }
   }
 
-  def shutdown(shutDownRequest: ShutDownRequest): ShutDownResponse = {
+  def shutdown(shutDownRequest: DeployRequest): ShutDownResponse = {
     var clusterDescriptor: YarnClusterDescriptor = null
     var client: ClusterClient[ApplicationId] = null
     try {

Reply via email to