This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
new bc3fb2d74 get cluster detail of flink session on k8s with ingress url
bug fix (#3523)
bc3fb2d74 is described below
commit bc3fb2d74beb2f8acaf5481ff2d38338d9b74b58
Author: Dimitri <[email protected]>
AuthorDate: Tue Jan 30 00:39:06 2024 +0800
get cluster detail of flink session on k8s with ingress url bug fix (#3523)
---
.../core/controller/FlinkClusterController.java | 2 +-
.../console/core/service/FlinkClusterService.java | 2 ++
.../core/service/impl/FlinkClusterServiceImpl.java | 19 +++++++++++++++++++
3 files changed, 22 insertions(+), 1 deletion(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
index 2b01e2a54..0146542f1 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
@@ -48,7 +48,7 @@ public class FlinkClusterController {
@Operation(summary = "List flink clusters")
@PostMapping("list")
public RestResponse list() {
- List<FlinkCluster> flinkClusters = flinkClusterService.list();
+ List<FlinkCluster> flinkClusters = flinkClusterService.listCluster();
return RestResponse.success(flinkClusters);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
index e9d0397ef..d82387d04 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
@@ -47,4 +47,6 @@ public interface FlinkClusterService extends
IService<FlinkCluster> {
Boolean existsByFlinkEnvId(Long id);
List<FlinkCluster> getByExecutionModes(Collection<ExecutionMode>
executionModes);
+
+ List<FlinkCluster> listCluster();
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index a0b56075a..88b648728 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -38,6 +38,9 @@ import org.apache.streampark.flink.client.bean.DeployRequest;
import org.apache.streampark.flink.client.bean.DeployResponse;
import org.apache.streampark.flink.client.bean.KubernetesDeployRequest;
import org.apache.streampark.flink.client.bean.ShutDownResponse;
+import org.apache.streampark.flink.kubernetes.KubernetesRetriever;
+import org.apache.streampark.flink.kubernetes.model.ClusterKey;
+import org.apache.streampark.flink.kubernetes.model.TrackId;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
@@ -356,6 +359,22 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
.collect(Collectors.toSet())));
}
+ @Override
+ public List<FlinkCluster> listCluster() {
+ List<FlinkCluster> clusters = list();
+ for (FlinkCluster cluster : clusters) {
+ if
(ExecutionMode.KUBERNETES_NATIVE_SESSION.equals(cluster.getExecutionModeEnum()))
{
+ cluster.setAddress(
+ KubernetesRetriever.retrieveFlinkRestUrl(
+ ClusterKey.of(
+ TrackId.onSession(
+ cluster.getK8sNamespace(), cluster.getClusterId(),
0L, null, null)))
+ .getOrElse(cluster::getAddress));
+ }
+ }
+ return clusters;
+ }
+
@Override
public void delete(Long id) {
FlinkCluster flinkCluster = getById(id);