This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
     new 32e38f166 [Improve] ingress improvement
32e38f166 is described below

commit 32e38f1669fe265d5bcf86140943ded26c7f90af
Author: benjobs <[email protected]>
AuthorDate: Wed Jan 24 21:53:24 2024 +0800

    [Improve] ingress improvement
---
 .../flink/client/impl/KubernetesNativeSessionClient.scala     | 10 ++++++++--
 .../streampark/flink/client/impl/YarnSessionClient.scala      |  8 +++++++-
 .../streampark/flink/client/trait/FlinkClientTrait.scala      |  7 -------
 .../flink/client/trait/KubernetesNativeClientTrait.scala      |  8 ++++++++
 .../streampark/flink/kubernetes/KubernetesRetriever.scala     |  5 +----
 .../flink/kubernetes/ingress/IngressController.scala          |  4 ++--
 .../streampark/flink/kubernetes/ingress/IngressStrategy.scala | 11 ++++-------
 .../flink/kubernetes/ingress/IngressStrategyV1.scala          |  2 +-
 .../flink/kubernetes/ingress/IngressStrategyV1beta1.scala     |  2 +-
 9 files changed, 32 insertions(+), 25 deletions(-)

diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
index 3606c03a0..54bfe210e 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
@@ -109,8 +109,9 @@ object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Lo
         .getClusterClient
       val submitResult = client.submitJob(jobGraph)
       val jobId = submitResult.get().toString
+      val url = getClientURL(client, submitRequest.kubernetesNamespace, submitRequest.clusterId)
       val result =
-        SubmitResponse(client.getClusterId, flinkConfig.toMap, jobId, client.getWebInterfaceURL)
+        SubmitResponse(client.getClusterId, flinkConfig.toMap, jobId, url)
       logInfo(
         s"[flink-submit] flink job has been submitted. ${flinkConfIdentifierInfo(flinkConfig)}, jobId: $jobId")
       result
@@ -169,7 +170,12 @@ object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Lo
         client =
           clusterDescriptor.deploySessionCluster(kubernetesClusterDescriptor._2).getClusterClient
       }
-      getDeployResponse(client)
+      val url = getClientURL(client, deployRequest.kubernetesNamespace, deployRequest.clusterId)
+      if (url != null) {
+        DeployResponse(address = url, clusterId = client.getClusterId)
+      } else {
+        DeployResponse(error = new RuntimeException("get the cluster getWebInterfaceURL failed."))
+      }
     } catch {
       case e: Exception => DeployResponse(error = e)
     } finally {
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index 0e0b4b3c9..e901a30c1 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -198,7 +198,13 @@ object YarnSessionClient extends YarnClientTrait {
       }
       val clientProvider = clusterDescriptor.deploySessionCluster(yarnClusterDescriptor._1)
       client = clientProvider.getClusterClient
-      getDeployResponse(client)
+      if (client.getWebInterfaceURL != null) {
+        DeployResponse(
+          address = client.getWebInterfaceURL,
+          clusterId = client.getClusterId.toString)
+      } else {
+        DeployResponse(error = new RuntimeException("get the cluster getWebInterfaceURL failed."))
+      }
     } catch {
       case e: Exception => DeployResponse(error = e)
     } finally {
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 09b4ed8ad..f1b171310 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -524,11 +524,4 @@ trait FlinkClientTrait extends Logger {
     clientWrapper.triggerSavepoint(jobID, savepointPath).get()
   }
 
-  def getDeployResponse(client: ClusterClient[_]): DeployResponse = {
-    if (client.getWebInterfaceURL != null) {
-      DeployResponse(address = client.getWebInterfaceURL, clusterId = client.getClusterId.toString)
-    } else {
-      DeployResponse(error = new RuntimeException("get the cluster getWebInterfaceURL failed."))
-    }
-  }
 }
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
index 9e7468053..36bb67b91 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
@@ -19,6 +19,7 @@ package org.apache.streampark.flink.client.`trait`
 
 import org.apache.streampark.common.enums.{ExecutionMode, FlinkK8sRestExposedType}
 import org.apache.streampark.flink.client.bean._
+import org.apache.streampark.flink.kubernetes.ingress.IngressController
 import org.apache.streampark.flink.packer.pipeline.DockerImageBuildResponse
 
 import org.apache.commons.lang3.StringUtils
@@ -33,6 +34,7 @@ import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions.Service
 import javax.annotation.Nonnull
 
 import scala.language.postfixOps
