This is an automated email from the ASF dual-hosted git repository.

linying pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 373ec18aa [Improve] [Flink-Kubernetes-V2] add observer untrack on 
Flink-Kubernetes-V2-Session (#3216)
373ec18aa is described below

commit 373ec18aad3ddd4d79d0ebf55038cefe19b73cb3
Author: caicancai <[email protected]>
AuthorDate: Sun Oct 8 18:43:55 2023 +0800

    [Improve] [Flink-Kubernetes-V2] add observer untrack on 
Flink-Kubernetes-V2-Session (#3216)
    
    * improve README.md
    
    * [Improve]improve README.md
    
    * [Bug][Flink-Kubernetes-V2] observer.untrack on FLink-Kubernetes-V2-session
    
    * Improve KubernetesSessionClientV2.scala at PR#3216.
    
    ---------
    
    Co-authored-by: Linying Assad <[email protected]>
---
 .../client/impl/KubernetesSessionClientV2.scala    | 27 +++++++++++++++-------
 1 file changed, 19 insertions(+), 8 deletions(-)

diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
index 0b0807a55..176c801f5 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
@@ -17,17 +17,21 @@
 package org.apache.streampark.flink.client.impl
 
 import org.apache.streampark.common.util.Logger
-import org.apache.streampark.common.zio.ZIOExt.IOOps
+import org.apache.streampark.common.zio.ZIOExt.{IOOps, OptionZIOOps}
 import org.apache.streampark.flink.client.`trait`.KubernetesClientV2Trait
 import org.apache.streampark.flink.client.bean._
 import org.apache.streampark.flink.kubernetes.v2.model.FlinkSessionJobDef
+import org.apache.streampark.flink.kubernetes.v2.model.TrackKey.ClusterKey
+import org.apache.streampark.flink.kubernetes.v2.observer.FlinkK8sObserver
 import org.apache.streampark.flink.kubernetes.v2.operator.FlinkK8sOperator
+import 
org.apache.streampark.flink.kubernetes.v2.operator.OprError.{FlinkResourceNotFound,
 UnsupportedAction}
 import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.flink.client.deployment.application.ApplicationConfiguration
 import org.apache.flink.configuration._
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions
+import zio.ZIO
 
 import scala.jdk.CollectionConverters.mapAsScalaMapConverter
 import scala.util.{Failure, Success}
@@ -123,13 +127,20 @@ object KubernetesSessionClientV2 extends 
KubernetesClientV2Trait with Logger {
     val namespace = shutDownRequest.kubernetesDeployParam.kubernetesNamespace
     def richMsg: String => String = 
s"[flink-shutdown][clusterId=$name][namespace=$namespace] " + _
 
-    FlinkK8sOperator.k8sCrOpr.deleteSessionJob(namespace, name).runIOAsTry 
match {
-      case Success(_) =>
-        logInfo(richMsg("Shutdown Flink cluster successfully."))
-        ShutDownResponse()
-      case Failure(err) =>
-        logError(richMsg(s"Fail to shutdown Flink cluster"), err)
-        throw err
+    FlinkK8sObserver.trackedKeys
+      .find {
+        case ClusterKey(_, ns, n) => ns == namespace && n == name
+        case _ => false
+      }
+      .someOrUnitZIO(key => FlinkK8sOperator.delete(key.id))
+      .catchSome {
+        case _: FlinkResourceNotFound => ZIO.unit
+        case _: UnsupportedAction => ZIO.unit
+      }
+      .as(ShutDownResponse())
+      .runIOAsTry match {
+      case Success(result) => logInfo(richMsg("Shutdown Flink cluster 
successfully.")); result
+      case Failure(err) => logError(richMsg(s"Fail to shutdown Flink 
cluster"), err); throw err
     }
   }
 

Reply via email to