This is an automated email from the ASF dual-hosted git repository.
muchunjin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 53ca8d9ef [improve] hadoop sample support (#2793)
53ca8d9ef is described below
commit 53ca8d9ef0825e12fae1383bfedc2dd2416d9edc
Author: benjobs <[email protected]>
AuthorDate: Wed Jun 14 12:43:43 2023 +0800
[improve] hadoop sample support (#2793)
Co-authored-by: benjobs <[email protected]>
---
.../org/apache/streampark/common/util/HadoopUtils.scala | 2 +-
.../org/apache/streampark/common/util/YarnUtils.scala | 14 +++++++++++---
2 files changed, 12 insertions(+), 4 deletions(-)
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
index fa6d3c267..aca0f75a8 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
@@ -58,7 +58,7 @@ object HadoopUtils extends Logger {
private[this] var tgt: KerberosTicket = _
- private lazy val hadoopUserName: String =
+ lazy val hadoopUserName: String =
InternalConfigHolder.get(CommonConfig.STREAMPARK_HADOOP_USER_NAME)
private[this] lazy val debugKerberos =
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
index c93034348..2ffc244dc 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
@@ -54,6 +54,11 @@ object YarnUtils extends Logger {
"kerberos".equalsIgnoreCase(yarnHttpAuth)
}
+ lazy val hasYarnHttpSampleAuth: Boolean = {
+ val yarnHttpAuth: String =
InternalConfigHolder.get[String](CommonConfig.STREAMPARK_YARN_AUTH)
+ "sample".equalsIgnoreCase(yarnHttpAuth)
+ }
+
/**
* @param appName
* @return
@@ -246,15 +251,15 @@ object YarnUtils extends Logger {
def restRequest(url: String): String = {
if (url == null) return null
- def request(url: String): String = {
- logDebug("request url is " + url);
+ def request(reqUrl: String): String = {
+ logDebug("request url is " + reqUrl)
val config = RequestConfig.custom.setConnectTimeout(5000,
TimeUnit.MILLISECONDS).build
if (hasYarnHttpKerberosAuth) {
HadoopUtils
.getUgi()
.doAs(new PrivilegedExceptionAction[String] {
override def run(): String = {
- Try(HttpClientUtils.httpAuthGetRequest(url, config)) match {
+ Try(HttpClientUtils.httpAuthGetRequest(reqUrl, config)) match {
case Success(v) => v
case Failure(e) =>
logError("yarnUtils authRestRequest error, detail: ", e)
@@ -263,6 +268,9 @@ object YarnUtils extends Logger {
}
})
} else {
+ val url = if (hasYarnHttpSampleAuth) {
+ s"$reqUrl?user.name=${HadoopUtils.hadoopUserName}"
+ } else reqUrl
Try(HttpClientUtils.httpGetRequest(url, config)) match {
case Success(v) => v
case Failure(e) =>