This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new f8056f4 [KYUUBI #1007] Implement delegation token renewal framework
f8056f4 is described below
commit f8056f458bb943f2fbd022039caf1d3d86aa144f
Author: zhouyifan279 <[email protected]>
AuthorDate: Fri Sep 3 18:40:30 2021 +0800
[KYUUBI #1007] Implement delegation token renewal framework
<!--
Thanks for sending a pull request!
Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://kyuubi.readthedocs.io/en/latest/community/contributions.html
2. If the PR is related to an issue in
https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your
PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
3. If the PR is unfinished, add '[WIP]' in your PR title, e.g.,
'[WIP][KYUUBI #XXXX] Your PR title ...'.
-->
### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
1. If you add a feature, you can talk about the use case of it.
2. If you fix a bug, you can clarify why it is a bug.
-->
This PR finishes sub-task #1007 under umbrella issue #913.
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run
test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #1015 from zhouyifan279/KYUUBI#1007.
Closes #1007
94263804 [zhouyifan279] [KYUUBI #1007] Implement delegation token renewal
framework
6cea96d0 [zhouyifan279] [KYUUBI #1007] Implement delegation token renewal
framework
dbffcf15 [zhouyifan279] [KYUUBI #1007] Implement delegation token renewal
framework
57100271 [zhouyifan279] [KYUUBI #1007] Implement delegation token renewal
framework
Authored-by: zhouyifan279 <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
---
docs/deployment/settings.md | 8 +
.../org/apache/kyuubi/config/KyuubiConf.scala | 14 ++
.../org/apache/kyuubi/util/KyuubiHadoopUtils.scala | 24 +-
.../kyuubi/util/KyuubiHadoopUtilsSuite.scala | 24 ++
.../apache/kyuubi/credentials/CredentialsRef.scala | 35 ++-
.../credentials/HadoopCredentialsManager.scala | 261 +++++++++++++++++++++
.../credentials/HadoopDelegationProvider.scala | 53 +++++
.../apache/kyuubi/session/KyuubiSessionImpl.scala | 1 +
.../kyuubi/session/KyuubiSessionManager.scala | 3 +
...yuubi.credentials.HadoopDelegationTokenProvider | 20 ++
.../HadoopCredentialsManagerSuite.scala | 222 ++++++++++++++++++
11 files changed, 653 insertions(+), 12 deletions(-)
diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index c5b488f..edc121b 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -147,6 +147,14 @@ kyuubi\.backend\.server<br>\.exec\.pool\.size|<div
style='width: 65pt;word-wrap:
kyuubi\.backend\.server<br>\.exec\.pool\.wait\.queue<br>\.size|<div
style='width: 65pt;word-wrap: break-word;white-space: normal'>100</div>|<div
style='width: 170pt;word-wrap: break-word;white-space: normal'>Size of the wait
queue for the operation execution thread pool of Kyuubi server</div>|<div
style='width: 30pt'>int</div>|<div style='width: 20pt'>1.0.0</div>
+### Credentials
+
+Key | Default | Meaning | Type | Since
+--- | --- | --- | --- | ---
+kyuubi\.credentials<br>\.renewal\.interval|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'>PT1H</div>|<div style='width: 170pt;word-wrap:
break-word;white-space: normal'>How often Kyuubi renews one user's
DelegationTokens</div>|<div style='width: 30pt'>duration</div>|<div
style='width: 20pt'>1.4.0</div>
+kyuubi\.credentials<br>\.renewal\.retryWait|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'>PT1M</div>|<div style='width: 170pt;word-wrap:
break-word;white-space: normal'>How long to wait before retrying to fetch new
credentials after a failure.</div>|<div style='width: 30pt'>duration</div>|<div
style='width: 20pt'>1.4.0</div>
+
+
### Delegation
Key | Default | Meaning | Type | Since
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 28611cd..ad7b17e 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -232,6 +232,20 @@ object KyuubiConf {
.timeConf
.createWithDefault(Duration.ofHours(3).toMillis)
+ val CREDENTIALS_RENEWAL_INTERVAL: ConfigEntry[Long] =
+ buildConf("credentials.renewal.interval")
+ .doc("How often Kyuubi renews one user's DelegationTokens")
+ .version("1.4.0")
+ .timeConf
+ .createWithDefault(Duration.ofHours(1).toMillis)
+
+ val CREDENTIALS_RENEWAL_RETRY_WAIT: ConfigEntry[Long] =
+ buildConf("credentials.renewal.retryWait")
+ .doc("How long to wait before retrying to fetch new credentials after a
failure.")
+ .version("1.4.0")
+ .timeConf
+ .checkValue(t => t > 0, "must be positive integer")
+ .createWithDefault(Duration.ofMinutes(1).toMillis)
/////////////////////////////////////////////////////////////////////////////////////////////////
// Frontend Service Configuration
//
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
index e6a009b..b8fb967 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
@@ -17,8 +17,11 @@
package org.apache.kyuubi.util
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream,
DataOutputStream}
+
+import org.apache.commons.codec.binary.Base64
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.security.SecurityUtil
+import org.apache.hadoop.security.{Credentials, SecurityUtil}
import org.apache.kyuubi.config.KyuubiConf
@@ -33,4 +36,23 @@ object KyuubiHadoopUtils {
def getServerPrincipal(principal: String): String = {
SecurityUtil.getServerPrincipal(principal, "0.0.0.0")
}
+
+ def encodeCredentials(creds: Credentials): String = {
+ val byteStream = new ByteArrayOutputStream
+ creds.writeTokenStorageToStream(new DataOutputStream(byteStream))
+
+ val encoder = new Base64(0, null, false)
+ encoder.encodeToString(byteStream.toByteArray)
+ }
+
+ def decodeCredentials(newValue: String): Credentials = {
+ val decoder = new Base64(0, null, false)
+ val decoded = decoder.decode(newValue)
+
+ val byteStream = new ByteArrayInputStream(decoded)
+ val creds = new Credentials()
+ creds.readTokenStorageStream(new DataInputStream(byteStream))
+ creds
+ }
+
}
diff --git
a/kyuubi-common/src/test/scala/org/apache/kyuubi/util/KyuubiHadoopUtilsSuite.scala
b/kyuubi-common/src/test/scala/org/apache/kyuubi/util/KyuubiHadoopUtilsSuite.scala
index 03fc055..9a6d767 100644
---
a/kyuubi-common/src/test/scala/org/apache/kyuubi/util/KyuubiHadoopUtilsSuite.scala
+++
b/kyuubi-common/src/test/scala/org/apache/kyuubi/util/KyuubiHadoopUtilsSuite.scala
@@ -17,8 +17,15 @@
package org.apache.kyuubi.util
+import scala.util.Random
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.Token
+
import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.service.authentication.KyuubiDelegationTokenIdentifier
class KyuubiHadoopUtilsSuite extends KyuubiFunSuite {
@@ -35,4 +42,21 @@ class KyuubiHadoopUtilsSuite extends KyuubiFunSuite {
assert(hadoopConf.get(xyz) === "abc")
assert(hadoopConf.get(test) === "t")
}
+
+ test("encode/decode credentials") {
+ val identifier = new KyuubiDelegationTokenIdentifier()
+ val password = new Array[Byte](128)
+ Random.nextBytes(password)
+ val token = new Token[KyuubiDelegationTokenIdentifier](
+ identifier.getBytes,
+ password,
+ identifier.getKind,
+ new Text(""))
+ val credentials = new Credentials()
+ credentials.addToken(token.getKind, token)
+
+ val decoded = KyuubiHadoopUtils.decodeCredentials(
+ KyuubiHadoopUtils.encodeCredentials(credentials))
+ assert(decoded.getToken(token.getKind) ==
credentials.getToken(token.getKind))
+ }
}
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/CredentialsRef.scala
similarity index 55%
copy from
kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
copy to
kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/CredentialsRef.scala
index e6a009b..9ccf445 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/CredentialsRef.scala
@@ -15,22 +15,35 @@
* limitations under the License.
*/
-package org.apache.kyuubi.util
+package org.apache.kyuubi.credentials
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.security.SecurityUtil
+import org.apache.hadoop.security.Credentials
-import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.credentials.CredentialsRef.UNSET_EPOCH
+import org.apache.kyuubi.util.KyuubiHadoopUtils
-object KyuubiHadoopUtils {
+class CredentialsRef(appUser: String) {
- def newHadoopConf(conf: KyuubiConf): Configuration = {
- val hadoopConf = new Configuration()
- conf.getAll.foreach { case (k, v) => hadoopConf.set(k, v) }
- hadoopConf
+ @volatile
+ private var epoch = UNSET_EPOCH
+
+ private var encodedCredentials: String = _
+
+ def getEpoch: Long = epoch
+
+ def getAppUser: String = appUser
+
+ def getEncodedCredentials: String = {
+ encodedCredentials
}
- def getServerPrincipal(principal: String): String = {
- SecurityUtil.getServerPrincipal(principal, "0.0.0.0")
+ def updateCredentials(creds: Credentials): Unit = {
+ encodedCredentials = KyuubiHadoopUtils.encodeCredentials(creds)
+ epoch += 1
}
+
+}
+
+object CredentialsRef {
+ val UNSET_EPOCH: Long = -1L
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
new file mode 100644
index 0000000..253778d
--- /dev/null
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.credentials
+
+import java.util.ServiceLoader
+import java.util.concurrent._
+
+import scala.collection.mutable
+import scala.util.{Failure, Success, Try}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+
+import org.apache.kyuubi.Logging
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.service.AbstractService
+import org.apache.kyuubi.util.{KyuubiHadoopUtils, ThreadUtils}
+
+/**
+ * [[HadoopCredentialsManager]] manages and renews delegation tokens, which
are used by SQL engines
+ * to access kerberos secured services.
+ *
+ * Delegation tokens are sent to SQL engines by calling
[[sendCredentialsIfNeeded]].
+ * [[sendCredentialsIfNeeded]] executes the following steps:
+ * <ol>
+ * <li>
+ * Get or create a cached [[CredentialsRef]](contains delegation tokens)
object by key
+ * appUser. If [[CredentialsRef]] is newly created, spawn a scheduled task
to renew the
+ * delegation tokens.
+ * </li>
+ * <li>
+ * Get or create a cached session credentials epoch object by key sessionId.
+ * </li>
+ * <li>
+ * Compare [[CredentialsRef]] epoch with session credentials epoch. (Both
epochs are set
+ * to -1 when created. [[CredentialsRef]] epoch is increased when delegation
tokens are
+ * renewed.)
+ * </li>
+ * <li>
+ * If epochs are equal, return. Else, send delegation tokens to the SQL
engine.
+ * </li>
+ * <li>
+ * If sending succeeds, set session credentials epoch to [[CredentialsRef]]
epoch. Else,
+ * record the exception and return.
+ * </li>
+ * </ol>
+ *
+ * @note
+ * <ol>
+ * <li>
+ * Session credentials epochs are created in session scope and should be
removed using
+ * [[removeSessionCredentialsEpoch]] when session closes.
+ * </li>
+ * <li>
+ * [[HadoopCredentialsManager]] does not renew or send credentials if no
provider is left after
+ * initialization.
+ * </li>
+ * </ol>
+ */
+class HadoopCredentialsManager private (name: String) extends
AbstractService(name)
+ with Logging {
+
+ def this() = this(classOf[HadoopCredentialsManager].getSimpleName)
+
+ private val userCredentialsRefMap = new ConcurrentHashMap[String,
CredentialsRef]()
+ private val sessionCredentialsEpochMap = new ConcurrentHashMap[String,
Long]()
+
+ private var providers: Map[String, HadoopDelegationTokenProvider] = _
+ private var renewalInterval: Long = _
+ private var renewalRetryWait: Long = _
+ private var hadoopConf: Configuration = _
+
+ private[credentials] var renewalExecutor: Option[ScheduledExecutorService] =
None
+
+ override def initialize(conf: KyuubiConf): Unit = {
+ hadoopConf = KyuubiHadoopUtils.newHadoopConf(conf)
+ providers = HadoopCredentialsManager.loadProviders(conf)
+ .filter { case (_, provider) =>
+ val required = provider.delegationTokensRequired(hadoopConf, conf)
+ if (!required) {
+ warn(s"Service ${provider.serviceName} does not require a token." +
+ s" Check your configuration to see if security is disabled or
not.")
+ }
+ required
+ }
+
+ if (providers.isEmpty) {
+ warn("No delegation token is required by services.")
+ } else {
+ info("Using the following builtin delegation token providers: " +
+ s"${providers.keys.mkString(", ")}.")
+ }
+
+ renewalInterval = conf.get(CREDENTIALS_RENEWAL_INTERVAL)
+ renewalRetryWait = conf.get(CREDENTIALS_RENEWAL_RETRY_WAIT)
+ super.initialize(conf)
+ }
+
+ override def start(): Unit = {
+ if (providers.nonEmpty) {
+ renewalExecutor =
+ Some(ThreadUtils.newDaemonSingleThreadScheduledExecutor("Delegation
Token Renewal Thread"))
+ }
+ super.start()
+ }
+
+ override def stop(): Unit = {
+ renewalExecutor.foreach { executor =>
+ executor.shutdownNow()
+ try {
+ executor.awaitTermination(10, TimeUnit.SECONDS)
+ } catch {
+ case _: InterruptedException =>
+ }
+ }
+ super.stop()
+ }
+
+ /**
+ * Send credentials to SQL engine which the specified session is talking to
if
+ * [[HadoopCredentialsManager]] has newer credentials.
+ *
+ * @param sessionId Specify the session which is talking with SQL engine
+ * @param appUser User identity that the SQL engine uses.
+ * @param send Function to send encoded credentials to SQL engine
+ */
+ def sendCredentialsIfNeeded(
+ sessionId: String,
+ appUser: String,
+ send: String => Unit): Unit = {
+ val userRef = getOrCreateUserCredentialsRef(appUser)
+ val sessionEpoch = getSessionCredentialsEpoch(sessionId)
+
+ if (userRef.getEpoch > sessionEpoch) {
+ val currentEpoch = userRef.getEpoch
+ val currentCreds = userRef.getEncodedCredentials
+ info(s"Send new credentials with epoch $currentEpoch to SQL engine
through session " +
+ s"$sessionId")
+ Try(send(currentCreds)) match {
+ case Success(_) =>
+ info(s"Update session credentials epoch from $sessionEpoch to
$currentEpoch")
+ sessionCredentialsEpochMap.put(sessionId, currentEpoch)
+ case Failure(exception) =>
+ warn(
+ s"Failed to send new credentials to SQL engine through session
$sessionId",
+ exception)
+ }
+ }
+ }
+
+ /**
+ * Remove session credentials epoch corresponding to `sessionId`.
+ *
+ * @param sessionId KyuubiSession id
+ */
+ def removeSessionCredentialsEpoch(sessionId: String): Unit = {
+ sessionCredentialsEpochMap.remove(sessionId)
+ }
+
+ // Visible for testing.
+ private[credentials] def getOrCreateUserCredentialsRef(appUser: String):
CredentialsRef =
+ userCredentialsRefMap.computeIfAbsent(
+ appUser,
+ appUser => {
+ val ref = new CredentialsRef(appUser)
+ scheduleRenewal(ref, 0)
+ info(s"Created CredentialsRef for user $appUser and scheduled a
renewal task")
+ ref
+ })
+
+ // Visible for testing.
+ private[credentials] def getSessionCredentialsEpoch(sessionId: String): Long
= {
+ sessionCredentialsEpochMap.getOrDefault(sessionId,
CredentialsRef.UNSET_EPOCH)
+ }
+
+ // Visible for testing.
+ private[credentials] def containsProvider(serviceName: String): Boolean = {
+ providers.contains(serviceName)
+ }
+
+ private def scheduleRenewal(userRef: CredentialsRef, delay: Long): Unit = {
+ val renewalTask = new Runnable {
+ override def run(): Unit = {
+ try {
+ val creds = new Credentials()
+ providers.values
+ .foreach(_.obtainDelegationTokens(hadoopConf, conf,
userRef.getAppUser, creds))
+ userRef.updateCredentials(creds)
+ scheduleRenewal(userRef, renewalInterval)
+ } catch {
+ case _: InterruptedException =>
+ // Server is shutting down
+ case e: Exception =>
+ warn(
+ s"Failed to update tokens for ${userRef.getAppUser}, try again
in" +
+ s" $renewalRetryWait ms",
+ e)
+ scheduleRenewal(userRef, renewalRetryWait)
+ }
+ }
+ }
+
+ renewalExecutor.foreach { executor =>
+ info(s"Scheduling renewal in $delay ms.")
+ executor.schedule(renewalTask, delay, TimeUnit.MILLISECONDS)
+ }
+ }
+
+}
+
+object HadoopCredentialsManager extends Logging {
+
+ private val providerEnabledConfig = "kyuubi.credentials.%s.enabled"
+
+ def loadProviders(kyuubiConf: KyuubiConf): Map[String,
HadoopDelegationTokenProvider] = {
+ val loader =
+ ServiceLoader.load(classOf[HadoopDelegationTokenProvider],
getClass.getClassLoader)
+ val providers = mutable.ArrayBuffer[HadoopDelegationTokenProvider]()
+
+ val iterator = loader.iterator
+ while (iterator.hasNext) {
+ try {
+ providers += iterator.next
+ } catch {
+ case t: Throwable =>
+ warn(s"Failed to load built in provider.", t)
+ }
+ }
+
+ // Filter out providers for which kyuubi.credentials.{service}.enabled is
false.
+ providers
+ .filter { p => HadoopCredentialsManager.isServiceEnabled(kyuubiConf,
p.serviceName) }
+ .map { p => (p.serviceName, p) }
+ .toMap
+ }
+
+ def isServiceEnabled(kyuubiConf: KyuubiConf, serviceName: String): Boolean =
{
+ val key = providerEnabledConfig.format(serviceName)
+ kyuubiConf
+ .getOption(key)
+ .map(_.toBoolean)
+ .getOrElse(true)
+ }
+
+}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopDelegationProvider.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopDelegationProvider.scala
new file mode 100644
index 0000000..cba5761
--- /dev/null
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopDelegationProvider.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.credentials
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+
+import org.apache.kyuubi.Logging
+import org.apache.kyuubi.config.KyuubiConf
+
+trait HadoopDelegationTokenProvider extends Logging {
+
+ /**
+ * Name of the service to provide delegation tokens. This name should be
unique. Kyuubi will
+ * internally use this name to differentiate delegation token providers.
+ */
+ def serviceName: String
+
+ /**
+ * Returns true if delegation tokens are required for this service. This
method is abstract, so
+ * each provider decides, e.g. based on whether Hadoop security is enabled.
+ */
+ def delegationTokensRequired(hadoopConf: Configuration, kyuubiConf:
KyuubiConf): Boolean
+
+ /**
+ * Obtain delegation tokens for this service.
+ *
+ * @param hadoopConf Configuration of current Hadoop Compatible system.
+ * @param owner DelegationToken owner.
+ * @param creds Credentials to add tokens and security keys to.
+ */
+ def obtainDelegationTokens(
+ hadoopConf: Configuration,
+ kyuubiConf: KyuubiConf,
+ owner: String,
+ creds: Credentials): Unit
+
+}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index 8d416e6..973e4c2 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -94,6 +94,7 @@ class KyuubiSessionImpl(
override def close(): Unit = {
super.close()
sessionManager.operationManager.removeConnection(handle)
+
sessionManager.credentialsManager.removeSessionCredentialsEpoch(handle.identifier.toString)
try {
if (client != null) client.closeSession()
} catch {
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index bd2aeab..760c57e 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -22,6 +22,7 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.credentials.HadoopCredentialsManager
import org.apache.kyuubi.metrics.MetricsConstants._
import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.operation.KyuubiOperationManager
@@ -31,8 +32,10 @@ class KyuubiSessionManager private (name: String) extends
SessionManager(name) {
def this() = this(classOf[KyuubiSessionManager].getSimpleName)
val operationManager = new KyuubiOperationManager()
+ val credentialsManager = new HadoopCredentialsManager()
override def initialize(conf: KyuubiConf): Unit = {
+ addService(credentialsManager)
super.initialize(conf)
}
diff --git
a/kyuubi-server/src/test/resources/META-INF/services/org.apache.kyuubi.credentials.HadoopDelegationTokenProvider
b/kyuubi-server/src/test/resources/META-INF/services/org.apache.kyuubi.credentials.HadoopDelegationTokenProvider
new file mode 100644
index 0000000..4064562
--- /dev/null
+++
b/kyuubi-server/src/test/resources/META-INF/services/org.apache.kyuubi.credentials.HadoopDelegationTokenProvider
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.kyuubi.credentials.ExceptionThrowingDelegationTokenProvider
+org.apache.kyuubi.credentials.UnRequiredDelegationTokenProvider
+org.apache.kyuubi.credentials.UnstableDelegationTokenProvider
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopCredentialsManagerSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopCredentialsManagerSuite.scala
new file mode 100644
index 0000000..f1a30e0
--- /dev/null
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopCredentialsManagerSuite.scala
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.credentials
+
+import java.io.IOException
+import java.util.UUID
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
+
+import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.config.KyuubiConf
+
+class HadoopCredentialsManagerSuite extends KyuubiFunSuite {
+
+ private val sessionId = UUID.randomUUID().toString
+ private val appUser = "who"
+ private val send = (_: String) => {}
+
+ private def withStartedManager(kyuubiConf: KyuubiConf)(f:
HadoopCredentialsManager => Unit)
+ : Unit = {
+ val manager = new HadoopCredentialsManager()
+ manager.initialize(kyuubiConf)
+ manager.start()
+
+ try f(manager)
+ finally manager.stop()
+ }
+
+ test("load default providers") {
+ ExceptionThrowingDelegationTokenProvider.constructed = false
+ val providers = HadoopCredentialsManager.loadProviders(new
KyuubiConf(false))
+ assert(providers.contains("unstable"))
+ assert(providers.contains("unrequired"))
+ // This checks that providers are loaded independently and they have no
effect on each other
+ assert(ExceptionThrowingDelegationTokenProvider.constructed)
+ assert(!providers.contains("throw"))
+ }
+
+ test("disable a provider") {
+ val kyuubiConf =
+ new KyuubiConf(false)
+ .set("kyuubi.credentials.unstable.enabled", "false")
+ val providers = HadoopCredentialsManager.loadProviders(kyuubiConf)
+ assert(!providers.contains("unstable"))
+ }
+
+ test("filter providers when initialize") {
+ // Filter out providers if `delegationTokensRequired` returns false.
+ val manager = new HadoopCredentialsManager()
+ manager.initialize(new KyuubiConf(false))
+ assert(!manager.containsProvider("unrequired"))
+ }
+
+ test("no provider left after initialize") {
+ val kyuubiConf =
+ new KyuubiConf(false)
+ .set("kyuubi.credentials.unstable.enabled", "false")
+ withStartedManager(kyuubiConf) { manager =>
+ // All providers are filtered out, either because they are disabled or
+ // because they do not require a token
+ assert(manager.renewalExecutor.isEmpty)
+ }
+ }
+
+ test("schedule credentials renewal") {
+ val kyuubiConf = new KyuubiConf(false)
+ .set(KyuubiConf.CREDENTIALS_RENEWAL_INTERVAL, 1000L)
+ withStartedManager(kyuubiConf) { manager =>
+ val userRef = manager.getOrCreateUserCredentialsRef(appUser)
+ // Tolerate 100 ms delay
+ eventually(timeout(1100.milliseconds), interval(100.milliseconds)) {
+ assert(userRef.getEpoch == 1)
+ }
+ }
+ }
+
+ test("schedule credentials renewal retry when failed") {
+ val kyuubiConf = new KyuubiConf(false)
+ .set(KyuubiConf.CREDENTIALS_RENEWAL_INTERVAL, 1000L)
+ .set(KyuubiConf.CREDENTIALS_RENEWAL_RETRY_WAIT, 1000L)
+ withStartedManager(kyuubiConf) { manager =>
+ try {
+ UnstableDelegationTokenProvider.throwException = true
+
+ val userRef = manager.getOrCreateUserCredentialsRef(appUser)
+ // Tolerate 100 ms delay
+ eventually(timeout(2100.milliseconds), interval(100.milliseconds)) {
+ // 1 scheduled call and 2 scheduled retry calls
+ assert(UnstableDelegationTokenProvider.exceptionCount == 3)
+ }
+ assert(userRef.getEpoch == CredentialsRef.UNSET_EPOCH)
+ } finally {
+ UnstableDelegationTokenProvider.throwException = false
+ }
+ }
+ }
+
+ test("send credentials if needed") {
+ val kyuubiConf = new KyuubiConf(false)
+ .set(KyuubiConf.CREDENTIALS_RENEWAL_INTERVAL, 1000L)
+ withStartedManager(kyuubiConf) { manager =>
+ // Trigger the user's CredentialsRef initialization
+ val userRef = manager.getOrCreateUserCredentialsRef(appUser)
+ eventually(interval(100.milliseconds)) {
+ assert(userRef.getEpoch == 0)
+ }
+
+ manager.sendCredentialsIfNeeded(sessionId, appUser, send)
+
+ val sessionEpoch = manager.getSessionCredentialsEpoch(sessionId)
+ assert(sessionEpoch == userRef.getEpoch)
+ }
+ }
+
+ test("credentials sending failure") {
+ withStartedManager(new KyuubiConf(false)) { manager =>
+ // Trigger the user's CredentialsRef initialization
+ val userRef = manager.getOrCreateUserCredentialsRef(appUser)
+ eventually(interval(100.milliseconds)) {
+ assert(userRef.getEpoch == 0)
+ }
+
+ var called = false
+ manager.sendCredentialsIfNeeded(
+ sessionId,
+ appUser,
+ _ => {
+ called = true
+ throw new IOException
+ })
+
+ assert(called)
+ assert(manager.getSessionCredentialsEpoch(sessionId) ==
CredentialsRef.UNSET_EPOCH)
+ }
+ }
+}
+
+private class ExceptionThrowingDelegationTokenProvider extends
HadoopDelegationTokenProvider {
+ ExceptionThrowingDelegationTokenProvider.constructed = true
+ throw new IllegalArgumentException
+
+ override def serviceName: String = "throw"
+
+ override def delegationTokensRequired(
+ hadoopConf: Configuration,
+ kyuubiConf: KyuubiConf): Boolean = true
+
+ override def obtainDelegationTokens(
+ hadoopConf: Configuration,
+ kyuubiConf: KyuubiConf,
+ owner: String,
+ creds: Credentials): Unit = {}
+
+}
+
+private object ExceptionThrowingDelegationTokenProvider {
+ var constructed = false
+}
+
+private class UnRequiredDelegationTokenProvider extends
HadoopDelegationTokenProvider {
+
+ override def serviceName: String = "unrequired"
+
+ override def delegationTokensRequired(
+ hadoopConf: Configuration,
+ kyuubiConf: KyuubiConf): Boolean = false
+
+ override def obtainDelegationTokens(
+ hadoopConf: Configuration,
+ kyuubiConf: KyuubiConf,
+ owner: String,
+ creds: Credentials): Unit = {}
+
+}
+
+private class UnstableDelegationTokenProvider extends
HadoopDelegationTokenProvider {
+
+ override def serviceName: String = "unstable"
+
+ override def delegationTokensRequired(
+ hadoopConf: Configuration,
+ kyuubiConf: KyuubiConf): Boolean = true
+
+ override def obtainDelegationTokens(
+ hadoopConf: Configuration,
+ kyuubiConf: KyuubiConf,
+ owner: String,
+ creds: Credentials): Unit = {
+ if (UnstableDelegationTokenProvider.throwException) {
+ UnstableDelegationTokenProvider.exceptionCount += 1
+ throw new IllegalArgumentException
+ }
+ }
+
+}
+
+private object UnstableDelegationTokenProvider {
+
+ @volatile
+ var throwException: Boolean = false
+
+ @volatile
+ var exceptionCount = 0
+
+}