This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 8432ce3ed [Improve] dev: flink webUI proxy support. (#4054)
8432ce3ed is described below
commit 8432ce3ed42bd19edaa3079abc74a8e07d5064fb
Author: TiDra <[email protected]>
AuthorDate: Sun Sep 22 09:58:47 2024 +0800
[Improve] dev: flink webUI proxy support. (#4054)
* [Improve] flink webUI proxy support.
* [Improve] flink webUI proxy support.
* The issue of the permission issue related to the app details page and the
permission issue of the proxy page cannot be displayed are fixed
* Added a non-null check for the corresponding yarn application ID to
ProxyServiceImpl
---
.../core/controller/ApplicationController.java | 8 +-
.../console/core/controller/ProxyController.java | 122 ++++++++++
.../console/core/entity/ApplicationBackUp.java | 2 +
.../console/core/entity/ApplicationLog.java | 2 +
.../streampark/console/core/entity/FlinkSql.java | 2 +
.../streampark/console/core/entity/Savepoint.java | 2 +
.../core/service/ApplicationLogService.java | 2 +
.../Savepoint.java => service/ProxyService.java} | 37 +--
.../service/impl/ApplicationLogServiceImpl.java | 5 +
.../core/service/impl/ProxyServiceImpl.java | 262 +++++++++++++++++++++
.../flink/app/components/AppDetail/DetailTab.vue | 14 +-
.../src/views/flink/app/utils/index.ts | 26 +-
.../src/views/flink/cluster/View.vue | 4 +-
13 files changed, 427 insertions(+), 61 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
index e3b6ac08e..0073efd31 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
@@ -214,17 +214,17 @@ public class ApplicationController {
}
@PostMapping("opt_log")
- @Permission(app = "#log.appId", team = "#log.teamId")
+ @Permission(app = "#applicationLog.appId", team = "#applicationLog.teamId")
public RestResponse log(ApplicationLog applicationLog, RestRequest
request) {
IPage<ApplicationLog> applicationList =
applicationLogService.getPage(applicationLog, request);
return RestResponse.success(applicationList);
}
- @Permission(app = "#log.appId", team = "#log.teamId")
+ @Permission(app = "#applicationLog.appId", team = "#applicationLog.teamId")
@PostMapping("delete/opt_log")
@RequiresPermissions("app:delete")
- public RestResponse deleteLog(Long id) {
- Boolean deleted = applicationLogService.removeById(id);
+ public RestResponse deleteLog(ApplicationLog applicationLog) {
+ Boolean deleted = applicationLogService.delete(applicationLog);
return RestResponse.success(deleted);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProxyController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProxyController.java
new file mode 100644
index 000000000..1ae022594
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProxyController.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.controller;
+
+import org.apache.streampark.console.base.exception.ApiAlertException;
+import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.entity.ApplicationLog;
+import org.apache.streampark.console.core.enums.UserTypeEnum;
+import org.apache.streampark.console.core.service.ApplicationLogService;
+import org.apache.streampark.console.core.service.ProxyService;
+import
org.apache.streampark.console.core.service.application.ApplicationManageService;
+import org.apache.streampark.console.core.util.ServiceHelper;
+import org.apache.streampark.console.system.entity.Member;
+import org.apache.streampark.console.system.entity.User;
+import org.apache.streampark.console.system.service.MemberService;
+import org.apache.streampark.console.system.service.UserService;
+
+import org.apache.shiro.authz.annotation.RequiresPermissions;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.ResponseEntity;
+import org.springframework.validation.annotation.Validated;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import javax.servlet.http.HttpServletRequest;
+
+@Slf4j
+@Validated
+@RestController
+@RequestMapping("proxy")
+public class ProxyController {
+
+ @Autowired
+ private ProxyService proxyService;
+
+ @Autowired
+ private ApplicationManageService applicationManageService;
+
+ @Autowired
+ private ApplicationLogService logService;
+
+ @Autowired
+ private MemberService memberService;
+
+ @Autowired
+ private UserService userService;
+
+ @GetMapping("{type}/{id}/assets/**")
+ public ResponseEntity<?> proxyFlinkAssets(HttpServletRequest request,
@PathVariable("type") String type,
+ @PathVariable("id") Long id)
throws Exception {
+ return proxy(type, request, id);
+ }
+
+ @GetMapping("{type}/{id}/**")
+ @RequiresPermissions("app:view")
+ public ResponseEntity<?> proxyFlink(HttpServletRequest request,
@PathVariable("type") String type,
+ @PathVariable("id") Long id) throws
Exception {
+ return proxy(type, request, id);
+ }
+
+ private ResponseEntity<?> proxy(String type, HttpServletRequest request,
Long id) throws Exception {
+ ApplicationLog log;
+ Application app;
+
+ switch (type) {
+ case "flink":
+ app = applicationManageService.getApp(id);
+ checkProxyApp(app);
+ return proxyService.proxyFlink(request, app);
+ case "cluster":
+ return proxyService.proxyCluster(request, id);
+ case "history":
+ log = logService.getById(id);
+ checkProxyAppLog(log);
+ return proxyService.proxyHistory(request, log);
+ case "yarn":
+ log = logService.getById(id);
+ checkProxyAppLog(log);
+ return proxyService.proxyYarn(request, log);
+ default:
+ return ResponseEntity.notFound().build();
+ }
+ }
+
+ private void checkProxyApp(Application app) {
+ ApiAlertException.throwIfNull(app, "Invalid operation, application is
invalid.");
+
+ User user = ServiceHelper.getLoginUser();
+ ApiAlertException.throwIfNull(user, "Permission denied, please login
first.");
+
+ if (user.getUserType() != UserTypeEnum.ADMIN) {
+ Member member = memberService.getByTeamIdUserName(app.getTeamId(),
user.getUsername());
+ ApiAlertException.throwIfNull(member,
+ "Permission denied, this job not created by the current user,
And the job cannot be found in the current user's team.");
+ }
+ }
+
+ private void checkProxyAppLog(ApplicationLog log) {
+ ApiAlertException.throwIfNull(log, "Invalid operation, The application
log not found.");
+ Application app = applicationManageService.getById(log.getAppId());
+ checkProxyApp(app);
+ }
+}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationBackUp.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationBackUp.java
index 737702363..6d6a45b75 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationBackUp.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationBackUp.java
@@ -47,6 +47,8 @@ public class ApplicationBackUp {
private transient boolean backup;
+ private transient String teamId;
+
public ApplicationBackUp() {
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationLog.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationLog.java
index c147c0b42..a6e1ed92a 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationLog.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationLog.java
@@ -48,4 +48,6 @@ public class ApplicationLog {
private String exception;
/** The user who operates the application */
private Long userId;
+
+ private transient String teamId;
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkSql.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkSql.java
index 10fdfa101..74dadb326 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkSql.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkSql.java
@@ -61,6 +61,8 @@ public class FlinkSql {
/** dependency diff */
private transient boolean dependencyDifference = false;
+ private transient Long teamId;
+
public FlinkSql() {
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Savepoint.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Savepoint.java
index 4b78af0e6..1f12b8734 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Savepoint.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Savepoint.java
@@ -50,4 +50,6 @@ public class Savepoint {
private Date triggerTime;
private Date createTime;
+
+ private transient Long teamId;
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationLogService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationLogService.java
index 6e9f95112..cb8fb1eac 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationLogService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationLogService.java
@@ -41,4 +41,6 @@ public interface ApplicationLogService extends
IService<ApplicationLog> {
* @param appId The id of the application to be removed
*/
void removeByAppId(Long appId);
+
+ Boolean delete(ApplicationLog applicationLog);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Savepoint.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ProxyService.java
similarity index 53%
copy from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Savepoint.java
copy to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ProxyService.java
index 4b78af0e6..d391017dc 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Savepoint.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ProxyService.java
@@ -15,39 +15,22 @@
* limitations under the License.
*/
-package org.apache.streampark.console.core.entity;
+package org.apache.streampark.console.core.service;
-import com.baomidou.mybatisplus.annotation.IdType;
-import com.baomidou.mybatisplus.annotation.TableId;
-import com.baomidou.mybatisplus.annotation.TableName;
-import lombok.Data;
-import lombok.extern.slf4j.Slf4j;
+import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.entity.ApplicationLog;
-import java.util.Date;
+import org.springframework.http.ResponseEntity;
-@Data
-@TableName("t_flink_savepoint")
-@Slf4j
-public class Savepoint {
+import javax.servlet.http.HttpServletRequest;
- @TableId(type = IdType.AUTO)
- private Long id;
+public interface ProxyService {
- private Long appId;
+ ResponseEntity<?> proxyFlink(HttpServletRequest request, Application app)
throws Exception;
- private Long chkId;
+ ResponseEntity<?> proxyYarn(HttpServletRequest request, ApplicationLog
log) throws Exception;
- private Boolean latest;
+ ResponseEntity<?> proxyHistory(HttpServletRequest request, ApplicationLog
log) throws Exception;
- /**
- * 1) checkPoint <br>
- * 2) savepoint
- */
- private Integer type;
-
- private String path;
-
- private Date triggerTime;
-
- private Date createTime;
+ ResponseEntity<?> proxyCluster(HttpServletRequest request, Long clusterId)
throws Exception;
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationLogServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationLogServiceImpl.java
index 8d793d7d7..3ac1563b4 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationLogServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationLogServiceImpl.java
@@ -54,4 +54,9 @@ public class ApplicationLogServiceImpl extends
ServiceImpl<ApplicationLogMapper,
.eq(ApplicationLog::getAppId, appId);
this.remove(queryWrapper);
}
+
+ @Override
+ public Boolean delete(ApplicationLog applicationLog) {
+ return removeById(applicationLog.getId());
+ }
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java
new file mode 100644
index 000000000..7a8b89c8d
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service.impl;
+
+import org.apache.streampark.common.util.HadoopUtils;
+import org.apache.streampark.common.util.YarnUtils;
+import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.entity.ApplicationLog;
+import org.apache.streampark.console.core.entity.FlinkCluster;
+import org.apache.streampark.console.core.service.FlinkClusterService;
+import org.apache.streampark.console.core.service.ProxyService;
+import org.apache.streampark.console.core.watcher.FlinkK8sWatcherWrapper;
+import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.web.client.RestTemplateBuilder;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.http.client.ClientHttpResponse;
+import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
+import org.springframework.stereotype.Service;
+import org.springframework.web.client.DefaultResponseErrorHandler;
+import org.springframework.web.client.RestClientException;
+import org.springframework.web.client.RestTemplate;
+
+import javax.annotation.Nonnull;
+import javax.servlet.http.HttpServletRequest;
+
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.net.URI;
+import java.security.PrivilegedExceptionAction;
+import java.util.Enumeration;
+
+@Slf4j
+@Service
+public class ProxyServiceImpl implements ProxyService {
+
+ @Autowired
+ private FlinkClusterService flinkClusterService;
+
+ @Autowired
+ private FlinkK8sWatcher flinkK8sWatcher;
+
+ @Autowired
+ private FlinkK8sWatcherWrapper k8sWatcherWrapper;
+
+ private final RestTemplate proxyRestTemplate;
+
+ private String httpAuthUsername = "";
+
+ public ProxyServiceImpl(RestTemplateBuilder restTemplateBuilder) {
+ this.proxyRestTemplate =
+ restTemplateBuilder
+ .errorHandler(
+ new DefaultResponseErrorHandler() {
+
+ @Override
+ public void handleError(@Nonnull ClientHttpResponse
response) {
+ // Ignore errors in the Flink Web UI itself, such
as 404 errors.
+ }
+ })
+ .build();
+ }
+
+ @Override
+ public ResponseEntity<?> proxyFlink(HttpServletRequest request,
Application app) throws Exception {
+ ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE);
+
+ String url = null;
+ switch (app.getFlinkExecutionMode()) {
+ case YARN_PER_JOB:
+ case YARN_APPLICATION:
+ case YARN_SESSION:
+ String yarnURL = YarnUtils.getRMWebAppProxyURL();
+ url = yarnURL + "/proxy/" + app.getClusterId();
+ url += getRequestURL(request, "/proxy/flink/" + app.getId());
+ return proxyYarnRequest(request, url);
+ case REMOTE:
+ FlinkCluster cluster =
flinkClusterService.getById(app.getFlinkClusterId());
+ url = cluster.getAddress();
+ break;
+ case KUBERNETES_NATIVE_APPLICATION:
+ case KUBERNETES_NATIVE_SESSION:
+ url =
flinkK8sWatcher.getRemoteRestUrl(k8sWatcherWrapper.toTrackId(app));
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "unsupported executionMode
".concat(app.getFlinkExecutionMode().getName()));
+ }
+
+ if (url == null) {
+ ResponseEntity.BodyBuilder builder =
ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE);
+ builder.body("The flink job manager url is not ready");
+ return builder.build();
+ }
+
+ url += getRequestURL(request, "/proxy/flink/" + app.getId());
+ return proxyRequest(request, url);
+ }
+
+ @Override
+ public ResponseEntity<?> proxyYarn(HttpServletRequest request,
ApplicationLog log) throws Exception {
+ ResponseEntity.BodyBuilder builder =
ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE);
+ String yarnId = log.getYarnAppId();
+ if (StringUtils.isBlank(yarnId)) {
+ return builder.body("The yarn application id is null.");
+ }
+ String yarnURL = YarnUtils.getRMWebAppProxyURL();
+ String url = yarnURL + "/proxy/" + yarnId + "/";
+ url += getRequestURL(request, "/proxy/yarn/" + log.getId());
+ return proxyYarnRequest(request, url);
+ }
+
+ @Override
+ public ResponseEntity<?> proxyHistory(HttpServletRequest request,
ApplicationLog log) throws Exception {
+ ResponseEntity.BodyBuilder builder =
ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE);
+
+ String url = log.getJobManagerUrl();
+ if (StringUtils.isBlank(url)) {
+ return builder.body("The jobManager url is null.");
+ }
+ url += getRequestURL(request, "/proxy/history/" + log.getId());
+ return proxyRequest(request, url);
+ }
+
+ @Override
+ public ResponseEntity<?> proxyCluster(HttpServletRequest request, Long
clusterId) throws Exception {
+ ResponseEntity.BodyBuilder builder =
ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE);
+ FlinkCluster cluster = flinkClusterService.getById(clusterId);
+ if (cluster == null) {
+ return builder.body("The cluster not found.");
+ }
+ String url = cluster.getAddress();
+ if (StringUtils.isBlank(url)) {
+ return builder.body("The cluster address is invalid.");
+ }
+
+ url += getRequestURL(request, "/proxy/cluster/" + clusterId);
+ switch (cluster.getFlinkExecutionModeEnum()) {
+ case YARN_PER_JOB:
+ case YARN_APPLICATION:
+ case YARN_SESSION:
+ return proxyYarnRequest(request, url);
+ case REMOTE:
+ case KUBERNETES_NATIVE_APPLICATION:
+ case KUBERNETES_NATIVE_SESSION:
+ return proxyRequest(request, url);
+ default:
+ throw new UnsupportedOperationException(
+ "unsupported executionMode
".concat(cluster.getFlinkExecutionModeEnum().getName()));
+ }
+ }
+
+ private HttpEntity<?> getRequestEntity(HttpServletRequest request, String
url) throws Exception {
+ HttpHeaders headers = new HttpHeaders();
+ Enumeration<String> headerNames = request.getHeaderNames();
+ while (headerNames.hasMoreElements()) {
+ String headerName = headerNames.nextElement();
+ headers.set(headerName, request.getHeader(headerName));
+ }
+
+ // Ensure the Host header is set correctly.
+ URI uri = new URI(url);
+ headers.set("Host", uri.getHost());
+ byte[] body = null;
+ if (request.getInputStream().available() > 0) {
+ InputStream inputStream = request.getInputStream();
+ ByteArrayOutputStream byteArrayOutputStream = new
ByteArrayOutputStream();
+ IOUtils.copy(inputStream, byteArrayOutputStream);
+ body = byteArrayOutputStream.toByteArray();
+ }
+ return new HttpEntity<>(body, headers);
+ }
+
+ private ResponseEntity<?> proxyRequest(HttpServletRequest request, String
url) throws Exception {
+ return proxy(request, url, getRequestEntity(request, url));
+ }
+
+ private ResponseEntity<?> proxyYarnRequest(HttpServletRequest request,
String url) throws Exception {
+ if (YarnUtils.hasYarnHttpKerberosAuth()) {
+ UserGroupInformation ugi = HadoopUtils.getUgi();
+ HttpEntity<?> requestEntity = getRequestEntity(request, url);
+ setRestTemplateCredentials(ugi.getShortUserName());
+ return ugi.doAs(
+ (PrivilegedExceptionAction<ResponseEntity<?>>) () ->
proxy(request, url, requestEntity));
+ } else {
+ return proxyRequest(request, url);
+ }
+ }
+
+ private ResponseEntity<?> proxy(
+ HttpServletRequest request, String url,
HttpEntity<?> requestEntity) {
+ try {
+ return proxyRestTemplate.exchange(
+ url, HttpMethod.valueOf(request.getMethod()), requestEntity,
byte[].class);
+ } catch (RestClientException e) {
+ log.error("Proxy url: {} failed. ", url, e);
+ return new ResponseEntity<>(HttpStatus.BAD_GATEWAY);
+ }
+ }
+
+ private String getRequestURL(HttpServletRequest request, String
replaceString) {
+ String url =
+ request.getRequestURI()
+ + (request.getQueryString() != null ? "?" +
request.getQueryString() : "");
+ return url.replace(replaceString, "");
+ }
+
+ /**
+ * Configures the RestTemplate's HttpClient connector. This method is
primarily used to configure
+ * the HttpClient authentication information and SSL certificate validation
policies.
+ *
+ * @param username The username for HTTP basic authentication.
+ */
+ private void setRestTemplateCredentials(String username) {
+ // Check if the username is not null and has changed since the last
configuration
+ if (username != null && !this.httpAuthUsername.equals(username)) {
+ // Create a new credentials provider
+ BasicCredentialsProvider credentialsProvider = new
BasicCredentialsProvider();
+ // Add the username and password for HTTP basic authentication
+ credentialsProvider.setCredentials(
+ AuthScope.ANY, new UsernamePasswordCredentials(username,
null));
+ // Customize the HttpClient with the credentials provider
+ CloseableHttpClient httpClient =
+
HttpClients.custom().setDefaultCredentialsProvider(credentialsProvider).build();
+ // Set the HttpClient request factory for the RestTemplate
+ this.proxyRestTemplate.setRequestFactory(
+ new HttpComponentsClientHttpRequestFactory(httpClient));
+ // Update the last known username
+ this.httpAuthUsername = username;
+ }
+ }
+}
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppDetail/DetailTab.vue
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppDetail/DetailTab.vue
index 0d3300c6a..59a256238 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppDetail/DetailTab.vue
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppDetail/DetailTab.vue
@@ -65,6 +65,7 @@
import FlinkSqlReview from './FlinkSqlReview.vue';
import FlinkSqlCompareModal from './FlinkSqlCompareModal.vue';
import { OperationEnum } from '/@/enums/flinkEnum';
+ import { baseUrl } from '/@/api';
const DescriptionItem = Descriptions.Item;
const TabPane = Tabs.TabPane;
@@ -272,9 +273,12 @@
});
}
- async function handleYarnUrl(yarnAppId: string) {
- const res = await fetchYarn();
- window.open(res + '/proxy/' + yarnAppId + '/');
+ async function handleYarnUrl(id: string) {
+ window.open(baseUrl() + '/proxy/yarn/' + id + '/');
+ }
+
+ async function handleViewHistory(id: string) {
+ window.open(baseUrl() + '/proxy/history/' + id + '/');
}
function getSavePointAction(record: Recordable): ActionItem[] {
@@ -391,12 +395,12 @@
<Tag color="orange" v-if="record.optionName ===
OperationEnum.CANCEL"> Cancel </Tag>
</template>
<template v-if="column.dataIndex === 'yarnAppId'">
- <a type="link" @click="handleYarnUrl(record.yarnAppId)"
target="_blank">
+ <a type="link" @click="handleYarnUrl(record.id)" target="_blank">
{{ record.yarnAppId }}
</a>
</template>
<template v-if="column.dataIndex === 'jobManagerUrl'">
- <a type="link" :href="record.jobManagerUrl" target="_blank">
+ <a type="link" @click="handleViewHistory(record.id)"
target="_blank">
{{ record.jobManagerUrl }}
</a>
</template>
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
index 40fd45ad1..51c16b533 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
@@ -15,9 +15,7 @@
* limitations under the License.
*/
import { optionsKeyMapping } from '../data/option';
-import { fetchYarn } from '/@/api/flink/app';
import { AppListRecord } from '/@/api/flink/app.type';
-import { fetchRemoteURL } from '/@/api/flink/flinkCluster';
import {
AppStateEnum,
ConfigTypeEnum,
@@ -26,6 +24,7 @@ import {
OptionStateEnum,
PipelineStepEnum,
} from '/@/enums/flinkEnum';
+import { baseUrl } from '/@/api';
export function handleAppBuildStatusColor(statusCode: number) {
switch (statusCode) {
@@ -103,27 +102,8 @@ export function descriptionFilter(option) {
}
}
-export async function handleView(app: AppListRecord, yarn: Nullable<string>) {
- const executionMode = app['executionMode'];
- if (executionMode == ExecModeEnum.REMOTE) {
- const res = await fetchRemoteURL(app.flinkClusterId);
- window.open(res + '/#/job/' + app.jobId + '/overview');
- } else if (
- [ExecModeEnum.YARN_PER_JOB, ExecModeEnum.YARN_SESSION,
ExecModeEnum.YARN_APPLICATION].includes(
- executionMode,
- )
- ) {
- if (!yarn) {
- const res = await fetchYarn();
- window.open(res + '/proxy/' + app['clusterId'] + '/');
- } else {
- window.open(yarn + '/proxy/' + app['clusterId'] + '/');
- }
- } else {
- if (app.flinkRestUrl) {
- window.open(app.flinkRestUrl);
- }
- }
+export async function handleView(app: AppListRecord) {
+ window.open(baseUrl() + '/proxy/flink/' + app.id + '/');
}
export function handleIsStart(app: Recordable, optionApps: Recordable) {
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/cluster/View.vue
b/streampark-console/streampark-console-webapp/src/views/flink/cluster/View.vue
index 7bc2bdc3d..75bb327e0 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/cluster/View.vue
+++
b/streampark-console/streampark-console-webapp/src/views/flink/cluster/View.vue
@@ -181,7 +181,7 @@
<div class="list-content-item">
<span>{{ t('setting.flinkCluster.form.address') }}</span>
<p style="margin-top: 10px">
- <a :href="item.address" target="_blank">
+ <a :href="`/proxy/cluster/${item.id}/`" target="_blank">
{{ item.address }}
</a>
</p>
@@ -249,7 +249,7 @@
:disabled="!handleIsStart(item)"
v-auth="'app:detail'"
shape="circle"
- :href="item.address"
+ :href="`/proxy/cluster/${item.id}/`"
target="_blank"
size="large"
class="control-button"