This is an automated email from the ASF dual-hosted git repository.

linying pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 6fcf868dd [Feature][Flink-K8s-V2] Add KubernetesApplicationClient.shutdown on Flink-Kubernetes-V2 (#3186)
6fcf868dd is described below

commit 6fcf868dd3ea90c97b08eee31004e65cd998e4a0
Author: caicancai <[email protected]>
AuthorDate: Sun Oct 8 18:45:03 2023 +0800

    [Feature][Flink-K8s-V2] Add KubernetesApplicationClient.shutdown on Flink-Kubernetes-V2 (#3186)
    
    * [Feature] Add Kubernetes native application client.shutdown on Flink-Kubernetes-V2
    
    * mvn spotless:apply
    
    * fix bug
    
    * Fix compilation bug
    
    * fix bug
    
    * throw execution
    
    * mvn spotless:apply
    
    * Manually manage cluster cancellation tracking
    
    * change log
    
    * Improve KubernetesApplicationClientV2.scala at PR#3186
    
    ---------
    
    Co-authored-by: Linying Assad <[email protected]>
---
 .../flink/client/FlinkClientEndpoint.scala         | 16 ++++++++---
 .../impl/KubernetesApplicationClientV2.scala       | 31 +++++++++++++++++++++-
 2 files changed, 42 insertions(+), 5 deletions(-)

diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientEndpoint.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientEndpoint.scala
index 3cd633968..752bdf045 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientEndpoint.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientEndpoint.scala
@@ -80,10 +80,18 @@ object FlinkClientEndpoint {
   def shutdown(request: ShutDownRequest): ShutDownResponse = {
     request.executionMode match {
       case YARN_SESSION => YarnSessionClient.shutdown(request)
-      case KUBERNETES_NATIVE_SESSION => {
-        if (K8sFlinkConfig.isV2Enabled) 
KubernetesSessionClientV2.shutdown(request)
-        else KubernetesNativeSessionClient.shutdown(request)
-      }
+      case KUBERNETES_NATIVE_SESSION =>
+        K8sFlinkConfig.isV2Enabled match {
+          case true => KubernetesSessionClientV2.shutdown(request)
+          case _ => KubernetesNativeSessionClient.shutdown(request)
+        }
+      case KUBERNETES_NATIVE_APPLICATION =>
+        K8sFlinkConfig.isV2Enabled match {
+          case true => KubernetesApplicationClientV2.shutdown(request)
+          case _ =>
+            throw new UnsupportedOperationException(
+              s"Unsupported ${request.executionMode} shutdown application ")
+        }
       case _ =>
         throw new UnsupportedOperationException(
           s"Unsupported ${request.executionMode} shutdown cluster ")
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala
index a19368acd..db2b3953c 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala
@@ -18,11 +18,14 @@
 package org.apache.streampark.flink.client.impl
 
 import org.apache.streampark.common.util.Logger
-import org.apache.streampark.common.zio.ZIOExt.IOOps
+import org.apache.streampark.common.zio.ZIOExt.{IOOps, OptionZIOOps}
 import org.apache.streampark.flink.client.`trait`.KubernetesClientV2Trait
 import org.apache.streampark.flink.client.bean._
 import org.apache.streampark.flink.kubernetes.v2.model.{FlinkDeploymentDef, 
JobManagerDef, TaskManagerDef}
+import 
org.apache.streampark.flink.kubernetes.v2.model.TrackKey.ApplicationJobKey
+import org.apache.streampark.flink.kubernetes.v2.observer.FlinkK8sObserver
 import org.apache.streampark.flink.kubernetes.v2.operator.FlinkK8sOperator
+import 
org.apache.streampark.flink.kubernetes.v2.operator.OprError.{FlinkResourceNotFound,
 UnsupportedAction}
 import org.apache.streampark.flink.packer.pipeline.K8sAppModeBuildResponse
 
 import org.apache.commons.lang3.StringUtils
@@ -31,6 +34,7 @@ import org.apache.flink.configuration._
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions
 import org.apache.flink.v1beta1.FlinkDeploymentSpec.FlinkVersion
+import zio.ZIO
 
 import scala.collection.mutable
 import scala.jdk.CollectionConverters.mapAsScalaMapConverter
@@ -241,4 +245,29 @@ object KubernetesApplicationClientV2 extends 
KubernetesClientV2Trait with Logger
       ))
   }
 
+  @throws[Throwable]
+  def shutdown(shutDownRequest: ShutDownRequest): ShutDownResponse = {
+    val name = shutDownRequest.clusterId
+    val namespace = shutDownRequest.kubernetesDeployParam.kubernetesNamespace
+    def richMsg: String => String = 
s"[flink-shutdown][clusterId=$name][namespace=$namespace] " + _
+
+    FlinkK8sObserver.trackedKeys
+      .find {
+        case ApplicationJobKey(_, ns, n) => ns == namespace && n == name
+        case _ => false
+      }
+      .someOrUnitZIO(key => FlinkK8sOperator.delete(key.id))
+      .catchSome {
+        case _: FlinkResourceNotFound => ZIO.unit
+        case _: UnsupportedAction => ZIO.unit
+      }
+      .as(ShutDownResponse())
+      .runIOAsTry match {
+      case Success(result) =>
+        logInfo(richMsg("Shutdown Flink Application deployment 
successfully.")); result
+      case Failure(err) =>
+        logError(richMsg(s"Fail to shutdown Flink Application deployment"), 
err); throw err
+    }
+  }
+
 }

Reply via email to