This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
     new 5aae2e589 fix(streampark-console): Proxy and Session Issues with YARN HTTP Kerberos Authentication (#3979)
5aae2e589 is described below

commit 5aae2e589c8c209f5bbccbbfeff701c2d3e709dc
Author: TiDra <[email protected]>
AuthorDate: Tue Aug 20 19:59:45 2024 +0800

    fix(streampark-console): Proxy and Session Issues with YARN HTTP Kerberos Authentication (#3979)
---
 .../console/core/entity/FlinkCluster.java          |  4 +-
 .../core/service/impl/FlinkClusterServiceImpl.java |  2 +-
 .../core/service/impl/ProxyServiceImpl.java        | 95 ++++++++++++++++++++--
 3 files changed, 91 insertions(+), 10 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
index f43aac68d..3fef77703 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
@@ -174,9 +174,7 @@ public class FlinkCluster implements Serializable {
     } else if (ExecutionMode.YARN_SESSION.equals(this.getExecutionModeEnum())) 
{
       try {
         String restUrl = YarnUtils.getRMWebAppURL(true) + "/proxy/" + 
this.clusterId + "/overview";
-        String result =
-            HttpClientUtils.httpGetRequest(
-                restUrl, 
RequestConfig.custom().setConnectTimeout(2000).build());
+        String result = YarnUtils.restRequest(restUrl, 2000);
         JacksonUtils.read(result, Overview.class);
         return true;
       } catch (Exception e) {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 1cc9a82f4..9c1c54b45 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -185,7 +185,7 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
         // 2) setAddress
         if (ExecutionMode.YARN_SESSION.equals(executionModeEnum)) {
           String address =
-              YarnUtils.getRMWebAppURL(true) + "/proxy/" + 
deployResponse.clusterId() + "/";
+              YarnUtils.getRMWebAppProxyURL() + "/proxy/" + 
deployResponse.clusterId() + "/";
           flinkCluster.setAddress(address);
         } else {
           flinkCluster.setAddress(deployResponse.address());
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java
index bc6d712e7..9bf47b5e3 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java
@@ -17,6 +17,8 @@
 
 package org.apache.streampark.console.core.service.impl;
 
+import org.apache.streampark.common.util.HadoopUtils;
+import org.apache.streampark.common.util.SystemPropertyUtils;
 import org.apache.streampark.common.util.YarnUtils;
 import org.apache.streampark.console.base.util.EncryptUtils;
 import org.apache.streampark.console.core.entity.Application;
@@ -34,6 +36,12 @@ import org.apache.streampark.console.system.service.MemberService;
 import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
 
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.web.client.RestTemplateBuilder;
@@ -43,6 +51,7 @@ import org.springframework.http.HttpMethod;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
 import org.springframework.http.client.ClientHttpResponse;
+import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
 import org.springframework.stereotype.Service;
 import org.springframework.web.client.DefaultResponseErrorHandler;
 import org.springframework.web.client.RestTemplate;
@@ -52,6 +61,8 @@ import javax.servlet.http.HttpServletRequest;
 
 import java.io.ByteArrayOutputStream;
 import java.io.InputStream;
+import java.net.URI;
+import java.security.PrivilegedExceptionAction;
 import java.util.Enumeration;
 
 @Service
@@ -73,6 +84,10 @@ public class ProxyServiceImpl implements ProxyService {
 
   private final RestTemplate proxyRestTemplate;
 
+  private final boolean hasYarnHttpKerberosAuth;
+
+  private String lastUsername = "";
+
   public ProxyServiceImpl(RestTemplateBuilder restTemplateBuilder) {
     this.proxyRestTemplate =
         restTemplateBuilder
@@ -84,6 +99,9 @@ public class ProxyServiceImpl implements ProxyService {
                   }
                 })
             .build();
+
+    String yarnHttpAuth = SystemPropertyUtils.get("streampark.yarn.http-auth");
+    this.hasYarnHttpKerberosAuth = "kerberos".equalsIgnoreCase(yarnHttpAuth);
   }
 
   @Override
@@ -117,7 +135,8 @@ public class ProxyServiceImpl implements ProxyService {
       case YARN_SESSION:
         String yarnURL = YarnUtils.getRMWebAppProxyURL();
         url = yarnURL + "/proxy/" + app.getClusterId();
-        break;
+        url += getRequestURL(request).replace("/proxy/flink-ui/" + appId, "");
+        return proxyYarnRequest(request, url);
       case KUBERNETES_NATIVE_APPLICATION:
       case KUBERNETES_NATIVE_SESSION:
         String jobManagerUrl = app.getJobManagerUrl();
@@ -143,7 +162,7 @@ public class ProxyServiceImpl implements ProxyService {
     String yarnURL = YarnUtils.getRMWebAppProxyURL();
     String url = yarnURL + "/proxy/" + appId + "/";
     url += getRequestURL(request).replace("/proxy/yarn/" + appId, "");
-    return proxyRequest(request, url);
+    return proxyYarnRequest(request, url);
   }
 
   @Override
@@ -155,7 +174,8 @@ public class ProxyServiceImpl implements ProxyService {
     return proxyRequest(request, url);
   }
 
-  private ResponseEntity<?> proxyRequest(HttpServletRequest request, String 
url) throws Exception {
+  private HttpEntity<?> getRequestEntity(HttpServletRequest request, String 
url, boolean setAuth)
+      throws Exception {
     HttpHeaders headers = new HttpHeaders();
     Enumeration<String> headerNames = request.getHeaderNames();
     while (headerNames.hasMoreElements()) {
@@ -163,9 +183,15 @@ public class ProxyServiceImpl implements ProxyService {
       headers.set(headerName, request.getHeader(headerName));
     }
 
-    String token = serviceHelper.getAuthorization();
-    if (token != null) {
-      headers.set("Authorization", EncryptUtils.encrypt(token));
+    // Ensure the Host header is set correctly.
+    URI uri = new URI(url);
+    headers.set("Host", uri.getHost());
+
+    if (setAuth) {
+      String token = serviceHelper.getAuthorization();
+      if (token != null) {
+        headers.set("Authorization", EncryptUtils.encrypt(token));
+      }
     }
 
     byte[] body = null;
@@ -177,12 +203,69 @@ public class ProxyServiceImpl implements ProxyService {
     }
 
     HttpEntity<?> requestEntity = new HttpEntity<>(body, headers);
+    return requestEntity;
+  }
+
+  private ResponseEntity<?> proxyRequest(HttpServletRequest request, String 
url) throws Exception {
+    HttpEntity<?> requestEntity = getRequestEntity(request, url, true);
     return proxyRestTemplate.exchange(
         url, HttpMethod.valueOf(request.getMethod()), requestEntity, 
byte[].class);
   }
 
+  private ResponseEntity<?> proxyYarnRequest(HttpServletRequest request, 
String url)
+      throws Exception {
+    if (hasYarnHttpKerberosAuth) {
+      UserGroupInformation ugi = HadoopUtils.getUgi();
+
+      HttpEntity<?> requestEntity = getRequestEntity(request, url, false);
+      setRestTemplateCredentials(ugi.getShortUserName());
+
+      return ugi.doAs(
+          new PrivilegedExceptionAction<ResponseEntity<?>>() {
+            @Override
+            public ResponseEntity<?> run() throws Exception {
+              return proxyRestTemplate.exchange(
+                  url, HttpMethod.valueOf(request.getMethod()), requestEntity, 
byte[].class);
+            }
+          });
+    } else {
+      return proxyRequest(request, url);
+    }
+  }
+
   private String getRequestURL(HttpServletRequest request) {
     return request.getRequestURI()
         + (request.getQueryString() != null ? "?" + request.getQueryString() : 
"");
   }
+
+  private void setRestTemplateCredentials(String username) {
+    setRestTemplateCredentials(username, null);
+  }
+
+  /**
+   * Configures the RestTemplate's HttpClient connector. This method is 
primarily used to configure
+   * the HttpClient authentication information and SSL certificate validation 
policies.
+   *
+   * @param username The username for HTTP basic authentication.
+   * @param password The password for HTTP basic authentication.
+   */
+  private void setRestTemplateCredentials(String username, String password) {
+    // Check if the username is not null and has changed since the last 
configuration
+    if (username != null && !this.lastUsername.equals(username)) {
+      // Create a new credentials provider
+      BasicCredentialsProvider credentialsProvider = new 
BasicCredentialsProvider();
+      // Add the username and password for HTTP basic authentication
+      credentialsProvider.setCredentials(
+          AuthScope.ANY, new UsernamePasswordCredentials(username, password));
+
+      // Customize the HttpClient with the credentials provider
+      CloseableHttpClient httpClient =
+          
HttpClients.custom().setDefaultCredentialsProvider(credentialsProvider).build();
+      // Set the HttpClient request factory for the RestTemplate
+      this.proxyRestTemplate.setRequestFactory(
+          new HttpComponentsClientHttpRequestFactory(httpClient));
+      // Update the last known username
+      this.lastUsername = username;
+    }
+  }
 }

Reply via email to