This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new ae08fae52 [Enhance] Optimize the validation logic of flinkDeploymentCR at PR#2966. (#3032)
ae08fae52 is described below

commit ae08fae52b001059b31ecde397f5a74391915ebb
Author: Linying Assad <[email protected]>
AuthorDate: Thu Sep 7 06:40:05 2023 -0500

    [Enhance] Optimize the validation logic of flinkDeploymentCR at PR#2966. (#3032)
---
 .../flink/kubernetes/v2/operator/CROperator.scala         | 15 ++++++---------
 .../flink/kubernetes/v2/operator/OprError.scala           |  2 ++
 2 files changed, 8 insertions(+), 9 deletions(-)

diff --git 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/CROperator.scala
 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/CROperator.scala
index 3170c3e22..b57d7f478 100644
--- 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/CROperator.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/CROperator.scala
@@ -22,6 +22,7 @@ import 
org.apache.streampark.flink.kubernetes.v2.K8sTools.usingK8sClient
 import org.apache.streampark.flink.kubernetes.v2.fs.FileMirror
 import org.apache.streampark.flink.kubernetes.v2.model.{FlinkDeploymentDef, 
FlinkSessionJobDef, JobDef}
 import org.apache.streampark.flink.kubernetes.v2.observer.FlinkK8sObserver
+import 
org.apache.streampark.flink.kubernetes.v2.operator.OprError.FlinkDeploymentCRDNotFound
 
 import io.fabric8.kubernetes.api.model._
 import 
io.fabric8.kubernetes.api.model.apiextensions.v1.CustomResourceDefinitionList
@@ -62,8 +63,8 @@ object CROperator extends CROperator {
   override def applyDeployment(spec: FlinkDeploymentDef): IO[Throwable, Unit] 
= {
     lazy val mirrorSpace = s"${spec.namespace}_${spec.name}"
     for {
+      _                   <- 
ZIO.fail(FlinkDeploymentCRDNotFound()).unlessZIO(existFlinkDeploymentCRD)
       // Generate FlinkDeployment CR
-      _                   <- validateDeployFlinkDeploymentCRD
       correctedJob        <- mirrorJobJarToHttpFileServer(spec.job, 
mirrorSpace)
       correctedExtJars    <- mirrorExtJarsToHttpFileServer(spec.extJarPaths, 
mirrorSpace)
       correctedPod        <- correctPodSpec(
@@ -253,15 +254,11 @@ object CROperator extends CROperator {
     } *> ZIO.logInfo(s"Delete FlinkDeployment CR: namespace=$namespace, 
name=$name")
 
   /** Check whether FlinkDeployment CRD is deployed in the k8s cluster. */
-  def validateDeployFlinkDeploymentCRD = {
+  private def existFlinkDeploymentCRD: IO[Throwable, Boolean] = {
     usingK8sClient { client =>
-      val crds: CustomResourceDefinitionList = 
client.apiextensions.v1.customResourceDefinitions.list
-      val flinkDeploymentCRDName: String     = 
CustomResource.getCRDName(classOf[FlinkDeployment])
-
-      val exists: Boolean = crds.getItems.asScala.exists(crd => 
crd.getMetadata.getName == flinkDeploymentCRDName)
-
-      if (!exists)
-        throw new RuntimeException("The FlinkDeployment CRD is not currently 
deployed in the k8s cluster")
+      val crds          = 
client.apiextensions.v1.customResourceDefinitions.list
+      val deployCRDName = CustomResource.getCRDName(classOf[FlinkDeployment])
+      crds.getItems.asScala.exists(crd => crd.getMetadata.getName == 
deployCRDName)
     }
   }
 }
diff --git 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/OprError.scala
 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/OprError.scala
index daf8b8a03..5225905f4 100644
--- 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/OprError.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/OprError.scala
@@ -28,4 +28,6 @@ object OprError {
   case class FlinkRestEndpointNotFound(namespace: String, name: String)
     extends Exception(s"Flink cluster rest endpoint not found: 
namespace=$namespace, name=$name")
 
+  case class FlinkDeploymentCRDNotFound()
+    extends Exception("The FlinkDeployment CRD is not currently deployed in 
the kubernetes cluster")
 }

Reply via email to