This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch ingress_class
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit 7f30f5a3c443770f759f2fc15e46192c2e6d0487
Author: benjobs <[email protected]>
AuthorDate: Sat Jan 27 17:40:01 2024 +0800

    [Improve] ingress class improvement
---
 .../flink/kubernetes/ingress/IngressController.scala        |  2 +-
 .../flink/kubernetes/ingress/IngressStrategy.scala          |  4 +---
 .../flink/kubernetes/ingress/IngressStrategyV1.scala        |  1 -
 .../flink/kubernetes/ingress/IngressStrategyV1beta1.scala   | 13 ++++++++++++-
 4 files changed, 14 insertions(+), 6 deletions(-)

diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
index 8bf576329..d6597ef19 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
@@ -29,7 +29,7 @@ object IngressController extends Logger {
 
   private[this] val VERSION_REGEXP = "(\\d+\\.\\d+)".r
 
-  private lazy val clusterVersion = using(new DefaultKubernetesClient()) {
+  lazy val clusterVersion = using(new DefaultKubernetesClient()) {
     client => 
VERSION_REGEXP.findFirstIn(client.getVersion.getGitVersion).get.toDouble
   }
 
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
index b44c216f8..ade4c8cae 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
@@ -47,13 +47,11 @@ trait IngressStrategy {
   }
 
   def buildIngressAnnotations(clusterId: String, namespace: String): 
Map[String, String] = {
-    val annotations = Map(
-      "kubernetes.io/ingress.class" -> ingressClass,
+    Map(
       "nginx.ingress.kubernetes.io/rewrite-target" -> "/$2",
       "nginx.ingress.kubernetes.io/proxy-body-size" -> "1024m",
       "nginx.ingress.kubernetes.io/configuration-snippet" -> s"""rewrite 
^(/$clusterId)$$ $$1/ permanent; sub_filter '<base href="./">' '<base 
href="/$namespace/$clusterId/">'; sub_filter_once off;"""
     )
-    annotations
   }
 
   def buildIngressLabels(clusterId: String): Map[String, String] = {
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
index d7fd5d37e..9e08338e9 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
@@ -47,7 +47,6 @@ class IngressStrategyV1 extends IngressStrategy {
             throw new RuntimeException(s"[StreamPark] get ingressUrlAddress 
error: $e")
         }.get
     }
-
   }
 
   override def configureIngress(domainName: String, clusterId: String, 
nameSpace: String): Unit = {
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
index 878f13b3e..1cfeceb68 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
@@ -22,6 +22,7 @@ import org.apache.streampark.common.util.Utils
 import io.fabric8.kubernetes.api.model.IntOrString
 import io.fabric8.kubernetes.api.model.networking.v1beta1.IngressBuilder
 import io.fabric8.kubernetes.client.DefaultKubernetesClient
+import org.apache.commons.lang3.StringUtils
 import org.apache.flink.client.program.ClusterClient
 
 import scala.collection.JavaConverters._
@@ -50,6 +51,17 @@ class IngressStrategyV1beta1 extends IngressStrategy {
     }
   }
 
+  override def buildIngressAnnotations(
+      clusterId: String,
+      namespace: String): Map[String, String] = {
+    val map = super.buildIngressAnnotations(clusterId, namespace)
+    if (StringUtils.isNotBlank(ingressClass)) {
+      Map("kubernetes.io/ingress.class" -> ingressClass) ++ map
+    } else {
+      map
+    }
+  }
+
   override def configureIngress(domainName: String, clusterId: String, 
nameSpace: String): Unit = {
     Utils.using(new DefaultKubernetesClient) {
       client =>
@@ -62,7 +74,6 @@ class IngressStrategyV1beta1 extends IngressStrategy {
           .addToOwnerReferences(ownerReference)
           .endMetadata()
           .withNewSpec()
-          .withIngressClassName(ingressClass)
           .addNewRule()
           .withHost(domainName)
           .withNewHttp()

Reply via email to