This is an automated email from the ASF dual-hosted git repository.

kriszu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4357a9ee8 [Improve] ingress multi-version adaptation (#2704)
4357a9ee8 is described below

commit 4357a9ee85b11fd5e780f05d9ab98e9c78fef9a1
Author: benjobs <[email protected]>
AuthorDate: Wed Apr 26 08:52:15 2023 +0800

    [Improve] ingress multi-version adaptation (#2704)
---
 .../flink/kubernetes/ingress/IngressStrategy.scala           |  3 +--
 .../flink/kubernetes/ingress/IngressStrategyV1.scala         |  9 +++++----
 .../flink/kubernetes/ingress/IngressStrategyV1beta1.scala    | 12 +++++-------
 .../pipeline/impl/FlinkK8sApplicationBuildPipeline.scala     |  2 +-
 4 files changed, 12 insertions(+), 14 deletions(-)

diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
index 81466ed9f..e03d3ae5a 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
@@ -77,12 +77,11 @@ trait IngressStrategy {
       deployment != null,
       s"Deployment with name $clusterId not found in namespace $nameSpace")
 
-    val uid = deployment.getMetadata.getUid
     new OwnerReferenceBuilder()
+      .withUid(deployment.getMetadata.getUid)
       .withApiVersion("apps/v1")
       .withKind("Deployment")
       .withName(clusterId)
-      .withUid(uid)
       .withController(true)
       .withBlockOwnerDeletion(true)
       .build()
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
index 234eaee56..8fb5711e6 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
@@ -36,15 +36,16 @@ class IngressStrategyV1 extends IngressStrategy {
 
     Utils.using(new DefaultKubernetesClient) {
       client =>
-        val hosts =
+        Try {
           Option(client.network.v1.ingresses.inNamespace(nameSpace).withName(clusterId).get)
             .map(ingress => ingress.getSpec.getRules.get(0))
             .map(rule => rule.getHost -> rule.getHttp.getPaths.get(0).getPath)
-        Try(
-          hosts
             .map { case (host, path) => s"https://$host$path"; }
             .getOrElse(clusterClient.getWebInterfaceURL)
-        ).getOrElse(throw new RuntimeException("[StreamPark] get ingressUrlAddress error."))
+        }.recover {
+          case e =>
+            throw new RuntimeException(s"[StreamPark] get ingressUrlAddress error: $e")
+        }.get
     }
 
   }
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
index 1d4891db8..97654796c 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
@@ -37,18 +37,16 @@ class IngressStrategyV1beta1 extends IngressStrategy {
 
     Utils.using(new DefaultKubernetesClient) {
       client =>
-        // for kubernetes 1.19-
-        val hosts =
+        Try {
           Option(client.network.v1beta1.ingresses.inNamespace(nameSpace).withName(clusterId).get)
             .map(ingress => ingress.getSpec.getRules.get(0))
             .map(rule => rule.getHost -> rule.getHttp.getPaths.get(0).getPath)
-
-        Try(
-          hosts
             .map { case (host, path) => s"https://$host$path"; }
             .getOrElse(clusterClient.getWebInterfaceURL)
-        ).getOrElse(throw new RuntimeException("[StreamPark] get ingressUrlAddress error."))
-
+        }.recover {
+          case e =>
+            throw new RuntimeException(s"[StreamPark] get ingressUrlAddress error: $e")
+        }.get
     }
   }
 
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
index fc3fe4738..72d47991e 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
@@ -243,7 +243,7 @@ object FlinkK8sApplicationBuildPipeline {
 
   val execPool = new ThreadPoolExecutor(
     Runtime.getRuntime.availableProcessors * 5,
-    Runtime.getRuntime().availableProcessors() * 10,
+    Runtime.getRuntime.availableProcessors() * 10,
     60L,
     TimeUnit.SECONDS,
     new LinkedBlockingQueue[Runnable](2048),

Reply via email to