This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
     new bc3fb2d74 get cluster detail of flink session on k8s with ingress url 
bug fix (#3523)
bc3fb2d74 is described below

commit bc3fb2d74beb2f8acaf5481ff2d38338d9b74b58
Author: Dimitri <[email protected]>
AuthorDate: Tue Jan 30 00:39:06 2024 +0800

    get cluster detail of flink session on k8s with ingress url bug fix (#3523)
---
 .../core/controller/FlinkClusterController.java       |  2 +-
 .../console/core/service/FlinkClusterService.java     |  2 ++
 .../core/service/impl/FlinkClusterServiceImpl.java    | 19 +++++++++++++++++++
 3 files changed, 22 insertions(+), 1 deletion(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
index 2b01e2a54..0146542f1 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
@@ -48,7 +48,7 @@ public class FlinkClusterController {
   @Operation(summary = "List flink clusters")
   @PostMapping("list")
   public RestResponse list() {
-    List<FlinkCluster> flinkClusters = flinkClusterService.list();
+    List<FlinkCluster> flinkClusters = flinkClusterService.listCluster();
     return RestResponse.success(flinkClusters);
   }
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
index e9d0397ef..d82387d04 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
@@ -47,4 +47,6 @@ public interface FlinkClusterService extends 
IService<FlinkCluster> {
   Boolean existsByFlinkEnvId(Long id);
 
   List<FlinkCluster> getByExecutionModes(Collection<ExecutionMode> 
executionModes);
+
+  List<FlinkCluster> listCluster();
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index a0b56075a..88b648728 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -38,6 +38,9 @@ import org.apache.streampark.flink.client.bean.DeployRequest;
 import org.apache.streampark.flink.client.bean.DeployResponse;
 import org.apache.streampark.flink.client.bean.KubernetesDeployRequest;
 import org.apache.streampark.flink.client.bean.ShutDownResponse;
+import org.apache.streampark.flink.kubernetes.KubernetesRetriever;
+import org.apache.streampark.flink.kubernetes.model.ClusterKey;
+import org.apache.streampark.flink.kubernetes.model.TrackId;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
@@ -356,6 +359,22 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
                         .collect(Collectors.toSet())));
   }
 
+  @Override
+  public List<FlinkCluster> listCluster() {
+    List<FlinkCluster> clusters = list();
+    for (FlinkCluster cluster : clusters) {
+      if 
(ExecutionMode.KUBERNETES_NATIVE_SESSION.equals(cluster.getExecutionModeEnum()))
 {
+        cluster.setAddress(
+            KubernetesRetriever.retrieveFlinkRestUrl(
+                    ClusterKey.of(
+                        TrackId.onSession(
+                            cluster.getK8sNamespace(), cluster.getClusterId(), 
0L, null, null)))
+                .getOrElse(cluster::getAddress));
+      }
+    }
+    return clusters;
+  }
+
   @Override
   public void delete(Long id) {
     FlinkCluster flinkCluster = getById(id);

Reply via email to