This is an automated email from the ASF dual-hosted git repository.
linying pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 373ec18aa [Improve] [Flink-Kubernetes-V2] add observer untrack on
Flink-Kubernetes-V2-Session (#3216)
373ec18aa is described below
commit 373ec18aad3ddd4d79d0ebf55038cefe19b73cb3
Author: caicancai <[email protected]>
AuthorDate: Sun Oct 8 18:43:55 2023 +0800
[Improve] [Flink-Kubernetes-V2] add observer untrack on
Flink-Kubernetes-V2-Session (#3216)
* improve README.md
* [Improve] improve README.md
* [Bug][Flink-Kubernetes-V2] observer.untrack on Flink-Kubernetes-V2-Session
* Improve KubernetesSessionClientV2.scala at PR#3216.
---------
Co-authored-by: Linying Assad <[email protected]>
---
.../client/impl/KubernetesSessionClientV2.scala | 27 +++++++++++++++-------
1 file changed, 19 insertions(+), 8 deletions(-)
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
index 0b0807a55..176c801f5 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
@@ -17,17 +17,21 @@
package org.apache.streampark.flink.client.impl
import org.apache.streampark.common.util.Logger
-import org.apache.streampark.common.zio.ZIOExt.IOOps
+import org.apache.streampark.common.zio.ZIOExt.{IOOps, OptionZIOOps}
import org.apache.streampark.flink.client.`trait`.KubernetesClientV2Trait
import org.apache.streampark.flink.client.bean._
import org.apache.streampark.flink.kubernetes.v2.model.FlinkSessionJobDef
+import org.apache.streampark.flink.kubernetes.v2.model.TrackKey.ClusterKey
+import org.apache.streampark.flink.kubernetes.v2.observer.FlinkK8sObserver
import org.apache.streampark.flink.kubernetes.v2.operator.FlinkK8sOperator
+import
org.apache.streampark.flink.kubernetes.v2.operator.OprError.{FlinkResourceNotFound,
UnsupportedAction}
import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse
import org.apache.commons.lang3.StringUtils
import org.apache.flink.client.deployment.application.ApplicationConfiguration
import org.apache.flink.configuration._
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions
+import zio.ZIO
import scala.jdk.CollectionConverters.mapAsScalaMapConverter
import scala.util.{Failure, Success}
@@ -123,13 +127,20 @@ object KubernetesSessionClientV2 extends
KubernetesClientV2Trait with Logger {
val namespace = shutDownRequest.kubernetesDeployParam.kubernetesNamespace
def richMsg: String => String =
s"[flink-shutdown][clusterId=$name][namespace=$namespace] " + _
- FlinkK8sOperator.k8sCrOpr.deleteSessionJob(namespace, name).runIOAsTry
match {
- case Success(_) =>
- logInfo(richMsg("Shutdown Flink cluster successfully."))
- ShutDownResponse()
- case Failure(err) =>
- logError(richMsg(s"Fail to shutdown Flink cluster"), err)
- throw err
+ FlinkK8sObserver.trackedKeys
+ .find {
+ case ClusterKey(_, ns, n) => ns == namespace && n == name
+ case _ => false
+ }
+ .someOrUnitZIO(key => FlinkK8sOperator.delete(key.id))
+ .catchSome {
+ case _: FlinkResourceNotFound => ZIO.unit
+ case _: UnsupportedAction => ZIO.unit
+ }
+ .as(ShutDownResponse())
+ .runIOAsTry match {
+ case Success(result) => logInfo(richMsg("Shutdown Flink cluster
successfully.")); result
+ case Failure(err) => logError(richMsg(s"Fail to shutdown Flink
cluster"), err); throw err
}
}