This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 6a01f65 [KYUUBI #916] Send DelegationTokens to SQL engines
6a01f65 is described below
commit 6a01f65d3ec418dd4b12ace1d30189edebb04c0c
Author: zhouyifan279 <[email protected]>
AuthorDate: Tue Sep 14 16:37:37 2021 +0800
[KYUUBI #916] Send DelegationTokens to SQL engines
### _Why are the changes needed?_
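This PR completes issue #916: Kyuubi Server now sends its delegation tokens to
the Spark SQL engine by reusing the Thrift `RenewDelegationToken` call.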
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #1074 from zhouyifan279/KYUUBI#916.
Closes #916
28176c7e [zhouyifan279] [KYUUBI #916] Send DelegationTokens to SQL engines
b6c931bd [zhouyifan279] [KYUUBI #916] Send DelegationTokens to SQL engines
Authored-by: zhouyifan279 <[email protected]>
Signed-off-by: ulysses-you <[email protected]>
---
docs/deployment/settings.md | 6 +-
.../engine/spark/SparkSQLBackendService.scala | 2 +
.../kyuubi/engine/spark/SparkSQLEngine.scala | 4 +-
.../engine/spark/SparkThriftFrontendService.scala | 140 +++++++++++++++++++++
.../apache/spark/kyuubi/SparkContextHelper.scala | 24 +++-
.../spark/operation/SparkOperationSuite.scala | 119 ++++++++++++++++++
.../org/apache/kyuubi/config/KyuubiConf.scala | 6 +-
.../kyuubi/service/ThriftFrontendService.scala | 2 +-
.../org/apache/kyuubi/util/KyuubiHadoopUtils.scala | 56 ++++++++-
.../kyuubi/client/KyuubiSyncThriftClient.scala | 10 ++
.../credentials/HadoopCredentialsManager.scala | 4 +
.../scala/org/apache/kyuubi/engine/EngineRef.scala | 2 +-
.../apache/kyuubi/operation/ExecuteStatement.scala | 16 ++-
.../apache/kyuubi/session/KyuubiSessionImpl.scala | 3 +-
14 files changed, 377 insertions(+), 17 deletions(-)
diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index a3c3bcf..a77eaed 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -151,10 +151,10 @@
kyuubi\.backend\.server<br>\.exec\.pool\.wait\.queue<br>\.size|<div style='width
Key | Default | Meaning | Type | Since
--- | --- | --- | --- | ---
-kyuubi\.credentials<br>\.hadoopfs\.enabled|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'>true</div>|<div style='width: 170pt;word-wrap:
break-word;white-space: normal'>Whether to renew HadoopFS
DelegationToken</div>|<div style='width: 30pt'>boolean</div>|<div style='width:
20pt'>1.4.0</div>
+kyuubi\.credentials<br>\.hadoopfs\.enabled|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'>true</div>|<div style='width: 170pt;word-wrap:
break-word;white-space: normal'>Whether to renew Hadoop filesystem delegation
tokens</div>|<div style='width: 30pt'>boolean</div>|<div style='width:
20pt'>1.4.0</div>
kyuubi\.credentials<br>\.hadoopfs\.uris|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'></div>|<div style='width: 170pt;word-wrap:
break-word;white-space: normal'>Extra Hadoop filesystem URIs for which to
request delegation tokens. The filesystem that hosts fs.defaultFS does not need
to be listed here.</div>|<div style='width: 30pt'>seq</div>|<div style='width:
20pt'>1.4.0</div>
-kyuubi\.credentials<br>\.hive\.enabled|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'>true</div>|<div style='width: 170pt;word-wrap:
break-word;white-space: normal'>Whether to renew HiveMetaStore
DelegationToken</div>|<div style='width: 30pt'>boolean</div>|<div style='width:
20pt'>1.4.0</div>
-kyuubi\.credentials<br>\.renewal\.interval|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'>PT1H</div>|<div style='width: 170pt;word-wrap:
break-word;white-space: normal'>How often Kyuubi renews one user's
DelegationTokens</div>|<div style='width: 30pt'>duration</div>|<div
style='width: 20pt'>1.4.0</div>
+kyuubi\.credentials<br>\.hive\.enabled|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'>true</div>|<div style='width: 170pt;word-wrap:
break-word;white-space: normal'>Whether to renew Hive metastore delegation
token</div>|<div style='width: 30pt'>boolean</div>|<div style='width:
20pt'>1.4.0</div>
+kyuubi\.credentials<br>\.renewal\.interval|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'>PT1H</div>|<div style='width: 170pt;word-wrap:
break-word;white-space: normal'>How often Kyuubi renews one user's delegation
tokens</div>|<div style='width: 30pt'>duration</div>|<div style='width:
20pt'>1.4.0</div>
kyuubi\.credentials<br>\.renewal\.retryWait|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'>PT1M</div>|<div style='width: 170pt;word-wrap:
break-word;white-space: normal'>How long to wait before retrying to fetch new
credentials after a failure.</div>|<div style='width: 30pt'>duration</div>|<div
style='width: 20pt'>1.4.0</div>
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLBackendService.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLBackendService.scala
index fb43e64..691631d 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLBackendService.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLBackendService.scala
@@ -38,4 +38,6 @@ class SparkSQLBackendService(name: String, spark:
SparkSession)
def this(spark: SparkSession) =
this(classOf[SparkSQLBackendService].getSimpleName, spark)
override val sessionManager: SessionManager = new
SparkSQLSessionManager(spark)
+
+ def sparkSession: SparkSession = spark
}
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index a04a7d1..c880900 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -33,7 +33,7 @@ import
org.apache.kyuubi.engine.spark.SparkSQLEngine.countDownLatch
import org.apache.kyuubi.engine.spark.events.{EngineEvent, EventLoggingService}
import org.apache.kyuubi.ha.HighAvailabilityConf._
import org.apache.kyuubi.ha.client.{EngineServiceDiscovery, RetryPolicies,
ServiceDiscovery}
-import org.apache.kyuubi.service.{Serverable, Service, ServiceState,
ThriftFrontendService}
+import org.apache.kyuubi.service.{Serverable, Service, ServiceState}
import org.apache.kyuubi.util.SignalRegister
case class SparkSQLEngine(spark: SparkSession) extends
Serverable("SparkSQLEngine") {
@@ -43,7 +43,7 @@ case class SparkSQLEngine(spark: SparkSession) extends
Serverable("SparkSQLEngin
private val OOMHook = new Runnable { override def run(): Unit = stop() }
private val eventLogging = new EventLoggingService(this)
override val backendService = new SparkSQLBackendService(spark)
- val frontendService = new ThriftFrontendService(backendService, OOMHook)
+ val frontendService = new SparkThriftFrontendService(backendService, OOMHook)
override val discoveryService: Service = new EngineServiceDiscovery(this)
override protected def supportsServiceDiscovery: Boolean = {
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkThriftFrontendService.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkThriftFrontendService.scala
new file mode 100644
index 0000000..fb9b82a
--- /dev/null
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkThriftFrontendService.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import org.apache.hive.service.rpc.thrift.{TRenewDelegationTokenReq,
TRenewDelegationTokenResp}
+import org.apache.spark.kyuubi.SparkContextHelper
+
+import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.service.ThriftFrontendService
+import org.apache.kyuubi.util.KyuubiHadoopUtils
+
+class SparkThriftFrontendService private (
+ name: String,
+ be: SparkSQLBackendService,
+ oomHook: Runnable)
+ extends ThriftFrontendService(name, be, oomHook) {
+ import SparkThriftFrontendService._
+
+ private val sc = be.sparkSession.sparkContext
+
+ def this(be: SparkSQLBackendService, oomHook: Runnable) = {
+ this(classOf[SparkThriftFrontendService].getSimpleName, be, oomHook)
+ }
+
+ override def RenewDelegationToken(req: TRenewDelegationTokenReq):
TRenewDelegationTokenResp = {
+ debug(req.toString)
+
+ // We hacked `TCLIService.Iface.RenewDelegationToken` to transfer
Credentials from Kyuubi
+ // Server to Spark SQL engine
+ val resp = new TRenewDelegationTokenResp()
+ try {
+ val newCreds =
KyuubiHadoopUtils.decodeCredentials(req.getDelegationToken)
+ val (hiveTokens, otherTokens) =
+ KyuubiHadoopUtils.getTokenMap(newCreds).partition(_._2.getKind ==
HIVE_DELEGATION_TOKEN)
+
+ val updateCreds = new Credentials()
+ val oldCreds =
KyuubiHadoopUtils.getCredentialsInternal(UserGroupInformation.getCurrentUser)
+ addHiveToken(hiveTokens, oldCreds, updateCreds)
+ addOtherTokens(otherTokens, oldCreds, updateCreds)
+ if (updateCreds.numberOfTokens() > 0) {
+ SparkContextHelper.updateDelegationTokens(sc, updateCreds)
+ }
+
+ resp.setStatus(ThriftFrontendService.OK_STATUS)
+ } catch {
+ case e: Exception =>
+ warn("Error renew delegation tokens: ", e)
+ resp.setStatus(KyuubiSQLException.toTStatus(e))
+ }
+ resp
+ }
+
+ private def addHiveToken(
+ newTokens: Map[Text, Token[_ <: TokenIdentifier]],
+ oldCreds: Credentials,
+ updateCreds: Credentials): Unit = {
+ val metastoreUris =
sc.hadoopConfiguration.getTrimmed("hive.metastore.uris", "")
+
+ // `HiveMetaStoreClient` selects the first token whose service is "" and
kind is
+ // "HIVE_DELEGATION_TOKEN" to authenticate.
+ val oldAliasAndToken = KyuubiHadoopUtils.getTokenMap(oldCreds)
+ .find { case (_, token) =>
+ token.getKind == HIVE_DELEGATION_TOKEN && token.getService == new
Text()
+ }
+
+ if (metastoreUris.nonEmpty && oldAliasAndToken.isDefined) {
+ // Each entry of `newTokens` is a <uris, token> pair for a metastore
cluster.
+ // If entry's uris and engine's metastore uris have at least 1 same uri,
we presume they
+ // represent the same metastore cluster.
+ val uriSet = metastoreUris.split(",").filter(_.nonEmpty).toSet
+ val newToken = newTokens
+ .find { case (uris, token) =>
+ val matched = uris.toString.split(",").exists(uriSet.contains) &&
+ token.getService == new Text()
+ if (!matched) {
+ debug(s"Filter out Hive token $token")
+ }
+ matched
+ }
+ .map(_._2)
+ newToken.foreach { token =>
+ if (KyuubiHadoopUtils.getTokenIssueDate(token) >
+ KyuubiHadoopUtils.getTokenIssueDate(oldAliasAndToken.get._2)) {
+ updateCreds.addToken(oldAliasAndToken.get._1, token)
+ } else {
+ warn(s"Ignore Hive token with earlier issue date: $token")
+ }
+ }
+ if (newToken.isEmpty) {
+ warn(s"No matching Hive token found for engine metastore uris
$metastoreUris")
+ }
+ } else if (metastoreUris.isEmpty) {
+ info(s"Ignore Hive token as engine metastore uris are empty")
+ } else {
+ info(s"Ignore Hive token as engine has not Hive token ever before")
+ }
+ }
+
+ private def addOtherTokens(
+ tokens: Map[Text, Token[_ <: TokenIdentifier]],
+ oldCreds: Credentials,
+ updateCreds: Credentials): Unit = {
+ tokens.foreach { case (alias, newToken) =>
+ val oldToken = oldCreds.getToken(alias)
+ if (oldToken != null) {
+ if (KyuubiHadoopUtils.getTokenIssueDate(newToken) >
+ KyuubiHadoopUtils.getTokenIssueDate(oldToken)) {
+ updateCreds.addToken(alias, newToken)
+ } else {
+ warn(s"Ignore token with earlier issue date: $newToken")
+ }
+ } else {
+ info(s"Ignore unknown token $newToken")
+ }
+ }
+ }
+}
+
+object SparkThriftFrontendService {
+
+ val HIVE_DELEGATION_TOKEN = new Text("HIVE_DELEGATION_TOKEN")
+}
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkContextHelper.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkContextHelper.scala
index 667bd98..337eccb 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkContextHelper.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkContextHelper.scala
@@ -17,8 +17,15 @@
package org.apache.spark.kyuubi
+import org.apache.hadoop.security.Credentials
import org.apache.spark.SparkContext
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.scheduler.SchedulerBackend
+import
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.scheduler.local.LocalSchedulerBackend
+import org.apache.kyuubi.Logging
import org.apache.kyuubi.engine.spark.events.KyuubiSparkEvent
import org.apache.kyuubi.events.EventLogger
@@ -26,10 +33,25 @@ import org.apache.kyuubi.events.EventLogger
* A place to invoke non-public APIs of [[SparkContext]], anything to be added
here need to
* think twice
*/
-object SparkContextHelper {
+object SparkContextHelper extends Logging {
+
def createSparkHistoryLogger(sc: SparkContext):
EventLogger[KyuubiSparkEvent] = {
new SparkHistoryEventLogger(sc)
}
+
+ def updateDelegationTokens(sc: SparkContext, creds: Credentials): Unit = {
+ val bytes = SparkHadoopUtil.get.serialize(creds)
+ sc.schedulerBackend match {
+ case _: LocalSchedulerBackend =>
+ SparkHadoopUtil.get.addDelegationTokens(bytes, sc.conf)
+ case backend: CoarseGrainedSchedulerBackend =>
+ backend.driverEndpoint.send(UpdateDelegationTokens(bytes))
+ case backend: SchedulerBackend =>
+ warn(s"Failed to update delegation tokens due to unsupported
SchedulerBackend " +
+ s"${backend.getClass.getName}.")
+ }
+ }
+
}
/**
diff --git
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala
index 5d6f4ac..347ab1a 100644
---
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala
+++
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala
@@ -22,11 +22,17 @@ import java.sql.{DatabaseMetaData, ResultSet,
SQLFeatureNotSupportedException}
import scala.collection.JavaConverters._
import scala.util.Random
+import
org.apache.hadoop.hdfs.security.token.delegation.{DelegationTokenIdentifier =>
HDFSTokenIdent}
+import org.apache.hadoop.hive.thrift.{DelegationTokenIdentifier =>
HiveTokenIdent}
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
import org.apache.hive.common.util.HiveVersionInfo
import org.apache.hive.service.cli.HiveSQLException
import org.apache.hive.service.rpc.thrift._
import org.apache.hive.service.rpc.thrift.TCLIService.Iface
import org.apache.hive.service.rpc.thrift.TOperationState._
+import org.apache.spark.kyuubi.SparkContextHelper
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
import org.apache.spark.sql.types._
@@ -35,6 +41,7 @@ import org.apache.kyuubi.engine.spark.WithSparkSQLEngine
import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
import org.apache.kyuubi.operation.HiveJDBCTests
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
+import org.apache.kyuubi.util.KyuubiHadoopUtils
class SparkOperationSuite extends WithSparkSQLEngine with HiveJDBCTests {
@@ -761,4 +768,116 @@ class SparkOperationSuite extends WithSparkSQLEngine with
HiveJDBCTests {
assert(tFetchResultsResp.getStatus.getStatusCode ===
TStatusCode.SUCCESS_STATUS)
}
}
+
+ test("send credentials by TRenewDelegationTokenReq") {
+ // Simulate a secured SparkSQLEngine's credentials
+ val currentTime = System.currentTimeMillis()
+ val hdfsTokenAlias = new Text("HDFS1")
+ val hiveTokenAlias = new Text("hive.server2.delegation.token")
+ val creds1 = createCredentials(currentTime, hdfsTokenAlias.toString,
hiveTokenAlias.toString)
+ // SparkSQLEngine may have token alias unknown to Kyuubi Server
+ val unknownTokenAlias = new Text("UNKNOWN")
+ val unknownToken = newHDFSToken(currentTime)
+ creds1.addToken(unknownTokenAlias, unknownToken)
+ SparkContextHelper.updateDelegationTokens(spark.sparkContext, creds1)
+
+ val metastoreUris = "thrift://localhost:9083,thrift://localhost:9084"
+
+ whenMetaStoreURIsSetTo(metastoreUris) { uris =>
+ withThriftClient { client =>
+ val req = new TOpenSessionReq()
+ req.setUsername("username")
+ req.setPassword("password")
+ val tOpenSessionResp = client.OpenSession(req)
+
+ def sendCredentials(client: TCLIService.Iface, credentials:
Credentials): Unit = {
+ val renewDelegationTokenReq = new TRenewDelegationTokenReq()
+
renewDelegationTokenReq.setSessionHandle(tOpenSessionResp.getSessionHandle)
+ renewDelegationTokenReq.setDelegationToken(
+ KyuubiHadoopUtils.encodeCredentials(credentials))
+ val renewDelegationTokenResp =
client.RenewDelegationToken(renewDelegationTokenReq)
+ assert(renewDelegationTokenResp.getStatus.getStatusCode ==
TStatusCode.SUCCESS_STATUS)
+ }
+
+ // Send new credentials
+ val creds2 = createCredentials(currentTime + 1,
hdfsTokenAlias.toString, uris)
+ // Kyuubi Server may have extra HDFS and Hive delegation tokens
+ val extraHDFSToken = newHDFSToken(currentTime + 1)
+ creds2.addToken(new Text("HDFS2"), extraHDFSToken)
+ sendCredentials(client, creds2)
+ // SparkSQLEngine's tokens should be updated
+ var engineCredentials =
+
KyuubiHadoopUtils.getCredentialsInternal(UserGroupInformation.getCurrentUser)
+ assert(engineCredentials.getToken(hdfsTokenAlias) ==
creds2.getToken(hdfsTokenAlias))
+ assert(
+ engineCredentials.getToken(hiveTokenAlias) == creds2.getToken(new
Text(metastoreUris)))
+ // Unknown tokens should not be updated
+ assert(engineCredentials.getToken(unknownTokenAlias) == unknownToken)
+
+ // Send old credentials
+ val creds3 = createCredentials(currentTime, hdfsTokenAlias.toString,
metastoreUris)
+ sendCredentials(client, creds3)
+ // SparkSQLEngine's tokens should not be updated
+ engineCredentials =
+
KyuubiHadoopUtils.getCredentialsInternal(UserGroupInformation.getCurrentUser)
+ assert(engineCredentials.getToken(hdfsTokenAlias) ==
creds2.getToken(hdfsTokenAlias))
+ assert(
+ engineCredentials.getToken(hiveTokenAlias) == creds2.getToken(new
Text(metastoreUris)))
+
+ // No matching tokens
+ val creds4 = createCredentials(currentTime + 2, "HDFS2",
"thrift://localhost:9085")
+ sendCredentials(client, creds4)
+ // No token is updated
+ engineCredentials =
+
KyuubiHadoopUtils.getCredentialsInternal(UserGroupInformation.getCurrentUser)
+ assert(engineCredentials.getToken(hdfsTokenAlias) ==
creds2.getToken(hdfsTokenAlias))
+ assert(
+ engineCredentials.getToken(hiveTokenAlias) == creds2.getToken(new
Text(metastoreUris)))
+ }
+ }
+ }
+
+ private def whenMetaStoreURIsSetTo(uris: String)(func: String => Unit): Unit
= {
+ val conf = spark.sparkContext.hadoopConfiguration
+ val origin = conf.get("hive.metastore.uris", "")
+ conf.set("hive.metastore.uris", uris)
+ try func.apply(uris) finally {
+ conf.set("hive.metastore.uris", origin)
+ }
+ }
+
+
+ private def createCredentials(
+ issueDate: Long,
+ hdfsTokenAlias: String,
+ hiveTokenAlias: String): Credentials = {
+ val credentials = new Credentials()
+ credentials.addToken(new Text(hdfsTokenAlias), newHDFSToken(issueDate))
+ credentials.addToken(new Text(hiveTokenAlias), newHiveToken(issueDate))
+ credentials
+ }
+
+ private def newHDFSToken(issueDate: Long): Token[TokenIdentifier] = {
+ val who = new Text("who")
+ val tokenId = new HDFSTokenIdent(who, who, who)
+ tokenId.setIssueDate(issueDate)
+ newToken(tokenId)
+ }
+
+ private def newHiveToken(issueDate: Long): Token[TokenIdentifier] = {
+ val who = new Text("who")
+ val tokenId = new HiveTokenIdent(who, who, who)
+ tokenId.setIssueDate(issueDate)
+ newToken(tokenId)
+ }
+
+ private def newToken(tokeIdent: TokenIdentifier): Token[TokenIdentifier] = {
+ val token = new Token[TokenIdentifier]
+ token.setID(tokeIdent.getBytes)
+ token.setKind(tokeIdent.getKind)
+ val bytes = new Array[Byte](128)
+ Random.nextBytes(bytes)
+ token.setPassword(bytes)
+ token
+ }
}
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 7c50821..c2480ef 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -236,7 +236,7 @@ object KyuubiConf {
val CREDENTIALS_RENEWAL_INTERVAL: ConfigEntry[Long] =
buildConf("credentials.renewal.interval")
- .doc("How often Kyuubi renews one user's DelegationTokens")
+ .doc("How often Kyuubi renews one user's delegation tokens")
.version("1.4.0")
.timeConf
.createWithDefault(Duration.ofHours(1).toMillis)
@@ -251,7 +251,7 @@ object KyuubiConf {
val CREDENTIALS_HADOOP_FS_ENABLED: ConfigEntry[Boolean] =
buildConf("credentials.hadoopfs.enabled")
- .doc("Whether to renew HadoopFS DelegationToken")
+ .doc("Whether to renew Hadoop filesystem delegation tokens")
.version("1.4.0")
.booleanConf
.createWithDefault(true)
@@ -267,7 +267,7 @@ object KyuubiConf {
val CREDENTIALS_HIVE_ENABLED: ConfigEntry[Boolean] =
buildConf("credentials.hive.enabled")
- .doc("Whether to renew HiveMetaStore DelegationToken")
+ .doc("Whether to renew Hive metastore delegation token")
.version("1.4.0")
.booleanConf
.createWithDefault(true)
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/ThriftFrontendService.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/ThriftFrontendService.scala
index dbe2621..b59f98a 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/ThriftFrontendService.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/ThriftFrontendService.scala
@@ -37,7 +37,7 @@ import
org.apache.kyuubi.service.authentication.KyuubiAuthenticationFactory
import org.apache.kyuubi.session.SessionHandle
import org.apache.kyuubi.util.{ExecutorPoolCaptureOom, KyuubiHadoopUtils,
NamedThreadFactory}
-class ThriftFrontendService private(name: String, be: BackendService, oomHook:
Runnable)
+class ThriftFrontendService protected(name: String, be: BackendService,
oomHook: Runnable)
extends AbstractFrontendService(name, be) with TCLIService.Iface with
Runnable with Logging {
import ThriftFrontendService._
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
index b8fb967..79efb4f 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
@@ -18,15 +18,36 @@
package org.apache.kyuubi.util
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream,
DataOutputStream}
+import java.util.{Map => JMap}
+import javax.security.auth.Subject
+
+import scala.collection.JavaConverters._
import org.apache.commons.codec.binary.Base64
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.security.{Credentials, SecurityUtil}
+import
org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.{Credentials, SecurityUtil,
UserGroupInformation}
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
import org.apache.kyuubi.config.KyuubiConf
object KyuubiHadoopUtils {
+ private val subjectField =
+ classOf[UserGroupInformation].getDeclaredField("subject")
+ subjectField.setAccessible(true)
+
+ private val getCredentialsInternalMethod =
+ classOf[UserGroupInformation].getDeclaredMethod(
+ "getCredentialsInternal",
+ Array.empty[Class[_]]: _*)
+ getCredentialsInternalMethod.setAccessible(true)
+
+ private val tokenMapField =
+ classOf[Credentials].getDeclaredField("tokenMap")
+ tokenMapField.setAccessible(true)
+
def newHadoopConf(conf: KyuubiConf): Configuration = {
val hadoopConf = new Configuration()
conf.getAll.foreach { case (k, v) => hadoopConf.set(k, v) }
@@ -55,4 +76,37 @@ object KyuubiHadoopUtils {
creds
}
+ /**
+ * Get all tokens in [[UserGroupInformation#subject]] including
+ * [[org.apache.hadoop.security.token.Token.PrivateToken]] as
+ * [[UserGroupInformation#getCredentials]] returned Credentials do not
contain
+ * [[org.apache.hadoop.security.token.Token.PrivateToken]].
+ */
+ def getCredentialsInternal(ugi: UserGroupInformation): Credentials = {
+ // Synchronize to avoid credentials being written while cloning credentials
+ subjectField.get(ugi).asInstanceOf[Subject] synchronized {
+ new
Credentials(getCredentialsInternalMethod.invoke(ugi).asInstanceOf[Credentials])
+ }
+ }
+
+ /**
+ * Get [[Credentials#tokenMap]] by reflection as [[Credentials#getTokenMap]]
is not present before
+ * Hadoop 3.2.1.
+ */
+ def getTokenMap(credentials: Credentials): Map[Text, Token[_ <:
TokenIdentifier]] = {
+ tokenMapField.get(credentials)
+ .asInstanceOf[JMap[Text, Token[_ <: TokenIdentifier]]]
+ .asScala
+ .toMap
+ }
+
+ def getTokenIssueDate(token: Token[_ <: TokenIdentifier]): Long = {
+ // It is safe to deserialize any token identifier to hdfs
`DelegationTokenIdentifier`
+ // as all token identifiers have the same binary format.
+ val tokenIdentifier = new DelegationTokenIdentifier
+ val buf = new ByteArrayInputStream(token.getIdentifier)
+ val in = new DataInputStream(buf)
+ tokenIdentifier.readFields(in)
+ tokenIdentifier.getIssueDate
+ }
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
index 6a386dd..de0b278 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
@@ -203,4 +203,14 @@ class KyuubiSyncThriftClient(protocol: TProtocol) extends
TCLIService.Client(pro
ThriftUtils.verifyTStatus(resp.getStatus)
resp.getResults
}
+
+ def sendCredentials(encodedCredentials: String): Unit = {
+ // We hacked `TCLIService.Iface.RenewDelegationToken` to transfer
Credentials to Spark SQL
+ // engine
+ val req = new TRenewDelegationTokenReq()
+ req.setSessionHandle(_remoteSessionHandle)
+ req.setDelegationToken(encodedCredentials)
+ val resp = RenewDelegationToken(req)
+ ThriftUtils.verifyTStatus(resp.getStatus)
+ }
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
index 5d01790..09ddf01 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
@@ -147,6 +147,10 @@ class HadoopCredentialsManager private (name: String)
extends AbstractService(na
sessionId: String,
appUser: String,
send: String => Unit): Unit = {
+ if (renewalExecutor.isEmpty) {
+ return
+ }
+
val userRef = getOrCreateUserCredentialsRef(appUser)
val sessionEpoch = getSessionCredentialsEpoch(sessionId)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index 61ef636..20bb6db 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -85,7 +85,7 @@ private[kyuubi] class EngineRef(
}
// Launcher of the engine
- private val appUser: String = shareLevel match {
+ private[kyuubi] val appUser: String = shareLevel match {
case SERVER => Utils.currentUser
case _ => user
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
index 779b4ad..1f731bc 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
@@ -19,9 +19,7 @@ package org.apache.kyuubi.operation
import scala.collection.JavaConverters._
-import org.apache.hive.service.rpc.thrift.TFetchOrientation
-import org.apache.hive.service.rpc.thrift.TFetchResultsReq
-import org.apache.hive.service.rpc.thrift.TGetOperationStatusReq
+import org.apache.hive.service.rpc.thrift.{TFetchOrientation,
TFetchResultsReq, TGetOperationStatusReq}
import org.apache.hive.service.rpc.thrift.TOperationState._
import org.apache.kyuubi.KyuubiSQLException
@@ -32,7 +30,7 @@ import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.operation.OperationState.OperationState
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.server.EventLoggingService
-import org.apache.kyuubi.session.Session
+import org.apache.kyuubi.session.{KyuubiSessionImpl, KyuubiSessionManager,
Session}
class ExecuteStatement(
session: Session,
@@ -120,11 +118,21 @@ class ExecuteStatement(
val ke = KyuubiSQLException(s"UNKNOWN STATE for $statement")
setOperationException(ke)
}
+ sendCredentialsIfNeeded()
}
// see if anymore log could be fetched
fetchQueryLog()
} catch onError()
+ private def sendCredentialsIfNeeded(): Unit = {
+ val appUser = session.asInstanceOf[KyuubiSessionImpl].engine.appUser
+ val sessionManager =
session.sessionManager.asInstanceOf[KyuubiSessionManager]
+ sessionManager.credentialsManager.sendCredentialsIfNeeded(
+ session.handle.identifier.toString,
+ appUser,
+ client.sendCredentials)
+ }
+
private def fetchQueryLog(): Unit = {
getOperationLog.foreach { logger =>
try {
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index 9f0067c..cb8bf70 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -53,7 +53,8 @@ class KyuubiSessionImpl(
case (key, value) => sessionConf.set(key, value)
}
- private val engine: EngineRef = new EngineRef(sessionConf, user)
+ val engine: EngineRef = new EngineRef(sessionConf, user)
+
private val sessionEvent = KyuubiSessionEvent(this)
EventLoggingService.onEvent(sessionEvent)