This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
     new fba37b752 [Improve] proxy bug fixed. (#3982)
fba37b752 is described below

commit fba37b7528182bc0d36e2ae2514bde5ef88edfde
Author: benjobs <[email protected]>
AuthorDate: Wed Aug 21 21:34:43 2024 +0800

    [Improve] proxy bug fixed. (#3982)
    
    * [Improve] proxy bug fixed.
    
    * [Improve] proxy check permission improvement
---
 .../console/core/controller/ProxyController.java   | 18 ++--
 .../console/core/service/ProxyService.java         |  6 +-
 .../core/service/impl/ProxyServiceImpl.java        | 99 +++++++++++++---------
 .../flink/app/components/AppDetail/DetailTab.vue   | 12 +--
 .../src/views/flink/app/utils/index.ts             |  2 +-
 5 files changed, 80 insertions(+), 57 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProxyController.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProxyController.java
index 49c6a9f32..d8e396dd7 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProxyController.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProxyController.java
@@ -38,21 +38,21 @@ public class ProxyController {
 
   @Autowired private ProxyService proxyService;
 
-  @GetMapping("flink-ui/{id}/**")
-  public ResponseEntity<?> proxyFlinkUI(HttpServletRequest request, 
@PathVariable("id") Long id)
+  @GetMapping("flink/{id}/**")
+  public ResponseEntity<?> proxyFlink(HttpServletRequest request, 
@PathVariable("id") Long id)
       throws Exception {
-    return proxyService.proxyFlinkUI(request, id);
+    return proxyService.proxyFlink(request, id);
   }
 
-  @GetMapping("job_manager/{id}/**")
-  public ResponseEntity<?> proxyJobManager(
-      HttpServletRequest request, @PathVariable("id") Long logId) throws 
Exception {
-    return proxyService.proxyJobManager(request, logId);
+  @GetMapping("history/{id}/**")
+  public ResponseEntity<?> proxyHistory(HttpServletRequest request, 
@PathVariable("id") Long logId)
+      throws Exception {
+    return proxyService.proxyHistory(request, logId);
   }
 
   @GetMapping("yarn/{appId}/**")
-  public ResponseEntity<?> proxyURL(HttpServletRequest request, 
@PathVariable("appId") String appId)
+  public ResponseEntity<?> proxyYarn(HttpServletRequest request, 
@PathVariable("appId") Long logId)
       throws Exception {
-    return proxyService.proxyYarn(request, appId);
+    return proxyService.proxyYarn(request, logId);
   }
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ProxyService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ProxyService.java
index a008c2b3e..aba092371 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ProxyService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ProxyService.java
@@ -22,9 +22,9 @@ import org.springframework.http.ResponseEntity;
 import javax.servlet.http.HttpServletRequest;
 
 public interface ProxyService {
-  ResponseEntity<?> proxyFlinkUI(HttpServletRequest request, Long id) throws 
Exception;
+  ResponseEntity<?> proxyFlink(HttpServletRequest request, Long id) throws 
Exception;
 
-  ResponseEntity<?> proxyYarn(HttpServletRequest request, String url) throws 
Exception;
+  ResponseEntity<?> proxyYarn(HttpServletRequest request, Long logId) throws 
Exception;
 
-  ResponseEntity<?> proxyJobManager(HttpServletRequest request, Long logId) 
throws Exception;
+  ResponseEntity<?> proxyHistory(HttpServletRequest request, Long logId) 
throws Exception;
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java
index 98aecd7be..e88acc5a2 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java
@@ -19,6 +19,7 @@ package org.apache.streampark.console.core.service.impl;
 
 import org.apache.streampark.common.util.HadoopUtils;
 import org.apache.streampark.common.util.YarnUtils;
+import org.apache.streampark.console.base.exception.PermissionDeniedException;
 import org.apache.streampark.console.core.entity.Application;
 import org.apache.streampark.console.core.entity.ApplicationLog;
 import org.apache.streampark.console.core.entity.FlinkCluster;
@@ -28,12 +29,13 @@ import 
org.apache.streampark.console.core.service.FlinkClusterService;
 import org.apache.streampark.console.core.service.ProxyService;
 import org.apache.streampark.console.core.service.ServiceHelper;
 import org.apache.streampark.console.core.task.FlinkK8sWatcherWrapper;
+import org.apache.streampark.console.system.authentication.JWTUtil;
 import org.apache.streampark.console.system.entity.Member;
-import org.apache.streampark.console.system.entity.User;
 import org.apache.streampark.console.system.service.MemberService;
 import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.http.auth.AuthScope;
 import org.apache.http.auth.UsernamePasswordCredentials;
@@ -98,75 +100,97 @@ public class ProxyServiceImpl implements ProxyService {
   }
 
   @Override
-  public ResponseEntity<?> proxyFlinkUI(HttpServletRequest request, Long 
appId) throws Exception {
-    ResponseEntity.BodyBuilder builder = 
ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE);
-    if (appId == null) {
-      return builder.body("Invalid operation, appId is null");
-    }
+  public ResponseEntity<?> proxyFlink(HttpServletRequest request, Long appId) 
throws Exception {
+    ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE);
 
-    User currentUser = serviceHelper.getLoginUser();
     Application app = applicationService.getById(appId);
-    if (app == null) {
-      return builder.body("Invalid operation, appId is invalid.");
-    }
-    if (!currentUser.getUserId().equals(app.getUserId())) {
-      Member member = memberService.findByUserId(app.getTeamId(), 
currentUser.getUserId());
-      if (member == null) {
-        return builder.body(
-            "Permission denied, this job not created by the current user, And 
the job cannot be found in the current user's team.");
-      }
-    }
-
+    checkProxyApp(app);
     String url = null;
     switch (app.getExecutionModeEnum()) {
-      case REMOTE:
-        FlinkCluster cluster = 
flinkClusterService.getById(app.getFlinkClusterId());
-        url = cluster.getAddress();
-        break;
       case YARN_PER_JOB:
       case YARN_APPLICATION:
       case YARN_SESSION:
         String yarnURL = YarnUtils.getRMWebAppProxyURL();
         url = yarnURL + "/proxy/" + app.getClusterId();
-        url += getRequestURL(request).replace("/proxy/flink-ui/" + appId, "");
+        url += getRequestURL(request).replace("/proxy/flink/" + appId, "");
         return proxyYarnRequest(request, url);
+      case REMOTE:
+        FlinkCluster cluster = 
flinkClusterService.getById(app.getFlinkClusterId());
+        url = cluster.getAddress();
+        break;
       case KUBERNETES_NATIVE_APPLICATION:
       case KUBERNETES_NATIVE_SESSION:
-        String jobManagerUrl = app.getJobManagerUrl();
-        if (jobManagerUrl == null) {
-          builder = ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE);
-          builder.body("The flink job manager url is not ready");
-          return builder.build();
-        }
         url = 
flinkK8sWatcher.getRemoteRestUrl(k8sWatcherWrapper.toTrackId(app));
         break;
     }
 
     if (url == null) {
-      return builder.body("The flink job manager url is not ready");
+      ResponseEntity.BodyBuilder builder = 
ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE);
+      builder.body("The flink job manager url is not ready");
+      return builder.build();
     }
 
-    url += getRequestURL(request).replace("/proxy/flink-ui/" + appId, "");
+    url += getRequestURL(request).replace("/proxy/flink/" + appId, "");
     return proxyRequest(request, url);
   }
 
   @Override
-  public ResponseEntity<?> proxyYarn(HttpServletRequest request, String appId) 
throws Exception {
+  public ResponseEntity<?> proxyYarn(HttpServletRequest request, Long logId) 
throws Exception {
+    ResponseEntity.BodyBuilder builder = 
ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE);
+    ApplicationLog log = logService.getById(logId);
+    if (log == null) {
+      return builder.body("The application log not found.");
+    }
+    checkProxyAppLog(log);
+    String yarnId = log.getYarnAppId();
     String yarnURL = YarnUtils.getRMWebAppProxyURL();
-    String url = yarnURL + "/proxy/" + appId + "/";
-    url += getRequestURL(request).replace("/proxy/yarn/" + appId, "");
+    String url = yarnURL + "/proxy/" + yarnId + "/";
+    url += getRequestURL(request).replace("/proxy/yarn/" + yarnId, "");
     return proxyYarnRequest(request, url);
   }
 
   @Override
-  public ResponseEntity<?> proxyJobManager(HttpServletRequest request, Long 
logId)
-      throws Exception {
+  public ResponseEntity<?> proxyHistory(HttpServletRequest request, Long 
logId) throws Exception {
+    ResponseEntity.BodyBuilder builder = 
ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE);
+
     ApplicationLog log = logService.getById(logId);
+    if (log == null) {
+      return builder.body("The application log not found.");
+    }
+    checkProxyAppLog(log);
     String url = log.getJobManagerUrl();
-    url += getRequestURL(request).replace("/proxy/job_manager/" + logId, "");
+    if (StringUtils.isBlank(url)) {
+      return builder.body("The jobManager url is null.");
+    }
+    url += getRequestURL(request).replace("/proxy/history/" + logId, "");
     return proxyRequest(request, url);
   }
 
+  public void checkProxyApp(Application app) {
+    if (app == null) {
+      throw new PermissionDeniedException("Invalid operation, application is 
invalid.");
+    }
+    String token = serviceHelper.getAuthorization();
+    if (token != null) {
+      Long userId = JWTUtil.getUserId(token);
+      if (userId != null && !userId.equals(app.getUserId())) {
+        Member member = memberService.findByUserId(app.getTeamId(), userId);
+        if (member == null) {
+          throw new PermissionDeniedException(
+              "Permission denied, this job not created by the current user, 
And the job cannot be found in the current user's team.");
+        }
+      }
+    }
+  }
+
+  public void checkProxyAppLog(ApplicationLog log) {
+    if (log == null) {
+      throw new PermissionDeniedException("Invalid operation, The application 
log not found.");
+    }
+    Application app = applicationService.getById(log.getAppId());
+    checkProxyApp(app);
+  }
+
   private HttpEntity<?> getRequestEntity(HttpServletRequest request, String 
url) throws Exception {
     HttpHeaders headers = new HttpHeaders();
     Enumeration<String> headerNames = request.getHeaderNames();
@@ -178,7 +202,6 @@ public class ProxyServiceImpl implements ProxyService {
     // Ensure the Host header is set correctly.
     URI uri = new URI(url);
     headers.set("Host", uri.getHost());
-
     byte[] body = null;
     if (request.getInputStream().available() > 0) {
       InputStream inputStream = request.getInputStream();
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppDetail/DetailTab.vue
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppDetail/DetailTab.vue
index a2a3b29dd..8f7aca349 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppDetail/DetailTab.vue
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppDetail/DetailTab.vue
@@ -272,12 +272,12 @@
     });
   }
 
-  async function handleYarnUrl(yarnAppId: string) {
-    window.open(baseUrl() + '/proxy/yarn/' + yarnAppId + '/');
+  async function handleYarnUrl(id: string) {
+    window.open(baseUrl() + '/proxy/yarn/' + id + '/');
   }
 
-  async function handleViewJobManager(record: Recordable) {
-    window.open(baseUrl() + '/proxy/job_manager/' + record.id + '/');
+  async function handleViewHistory(record: Recordable) {
+    window.open(baseUrl() + '/proxy/history/' + record.id + '/');
   }
 
   function getSavePointAction(record: Recordable): ActionItem[] {
@@ -394,12 +394,12 @@
               <Tag color="orange" v-if="record.optionName === 
OperationEnum.CANCEL"> Cancel </Tag>
             </template>
             <template v-if="column.dataIndex === 'yarnAppId'">
-              <a type="link" @click="handleYarnUrl(record.yarnAppId)" 
target="_blank">
+              <a type="link" @click="handleYarnUrl(record.id)" target="_blank">
                 {{ record.yarnAppId }}
               </a>
             </template>
             <template v-if="column.dataIndex === 'jobManagerUrl'">
-              <a type="link" target="_blank" 
@click="handleViewJobManager(record)">
+              <a type="link" target="_blank" 
@click="handleViewHistory(record)">
                 {{ record.jobManagerUrl }}
               </a>
             </template>
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
index 5ea810873..390b5eec4 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
@@ -103,7 +103,7 @@ export function descriptionFilter(option) {
 }
 
 export async function handleView(app: AppListRecord) {
-  window.open(baseUrl() + '/proxy/flink-ui/' + app.id + '/');
+  window.open(baseUrl() + '/proxy/flink/' + app.id + '/');
 }
 
 export function handleIsStart(app: Recordable, optionApps: Recordable) {

Reply via email to