This is an automated email from the ASF dual-hosted git repository.
linying pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 23b191fdc Added flink k8s operator validate (#2966)
23b191fdc is described below
commit 23b191fdc7af9b2cf29467d4d58c426eae793f69
Author: ChengJie1053 <[email protected]>
AuthorDate: Tue Sep 5 15:29:33 2023 +0800
Added flink k8s operator validate (#2966)
* Added flink k8s operator validate
* Code optimization
* Modify flink k8s operator validate
* Modified CROperator
---
.../flink/kubernetes/v2/observer/DeployCRObserver.scala | 9 ---------
.../flink/kubernetes/v2/operator/CROperator.scala | 15 +++++++++++++++
2 files changed, 15 insertions(+), 9 deletions(-)
diff --git
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/DeployCRObserver.scala
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/DeployCRObserver.scala
index 43904ee1e..accfcb584 100644
---
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/DeployCRObserver.scala
+++
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/DeployCRObserver.scala
@@ -53,15 +53,6 @@ case class DeployCRObserver(deployCRSnaps:
ConcurrentMap[(Namespace, Name), (Dep
watchers.remove((namespace, name)).unit
}
-// private def existCr(namespace: String, name: String): IO[Throwable,
Boolean] =
-// usingK8sClient { client =>
-// client
-// .resources(classOf[FlinkDeployment])
-// .inNamespace(namespace)
-// .withName(name)
-// .get != null
-// }
-
private def launchProc(namespace: String, name: String):
K8sResourceWatcher[FlinkDeployment] =
watchK8sResourceForever(client =>
client
diff --git
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/CROperator.scala
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/CROperator.scala
index e87da3da6..3170c3e22 100644
---
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/CROperator.scala
+++
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/CROperator.scala
@@ -24,6 +24,8 @@ import
org.apache.streampark.flink.kubernetes.v2.model.{FlinkDeploymentDef, Flin
import org.apache.streampark.flink.kubernetes.v2.observer.FlinkK8sObserver
import io.fabric8.kubernetes.api.model._
+import
io.fabric8.kubernetes.api.model.apiextensions.v1.CustomResourceDefinitionList
+import io.fabric8.kubernetes.client.CustomResource
import org.apache.flink.v1beta1.{FlinkDeployment, FlinkSessionJob}
import zio.{IO, UIO, ZIO}
import zio.stream.ZStream
@@ -61,6 +63,7 @@ object CROperator extends CROperator {
lazy val mirrorSpace = s"${spec.namespace}_${spec.name}"
for {
// Generate FlinkDeployment CR
+ _ <- validateDeployFlinkDeploymentCRD
correctedJob <- mirrorJobJarToHttpFileServer(spec.job,
mirrorSpace)
correctedExtJars <- mirrorExtJarsToHttpFileServer(spec.extJarPaths,
mirrorSpace)
correctedPod <- correctPodSpec(
@@ -249,4 +252,16 @@ object CROperator extends CROperator {
.delete()
} *> ZIO.logInfo(s"Delete FlinkDeployment CR: namespace=$namespace,
name=$name")
+ /** Check whether FlinkDeployment CRD is deployed in the k8s cluster. */
+ def validateDeployFlinkDeploymentCRD = {
+ usingK8sClient { client =>
+ val crds: CustomResourceDefinitionList =
client.apiextensions.v1.customResourceDefinitions.list
+ val flinkDeploymentCRDName: String =
CustomResource.getCRDName(classOf[FlinkDeployment])
+
+ val exists: Boolean = crds.getItems.asScala.exists(crd =>
crd.getMetadata.getName == flinkDeploymentCRDName)
+
+ if (!exists)
+ throw new RuntimeException("The FlinkDeployment CRD is not currently
deployed in the k8s cluster")
+ }
+ }
}