This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.4
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.4 by this push:
     new 5626ac87b Fix Issue #3638 : Flink native k8s application task with INITIALIZING status (#3639)
5626ac87b is described below

commit 5626ac87bb093430441c3ec6e017a9058f31f82c
Author: Yuze <[email protected]>
AuthorDate: Fri Mar 29 11:09:44 2024 +0800

    Fix Issue #3638 : Flink native k8s application task with INITIALIZING status (#3639)
    
    Co-authored-by: ze.yu <[email protected]>
---
 .../flink/kubernetes/KubernetesRetriever.scala     | 19 +++++-------
 .../kubernetes/ingress/IngressController.scala     |  6 ++--
 .../flink/kubernetes/ingress/IngressStrategy.scala |  5 ++-
 .../kubernetes/ingress/IngressStrategyV1.scala     | 36 ++++++++++++++++------
 .../ingress/IngressStrategyV1beta1.scala           | 10 +++---
 5 files changed, 47 insertions(+), 29 deletions(-)

diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
index f57060795..c93a84923 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
@@ -136,17 +136,14 @@ object KubernetesRetriever extends Logger {
 
   /** retrieve flink jobManager rest url */
   def retrieveFlinkRestUrl(clusterKey: ClusterKey): Option[String] = {
-    Utils.using(
-      KubernetesRetriever
-        .newFinkClusterClient(clusterKey.clusterId, clusterKey.namespace, clusterKey.executeMode)
-        .getOrElse(return None)) {
-      client =>
-        val url =
-          IngressController.getIngressUrl(clusterKey.namespace, clusterKey.clusterId, client)
-        logger.info(s"retrieve flink jobManager rest url: $url")
-        client.close()
-        Some(url)
-    }
+    val url =
+      IngressController.getIngressUrl(clusterKey.namespace, clusterKey.clusterId) {
+        KubernetesRetriever
+          .newFinkClusterClient(clusterKey.clusterId, clusterKey.namespace, clusterKey.executeMode)
+          .getOrElse(return None)
+      }
+    logger.info(s"retrieve flink jobManager rest url: $url")
+    Some(url)
   }
 
   def getSessionClusterIngressURL(namespace: String, clusterId: String): String = {
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
index 8bf576329..9394bcd6a 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
@@ -47,9 +47,9 @@ object IngressController extends Logger {
 
   def getIngressUrl(
       nameSpace: String,
-      clusterId: String,
-      clusterClient: ClusterClient[_]): String = {
-    ingressStrategy.getIngressUrl(nameSpace, clusterId, clusterClient)
+      clusterId: String
+  )(clusterClient: => ClusterClient[_]): String = {
+    ingressStrategy.getIngressUrl(nameSpace, clusterId)(clusterClient)
   }
 
   def prepareIngressTemplateFiles(buildWorkspace: String, ingressTemplates: String): String = {
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
index ade4c8cae..0d2e863f5 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
@@ -28,9 +28,12 @@ import java.io.File
 
 trait IngressStrategy {
 
+  val REST_SERVICE_IDENTIFICATION = "rest"
+
   lazy val ingressClass: String = InternalConfigHolder.get[String](K8sFlinkConfig.ingressClass)
 
-  def getIngressUrl(nameSpace: String, clusterId: String, clusterClient: ClusterClient[_]): String
+  def getIngressUrl(nameSpace: String, clusterId: String)(
+      clusterClient: => ClusterClient[_]): String
 
   def configureIngress(domainName: String, clusterId: String, nameSpace: String): Unit
 
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
index 9e08338e9..1ac15bfdc 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
@@ -29,10 +29,8 @@ import scala.util.Try
 
 class IngressStrategyV1 extends IngressStrategy {
 
-  override def getIngressUrl(
-      nameSpace: String,
-      clusterId: String,
-      clusterClient: ClusterClient[_]): String = {
+  override def getIngressUrl(nameSpace: String, clusterId: String)(
+      clusterClient: => ClusterClient[_]): String = {
 
     Utils.using(new DefaultKubernetesClient) {
       client =>
@@ -41,7 +39,9 @@ class IngressStrategyV1 extends IngressStrategy {
             .map(ingress => ingress.getSpec.getRules.get(0))
             .map(rule => rule.getHost -> rule.getHttp.getPaths.get(0).getPath)
             .map { case (host, path) => s"http://$host$path" }
-            .getOrElse(clusterClient.getWebInterfaceURL)
+            .getOrElse {
+              Utils.using(clusterClient)(client => client.getWebInterfaceURL)
+            }
         }.recover {
           case e =>
             throw new RuntimeException(s"[StreamPark] get ingressUrlAddress error: $e")
@@ -49,10 +49,28 @@ class IngressStrategyV1 extends IngressStrategy {
     }
   }
 
+  private[this] def touchIngressBackendRestPort(
+      client: DefaultKubernetesClient,
+      clusterId: String,
+      nameSpace: String): Int = {
+    var ports = client.services
+      .inNamespace(nameSpace)
+      .withName(s"$clusterId-$REST_SERVICE_IDENTIFICATION")
+      .get()
+      .getSpec
+      .getPorts
+      .asScala
+    ports =
+      ports.filter(servicePort => servicePort.getName.equalsIgnoreCase(REST_SERVICE_IDENTIFICATION))
+    ports.map(servicePort => servicePort.getTargetPort.getIntVal).head
+  }
+
   override def configureIngress(domainName: String, clusterId: String, nameSpace: String): Unit = {
     Utils.using(new DefaultKubernetesClient) {
       client =>
         val ownerReference = getOwnerReference(nameSpace, clusterId, client)
+        val ingressBackendRestServicePort =
+          touchIngressBackendRestPort(client, clusterId, nameSpace)
         val ingress = new IngressBuilder()
           .withNewMetadata()
           .withName(clusterId)
@@ -70,9 +88,9 @@ class IngressStrategyV1 extends IngressStrategy {
           .withPathType("ImplementationSpecific")
           .withNewBackend()
           .withNewService()
-          .withName(s"$clusterId-rest")
+          .withName(s"$clusterId-$REST_SERVICE_IDENTIFICATION")
           .withNewPort()
-          .withName("rest")
+          .withNumber(ingressBackendRestServicePort)
           .endPort()
           .endService()
           .endBackend()
@@ -82,9 +100,9 @@ class IngressStrategyV1 extends IngressStrategy {
           .withPathType("ImplementationSpecific")
           .withNewBackend()
           .withNewService()
-          .withName(s"$clusterId-rest")
+          .withName(s"$clusterId-$REST_SERVICE_IDENTIFICATION")
           .withNewPort()
-          .withName("rest")
+          .withNumber(ingressBackendRestServicePort)
           .endPort()
           .endService()
           .endBackend()
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
index 1cfeceb68..408210a0c 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
@@ -31,10 +31,8 @@ import scala.util.Try
 
 class IngressStrategyV1beta1 extends IngressStrategy {
 
-  override def getIngressUrl(
-      nameSpace: String,
-      clusterId: String,
-      clusterClient: ClusterClient[_]): String = {
+  override def getIngressUrl(nameSpace: String, clusterId: String)(
+      clusterClient: => ClusterClient[_]): String = {
 
     Utils.using(new DefaultKubernetesClient) {
       client =>
@@ -43,7 +41,9 @@ class IngressStrategyV1beta1 extends IngressStrategy {
             .map(ingress => ingress.getSpec.getRules.get(0))
             .map(rule => rule.getHost -> rule.getHttp.getPaths.get(0).getPath)
             .map { case (host, path) => s"http://$host$path" }
-            .getOrElse(clusterClient.getWebInterfaceURL)
+            .getOrElse {
+              Utils.using(clusterClient)(client => client.getWebInterfaceURL)
+            }
         }.recover {
           case e =>
             throw new RuntimeException(s"[StreamPark] get ingressUrlAddress error: $e")

Reply via email to