This is an automated email from the ASF dual-hosted git repository.
kriszu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 4357a9ee8 [Improve] ingress multi-version adaptation (#2704)
4357a9ee8 is described below
commit 4357a9ee85b11fd5e780f05d9ab98e9c78fef9a1
Author: benjobs <[email protected]>
AuthorDate: Wed Apr 26 08:52:15 2023 +0800
[Improve] ingress multi-version adaptation (#2704)
---
.../flink/kubernetes/ingress/IngressStrategy.scala | 3 +--
.../flink/kubernetes/ingress/IngressStrategyV1.scala | 9 +++++----
.../flink/kubernetes/ingress/IngressStrategyV1beta1.scala | 12 +++++-------
.../pipeline/impl/FlinkK8sApplicationBuildPipeline.scala | 2 +-
4 files changed, 12 insertions(+), 14 deletions(-)
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
index 81466ed9f..e03d3ae5a 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
@@ -77,12 +77,11 @@ trait IngressStrategy {
deployment != null,
s"Deployment with name $clusterId not found in namespace $nameSpace")
- val uid = deployment.getMetadata.getUid
new OwnerReferenceBuilder()
+ .withUid(deployment.getMetadata.getUid)
.withApiVersion("apps/v1")
.withKind("Deployment")
.withName(clusterId)
- .withUid(uid)
.withController(true)
.withBlockOwnerDeletion(true)
.build()
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
index 234eaee56..8fb5711e6 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
@@ -36,15 +36,16 @@ class IngressStrategyV1 extends IngressStrategy {
Utils.using(new DefaultKubernetesClient) {
client =>
- val hosts =
+ Try {
Option(client.network.v1.ingresses.inNamespace(nameSpace).withName(clusterId).get)
.map(ingress => ingress.getSpec.getRules.get(0))
.map(rule => rule.getHost -> rule.getHttp.getPaths.get(0).getPath)
- Try(
- hosts
.map { case (host, path) => s"https://$host$path" }
.getOrElse(clusterClient.getWebInterfaceURL)
- ).getOrElse(throw new RuntimeException("[StreamPark] get ingressUrlAddress error."))
+ }.recover {
+ case e =>
+ throw new RuntimeException(s"[StreamPark] get ingressUrlAddress error: $e")
+ }.get
}
}
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
index 1d4891db8..97654796c 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
@@ -37,18 +37,16 @@ class IngressStrategyV1beta1 extends IngressStrategy {
Utils.using(new DefaultKubernetesClient) {
client =>
- // for kubernetes 1.19-
- val hosts =
+ Try {
Option(client.network.v1beta1.ingresses.inNamespace(nameSpace).withName(clusterId).get)
.map(ingress => ingress.getSpec.getRules.get(0))
.map(rule => rule.getHost -> rule.getHttp.getPaths.get(0).getPath)
-
- Try(
- hosts
.map { case (host, path) => s"https://$host$path" }
.getOrElse(clusterClient.getWebInterfaceURL)
- ).getOrElse(throw new RuntimeException("[StreamPark] get ingressUrlAddress error."))
-
+ }.recover {
+ case e =>
+ throw new RuntimeException(s"[StreamPark] get ingressUrlAddress error: $e")
+ }.get
}
}
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
index fc3fe4738..72d47991e 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
@@ -243,7 +243,7 @@ object FlinkK8sApplicationBuildPipeline {
val execPool = new ThreadPoolExecutor(
Runtime.getRuntime.availableProcessors * 5,
- Runtime.getRuntime().availableProcessors() * 10,
+ Runtime.getRuntime.availableProcessors() * 10,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue[Runnable](2048),