This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new ae08fae52 [Enhance] Optimize the validation logic of flinkDeploymentCR
at PR#2966. (#3032)
ae08fae52 is described below
commit ae08fae52b001059b31ecde397f5a74391915ebb
Author: Linying Assad <[email protected]>
AuthorDate: Thu Sep 7 06:40:05 2023 -0500
[Enhance] Optimize the validation logic of flinkDeploymentCR at PR#2966.
(#3032)
---
.../flink/kubernetes/v2/operator/CROperator.scala | 15 ++++++---------
.../flink/kubernetes/v2/operator/OprError.scala | 2 ++
2 files changed, 8 insertions(+), 9 deletions(-)
diff --git
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/CROperator.scala
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/CROperator.scala
index 3170c3e22..b57d7f478 100644
---
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/CROperator.scala
+++
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/CROperator.scala
@@ -22,6 +22,7 @@ import
org.apache.streampark.flink.kubernetes.v2.K8sTools.usingK8sClient
import org.apache.streampark.flink.kubernetes.v2.fs.FileMirror
import org.apache.streampark.flink.kubernetes.v2.model.{FlinkDeploymentDef,
FlinkSessionJobDef, JobDef}
import org.apache.streampark.flink.kubernetes.v2.observer.FlinkK8sObserver
+import
org.apache.streampark.flink.kubernetes.v2.operator.OprError.FlinkDeploymentCRDNotFound
import io.fabric8.kubernetes.api.model._
import
io.fabric8.kubernetes.api.model.apiextensions.v1.CustomResourceDefinitionList
@@ -62,8 +63,8 @@ object CROperator extends CROperator {
override def applyDeployment(spec: FlinkDeploymentDef): IO[Throwable, Unit]
= {
lazy val mirrorSpace = s"${spec.namespace}_${spec.name}"
for {
+ _ <-
ZIO.fail(FlinkDeploymentCRDNotFound()).unlessZIO(existFlinkDeploymentCRD)
// Generate FlinkDeployment CR
- _ <- validateDeployFlinkDeploymentCRD
correctedJob <- mirrorJobJarToHttpFileServer(spec.job,
mirrorSpace)
correctedExtJars <- mirrorExtJarsToHttpFileServer(spec.extJarPaths,
mirrorSpace)
correctedPod <- correctPodSpec(
@@ -253,15 +254,11 @@ object CROperator extends CROperator {
} *> ZIO.logInfo(s"Delete FlinkDeployment CR: namespace=$namespace,
name=$name")
/** Check whether FlinkDeployment CRD is deployed in the k8s cluster. */
- def validateDeployFlinkDeploymentCRD = {
+ private def existFlinkDeploymentCRD: IO[Throwable, Boolean] = {
usingK8sClient { client =>
- val crds: CustomResourceDefinitionList =
client.apiextensions.v1.customResourceDefinitions.list
- val flinkDeploymentCRDName: String =
CustomResource.getCRDName(classOf[FlinkDeployment])
-
- val exists: Boolean = crds.getItems.asScala.exists(crd =>
crd.getMetadata.getName == flinkDeploymentCRDName)
-
- if (!exists)
- throw new RuntimeException("The FlinkDeployment CRD is not currently
deployed in the k8s cluster")
+ val crds =
client.apiextensions.v1.customResourceDefinitions.list
+ val deployCRDName = CustomResource.getCRDName(classOf[FlinkDeployment])
+ crds.getItems.asScala.exists(crd => crd.getMetadata.getName ==
deployCRDName)
}
}
}
diff --git
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/OprError.scala
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/OprError.scala
index daf8b8a03..5225905f4 100644
---
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/OprError.scala
+++
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/OprError.scala
@@ -28,4 +28,6 @@ object OprError {
case class FlinkRestEndpointNotFound(namespace: String, name: String)
extends Exception(s"Flink cluster rest endpoint not found:
namespace=$namespace, name=$name")
+ case class FlinkDeploymentCRDNotFound()
+ extends Exception("The FlinkDeployment CRD is not currently deployed in
the kubernetes cluster")
}