This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
new fba37b752 [Improve] proxy bug fixed. (#3982)
fba37b752 is described below
commit fba37b7528182bc0d36e2ae2514bde5ef88edfde
Author: benjobs <[email protected]>
AuthorDate: Wed Aug 21 21:34:43 2024 +0800
[Improve] proxy bug fixed. (#3982)
* [Improve] proxy bug fixed.
* [Improve] proxy check permission improvement
---
.../console/core/controller/ProxyController.java | 18 ++--
.../console/core/service/ProxyService.java | 6 +-
.../core/service/impl/ProxyServiceImpl.java | 99 +++++++++++++---------
.../flink/app/components/AppDetail/DetailTab.vue | 12 +--
.../src/views/flink/app/utils/index.ts | 2 +-
5 files changed, 80 insertions(+), 57 deletions(-)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProxyController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProxyController.java
index 49c6a9f32..d8e396dd7 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProxyController.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProxyController.java
@@ -38,21 +38,21 @@ public class ProxyController {
@Autowired private ProxyService proxyService;
- @GetMapping("flink-ui/{id}/**")
- public ResponseEntity<?> proxyFlinkUI(HttpServletRequest request,
@PathVariable("id") Long id)
+ @GetMapping("flink/{id}/**")
+ public ResponseEntity<?> proxyFlink(HttpServletRequest request,
@PathVariable("id") Long id)
throws Exception {
- return proxyService.proxyFlinkUI(request, id);
+ return proxyService.proxyFlink(request, id);
}
- @GetMapping("job_manager/{id}/**")
- public ResponseEntity<?> proxyJobManager(
- HttpServletRequest request, @PathVariable("id") Long logId) throws
Exception {
- return proxyService.proxyJobManager(request, logId);
+ @GetMapping("history/{id}/**")
+ public ResponseEntity<?> proxyHistory(HttpServletRequest request,
@PathVariable("id") Long logId)
+ throws Exception {
+ return proxyService.proxyHistory(request, logId);
}
@GetMapping("yarn/{appId}/**")
- public ResponseEntity<?> proxyURL(HttpServletRequest request,
@PathVariable("appId") String appId)
+ public ResponseEntity<?> proxyYarn(HttpServletRequest request,
@PathVariable("appId") Long logId)
throws Exception {
- return proxyService.proxyYarn(request, appId);
+ return proxyService.proxyYarn(request, logId);
}
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ProxyService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ProxyService.java
index a008c2b3e..aba092371 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ProxyService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ProxyService.java
@@ -22,9 +22,9 @@ import org.springframework.http.ResponseEntity;
import javax.servlet.http.HttpServletRequest;
public interface ProxyService {
- ResponseEntity<?> proxyFlinkUI(HttpServletRequest request, Long id) throws
Exception;
+ ResponseEntity<?> proxyFlink(HttpServletRequest request, Long id) throws
Exception;
- ResponseEntity<?> proxyYarn(HttpServletRequest request, String url) throws
Exception;
+ ResponseEntity<?> proxyYarn(HttpServletRequest request, Long logId) throws
Exception;
- ResponseEntity<?> proxyJobManager(HttpServletRequest request, Long logId)
throws Exception;
+ ResponseEntity<?> proxyHistory(HttpServletRequest request, Long logId)
throws Exception;
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java
index 98aecd7be..e88acc5a2 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java
@@ -19,6 +19,7 @@ package org.apache.streampark.console.core.service.impl;
import org.apache.streampark.common.util.HadoopUtils;
import org.apache.streampark.common.util.YarnUtils;
+import org.apache.streampark.console.base.exception.PermissionDeniedException;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.ApplicationLog;
import org.apache.streampark.console.core.entity.FlinkCluster;
@@ -28,12 +29,13 @@ import org.apache.streampark.console.core.service.FlinkClusterService;
import org.apache.streampark.console.core.service.ProxyService;
import org.apache.streampark.console.core.service.ServiceHelper;
import org.apache.streampark.console.core.task.FlinkK8sWatcherWrapper;
+import org.apache.streampark.console.system.authentication.JWTUtil;
import org.apache.streampark.console.system.entity.Member;
-import org.apache.streampark.console.system.entity.User;
import org.apache.streampark.console.system.service.MemberService;
import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
@@ -98,75 +100,97 @@ public class ProxyServiceImpl implements ProxyService {
}
@Override
- public ResponseEntity<?> proxyFlinkUI(HttpServletRequest request, Long
appId) throws Exception {
- ResponseEntity.BodyBuilder builder =
ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE);
- if (appId == null) {
- return builder.body("Invalid operation, appId is null");
- }
+ public ResponseEntity<?> proxyFlink(HttpServletRequest request, Long appId)
throws Exception {
+ ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE);
- User currentUser = serviceHelper.getLoginUser();
Application app = applicationService.getById(appId);
- if (app == null) {
- return builder.body("Invalid operation, appId is invalid.");
- }
- if (!currentUser.getUserId().equals(app.getUserId())) {
- Member member = memberService.findByUserId(app.getTeamId(),
currentUser.getUserId());
- if (member == null) {
- return builder.body(
- "Permission denied, this job not created by the current user, And
the job cannot be found in the current user's team.");
- }
- }
-
+ checkProxyApp(app);
String url = null;
switch (app.getExecutionModeEnum()) {
- case REMOTE:
- FlinkCluster cluster =
flinkClusterService.getById(app.getFlinkClusterId());
- url = cluster.getAddress();
- break;
case YARN_PER_JOB:
case YARN_APPLICATION:
case YARN_SESSION:
String yarnURL = YarnUtils.getRMWebAppProxyURL();
url = yarnURL + "/proxy/" + app.getClusterId();
- url += getRequestURL(request).replace("/proxy/flink-ui/" + appId, "");
+ url += getRequestURL(request).replace("/proxy/flink/" + appId, "");
return proxyYarnRequest(request, url);
+ case REMOTE:
+ FlinkCluster cluster =
flinkClusterService.getById(app.getFlinkClusterId());
+ url = cluster.getAddress();
+ break;
case KUBERNETES_NATIVE_APPLICATION:
case KUBERNETES_NATIVE_SESSION:
- String jobManagerUrl = app.getJobManagerUrl();
- if (jobManagerUrl == null) {
- builder = ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE);
- builder.body("The flink job manager url is not ready");
- return builder.build();
- }
url =
flinkK8sWatcher.getRemoteRestUrl(k8sWatcherWrapper.toTrackId(app));
break;
}
if (url == null) {
- return builder.body("The flink job manager url is not ready");
+ ResponseEntity.BodyBuilder builder =
ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE);
+ builder.body("The flink job manager url is not ready");
+ return builder.build();
}
- url += getRequestURL(request).replace("/proxy/flink-ui/" + appId, "");
+ url += getRequestURL(request).replace("/proxy/flink/" + appId, "");
return proxyRequest(request, url);
}
@Override
- public ResponseEntity<?> proxyYarn(HttpServletRequest request, String appId)
throws Exception {
+ public ResponseEntity<?> proxyYarn(HttpServletRequest request, Long logId)
throws Exception {
+ ResponseEntity.BodyBuilder builder =
ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE);
+ ApplicationLog log = logService.getById(logId);
+ if (log == null) {
+ return builder.body("The application log not found.");
+ }
+ checkProxyAppLog(log);
+ String yarnId = log.getYarnAppId();
String yarnURL = YarnUtils.getRMWebAppProxyURL();
- String url = yarnURL + "/proxy/" + appId + "/";
- url += getRequestURL(request).replace("/proxy/yarn/" + appId, "");
+ String url = yarnURL + "/proxy/" + yarnId + "/";
+ url += getRequestURL(request).replace("/proxy/yarn/" + yarnId, "");
return proxyYarnRequest(request, url);
}
@Override
- public ResponseEntity<?> proxyJobManager(HttpServletRequest request, Long
logId)
- throws Exception {
+ public ResponseEntity<?> proxyHistory(HttpServletRequest request, Long
logId) throws Exception {
+ ResponseEntity.BodyBuilder builder =
ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE);
+
ApplicationLog log = logService.getById(logId);
+ if (log == null) {
+ return builder.body("The application log not found.");
+ }
+ checkProxyAppLog(log);
String url = log.getJobManagerUrl();
- url += getRequestURL(request).replace("/proxy/job_manager/" + logId, "");
+ if (StringUtils.isBlank(url)) {
+ return builder.body("The jobManager url is null.");
+ }
+ url += getRequestURL(request).replace("/proxy/history/" + logId, "");
return proxyRequest(request, url);
}
+ public void checkProxyApp(Application app) {
+ if (app == null) {
+ throw new PermissionDeniedException("Invalid operation, application is
invalid.");
+ }
+ String token = serviceHelper.getAuthorization();
+ if (token != null) {
+ Long userId = JWTUtil.getUserId(token);
+ if (userId != null && !userId.equals(app.getUserId())) {
+ Member member = memberService.findByUserId(app.getTeamId(), userId);
+ if (member == null) {
+ throw new PermissionDeniedException(
+ "Permission denied, this job not created by the current user,
And the job cannot be found in the current user's team.");
+ }
+ }
+ }
+ }
+
+ public void checkProxyAppLog(ApplicationLog log) {
+ if (log == null) {
+ throw new PermissionDeniedException("Invalid operation, The application
log not found.");
+ }
+ Application app = applicationService.getById(log.getAppId());
+ checkProxyApp(app);
+ }
+
private HttpEntity<?> getRequestEntity(HttpServletRequest request, String
url) throws Exception {
HttpHeaders headers = new HttpHeaders();
Enumeration<String> headerNames = request.getHeaderNames();
@@ -178,7 +202,6 @@ public class ProxyServiceImpl implements ProxyService {
// Ensure the Host header is set correctly.
URI uri = new URI(url);
headers.set("Host", uri.getHost());
-
byte[] body = null;
if (request.getInputStream().available() > 0) {
InputStream inputStream = request.getInputStream();
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppDetail/DetailTab.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppDetail/DetailTab.vue
index a2a3b29dd..8f7aca349 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppDetail/DetailTab.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppDetail/DetailTab.vue
@@ -272,12 +272,12 @@
});
}
- async function handleYarnUrl(yarnAppId: string) {
- window.open(baseUrl() + '/proxy/yarn/' + yarnAppId + '/');
+ async function handleYarnUrl(id: string) {
+ window.open(baseUrl() + '/proxy/yarn/' + id + '/');
}
- async function handleViewJobManager(record: Recordable) {
- window.open(baseUrl() + '/proxy/job_manager/' + record.id + '/');
+ async function handleViewHistory(record: Recordable) {
+ window.open(baseUrl() + '/proxy/history/' + record.id + '/');
}
function getSavePointAction(record: Recordable): ActionItem[] {
@@ -394,12 +394,12 @@
<Tag color="orange" v-if="record.optionName ===
OperationEnum.CANCEL"> Cancel </Tag>
</template>
<template v-if="column.dataIndex === 'yarnAppId'">
- <a type="link" @click="handleYarnUrl(record.yarnAppId)"
target="_blank">
+ <a type="link" @click="handleYarnUrl(record.id)" target="_blank">
{{ record.yarnAppId }}
</a>
</template>
<template v-if="column.dataIndex === 'jobManagerUrl'">
- <a type="link" target="_blank"
@click="handleViewJobManager(record)">
+ <a type="link" target="_blank"
@click="handleViewHistory(record)">
{{ record.jobManagerUrl }}
</a>
</template>
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
index 5ea810873..390b5eec4 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
@@ -103,7 +103,7 @@ export function descriptionFilter(option) {
}
export async function handleView(app: AppListRecord) {
- window.open(baseUrl() + '/proxy/flink-ui/' + app.id + '/');
+ window.open(baseUrl() + '/proxy/flink/' + app.id + '/');
}
export function handleIsStart(app: Recordable, optionApps: Recordable) {