This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 22f6e1590 [Improve]Ingress uses different versions of API according to different K8s versions (#2697)
22f6e1590 is described below

commit 22f6e1590e03ead704f5484b7a5a7cb014cb60f3
Author: monster <[email protected]>
AuthorDate: Tue Apr 25 19:00:10 2023 +0800

    [Improve]Ingress uses different versions of API according to different K8s versions (#2697)
    
    * [Improve]Ingress uses different versions of API according to different 
K8s versions.
    
    * Fix the review problem
    
    * Complete the dependency package
---
 .../org/apache/streampark/common/util/Utils.scala  |  10 +
 .../core/service/impl/ApplicationServiceImpl.java  |   2 +-
 .../streampark-flink-kubernetes/pom.xml            |   4 -
 .../flink/kubernetes/IngressController.scala       | 230 ---------------------
 .../flink/kubernetes/KubernetesRetriever.scala     |   1 +
 .../kubernetes/ingress/IngressController.scala     |  57 +++++
 .../flink/kubernetes/ingress/IngressStrategy.scala |  65 ++++++
 .../kubernetes/ingress/IngressStrategyV1.scala     | 112 ++++++++++
 .../ingress/IngressStrategyV1beta1.scala           | 103 +++++++++
 .../kubernetes/watcher/FlinkJobStatusWatcher.scala |   2 +-
 .../flink/kubernetes/FlinkRestJsonTest.scala       |   3 +-
 .../flink/kubernetes/IngressMetaTestUtil.scala     |  68 ++++++
 .../flink/kubernetes/PodTemplateParserTest.scala   |   1 +
 .../impl/FlinkK8sApplicationBuildPipeline.scala    |   3 +-
 14 files changed, 423 insertions(+), 238 deletions(-)

diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
index 71e807062..9d696e545 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
@@ -136,6 +136,16 @@ object Utils {
       })
   }
 
