This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
new 61298dd02 [Improve] flink webUI proxy support. (#3932)
61298dd02 is described below
commit 61298dd02d37b9589ad2114f1fa54965919f9ac3
Author: benjobs <[email protected]>
AuthorDate: Thu Aug 1 18:12:31 2024 +0800
[Improve] flink webUI proxy support. (#3932)
* [Improve] proxy minor improve
* [Improve] flink webUI proxy support.
* [Improve] minor improve
---------
Co-authored-by: benjobs <[email protected]>
---
.../console/core/controller/ProxyController.java | 26 ++-
.../console/core/entity/Application.java | 4 -
.../console/core/service/ApplicationService.java | 5 -
.../console/core/service/ProxyService.java | 32 ++++
.../console/core/service/ServiceHelper.java | 6 +-
.../core/service/impl/ApplicationServiceImpl.java | 78 ---------
.../core/service/impl/ProxyServiceImpl.java | 190 +++++++++++++++++++++
.../console/system/authentication/ShiroConfig.java | 2 +-
.../src/views/flink/app/Detail.vue | 2 +-
.../src/views/flink/app/View.vue | 5 +-
.../flink/app/components/AppDetail/DetailTab.vue | 11 +-
.../src/views/flink/app/utils/index.ts | 26 +--
12 files changed, 258 insertions(+), 129 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProxyController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProxyController.java
index 0d808e754..d909ecd6a 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProxyController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProxyController.java
@@ -17,15 +17,15 @@
package org.apache.streampark.console.core.controller;
-import org.apache.streampark.console.core.service.ApplicationService;
+import org.apache.streampark.console.core.service.ProxyService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.validation.annotation.Validated;
+import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import javax.servlet.http.HttpServletRequest;
@@ -38,13 +38,23 @@ import java.io.IOException;
@RequestMapping("proxy")
public class ProxyController {
- @Autowired private ApplicationService applicationService;
+ @Autowired private ProxyService proxyService;
- @RequestMapping(
- value = "flink-ui/{id}/**",
- method = {RequestMethod.GET, RequestMethod.POST})
- public ResponseEntity<?> proxyRequest(HttpServletRequest request,
@PathVariable("id") Long id)
+ @GetMapping("flink-ui/{id}/**")
+ public ResponseEntity<?> proxyFlinkUI(HttpServletRequest request,
@PathVariable("id") Long id)
throws IOException {
- return applicationService.proxyFlinkUI(request, id);
+ return proxyService.proxyFlinkUI(request, id);
+ }
+
+ @GetMapping("job_manager/{id}/**")
+ public ResponseEntity<?> proxyJobManager(
+ HttpServletRequest request, @PathVariable("id") Long logId) throws
IOException {
+ return proxyService.proxyJobManager(request, logId);
+ }
+
+ @GetMapping("yarn/{appId}/**")
+ public ResponseEntity<?> proxyURL(HttpServletRequest request,
@PathVariable("appId") String appId)
+ throws IOException {
+ return proxyService.proxyYarn(request, appId);
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 36e216a36..f43058d9e 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -599,10 +599,6 @@ public class Application implements Serializable {
return ExecutionMode.YARN_PER_JOB.equals(mode) ||
ExecutionMode.YARN_APPLICATION.equals(mode);
}
- public String getFlinkRestUrl() {
- return "/proxy/flink-ui/" + id + "/";
- }
-
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
index cba0d9652..952728973 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
@@ -28,11 +28,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.service.IService;
-import org.springframework.http.ResponseEntity;
import org.springframework.web.multipart.MultipartFile;
-import javax.servlet.http.HttpServletRequest;
-
import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
@@ -134,6 +131,4 @@ public interface ApplicationService extends
IService<Application> {
List<ApplicationReport> getYARNApplication(String appName);
RestResponse buildApplication(Long appId, boolean forceBuild) throws
Exception;
-
- ResponseEntity<?> proxyFlinkUI(HttpServletRequest request, Long appId)
throws IOException;
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ProxyService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ProxyService.java
new file mode 100644
index 000000000..57d66ede6
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ProxyService.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service;
+
+import org.springframework.http.ResponseEntity;
+
+import javax.servlet.http.HttpServletRequest;
+
+import java.io.IOException;
+
+public interface ProxyService {
+ ResponseEntity<?> proxyFlinkUI(HttpServletRequest request, Long id) throws
IOException;
+
+ ResponseEntity<?> proxyYarn(HttpServletRequest request, String url) throws
IOException;
+
+ ResponseEntity<?> proxyJobManager(HttpServletRequest request, Long logId)
throws IOException;
+}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ServiceHelper.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ServiceHelper.java
index 61bae924f..53e20d687 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ServiceHelper.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ServiceHelper.java
@@ -53,7 +53,7 @@ public class ServiceHelper {
private String sqlClientJar = null;
public User getLoginUser() {
- String token = (String) SecurityUtils.getSubject().getPrincipal();
+ String token = getAuthorization();
Long userId = JWTUtil.getUserId(token);
if (userId == null) {
throw new AuthenticationException("Unauthorized");
@@ -116,4 +116,8 @@ public class ServiceHelper {
IngressController.configureIngress(domainName, clusterId, namespace);
}
}
+
+ public String getAuthorization() {
+ return (String) SecurityUtils.getSubject().getPrincipal();
+ }
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index b2331051e..177cb85a0 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -83,8 +83,6 @@ import
org.apache.streampark.console.core.service.YarnQueueService;
import org.apache.streampark.console.core.task.CheckpointProcessor;
import org.apache.streampark.console.core.task.FlinkAppHttpWatcher;
import org.apache.streampark.console.core.task.FlinkK8sWatcherWrapper;
-import org.apache.streampark.console.system.entity.Member;
-import org.apache.streampark.console.system.entity.User;
import org.apache.streampark.console.system.service.MemberService;
import org.apache.streampark.flink.client.FlinkClient;
import org.apache.streampark.flink.client.bean.CancelRequest;
@@ -102,7 +100,6 @@ import
org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.FilenameUtils;
-import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.CoreOptions;
@@ -127,29 +124,17 @@ import
io.fabric8.kubernetes.client.KubernetesClientException;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.web.client.RestTemplateBuilder;
-import org.springframework.http.HttpEntity;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpMethod;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
-import org.springframework.http.client.ClientHttpResponse;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
-import org.springframework.web.client.DefaultResponseErrorHandler;
-import org.springframework.web.client.RestTemplate;
import org.springframework.web.multipart.MultipartFile;
import javax.annotation.Nonnull;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
-import javax.servlet.http.HttpServletRequest;
-import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
-import java.io.InputStream;
import java.io.Serializable;
import java.net.URI;
import java.nio.charset.StandardCharsets;
@@ -160,7 +145,6 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.EnumSet;
-import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -237,8 +221,6 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
@Autowired private MemberService memberService;
- private final RestTemplate proxyRestTemplate;
-
private static final int CPU_NUM = Math.max(2,
Runtime.getRuntime().availableProcessors() * 4);
private final ExecutorService bootstrapExecutor =
@@ -250,20 +232,6 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
new LinkedBlockingQueue<>(),
ThreadUtils.threadFactory("streampark-flink-app-bootstrap"));
- @Autowired
- public ApplicationServiceImpl(RestTemplateBuilder restTemplateBuilder) {
- this.proxyRestTemplate =
- restTemplateBuilder
- .errorHandler(
- new DefaultResponseErrorHandler() {
- @Override
- public void handleError(@Nonnull ClientHttpResponse
response) {
- // Ignore errors in the Flink Web UI itself, such as 404
errors.
- }
- })
- .build();
- }
-
@PostConstruct
public void resetOptionState() {
this.baseMapper.resetOptionState();
@@ -2071,50 +2039,4 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
}
return Tuple2.apply(k8sNamespace, clusterId);
}
-
- public ResponseEntity<?> proxyFlinkUI(HttpServletRequest request, Long
appId) throws IOException {
- ApiAlertException.throwIfTrue(appId == null, "Invalid operation, appId is
null");
-
- User currentUser = serviceHelper.getLoginUser();
- Application app = getById(appId);
- ApiAlertException.throwIfTrue(app == null, "Invalid operation, application
is null");
- if (!currentUser.getUserId().equals(app.getUserId())) {
- Member member = memberService.findByUserName(app.getTeamId(),
currentUser.getUsername());
- ApiAlertException.throwIfTrue(
- member == null,
- "Permission denied, this job not created by the current user, And
the job cannot be found in the current user's team.");
- }
-
- String originalUrl =
- request.getRequestURI()
- + (request.getQueryString() != null ? "?" +
request.getQueryString() : "");
-
- String jobManagerUrl = app.getJobManagerUrl();
- if (jobManagerUrl == null) {
- ResponseEntity.BodyBuilder builder =
ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE);
- builder.body("The flink job manager url is not ready");
- return builder.build();
- }
-
- String newUrl = jobManagerUrl + originalUrl.replace("/proxy/flink-ui/" +
appId, "");
-
- HttpHeaders headers = new HttpHeaders();
- Enumeration<String> headerNames = request.getHeaderNames();
- while (headerNames.hasMoreElements()) {
- String headerName = headerNames.nextElement();
- headers.set(headerName, request.getHeader(headerName));
- }
-
- byte[] body = null;
- if (request.getInputStream().available() > 0) {
- InputStream inputStream = request.getInputStream();
- ByteArrayOutputStream byteArrayOutputStream = new
ByteArrayOutputStream();
- IOUtils.copy(inputStream, byteArrayOutputStream);
- body = byteArrayOutputStream.toByteArray();
- }
-
- HttpEntity<?> requestEntity = new HttpEntity<>(body, headers);
- return proxyRestTemplate.exchange(
- newUrl, HttpMethod.valueOf(request.getMethod()), requestEntity,
byte[].class);
- }
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java
new file mode 100644
index 000000000..d89ff055e
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service.impl;
+
+import org.apache.streampark.common.util.YarnUtils;
+import org.apache.streampark.console.base.util.WebUtils;
+import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.entity.ApplicationLog;
+import org.apache.streampark.console.core.entity.FlinkCluster;
+import org.apache.streampark.console.core.service.ApplicationLogService;
+import org.apache.streampark.console.core.service.ApplicationService;
+import org.apache.streampark.console.core.service.FlinkClusterService;
+import org.apache.streampark.console.core.service.ProxyService;
+import org.apache.streampark.console.core.service.ServiceHelper;
+import org.apache.streampark.console.core.task.FlinkK8sWatcherWrapper;
+import org.apache.streampark.console.system.entity.Member;
+import org.apache.streampark.console.system.entity.User;
+import org.apache.streampark.console.system.service.MemberService;
+import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
+
+import org.apache.commons.io.IOUtils;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.web.client.RestTemplateBuilder;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.http.client.ClientHttpResponse;
+import org.springframework.stereotype.Service;
+import org.springframework.web.client.DefaultResponseErrorHandler;
+import org.springframework.web.client.RestTemplate;
+
+import javax.annotation.Nonnull;
+import javax.servlet.http.HttpServletRequest;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Enumeration;
+
+@Service
+public class ProxyServiceImpl implements ProxyService {
+
+ @Autowired private ServiceHelper serviceHelper;
+
+ @Autowired private ApplicationService applicationService;
+
+ @Autowired private FlinkClusterService flinkClusterService;
+
+ @Autowired private MemberService memberService;
+
+ @Autowired private ApplicationLogService logService;
+
+ @Autowired private FlinkK8sWatcher flinkK8sWatcher;
+
+ @Autowired private FlinkK8sWatcherWrapper k8sWatcherWrapper;
+
+ private final RestTemplate proxyRestTemplate;
+
+ public ProxyServiceImpl(RestTemplateBuilder restTemplateBuilder) {
+ this.proxyRestTemplate =
+ restTemplateBuilder
+ .errorHandler(
+ new DefaultResponseErrorHandler() {
+ @Override
+ public void handleError(@Nonnull ClientHttpResponse
response) {
+ // Ignore errors in the Flink Web UI itself, such as 404
errors.
+ }
+ })
+ .build();
+ }
+
+ @Override
+ public ResponseEntity<?> proxyFlinkUI(HttpServletRequest request, Long
appId) throws IOException {
+ ResponseEntity.BodyBuilder builder =
ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE);
+ if (appId == null) {
+ return builder.body("Invalid operation, appId is null");
+ }
+
+ User currentUser = serviceHelper.getLoginUser();
+ Application app = applicationService.getById(appId);
+ if (app == null) {
+ return builder.body("Invalid operation, appId is invalid.");
+ }
+ if (!currentUser.getUserId().equals(app.getUserId())) {
+ Member member = memberService.findByUserName(app.getTeamId(),
currentUser.getUsername());
+ if (member == null) {
+ return builder.body(
+ "Permission denied, this job not created by the current user, And
the job cannot be found in the current user's team.");
+ }
+ }
+
+ String url = null;
+ switch (app.getExecutionModeEnum()) {
+ case REMOTE:
+ FlinkCluster cluster =
flinkClusterService.getById(app.getFlinkClusterId());
+ url = cluster.getAddress();
+ break;
+ case YARN_PER_JOB:
+ case YARN_APPLICATION:
+ case YARN_SESSION:
+ String yarnURL = YarnUtils.getRMWebAppProxyURL();
+ url = yarnURL + "/proxy/" + app.getClusterId();
+ break;
+ case KUBERNETES_NATIVE_APPLICATION:
+ case KUBERNETES_NATIVE_SESSION:
+ String jobManagerUrl = app.getJobManagerUrl();
+ if (jobManagerUrl == null) {
+ builder = ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE);
+ builder.body("The flink job manager url is not ready");
+ return builder.build();
+ }
+ url =
flinkK8sWatcher.getRemoteRestUrl(k8sWatcherWrapper.toTrackId(app));
+ break;
+ }
+
+ if (url == null) {
+ return builder.body("The flink job manager url is not ready");
+ }
+
+ url += getRequestURL(request).replace("/proxy/flink-ui/" + appId, "");
+ return proxyRequest(request, url);
+ }
+
+ @Override
+ public ResponseEntity<?> proxyYarn(HttpServletRequest request, String appId)
throws IOException {
+ String yarnURL = YarnUtils.getRMWebAppProxyURL();
+ String url = yarnURL + "/proxy/" + appId + "/";
+ url += getRequestURL(request).replace("/proxy/yarn/" + appId, "");
+ return proxyRequest(request, url);
+ }
+
+ @Override
+ public ResponseEntity<?> proxyJobManager(HttpServletRequest request, Long
logId)
+ throws IOException {
+ ApplicationLog log = logService.getById(logId);
+ String url = log.getJobManagerUrl();
+ url += getRequestURL(request).replace("/proxy/job_manager/" + logId, "");
+ return proxyRequest(request, url);
+ }
+
+ private ResponseEntity<?> proxyRequest(HttpServletRequest request, String
url)
+ throws IOException {
+ HttpHeaders headers = new HttpHeaders();
+ Enumeration<String> headerNames = request.getHeaderNames();
+ while (headerNames.hasMoreElements()) {
+ String headerName = headerNames.nextElement();
+ headers.set(headerName, request.getHeader(headerName));
+ }
+
+ String token = serviceHelper.getAuthorization();
+ if (token != null) {
+ headers.set("Authorization", WebUtils.encryptToken(token));
+ }
+
+ byte[] body = null;
+ if (request.getInputStream().available() > 0) {
+ InputStream inputStream = request.getInputStream();
+ ByteArrayOutputStream byteArrayOutputStream = new
ByteArrayOutputStream();
+ IOUtils.copy(inputStream, byteArrayOutputStream);
+ body = byteArrayOutputStream.toByteArray();
+ }
+
+ HttpEntity<?> requestEntity = new HttpEntity<>(body, headers);
+ return proxyRestTemplate.exchange(
+ url, HttpMethod.valueOf(request.getMethod()), requestEntity,
byte[].class);
+ }
+
+ private String getRequestURL(HttpServletRequest request) {
+ return request.getRequestURI()
+ + (request.getQueryString() != null ? "?" + request.getQueryString() :
"");
+ }
+}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/ShiroConfig.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/ShiroConfig.java
index e9b0b2b58..2585b41e4 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/ShiroConfig.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/ShiroConfig.java
@@ -74,7 +74,7 @@ public class ShiroConfig {
filterChainDefinitionMap.put("/*.less", "anon");
filterChainDefinitionMap.put("/*.ico", "anon");
filterChainDefinitionMap.put("/", "anon");
- filterChainDefinitionMap.put("/proxy/flink-ui/**", "anon");
+ filterChainDefinitionMap.put("/proxy/**", "anon");
filterChainDefinitionMap.put("/**", "jwt");
shiroFilterFactoryBean.setFilterChainDefinitionMap(filterChainDefinitionMap);
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/Detail.vue
b/streampark-console/streampark-console-webapp/src/views/flink/app/Detail.vue
index 421ad026a..3bb1377f5 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/Detail.vue
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/Detail.vue
@@ -115,7 +115,7 @@
/* Flink Web UI */
function handleFlinkView() {
- handleView(app as any, unref(yarn));
+ handleView(app as any);
}
const { pause } = useIntervalFn(
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/View.vue
b/streampark-console/streampark-console-webapp/src/views/flink/app/View.vue
index 399b56425..a5ce1e531 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/View.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/View.vue
@@ -67,7 +67,6 @@
const appDashboardRef = ref<any>();
const noData = ref<boolean>();
- const yarn = ref<Nullable<string>>(null);
const currentTablePage = ref(1);
const { onTableColumnResize, tableColumnWidth, getAppColumns } =
useAppTableColumns();
const { openSavepoint } = useSavepoint(handleOptionApp);
@@ -201,8 +200,7 @@
[AppStateEnum.RESTARTING, AppStateEnum.RUNNING].includes(app.state) ||
app['optionState'] === OptionStateEnum.SAVEPOINTING
) {
- // yarn-per-job|yarn-session|yarn-application
- await handleView(app, unref(yarn));
+ await handleView(app);
}
}
@@ -257,7 +255,6 @@
<template v-if="column.dataIndex === 'jobName'">
<span class="app_type app_jar" v-if="record['jobType'] ===
JobTypeEnum.JAR"> JAR </span>
<span class="app_type app_sql" v-if="record['jobType'] ===
JobTypeEnum.SQL"> SQL </span>
-
<span
class="link"
:class="{
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppDetail/DetailTab.vue
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppDetail/DetailTab.vue
index af5d5432b..a2a3b29dd 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppDetail/DetailTab.vue
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppDetail/DetailTab.vue
@@ -46,7 +46,6 @@
fetchOptionLog,
fetchRemoveBackup,
fetchDeleteOperationLog,
- fetchYarn,
} from '/@/api/flink/app/app';
import { decodeByBase64 } from '/@/utils/cipher';
import { useModal } from '/@/components/Modal';
@@ -65,6 +64,7 @@
import FlinkSqlReview from './FlinkSqlReview.vue';
import FlinkSqlCompareModal from './FlinkSqlCompareModal.vue';
import { OperationEnum } from '/@/enums/flinkEnum';
+ import { baseUrl } from '/@/api';
const DescriptionItem = Descriptions.Item;
const TabPane = Tabs.TabPane;
@@ -273,8 +273,11 @@
}
async function handleYarnUrl(yarnAppId: string) {
- const res = await fetchYarn();
- window.open(res + '/proxy/' + yarnAppId + '/');
+ window.open(baseUrl() + '/proxy/yarn/' + yarnAppId + '/');
+ }
+
+ async function handleViewJobManager(record: Recordable) {
+ window.open(baseUrl() + '/proxy/job_manager/' + record.id + '/');
}
function getSavePointAction(record: Recordable): ActionItem[] {
@@ -396,7 +399,7 @@
</a>
</template>
<template v-if="column.dataIndex === 'jobManagerUrl'">
- <a type="link" :href="record.jobManagerUrl" target="_blank">
+ <a type="link" target="_blank"
@click="handleViewJobManager(record)">
{{ record.jobManagerUrl }}
</a>
</template>
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
index 626113cd6..5ea810873 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
@@ -15,9 +15,7 @@
* limitations under the License.
*/
import { optionsKeyMapping } from '../data/option';
-import { fetchYarn } from '/@/api/flink/app/app';
import { AppListRecord } from '/@/api/flink/app/app.type';
-import { fetchRemoteURL } from '/@/api/flink/setting/flinkCluster';
import {
AppStateEnum,
ConfigTypeEnum,
@@ -26,6 +24,7 @@ import {
OptionStateEnum,
PipelineStepEnum,
} from '/@/enums/flinkEnum';
+import { baseUrl } from '/@/api';
export function handleAppBuildStatusColor(statusCode: number) {
switch (statusCode) {
@@ -103,27 +102,8 @@ export function descriptionFilter(option) {
}
}
-export async function handleView(app: AppListRecord, yarn: Nullable<string>) {
- const executionMode = app['executionMode'];
- if (executionMode == ExecModeEnum.STANDALONE) {
- const res = await fetchRemoteURL(app.flinkClusterId);
- window.open(res + '/#/job/' + app.jobId + '/overview');
- } else if (
- [ExecModeEnum.YARN_PER_JOB, ExecModeEnum.YARN_SESSION,
ExecModeEnum.YARN_APPLICATION].includes(
- executionMode,
- )
- ) {
- if (!yarn) {
- const res = await fetchYarn();
- window.open(res + '/proxy/' + app['clusterId'] + '/');
- } else {
- window.open(yarn + '/proxy/' + app['clusterId'] + '/');
- }
- } else {
- if (app.flinkRestUrl) {
- window.open(app.flinkRestUrl);
- }
- }
+export async function handleView(app: AppListRecord) {
+ window.open(baseUrl() + '/proxy/flink-ui/' + app.id + '/');
}
export function handleIsStart(app: Recordable, optionApps: Recordable) {