This is an automated email from the ASF dual-hosted git repository.

muchunjin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 53ca8d9ef [improve] hadoop sample support (#2793)
53ca8d9ef is described below

commit 53ca8d9ef0825e12fae1383bfedc2dd2416d9edc
Author: benjobs <[email protected]>
AuthorDate: Wed Jun 14 12:43:43 2023 +0800

    [improve] hadoop sample support (#2793)
    
    Co-authored-by: benjobs <[email protected]>
---
 .../org/apache/streampark/common/util/HadoopUtils.scala    |  2 +-
 .../org/apache/streampark/common/util/YarnUtils.scala      | 14 +++++++++++---
 2 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
index fa6d3c267..aca0f75a8 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
@@ -58,7 +58,7 @@ object HadoopUtils extends Logger {
 
   private[this] var tgt: KerberosTicket = _
 
-  private lazy val hadoopUserName: String =
+  lazy val hadoopUserName: String =
     InternalConfigHolder.get(CommonConfig.STREAMPARK_HADOOP_USER_NAME)
 
   private[this] lazy val debugKerberos =
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
index c93034348..2ffc244dc 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
@@ -54,6 +54,11 @@ object YarnUtils extends Logger {
     "kerberos".equalsIgnoreCase(yarnHttpAuth)
   }
 
+  lazy val hasYarnHttpSampleAuth: Boolean = {
+    val yarnHttpAuth: String = InternalConfigHolder.get[String](CommonConfig.STREAMPARK_YARN_AUTH)
+    "sample".equalsIgnoreCase(yarnHttpAuth)
+  }
+
   /**
    * @param appName
    * @return
@@ -246,15 +251,15 @@ object YarnUtils extends Logger {
   def restRequest(url: String): String = {
     if (url == null) return null
 
-    def request(url: String): String = {
-      logDebug("request url is " + url);
+    def request(reqUrl: String): String = {
+      logDebug("request url is " + reqUrl)
       val config = RequestConfig.custom.setConnectTimeout(5000, TimeUnit.MILLISECONDS).build
       if (hasYarnHttpKerberosAuth) {
         HadoopUtils
           .getUgi()
           .doAs(new PrivilegedExceptionAction[String] {
             override def run(): String = {
-              Try(HttpClientUtils.httpAuthGetRequest(url, config)) match {
+              Try(HttpClientUtils.httpAuthGetRequest(reqUrl, config)) match {
                 case Success(v) => v
                 case Failure(e) =>
                   logError("yarnUtils authRestRequest error, detail: ", e)
@@ -263,6 +268,9 @@ object YarnUtils extends Logger {
             }
           })
       } else {
+        val url = if (hasYarnHttpSampleAuth) {
+          s"$reqUrl?user.name=${HadoopUtils.hadoopUserName}"
+        } else reqUrl
         Try(HttpClientUtils.httpGetRequest(url, config)) match {
           case Success(v) => v
           case Failure(e) =>

Reply via email to