This is an automated email from the ASF dual-hosted git repository. monster pushed a commit to branch ingressv1 in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit f6e7b6b9257039f47251e5008ad2e560fe0e4af8 Author: monster <[email protected]> AuthorDate: Tue Apr 25 13:10:41 2023 +0800 [Improve]Ingress uses different versions of API according to different K8s versions. --- .../core/service/impl/ApplicationServiceImpl.java | 2 +- .../flink/kubernetes/IngressController.scala | 230 --------------------- .../flink/kubernetes/KubernetesRetriever.scala | 1 + .../kubernetes/ingress/BaseIngressStrategy.scala | 123 +++++++++++ .../kubernetes/ingress/IngressController.scala | 55 +++++ .../flink/kubernetes/ingress/IngressStrategy.scala | 30 +++ .../kubernetes/ingress/IngressStrategyV1.scala | 98 +++++++++ .../ingress/IngressStrategyV1beta1.scala | 92 +++++++++ .../kubernetes/watcher/FlinkJobStatusWatcher.scala | 3 +- .../flink/kubernetes/FlinkRestJsonTest.scala | 3 +- .../impl/FlinkK8sApplicationBuildPipeline.scala | 3 +- 11 files changed, 406 insertions(+), 234 deletions(-) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java index 3fbee60fa..2242b897e 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java @@ -86,8 +86,8 @@ import org.apache.streampark.flink.client.bean.SubmitRequest; import org.apache.streampark.flink.client.bean.SubmitResponse; import org.apache.streampark.flink.core.conf.ParameterCli; import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher; -import org.apache.streampark.flink.kubernetes.IngressController; import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper; +import org.apache.streampark.flink.kubernetes.ingress.IngressController; import org.apache.streampark.flink.kubernetes.model.FlinkMetricCV; import org.apache.streampark.flink.kubernetes.model.TrackId; import org.apache.streampark.flink.packer.pipeline.BuildResult; diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/IngressController.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/IngressController.scala deleted file mode 100644 index fd758f4d3..000000000 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/IngressController.scala +++ /dev/null @@ -1,230 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.streampark.flink.kubernetes - -import org.apache.streampark.common.util.Logger -import org.apache.streampark.common.util.Utils._ - -import io.fabric8.kubernetes.api.model.{IntOrString, OwnerReferenceBuilder} -import io.fabric8.kubernetes.api.model.networking.v1.IngressBuilder -import io.fabric8.kubernetes.client.DefaultKubernetesClient -import org.apache.commons.io.FileUtils -import org.apache.flink.client.program.ClusterClient -import org.json4s.{DefaultFormats, JArray} -import org.json4s.jackson.JsonMethods.parse - -import java.io.File -import java.io.IOException -import java.nio.file.Files -import java.nio.file.Paths - -import scala.collection.JavaConverters._ -import scala.language.postfixOps -import scala.util.{Failure, Success, Try} - -object IngressController extends Logger { - - def configureIngress(domainName: String, clusterId: String, nameSpace: String): Unit = { - Try(new DefaultKubernetesClient) match { - case Success(client) => - val annotMap = Map[String, String]( - "nginx.ingress.kubernetes.io/rewrite-target" -> "/$2", - "nginx.ingress.kubernetes.io/proxy-body-size" -> "1024m", - "nginx.ingress.kubernetes.io/configuration-snippet" -> ("rewrite ^(/" + clusterId + ")$ $1/ permanent;") - ) - val labelsMap = Map[String, String]( - "app" -> clusterId, - "type" -> "flink-native-kubernetes", - "component" -> "ingress") - - val deployment = client - .apps() - .deployments() - .inNamespace(nameSpace) - .withName(clusterId) - .get() - - val deploymentUid = if (deployment != null) { - deployment.getMetadata.getUid - } else { - throw new RuntimeException( - s"Deployment with name $clusterId not found in namespace $nameSpace") - } - - // Create OwnerReference object - val ownerReference = new OwnerReferenceBuilder() - .withApiVersion("apps/v1") - .withKind("Deployment") - .withName(clusterId) - .withUid(deploymentUid) - .withController(true) - .withBlockOwnerDeletion(true) - .build() - - val ingress = new IngressBuilder() - .withNewMetadata() - .withName(clusterId) - .addToAnnotations(annotMap.asJava) - .addToLabels(labelsMap.asJava) - .addToOwnerReferences(ownerReference) // Add OwnerReference - .endMetadata() - .withNewSpec() - .addNewRule() - .withHost(domainName) - .withNewHttp() - .addNewPath() - .withPath(s"/$nameSpace/$clusterId/") - .withPathType("ImplementationSpecific") - .withNewBackend() - .withNewService() - .withName(s"$clusterId-rest") - .withNewPort() - .withName("rest") - .endPort() - .endService() - .endBackend() - .endPath() - .addNewPath() - .withPath(s"/$nameSpace/$clusterId" + "(/|$)(.*)") - .withPathType("ImplementationSpecific") - .withNewBackend() - .withNewService() - .withName(s"$clusterId-rest") - .withNewPort() - .withName("rest") - .endPort() - .endService() - .endBackend() - .endPath() - .endHttp() - .endRule() - .endSpec() - .build(); - client.network.v1.ingresses().inNamespace(nameSpace).create(ingress) - - case _ => - } - } - - def configureIngress(ingressOutput: String): Unit = { - close { - val client = new DefaultKubernetesClient - client.network.ingress - .load(Files.newInputStream(Paths.get(ingressOutput))) - .get() - client - } - } - - private[this] def determineThePodSurvivalStatus(name: String, nameSpace: String): Boolean = { - tryWithResource(KubernetesRetriever.newK8sClient()) { - client => - Try { - client - .apps() - .deployments() - .inNamespace(nameSpace) - .withName(name) - .get() - .getSpec() - .getSelector() - .getMatchLabels() - false - }.getOrElse(true) - } - } - - def ingressUrlAddress( - nameSpace: String, - clusterId: String, - clusterClient: ClusterClient[_]): String = { - val client = new DefaultKubernetesClient - // for kubernetes 1.22+ - lazy val fromV1 = - Option(client.network.v1.ingresses.inNamespace(nameSpace).withName(clusterId).get) - .map(ingress => ingress.getSpec.getRules.get(0)) - .map(rule => rule.getHost -> rule.getHttp.getPaths.get(0).getPath) - // for kubernetes 1.22- - lazy val fromV1beta1 = - Option(client.network.v1beta1.ingresses.inNamespace(nameSpace).withName(clusterId).get) - .map(ingress => ingress.getSpec.getRules.get(0)) - .map(rule => rule.getHost -> rule.getHttp.getPaths.get(0).getPath) - Try( - fromV1 - .orElse(fromV1beta1) - .map { case (host, path) => s"https://$host$path" } - .getOrElse(clusterClient.getWebInterfaceURL) - ).getOrElse(throw new RuntimeException("[StreamPark] get ingressUrlAddress error.")) - } - - @throws[IOException] - def prepareIngressTemplateFiles(buildWorkspace: String, ingressTemplates: String): String = { - val workspaceDir = new File(buildWorkspace) - if (!workspaceDir.exists) workspaceDir.mkdir - if (ingressTemplates.isEmpty) null; - else { - val outputPath = buildWorkspace + "/ingress.yaml" - val outputFile = new File(outputPath) - FileUtils.write(outputFile, ingressTemplates, "UTF-8") - outputPath - } - } - -} - -case class IngressMeta( - addresses: List[String], - port: Integer, - protocol: String, - serviceName: String, - ingressName: String, - hostname: String, - path: String, - allNodes: Boolean) - -object IngressMeta { - - @transient implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats - - def as(json: String): Option[List[IngressMeta]] = { - Try(parse(json)) match { - case Success(ok) => - ok match { - case JArray(arr) => - val list = arr.map( - x => { - IngressMeta( - addresses = - (x \ "addresses").extractOpt[List[String]].getOrElse(List.empty[String]), - port = (x \ "port").extractOpt[Integer].getOrElse(0), - protocol = (x \ "protocol").extractOpt[String].getOrElse(null), - serviceName = (x \ "serviceName").extractOpt[String].getOrElse(null), - ingressName = (x \ "ingressName").extractOpt[String].getOrElse(null), - hostname = (x \ "hostname").extractOpt[String].getOrElse(null), - path = (x \ "path").extractOpt[String].getOrElse(null), - allNodes = (x \ "allNodes").extractOpt[Boolean].getOrElse(false) - ) - }) - Some(list) - case _ => None - } - case Failure(_) => None - } - } - -} diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala index b2808fc1c..d4e6ec0ed 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala @@ -20,6 +20,7 @@ package org.apache.streampark.flink.kubernetes import org.apache.streampark.common.util.{Logger, Utils} import org.apache.streampark.common.util.Utils.tryWithResource import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode +import org.apache.streampark.flink.kubernetes.ingress.IngressController import org.apache.streampark.flink.kubernetes.model.ClusterKey import io.fabric8.kubernetes.client.{DefaultKubernetesClient, KubernetesClient, KubernetesClientException} diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/BaseIngressStrategy.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/BaseIngressStrategy.scala new file mode 100644 index 000000000..0a0e456d0 --- /dev/null +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/BaseIngressStrategy.scala @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.flink.kubernetes.ingress + +import org.apache.streampark.common.util.Utils._ + +import io.fabric8.kubernetes.client.DefaultKubernetesClient +import org.apache.commons.io.FileUtils +import org.apache.flink.client.program.ClusterClient +import org.json4s.{DefaultFormats, JArray} +import org.json4s.jackson.JsonMethods.parse + +import java.io.File +import java.nio.file.{Files, Paths} + +import scala.language.postfixOps +import scala.util.{Failure, Success, Try} + +abstract class BaseIngressStrategy extends IngressStrategy { + override def ingressUrlAddress( + nameSpace: String, + clusterId: String, + clusterClient: ClusterClient[_]): String = { + val client = new DefaultKubernetesClient + // for kubernetes 1.19+ + lazy val fromV1 = + Option(client.network.v1.ingresses.inNamespace(nameSpace).withName(clusterId).get) + .map(ingress => ingress.getSpec.getRules.get(0)) + .map(rule => rule.getHost -> rule.getHttp.getPaths.get(0).getPath) + // for kubernetes 1.19- + lazy val fromV1beta1 = + Option(client.network.v1beta1.ingresses.inNamespace(nameSpace).withName(clusterId).get) + .map(ingress => ingress.getSpec.getRules.get(0)) + .map(rule => rule.getHost -> rule.getHttp.getPaths.get(0).getPath) + Try( + fromV1 + .orElse(fromV1beta1) + .map { case (host, path) => s"https://$host$path" } + .getOrElse(clusterClient.getWebInterfaceURL) + ).getOrElse(throw new RuntimeException("[StreamPark] get ingressUrlAddress error.")) + } + + override def prepareIngressTemplateFiles( + buildWorkspace: String, + ingressTemplates: String): String = { + val workspaceDir = new File(buildWorkspace) + if (!workspaceDir.exists) workspaceDir.mkdir + if (ingressTemplates.isEmpty) null + else { + val outputPath = buildWorkspace + "/ingress.yaml" + val outputFile = new File(outputPath) + FileUtils.write(outputFile, ingressTemplates, "UTF-8") + outputPath + } + } + + def configureIngress(ingressOutput: String): Unit = { + close { + val client = new DefaultKubernetesClient + client.network.ingress + .load(Files.newInputStream(Paths.get(ingressOutput))) + .get() + client + } + } + + case class IngressMeta( + addresses: List[String], + port: Integer, + protocol: String, + serviceName: String, + ingressName: String, + hostname: String, + path: String, + allNodes: Boolean) + + object IngressMeta { + + @transient implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats + + def as(json: String): Option[List[IngressMeta]] = { + Try(parse(json)) match { + case Success(ok) => + ok match { + case JArray(arr) => + val list = arr.map( + x => { + IngressMeta( + addresses = + (x \ "addresses").extractOpt[List[String]].getOrElse(List.empty[String]), + port = (x \ "port").extractOpt[Integer].getOrElse(0), + protocol = (x \ "protocol").extractOpt[String].orNull, + serviceName = (x \ "serviceName").extractOpt[String].orNull, + ingressName = (x \ "ingressName").extractOpt[String].orNull, + hostname = (x \ "hostname").extractOpt[String].orNull, + path = (x \ "path").extractOpt[String].orNull, + allNodes = (x \ "allNodes").extractOpt[Boolean].getOrElse(false) + ) + }) + Some(list) + case _ => None + } + case Failure(_) => None + } + } + + } +} diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala new file mode 100644 index 000000000..eca3d566b --- /dev/null +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.flink.kubernetes.ingress + +import org.apache.streampark.common.util.Logger + +import io.fabric8.kubernetes.client.DefaultKubernetesClient +import org.apache.flink.client.program.ClusterClient + +import scala.language.postfixOps + +object IngressController extends Logger { + + val ingressStrategy: IngressStrategy = { + if (getKubernetesVersion() >= 1.19) new IngressStrategyV1() + else new IngressStrategyV1beta1() + } + + def configureIngress(domainName: String, clusterId: String, nameSpace: String): Unit = { + ingressStrategy.configureIngress(domainName, clusterId, nameSpace) + } + + def ingressUrlAddress( + nameSpace: String, + clusterId: String, + clusterClient: ClusterClient[_]): String = { + ingressStrategy.ingressUrlAddress(nameSpace, clusterId, clusterClient) + } + + def prepareIngressTemplateFiles(buildWorkspace: String, ingressTemplates: String): String = { + ingressStrategy.prepareIngressTemplateFiles(buildWorkspace, ingressTemplates) + } + + def getKubernetesVersion(): Double = { + val client = new DefaultKubernetesClient() + val versionInfo = client.getVersion + val version = versionInfo.getMajor.toDouble + versionInfo.getMinor.toDouble / 10 + version + } +} diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala new file mode 100644 index 000000000..7137f3c07 --- /dev/null +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.flink.kubernetes.ingress + +import org.apache.flink.client.program.ClusterClient + +trait IngressStrategy { + def ingressUrlAddress( + nameSpace: String, + clusterId: String, + clusterClient: ClusterClient[_]): String + def prepareIngressTemplateFiles(buildWorkspace: String, ingressTemplates: String): String + + def configureIngress(domainName: String, clusterId: String, nameSpace: String): Unit +} diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala new file mode 100644 index 000000000..4bff48fc9 --- /dev/null +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.flink.kubernetes.ingress + +import io.fabric8.kubernetes.api.model.{IntOrString, OwnerReferenceBuilder} +import io.fabric8.kubernetes.client.DefaultKubernetesClient + +import scala.collection.JavaConverters._ +import scala.language.postfixOps +import scala.util.{Success, Try} + +class IngressStrategyV1 extends BaseIngressStrategy { + override def configureIngress(domainName: String, clusterId: String, nameSpace: String): Unit = { + Try(new DefaultKubernetesClient) match { + case Success(client) => + val annotMap = Map[String, String]( + "nginx.ingress.kubernetes.io/rewrite-target" -> "/$2", + "nginx.ingress.kubernetes.io/proxy-body-size" -> "1024m", + "nginx.ingress.kubernetes.io/configuration-snippet" -> ("rewrite ^(/" + clusterId + ")$ $1/ permanent;") + ) + val labelsMap = Map[String, String]( + "app" -> clusterId, + "type" -> "flink-native-kubernetes", + "component" -> "ingress") + + val deployment = client + .apps() + .deployments() + .inNamespace(nameSpace) + .withName(clusterId) + .get() + + val deploymentUid = if (deployment != null) { + deployment.getMetadata.getUid + } else { + throw new RuntimeException( + s"Deployment with name $clusterId not found in namespace $nameSpace") + } + + // Create OwnerReference object + val ownerReference = new OwnerReferenceBuilder() + .withApiVersion("apps/v1") + .withKind("Deployment") + .withName(clusterId) + .withUid(deploymentUid) + .withController(true) + .withBlockOwnerDeletion(true) + .build() + + val ingress = new io.fabric8.kubernetes.api.model.networking.v1beta1.IngressBuilder() + .withNewMetadata() + .withName(clusterId) + .addToAnnotations(annotMap.asJava) + .addToLabels(labelsMap.asJava) + .addToOwnerReferences(ownerReference) + .endMetadata() + .withNewSpec() + .addNewRule() + .withHost(domainName) + .withNewHttp() + .addNewPath() + .withPath(s"/$nameSpace/$clusterId/") + .withNewBackend() + .withServiceName(s"$clusterId-rest") + .withServicePort(new IntOrString("rest")) + .endBackend() + .endPath() + .addNewPath() + .withPath(s"/$nameSpace/$clusterId" + "(/|$)(.*)") + .withNewBackend() + .withServiceName(s"$clusterId-rest") + .withServicePort(new IntOrString("rest")) + .endBackend() + .endPath() + .endHttp() + .endRule() + .endSpec() + .build() + client.network.ingress.inNamespace(nameSpace).create(ingress) + case _ => + } + } +} diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala new file mode 100644 index 000000000..8a1fabf62 --- /dev/null +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala @@ -0,0 +1,92 @@ +package org.apache.streampark.flink.kubernetes.ingress + +import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder +import io.fabric8.kubernetes.client.DefaultKubernetesClient + +import scala.collection.JavaConverters._ +import scala.language.postfixOps +import scala.util.{Success, Try} + +class IngressStrategyV1beta1 extends BaseIngressStrategy { + override def configureIngress(domainName: String, clusterId: String, nameSpace: String): Unit = { + Try(new DefaultKubernetesClient) match { + case Success(client) => + val annotMap = Map[String, String]( + "nginx.ingress.kubernetes.io/rewrite-target" -> "/$2", + "nginx.ingress.kubernetes.io/proxy-body-size" -> "1024m", + "nginx.ingress.kubernetes.io/configuration-snippet" -> ("rewrite ^(/" + clusterId + ")$ $1/ permanent;") + ) + val labelsMap = Map[String, String]( + "app" -> clusterId, + "type" -> "flink-native-kubernetes", + "component" -> "ingress") + + val deployment = client + .apps() + .deployments() + .inNamespace(nameSpace) + .withName(clusterId) + .get() + + val deploymentUid = if (deployment != null) { + deployment.getMetadata.getUid + } else { + throw new RuntimeException( + s"Deployment with name $clusterId not found in namespace $nameSpace") + } + + // Create OwnerReference object + val ownerReference = new OwnerReferenceBuilder() + .withApiVersion("apps/v1") + .withKind("Deployment") + .withName(clusterId) + .withUid(deploymentUid) + .withController(true) + .withBlockOwnerDeletion(true) + .build() + + val ingress = new io.fabric8.kubernetes.api.model.networking.v1.IngressBuilder() + .withNewMetadata() + .withName(clusterId) + .addToAnnotations(annotMap.asJava) + .addToLabels(labelsMap.asJava) + .addToOwnerReferences(ownerReference) + .endMetadata() + .withNewSpec() + .addNewRule() + .withHost(domainName) + .withNewHttp() + .addNewPath() + .withPath(s"/$nameSpace/$clusterId/") + .withPathType("ImplementationSpecific") + .withNewBackend() + .withNewService() + .withName(s"$clusterId-rest") + .withNewPort() + .withName("rest") + .endPort() + .endService() + .endBackend() + .endPath() + .addNewPath() + .withPath(s"/$nameSpace/$clusterId" + "(/|$)(.*)") + .withPathType("ImplementationSpecific") + .withNewBackend() + .withNewService() + .withName(s"$clusterId-rest") + .withNewPort() + .withName("rest") + .endPort() + .endService() + .endBackend() + .endPath() + .endHttp() + .endRule() + .endSpec() + .build() + client.network.v1.ingresses().inNamespace(nameSpace).create(ingress) + + case _ => + } + } +} diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala index 03d68b3d3..2dc756a0d 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala @@ -19,11 +19,12 @@ package org.apache.streampark.flink.kubernetes.watcher import org.apache.streampark.common.conf.Workspace import org.apache.streampark.common.util.Logger -import org.apache.streampark.flink.kubernetes.{ChangeEventBus, FlinkK8sWatchController, IngressController, JobStatusWatcherConfig, KubernetesRetriever} +import org.apache.streampark.flink.kubernetes.{ChangeEventBus, FlinkK8sWatchController, JobStatusWatcherConfig, KubernetesRetriever} import org.apache.streampark.flink.kubernetes.enums.FlinkJobState import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode.{APPLICATION, SESSION} import org.apache.streampark.flink.kubernetes.event.FlinkJobStatusChangeEvent import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper +import org.apache.streampark.flink.kubernetes.ingress.IngressController import org.apache.streampark.flink.kubernetes.model._ import com.google.common.base.Charsets diff --git a/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala b/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala index 5567cf5e9..5539222fe 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala @@ -17,6 +17,7 @@ package org.apache.streampark.flink.kubernetes import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper +import org.apache.streampark.flink.kubernetes.ingress.{IngressStrategyV1, IngressStrategyV1beta1} import org.apache.streampark.flink.kubernetes.watcher.{Checkpoint, FlinkRestJmConfigItem, FlinkRestOverview, JobDetails} import com.google.common.base.Charsets @@ -294,7 +295,7 @@ class FlinkRestJsonTest { |] |""".stripMargin - val ingressMeta = IngressMeta.as(json) + val ingressMeta = new IngressStrategyV1().IngressMeta.as(json) println(ingressMeta.get) } diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala index 93b8aab17..fc3fe4738 100644 --- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala +++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala @@ -20,7 +20,8 @@ package org.apache.streampark.flink.packer.pipeline.impl import org.apache.streampark.common.enums.DevelopmentMode import org.apache.streampark.common.fs.LfsOperator import org.apache.streampark.common.util.ThreadUtils -import org.apache.streampark.flink.kubernetes.{IngressController, PodTemplateTool} +import org.apache.streampark.flink.kubernetes.PodTemplateTool +import org.apache.streampark.flink.kubernetes.ingress.IngressController import org.apache.streampark.flink.packer.docker._ import org.apache.streampark.flink.packer.maven.MavenTool import org.apache.streampark.flink.packer.pipeline._
