This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
new 8907e7ac7 [Improve] ingress class improvement (#3514)
8907e7ac7 is described below
commit 8907e7ac7c8714f8815b6f28801df65b72d142dc
Author: benjobs <[email protected]>
AuthorDate: Sun Jan 28 12:21:27 2024 +0800
[Improve] ingress class improvement (#3514)
Co-authored-by: benjobs <[email protected]>
---
.../flink/kubernetes/ingress/IngressController.scala | 2 +-
.../flink/kubernetes/ingress/IngressStrategy.scala | 4 +---
.../flink/kubernetes/ingress/IngressStrategyV1.scala | 1 -
.../flink/kubernetes/ingress/IngressStrategyV1beta1.scala | 13 ++++++++++++-
4 files changed, 14 insertions(+), 6 deletions(-)
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
index 8bf576329..d6597ef19 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
@@ -29,7 +29,7 @@ object IngressController extends Logger {
private[this] val VERSION_REGEXP = "(\\d+\\.\\d+)".r
- private lazy val clusterVersion = using(new DefaultKubernetesClient()) {
+ lazy val clusterVersion = using(new DefaultKubernetesClient()) {
client =>
VERSION_REGEXP.findFirstIn(client.getVersion.getGitVersion).get.toDouble
}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
index b44c216f8..ade4c8cae 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
@@ -47,13 +47,11 @@ trait IngressStrategy {
}
def buildIngressAnnotations(clusterId: String, namespace: String): Map[String, String] = {
- val annotations = Map(
- "kubernetes.io/ingress.class" -> ingressClass,
+ Map(
"nginx.ingress.kubernetes.io/rewrite-target" -> "/$2",
"nginx.ingress.kubernetes.io/proxy-body-size" -> "1024m",
"nginx.ingress.kubernetes.io/configuration-snippet" -> s"""rewrite ^(/$clusterId)$$ $$1/ permanent; sub_filter '<base href="./">' '<base href="/$namespace/$clusterId/">'; sub_filter_once off;"""
)
- annotations
}
def buildIngressLabels(clusterId: String): Map[String, String] = {
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
index d7fd5d37e..9e08338e9 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
@@ -47,7 +47,6 @@ class IngressStrategyV1 extends IngressStrategy {
throw new RuntimeException(s"[StreamPark] get ingressUrlAddress error: $e")
}.get
}
-
}
override def configureIngress(domainName: String, clusterId: String, nameSpace: String): Unit = {
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
index 878f13b3e..1cfeceb68 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
@@ -22,6 +22,7 @@ import org.apache.streampark.common.util.Utils
import io.fabric8.kubernetes.api.model.IntOrString
import io.fabric8.kubernetes.api.model.networking.v1beta1.IngressBuilder
import io.fabric8.kubernetes.client.DefaultKubernetesClient
+import org.apache.commons.lang3.StringUtils
import org.apache.flink.client.program.ClusterClient
import scala.collection.JavaConverters._
@@ -50,6 +51,17 @@ class IngressStrategyV1beta1 extends IngressStrategy {
}
}
+ override def buildIngressAnnotations(
+ clusterId: String,
+ namespace: String): Map[String, String] = {
+ val map = super.buildIngressAnnotations(clusterId, namespace)
+ if (StringUtils.isNotBlank(ingressClass)) {
+ Map("kubernetes.io/ingress.class" -> ingressClass) ++ map
+ } else {
+ map
+ }
+ }
+
override def configureIngress(domainName: String, clusterId: String, nameSpace: String): Unit = {
Utils.using(new DefaultKubernetesClient) {
client =>
@@ -62,7 +74,6 @@ class IngressStrategyV1beta1 extends IngressStrategy {
.addToOwnerReferences(ownerReference)
.endMetadata()
.withNewSpec()
- .withIngressClassName(ingressClass)
.addNewRule()
.withHost(domainName)
.withNewHttp()