+import scala.util.Try
 
 /** kubernetes native mode submit */
 trait KubernetesNativeClientTrait extends FlinkClientTrait {
@@ -182,4 +184,10 @@ trait KubernetesNativeClientTrait extends FlinkClientTrait {
     if (k8sConf != null) k8sConf.replace("~", homePath)
     else homePath.concat("/.kube/config")
   }
+
+  def getClientURL(client: ClusterClient[_], namespace: String, clusterId: String): String = {
+    Try(IngressController.getIngressUrl(namespace, clusterId, client))
+      .getOrElse(client.getWebInterfaceURL)
+  }
+
 }
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
index d2e3a8236..dd308fb96 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
@@ -34,10 +34,7 @@ import org.apache.hc.core5.util.Timeout
 
 import javax.annotation.Nullable
 
-import java.time.Duration
-
 import scala.collection.JavaConverters._
-import scala.util
 import scala.util.{Failure, Success, Try}
 
 object KubernetesRetriever extends Logger {
@@ -145,7 +142,7 @@ object KubernetesRetriever extends Logger {
         .getOrElse(return None)) {
       client =>
         val url =
-          IngressController.ingressUrlAddress(clusterKey.namespace, clusterKey.clusterId, client)
+          IngressController.getIngressUrl(clusterKey.namespace, clusterKey.clusterId, client)
         logger.info(s"retrieve flink jobManager rest url: $url")
         client.close()
         Some(url)
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
index d443a22bd..7e82f996a 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
@@ -45,11 +45,11 @@ object IngressController extends Logger {
     ingressStrategy.configureIngress(domainName, clusterId, nameSpace)
   }
 
-  def ingressUrlAddress(
+  def getIngressUrl(
       nameSpace: String,
       clusterId: String,
       clusterClient: ClusterClient[_]): String = {
-    ingressStrategy.ingressUrlAddress(nameSpace, clusterId, clusterClient)
+    ingressStrategy.getIngressUrl(nameSpace, clusterId, clusterClient)
   }
 
   def prepareIngressTemplateFiles(buildWorkspace: String, ingressTemplates: String): String = {
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
index a049c12a4..b44c216f8 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.flink.kubernetes.ingress
 
-import org.apache.streampark.common.conf.{ConfigConst, InternalConfigHolder, K8sFlinkConfig}
+import org.apache.streampark.common.conf.{InternalConfigHolder, K8sFlinkConfig}
 
 import io.fabric8.kubernetes.api.model.{OwnerReference, OwnerReferenceBuilder}
 import io.fabric8.kubernetes.client.DefaultKubernetesClient
@@ -28,12 +28,9 @@ import java.io.File
 
 trait IngressStrategy {
 
-  lazy val ingressClass = InternalConfigHolder.get[String](K8sFlinkConfig.ingressClass)
+  lazy val ingressClass: String = InternalConfigHolder.get[String](K8sFlinkConfig.ingressClass)
 
-  def ingressUrlAddress(
-      nameSpace: String,
-      clusterId: String,
-      clusterClient: ClusterClient[_]): String
+  def getIngressUrl(nameSpace: String, clusterId: String, clusterClient: ClusterClient[_]): String
 
   def configureIngress(domainName: String, clusterId: String, nameSpace: String): Unit
 
@@ -50,7 +47,7 @@ trait IngressStrategy {
   }
 
   def buildIngressAnnotations(clusterId: String, namespace: String): Map[String, String] = {
-    var annotations = Map(
+    val annotations = Map(
       "kubernetes.io/ingress.class" -> ingressClass,
       "nginx.ingress.kubernetes.io/rewrite-target" -> "/$2",
       "nginx.ingress.kubernetes.io/proxy-body-size" -> "1024m",
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
index a4e08e49b..d7fd5d37e 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
@@ -29,7 +29,7 @@ import scala.util.Try
 
 class IngressStrategyV1 extends IngressStrategy {
 
-  override def ingressUrlAddress(
+  override def getIngressUrl(
       nameSpace: String,
       clusterId: String,
       clusterClient: ClusterClient[_]): String = {
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
index f143f073a..878f13b3e 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
@@ -30,7 +30,7 @@ import scala.util.Try
 
 class IngressStrategyV1beta1 extends IngressStrategy {
 
-  override def ingressUrlAddress(
+  override def getIngressUrl(
       nameSpace: String,
       clusterId: String,
       clusterClient: ClusterClient[_]): String = {

Reply via email to