This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
     new 7f134002d [Improve] ingressUrl improvement
7f134002d is described below

commit 7f134002d9db6fa50549f70485612914a69452ef
Author: benjobs <[email protected]>
AuthorDate: Tue Jan 30 14:08:44 2024 +0800

    [Improve] ingressUrl improvement
---
 .../core/service/impl/ApplicationServiceImpl.java     |  3 ++-
 .../core/service/impl/FlinkClusterServiceImpl.java    | 19 +++++++++++--------
 .../client/impl/KubernetesNativeSessionClient.scala   | 11 +++--------
 .../client/trait/KubernetesNativeClientTrait.scala    |  7 -------
 .../flink/kubernetes/KubernetesRetriever.scala        |  5 +++++
 .../flink/kubernetes/ingress/IngressController.scala  |  2 +-
 6 files changed, 22 insertions(+), 25 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 6c1fc29da..bd189cdbe 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1421,6 +1421,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
   @Override
   public void starting(Application application) {
     application.setState(FlinkAppState.STARTING.getValue());
+    application.setJobManagerUrl(null);
     application.setOptionTime(new Date());
     updateById(application);
   }
@@ -1658,7 +1659,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
             application.setJobId(response.jobId());
           }
 
-          if (StringUtils.isNoneEmpty(response.jobManagerUrl())) {
+          if (StringUtils.isNotEmpty(response.jobManagerUrl())) {
             application.setJobManagerUrl(response.jobManagerUrl());
             applicationLog.setJobManagerUrl(response.jobManagerUrl());
           }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 88b648728..cc04a5f5d 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -32,6 +32,7 @@ import 
org.apache.streampark.console.core.service.ApplicationService;
 import org.apache.streampark.console.core.service.FlinkClusterService;
 import org.apache.streampark.console.core.service.FlinkEnvService;
 import org.apache.streampark.console.core.service.ServiceHelper;
+import org.apache.streampark.console.core.service.SettingService;
 import org.apache.streampark.console.core.service.YarnQueueService;
 import org.apache.streampark.flink.client.FlinkClient;
 import org.apache.streampark.flink.client.bean.DeployRequest;
@@ -39,8 +40,6 @@ import org.apache.streampark.flink.client.bean.DeployResponse;
 import org.apache.streampark.flink.client.bean.KubernetesDeployRequest;
 import org.apache.streampark.flink.client.bean.ShutDownResponse;
 import org.apache.streampark.flink.kubernetes.KubernetesRetriever;
-import org.apache.streampark.flink.kubernetes.model.ClusterKey;
-import org.apache.streampark.flink.kubernetes.model.TrackId;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
@@ -92,6 +91,8 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
 
   @Autowired private YarnQueueService yarnQueueService;
 
+  @Autowired private SettingService settingService;
+
   @Override
   public ResponseResult check(FlinkCluster cluster) {
     ResponseResult result = new ResponseResult();
@@ -364,12 +365,14 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
     List<FlinkCluster> clusters = list();
     for (FlinkCluster cluster : clusters) {
       if 
(ExecutionMode.KUBERNETES_NATIVE_SESSION.equals(cluster.getExecutionModeEnum()))
 {
-        cluster.setAddress(
-            KubernetesRetriever.retrieveFlinkRestUrl(
-                    ClusterKey.of(
-                        TrackId.onSession(
-                            cluster.getK8sNamespace(), cluster.getClusterId(), 
0L, null, null)))
-                .getOrElse(cluster::getAddress));
+        if (StringUtils.isNotBlank(settingService.getIngressModeDefault())) {
+          String namespace = cluster.getK8sNamespace();
+          String clusterId = cluster.getClusterId();
+          String ingressUrl = 
KubernetesRetriever.getSessionClusterIngressURL(namespace, clusterId);
+          if (ingressUrl != null) {
+            cluster.setAddress(ingressUrl);
+          }
+        }
       }
     }
     return clusters;
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
index 54bfe210e..c9f7cd2a5 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
@@ -109,9 +109,8 @@ object KubernetesNativeSessionClient extends 
KubernetesNativeClientTrait with Lo
         .getClusterClient
       val submitResult = client.submitJob(jobGraph)
       val jobId = submitResult.get().toString
-      val url = getClientURL(client, submitRequest.kubernetesNamespace, 
submitRequest.clusterId)
       val result =
-        SubmitResponse(client.getClusterId, flinkConfig.toMap, jobId, url)
+        SubmitResponse(client.getClusterId, flinkConfig.toMap, jobId, 
client.getWebInterfaceURL)
       logInfo(
         s"[flink-submit] flink job has been submitted. 
${flinkConfIdentifierInfo(flinkConfig)}, jobId: $jobId")
       result
@@ -170,12 +169,7 @@ object KubernetesNativeSessionClient extends 
KubernetesNativeClientTrait with Lo
         client =
           
clusterDescriptor.deploySessionCluster(kubernetesClusterDescriptor._2).getClusterClient
       }
-      val url = getClientURL(client, deployRequest.kubernetesNamespace, 
deployRequest.clusterId)
-      if (url != null) {
-        DeployResponse(address = url, clusterId = client.getClusterId)
-      } else {
-        DeployResponse(error = new RuntimeException("get the cluster 
getWebInterfaceURL failed."))
-      }
+      DeployResponse(address = client.getWebInterfaceURL, clusterId = 
client.getClusterId)
     } catch {
       case e: Exception => DeployResponse(error = e)
     } finally {
@@ -239,4 +233,5 @@ object KubernetesNativeSessionClient extends 
KubernetesNativeClientTrait with Lo
     flinkConf.safeSet(DeploymentOptions.TARGET, 
ExecutionMode.KUBERNETES_NATIVE_SESSION.getName)
     super.doTriggerSavepoint(request, flinkConf)
   }
+
 }
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
index 36bb67b91..ec3134502 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
@@ -19,7 +19,6 @@ package org.apache.streampark.flink.client.`trait`
 
 import org.apache.streampark.common.enums.{ExecutionMode, 
FlinkK8sRestExposedType}
 import org.apache.streampark.flink.client.bean._
-import org.apache.streampark.flink.kubernetes.ingress.IngressController
 import org.apache.streampark.flink.packer.pipeline.DockerImageBuildResponse
 
 import org.apache.commons.lang3.StringUtils
@@ -34,7 +33,6 @@ import 
org.apache.flink.kubernetes.configuration.KubernetesConfigOptions.Service
 import javax.annotation.Nonnull
 
 import scala.language.postfixOps
-import scala.util.Try
 
 /** kubernetes native mode submit */
 trait KubernetesNativeClientTrait extends FlinkClientTrait {
@@ -185,9 +183,4 @@ trait KubernetesNativeClientTrait extends FlinkClientTrait {
     else homePath.concat("/.kube/config")
   }
 
-  def getClientURL(client: ClusterClient[_], namespace: String, clusterId: 
String): String = {
-    Try(IngressController.getIngressUrl(namespace, clusterId, client))
-      .getOrElse(client.getWebInterfaceURL)
-  }
-
 }
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
index dd308fb96..fac9213b6 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
@@ -148,4 +148,9 @@ object KubernetesRetriever extends Logger {
         Some(url)
     }
   }
+
+  def getSessionClusterIngressURL(namespace: String, clusterId: String): 
String = {
+    retrieveFlinkRestUrl(ClusterKey(FlinkK8sExecuteMode.SESSION, namespace, 
clusterId)).orNull
+  }
+
 }
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
index d6597ef19..8bf576329 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
@@ -29,7 +29,7 @@ object IngressController extends Logger {
 
   private[this] val VERSION_REGEXP = "(\\d+\\.\\d+)".r
 
-  lazy val clusterVersion = using(new DefaultKubernetesClient()) {
+  private lazy val clusterVersion = using(new DefaultKubernetesClient()) {
     client => 
VERSION_REGEXP.findFirstIn(client.getVersion.getGitVersion).get.toDouble
   }
 

Reply via email to