This is an automated email from the ASF dual-hosted git repository.
linying pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 6fcf868dd [Feature][Flink-K8s-V2] Add
KubernetesApplicationClient.shutdown on Flink-Kubernetes-V2 (#3186)
6fcf868dd is described below
commit 6fcf868dd3ea90c97b08eee31004e65cd998e4a0
Author: caicancai <[email protected]>
AuthorDate: Sun Oct 8 18:45:03 2023 +0800
[Feature][Flink-K8s-V2] Add KubernetesApplicationClient.shutdown on
Flink-Kubernetes-V2 (#3186)
* [Feature] Add Kubernetes native application client.shutdown on
Flink-Kubernetes-V2
* mvn spotless:apply
* fix bug
* Fix compilation bug
* fix bug
* throw exception
* mvn spotless:apply
* Manually manage cluster cancellation tracking
* change log
* Improve KubernetesApplicationClientV2.scala at PR#3186
---------
Co-authored-by: Linying Assad <[email protected]>
---
.../flink/client/FlinkClientEndpoint.scala | 16 ++++++++---
.../impl/KubernetesApplicationClientV2.scala | 31 +++++++++++++++++++++-
2 files changed, 42 insertions(+), 5 deletions(-)
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientEndpoint.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientEndpoint.scala
index 3cd633968..752bdf045 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientEndpoint.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientEndpoint.scala
@@ -80,10 +80,18 @@ object FlinkClientEndpoint {
def shutdown(request: ShutDownRequest): ShutDownResponse = {
request.executionMode match {
case YARN_SESSION => YarnSessionClient.shutdown(request)
- case KUBERNETES_NATIVE_SESSION => {
- if (K8sFlinkConfig.isV2Enabled)
KubernetesSessionClientV2.shutdown(request)
- else KubernetesNativeSessionClient.shutdown(request)
- }
+ case KUBERNETES_NATIVE_SESSION =>
+ K8sFlinkConfig.isV2Enabled match {
+ case true => KubernetesSessionClientV2.shutdown(request)
+ case _ => KubernetesNativeSessionClient.shutdown(request)
+ }
+ case KUBERNETES_NATIVE_APPLICATION =>
+ K8sFlinkConfig.isV2Enabled match {
+ case true => KubernetesApplicationClientV2.shutdown(request)
+ case _ =>
+ throw new UnsupportedOperationException(
+ s"Unsupported ${request.executionMode} shutdown application ")
+ }
case _ =>
throw new UnsupportedOperationException(
s"Unsupported ${request.executionMode} shutdown cluster ")
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala
index a19368acd..db2b3953c 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala
@@ -18,11 +18,14 @@
package org.apache.streampark.flink.client.impl
import org.apache.streampark.common.util.Logger
-import org.apache.streampark.common.zio.ZIOExt.IOOps
+import org.apache.streampark.common.zio.ZIOExt.{IOOps, OptionZIOOps}
import org.apache.streampark.flink.client.`trait`.KubernetesClientV2Trait
import org.apache.streampark.flink.client.bean._
import org.apache.streampark.flink.kubernetes.v2.model.{FlinkDeploymentDef,
JobManagerDef, TaskManagerDef}
+import
org.apache.streampark.flink.kubernetes.v2.model.TrackKey.ApplicationJobKey
+import org.apache.streampark.flink.kubernetes.v2.observer.FlinkK8sObserver
import org.apache.streampark.flink.kubernetes.v2.operator.FlinkK8sOperator
+import
org.apache.streampark.flink.kubernetes.v2.operator.OprError.{FlinkResourceNotFound,
UnsupportedAction}
import org.apache.streampark.flink.packer.pipeline.K8sAppModeBuildResponse
import org.apache.commons.lang3.StringUtils
@@ -31,6 +34,7 @@ import org.apache.flink.configuration._
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions
import org.apache.flink.v1beta1.FlinkDeploymentSpec.FlinkVersion
+import zio.ZIO
import scala.collection.mutable
import scala.jdk.CollectionConverters.mapAsScalaMapConverter
@@ -241,4 +245,29 @@ object KubernetesApplicationClientV2 extends
KubernetesClientV2Trait with Logger
))
}
+ @throws[Throwable]
+ def shutdown(shutDownRequest: ShutDownRequest): ShutDownResponse = {
+ val name = shutDownRequest.clusterId
+ val namespace = shutDownRequest.kubernetesDeployParam.kubernetesNamespace
+ def richMsg: String => String =
s"[flink-shutdown][clusterId=$name][namespace=$namespace] " + _
+
+ FlinkK8sObserver.trackedKeys
+ .find {
+ case ApplicationJobKey(_, ns, n) => ns == namespace && n == name
+ case _ => false
+ }
+ .someOrUnitZIO(key => FlinkK8sOperator.delete(key.id))
+ .catchSome {
+ case _: FlinkResourceNotFound => ZIO.unit
+ case _: UnsupportedAction => ZIO.unit
+ }
+ .as(ShutDownResponse())
+ .runIOAsTry match {
+ case Success(result) =>
+ logInfo(richMsg("Shutdown Flink Application deployment
successfully.")); result
+ case Failure(err) =>
+ logError(richMsg(s"Fail to shutdown Flink Application deployment"),
err); throw err
+ }
+ }
+
}