+  def using[T <: AutoCloseable, V](resource: T)(block: T => V): V = {
+    try {
+      block(resource)
+    } finally {
+      if (resource != null) {
+        resource.close()
+      }
+    }
+  }
+
   /**
    * calculate the percentage of num1 / num2, the result range from 0 to 100, 
with one small digit
    * reserve.
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 3fbee60fa..2242b897e 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -86,8 +86,8 @@ import org.apache.streampark.flink.client.bean.SubmitRequest;
 import org.apache.streampark.flink.client.bean.SubmitResponse;
 import org.apache.streampark.flink.core.conf.ParameterCli;
 import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
-import org.apache.streampark.flink.kubernetes.IngressController;
 import 
org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper;
+import org.apache.streampark.flink.kubernetes.ingress.IngressController;
 import org.apache.streampark.flink.kubernetes.model.FlinkMetricCV;
 import org.apache.streampark.flink.kubernetes.model.TrackId;
 import org.apache.streampark.flink.packer.pipeline.BuildResult;
diff --git a/streampark-flink/streampark-flink-kubernetes/pom.xml 
b/streampark-flink/streampark-flink-kubernetes/pom.xml
index 211c61b14..da594bd1c 100644
--- a/streampark-flink/streampark-flink-kubernetes/pom.xml
+++ b/streampark-flink/streampark-flink-kubernetes/pom.xml
@@ -72,10 +72,6 @@
                     <artifactId>flink-file-sink-common</artifactId>
                     <groupId>org.apache.flink</groupId>
                 </exclusion>
-                <exclusion>
-                    <artifactId>flink-hadoop-fs</artifactId>
-                    <groupId>org.apache.flink</groupId>
-                </exclusion>
             </exclusions>
         </dependency>
 
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/IngressController.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/IngressController.scala
deleted file mode 100644
index fd758f4d3..000000000
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/IngressController.scala
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.flink.kubernetes
-
-import org.apache.streampark.common.util.Logger
-import org.apache.streampark.common.util.Utils._
-
-import io.fabric8.kubernetes.api.model.{IntOrString, OwnerReferenceBuilder}
-import io.fabric8.kubernetes.api.model.networking.v1.IngressBuilder
-import io.fabric8.kubernetes.client.DefaultKubernetesClient
-import org.apache.commons.io.FileUtils
-import org.apache.flink.client.program.ClusterClient
-import org.json4s.{DefaultFormats, JArray}
-import org.json4s.jackson.JsonMethods.parse
-
-import java.io.File
-import java.io.IOException
-import java.nio.file.Files
-import java.nio.file.Paths
-
-import scala.collection.JavaConverters._
-import scala.language.postfixOps
-import scala.util.{Failure, Success, Try}
-
-object IngressController extends Logger {
-
-  def configureIngress(domainName: String, clusterId: String, nameSpace: 
String): Unit = {
-    Try(new DefaultKubernetesClient) match {
-      case Success(client) =>
-        val annotMap = Map[String, String](
-          "nginx.ingress.kubernetes.io/rewrite-target" -> "/$2",
-          "nginx.ingress.kubernetes.io/proxy-body-size" -> "1024m",
-          "nginx.ingress.kubernetes.io/configuration-snippet" -> ("rewrite 
^(/" + clusterId + ")$ $1/ permanent;")
-        )
-        val labelsMap = Map[String, String](
-          "app" -> clusterId,
-          "type" -> "flink-native-kubernetes",
-          "component" -> "ingress")
-
-        val deployment = client
-          .apps()
-          .deployments()
-          .inNamespace(nameSpace)
-          .withName(clusterId)
-          .get()
-
-        val deploymentUid = if (deployment != null) {
-          deployment.getMetadata.getUid
-        } else {
-          throw new RuntimeException(
-            s"Deployment with name $clusterId not found in namespace 
$nameSpace")
-        }
-
-        // Create OwnerReference object
-        val ownerReference = new OwnerReferenceBuilder()
-          .withApiVersion("apps/v1")
-          .withKind("Deployment")
-          .withName(clusterId)
-          .withUid(deploymentUid)
-          .withController(true)
-          .withBlockOwnerDeletion(true)
-          .build()
-
-        val ingress = new IngressBuilder()
-          .withNewMetadata()
-          .withName(clusterId)
-          .addToAnnotations(annotMap.asJava)
-          .addToLabels(labelsMap.asJava)
-          .addToOwnerReferences(ownerReference) // Add OwnerReference
-          .endMetadata()
-          .withNewSpec()
-          .addNewRule()
-          .withHost(domainName)
-          .withNewHttp()
-          .addNewPath()
-          .withPath(s"/$nameSpace/$clusterId/")
-          .withPathType("ImplementationSpecific")
-          .withNewBackend()
-          .withNewService()
-          .withName(s"$clusterId-rest")
-          .withNewPort()
-          .withName("rest")
-          .endPort()
-          .endService()
-          .endBackend()
-          .endPath()
-          .addNewPath()
-          .withPath(s"/$nameSpace/$clusterId" + "(/|$)(.*)")
-          .withPathType("ImplementationSpecific")
-          .withNewBackend()
-          .withNewService()
-          .withName(s"$clusterId-rest")
-          .withNewPort()
-          .withName("rest")
-          .endPort()
-          .endService()
-          .endBackend()
-          .endPath()
-          .endHttp()
-          .endRule()
-          .endSpec()
-          .build();
-        client.network.v1.ingresses().inNamespace(nameSpace).create(ingress)
-
-      case _ =>
-    }
-  }
-
-  def configureIngress(ingressOutput: String): Unit = {
-    close {
-      val client = new DefaultKubernetesClient
-      client.network.ingress
-        .load(Files.newInputStream(Paths.get(ingressOutput)))
-        .get()
-      client
-    }
-  }
-
-  private[this] def determineThePodSurvivalStatus(name: String, nameSpace: 
String): Boolean = {
-    tryWithResource(KubernetesRetriever.newK8sClient()) {
-      client =>
-        Try {
-          client
-            .apps()
-            .deployments()
-            .inNamespace(nameSpace)
-            .withName(name)
-            .get()
-            .getSpec()
-            .getSelector()
-            .getMatchLabels()
-          false
-        }.getOrElse(true)
-    }
-  }
-
-  def ingressUrlAddress(
-      nameSpace: String,
-      clusterId: String,
-      clusterClient: ClusterClient[_]): String = {
-    val client = new DefaultKubernetesClient
-    // for kubernetes 1.22+
-    lazy val fromV1 =
-      
Option(client.network.v1.ingresses.inNamespace(nameSpace).withName(clusterId).get)
-        .map(ingress => ingress.getSpec.getRules.get(0))
-        .map(rule => rule.getHost -> rule.getHttp.getPaths.get(0).getPath)
-    // for kubernetes 1.22-
-    lazy val fromV1beta1 =
-      
Option(client.network.v1beta1.ingresses.inNamespace(nameSpace).withName(clusterId).get)
-        .map(ingress => ingress.getSpec.getRules.get(0))
-        .map(rule => rule.getHost -> rule.getHttp.getPaths.get(0).getPath)
-    Try(
-      fromV1
-        .orElse(fromV1beta1)
-        .map { case (host, path) => s"https://$host$path"; }
-        .getOrElse(clusterClient.getWebInterfaceURL)
-    ).getOrElse(throw new RuntimeException("[StreamPark] get ingressUrlAddress 
error."))
-  }
-
-  @throws[IOException]
-  def prepareIngressTemplateFiles(buildWorkspace: String, ingressTemplates: 
String): String = {
-    val workspaceDir = new File(buildWorkspace)
-    if (!workspaceDir.exists) workspaceDir.mkdir
-    if (ingressTemplates.isEmpty) null;
-    else {
-      val outputPath = buildWorkspace + "/ingress.yaml"
-      val outputFile = new File(outputPath)
-      FileUtils.write(outputFile, ingressTemplates, "UTF-8")
-      outputPath
-    }
-  }
-
-}
-
-case class IngressMeta(
-    addresses: List[String],
-    port: Integer,
-    protocol: String,
-    serviceName: String,
-    ingressName: String,
-    hostname: String,
-    path: String,
-    allNodes: Boolean)
-
-object IngressMeta {
-
-  @transient implicit lazy val formats: DefaultFormats.type = 
org.json4s.DefaultFormats
-
-  def as(json: String): Option[List[IngressMeta]] = {
-    Try(parse(json)) match {
-      case Success(ok) =>
-        ok match {
-          case JArray(arr) =>
-            val list = arr.map(
-              x => {
-                IngressMeta(
-                  addresses =
-                    (x \ 
"addresses").extractOpt[List[String]].getOrElse(List.empty[String]),
-                  port = (x \ "port").extractOpt[Integer].getOrElse(0),
-                  protocol = (x \ 
"protocol").extractOpt[String].getOrElse(null),
-                  serviceName = (x \ 
"serviceName").extractOpt[String].getOrElse(null),
-                  ingressName = (x \ 
"ingressName").extractOpt[String].getOrElse(null),
-                  hostname = (x \ 
"hostname").extractOpt[String].getOrElse(null),
-                  path = (x \ "path").extractOpt[String].getOrElse(null),
-                  allNodes = (x \ 
"allNodes").extractOpt[Boolean].getOrElse(false)
-                )
-              })
-            Some(list)
-          case _ => None
-        }
-      case Failure(_) => None
-    }
-  }
-
-}
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
index b2808fc1c..d4e6ec0ed 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
@@ -20,6 +20,7 @@ package org.apache.streampark.flink.kubernetes
 import org.apache.streampark.common.util.{Logger, Utils}
 import org.apache.streampark.common.util.Utils.tryWithResource
 import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
+import org.apache.streampark.flink.kubernetes.ingress.IngressController
 import org.apache.streampark.flink.kubernetes.model.ClusterKey
 
 import io.fabric8.kubernetes.client.{DefaultKubernetesClient, 
KubernetesClient, KubernetesClientException}
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
new file mode 100644
index 000000000..c636010b6
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.kubernetes.ingress
+
+import org.apache.streampark.common.util.Logger
+import org.apache.streampark.common.util.Utils.using
+
+import io.fabric8.kubernetes.client.DefaultKubernetesClient
+import org.apache.flink.client.program.ClusterClient
+
+import scala.language.postfixOps
+
+object IngressController extends Logger {
+
+  private lazy val ingressStrategy: IngressStrategy = {
+    using(new DefaultKubernetesClient()) {
+      client =>
+        val versionInfo = client.getVersion
+        val version = 
s"${versionInfo.getMajor}.${versionInfo.getMinor}".toDouble
+        if (version >= 1.19) {
+          new IngressStrategyV1()
+        } else {
+          new IngressStrategyV1beta1()
+        }
+    }
+  }
+
+  def configureIngress(domainName: String, clusterId: String, nameSpace: 
String): Unit = {
+    ingressStrategy.configureIngress(domainName, clusterId, nameSpace)
+  }
+
+  def ingressUrlAddress(
+      nameSpace: String,
+      clusterId: String,
+      clusterClient: ClusterClient[_]): String = {
+    ingressStrategy.ingressUrlAddress(nameSpace, clusterId, clusterClient)
+  }
+
+  def prepareIngressTemplateFiles(buildWorkspace: String, ingressTemplates: 
String): String = {
+    ingressStrategy.prepareIngressTemplateFiles(buildWorkspace, 
ingressTemplates)
+  }
+}
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
new file mode 100644
index 000000000..4df560b96
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.kubernetes.ingress
+
+import io.fabric8.kubernetes.client.DefaultKubernetesClient
+import org.apache.commons.io.FileUtils
+import org.apache.flink.client.program.ClusterClient
+
+import java.io.File
+
+trait IngressStrategy {
+  def ingressUrlAddress(
+      nameSpace: String,
+      clusterId: String,
+      clusterClient: ClusterClient[_]): String
+  def configureIngress(domainName: String, clusterId: String, nameSpace: 
String): Unit
+
+  def prepareIngressTemplateFiles(buildWorkspace: String, ingressTemplates: 
String): String = {
+    val workspaceDir = new File(buildWorkspace)
+    if (!workspaceDir.exists) workspaceDir.mkdir
+    if (ingressTemplates.isEmpty) null
+    else {
+      val outputPath = buildWorkspace + "/ingress.yaml"
+      val outputFile = new File(outputPath)
+      FileUtils.write(outputFile, ingressTemplates, "UTF-8")
+      outputPath
+    }
+  }
+
+  def buildIngressAnnotations(clusterId: String): Map[String, String] = {
+    Map(
+      "nginx.ingress.kubernetes.io/rewrite-target" -> "/$2",
+      "nginx.ingress.kubernetes.io/proxy-body-size" -> "1024m",
+      "nginx.ingress.kubernetes.io/configuration-snippet" -> ("rewrite ^(/" + 
clusterId + ")$ $1/ permanent;")
+    )
+  }
+
+  def buildIngressLabels(clusterId: String): Map[String, String] = {
+    Map(
+      "app" -> clusterId,
+      "type" -> "flink-native-kubernetes",
+      "component" -> "ingress"
+    )
+  }
+
+  def getDeployment(nameSpace: String, clusterId: String, client: 
DefaultKubernetesClient) = {
+    
client.apps().deployments().inNamespace(nameSpace).withName(clusterId).get()
+  }
+
+}
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
new file mode 100644
index 000000000..9bd40c2d1
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.kubernetes.ingress
+
+import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder
+import io.fabric8.kubernetes.api.model.networking.v1.IngressBuilder
+import io.fabric8.kubernetes.client.DefaultKubernetesClient
+import org.apache.flink.client.program.ClusterClient
+
+import scala.collection.JavaConverters._
+import scala.language.postfixOps
+import scala.util.{Success, Try}
+
+class IngressStrategyV1 extends IngressStrategy {
+
+  override def ingressUrlAddress(
+      nameSpace: String,
+      clusterId: String,
+      clusterClient: ClusterClient[_]): String = {
+    val client = new DefaultKubernetesClient
+    val fromV1 =
+      
Option(client.network.v1.ingresses.inNamespace(nameSpace).withName(clusterId).get)
+        .map(ingress => ingress.getSpec.getRules.get(0))
+        .map(rule => rule.getHost -> rule.getHttp.getPaths.get(0).getPath)
+    Try(
+      fromV1
+        .map { case (host, path) => s"https://$host$path"; }
+        .getOrElse(clusterClient.getWebInterfaceURL)
+    ).getOrElse(throw new RuntimeException("[StreamPark] get ingressUrlAddress 
error."))
+  }
+
+  override def configureIngress(domainName: String, clusterId: String, 
nameSpace: String): Unit = {
+    Try(new DefaultKubernetesClient) match {
+      case Success(client) =>
+        val deployment = getDeployment(nameSpace, clusterId, client)
+
+        val deploymentUid = if (deployment != null) {
+          deployment.getMetadata.getUid
+        } else {
+          throw new RuntimeException(
+            s"Deployment with name $clusterId not found in namespace 
$nameSpace")
+        }
+
+        // Create OwnerReference object
+        val ownerReference = new OwnerReferenceBuilder()
+          .withApiVersion("apps/v1")
+          .withKind("Deployment")
+          .withName(clusterId)
+          .withUid(deploymentUid)
+          .withController(true)
+          .withBlockOwnerDeletion(true)
+          .build()
+
+        val ingress = new IngressBuilder()
+          .withNewMetadata()
+          .withName(clusterId)
+          .addToAnnotations(buildIngressAnnotations(clusterId).asJava)
+          .addToLabels(buildIngressLabels(clusterId).asJava)
+          .addToOwnerReferences(ownerReference) // Add OwnerReference
+          .endMetadata()
+          .withNewSpec()
+          .addNewRule()
+          .withHost(domainName)
+          .withNewHttp()
+          .addNewPath()
+          .withPath(s"/$nameSpace/$clusterId/")
+          .withPathType("ImplementationSpecific")
+          .withNewBackend()
+          .withNewService()
+          .withName(s"$clusterId-rest")
+          .withNewPort()
+          .withName("rest")
+          .endPort()
+          .endService()
+          .endBackend()
+          .endPath()
+          .addNewPath()
+          .withPath(s"/$nameSpace/$clusterId" + "(/|$)(.*)")
+          .withPathType("ImplementationSpecific")
+          .withNewBackend()
+          .withNewService()
+          .withName(s"$clusterId-rest")
+          .withNewPort()
+          .withName("rest")
+          .endPort()
+          .endService()
+          .endBackend()
+          .endPath()
+          .endHttp()
+          .endRule()
+          .endSpec()
+          .build();
+        client.network.v1.ingresses().inNamespace(nameSpace).create(ingress)
+      case _ =>
+    }
+  }
+}
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
new file mode 100644
index 000000000..f3d5a9189
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.kubernetes.ingress
+
+import io.fabric8.kubernetes.api.model.{IntOrString, OwnerReferenceBuilder}
+import io.fabric8.kubernetes.api.model.networking.v1beta1.IngressBuilder
+import io.fabric8.kubernetes.client.DefaultKubernetesClient
+import org.apache.flink.client.program.ClusterClient
+
+import scala.collection.JavaConverters._
+import scala.language.postfixOps
+import scala.util.{Success, Try}
+
+class IngressStrategyV1beta1 extends IngressStrategy {
+
+  override def ingressUrlAddress(
+      nameSpace: String,
+      clusterId: String,
+      clusterClient: ClusterClient[_]): String = {
+    val client = new DefaultKubernetesClient
+    // for kubernetes 1.19-
+    val fromV1beta1 =
+      
Option(client.network.v1beta1.ingresses.inNamespace(nameSpace).withName(clusterId).get)
+        .map(ingress => ingress.getSpec.getRules.get(0))
+        .map(rule => rule.getHost -> rule.getHttp.getPaths.get(0).getPath)
+    Try(
+      fromV1beta1
+        .map { case (host, path) => s"https://$host$path"; }
+        .getOrElse(clusterClient.getWebInterfaceURL)
+    ).getOrElse(throw new RuntimeException("[StreamPark] get ingressUrlAddress 
error."))
+  }
+
+  override def configureIngress(domainName: String, clusterId: String, 
nameSpace: String): Unit = {
+    Try(new DefaultKubernetesClient) match {
+      case Success(client) =>
+        val deployment = getDeployment(nameSpace, clusterId, client)
+
+        val deploymentUid = if (deployment != null) {
+          deployment.getMetadata.getUid
+        } else {
+          throw new RuntimeException(
+            s"Deployment with name $clusterId not found in namespace 
$nameSpace")
+        }
+
+        // Create OwnerReference object
+        val ownerReference = new OwnerReferenceBuilder()
+          .withApiVersion("apps/v1")
+          .withKind("Deployment")
+          .withName(clusterId)
+          .withUid(deploymentUid)
+          .withController(true)
+          .withBlockOwnerDeletion(true)
+          .build()
+
+        val ingress = new IngressBuilder()
+          .withNewMetadata()
+          .withName(clusterId)
+          .addToAnnotations(buildIngressAnnotations(clusterId).asJava)
+          .addToLabels(buildIngressLabels(clusterId).asJava)
+          .addToOwnerReferences(ownerReference)
+          .endMetadata()
+          .withNewSpec()
+          .addNewRule()
+          .withHost(domainName)
+          .withNewHttp()
+          .addNewPath()
+          .withPath(s"/$nameSpace/$clusterId/")
+          .withNewBackend()
+          .withServiceName(s"$clusterId-rest")
+          .withServicePort(new IntOrString("rest"))
+          .endBackend()
+          .endPath()
+          .addNewPath()
+          .withPath(s"/$nameSpace/$clusterId" + "(/|$)(.*)")
+          .withNewBackend()
+          .withServiceName(s"$clusterId-rest")
+          .withServicePort(new IntOrString("rest"))
+          .endBackend()
+          .endPath()
+          .endHttp()
+          .endRule()
+          .endSpec()
+          .build()
+        client.network.ingress.inNamespace(nameSpace).create(ingress)
+      case _ =>
+    }
+  }
+}
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 03d68b3d3..55be3f9de 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -19,7 +19,7 @@ package org.apache.streampark.flink.kubernetes.watcher
 
 import org.apache.streampark.common.conf.Workspace
 import org.apache.streampark.common.util.Logger
-import org.apache.streampark.flink.kubernetes.{ChangeEventBus, 
FlinkK8sWatchController, IngressController, JobStatusWatcherConfig, 
KubernetesRetriever}
+import org.apache.streampark.flink.kubernetes.{ChangeEventBus, 
FlinkK8sWatchController, JobStatusWatcherConfig, KubernetesRetriever}
 import org.apache.streampark.flink.kubernetes.enums.FlinkJobState
 import 
org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode.{APPLICATION, 
SESSION}
 import org.apache.streampark.flink.kubernetes.event.FlinkJobStatusChangeEvent
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala
index 5567cf5e9..24abbcdfb 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.streampark.flink.kubernetes
 
 import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper
@@ -294,7 +295,7 @@ class FlinkRestJsonTest {
         |]
         |""".stripMargin
 
-    val ingressMeta = IngressMeta.as(json)
+    val ingressMeta = new IngressMetaTestUtil().IngressMeta.as(json)
     println(ingressMeta.get)
   }
 
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/IngressMetaTestUtil.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/IngressMetaTestUtil.scala
new file mode 100644
index 000000000..3bd8ff10b
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/IngressMetaTestUtil.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.kubernetes
+
+import org.json4s.{DefaultFormats, JArray, JsonAST}
+import org.json4s.jackson.JsonMethods.parse
+
+import scala.util.{Failure, Success, Try}
+
+class IngressMetaTestUtil {
+
+  /** One ingress entry decoded from an element of the JSON array. */
+  case class IngressMeta(
+      addresses: List[String],
+      port: Integer,
+      protocol: String,
+      serviceName: String,
+      ingressName: String,
+      hostname: String,
+      path: String,
+      allNodes: Boolean)
+
+  object IngressMeta {
+
+    @transient implicit lazy val formats: DefaultFormats.type =
+      org.json4s.DefaultFormats
+
+    /**
+     * Parses `json`; `None` when parsing throws or the root is not an array.
+     * Absent fields fall back to an empty list, `0`, `null` or `false`.
+     */
+    def as(json: String): Option[List[IngressMeta]] =
+      Try(parse(json)).toOption.collect {
+        case JArray(nodes) => nodes.map(fromNode)
+      }
+
+    /** Builds a single entry from one element of the parsed array. */
+    private def fromNode(node: JsonAST.JValue): IngressMeta = {
+      def text(key: String): String = (node \ key).extractOpt[String].orNull
+      IngressMeta(
+        addresses = (node \ "addresses").extractOpt[List[String]].getOrElse(Nil),
+        port = (node \ "port").extractOpt[Integer].getOrElse(0),
+        protocol = text("protocol"),
+        serviceName = text("serviceName"),
+        ingressName = text("ingressName"),
+        hostname = text("hostname"),
+        path = text("path"),
+        allNodes = (node \ "allNodes").extractOpt[Boolean].getOrElse(false)
+      )
+    }
+  }
+
+}
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/PodTemplateParserTest.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/PodTemplateParserTest.scala
index 4c38f8d18..e9083db7b 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/PodTemplateParserTest.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/PodTemplateParserTest.scala
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.streampark.flink.kubernetes
 
 import org.junit.jupiter.api.Assertions.assertEquals
diff --git 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
index 93b8aab17..fc3fe4738 100644
--- 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
+++ 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
@@ -20,7 +20,8 @@ package org.apache.streampark.flink.packer.pipeline.impl
 import org.apache.streampark.common.enums.DevelopmentMode
 import org.apache.streampark.common.fs.LfsOperator
 import org.apache.streampark.common.util.ThreadUtils
-import org.apache.streampark.flink.kubernetes.{IngressController, 
PodTemplateTool}
+import org.apache.streampark.flink.kubernetes.PodTemplateTool
+import org.apache.streampark.flink.kubernetes.ingress.IngressController
 import org.apache.streampark.flink.packer.docker._
 import org.apache.streampark.flink.packer.maven.MavenTool
 import org.apache.streampark.flink.packer.pipeline._


Reply via email to