This is an automated email from the ASF dual-hosted git repository.

linying pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 23b191fdc Added flink k8s operator validate (#2966)
23b191fdc is described below

commit 23b191fdc7af9b2cf29467d4d58c426eae793f69
Author: ChengJie1053 <[email protected]>
AuthorDate: Tue Sep 5 15:29:33 2023 +0800

    Added flink k8s operator validate (#2966)
    
    * Added flink k8s operator validate
    
    * Code optimization
    
    * Modify flink k8s operator validate
    
    * Modified CROperator
---
 .../flink/kubernetes/v2/observer/DeployCRObserver.scala   |  9 ---------
 .../flink/kubernetes/v2/operator/CROperator.scala         | 15 +++++++++++++++
 2 files changed, 15 insertions(+), 9 deletions(-)

diff --git 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/DeployCRObserver.scala
 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/DeployCRObserver.scala
index 43904ee1e..accfcb584 100644
--- 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/DeployCRObserver.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/DeployCRObserver.scala
@@ -53,15 +53,6 @@ case class DeployCRObserver(deployCRSnaps: 
ConcurrentMap[(Namespace, Name), (Dep
         watchers.remove((namespace, name)).unit
       }
 
-//  private def existCr(namespace: String, name: String): IO[Throwable, 
Boolean] =
-//    usingK8sClient { client =>
-//      client
-//        .resources(classOf[FlinkDeployment])
-//        .inNamespace(namespace)
-//        .withName(name)
-//        .get != null
-//    }
-
   private def launchProc(namespace: String, name: String): 
K8sResourceWatcher[FlinkDeployment] =
     watchK8sResourceForever(client =>
       client
diff --git 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/CROperator.scala
 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/CROperator.scala
index e87da3da6..3170c3e22 100644
--- 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/CROperator.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/CROperator.scala
@@ -24,6 +24,8 @@ import 
org.apache.streampark.flink.kubernetes.v2.model.{FlinkDeploymentDef, Flin
 import org.apache.streampark.flink.kubernetes.v2.observer.FlinkK8sObserver
 
 import io.fabric8.kubernetes.api.model._
+import 
io.fabric8.kubernetes.api.model.apiextensions.v1.CustomResourceDefinitionList
+import io.fabric8.kubernetes.client.CustomResource
 import org.apache.flink.v1beta1.{FlinkDeployment, FlinkSessionJob}
 import zio.{IO, UIO, ZIO}
 import zio.stream.ZStream
@@ -61,6 +63,7 @@ object CROperator extends CROperator {
     lazy val mirrorSpace = s"${spec.namespace}_${spec.name}"
     for {
       // Generate FlinkDeployment CR
+      _                   <- validateDeployFlinkDeploymentCRD
       correctedJob        <- mirrorJobJarToHttpFileServer(spec.job, 
mirrorSpace)
       correctedExtJars    <- mirrorExtJarsToHttpFileServer(spec.extJarPaths, 
mirrorSpace)
       correctedPod        <- correctPodSpec(
@@ -249,4 +252,16 @@ object CROperator extends CROperator {
         .delete()
     } *> ZIO.logInfo(s"Delete FlinkDeployment CR: namespace=$namespace, 
name=$name")
 
+  /** Check whether FlinkDeployment CRD is deployed in the k8s cluster. */
+  def validateDeployFlinkDeploymentCRD = {
+    usingK8sClient { client =>
+      val crds: CustomResourceDefinitionList = 
client.apiextensions.v1.customResourceDefinitions.list
+      val flinkDeploymentCRDName: String     = 
CustomResource.getCRDName(classOf[FlinkDeployment])
+
+      val exists: Boolean = crds.getItems.asScala.exists(crd => 
crd.getMetadata.getName == flinkDeploymentCRDName)
+
+      if (!exists)
+        throw new RuntimeException("The FlinkDeployment CRD is not currently 
deployed in the k8s cluster")
+    }
+  }
 }

Reply via email to