This is an automated email from the ASF dual-hosted git repository. benjobs pushed a commit to branch hadoop-sample-auth in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit 7b988a936e0277548c9e6e426a92b4daa5264146 Author: benjobs <[email protected]> AuthorDate: Tue Jun 6 18:25:33 2023 +0800 [improve] hadoop sample support --- .../org/apache/streampark/common/util/HadoopUtils.scala | 2 +- .../org/apache/streampark/common/util/YarnUtils.scala | 14 +++++++++++--- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala index fa6d3c267..aca0f75a8 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala @@ -58,7 +58,7 @@ object HadoopUtils extends Logger { private[this] var tgt: KerberosTicket = _ - private lazy val hadoopUserName: String = + lazy val hadoopUserName: String = InternalConfigHolder.get(CommonConfig.STREAMPARK_HADOOP_USER_NAME) private[this] lazy val debugKerberos = diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala index c93034348..2ffc244dc 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala @@ -54,6 +54,11 @@ object YarnUtils extends Logger { "kerberos".equalsIgnoreCase(yarnHttpAuth) } + lazy val hasYarnHttpSampleAuth: Boolean = { + val yarnHttpAuth: String = InternalConfigHolder.get[String](CommonConfig.STREAMPARK_YARN_AUTH) + "sample".equalsIgnoreCase(yarnHttpAuth) + } + /** * @param appName * @return @@ -246,15 +251,15 @@ object YarnUtils extends Logger { def restRequest(url: String): String = { if (url == null) return null - def request(url: String): String = { - logDebug("request url is " + url); + def request(reqUrl: String): String = { + logDebug("request url is " + reqUrl) val config = RequestConfig.custom.setConnectTimeout(5000, TimeUnit.MILLISECONDS).build if (hasYarnHttpKerberosAuth) { HadoopUtils .getUgi() .doAs(new PrivilegedExceptionAction[String] { override def run(): String = { - Try(HttpClientUtils.httpAuthGetRequest(url, config)) match { + Try(HttpClientUtils.httpAuthGetRequest(reqUrl, config)) match { case Success(v) => v case Failure(e) => logError("yarnUtils authRestRequest error, detail: ", e) @@ -263,6 +268,9 @@ object YarnUtils extends Logger { } }) } else { + val url = if (hasYarnHttpSampleAuth) { + s"$reqUrl?user.name=${HadoopUtils.hadoopUserName}" + } else reqUrl Try(HttpClientUtils.httpGetRequest(url, config)) match { case Success(v) => v case Failure(e) =>
