This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new a196ace28 [KYUUBI #6199] Support to run HiveSQLEngine on kerberized YARN
a196ace28 is described below

commit a196ace2844ea46c6eb3cc78f9ceb5d14eef4ede
Author: zhouyifan279 <[email protected]>
AuthorDate: Fri Mar 22 16:18:43 2024 +0800

    [KYUUBI #6199] Support to run HiveSQLEngine on kerberized YARN
    
    # :mag: Description
    ## Issue References 🔗
    
    This pull request implements a feature: running HiveSQLEngine on kerberized YARN.
    
    ## Describe Your Solution 🔧
    Introduced two configs:
    - kyuubi.engine.principal
    - kyuubi.engine.keytab
    
    When submitting to a kerberized YARN cluster, the submitter uploads the keytab file given by `kyuubi.engine.keytab` to the application's staging directory.
    The YARN NodeManager then downloads the keytab into the AM's working directory, and the AM logs in to Kerberos using the principal and keytab.
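    
    The keytab hand-off above can be sketched in plain Scala without Hadoop on the classpath. `KeytabStagingSketch` and its methods are hypothetical names; only the `<keytab file name>-<random UUID>` naming and the rewrite of `kyuubi.engine.keytab` to that bare name mirror what `EngineYarnModeSubmitter` does in the diff. The paths and principal are made-up examples.
    
    ```scala
    import java.io.File
    import java.util.{Properties, UUID}
    
    // Hypothetical sketch: how a submitter could name the keytab it ships to YARN
    // and how the AM-side config could point at the localized copy.
    object KeytabStagingSketch {
      val KeytabKey = "kyuubi.engine.keytab"
    
      // Keep the original file name for readability and append a random UUID so the
      // localized link name cannot collide with any other user-provided file.
      def stagedKeytabName(localKeytabPath: String): String =
        new File(localKeytabPath).getName + "-" + UUID.randomUUID().toString
    
      // YARN links each local resource into the container's working directory under
      // its resource name, so the AM only needs the bare staged name, not a path.
      def amSideConf(submitterConf: Properties, stagedName: String): Properties = {
        val amConf = new Properties()
        amConf.putAll(submitterConf)
        amConf.setProperty(KeytabKey, stagedName)
        amConf
      }
    }
    ```
    
    The submitter's own config is left untouched; only the copy packed for the AM carries the rewritten keytab name.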
    
    **Note**
    I tried running HiveSQLEngine with only delegation tokens, but it failed.
    
    Take the SQL `SELECT * FROM a` as an example:
    Hive handles this simple TableScan query by reading the table's HDFS files directly.
    When Hive invokes `FileInputFormat.getSplits` during the read, `java.io.IOException: Delegation Token can be issued only with kerberos or web authentication` is thrown.
    The simplified stack trace from IDEA is shown below:
    ```
    getDelegationToken:734, DFSClient (org.apache.hadoop.hdfs)
    getDelegationToken:2072, DistributedFileSystem (org.apache.hadoop.hdfs)
    collectDelegationTokens:108, DelegationTokenIssuer (org.apache.hadoop.security.token)
    addDelegationTokens:83, DelegationTokenIssuer (org.apache.hadoop.security.token)
    obtainTokensForNamenodesInternal:143, TokenCache (org.apache.hadoop.mapreduce.security)
    obtainTokensForNamenodesInternal:102, TokenCache (org.apache.hadoop.mapreduce.security)
    obtainTokensForNamenodes:81, TokenCache (org.apache.hadoop.mapreduce.security)
    listStatus:221, FileInputFormat (org.apache.hadoop.mapred)
    getSplits:332, FileInputFormat (org.apache.hadoop.mapred)
    getNextSplits:372, FetchOperator (org.apache.hadoop.hive.ql.exec)
    getRecordReader:304, FetchOperator (org.apache.hadoop.hive.ql.exec)
    getNextRow:459, FetchOperator (org.apache.hadoop.hive.ql.exec)
    pushRow:428, FetchOperator (org.apache.hadoop.hive.ql.exec)
    fetch:147, FetchTask (org.apache.hadoop.hive.ql.exec)
    getResults:2208, Driver (org.apache.hadoop.hive.ql)
    getNextRowSet:494, SQLOperation (org.apache.hive.service.cli.operation)
    getNextRowSetInternal:105, HiveOperation (org.apache.kyuubi.engine.hive.operation)
    ```
    
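    As the stack trace shows, `FileInputFormat.listStatus` asks `TokenCache` to obtain new delegation tokens from the NameNode, and the NameNode refuses because the caller is authenticated only with a delegation token.
    In theory, this could be solved by adding the AM's delegation tokens to `org.apache.hadoop.hive.ql.exec.FetchOperator.job.credentials`, so that no new token has to be requested.
    In practice, that is impossible without modifying Hive's source code.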
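    
    The failure boils down to the rule quoted in the error message. The sketch below is a toy model with hypothetical names, not HDFS code: an AM running on delegation tokens alone lands in the `DelegationToken` branch whenever `TokenCache` asks for a fresh token, while an AM that has logged in from the keytab lands in the `Kerberos` branch.
    
    ```scala
    // Hypothetical, simplified model of the rule in the error message: a new
    // delegation token is issued only to a caller that authenticated with Kerberos
    // (or web authentication), never to one that holds just a delegation token.
    object TokenIssuanceSketch {
      sealed trait AuthMethod
      case object Kerberos extends AuthMethod
      case object WebAuth extends AuthMethod
      case object DelegationToken extends AuthMethod
    
      def requestNewDelegationToken(caller: AuthMethod): Either[String, String] =
        caller match {
          case Kerberos | WebAuth => Right("new HDFS delegation token")
          case DelegationToken =>
            Left("Delegation Token can be issued only with kerberos or web authentication")
        }
    }
    ```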
    
    ## Types of changes :bookmark:
    
    - [ ] Bugfix (non-breaking change which fixes an issue)
    - [x] New feature (non-breaking change which adds functionality)
    - [ ] Breaking change (fix or feature that would cause existing functionality to change)
    
    ## Test Plan 🧪
    
    #### Behavior Without This Pull Request :coffin:
    HiveSQLEngine cannot run on kerberized YARN.
    
    #### Behavior With This Pull Request :tada:
    HiveSQLEngine can run on kerberized YARN.
    
    #### Related Unit Tests
    
    ---
    
    # Checklist 📝
    
    - [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)
    
    **Be nice. Be informative.**
    
    Closes #6199 from zhouyifan279/kerberized-hive-engine-on-yarn.
    
    Closes #6199
    
    383d1cdcb [zhouyifan279] Fix tests
    458493a91 [zhouyifan279] Warn if run Hive on YARN without principal and keytab
    118afe280 [zhouyifan279] Warn if run Hive on YARN without principal and keytab
    41fed0c44 [zhouyifan279] Ignore Principal&Keytab when hadoop security is no enabled.
    9e2d86237 [Cheng Pan] Update kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala
    5ae0a3eac [zhouyifan279] Remove redundant checks
    5d3013aaf [zhouyifan279] Use principal & keytab in Local mode
    5733dfdcb [zhouyifan279] Use principal & keytab in Local mode
    85ce9bb7a [zhouyifan279] Use principal & keytab in Local mode
    061223dbe [zhouyifan279] Resolve comments
    e706936e7 [zhouyifan279] Resolve comments
    f84c7bccc [zhouyifan279] Support run HiveSQLEngine on kerberized YARN
    4d262c847 [zhouyifan279] Support run HiveSQLEngine on kerberized YARN
    
    Lead-authored-by: zhouyifan279 <[email protected]>
    Co-authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 docs/configuration/settings.md                     |  2 +
 .../apache/kyuubi/engine/hive/HiveSQLEngine.scala  | 45 ++++++++-----
 .../engine/hive/deploy/HiveYarnModeSubmitter.scala | 14 +++-
 .../HiveCatalogDatabaseOperationSuite.scala        |  5 +-
 .../engine/hive/operation/HiveOperationSuite.scala |  5 +-
 .../org/apache/kyuubi/config/KyuubiConf.scala      | 14 ++++
 .../engine/deploy/yarn/ApplicationMaster.scala     | 45 ++++++++++++-
 .../deploy/yarn/EngineYarnModeSubmitter.scala      | 77 +++++++++++++++++++++-
 .../org/apache/kyuubi/util/KyuubiHadoopUtils.scala | 15 +++++
 .../kyuubi/engine/hive/HiveProcessBuilder.scala    | 39 +++++++++--
 10 files changed, 232 insertions(+), 29 deletions(-)

diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index 8bb87603d..95c7e82a0 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -170,6 +170,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
 | kyuubi.engine.jdbc.operation.incremental.collect         | false                     | When true, the result will be sequentially calculated and returned to the JDBC engine. It fallback to `kyuubi.operation.incremental.collect` [...]
 | kyuubi.engine.jdbc.session.initialize.sql                                           || SemiColon-separated list of SQL statements to be initialized in the newly created engine session before queries. [...]
 | kyuubi.engine.jdbc.type                                  | &lt;undefined&gt;         | The short name of JDBC type [...]
+| kyuubi.engine.keytab                                     | &lt;undefined&gt;         | Kerberos keytab for the kyuubi engine. [...]
 | kyuubi.engine.kubernetes.submit.timeout                  | PT30S                     | The engine submit timeout for Kubernetes application. [...]
 | kyuubi.engine.operation.convert.catalog.database.enabled | true                      | When set to true, The engine converts the JDBC methods of set/get Catalog and set/get Schema to the implementation of different engines [...]
 | kyuubi.engine.operation.log.dir.root                     | engine_operation_logs     | Root directory for query operation log at engine-side. [...]
@@ -177,6 +178,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
 | kyuubi.engine.pool.selectPolicy                          | RANDOM                    | The select policy of an engine from the corresponding engine pool engine for a session. <ul><li>RANDOM - Randomly use the engine in the pool</li><li>POLLING - Polling use the engine in the pool</li></ul> [...]
 | kyuubi.engine.pool.size                                  | -1                        | The size of the engine pool. Note that, if the size is less than 1, the engine pool will not be enabled; otherwise, the size of the engine pool will be min(this, kyuubi.engine.pool.size.threshold). [...]
 | kyuubi.engine.pool.size.threshold                        | 9                         | This parameter is introduced as a server-side parameter controlling the upper limit of the engine pool. [...]
+| kyuubi.engine.principal                                  | &lt;undefined&gt;         | Kerberos principal for the kyuubi engine. [...]
 | kyuubi.engine.session.initialize.sql                                                || SemiColon-separated list of SQL statements to be initialized in the newly created engine session before queries. This configuration can not be used in JDBC url due to the limitation of Beeline/JDBC driver. [...]
 | kyuubi.engine.share.level                                | USER                      | Engines will be shared in different levels, available configs are: <ul> <li>CONNECTION: the engine will not be shared but only used by the current client connection, and the engine will be launched by session user.</li> <li>USER: the engine will be shared by all sessions created by a unique username, and the engine will be launched by session user.</li> <li>GROUP: the engine will be shared by all ses [...]
 | kyuubi.engine.share.level.sub.domain                     | &lt;undefined&gt;         | (deprecated) - Using kyuubi.engine.share.level.subdomain instead [...]
diff --git a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveSQLEngine.scala b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveSQLEngine.scala
index 4e0787039..24876456c 100644
--- a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveSQLEngine.scala
+++ b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveSQLEngine.scala
@@ -27,6 +27,8 @@ import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.kyuubi.{Logging, Utils}
 import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_HIVE_DEPLOY_MODE, ENGINE_KEYTAB, ENGINE_PRINCIPAL}
+import org.apache.kyuubi.engine.deploy.DeployMode
 import org.apache.kyuubi.engine.hive.HiveSQLEngine.currentEngine
 import org.apache.kyuubi.engine.hive.events.{HiveEngineEvent, HiveEventHandlerRegister}
 import org.apache.kyuubi.events.EventBus
@@ -133,26 +135,37 @@ object HiveSQLEngine extends Logging {
     SignalRegister.registerLogger(logger)
     try {
       Utils.fromCommandLineArgs(args, kyuubiConf)
-      val sessionUser = kyuubiConf.getOption(KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY)
+      val proxyUser = kyuubiConf.getOption(KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY)
+      require(proxyUser.isDefined, s"${KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY} is not set")
       val realUser = UserGroupInformation.getLoginUser
-
-      if (sessionUser.isEmpty || sessionUser.get == realUser.getShortUserName) {
-        startEngine()
-      } else {
-        val effectiveUser = UserGroupInformation.createProxyUser(sessionUser.get, realUser)
-        effectiveUser.doAs(new PrivilegedExceptionAction[Unit] {
-          override def run(): Unit = {
-            val engineCredentials =
-              kyuubiConf.getOption(KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY)
-            kyuubiConf.unset(KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY)
-            engineCredentials.filter(_.nonEmpty).foreach { credentials =>
-              HiveTBinaryFrontendService.renewDelegationToken(credentials)
+      val principal = kyuubiConf.get(ENGINE_PRINCIPAL)
+      val keytab = kyuubiConf.get(ENGINE_KEYTAB)
+
+      val ugi = DeployMode.withName(kyuubiConf.get(ENGINE_HIVE_DEPLOY_MODE)) match {
+        case DeployMode.LOCAL
+            if UserGroupInformation.isSecurityEnabled && principal.isDefined && keytab.isDefined =>
+          UserGroupInformation.loginUserFromKeytab(principal.get, keytab.get)
+          UserGroupInformation.getCurrentUser
+        case DeployMode.LOCAL if proxyUser.get != realUser.getShortUserName =>
+          val newUGI = UserGroupInformation.createProxyUser(proxyUser.get, realUser)
+          newUGI.doAs(new PrivilegedExceptionAction[Unit] {
+            override def run(): Unit = {
+              val engineCredentials =
+                kyuubiConf.getOption(KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY)
+              kyuubiConf.unset(KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY)
+              engineCredentials.filter(_.nonEmpty).foreach { credentials =>
+                HiveTBinaryFrontendService.renewDelegationToken(credentials)
+              }
             }
-            startEngine()
-          }
-        })
+          })
+          newUGI
+        case _ =>
+          UserGroupInformation.getCurrentUser
       }
 
+      ugi.doAs(new PrivilegedExceptionAction[Unit] {
+        override def run(): Unit = startEngine()
+      })
     } catch {
       case t: Throwable =>
         currentEngine match {
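
The identity selection in the patched `HiveSQLEngine.main` can be restated as a pure function, which makes the precedence of its three cases easier to see. This is a Hadoop-free sketch with hypothetical names (`EngineUgiChoiceSketch`, `UgiChoice`); deploy modes are plain strings standing in for `DeployMode`, and the real code returns `UserGroupInformation` instances rather than descriptions. The fall-through to the current user in YARN mode presumably relies on the `ApplicationMaster` changes in this patch, which already wrap the run in `ugi.doAs`.

```scala
// Hypothetical, Hadoop-free restatement of the identity choice in HiveSQLEngine.main.
// The real code returns UserGroupInformation instances; this returns a description.
object EngineUgiChoiceSketch {
  sealed trait UgiChoice
  final case class KeytabLogin(principal: String, keytab: String) extends UgiChoice
  final case class ProxyUser(proxyUser: String, realUser: String) extends UgiChoice
  case object CurrentUser extends UgiChoice

  def choose(
      deployMode: String, // stands in for DeployMode: "LOCAL" or "YARN"
      securityEnabled: Boolean,
      principal: Option[String],
      keytab: Option[String],
      proxyUser: Option[String], // kyuubi.session.user, now mandatory
      realUserShortName: String): UgiChoice = {
    require(proxyUser.isDefined, "kyuubi.session.user is not set")
    (deployMode, principal, keytab) match {
      // Local mode on a secure cluster with both settings: log in from the keytab.
      case ("LOCAL", Some(p), Some(k)) if securityEnabled => KeytabLogin(p, k)
      // Local mode, session user differs from the login user: impersonate.
      case ("LOCAL", _, _) if proxyUser.get != realUserShortName =>
        ProxyUser(proxyUser.get, realUserShortName)
      // Anything else, including YARN mode, keeps the current user.
      case _ => CurrentUser
    }
  }
}
```

Note the precedence: in local mode with security enabled and both settings present, the keytab login wins even when `kyuubi.session.user` differs from the login user.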
diff --git a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/deploy/HiveYarnModeSubmitter.scala b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/deploy/HiveYarnModeSubmitter.scala
index 9d5126ad6..a717f9dfd 100644
--- a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/deploy/HiveYarnModeSubmitter.scala
+++ b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/deploy/HiveYarnModeSubmitter.scala
@@ -20,7 +20,10 @@ import java.io.File
 
 import scala.collection.mutable.ListBuffer
 
+import org.apache.hadoop.security.UserGroupInformation
+
 import org.apache.kyuubi.Utils
+import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf.ENGINE_HIVE_EXTRA_CLASSPATH
 import org.apache.kyuubi.engine.deploy.yarn.EngineYarnModeSubmitter
 import org.apache.kyuubi.engine.hive.HiveSQLEngine
@@ -29,7 +32,16 @@ object HiveYarnModeSubmitter extends EngineYarnModeSubmitter {
 
   def main(args: Array[String]): Unit = {
     Utils.fromCommandLineArgs(args, kyuubiConf)
-    submitApplication()
+
+    if (UserGroupInformation.isSecurityEnabled) {
+      require(
+        kyuubiConf.get(KyuubiConf.ENGINE_PRINCIPAL).isDefined
+          && kyuubiConf.get(KyuubiConf.ENGINE_KEYTAB).isDefined,
+        s"${KyuubiConf.ENGINE_PRINCIPAL.key} and " +
+          s"${KyuubiConf.ENGINE_KEYTAB.key} must be set when submit " +
+          s"${HiveSQLEngine.getClass.getSimpleName.stripSuffix("$")} to YARN")
+    }
+    run()
   }
 
   override var engineType: String = "hive"
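
The `require` added to `HiveYarnModeSubmitter.main` fails fast on a secure cluster when either Kerberos setting is missing. As a sketch, with a plain `Map` standing in for `KyuubiConf` (an assumption, as is the object name), the same precondition can be expressed as an `Either` instead of an exception:

```scala
// Hypothetical restatement of the precondition HiveYarnModeSubmitter.main checks
// before submitting: on a secure cluster both Kerberos settings must be present.
object KerberosSettingsCheckSketch {
  val PrincipalKey = "kyuubi.engine.principal"
  val KeytabKey = "kyuubi.engine.keytab"

  def validate(securityEnabled: Boolean, conf: Map[String, String]): Either[String, Unit] =
    if (securityEnabled && !(conf.contains(PrincipalKey) && conf.contains(KeytabKey))) {
      Left(s"$PrincipalKey and $KeytabKey must be set when submitting HiveSQLEngine to YARN")
    } else {
      Right(())
    }
}
```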
diff --git a/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveCatalogDatabaseOperationSuite.scala b/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveCatalogDatabaseOperationSuite.scala
index 7db2d7fdc..aa4d77d11 100644
--- a/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveCatalogDatabaseOperationSuite.scala
+++ b/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveCatalogDatabaseOperationSuite.scala
@@ -21,6 +21,7 @@ import org.apache.commons.lang3.{JavaVersion, SystemUtils}
 
 import org.apache.kyuubi.Utils
 import org.apache.kyuubi.config.KyuubiConf.ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED
+import org.apache.kyuubi.config.KyuubiReservedKeys
 import org.apache.kyuubi.engine.hive.HiveSQLEngine
 import org.apache.kyuubi.operation.HiveJDBCTestHelper
 import org.apache.kyuubi.util.command.CommandLineUtils._
@@ -34,7 +35,9 @@ class HiveCatalogDatabaseOperationSuite extends HiveJDBCTestHelper {
       CONF,
       s"javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=$metastore;create=true",
       CONF,
-      s"${ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED.key}=true")
+      s"${ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED.key}=true",
+      CONF,
+      s"${KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY}=kyuubi")
     HiveSQLEngine.main(args)
     super.beforeAll()
   }
diff --git a/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala b/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala
index 53cc9457a..8150bea32 100644
--- a/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala
+++ b/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala
@@ -20,6 +20,7 @@ package org.apache.kyuubi.engine.hive.operation
 import org.apache.commons.lang3.{JavaVersion, SystemUtils}
 
 import org.apache.kyuubi.{HiveEngineTests, KYUUBI_VERSION, Utils}
+import org.apache.kyuubi.config.KyuubiReservedKeys
 import org.apache.kyuubi.engine.hive.HiveSQLEngine
 import org.apache.kyuubi.jdbc.hive.KyuubiStatement
 import org.apache.kyuubi.util.command.CommandLineUtils._
@@ -31,7 +32,9 @@ class HiveOperationSuite extends HiveEngineTests {
     metastore.toFile.delete()
     val args = Array(
       CONF,
-      s"javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=$metastore;create=true")
+      s"javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=$metastore;create=true",
+      CONF,
+      s"${KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY}=kyuubi")
     HiveSQLEngine.main(args)
     super.beforeAll()
   }
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index da8b3395a..78d440a31 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -2836,6 +2836,20 @@ object KyuubiConf {
       .stringConf
       .createOptional
 
+  val ENGINE_PRINCIPAL: OptionalConfigEntry[String] =
+    buildConf("kyuubi.engine.principal")
+      .doc("Kerberos principal for the kyuubi engine.")
+      .version("1.10.0")
+      .stringConf
+      .createOptional
+
+  val ENGINE_KEYTAB: OptionalConfigEntry[String] =
+    buildConf("kyuubi.engine.keytab")
+      .doc("Kerberos keytab for the kyuubi engine.")
+      .version("1.10.0")
+      .stringConf
+      .createOptional
+
   val ENGINE_FLINK_MEMORY: ConfigEntry[String] =
     buildConf("kyuubi.engine.flink.memory")
       .doc("The heap memory for the Flink SQL engine. Only effective in yarn session mode.")
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/yarn/ApplicationMaster.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/yarn/ApplicationMaster.scala
index 3e396beb0..1993009ad 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/yarn/ApplicationMaster.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/yarn/ApplicationMaster.scala
@@ -17,17 +17,20 @@
 package org.apache.kyuubi.engine.deploy.yarn
 
 import java.io.{File, IOException}
+import java.security.PrivilegedExceptionAction
 
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier
 
-import org.apache.kyuubi.{Logging, Utils}
-import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.{KyuubiException, Logging, Utils}
+import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
 import org.apache.kyuubi.service.Serverable
 import org.apache.kyuubi.util.KyuubiHadoopUtils
 import org.apache.kyuubi.util.command.CommandLineUtils.confKeyValues
@@ -71,7 +74,43 @@ object ApplicationMaster extends Logging {
           unregister(finalStatus, finalMsg)
         }
       })
-      runApplicationMaster()
+
+      val ugi = kyuubiConf.get(KyuubiConf.ENGINE_PRINCIPAL) match {
+        case Some(principalName) if UserGroupInformation.isSecurityEnabled =>
+          val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
+          val keytabFilename = kyuubiConf.get(KyuubiConf.ENGINE_KEYTAB).orNull
+          if (!new File(keytabFilename).exists()) {
+            throw new KyuubiException(s"Keytab file: $keytabFilename does not exist")
+          } else {
+            info("Attempting to login to Kerberos " +
+              s"using principal: $principalName and keytab: $keytabFilename")
+            UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename)
+          }
+
+          val newUGI = UserGroupInformation.getCurrentUser()
+
+          // Transfer YARN_AM_RM_TOKEN to the new user.
+          // Not transfer other tokens, such as HDFS_DELEGATION_TOKEN,
+          // to avoid "org.apache.hadoop.ipc.RemoteException(java.io.IOException):
+          // Delegation Token can be issued only with kerberos or web authentication"
+          // when engine tries to obtain new tokens.
+          KyuubiHadoopUtils.getTokenMap(originalCreds).values
+            .find(_.getKind == AMRMTokenIdentifier.KIND_NAME)
+            .foreach { token =>
+              newUGI.addToken(token)
+            }
+          newUGI
+        case _ =>
+          val appUser = kyuubiConf.getOption(KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY)
+          require(appUser.isDefined, s"${KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY} is not set")
+          val newUGI = UserGroupInformation.createRemoteUser(appUser.get)
+          newUGI.addCredentials(UserGroupInformation.getCurrentUser.getCredentials)
+          newUGI
+      }
+
+      ugi.doAs(new PrivilegedExceptionAction[Unit] {
+        override def run(): Unit = runApplicationMaster()
+      })
     } catch {
       case t: Throwable =>
         error("Error running ApplicationMaster", t)
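
The token hand-over in `ApplicationMaster` keeps exactly one token from the container's original credentials. A minimal model, assuming a plain case class in place of Hadoop's `Token` and the kind string `YARN_AM_RM_TOKEN` that the diff's comment names (the value behind `AMRMTokenIdentifier.KIND_NAME`); the object name and service strings are hypothetical:

```scala
// Hypothetical model of the filter applied to the AM's original credentials
// after the keytab login. Not Hadoop's Token class.
object AmTokenFilterSketch {
  final case class Token(kind: String, service: String)

  val AmRmTokenKind = "YARN_AM_RM_TOKEN"

  // Keep only the first AM-RM token, which the AM needs to keep talking to the
  // ResourceManager; HDFS delegation tokens are dropped on purpose so that HDFS
  // calls authenticate with the fresh Kerberos login instead.
  def tokensToTransfer(original: Iterable[Token]): Option[Token] =
    original.find(_.kind == AmRmTokenKind)
}
```

Returning an `Option` matches the `.find(...).foreach(newUGI.addToken)` chain in the diff: when no AM-RM token is present, nothing is added.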
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/yarn/EngineYarnModeSubmitter.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/yarn/EngineYarnModeSubmitter.scala
index 552a3158f..e2913539b 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/yarn/EngineYarnModeSubmitter.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/yarn/EngineYarnModeSubmitter.scala
@@ -17,10 +17,12 @@
 package org.apache.kyuubi.engine.deploy.yarn
 
 import java.io._
+import java.nio.ByteBuffer
 import java.nio.charset.StandardCharsets
 import java.nio.file.Files
+import java.security.PrivilegedExceptionAction
 import java.util
-import java.util.{Locale, Properties}
+import java.util.{Locale, Properties, UUID}
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import scala.collection.JavaConverters._
@@ -30,7 +32,8 @@ import scala.collection.mutable.ListBuffer
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.fs.permission.FsPermission
-import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.mapred.Master
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 import org.apache.hadoop.yarn.api.ApplicationConstants
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.api.records._
@@ -40,6 +43,7 @@ import org.apache.hadoop.yarn.util.Records
 import org.apache.kyuubi.{KyuubiException, Logging, Utils}
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
 import org.apache.kyuubi.engine.deploy.yarn.EngineYarnModeSubmitter._
 import org.apache.kyuubi.util.KyuubiHadoopUtils
 
@@ -81,6 +85,9 @@ abstract class EngineYarnModeSubmitter extends Logging {
 
   var yarnConf: Configuration = _
   var hadoopConf: Configuration = _
+  var appUser: String = _
+  var keytab: String = _
+  var amKeytabFileName: Option[String] = _
 
   var engineType: String
 
@@ -91,9 +98,35 @@ abstract class EngineYarnModeSubmitter extends Logging {
    */
   def engineExtraJars(): Seq[File] = Seq.empty
 
-  protected def submitApplication(): Unit = {
+  def run(): Unit = {
     yarnConf = KyuubiHadoopUtils.newYarnConfiguration(kyuubiConf)
     hadoopConf = KyuubiHadoopUtils.newHadoopConf(kyuubiConf)
+    appUser = kyuubiConf.getOption(KYUUBI_SESSION_USER_KEY).orNull
+    require(appUser != null, s"$KYUUBI_SESSION_USER_KEY is not set")
+    keytab = kyuubiConf.get(ENGINE_KEYTAB).orNull
+    val principal = kyuubiConf.get(ENGINE_PRINCIPAL).orNull
+    amKeytabFileName =
+      if (UserGroupInformation.isSecurityEnabled && principal != null && keytab != null) {
+        info(s"Kerberos credentials: principal = $principal, keytab = $keytab")
+        UserGroupInformation.loginUserFromKeytab(principal, keytab)
+        // Generate a file name that can be used for the keytab file, that does not conflict
+        // with any user file.
+        Some(new File(keytab).getName() + "-" + UUID.randomUUID().toString)
+      } else {
+        None
+      }
+
+    val ugi = if (UserGroupInformation.getCurrentUser.getShortUserName == appUser) {
+      UserGroupInformation.getCurrentUser
+    } else {
+      UserGroupInformation.createProxyUser(appUser, UserGroupInformation.getCurrentUser)
+    }
+    ugi.doAs(new PrivilegedExceptionAction[Unit] {
+      override def run(): Unit = submitApplication()
+    })
+  }
+
+  protected def submitApplication(): Unit = {
     try {
       yarnClient = YarnClient.createYarnClient()
       yarnClient.init(yarnConf)
@@ -134,6 +167,29 @@ abstract class EngineYarnModeSubmitter extends Logging {
     }
   }
 
+  private def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = {
+    if (UserGroupInformation.isSecurityEnabled) {
+      val credentials = obtainHadoopFsDelegationToken()
+      val serializedCreds = KyuubiHadoopUtils.serializeCredentials(credentials)
+      amContainer.setTokens(ByteBuffer.wrap(serializedCreds))
+    }
+  }
+
+  private def obtainHadoopFsDelegationToken(): Credentials = {
+    val tokenRenewer = Master.getMasterPrincipal(hadoopConf)
+    info(s"Delegation token renewer is: $tokenRenewer")
+
+    if (tokenRenewer == null || tokenRenewer.isEmpty) {
+      val errorMessage = "Can't get Master Kerberos principal for use as renewer."
+      error(errorMessage)
+      throw new KyuubiException(errorMessage)
+    }
+
+    val credentials = new Credentials()
+    FileSystem.get(hadoopConf).addDelegationTokens(tokenRenewer, credentials)
+    credentials
+  }
+
   private def createContainerLaunchContext(): ContainerLaunchContext = {
     info("Setting up container launch context for engine AM")
     val env = setupLaunchEnv(kyuubiConf)
@@ -171,6 +227,7 @@ abstract class EngineYarnModeSubmitter extends Logging {
     amContainer.setCommands(printableCommands.asJava)
     info(s"Commands: ${printableCommands.mkString(" ")}")
 
+    setupSecurityToken(amContainer)
     amContainer
   }
 
@@ -187,6 +244,19 @@ abstract class EngineYarnModeSubmitter extends Logging {
 
     distributeJars(localResources, env)
     distributeConf(localResources, env)
+
+    // If we passed in a keytab, make sure we copy the keytab to the staging directory on
+    // HDFS, and setup the relevant environment vars, so the AM can login again.
+    amKeytabFileName.foreach { kt =>
+      info("To enable the AM to login from keytab, credentials are being copied over to the AM" +
+        " via the YARN Secure Distributed Cache.")
+      distribute(
+        new Path(new File(keytab).toURI),
+        LocalResourceType.FILE,
+        kt,
+        localResources)
+    }
+
     localResources
   }
 
@@ -253,6 +323,7 @@ abstract class EngineYarnModeSubmitter extends Logging {
       listDistinctFiles(yarnConf.get).foreach(putEntry)
 
       val properties = confToProperties(kyuubiConf)
+      amKeytabFileName.foreach(kt => properties.put(ENGINE_KEYTAB.key, kt))
       writePropertiesToArchive(properties, KYUUBI_CONF_FILE, confStream)
     } finally {
       confStream.close()
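The keytab hand-off above works in two steps. First, the submitter distributes the local keytab to the application's staging directory under the name `amKeytabFileName`. Second, it rewrites `kyuubi.engine.keytab` in the shipped properties to that same name. The AM then resolves the keytab relative to the working directory where the NodeManager localizes it. The definition of `amKeytabFileName` is not part of this excerpt. The sketch assumes a base-name-plus-UUID scheme so the localized file cannot clash with other resources. `KeytabLocalizationSketch` and its methods are hypothetical. Unlike the patch, which calls `properties.put` in place, the sketch returns a copy.

```scala
import java.io.File
import java.util.{Properties, UUID}

object KeytabLocalizationSketch {
  // Key name taken from the configs introduced by this change.
  val EngineKeytabKey = "kyuubi.engine.keytab"

  // Hypothetical naming scheme: keep the base name and append a UUID so the
  // localized keytab cannot collide with other files in the AM working dir.
  def localKeytabName(keytabPath: String): String =
    s"${new File(keytabPath).getName}-${UUID.randomUUID()}"

  // Return a copy of the engine properties whose keytab entry points at the
  // localized file name instead of the submitter-side path.
  def withLocalKeytab(props: Properties, localName: String): Properties = {
    val copy = new Properties()
    copy.putAll(props)
    copy.setProperty(EngineKeytabKey, localName)
    copy
  }
}
```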
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
index 2d9ea4a8a..ad083f173 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
@@ -72,6 +72,21 @@ object KyuubiHadoopUtils extends Logging {
     creds
   }
 
+  def serializeCredentials(creds: Credentials): Array[Byte] = {
+    val byteStream = new ByteArrayOutputStream
+    val dataStream = new DataOutputStream(byteStream)
+    creds.writeTokenStorageToStream(dataStream)
+    byteStream.toByteArray
+  }
+
+  def deserializeCredentials(tokenBytes: Array[Byte]): Credentials = {
+    val tokensBuf = new ByteArrayInputStream(tokenBytes)
+
+    val creds = new Credentials()
+    creds.readTokenStorageStream(new DataInputStream(tokensBuf))
+    creds
+  }
+
   /**
    * Get [[Credentials#tokenMap]] by reflection as [[Credentials#getTokenMap]] is not present before
    * Hadoop 3.2.1.
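`serializeCredentials` and `deserializeCredentials` are a byte round trip. They write the token storage into a `DataOutputStream` over a `ByteArrayOutputStream`, and read it back from a `DataInputStream` over a `ByteArrayInputStream`. The sketch below shows the same pattern without Hadoop on the classpath. It uses a hypothetical alias-to-bytes map (`TokenBytesSketch.Tokens`) in place of `Credentials`. Its wire format (count, then UTF alias, length, bytes) is made up for illustration and is not Hadoop's token storage format.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object TokenBytesSketch {
  // Hypothetical stand-in for Hadoop Credentials: alias -> opaque token bytes.
  type Tokens = Map[String, Seq[Byte]]

  def serialize(tokens: Tokens): Array[Byte] = {
    val byteStream = new ByteArrayOutputStream
    val dataStream = new DataOutputStream(byteStream)
    dataStream.writeInt(tokens.size)
    tokens.foreach { case (alias, bytes) =>
      dataStream.writeUTF(alias)
      dataStream.writeInt(bytes.length)
      dataStream.write(bytes.toArray)
    }
    // DataOutputStream does not buffer, so this flush is only for clarity.
    dataStream.flush()
    byteStream.toByteArray
  }

  def deserialize(data: Array[Byte]): Tokens = {
    val in = new DataInputStream(new ByteArrayInputStream(data))
    val count = in.readInt()
    (0 until count).map { _ =>
      val alias = in.readUTF()
      val bytes = new Array[Byte](in.readInt())
      in.readFully(bytes)
      alias -> bytes.toSeq
    }.toMap
  }
}
```

Because `DataOutputStream` writes straight through to the underlying stream, `byteStream.toByteArray` already holds every written byte. That is why the patch's `serializeCredentials` can return it without an explicit flush.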
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala
index 903e06575..da035fcf9 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala
@@ -23,10 +23,11 @@ import java.nio.file.{Files, Paths}
 import scala.collection.mutable
 
 import com.google.common.annotations.VisibleForTesting
+import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.kyuubi._
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.{ENGINE_DEPLOY_YARN_MODE_APP_NAME, ENGINE_HIVE_DEPLOY_MODE, ENGINE_HIVE_EXTRA_CLASSPATH, ENGINE_HIVE_JAVA_OPTIONS, ENGINE_HIVE_MEMORY}
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_DEPLOY_YARN_MODE_APP_NAME, ENGINE_HIVE_DEPLOY_MODE, ENGINE_HIVE_EXTRA_CLASSPATH, ENGINE_HIVE_JAVA_OPTIONS, ENGINE_HIVE_MEMORY, ENGINE_KEYTAB, ENGINE_PRINCIPAL}
 import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_ID, KYUUBI_SESSION_USER_KEY}
 import org.apache.kyuubi.engine.{KyuubiApplicationManager, ProcBuilder}
 import org.apache.kyuubi.engine.deploy.DeployMode
@@ -121,19 +122,49 @@ object HiveProcessBuilder extends Logging {
   final val HIVE_ENGINE_NAME = "hive.engine.name"
 
   def apply(
-      appUser: String,
+      proxyUser: String,
       doAsEnabled: Boolean,
       conf: KyuubiConf,
       engineRefId: String,
       extraEngineLog: Option[OperationLog],
       defaultEngineName: String): HiveProcessBuilder = {
+    checkKeytab(proxyUser, conf)
     DeployMode.withName(conf.get(ENGINE_HIVE_DEPLOY_MODE)) match {
-      case LOCAL => new HiveProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog)
+      case LOCAL =>
+        new HiveProcessBuilder(proxyUser, doAsEnabled, conf, engineRefId, extraEngineLog)
       case YARN =>
         warn(s"Hive on YARN model is experimental.")
         conf.setIfMissing(ENGINE_DEPLOY_YARN_MODE_APP_NAME, Some(defaultEngineName))
-        new HiveYarnModeProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog)
+        new HiveYarnModeProcessBuilder(proxyUser, doAsEnabled, conf, engineRefId, extraEngineLog)
       case other => throw new KyuubiException(s"Unsupported deploy mode: $other")
     }
   }
+
+  private def checkKeytab(proxyUser: String, conf: KyuubiConf): Unit = {
+    val principal = conf.get(ENGINE_PRINCIPAL)
+    val keytab = conf.get(ENGINE_KEYTAB)
+    if (!UserGroupInformation.isSecurityEnabled) {
+      if (principal.isDefined || keytab.isDefined) {
+        warn("Principal and keytab take no effect when Hadoop security is not enabled.")
+      }
+      return
+    }
+
+    require(
+      principal.isDefined == keytab.isDefined,
+      s"Both principal and keytab must be defined, or neither.")
+    if (principal.isDefined && keytab.isDefined) {
+      val ugi = UserGroupInformation
+        .loginUserFromKeytabAndReturnUGI(principal.get, keytab.get)
+      require(
+        ugi.getShortUserName == proxyUser,
+        s"Proxy user: $proxyUser is not the same as " +
+          s"engine principal: ${ugi.getShortUserName}.")
+    }
+
+    val deployMode = DeployMode.withName(conf.get(ENGINE_HIVE_DEPLOY_MODE))
+    if (principal.isEmpty && keytab.isEmpty && deployMode == YARN) {
+      warn("Hive on YARN cannot work properly without principal and keytab.")
+    }
+  }
 }
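`checkKeytab` boils down to a small decision table. Without Hadoop security, principal and keytab are ignored with a warning. With security enabled:

- the two settings must be given together;
- the principal's short name must equal the proxy user;
- Hive on YARN without them only triggers a warning.

The sketch below restates those rules as a pure function so each branch can be checked without a KDC. It skips the real keytab login. Its `shortName` is a simplification of UGI's `getShortUserName`, which applies `auth_to_local` rules rather than just stripping the instance and realm. `CheckKeytabSketch`, `Outcome`, and the boolean parameters are hypothetical.

```scala
object CheckKeytabSketch {
  sealed trait Outcome
  case object Ok extends Outcome
  final case class Warn(message: String) extends Outcome

  // Simplified short name: drop "/instance" and "@REALM" (real UGI applies auth_to_local).
  def shortName(principal: String): String = principal.split("[/@]")(0)

  def check(
      securityEnabled: Boolean,
      principal: Option[String],
      keytab: Option[String],
      proxyUser: String,
      yarnMode: Boolean): Outcome = {
    if (!securityEnabled) {
      // Settings are ignored on an insecure cluster; only warn if someone set them.
      if (principal.isDefined || keytab.isDefined) {
        Warn("Principal and keytab take no effect when Hadoop security is not enabled.")
      } else Ok
    } else {
      require(
        principal.isDefined == keytab.isDefined,
        "Both principal and keytab must be defined, or neither.")
      // The engine identity must match the user the engine runs for.
      principal.foreach { p =>
        require(
          shortName(p) == proxyUser,
          s"Proxy user: $proxyUser is not the same as engine principal: ${shortName(p)}.")
      }
      if (principal.isEmpty && yarnMode) {
        Warn("Hive on YARN cannot work properly without principal and keytab.")
      } else Ok
    }
  }
}
```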

