This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
new a2be95605 [Improve] flink cluster on k8s session shutdown bug fixed
(#3485)
a2be95605 is described below
commit a2be95605d1412718b26d682f3080c14a1f59a4f
Author: benjobs <[email protected]>
AuthorDate: Thu Jan 11 23:57:36 2024 +0800
[Improve] flink cluster on k8s session shutdown bug fixed (#3485)
* [Improve] k8s application|session start|stop improvement
* [Improve] flink job on k8s-application mode cancel improvement
---------
Co-authored-by: benjobs <[email protected]>
---
.../flink/client/bean/SubmitRequest.scala | 4 +-
.../impl/KubernetesNativeApplicationClient.scala | 21 +++++----
.../impl/KubernetesNativeSessionClient.scala | 38 +++++-----------
.../flink/client/impl/YarnApplicationClient.scala | 8 +++-
.../flink/client/impl/YarnPerJobClient.scala | 10 ++--
.../flink/client/impl/YarnSessionClient.scala | 53 +++-------------------
.../client/tool/FlinkSessionClientHelper.scala | 1 -
.../flink/client/trait/FlinkClientTrait.scala | 6 +--
.../client/trait/KubernetesNativeClientTrait.scala | 17 ++++---
.../flink/client/trait/YarnClientTrait.scala | 12 +++--
.../helper/KubernetesDeploymentHelper.scala | 25 ++++------
11 files changed, 71 insertions(+), 124 deletions(-)
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index 1d7725fbb..96afd4b08 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -66,7 +66,7 @@ case class SubmitRequest(
@Nullable k8sSubmitParam: KubernetesSubmitParam,
@Nullable extraParameter: JavaMap[String, Any]) {
- lazy val appProperties: Map[String, String] =
getParameterMap(KEY_FLINK_PROPERTY_PREFIX)
+ private lazy val appProperties: Map[String, String] =
getParameterMap(KEY_FLINK_PROPERTY_PREFIX)
lazy val appOption: Map[String, String] =
getParameterMap(KEY_FLINK_OPTION_PREFIX)
@@ -80,7 +80,7 @@ case class SubmitRequest(
lazy val flinkSQL: String = extraParameter.get(KEY_FLINK_SQL()).toString
- lazy val allowNonRestoredState = Try(
+ lazy val allowNonRestoredState: Boolean = Try(
properties.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key).toString.toBoolean)
.getOrElse(false)
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
index 0d8f27a42..f5623113e 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
@@ -21,7 +21,6 @@ import org.apache.streampark.common.enums.ExecutionMode
import org.apache.streampark.common.util.Utils
import org.apache.streampark.flink.client.`trait`.KubernetesNativeClientTrait
import org.apache.streampark.flink.client.bean._
-import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper
import org.apache.streampark.flink.packer.pipeline.DockerImageBuildResponse
import com.google.common.collect.Lists
@@ -89,15 +88,17 @@ object KubernetesNativeApplicationClient extends
KubernetesNativeClientTrait {
}
}
- override def doCancel(
- cancelRequest: CancelRequest,
- flinkConfig: Configuration): CancelResponse = {
- flinkConfig.safeSet(
- DeploymentOptions.TARGET,
- ExecutionMode.KUBERNETES_NATIVE_APPLICATION.getName)
- val resp = super.doCancel(cancelRequest, flinkConfig)
- KubernetesDeploymentHelper.delete(cancelRequest.kubernetesNamespace,
cancelRequest.clusterId)
- resp
+ override def doCancel(cancelRequest: CancelRequest, flinkConf:
Configuration): CancelResponse = {
+ flinkConf.safeSet(DeploymentOptions.TARGET,
ExecutionMode.KUBERNETES_NATIVE_APPLICATION.getName)
+ executeClientAction(
+ cancelRequest,
+ flinkConf,
+ (jobId, client) => {
+ val resp = super.cancelJob(cancelRequest, jobId, client)
+ client.shutDownCluster()
+ CancelResponse(resp)
+ }
+ )
}
override def doTriggerSavepoint(
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
index 0cffc30ba..dce9a7865 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
@@ -25,7 +25,6 @@ import
org.apache.streampark.flink.client.tool.FlinkSessionSubmitHelper
import org.apache.streampark.flink.core.FlinkKubernetesClient
import org.apache.streampark.flink.kubernetes.KubernetesRetriever
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
-import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper
import org.apache.streampark.flink.kubernetes.model.ClusterKey
import io.fabric8.kubernetes.api.model.{Config => _}
@@ -128,11 +127,9 @@ object KubernetesNativeSessionClient extends
KubernetesNativeClientTrait with Lo
}
}
- override def doCancel(
- cancelRequest: CancelRequest,
- flinkConfig: Configuration): CancelResponse = {
- flinkConfig.safeSet(DeploymentOptions.TARGET,
ExecutionMode.KUBERNETES_NATIVE_SESSION.getName)
- super.doCancel(cancelRequest, flinkConfig)
+ override def doCancel(cancelRequest: CancelRequest, flinkConf:
Configuration): CancelResponse = {
+ flinkConf.safeSet(DeploymentOptions.TARGET,
ExecutionMode.KUBERNETES_NATIVE_SESSION.getName)
+ super.doCancel(cancelRequest, flinkConf)
}
@throws[Exception]
@@ -206,30 +203,19 @@ object KubernetesNativeSessionClient extends
KubernetesNativeClientTrait with Lo
|""".stripMargin)
val flinkConfig = this.getFlinkK8sConfig(shutDownRequest)
- val kubeClient =
FlinkKubeClientFactory.getInstance.fromConfiguration(flinkConfig, "client")
+ val clusterDescriptor = getK8sClusterDescriptor(flinkConfig)
+ val client = clusterDescriptor
+ .retrieve(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID))
+ .getClusterClient
try {
- val kubeClientWrapper = new FlinkKubernetesClient(kubeClient)
- val kubeService = kubeClientWrapper.getService(deployRequest.clusterId)
- if (kubeService.isPresent) {
- kubeClient.stopAndCleanupCluster(shutDownRequest.clusterId)
- } else {
- val kubernetesClusterDescriptor =
getK8sClusterDescriptorAndSpecification(flinkConfig)
- val clusterDescriptor = kubernetesClusterDescriptor._1
- val client =
clusterDescriptor.retrieve(deployRequest.clusterId).getClusterClient
- if (client != null) {
- client.shutDownCluster()
- }
- }
- KubernetesDeploymentHelper.delete(
- shutDownRequest.kubernetesNamespace,
- shutDownRequest.clusterId)
+ client.shutDownCluster()
ShutDownResponse()
} catch {
case e: Exception =>
logError(s"shutdown flink session fail in
${shutDownRequest.executionMode} mode")
throw e
} finally {
- Utils.close(kubeClient)
+ Utils.close(client)
}
}
@@ -251,8 +237,8 @@ object KubernetesNativeSessionClient extends
KubernetesNativeClientTrait with Lo
override def doTriggerSavepoint(
request: TriggerSavepointRequest,
- flinkConfig: Configuration): SavepointResponse = {
- flinkConfig.safeSet(DeploymentOptions.TARGET,
ExecutionMode.KUBERNETES_NATIVE_SESSION.getName)
- super.doTriggerSavepoint(request, flinkConfig)
+ flinkConf: Configuration): SavepointResponse = {
+ flinkConf.safeSet(DeploymentOptions.TARGET,
ExecutionMode.KUBERNETES_NATIVE_SESSION.getName)
+ super.doTriggerSavepoint(request, flinkConf)
}
}
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index 1f4fb0ed3..8601c70a3 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -30,7 +30,7 @@ import org.apache.flink.client.program.ClusterClient
import org.apache.flink.configuration._
import org.apache.flink.runtime.security.{SecurityConfiguration, SecurityUtils}
import org.apache.flink.runtime.util.HadoopUtils
-import org.apache.flink.yarn.configuration.YarnConfigOptions
+import org.apache.flink.yarn.configuration.{YarnConfigOptions,
YarnDeploymentTarget}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api.records.ApplicationId
@@ -135,4 +135,10 @@ object YarnApplicationClient extends YarnClientTrait {
})
}
+ override def doCancel(cancelRequest: CancelRequest, flinkConf:
Configuration): CancelResponse = {
+ flinkConf
+ .safeSet(DeploymentOptions.TARGET,
YarnDeploymentTarget.APPLICATION.getName)
+ super.doCancel(cancelRequest, flinkConf)
+ }
+
}
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
index 244737d08..71499e290 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
@@ -120,12 +120,12 @@ object YarnPerJobClient extends YarnClientTrait {
}
}
- override def doCancel(
- cancelRequest: CancelRequest,
- flinkConfig: Configuration): CancelResponse = {
- val response = super.doCancel(cancelRequest, flinkConfig)
+ override def doCancel(cancelRequest: CancelRequest, flinkConf:
Configuration): CancelResponse = {
+ flinkConf
+ .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.PER_JOB.getName)
+ val response = super.doCancel(cancelRequest, flinkConf)
val clusterClientFactory = new YarnClusterClientFactory
- val clusterDescriptor =
clusterClientFactory.createClusterDescriptor(flinkConfig)
+ val clusterDescriptor =
clusterClientFactory.createClusterDescriptor(flinkConf)
clusterDescriptor.killCluster(ApplicationId.fromString(cancelRequest.clusterId))
response
}
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index 91eb1d92d..edcd6ed75 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -21,7 +21,6 @@ import org.apache.streampark.common.util.Utils
import org.apache.streampark.flink.client.`trait`.YarnClientTrait
import org.apache.streampark.flink.client.bean._
-import org.apache.flink.api.common.JobID
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader
import org.apache.flink.client.program.{ClusterClient, PackagedProgram}
import org.apache.flink.configuration._
@@ -36,8 +35,6 @@ import org.apache.hadoop.yarn.util.ConverterUtils
import java.util
import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ListBuffer
/** Submit Job to YARN Session Cluster */
object YarnSessionClient extends YarnClientTrait {
@@ -135,58 +132,20 @@ object YarnSessionClient extends YarnClientTrait {
}
}
- private[this] def executeClientAction[O, R <: SavepointRequestTrait](
- request: R,
- flinkConfig: Configuration,
- actFunc: (JobID, ClusterClient[_]) => O): O = {
- flinkConfig
- .safeSet(YarnConfigOptions.APPLICATION_ID, request.clusterId)
- .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
- logInfo(s"""
-
|------------------------------------------------------------------
- |Effective submit configuration: $flinkConfig
-
|------------------------------------------------------------------
- |""".stripMargin)
-
- var clusterDescriptor: YarnClusterDescriptor = null
- var client: ClusterClient[ApplicationId] = null
- try {
- val yarnClusterDescriptor = getYarnSessionClusterDescriptor(flinkConfig)
- clusterDescriptor = yarnClusterDescriptor._2
- client =
clusterDescriptor.retrieve(yarnClusterDescriptor._1).getClusterClient
- actFunc(JobID.fromHexString(request.jobId), client)
- } catch {
- case e: Exception =>
- logError(s"${request.getClass.getSimpleName} for flink yarn session
job fail")
- e.printStackTrace()
- throw e
- } finally {
- Utils.close(client, clusterDescriptor)
- }
- }
-
override def doCancel(
cancelRequest: CancelRequest,
flinkConfig: Configuration): CancelResponse = {
- executeClientAction(
- cancelRequest,
- flinkConfig,
- (jobID, clusterClient) => {
- val actionResult = super.cancelJob(cancelRequest, jobID, clusterClient)
- CancelResponse(actionResult)
- })
+ flinkConfig
+ .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
+ super.doCancel(cancelRequest, flinkConfig)
}
override def doTriggerSavepoint(
request: TriggerSavepointRequest,
flinkConfig: Configuration): SavepointResponse = {
- executeClientAction(
- request,
- flinkConfig,
- (jobID, clusterClient) => {
- val actionResult = super.triggerSavepoint(request, jobID,
clusterClient)
- SavepointResponse(actionResult)
- })
+ flinkConfig
+ .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
+ super.doTriggerSavepoint(request, flinkConfig)
}
def deploy(deployRequest: DeployRequest): DeployResponse = {
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala
index 63d5e8c64..2e1945e96 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala
@@ -27,7 +27,6 @@ import
org.apache.hc.client5.http.entity.mime.MultipartEntityBuilder
import org.apache.hc.client5.http.fluent.Request
import org.apache.hc.core5.http.ContentType
import org.apache.hc.core5.http.io.entity.StringEntity
-import org.apache.hc.core5.util.Timeout
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 0ef862883..5f536a3e9 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -470,10 +470,10 @@ trait FlinkClientTrait extends Logger {
val savePointDir: String = tryGetSavepointPathIfNeed(cancelRequest)
val clientWrapper = new FlinkClusterClient(client)
+ val withSavepoint = Try(cancelRequest.withSavepoint).getOrElse(false)
+ val withDrain = Try(cancelRequest.withDrain).getOrElse(false)
- (
- Try(cancelRequest.withSavepoint).getOrElse(false),
- Try(cancelRequest.withDrain).getOrElse(false)) match {
+ (withSavepoint, withDrain) match {
case (false, false) =>
client.cancel(jobID).get()
null
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
index 2dd756b8f..ff65a998e 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
@@ -85,13 +85,17 @@ trait KubernetesNativeClientTrait extends FlinkClientTrait {
executeClientAction(
cancelRequest,
flinkConfig,
- (jobId, clusterClient) => {
- val actionResult = super.cancelJob(cancelRequest, jobId, clusterClient)
- CancelResponse(actionResult)
- })
+ (jobId, client) => {
+ val resp = super.cancelJob(cancelRequest, jobId, client)
+ if (cancelRequest.executionMode ==
ExecutionMode.KUBERNETES_NATIVE_APPLICATION) {
+ client.shutDownCluster()
+ }
+ CancelResponse(resp)
+ }
+ )
}
- private[this] def executeClientAction[O, R <: SavepointRequestTrait](
+ private[client] def executeClientAction[O, R <: SavepointRequestTrait](
request: R,
flinkConfig: Configuration,
actFunc: (JobID, ClusterClient[_]) => O): O = {
@@ -157,8 +161,7 @@ trait KubernetesNativeClientTrait extends FlinkClientTrait {
def getK8sClusterDescriptor(flinkConfig: Configuration):
KubernetesClusterDescriptor = {
val clientFactory = new KubernetesClusterClientFactory()
- val clusterDescriptor = clientFactory.createClusterDescriptor(flinkConfig)
- clusterDescriptor
+ clientFactory.createClusterDescriptor(flinkConfig)
}
protected def flinkConfIdentifierInfo(@Nonnull conf: Configuration): String =
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
index 32991b109..4a19067e4 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
@@ -77,8 +77,8 @@ trait YarnClientTrait extends FlinkClientTrait {
executeClientAction(
request,
flinkConf,
- (jid, client) => {
- SavepointResponse(super.triggerSavepoint(request, jid, client))
+ (jobID, client) => {
+ SavepointResponse(super.triggerSavepoint(request, jobID, client))
})
}
@@ -86,9 +86,11 @@ trait YarnClientTrait extends FlinkClientTrait {
executeClientAction(
cancelRequest,
flinkConf,
- (jid, client) => {
- CancelResponse(super.cancelJob(cancelRequest, jid, client))
- })
+ (jobId, client) => {
+ val resp = super.cancelJob(cancelRequest, jobId, client)
+ CancelResponse(resp)
+ }
+ )
}
private lazy val deployInternalMethod: Method = {
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
index 590cd806d..e28922002 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
@@ -68,30 +68,21 @@ object KubernetesDeploymentHelper extends Logger {
}.getOrElse(true)
}
- private[this] def deleteDeployment(nameSpace: String, deploymentName:
String): Boolean = {
+ private[this] def deleteDeployment(nameSpace: String, deploymentName:
String): Unit = {
using(KubernetesRetriever.newK8sClient()) {
client =>
- Try {
- val r = client.apps.deployments
- .inNamespace(nameSpace)
- .withName(deploymentName)
- .delete
- Boolean.unbox(r)
- }.getOrElse(false)
+ val map = client.apps.deployments.inNamespace(nameSpace)
+ map.withLabel("app", deploymentName).delete
+ map.withName(deploymentName).delete
}
}
- private[this] def deleteConfigMap(nameSpace: String, deploymentName:
String): Boolean = {
+ private[this] def deleteConfigMap(nameSpace: String, deploymentName:
String): Unit = {
using(KubernetesRetriever.newK8sClient()) {
client =>
- Try {
- val r = client
- .configMaps()
- .inNamespace(nameSpace)
- .withLabel("app", deploymentName)
- .delete
- Boolean.unbox(r)
- }.getOrElse(false)
+ val map = client.configMaps().inNamespace(nameSpace)
+ map.withLabel("app", deploymentName).delete
+ map.withName(deploymentName).delete
}
}