This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 22f6e1590 [Improve]Ingress uses different versions of API according to differen… (#2697)
22f6e1590 is described below
commit 22f6e1590e03ead704f5484b7a5a7cb014cb60f3
Author: monster <[email protected]>
AuthorDate: Tue Apr 25 19:00:10 2023 +0800
[Improve]Ingress uses different versions of API according to differen… (#2697)
* [Improve]Ingress uses different versions of API according to different K8s versions.
* Fix the problems found in review
* Add the missing dependency package
---
.../org/apache/streampark/common/util/Utils.scala | 10 +
.../core/service/impl/ApplicationServiceImpl.java | 2 +-
.../streampark-flink-kubernetes/pom.xml | 4 -
.../flink/kubernetes/IngressController.scala | 230 ---------------------
.../flink/kubernetes/KubernetesRetriever.scala | 1 +
.../kubernetes/ingress/IngressController.scala | 57 +++++
.../flink/kubernetes/ingress/IngressStrategy.scala | 65 ++++++
.../kubernetes/ingress/IngressStrategyV1.scala | 112 ++++++++++
.../ingress/IngressStrategyV1beta1.scala | 103 +++++++++
.../kubernetes/watcher/FlinkJobStatusWatcher.scala | 2 +-
.../flink/kubernetes/FlinkRestJsonTest.scala | 3 +-
.../flink/kubernetes/IngressMetaTestUtil.scala | 68 ++++++
.../flink/kubernetes/PodTemplateParserTest.scala | 1 +
.../impl/FlinkK8sApplicationBuildPipeline.scala | 3 +-
14 files changed, 423 insertions(+), 238 deletions(-)
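
For orientation before the per-file diffs: the patch replaces the old monolithic IngressController with an IngressStrategy hierarchy and picks the implementation from the Kubernetes server version reported by the fabric8 client. A condensed sketch of the selection logic, taken from the new ingress/IngressController.scala introduced below (the 1.19 threshold and class names come from that file):

    // Pick the networking API flavour once, based on the cluster's reported version.
    private lazy val ingressStrategy: IngressStrategy =
      using(new DefaultKubernetesClient()) { client =>
        val versionInfo = client.getVersion
        val version = s"${versionInfo.getMajor}.${versionInfo.getMinor}".toDouble
        if (version >= 1.19) new IngressStrategyV1()     // networking.k8s.io/v1
        else new IngressStrategyV1beta1()                // networking.k8s.io/v1beta1
      }

The console and packer modules keep calling configureIngress, ingressUrlAddress and prepareIngressTemplateFiles; only the import moves to the new ingress package.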
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
index 71e807062..9d696e545 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
@@ -136,6 +136,16 @@ object Utils {
})
}
+ def using[T <: AutoCloseable, V](resource: T)(block: T => V): V = {
+ try {
+ block(resource)
+ } finally {
+ if (resource != null) {
+ resource.close()
+ }
+ }
+ }
+
/**
* calculate the percentage of num1 / num2, the result range from 0 to 100, with one small digit
* reserve.
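The using helper added above is a small loan-pattern wrapper: it runs the block and closes the resource in a finally clause, returning the block's result. A minimal usage sketch with a hypothetical file path (the new ingress code later in this patch applies the same pattern to its DefaultKubernetesClient):

    import java.io.FileInputStream
    import org.apache.streampark.common.util.Utils

    // The stream is closed in the finally block even if read() throws.
    val firstByte: Int = Utils.using(new FileInputStream("/tmp/example.txt")) { in => in.read() }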
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 3fbee60fa..2242b897e 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -86,8 +86,8 @@ import org.apache.streampark.flink.client.bean.SubmitRequest;
import org.apache.streampark.flink.client.bean.SubmitResponse;
import org.apache.streampark.flink.core.conf.ParameterCli;
import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
-import org.apache.streampark.flink.kubernetes.IngressController;
import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper;
+import org.apache.streampark.flink.kubernetes.ingress.IngressController;
import org.apache.streampark.flink.kubernetes.model.FlinkMetricCV;
import org.apache.streampark.flink.kubernetes.model.TrackId;
import org.apache.streampark.flink.packer.pipeline.BuildResult;
diff --git a/streampark-flink/streampark-flink-kubernetes/pom.xml b/streampark-flink/streampark-flink-kubernetes/pom.xml
index 211c61b14..da594bd1c 100644
--- a/streampark-flink/streampark-flink-kubernetes/pom.xml
+++ b/streampark-flink/streampark-flink-kubernetes/pom.xml
@@ -72,10 +72,6 @@
<artifactId>flink-file-sink-common</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
- <exclusion>
- <artifactId>flink-hadoop-fs</artifactId>
- <groupId>org.apache.flink</groupId>
- </exclusion>
</exclusions>
</dependency>
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/IngressController.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/IngressController.scala
deleted file mode 100644
index fd758f4d3..000000000
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/IngressController.scala
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.flink.kubernetes
-
-import org.apache.streampark.common.util.Logger
-import org.apache.streampark.common.util.Utils._
-
-import io.fabric8.kubernetes.api.model.{IntOrString, OwnerReferenceBuilder}
-import io.fabric8.kubernetes.api.model.networking.v1.IngressBuilder
-import io.fabric8.kubernetes.client.DefaultKubernetesClient
-import org.apache.commons.io.FileUtils
-import org.apache.flink.client.program.ClusterClient
-import org.json4s.{DefaultFormats, JArray}
-import org.json4s.jackson.JsonMethods.parse
-
-import java.io.File
-import java.io.IOException
-import java.nio.file.Files
-import java.nio.file.Paths
-
-import scala.collection.JavaConverters._
-import scala.language.postfixOps
-import scala.util.{Failure, Success, Try}
-
-object IngressController extends Logger {
-
- def configureIngress(domainName: String, clusterId: String, nameSpace: String): Unit = {
- Try(new DefaultKubernetesClient) match {
- case Success(client) =>
- val annotMap = Map[String, String](
- "nginx.ingress.kubernetes.io/rewrite-target" -> "/$2",
- "nginx.ingress.kubernetes.io/proxy-body-size" -> "1024m",
- "nginx.ingress.kubernetes.io/configuration-snippet" -> ("rewrite ^(/" + clusterId + ")$ $1/ permanent;")
- )
- val labelsMap = Map[String, String](
- "app" -> clusterId,
- "type" -> "flink-native-kubernetes",
- "component" -> "ingress")
-
- val deployment = client
- .apps()
- .deployments()
- .inNamespace(nameSpace)
- .withName(clusterId)
- .get()
-
- val deploymentUid = if (deployment != null) {
- deployment.getMetadata.getUid
- } else {
- throw new RuntimeException(
- s"Deployment with name $clusterId not found in namespace $nameSpace")
- }
-
- // Create OwnerReference object
- val ownerReference = new OwnerReferenceBuilder()
- .withApiVersion("apps/v1")
- .withKind("Deployment")
- .withName(clusterId)
- .withUid(deploymentUid)
- .withController(true)
- .withBlockOwnerDeletion(true)
- .build()
-
- val ingress = new IngressBuilder()
- .withNewMetadata()
- .withName(clusterId)
- .addToAnnotations(annotMap.asJava)
- .addToLabels(labelsMap.asJava)
- .addToOwnerReferences(ownerReference) // Add OwnerReference
- .endMetadata()
- .withNewSpec()
- .addNewRule()
- .withHost(domainName)
- .withNewHttp()
- .addNewPath()
- .withPath(s"/$nameSpace/$clusterId/")
- .withPathType("ImplementationSpecific")
- .withNewBackend()
- .withNewService()
- .withName(s"$clusterId-rest")
- .withNewPort()
- .withName("rest")
- .endPort()
- .endService()
- .endBackend()
- .endPath()
- .addNewPath()
- .withPath(s"/$nameSpace/$clusterId" + "(/|$)(.*)")
- .withPathType("ImplementationSpecific")
- .withNewBackend()
- .withNewService()
- .withName(s"$clusterId-rest")
- .withNewPort()
- .withName("rest")
- .endPort()
- .endService()
- .endBackend()
- .endPath()
- .endHttp()
- .endRule()
- .endSpec()
- .build();
- client.network.v1.ingresses().inNamespace(nameSpace).create(ingress)
-
- case _ =>
- }
- }
-
- def configureIngress(ingressOutput: String): Unit = {
- close {
- val client = new DefaultKubernetesClient
- client.network.ingress
- .load(Files.newInputStream(Paths.get(ingressOutput)))
- .get()
- client
- }
- }
-
- private[this] def determineThePodSurvivalStatus(name: String, nameSpace: String): Boolean = {
- tryWithResource(KubernetesRetriever.newK8sClient()) {
- client =>
- Try {
- client
- .apps()
- .deployments()
- .inNamespace(nameSpace)
- .withName(name)
- .get()
- .getSpec()
- .getSelector()
- .getMatchLabels()
- false
- }.getOrElse(true)
- }
- }
-
- def ingressUrlAddress(
- nameSpace: String,
- clusterId: String,
- clusterClient: ClusterClient[_]): String = {
- val client = new DefaultKubernetesClient
- // for kubernetes 1.22+
- lazy val fromV1 =
- Option(client.network.v1.ingresses.inNamespace(nameSpace).withName(clusterId).get)
- .map(ingress => ingress.getSpec.getRules.get(0))
- .map(rule => rule.getHost -> rule.getHttp.getPaths.get(0).getPath)
- // for kubernetes 1.22-
- lazy val fromV1beta1 =
- Option(client.network.v1beta1.ingresses.inNamespace(nameSpace).withName(clusterId).get)
- .map(ingress => ingress.getSpec.getRules.get(0))
- .map(rule => rule.getHost -> rule.getHttp.getPaths.get(0).getPath)
- Try(
- fromV1
- .orElse(fromV1beta1)
- .map { case (host, path) => s"https://$host$path" }
- .getOrElse(clusterClient.getWebInterfaceURL)
- ).getOrElse(throw new RuntimeException("[StreamPark] get ingressUrlAddress error."))
- }
-
- @throws[IOException]
- def prepareIngressTemplateFiles(buildWorkspace: String, ingressTemplates: String): String = {
- val workspaceDir = new File(buildWorkspace)
- if (!workspaceDir.exists) workspaceDir.mkdir
- if (ingressTemplates.isEmpty) null;
- else {
- val outputPath = buildWorkspace + "/ingress.yaml"
- val outputFile = new File(outputPath)
- FileUtils.write(outputFile, ingressTemplates, "UTF-8")
- outputPath
- }
- }
-
-}
-
-case class IngressMeta(
- addresses: List[String],
- port: Integer,
- protocol: String,
- serviceName: String,
- ingressName: String,
- hostname: String,
- path: String,
- allNodes: Boolean)
-
-object IngressMeta {
-
- @transient implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
-
- def as(json: String): Option[List[IngressMeta]] = {
- Try(parse(json)) match {
- case Success(ok) =>
- ok match {
- case JArray(arr) =>
- val list = arr.map(
- x => {
- IngressMeta(
- addresses =
- (x \ "addresses").extractOpt[List[String]].getOrElse(List.empty[String]),
- port = (x \ "port").extractOpt[Integer].getOrElse(0),
- protocol = (x \ "protocol").extractOpt[String].getOrElse(null),
- serviceName = (x \ "serviceName").extractOpt[String].getOrElse(null),
- ingressName = (x \ "ingressName").extractOpt[String].getOrElse(null),
- hostname = (x \ "hostname").extractOpt[String].getOrElse(null),
- path = (x \ "path").extractOpt[String].getOrElse(null),
- allNodes = (x \ "allNodes").extractOpt[Boolean].getOrElse(false)
- )
- })
- Some(list)
- case _ => None
- }
- case Failure(_) => None
- }
- }
-
-}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
index b2808fc1c..d4e6ec0ed 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
@@ -20,6 +20,7 @@ package org.apache.streampark.flink.kubernetes
import org.apache.streampark.common.util.{Logger, Utils}
import org.apache.streampark.common.util.Utils.tryWithResource
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
+import org.apache.streampark.flink.kubernetes.ingress.IngressController
import org.apache.streampark.flink.kubernetes.model.ClusterKey
import io.fabric8.kubernetes.client.{DefaultKubernetesClient, KubernetesClient, KubernetesClientException}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
new file mode 100644
index 000000000..c636010b6
--- /dev/null
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.kubernetes.ingress
+
+import org.apache.streampark.common.util.Logger
+import org.apache.streampark.common.util.Utils.using
+
+import io.fabric8.kubernetes.client.DefaultKubernetesClient
+import org.apache.flink.client.program.ClusterClient
+
+import scala.language.postfixOps
+
+object IngressController extends Logger {
+
+ private lazy val ingressStrategy: IngressStrategy = {
+ using(new DefaultKubernetesClient()) {
+ client =>
+ val versionInfo = client.getVersion
+ val version = s"${versionInfo.getMajor}.${versionInfo.getMinor}".toDouble
+ if (version >= 1.19) {
+ new IngressStrategyV1()
+ } else {
+ new IngressStrategyV1beta1()
+ }
+ }
+ }
+
+ def configureIngress(domainName: String, clusterId: String, nameSpace: String): Unit = {
+ ingressStrategy.configureIngress(domainName, clusterId, nameSpace)
+ }
+
+ def ingressUrlAddress(
+ nameSpace: String,
+ clusterId: String,
+ clusterClient: ClusterClient[_]): String = {
+ ingressStrategy.ingressUrlAddress(nameSpace, clusterId, clusterClient)
+ }
+
+ def prepareIngressTemplateFiles(buildWorkspace: String, ingressTemplates: String): String = {
+ ingressStrategy.prepareIngressTemplateFiles(buildWorkspace, ingressTemplates)
+ }
+}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
new file mode 100644
index 000000000..4df560b96
--- /dev/null
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.kubernetes.ingress
+
+import io.fabric8.kubernetes.client.DefaultKubernetesClient
+import org.apache.commons.io.FileUtils
+import org.apache.flink.client.program.ClusterClient
+
+import java.io.File
+
+trait IngressStrategy {
+ def ingressUrlAddress(
+ nameSpace: String,
+ clusterId: String,
+ clusterClient: ClusterClient[_]): String
+ def configureIngress(domainName: String, clusterId: String, nameSpace: String): Unit
+
+ def prepareIngressTemplateFiles(buildWorkspace: String, ingressTemplates: String): String = {
+ val workspaceDir = new File(buildWorkspace)
+ if (!workspaceDir.exists) workspaceDir.mkdir
+ if (ingressTemplates.isEmpty) null
+ else {
+ val outputPath = buildWorkspace + "/ingress.yaml"
+ val outputFile = new File(outputPath)
+ FileUtils.write(outputFile, ingressTemplates, "UTF-8")
+ outputPath
+ }
+ }
+
+ def buildIngressAnnotations(clusterId: String): Map[String, String] = {
+ Map(
+ "nginx.ingress.kubernetes.io/rewrite-target" -> "/$2",
+ "nginx.ingress.kubernetes.io/proxy-body-size" -> "1024m",
+ "nginx.ingress.kubernetes.io/configuration-snippet" -> ("rewrite ^(/" + clusterId + ")$ $1/ permanent;")
+ )
+ }
+
+ def buildIngressLabels(clusterId: String): Map[String, String] = {
+ Map(
+ "app" -> clusterId,
+ "type" -> "flink-native-kubernetes",
+ "component" -> "ingress"
+ )
+ }
+
+ def getDeployment(nameSpace: String, clusterId: String, client: DefaultKubernetesClient) = {
+ client.apps().deployments().inNamespace(nameSpace).withName(clusterId).get()
+ }
+
+}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
new file mode 100644
index 000000000..9bd40c2d1
--- /dev/null
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.kubernetes.ingress
+
+import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder
+import io.fabric8.kubernetes.api.model.networking.v1.IngressBuilder
+import io.fabric8.kubernetes.client.DefaultKubernetesClient
+import org.apache.flink.client.program.ClusterClient
+
+import scala.collection.JavaConverters._
+import scala.language.postfixOps
+import scala.util.{Success, Try}
+
+class IngressStrategyV1 extends IngressStrategy {
+
+ override def ingressUrlAddress(
+ nameSpace: String,
+ clusterId: String,
+ clusterClient: ClusterClient[_]): String = {
+ val client = new DefaultKubernetesClient
+ val fromV1 =
+ Option(client.network.v1.ingresses.inNamespace(nameSpace).withName(clusterId).get)
+ .map(ingress => ingress.getSpec.getRules.get(0))
+ .map(rule => rule.getHost -> rule.getHttp.getPaths.get(0).getPath)
+ Try(
+ fromV1
+ .map { case (host, path) => s"https://$host$path" }
+ .getOrElse(clusterClient.getWebInterfaceURL)
+ ).getOrElse(throw new RuntimeException("[StreamPark] get ingressUrlAddress error."))
+ }
+
+ override def configureIngress(domainName: String, clusterId: String, nameSpace: String): Unit = {
+ Try(new DefaultKubernetesClient) match {
+ case Success(client) =>
+ val deployment = getDeployment(nameSpace, clusterId, client)
+
+ val deploymentUid = if (deployment != null) {
+ deployment.getMetadata.getUid
+ } else {
+ throw new RuntimeException(
+ s"Deployment with name $clusterId not found in namespace $nameSpace")
+ }
+
+ // Create OwnerReference object
+ val ownerReference = new OwnerReferenceBuilder()
+ .withApiVersion("apps/v1")
+ .withKind("Deployment")
+ .withName(clusterId)
+ .withUid(deploymentUid)
+ .withController(true)
+ .withBlockOwnerDeletion(true)
+ .build()
+
+ val ingress = new IngressBuilder()
+ .withNewMetadata()
+ .withName(clusterId)
+ .addToAnnotations(buildIngressAnnotations(clusterId).asJava)
+ .addToLabels(buildIngressLabels(clusterId).asJava)
+ .addToOwnerReferences(ownerReference) // Add OwnerReference
+ .endMetadata()
+ .withNewSpec()
+ .addNewRule()
+ .withHost(domainName)
+ .withNewHttp()
+ .addNewPath()
+ .withPath(s"/$nameSpace/$clusterId/")
+ .withPathType("ImplementationSpecific")
+ .withNewBackend()
+ .withNewService()
+ .withName(s"$clusterId-rest")
+ .withNewPort()
+ .withName("rest")
+ .endPort()
+ .endService()
+ .endBackend()
+ .endPath()
+ .addNewPath()
+ .withPath(s"/$nameSpace/$clusterId" + "(/|$)(.*)")
+ .withPathType("ImplementationSpecific")
+ .withNewBackend()
+ .withNewService()
+ .withName(s"$clusterId-rest")
+ .withNewPort()
+ .withName("rest")
+ .endPort()
+ .endService()
+ .endBackend()
+ .endPath()
+ .endHttp()
+ .endRule()
+ .endSpec()
+ .build();
+ client.network.v1.ingresses().inNamespace(nameSpace).create(ingress)
+ case _ =>
+ }
+ }
+}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
new file mode 100644
index 000000000..f3d5a9189
--- /dev/null
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.kubernetes.ingress
+
+import io.fabric8.kubernetes.api.model.{IntOrString, OwnerReferenceBuilder}
+import io.fabric8.kubernetes.api.model.networking.v1beta1.IngressBuilder
+import io.fabric8.kubernetes.client.DefaultKubernetesClient
+import org.apache.flink.client.program.ClusterClient
+
+import scala.collection.JavaConverters._
+import scala.language.postfixOps
+import scala.util.{Success, Try}
+
+class IngressStrategyV1beta1 extends IngressStrategy {
+
+ override def ingressUrlAddress(
+ nameSpace: String,
+ clusterId: String,
+ clusterClient: ClusterClient[_]): String = {
+ val client = new DefaultKubernetesClient
+ // for kubernetes 1.19-
+ val fromV1beta1 =
+ Option(client.network.v1beta1.ingresses.inNamespace(nameSpace).withName(clusterId).get)
+ .map(ingress => ingress.getSpec.getRules.get(0))
+ .map(rule => rule.getHost -> rule.getHttp.getPaths.get(0).getPath)
+ Try(
+ fromV1beta1
+ .map { case (host, path) => s"https://$host$path" }
+ .getOrElse(clusterClient.getWebInterfaceURL)
+ ).getOrElse(throw new RuntimeException("[StreamPark] get ingressUrlAddress error."))
+ }
+
+ override def configureIngress(domainName: String, clusterId: String, nameSpace: String): Unit = {
+ Try(new DefaultKubernetesClient) match {
+ case Success(client) =>
+ val deployment = getDeployment(nameSpace, clusterId, client)
+
+ val deploymentUid = if (deployment != null) {
+ deployment.getMetadata.getUid
+ } else {
+ throw new RuntimeException(
+ s"Deployment with name $clusterId not found in namespace $nameSpace")
+ }
+
+ // Create OwnerReference object
+ val ownerReference = new OwnerReferenceBuilder()
+ .withApiVersion("apps/v1")
+ .withKind("Deployment")
+ .withName(clusterId)
+ .withUid(deploymentUid)
+ .withController(true)
+ .withBlockOwnerDeletion(true)
+ .build()
+
+ val ingress = new IngressBuilder()
+ .withNewMetadata()
+ .withName(clusterId)
+ .addToAnnotations(buildIngressAnnotations(clusterId).asJava)
+ .addToLabels(buildIngressLabels(clusterId).asJava)
+ .addToOwnerReferences(ownerReference)
+ .endMetadata()
+ .withNewSpec()
+ .addNewRule()
+ .withHost(domainName)
+ .withNewHttp()
+ .addNewPath()
+ .withPath(s"/$nameSpace/$clusterId/")
+ .withNewBackend()
+ .withServiceName(s"$clusterId-rest")
+ .withServicePort(new IntOrString("rest"))
+ .endBackend()
+ .endPath()
+ .addNewPath()
+ .withPath(s"/$nameSpace/$clusterId" + "(/|$)(.*)")
+ .withNewBackend()
+ .withServiceName(s"$clusterId-rest")
+ .withServicePort(new IntOrString("rest"))
+ .endBackend()
+ .endPath()
+ .endHttp()
+ .endRule()
+ .endSpec()
+ .build()
+ client.network.ingress.inNamespace(nameSpace).create(ingress)
+ case _ =>
+ }
+ }
+}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 03d68b3d3..55be3f9de 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -19,7 +19,7 @@ package org.apache.streampark.flink.kubernetes.watcher
import org.apache.streampark.common.conf.Workspace
import org.apache.streampark.common.util.Logger
-import org.apache.streampark.flink.kubernetes.{ChangeEventBus, FlinkK8sWatchController, IngressController, JobStatusWatcherConfig, KubernetesRetriever}
+import org.apache.streampark.flink.kubernetes.{ChangeEventBus, FlinkK8sWatchController, JobStatusWatcherConfig, KubernetesRetriever}
import org.apache.streampark.flink.kubernetes.enums.FlinkJobState
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode.{APPLICATION, SESSION}
import org.apache.streampark.flink.kubernetes.event.FlinkJobStatusChangeEvent
diff --git a/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala b/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala
index 5567cf5e9..24abbcdfb 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.streampark.flink.kubernetes
import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper
@@ -294,7 +295,7 @@ class FlinkRestJsonTest {
|]
|""".stripMargin
- val ingressMeta = IngressMeta.as(json)
+ val ingressMeta = new IngressMetaTestUtil().IngressMeta.as(json)
println(ingressMeta.get)
}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/IngressMetaTestUtil.scala b/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/IngressMetaTestUtil.scala
new file mode 100644
index 000000000..3bd8ff10b
--- /dev/null
+++ b/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/IngressMetaTestUtil.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.kubernetes
+
+import org.json4s.{DefaultFormats, JArray, JsonAST}
+import org.json4s.jackson.JsonMethods.parse
+
+import scala.util.{Failure, Success, Try}
+
+class IngressMetaTestUtil {
+
+ case class IngressMeta(
+ addresses: List[String],
+ port: Integer,
+ protocol: String,
+ serviceName: String,
+ ingressName: String,
+ hostname: String,
+ path: String,
+ allNodes: Boolean)
+
+ object IngressMeta {
+
+ @transient implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
+
+ def as(json: String): Option[List[IngressMeta]] = {
+ Try(parse(json)) match {
+ case Success(ok) =>
+ ok match {
+ case JArray(arr) =>
+ val list = arr.map(
+ x => {
+ IngressMeta(
+ addresses =
+ (x \ "addresses").extractOpt[List[String]].getOrElse(List.empty[String]),
+ port = (x \ "port").extractOpt[Integer].getOrElse(0),
+ protocol = (x \ "protocol").extractOpt[String].orNull,
+ serviceName = (x \ "serviceName").extractOpt[String].orNull,
+ ingressName = (x \ "ingressName").extractOpt[String].orNull,
+ hostname = (x \ "hostname").extractOpt[String].orNull,
+ path = (x \ "path").extractOpt[String].orNull,
+ allNodes = (x \ "allNodes").extractOpt[Boolean].getOrElse(false)
+ )
+ })
+ Some(list)
+ case _ => None
+ }
+ case Failure(_) => None
+ }
+ }
+ }
+
+}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/PodTemplateParserTest.scala b/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/PodTemplateParserTest.scala
index 4c38f8d18..e9083db7b 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/PodTemplateParserTest.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/PodTemplateParserTest.scala
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.streampark.flink.kubernetes
import org.junit.jupiter.api.Assertions.assertEquals
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
index 93b8aab17..fc3fe4738 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
@@ -20,7 +20,8 @@ package org.apache.streampark.flink.packer.pipeline.impl
import org.apache.streampark.common.enums.DevelopmentMode
import org.apache.streampark.common.fs.LfsOperator
import org.apache.streampark.common.util.ThreadUtils
-import org.apache.streampark.flink.kubernetes.{IngressController, PodTemplateTool}
+import org.apache.streampark.flink.kubernetes.PodTemplateTool
+import org.apache.streampark.flink.kubernetes.PodTemplateTool
+import org.apache.streampark.flink.kubernetes.ingress.IngressController
import org.apache.streampark.flink.packer.docker._
import org.apache.streampark.flink.packer.maven.MavenTool
import org.apache.streampark.flink.packer.pipeline._
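
For completeness, a caller-side sketch of the relocated entry point (the namespace, cluster id, and ClusterClient value are placeholders; the signature comes from the new ingress/IngressController.scala above):

    import org.apache.flink.client.program.ClusterClient
    import org.apache.streampark.flink.kubernetes.ingress.IngressController

    // Resolves the job's REST URL from the v1 or v1beta1 ingress rule and falls back
    // to clusterClient.getWebInterfaceURL when no matching ingress exists.
    def resolveRestUrl(clusterClient: ClusterClient[_]): String =
      IngressController.ingressUrlAddress("flink-ns", "my-flink-cluster", clusterClient)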