This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a01f65  [KYUUBI #916] Send DelegationTokens to SQL engines
6a01f65 is described below

commit 6a01f65d3ec418dd4b12ace1d30189edebb04c0c
Author: zhouyifan279 <[email protected]>
AuthorDate: Tue Sep 14 16:37:37 2021 +0800

    [KYUUBI #916] Send DelegationTokens to SQL engines
    
    ### _Why are the changes needed?_
    This PR finishes issue #916
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run 
test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests)
 locally before making a pull request
    
    Closes #1074 from zhouyifan279/KYUUBI#916.
    
    Closes #916
    
    28176c7e [zhouyifan279] [KYUUBI #916] Send DelegationTokens to SQL engines
    b6c931bd [zhouyifan279] [KYUUBI #916] Send DelegationTokens to SQL engines
    
    Authored-by: zhouyifan279 <[email protected]>
    Signed-off-by: ulysses-you <[email protected]>
---
 docs/deployment/settings.md                        |   6 +-
 .../engine/spark/SparkSQLBackendService.scala      |   2 +
 .../kyuubi/engine/spark/SparkSQLEngine.scala       |   4 +-
 .../engine/spark/SparkThriftFrontendService.scala  | 140 +++++++++++++++++++++
 .../apache/spark/kyuubi/SparkContextHelper.scala   |  24 +++-
 .../spark/operation/SparkOperationSuite.scala      | 119 ++++++++++++++++++
 .../org/apache/kyuubi/config/KyuubiConf.scala      |   6 +-
 .../kyuubi/service/ThriftFrontendService.scala     |   2 +-
 .../org/apache/kyuubi/util/KyuubiHadoopUtils.scala |  56 ++++++++-
 .../kyuubi/client/KyuubiSyncThriftClient.scala     |  10 ++
 .../credentials/HadoopCredentialsManager.scala     |   4 +
 .../scala/org/apache/kyuubi/engine/EngineRef.scala |   2 +-
 .../apache/kyuubi/operation/ExecuteStatement.scala |  16 ++-
 .../apache/kyuubi/session/KyuubiSessionImpl.scala  |   3 +-
 14 files changed, 377 insertions(+), 17 deletions(-)

diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index a3c3bcf..a77eaed 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -151,10 +151,10 @@ 
kyuubi\.backend\.server<br>\.exec\.pool\.wait\.queue<br>\.size|<div style='width
 
 Key | Default | Meaning | Type | Since
 --- | --- | --- | --- | ---
-kyuubi\.credentials<br>\.hadoopfs\.enabled|<div style='width: 65pt;word-wrap: 
break-word;white-space: normal'>true</div>|<div style='width: 170pt;word-wrap: 
break-word;white-space: normal'>Whether to renew HadoopFS 
DelegationToken</div>|<div style='width: 30pt'>boolean</div>|<div style='width: 
20pt'>1.4.0</div>
+kyuubi\.credentials<br>\.hadoopfs\.enabled|<div style='width: 65pt;word-wrap: 
break-word;white-space: normal'>true</div>|<div style='width: 170pt;word-wrap: 
break-word;white-space: normal'>Whether to renew Hadoop filesystem delegation 
tokens</div>|<div style='width: 30pt'>boolean</div>|<div style='width: 
20pt'>1.4.0</div>
 kyuubi\.credentials<br>\.hadoopfs\.uris|<div style='width: 65pt;word-wrap: 
break-word;white-space: normal'></div>|<div style='width: 170pt;word-wrap: 
break-word;white-space: normal'>Extra Hadoop filesystem URIs for which to 
request delegation tokens. The filesystem that hosts fs.defaultFS does not need 
to be listed here.</div>|<div style='width: 30pt'>seq</div>|<div style='width: 
20pt'>1.4.0</div>
-kyuubi\.credentials<br>\.hive\.enabled|<div style='width: 65pt;word-wrap: 
break-word;white-space: normal'>true</div>|<div style='width: 170pt;word-wrap: 
break-word;white-space: normal'>Whether to renew HiveMetaStore 
DelegationToken</div>|<div style='width: 30pt'>boolean</div>|<div style='width: 
20pt'>1.4.0</div>
-kyuubi\.credentials<br>\.renewal\.interval|<div style='width: 65pt;word-wrap: 
break-word;white-space: normal'>PT1H</div>|<div style='width: 170pt;word-wrap: 
break-word;white-space: normal'>How often Kyuubi renews one user's 
DelegationTokens</div>|<div style='width: 30pt'>duration</div>|<div 
style='width: 20pt'>1.4.0</div>
+kyuubi\.credentials<br>\.hive\.enabled|<div style='width: 65pt;word-wrap: 
break-word;white-space: normal'>true</div>|<div style='width: 170pt;word-wrap: 
break-word;white-space: normal'>Whether to renew Hive metastore delegation 
token</div>|<div style='width: 30pt'>boolean</div>|<div style='width: 
20pt'>1.4.0</div>
+kyuubi\.credentials<br>\.renewal\.interval|<div style='width: 65pt;word-wrap: 
break-word;white-space: normal'>PT1H</div>|<div style='width: 170pt;word-wrap: 
break-word;white-space: normal'>How often Kyuubi renews one user's delegation 
tokens</div>|<div style='width: 30pt'>duration</div>|<div style='width: 
20pt'>1.4.0</div>
 kyuubi\.credentials<br>\.renewal\.retryWait|<div style='width: 65pt;word-wrap: 
break-word;white-space: normal'>PT1M</div>|<div style='width: 170pt;word-wrap: 
break-word;white-space: normal'>How long to wait before retrying to fetch new 
credentials after a failure.</div>|<div style='width: 30pt'>duration</div>|<div 
style='width: 20pt'>1.4.0</div>
 
 
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLBackendService.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLBackendService.scala
index fb43e64..691631d 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLBackendService.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLBackendService.scala
@@ -38,4 +38,6 @@ class SparkSQLBackendService(name: String, spark: 
SparkSession)
   def this(spark: SparkSession) = 
this(classOf[SparkSQLBackendService].getSimpleName, spark)
 
   override val sessionManager: SessionManager = new 
SparkSQLSessionManager(spark)
+
+  def sparkSession: SparkSession = spark
 }
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index a04a7d1..c880900 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -33,7 +33,7 @@ import 
org.apache.kyuubi.engine.spark.SparkSQLEngine.countDownLatch
 import org.apache.kyuubi.engine.spark.events.{EngineEvent, EventLoggingService}
 import org.apache.kyuubi.ha.HighAvailabilityConf._
 import org.apache.kyuubi.ha.client.{EngineServiceDiscovery, RetryPolicies, 
ServiceDiscovery}
-import org.apache.kyuubi.service.{Serverable, Service, ServiceState, 
ThriftFrontendService}
+import org.apache.kyuubi.service.{Serverable, Service, ServiceState}
 import org.apache.kyuubi.util.SignalRegister
 
 case class SparkSQLEngine(spark: SparkSession) extends 
Serverable("SparkSQLEngine") {
@@ -43,7 +43,7 @@ case class SparkSQLEngine(spark: SparkSession) extends 
Serverable("SparkSQLEngin
   private val OOMHook = new Runnable { override def run(): Unit = stop() }
   private val eventLogging = new EventLoggingService(this)
   override val backendService = new SparkSQLBackendService(spark)
-  val frontendService = new ThriftFrontendService(backendService, OOMHook)
+  val frontendService = new SparkThriftFrontendService(backendService, OOMHook)
   override val discoveryService: Service = new EngineServiceDiscovery(this)
 
   override protected def supportsServiceDiscovery: Boolean = {
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkThriftFrontendService.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkThriftFrontendService.scala
new file mode 100644
index 0000000..fb9b82a
--- /dev/null
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkThriftFrontendService.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import org.apache.hive.service.rpc.thrift.{TRenewDelegationTokenReq, 
TRenewDelegationTokenResp}
+import org.apache.spark.kyuubi.SparkContextHelper
+
+import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.service.ThriftFrontendService
+import org.apache.kyuubi.util.KyuubiHadoopUtils
+
+class SparkThriftFrontendService private (
+    name: String,
+    be: SparkSQLBackendService,
+    oomHook: Runnable)
+  extends ThriftFrontendService(name, be, oomHook) {
+  import SparkThriftFrontendService._
+
+  private val sc = be.sparkSession.sparkContext
+
+  def this(be: SparkSQLBackendService, oomHook: Runnable) = {
+    this(classOf[SparkThriftFrontendService].getSimpleName, be, oomHook)
+  }
+
+  override def RenewDelegationToken(req: TRenewDelegationTokenReq): 
TRenewDelegationTokenResp = {
+    debug(req.toString)
+
+    // We hacked `TCLIService.Iface.RenewDelegationToken` to transfer 
Credentials from Kyuubi
+    // Server to Spark SQL engine
+    val resp = new TRenewDelegationTokenResp()
+    try {
+      val newCreds = 
KyuubiHadoopUtils.decodeCredentials(req.getDelegationToken)
+      val (hiveTokens, otherTokens) =
+        KyuubiHadoopUtils.getTokenMap(newCreds).partition(_._2.getKind == 
HIVE_DELEGATION_TOKEN)
+
+      val updateCreds = new Credentials()
+      val oldCreds = 
KyuubiHadoopUtils.getCredentialsInternal(UserGroupInformation.getCurrentUser)
+      addHiveToken(hiveTokens, oldCreds, updateCreds)
+      addOtherTokens(otherTokens, oldCreds, updateCreds)
+      if (updateCreds.numberOfTokens() > 0) {
+        SparkContextHelper.updateDelegationTokens(sc, updateCreds)
+      }
+
+      resp.setStatus(ThriftFrontendService.OK_STATUS)
+    } catch {
+      case e: Exception =>
+        warn("Error renewing delegation tokens: ", e)
+        resp.setStatus(KyuubiSQLException.toTStatus(e))
+    }
+    resp
+  }
+
+  private def addHiveToken(
+      newTokens: Map[Text, Token[_ <: TokenIdentifier]],
+      oldCreds: Credentials,
+      updateCreds: Credentials): Unit = {
+    val metastoreUris = 
sc.hadoopConfiguration.getTrimmed("hive.metastore.uris", "")
+
+    // `HiveMetaStoreClient` selects the first token whose service is "" and 
kind is
+    // "HIVE_DELEGATION_TOKEN" to authenticate.
+    val oldAliasAndToken = KyuubiHadoopUtils.getTokenMap(oldCreds)
+      .find { case (_, token) =>
+        token.getKind == HIVE_DELEGATION_TOKEN && token.getService == new 
Text()
+      }
+
+    if (metastoreUris.nonEmpty && oldAliasAndToken.isDefined) {
+      // Each entry of `newTokens` is a <uris, token> pair for a metastore 
cluster.
+      // If entry's uris and engine's metastore uris have at least 1 same uri, 
we presume they
+      // represent the same metastore cluster.
+      val uriSet = metastoreUris.split(",").filter(_.nonEmpty).toSet
+      val newToken = newTokens
+        .find { case (uris, token) =>
+          val matched = uris.toString.split(",").exists(uriSet.contains) &&
+            token.getService == new Text()
+          if (!matched) {
+            debug(s"Filter out Hive token $token")
+          }
+          matched
+        }
+        .map(_._2)
+      newToken.foreach { token =>
+        if (KyuubiHadoopUtils.getTokenIssueDate(token) >
+            KyuubiHadoopUtils.getTokenIssueDate(oldAliasAndToken.get._2)) {
+          updateCreds.addToken(oldAliasAndToken.get._1, token)
+        } else {
+          warn(s"Ignore Hive token with earlier issue date: $token")
+        }
+      }
+      if (newToken.isEmpty) {
+        warn(s"No matching Hive token found for engine metastore uris 
$metastoreUris")
+      }
+    } else if (metastoreUris.isEmpty) {
+      info(s"Ignore Hive token as engine metastore uris are empty")
+    } else {
+      info(s"Ignore Hive token as engine has no Hive token ever before")
+    }
+  }
+
+  private def addOtherTokens(
+      tokens: Map[Text, Token[_ <: TokenIdentifier]],
+      oldCreds: Credentials,
+      updateCreds: Credentials): Unit = {
+    tokens.foreach { case (alias, newToken) =>
+      val oldToken = oldCreds.getToken(alias)
+      if (oldToken != null) {
+        if (KyuubiHadoopUtils.getTokenIssueDate(newToken) >
+            KyuubiHadoopUtils.getTokenIssueDate(oldToken)) {
+          updateCreds.addToken(alias, newToken)
+        } else {
+          warn(s"Ignore token with earlier issue date: $newToken")
+        }
+      } else {
+        info(s"Ignore unknown token $newToken")
+      }
+    }
+  }
+}
+
+object SparkThriftFrontendService {
+
+  val HIVE_DELEGATION_TOKEN = new Text("HIVE_DELEGATION_TOKEN")
+}
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkContextHelper.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkContextHelper.scala
index 667bd98..337eccb 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkContextHelper.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkContextHelper.scala
@@ -17,8 +17,15 @@
 
 package org.apache.spark.kyuubi
 
+import org.apache.hadoop.security.Credentials
 import org.apache.spark.SparkContext
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.scheduler.SchedulerBackend
+import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.scheduler.local.LocalSchedulerBackend
 
+import org.apache.kyuubi.Logging
 import org.apache.kyuubi.engine.spark.events.KyuubiSparkEvent
 import org.apache.kyuubi.events.EventLogger
 
@@ -26,10 +33,25 @@ import org.apache.kyuubi.events.EventLogger
  * A place to invoke non-public APIs of [[SparkContext]], anything to be added 
here need to
  * think twice
  */
-object SparkContextHelper {
+object SparkContextHelper extends Logging {
+
   def createSparkHistoryLogger(sc: SparkContext): 
EventLogger[KyuubiSparkEvent] = {
     new SparkHistoryEventLogger(sc)
   }
+
+  def updateDelegationTokens(sc: SparkContext, creds: Credentials): Unit = {
+    val bytes = SparkHadoopUtil.get.serialize(creds)
+    sc.schedulerBackend match {
+      case _: LocalSchedulerBackend =>
+        SparkHadoopUtil.get.addDelegationTokens(bytes, sc.conf)
+      case backend: CoarseGrainedSchedulerBackend =>
+        backend.driverEndpoint.send(UpdateDelegationTokens(bytes))
+      case backend: SchedulerBackend =>
+        warn(s"Failed to update delegation tokens due to unsupported 
SchedulerBackend " +
+          s"${backend.getClass.getName}.")
+    }
+  }
+
 }
 
 /**
diff --git 
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala
 
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala
index 5d6f4ac..347ab1a 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala
@@ -22,11 +22,17 @@ import java.sql.{DatabaseMetaData, ResultSet, 
SQLFeatureNotSupportedException}
 import scala.collection.JavaConverters._
 import scala.util.Random
 
+import 
org.apache.hadoop.hdfs.security.token.delegation.{DelegationTokenIdentifier => 
HDFSTokenIdent}
+import org.apache.hadoop.hive.thrift.{DelegationTokenIdentifier => 
HiveTokenIdent}
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
 import org.apache.hive.common.util.HiveVersionInfo
 import org.apache.hive.service.cli.HiveSQLException
 import org.apache.hive.service.rpc.thrift._
 import org.apache.hive.service.rpc.thrift.TCLIService.Iface
 import org.apache.hive.service.rpc.thrift.TOperationState._
+import org.apache.spark.kyuubi.SparkContextHelper
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
 import org.apache.spark.sql.types._
 
@@ -35,6 +41,7 @@ import org.apache.kyuubi.engine.spark.WithSparkSQLEngine
 import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
 import org.apache.kyuubi.operation.HiveJDBCTests
 import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
+import org.apache.kyuubi.util.KyuubiHadoopUtils
 
 class SparkOperationSuite extends WithSparkSQLEngine with HiveJDBCTests {
 
@@ -761,4 +768,116 @@ class SparkOperationSuite extends WithSparkSQLEngine with 
HiveJDBCTests {
       assert(tFetchResultsResp.getStatus.getStatusCode === 
TStatusCode.SUCCESS_STATUS)
     }
   }
+
+  test("send credentials by TRenewDelegationTokenReq") {
+    // Simulate a secured SparkSQLEngine's credentials
+    val currentTime = System.currentTimeMillis()
+    val hdfsTokenAlias = new Text("HDFS1")
+    val hiveTokenAlias = new Text("hive.server2.delegation.token")
+    val creds1 = createCredentials(currentTime, hdfsTokenAlias.toString, 
hiveTokenAlias.toString)
+    // SparkSQLEngine may have token alias unknown to Kyuubi Server
+    val unknownTokenAlias = new Text("UNKNOWN")
+    val unknownToken = newHDFSToken(currentTime)
+    creds1.addToken(unknownTokenAlias, unknownToken)
+    SparkContextHelper.updateDelegationTokens(spark.sparkContext, creds1)
+
+    val metastoreUris = "thrift://localhost:9083,thrift://localhost:9084"
+
+    whenMetaStoreURIsSetTo(metastoreUris) { uris =>
+      withThriftClient { client =>
+        val req = new TOpenSessionReq()
+        req.setUsername("username")
+        req.setPassword("password")
+        val tOpenSessionResp = client.OpenSession(req)
+
+        def sendCredentials(client: TCLIService.Iface, credentials: 
Credentials): Unit = {
+          val renewDelegationTokenReq = new TRenewDelegationTokenReq()
+          
renewDelegationTokenReq.setSessionHandle(tOpenSessionResp.getSessionHandle)
+          renewDelegationTokenReq.setDelegationToken(
+            KyuubiHadoopUtils.encodeCredentials(credentials))
+          val renewDelegationTokenResp = 
client.RenewDelegationToken(renewDelegationTokenReq)
+          assert(renewDelegationTokenResp.getStatus.getStatusCode == 
TStatusCode.SUCCESS_STATUS)
+        }
+
+        // Send new credentials
+        val creds2 = createCredentials(currentTime + 1, 
hdfsTokenAlias.toString, uris)
+        // Kyuubi Server may have extra HDFS and Hive delegation tokens
+        val extraHDFSToken = newHDFSToken(currentTime + 1)
+        creds2.addToken(new Text("HDFS2"), extraHDFSToken)
+        sendCredentials(client, creds2)
+        // SparkSQLEngine's tokens should be updated
+        var engineCredentials =
+          
KyuubiHadoopUtils.getCredentialsInternal(UserGroupInformation.getCurrentUser)
+        assert(engineCredentials.getToken(hdfsTokenAlias) == 
creds2.getToken(hdfsTokenAlias))
+        assert(
+          engineCredentials.getToken(hiveTokenAlias) == creds2.getToken(new 
Text(metastoreUris)))
+        // Unknown tokens should not be updated
+        assert(engineCredentials.getToken(unknownTokenAlias) == unknownToken)
+
+        // Send old credentials
+        val creds3 = createCredentials(currentTime, hdfsTokenAlias.toString, 
metastoreUris)
+        sendCredentials(client, creds3)
+        // SparkSQLEngine's tokens should not be updated
+        engineCredentials =
+          
KyuubiHadoopUtils.getCredentialsInternal(UserGroupInformation.getCurrentUser)
+        assert(engineCredentials.getToken(hdfsTokenAlias) == 
creds2.getToken(hdfsTokenAlias))
+        assert(
+          engineCredentials.getToken(hiveTokenAlias) == creds2.getToken(new 
Text(metastoreUris)))
+
+        // No matching tokens
+        val creds4 = createCredentials(currentTime + 2, "HDFS2", 
"thrift://localhost:9085")
+        sendCredentials(client, creds4)
+        // No token is updated
+        engineCredentials =
+          
KyuubiHadoopUtils.getCredentialsInternal(UserGroupInformation.getCurrentUser)
+        assert(engineCredentials.getToken(hdfsTokenAlias) == 
creds2.getToken(hdfsTokenAlias))
+        assert(
+          engineCredentials.getToken(hiveTokenAlias) == creds2.getToken(new 
Text(metastoreUris)))
+      }
+    }
+  }
+
+  private def whenMetaStoreURIsSetTo(uris: String)(func: String => Unit): Unit 
= {
+    val conf = spark.sparkContext.hadoopConfiguration
+    val origin = conf.get("hive.metastore.uris", "")
+    conf.set("hive.metastore.uris", uris)
+    try func.apply(uris) finally {
+      conf.set("hive.metastore.uris", origin)
+    }
+  }
+
+
+  private def createCredentials(
+      issueDate: Long,
+      hdfsTokenAlias: String,
+      hiveTokenAlias: String): Credentials = {
+    val credentials = new Credentials()
+    credentials.addToken(new Text(hdfsTokenAlias), newHDFSToken(issueDate))
+    credentials.addToken(new Text(hiveTokenAlias), newHiveToken(issueDate))
+    credentials
+  }
+
+  private def newHDFSToken(issueDate: Long): Token[TokenIdentifier] = {
+    val who = new Text("who")
+    val tokenId = new HDFSTokenIdent(who, who, who)
+    tokenId.setIssueDate(issueDate)
+    newToken(tokenId)
+  }
+
+  private def newHiveToken(issueDate: Long): Token[TokenIdentifier] = {
+    val who = new Text("who")
+    val tokenId = new HiveTokenIdent(who, who, who)
+    tokenId.setIssueDate(issueDate)
+    newToken(tokenId)
+  }
+
+  private def newToken(tokenIdent: TokenIdentifier): Token[TokenIdentifier] = {
+    val token = new Token[TokenIdentifier]
+    token.setID(tokenIdent.getBytes)
+    token.setKind(tokenIdent.getKind)
+    val bytes = new Array[Byte](128)
+    Random.nextBytes(bytes)
+    token.setPassword(bytes)
+    token
+  }
 }
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 7c50821..c2480ef 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -236,7 +236,7 @@ object KyuubiConf {
 
   val CREDENTIALS_RENEWAL_INTERVAL: ConfigEntry[Long] =
     buildConf("credentials.renewal.interval")
-      .doc("How often Kyuubi renews one user's DelegationTokens")
+      .doc("How often Kyuubi renews one user's delegation tokens")
       .version("1.4.0")
       .timeConf
       .createWithDefault(Duration.ofHours(1).toMillis)
@@ -251,7 +251,7 @@ object KyuubiConf {
 
   val CREDENTIALS_HADOOP_FS_ENABLED: ConfigEntry[Boolean] =
     buildConf("credentials.hadoopfs.enabled")
-      .doc("Whether to renew HadoopFS DelegationToken")
+      .doc("Whether to renew Hadoop filesystem delegation tokens")
       .version("1.4.0")
       .booleanConf
       .createWithDefault(true)
@@ -267,7 +267,7 @@ object KyuubiConf {
 
   val CREDENTIALS_HIVE_ENABLED: ConfigEntry[Boolean] =
     buildConf("credentials.hive.enabled")
-      .doc("Whether to renew HiveMetaStore DelegationToken")
+      .doc("Whether to renew Hive metastore delegation token")
       .version("1.4.0")
       .booleanConf
       .createWithDefault(true)
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/ThriftFrontendService.scala
 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/ThriftFrontendService.scala
index dbe2621..b59f98a 100644
--- 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/ThriftFrontendService.scala
+++ 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/ThriftFrontendService.scala
@@ -37,7 +37,7 @@ import 
org.apache.kyuubi.service.authentication.KyuubiAuthenticationFactory
 import org.apache.kyuubi.session.SessionHandle
 import org.apache.kyuubi.util.{ExecutorPoolCaptureOom, KyuubiHadoopUtils, 
NamedThreadFactory}
 
-class ThriftFrontendService private(name: String, be: BackendService, oomHook: 
Runnable)
+class ThriftFrontendService protected(name: String, be: BackendService, 
oomHook: Runnable)
   extends AbstractFrontendService(name, be) with TCLIService.Iface with 
Runnable with Logging {
 
   import ThriftFrontendService._
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
index b8fb967..79efb4f 100644
--- 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
+++ 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
@@ -18,15 +18,36 @@
 package org.apache.kyuubi.util
 
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, 
DataOutputStream}
+import java.util.{Map => JMap}
+import javax.security.auth.Subject
+
+import scala.collection.JavaConverters._
 
 import org.apache.commons.codec.binary.Base64
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.security.{Credentials, SecurityUtil}
+import 
org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.{Credentials, SecurityUtil, 
UserGroupInformation}
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
 
 import org.apache.kyuubi.config.KyuubiConf
 
 object KyuubiHadoopUtils {
 
+  private val subjectField =
+    classOf[UserGroupInformation].getDeclaredField("subject")
+  subjectField.setAccessible(true)
+
+  private val getCredentialsInternalMethod =
+    classOf[UserGroupInformation].getDeclaredMethod(
+      "getCredentialsInternal",
+      Array.empty[Class[_]]: _*)
+  getCredentialsInternalMethod.setAccessible(true)
+
+  private val tokenMapField =
+    classOf[Credentials].getDeclaredField("tokenMap")
+  tokenMapField.setAccessible(true)
+
   def newHadoopConf(conf: KyuubiConf): Configuration = {
     val hadoopConf = new Configuration()
     conf.getAll.foreach { case (k, v) => hadoopConf.set(k, v) }
@@ -55,4 +76,37 @@ object KyuubiHadoopUtils {
     creds
   }
 
+  /**
+   * Get all tokens in [[UserGroupInformation#subject]] including
+   * [[org.apache.hadoop.security.token.Token.PrivateToken]] as
+   * [[UserGroupInformation#getCredentials]] returned Credentials do not 
contain
+   * [[org.apache.hadoop.security.token.Token.PrivateToken]].
+   */
+  def getCredentialsInternal(ugi: UserGroupInformation): Credentials = {
+    // Synchronize to avoid credentials being written while cloning credentials
+    subjectField.get(ugi).asInstanceOf[Subject] synchronized {
+      new 
Credentials(getCredentialsInternalMethod.invoke(ugi).asInstanceOf[Credentials])
+    }
+  }
+
+  /**
+   * Get [[Credentials#tokenMap]] by reflection as [[Credentials#getTokenMap]] 
is not present before
+   * Hadoop 3.2.1.
+   */
+  def getTokenMap(credentials: Credentials): Map[Text, Token[_ <: 
TokenIdentifier]] = {
+    tokenMapField.get(credentials)
+      .asInstanceOf[JMap[Text, Token[_ <: TokenIdentifier]]]
+      .asScala
+      .toMap
+  }
+
+  def getTokenIssueDate(token: Token[_ <: TokenIdentifier]): Long = {
+    // It is safe to deserialize any token identifier to hdfs 
`DelegationTokenIdentifier`
+    // as all token identifiers have the same binary format.
+    val tokenIdentifier = new DelegationTokenIdentifier
+    val buf = new ByteArrayInputStream(token.getIdentifier)
+    val in = new DataInputStream(buf)
+    tokenIdentifier.readFields(in)
+    tokenIdentifier.getIssueDate
+  }
 }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
index 6a386dd..de0b278 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
@@ -203,4 +203,14 @@ class KyuubiSyncThriftClient(protocol: TProtocol) extends 
TCLIService.Client(pro
     ThriftUtils.verifyTStatus(resp.getStatus)
     resp.getResults
   }
+
+  def sendCredentials(encodedCredentials: String): Unit = {
+    // We hacked `TCLIService.Iface.RenewDelegationToken` to transfer 
Credentials to Spark SQL
+    // engine
+    val req = new TRenewDelegationTokenReq()
+    req.setSessionHandle(_remoteSessionHandle)
+    req.setDelegationToken(encodedCredentials)
+    val resp = RenewDelegationToken(req)
+    ThriftUtils.verifyTStatus(resp.getStatus)
+  }
 }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
index 5d01790..09ddf01 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
@@ -147,6 +147,10 @@ class HadoopCredentialsManager private (name: String) 
extends AbstractService(na
       sessionId: String,
       appUser: String,
       send: String => Unit): Unit = {
+    if (renewalExecutor.isEmpty) {
+      return
+    }
+
     val userRef = getOrCreateUserCredentialsRef(appUser)
     val sessionEpoch = getSessionCredentialsEpoch(sessionId)
 
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index 61ef636..20bb6db 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -85,7 +85,7 @@ private[kyuubi] class EngineRef(
   }
 
   // Launcher of the engine
-  private val appUser: String = shareLevel match {
+  private[kyuubi] val appUser: String = shareLevel match {
     case SERVER => Utils.currentUser
     case _ => user
   }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
index 779b4ad..1f731bc 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
@@ -19,9 +19,7 @@ package org.apache.kyuubi.operation
 
 import scala.collection.JavaConverters._
 
-import org.apache.hive.service.rpc.thrift.TFetchOrientation
-import org.apache.hive.service.rpc.thrift.TFetchResultsReq
-import org.apache.hive.service.rpc.thrift.TGetOperationStatusReq
+import org.apache.hive.service.rpc.thrift.{TFetchOrientation, 
TFetchResultsReq, TGetOperationStatusReq}
 import org.apache.hive.service.rpc.thrift.TOperationState._
 
 import org.apache.kyuubi.KyuubiSQLException
@@ -32,7 +30,7 @@ import org.apache.kyuubi.metrics.MetricsSystem
 import org.apache.kyuubi.operation.OperationState.OperationState
 import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.server.EventLoggingService
-import org.apache.kyuubi.session.Session
+import org.apache.kyuubi.session.{KyuubiSessionImpl, KyuubiSessionManager, 
Session}
 
 class ExecuteStatement(
     session: Session,
@@ -120,11 +118,21 @@ class ExecuteStatement(
           val ke = KyuubiSQLException(s"UNKNOWN STATE for $statement")
           setOperationException(ke)
       }
+      sendCredentialsIfNeeded()
     }
     // see if anymore log could be fetched
     fetchQueryLog()
   } catch onError()
 
+  private def sendCredentialsIfNeeded(): Unit = {
+    val appUser = session.asInstanceOf[KyuubiSessionImpl].engine.appUser
+    val sessionManager = 
session.sessionManager.asInstanceOf[KyuubiSessionManager]
+    sessionManager.credentialsManager.sendCredentialsIfNeeded(
+      session.handle.identifier.toString,
+      appUser,
+      client.sendCredentials)
+  }
+
   private def fetchQueryLog(): Unit = {
     getOperationLog.foreach { logger =>
       try {
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index 9f0067c..cb8bf70 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -53,7 +53,8 @@ class KyuubiSessionImpl(
     case (key, value) => sessionConf.set(key, value)
   }
 
-  private val engine: EngineRef = new EngineRef(sessionConf, user)
+  val engine: EngineRef = new EngineRef(sessionConf, user)
+
   private val sessionEvent = KyuubiSessionEvent(this)
   EventLoggingService.onEvent(sessionEvent)
 

Reply via email to