This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
new 5aae2e589 fix(streampark-console): Proxy and Session Issues with YARN
HTTP Kerberos Authentication (#3979)
5aae2e589 is described below
commit 5aae2e589c8c209f5bbccbbfeff701c2d3e709dc
Author: TiDra <[email protected]>
AuthorDate: Tue Aug 20 19:59:45 2024 +0800
fix(streampark-console): Proxy and Session Issues with YARN HTTP Kerberos
Authentication (#3979)
---
.../console/core/entity/FlinkCluster.java | 4 +-
.../core/service/impl/FlinkClusterServiceImpl.java | 2 +-
.../core/service/impl/ProxyServiceImpl.java | 95 ++++++++++++++++++++--
3 files changed, 91 insertions(+), 10 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
index f43aac68d..3fef77703 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
@@ -174,9 +174,7 @@ public class FlinkCluster implements Serializable {
} else if (ExecutionMode.YARN_SESSION.equals(this.getExecutionModeEnum()))
{
try {
String restUrl = YarnUtils.getRMWebAppURL(true) + "/proxy/" +
this.clusterId + "/overview";
- String result =
- HttpClientUtils.httpGetRequest(
- restUrl,
RequestConfig.custom().setConnectTimeout(2000).build());
+ String result = YarnUtils.restRequest(restUrl, 2000);
JacksonUtils.read(result, Overview.class);
return true;
} catch (Exception e) {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 1cc9a82f4..9c1c54b45 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -185,7 +185,7 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
// 2) setAddress
if (ExecutionMode.YARN_SESSION.equals(executionModeEnum)) {
String address =
- YarnUtils.getRMWebAppURL(true) + "/proxy/" +
deployResponse.clusterId() + "/";
+ YarnUtils.getRMWebAppProxyURL() + "/proxy/" +
deployResponse.clusterId() + "/";
flinkCluster.setAddress(address);
} else {
flinkCluster.setAddress(deployResponse.address());
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java
index bc6d712e7..9bf47b5e3 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java
@@ -17,6 +17,8 @@
package org.apache.streampark.console.core.service.impl;
+import org.apache.streampark.common.util.HadoopUtils;
+import org.apache.streampark.common.util.SystemPropertyUtils;
import org.apache.streampark.common.util.YarnUtils;
import org.apache.streampark.console.base.util.EncryptUtils;
import org.apache.streampark.console.core.entity.Application;
@@ -34,6 +36,12 @@ import
org.apache.streampark.console.system.service.MemberService;
import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.web.client.RestTemplateBuilder;
@@ -43,6 +51,7 @@ import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.ClientHttpResponse;
+import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.stereotype.Service;
import org.springframework.web.client.DefaultResponseErrorHandler;
import org.springframework.web.client.RestTemplate;
@@ -52,6 +61,8 @@ import javax.servlet.http.HttpServletRequest;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
+import java.net.URI;
+import java.security.PrivilegedExceptionAction;
import java.util.Enumeration;
@Service
@@ -73,6 +84,10 @@ public class ProxyServiceImpl implements ProxyService {
private final RestTemplate proxyRestTemplate;
+ private final boolean hasYarnHttpKerberosAuth;
+
+ private String lastUsername = "";
+
public ProxyServiceImpl(RestTemplateBuilder restTemplateBuilder) {
this.proxyRestTemplate =
restTemplateBuilder
@@ -84,6 +99,9 @@ public class ProxyServiceImpl implements ProxyService {
}
})
.build();
+
+ String yarnHttpAuth = SystemPropertyUtils.get("streampark.yarn.http-auth");
+ this.hasYarnHttpKerberosAuth = "kerberos".equalsIgnoreCase(yarnHttpAuth);
}
@Override
@@ -117,7 +135,8 @@ public class ProxyServiceImpl implements ProxyService {
case YARN_SESSION:
String yarnURL = YarnUtils.getRMWebAppProxyURL();
url = yarnURL + "/proxy/" + app.getClusterId();
- break;
+ url += getRequestURL(request).replace("/proxy/flink-ui/" + appId, "");
+ return proxyYarnRequest(request, url);
case KUBERNETES_NATIVE_APPLICATION:
case KUBERNETES_NATIVE_SESSION:
String jobManagerUrl = app.getJobManagerUrl();
@@ -143,7 +162,7 @@ public class ProxyServiceImpl implements ProxyService {
String yarnURL = YarnUtils.getRMWebAppProxyURL();
String url = yarnURL + "/proxy/" + appId + "/";
url += getRequestURL(request).replace("/proxy/yarn/" + appId, "");
- return proxyRequest(request, url);
+ return proxyYarnRequest(request, url);
}
@Override
@@ -155,7 +174,8 @@ public class ProxyServiceImpl implements ProxyService {
return proxyRequest(request, url);
}
- private ResponseEntity<?> proxyRequest(HttpServletRequest request, String
url) throws Exception {
+ private HttpEntity<?> getRequestEntity(HttpServletRequest request, String
url, boolean setAuth)
+ throws Exception {
HttpHeaders headers = new HttpHeaders();
Enumeration<String> headerNames = request.getHeaderNames();
while (headerNames.hasMoreElements()) {
@@ -163,9 +183,15 @@ public class ProxyServiceImpl implements ProxyService {
headers.set(headerName, request.getHeader(headerName));
}
- String token = serviceHelper.getAuthorization();
- if (token != null) {
- headers.set("Authorization", EncryptUtils.encrypt(token));
+ // Ensure the Host header is set correctly.
+ URI uri = new URI(url);
+ headers.set("Host", uri.getHost());
+
+ if (setAuth) {
+ String token = serviceHelper.getAuthorization();
+ if (token != null) {
+ headers.set("Authorization", EncryptUtils.encrypt(token));
+ }
}
byte[] body = null;
@@ -177,12 +203,69 @@ public class ProxyServiceImpl implements ProxyService {
}
HttpEntity<?> requestEntity = new HttpEntity<>(body, headers);
+ return requestEntity;
+ }
+
+ private ResponseEntity<?> proxyRequest(HttpServletRequest request, String
url) throws Exception {
+ HttpEntity<?> requestEntity = getRequestEntity(request, url, true);
return proxyRestTemplate.exchange(
url, HttpMethod.valueOf(request.getMethod()), requestEntity,
byte[].class);
}
+ private ResponseEntity<?> proxyYarnRequest(HttpServletRequest request,
String url)
+ throws Exception {
+ if (hasYarnHttpKerberosAuth) {
+ UserGroupInformation ugi = HadoopUtils.getUgi();
+
+ HttpEntity<?> requestEntity = getRequestEntity(request, url, false);
+ setRestTemplateCredentials(ugi.getShortUserName());
+
+ return ugi.doAs(
+ new PrivilegedExceptionAction<ResponseEntity<?>>() {
+ @Override
+ public ResponseEntity<?> run() throws Exception {
+ return proxyRestTemplate.exchange(
+ url, HttpMethod.valueOf(request.getMethod()), requestEntity,
byte[].class);
+ }
+ });
+ } else {
+ return proxyRequest(request, url);
+ }
+ }
+
private String getRequestURL(HttpServletRequest request) {
return request.getRequestURI()
+ (request.getQueryString() != null ? "?" + request.getQueryString() :
"");
}
+
+ private void setRestTemplateCredentials(String username) {
+ setRestTemplateCredentials(username, null);
+ }
+
+ /**
+ * Configures the RestTemplate's HttpClient connector. This method is
primarily used to configure
+ * the HttpClient authentication information and SSL certificate validation
policies.
+ *
+ * @param username The username for HTTP basic authentication.
+ * @param password The password for HTTP basic authentication.
+ */
+ private void setRestTemplateCredentials(String username, String password) {
+ // Check if the username is not null and has changed since the last
configuration
+ if (username != null && !this.lastUsername.equals(username)) {
+ // Create a new credentials provider
+ BasicCredentialsProvider credentialsProvider = new
BasicCredentialsProvider();
+ // Add the username and password for HTTP basic authentication
+ credentialsProvider.setCredentials(
+ AuthScope.ANY, new UsernamePasswordCredentials(username, password));
+
+ // Customize the HttpClient with the credentials provider
+ CloseableHttpClient httpClient =
+
HttpClients.custom().setDefaultCredentialsProvider(credentialsProvider).build();
+ // Set the HttpClient request factory for the RestTemplate
+ this.proxyRestTemplate.setRequestFactory(
+ new HttpComponentsClientHttpRequestFactory(httpClient));
+ // Update the last known username
+ this.lastUsername = username;
+ }
+ }
}