This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
     new a2be95605 [Improve] flink cluster on k8s session shutdown bug fixed 
(#3485)
a2be95605 is described below

commit a2be95605d1412718b26d682f3080c14a1f59a4f
Author: benjobs <[email protected]>
AuthorDate: Thu Jan 11 23:57:36 2024 +0800

    [Improve] flink cluster on k8s session shutdown bug fixed (#3485)
    
    * [Improve] k8s application|session start|stop improvement
    
    * [Improve] flink job on k8s-application mode cancel improvement
    
    ---------
    
    Co-authored-by: benjobs <[email protected]>
---
 .../flink/client/bean/SubmitRequest.scala          |  4 +-
 .../impl/KubernetesNativeApplicationClient.scala   | 21 +++++----
 .../impl/KubernetesNativeSessionClient.scala       | 38 +++++-----------
 .../flink/client/impl/YarnApplicationClient.scala  |  8 +++-
 .../flink/client/impl/YarnPerJobClient.scala       | 10 ++--
 .../flink/client/impl/YarnSessionClient.scala      | 53 +++-------------------
 .../client/tool/FlinkSessionClientHelper.scala     |  1 -
 .../flink/client/trait/FlinkClientTrait.scala      |  6 +--
 .../client/trait/KubernetesNativeClientTrait.scala | 17 ++++---
 .../flink/client/trait/YarnClientTrait.scala       | 12 +++--
 .../helper/KubernetesDeploymentHelper.scala        | 25 ++++------
 11 files changed, 71 insertions(+), 124 deletions(-)

diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index 1d7725fbb..96afd4b08 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -66,7 +66,7 @@ case class SubmitRequest(
     @Nullable k8sSubmitParam: KubernetesSubmitParam,
     @Nullable extraParameter: JavaMap[String, Any]) {
 
-  lazy val appProperties: Map[String, String] = 
getParameterMap(KEY_FLINK_PROPERTY_PREFIX)
+  private lazy val appProperties: Map[String, String] = 
getParameterMap(KEY_FLINK_PROPERTY_PREFIX)
 
   lazy val appOption: Map[String, String] = 
getParameterMap(KEY_FLINK_OPTION_PREFIX)
 
@@ -80,7 +80,7 @@ case class SubmitRequest(
 
   lazy val flinkSQL: String = extraParameter.get(KEY_FLINK_SQL()).toString
 
-  lazy val allowNonRestoredState = Try(
+  lazy val allowNonRestoredState: Boolean = Try(
     
properties.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key).toString.toBoolean)
     .getOrElse(false)
 
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
index 0d8f27a42..f5623113e 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
@@ -21,7 +21,6 @@ import org.apache.streampark.common.enums.ExecutionMode
 import org.apache.streampark.common.util.Utils
 import org.apache.streampark.flink.client.`trait`.KubernetesNativeClientTrait
 import org.apache.streampark.flink.client.bean._
-import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper
 import org.apache.streampark.flink.packer.pipeline.DockerImageBuildResponse
 
 import com.google.common.collect.Lists
@@ -89,15 +88,17 @@ object KubernetesNativeApplicationClient extends 
KubernetesNativeClientTrait {
     }
   }
 
-  override def doCancel(
-      cancelRequest: CancelRequest,
-      flinkConfig: Configuration): CancelResponse = {
-    flinkConfig.safeSet(
-      DeploymentOptions.TARGET,
-      ExecutionMode.KUBERNETES_NATIVE_APPLICATION.getName)
-    val resp = super.doCancel(cancelRequest, flinkConfig)
-    KubernetesDeploymentHelper.delete(cancelRequest.kubernetesNamespace, 
cancelRequest.clusterId)
-    resp
+  override def doCancel(cancelRequest: CancelRequest, flinkConf: 
Configuration): CancelResponse = {
+    flinkConf.safeSet(DeploymentOptions.TARGET, 
ExecutionMode.KUBERNETES_NATIVE_APPLICATION.getName)
+    executeClientAction(
+      cancelRequest,
+      flinkConf,
+      (jobId, client) => {
+        val resp = super.cancelJob(cancelRequest, jobId, client)
+        client.shutDownCluster()
+        CancelResponse(resp)
+      }
+    )
   }
 
   override def doTriggerSavepoint(
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
index 0cffc30ba..dce9a7865 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
@@ -25,7 +25,6 @@ import 
org.apache.streampark.flink.client.tool.FlinkSessionSubmitHelper
 import org.apache.streampark.flink.core.FlinkKubernetesClient
 import org.apache.streampark.flink.kubernetes.KubernetesRetriever
 import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
-import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper
 import org.apache.streampark.flink.kubernetes.model.ClusterKey
 
 import io.fabric8.kubernetes.api.model.{Config => _}
@@ -128,11 +127,9 @@ object KubernetesNativeSessionClient extends 
KubernetesNativeClientTrait with Lo
     }
   }
 
-  override def doCancel(
-      cancelRequest: CancelRequest,
-      flinkConfig: Configuration): CancelResponse = {
-    flinkConfig.safeSet(DeploymentOptions.TARGET, 
ExecutionMode.KUBERNETES_NATIVE_SESSION.getName)
-    super.doCancel(cancelRequest, flinkConfig)
+  override def doCancel(cancelRequest: CancelRequest, flinkConf: 
Configuration): CancelResponse = {
+    flinkConf.safeSet(DeploymentOptions.TARGET, 
ExecutionMode.KUBERNETES_NATIVE_SESSION.getName)
+    super.doCancel(cancelRequest, flinkConf)
   }
 
   @throws[Exception]
@@ -206,30 +203,19 @@ object KubernetesNativeSessionClient extends 
KubernetesNativeClientTrait with Lo
          |""".stripMargin)
 
     val flinkConfig = this.getFlinkK8sConfig(shutDownRequest)
-    val kubeClient = 
FlinkKubeClientFactory.getInstance.fromConfiguration(flinkConfig, "client")
+    val clusterDescriptor = getK8sClusterDescriptor(flinkConfig)
+    val client = clusterDescriptor
+      .retrieve(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID))
+      .getClusterClient
     try {
-      val kubeClientWrapper = new FlinkKubernetesClient(kubeClient)
-      val kubeService = kubeClientWrapper.getService(deployRequest.clusterId)
-      if (kubeService.isPresent) {
-        kubeClient.stopAndCleanupCluster(shutDownRequest.clusterId)
-      } else {
-        val kubernetesClusterDescriptor = 
getK8sClusterDescriptorAndSpecification(flinkConfig)
-        val clusterDescriptor = kubernetesClusterDescriptor._1
-        val client = 
clusterDescriptor.retrieve(deployRequest.clusterId).getClusterClient
-        if (client != null) {
-          client.shutDownCluster()
-        }
-      }
-      KubernetesDeploymentHelper.delete(
-        shutDownRequest.kubernetesNamespace,
-        shutDownRequest.clusterId)
+      client.shutDownCluster()
       ShutDownResponse()
     } catch {
       case e: Exception =>
         logError(s"shutdown flink session fail in 
${shutDownRequest.executionMode} mode")
         throw e
     } finally {
-      Utils.close(kubeClient)
+      Utils.close(client)
     }
   }
 
@@ -251,8 +237,8 @@ object KubernetesNativeSessionClient extends 
KubernetesNativeClientTrait with Lo
 
   override def doTriggerSavepoint(
       request: TriggerSavepointRequest,
-      flinkConfig: Configuration): SavepointResponse = {
-    flinkConfig.safeSet(DeploymentOptions.TARGET, 
ExecutionMode.KUBERNETES_NATIVE_SESSION.getName)
-    super.doTriggerSavepoint(request, flinkConfig)
+      flinkConf: Configuration): SavepointResponse = {
+    flinkConf.safeSet(DeploymentOptions.TARGET, 
ExecutionMode.KUBERNETES_NATIVE_SESSION.getName)
+    super.doTriggerSavepoint(request, flinkConf)
   }
 }
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index 1f4fb0ed3..8601c70a3 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -30,7 +30,7 @@ import org.apache.flink.client.program.ClusterClient
 import org.apache.flink.configuration._
 import org.apache.flink.runtime.security.{SecurityConfiguration, SecurityUtils}
 import org.apache.flink.runtime.util.HadoopUtils
-import org.apache.flink.yarn.configuration.YarnConfigOptions
+import org.apache.flink.yarn.configuration.{YarnConfigOptions, 
YarnDeploymentTarget}
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api.records.ApplicationId
 
@@ -135,4 +135,10 @@ object YarnApplicationClient extends YarnClientTrait {
     })
   }
 
+  override def doCancel(cancelRequest: CancelRequest, flinkConf: 
Configuration): CancelResponse = {
+    flinkConf
+      .safeSet(DeploymentOptions.TARGET, 
YarnDeploymentTarget.APPLICATION.getName)
+    super.doCancel(cancelRequest, flinkConf)
+  }
+
 }
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
index 244737d08..71499e290 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
@@ -120,12 +120,12 @@ object YarnPerJobClient extends YarnClientTrait {
     }
   }
 
-  override def doCancel(
-      cancelRequest: CancelRequest,
-      flinkConfig: Configuration): CancelResponse = {
-    val response = super.doCancel(cancelRequest, flinkConfig)
+  override def doCancel(cancelRequest: CancelRequest, flinkConf: 
Configuration): CancelResponse = {
+    flinkConf
+      .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.PER_JOB.getName)
+    val response = super.doCancel(cancelRequest, flinkConf)
     val clusterClientFactory = new YarnClusterClientFactory
-    val clusterDescriptor = 
clusterClientFactory.createClusterDescriptor(flinkConfig)
+    val clusterDescriptor = 
clusterClientFactory.createClusterDescriptor(flinkConf)
     
clusterDescriptor.killCluster(ApplicationId.fromString(cancelRequest.clusterId))
     response
   }
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index 91eb1d92d..edcd6ed75 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -21,7 +21,6 @@ import org.apache.streampark.common.util.Utils
 import org.apache.streampark.flink.client.`trait`.YarnClientTrait
 import org.apache.streampark.flink.client.bean._
 
-import org.apache.flink.api.common.JobID
 import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader
 import org.apache.flink.client.program.{ClusterClient, PackagedProgram}
 import org.apache.flink.configuration._
@@ -36,8 +35,6 @@ import org.apache.hadoop.yarn.util.ConverterUtils
 import java.util
 
 import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ListBuffer
 
 /** Submit Job to YARN Session Cluster */
 object YarnSessionClient extends YarnClientTrait {
@@ -135,58 +132,20 @@ object YarnSessionClient extends YarnClientTrait {
     }
   }
 
-  private[this] def executeClientAction[O, R <: SavepointRequestTrait](
-      request: R,
-      flinkConfig: Configuration,
-      actFunc: (JobID, ClusterClient[_]) => O): O = {
-    flinkConfig
-      .safeSet(YarnConfigOptions.APPLICATION_ID, request.clusterId)
-      .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
-    logInfo(s"""
-               
|------------------------------------------------------------------
-               |Effective submit configuration: $flinkConfig
-               
|------------------------------------------------------------------
-               |""".stripMargin)
-
-    var clusterDescriptor: YarnClusterDescriptor = null
-    var client: ClusterClient[ApplicationId] = null
-    try {
-      val yarnClusterDescriptor = getYarnSessionClusterDescriptor(flinkConfig)
-      clusterDescriptor = yarnClusterDescriptor._2
-      client = 
clusterDescriptor.retrieve(yarnClusterDescriptor._1).getClusterClient
-      actFunc(JobID.fromHexString(request.jobId), client)
-    } catch {
-      case e: Exception =>
-        logError(s"${request.getClass.getSimpleName} for flink yarn session 
job fail")
-        e.printStackTrace()
-        throw e
-    } finally {
-      Utils.close(client, clusterDescriptor)
-    }
-  }
-
   override def doCancel(
       cancelRequest: CancelRequest,
       flinkConfig: Configuration): CancelResponse = {
-    executeClientAction(
-      cancelRequest,
-      flinkConfig,
-      (jobID, clusterClient) => {
-        val actionResult = super.cancelJob(cancelRequest, jobID, clusterClient)
-        CancelResponse(actionResult)
-      })
+    flinkConfig
+      .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
+    super.doCancel(cancelRequest, flinkConfig)
   }
 
   override def doTriggerSavepoint(
       request: TriggerSavepointRequest,
       flinkConfig: Configuration): SavepointResponse = {
-    executeClientAction(
-      request,
-      flinkConfig,
-      (jobID, clusterClient) => {
-        val actionResult = super.triggerSavepoint(request, jobID, 
clusterClient)
-        SavepointResponse(actionResult)
-      })
+    flinkConfig
+      .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
+    super.doTriggerSavepoint(request, flinkConfig)
   }
 
   def deploy(deployRequest: DeployRequest): DeployResponse = {
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala
index 63d5e8c64..2e1945e96 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala
@@ -27,7 +27,6 @@ import 
org.apache.hc.client5.http.entity.mime.MultipartEntityBuilder
 import org.apache.hc.client5.http.fluent.Request
 import org.apache.hc.core5.http.ContentType
 import org.apache.hc.core5.http.io.entity.StringEntity
-import org.apache.hc.core5.util.Timeout
 import org.json4s.DefaultFormats
 import org.json4s.jackson.JsonMethods._
 import org.json4s.jackson.Serialization
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 0ef862883..5f536a3e9 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -470,10 +470,10 @@ trait FlinkClientTrait extends Logger {
     val savePointDir: String = tryGetSavepointPathIfNeed(cancelRequest)
 
     val clientWrapper = new FlinkClusterClient(client)
+    val withSavepoint = Try(cancelRequest.withSavepoint).getOrElse(false)
+    val withDrain = Try(cancelRequest.withDrain).getOrElse(false)
 
-    (
-      Try(cancelRequest.withSavepoint).getOrElse(false),
-      Try(cancelRequest.withDrain).getOrElse(false)) match {
+    (withSavepoint, withDrain) match {
       case (false, false) =>
         client.cancel(jobID).get()
         null
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
index 2dd756b8f..ff65a998e 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
@@ -85,13 +85,17 @@ trait KubernetesNativeClientTrait extends FlinkClientTrait {
     executeClientAction(
       cancelRequest,
       flinkConfig,
-      (jobId, clusterClient) => {
-        val actionResult = super.cancelJob(cancelRequest, jobId, clusterClient)
-        CancelResponse(actionResult)
-      })
+      (jobId, client) => {
+        val resp = super.cancelJob(cancelRequest, jobId, client)
+        if (cancelRequest.executionMode == 
ExecutionMode.KUBERNETES_NATIVE_APPLICATION) {
+          client.shutDownCluster()
+        }
+        CancelResponse(resp)
+      }
+    )
   }
 
-  private[this] def executeClientAction[O, R <: SavepointRequestTrait](
+  private[client] def executeClientAction[O, R <: SavepointRequestTrait](
       request: R,
       flinkConfig: Configuration,
       actFunc: (JobID, ClusterClient[_]) => O): O = {
@@ -157,8 +161,7 @@ trait KubernetesNativeClientTrait extends FlinkClientTrait {
 
   def getK8sClusterDescriptor(flinkConfig: Configuration): 
KubernetesClusterDescriptor = {
     val clientFactory = new KubernetesClusterClientFactory()
-    val clusterDescriptor = clientFactory.createClusterDescriptor(flinkConfig)
-    clusterDescriptor
+    clientFactory.createClusterDescriptor(flinkConfig)
   }
 
   protected def flinkConfIdentifierInfo(@Nonnull conf: Configuration): String =
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
index 32991b109..4a19067e4 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
@@ -77,8 +77,8 @@ trait YarnClientTrait extends FlinkClientTrait {
     executeClientAction(
       request,
       flinkConf,
-      (jid, client) => {
-        SavepointResponse(super.triggerSavepoint(request, jid, client))
+      (jobID, client) => {
+        SavepointResponse(super.triggerSavepoint(request, jobID, client))
       })
   }
 
@@ -86,9 +86,11 @@ trait YarnClientTrait extends FlinkClientTrait {
     executeClientAction(
       cancelRequest,
       flinkConf,
-      (jid, client) => {
-        CancelResponse(super.cancelJob(cancelRequest, jid, client))
-      })
+      (jobId, client) => {
+        val resp = super.cancelJob(cancelRequest, jobId, client)
+        CancelResponse(resp)
+      }
+    )
   }
 
   private lazy val deployInternalMethod: Method = {
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
index 590cd806d..e28922002 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
@@ -68,30 +68,21 @@ object KubernetesDeploymentHelper extends Logger {
     }.getOrElse(true)
   }
 
-  private[this] def deleteDeployment(nameSpace: String, deploymentName: 
String): Boolean = {
+  private[this] def deleteDeployment(nameSpace: String, deploymentName: 
String): Unit = {
     using(KubernetesRetriever.newK8sClient()) {
       client =>
-        Try {
-          val r = client.apps.deployments
-            .inNamespace(nameSpace)
-            .withName(deploymentName)
-            .delete
-          Boolean.unbox(r)
-        }.getOrElse(false)
+        val map = client.apps.deployments.inNamespace(nameSpace)
+        map.withLabel("app", deploymentName).delete
+        map.withName(deploymentName).delete
     }
   }
 
-  private[this] def deleteConfigMap(nameSpace: String, deploymentName: 
String): Boolean = {
+  private[this] def deleteConfigMap(nameSpace: String, deploymentName: 
String): Unit = {
     using(KubernetesRetriever.newK8sClient()) {
       client =>
-        Try {
-          val r = client
-            .configMaps()
-            .inNamespace(nameSpace)
-            .withLabel("app", deploymentName)
-            .delete
-          Boolean.unbox(r)
-        }.getOrElse(false)
+        val map = client.configMaps().inNamespace(nameSpace)
+        map.withLabel("app", deploymentName).delete
+        map.withName(deploymentName).delete
     }
   }
 

Reply via email to