This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new f8056f4  [KYUUBI #1007] Implement delegation token renewal framework
f8056f4 is described below

commit f8056f458bb943f2fbd022039caf1d3d86aa144f
Author: zhouyifan279 <[email protected]>
AuthorDate: Fri Sep 3 18:40:30 2021 +0800

    [KYUUBI #1007] Implement delegation token renewal framework
    
    <!--
    Thanks for sending a pull request!
    
    Here are some tips for you:
      1. If this is your first time, please read our contributor guidelines: 
https://kyuubi.readthedocs.io/en/latest/community/contributions.html
      2. If the PR is related to an issue in 
https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your 
PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
      3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., 
'[WIP][KYUUBI #XXXX] Your PR title ...'.
    -->
    
    ### _Why are the changes needed?_
    <!--
    Please clarify why the changes are needed. For instance,
      1. If you add a feature, you can talk about the use case of it.
      2. If you fix a bug, you can clarify why it is a bug.
    -->
    This PR finishes sub-task #1007 under umbrella issue #913.
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run 
test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests)
 locally before make a pull request
    
    Closes #1015 from zhouyifan279/KYUUBI#1007.
    
    Closes #1007
    
    94263804 [zhouyifan279] [KYUUBI #1007] Implement delegation token renewal 
framework
    6cea96d0 [zhouyifan279] [KYUUBI #1007] Implement delegation token renewal 
framework
    dbffcf15 [zhouyifan279] [KYUUBI #1007] Implement delegation token renewal 
framework
    57100271 [zhouyifan279] [KYUUBI #1007] Implement delegation token renewal 
framework
    
    Authored-by: zhouyifan279 <[email protected]>
    Signed-off-by: Kent Yao <[email protected]>
---
 docs/deployment/settings.md                        |   8 +
 .../org/apache/kyuubi/config/KyuubiConf.scala      |  14 ++
 .../org/apache/kyuubi/util/KyuubiHadoopUtils.scala |  24 +-
 .../kyuubi/util/KyuubiHadoopUtilsSuite.scala       |  24 ++
 .../apache/kyuubi/credentials/CredentialsRef.scala |  35 ++-
 .../credentials/HadoopCredentialsManager.scala     | 261 +++++++++++++++++++++
 .../credentials/HadoopDelegationProvider.scala     |  53 +++++
 .../apache/kyuubi/session/KyuubiSessionImpl.scala  |   1 +
 .../kyuubi/session/KyuubiSessionManager.scala      |   3 +
 ...yuubi.credentials.HadoopDelegationTokenProvider |  20 ++
 .../HadoopCredentialsManagerSuite.scala            | 222 ++++++++++++++++++
 11 files changed, 653 insertions(+), 12 deletions(-)

diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index c5b488f..edc121b 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -147,6 +147,14 @@ kyuubi\.backend\.server<br>\.exec\.pool\.size|<div 
style='width: 65pt;word-wrap:
 kyuubi\.backend\.server<br>\.exec\.pool\.wait\.queue<br>\.size|<div 
style='width: 65pt;word-wrap: break-word;white-space: normal'>100</div>|<div 
style='width: 170pt;word-wrap: break-word;white-space: normal'>Size of the wait 
queue for the operation execution thread pool of Kyuubi server</div>|<div 
style='width: 30pt'>int</div>|<div style='width: 20pt'>1.0.0</div>
 
 
+### Credentials
+
+Key | Default | Meaning | Type | Since
+--- | --- | --- | --- | ---
+kyuubi\.credentials<br>\.renewal\.interval|<div style='width: 65pt;word-wrap: 
break-word;white-space: normal'>PT1H</div>|<div style='width: 170pt;word-wrap: 
break-word;white-space: normal'>How often Kyuubi renews one user's 
DelegationTokens</div>|<div style='width: 30pt'>duration</div>|<div 
style='width: 20pt'>1.4.0</div>
+kyuubi\.credentials<br>\.renewal\.retryWait|<div style='width: 65pt;word-wrap: 
break-word;white-space: normal'>PT1M</div>|<div style='width: 170pt;word-wrap: 
break-word;white-space: normal'>How long to wait before retrying to fetch new 
credentials after a failure.</div>|<div style='width: 30pt'>duration</div>|<div 
style='width: 20pt'>1.4.0</div>
+
+
 ### Delegation
 
 Key | Default | Meaning | Type | Since
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 28611cd..ad7b17e 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -232,6 +232,20 @@ object KyuubiConf {
     .timeConf
     .createWithDefault(Duration.ofHours(3).toMillis)
 
+  val CREDENTIALS_RENEWAL_INTERVAL: ConfigEntry[Long] =
+    buildConf("credentials.renewal.interval")
+      .doc("How often Kyuubi renews one user's DelegationTokens")
+      .version("1.4.0")
+      .timeConf
+      .createWithDefault(Duration.ofHours(1).toMillis)
+
+  val CREDENTIALS_RENEWAL_RETRY_WAIT: ConfigEntry[Long] =
+    buildConf("credentials.renewal.retryWait")
+      .doc("How long to wait before retrying to fetch new credentials after a 
failure.")
+      .version("1.4.0")
+      .timeConf
+      .checkValue(t => t > 0, "must be positive integer")
+      .createWithDefault(Duration.ofMinutes(1).toMillis)
 
   
/////////////////////////////////////////////////////////////////////////////////////////////////
   //                              Frontend Service Configuration               
                  //
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
index e6a009b..b8fb967 100644
--- 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
+++ 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
@@ -17,8 +17,11 @@
 
 package org.apache.kyuubi.util
 
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, 
DataOutputStream}
+
+import org.apache.commons.codec.binary.Base64
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.security.SecurityUtil
+import org.apache.hadoop.security.{Credentials, SecurityUtil}
 
 import org.apache.kyuubi.config.KyuubiConf
 
@@ -33,4 +36,23 @@ object KyuubiHadoopUtils {
   def getServerPrincipal(principal: String): String = {
     SecurityUtil.getServerPrincipal(principal, "0.0.0.0")
   }
+
+  def encodeCredentials(creds: Credentials): String = {
+    val byteStream = new ByteArrayOutputStream
+    creds.writeTokenStorageToStream(new DataOutputStream(byteStream))
+
+    val encoder = new Base64(0, null, false)
+    encoder.encodeToString(byteStream.toByteArray)
+  }
+
+  def decodeCredentials(newValue: String): Credentials = {
+    val decoder = new Base64(0, null, false)
+    val decoded = decoder.decode(newValue)
+
+    val byteStream = new ByteArrayInputStream(decoded)
+    val creds = new Credentials()
+    creds.readTokenStorageStream(new DataInputStream(byteStream))
+    creds
+  }
+
 }
diff --git 
a/kyuubi-common/src/test/scala/org/apache/kyuubi/util/KyuubiHadoopUtilsSuite.scala
 
b/kyuubi-common/src/test/scala/org/apache/kyuubi/util/KyuubiHadoopUtilsSuite.scala
index 03fc055..9a6d767 100644
--- 
a/kyuubi-common/src/test/scala/org/apache/kyuubi/util/KyuubiHadoopUtilsSuite.scala
+++ 
b/kyuubi-common/src/test/scala/org/apache/kyuubi/util/KyuubiHadoopUtilsSuite.scala
@@ -17,8 +17,15 @@
 
 package org.apache.kyuubi.util
 
+import scala.util.Random
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.Token
+
 import org.apache.kyuubi.KyuubiFunSuite
 import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.service.authentication.KyuubiDelegationTokenIdentifier
 
 class KyuubiHadoopUtilsSuite extends KyuubiFunSuite {
 
@@ -35,4 +42,21 @@ class KyuubiHadoopUtilsSuite extends KyuubiFunSuite {
     assert(hadoopConf.get(xyz) === "abc")
     assert(hadoopConf.get(test) === "t")
   }
+
+  test("encode/decode credentials") {
+    val identifier = new KyuubiDelegationTokenIdentifier()
+    val password = new Array[Byte](128)
+    Random.nextBytes(password)
+    val token = new Token[KyuubiDelegationTokenIdentifier](
+      identifier.getBytes,
+      password,
+      identifier.getKind,
+      new Text(""))
+    val credentials = new Credentials()
+    credentials.addToken(token.getKind, token)
+
+    val decoded = KyuubiHadoopUtils.decodeCredentials(
+      KyuubiHadoopUtils.encodeCredentials(credentials))
+    assert(decoded.getToken(token.getKind) == 
credentials.getToken(token.getKind))
+  }
 }
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/CredentialsRef.scala
similarity index 55%
copy from 
kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
copy to 
kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/CredentialsRef.scala
index e6a009b..9ccf445 100644
--- 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/CredentialsRef.scala
@@ -15,22 +15,35 @@
  * limitations under the License.
  */
 
-package org.apache.kyuubi.util
+package org.apache.kyuubi.credentials
 
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.security.SecurityUtil
+import org.apache.hadoop.security.Credentials
 
-import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.credentials.CredentialsRef.UNSET_EPOCH
+import org.apache.kyuubi.util.KyuubiHadoopUtils
 
-object KyuubiHadoopUtils {
+class CredentialsRef(appUser: String) {
 
-  def newHadoopConf(conf: KyuubiConf): Configuration = {
-    val hadoopConf = new Configuration()
-    conf.getAll.foreach { case (k, v) => hadoopConf.set(k, v) }
-    hadoopConf
+  @volatile
+  private var epoch = UNSET_EPOCH
+
+  private var encodedCredentials: String = _
+
+  def getEpoch: Long = epoch
+
+  def getAppUser: String = appUser
+
+  def getEncodedCredentials: String = {
+    encodedCredentials
   }
 
-  def getServerPrincipal(principal: String): String = {
-    SecurityUtil.getServerPrincipal(principal, "0.0.0.0")
+  def updateCredentials(creds: Credentials): Unit = {
+    encodedCredentials = KyuubiHadoopUtils.encodeCredentials(creds)
+    epoch += 1
   }
+
+}
+
+object CredentialsRef {
+  val UNSET_EPOCH: Long = -1L
 }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
new file mode 100644
index 0000000..253778d
--- /dev/null
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.credentials
+
+import java.util.ServiceLoader
+import java.util.concurrent._
+
+import scala.collection.mutable
+import scala.util.{Failure, Success, Try}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+
+import org.apache.kyuubi.Logging
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.service.AbstractService
+import org.apache.kyuubi.util.{KyuubiHadoopUtils, ThreadUtils}
+
+/**
+ * [[HadoopCredentialsManager]] manages and renews delegation tokens, which 
are used by SQL engines
+ * to access kerberos secured services.
+ *
+ * Delegation tokens are sent to SQL engines by calling 
[[sendCredentialsIfNeeded]].
+ * [[sendCredentialsIfNeeded]] executes the following steps:
+ * <ol>
+ * <li>
+ *   Get or create a cached [[CredentialsRef]](contains delegation tokens) 
object by key
+ *   appUser. If [[CredentialsRef]] is newly created, spawn a scheduled task 
to renew the
+ *   delegation tokens.
+ * </li>
+ * <li>
+ *   Get or create a cached session credentials epoch object by key sessionId.
+ * </li>
+ * <li>
+ *   Compare [[CredentialsRef]] epoch with session credentials epoch. (Both 
epochs are set
+ *   to -1 when created. [[CredentialsRef]] epoch is increased when delegation 
tokens are
+ *   renewed.)
+ * </li>
+ * <li>
+ *   If epochs are equal, return. Else, send delegation tokens to the SQL 
engine.
+ * </li>
+ * <li>
+ *   If sending succeeds, set session credentials epoch to [[CredentialsRef]] 
epoch. Else,
+ *   record the exception and return.
+ * </li>
+ * </ol>
+ *
+ * @note
+ * <ol>
+ * <li>
+ *   Session credentials epochs are created in session scope and should be 
removed using
+ *   [[removeSessionCredentialsEpoch]] when session closes.
+ * </li>
+ * <li>
+ *   [[HadoopCredentialsManager]] does not renew and send credentials if no 
provider is left after
+ *   initialize.
+ * </li>
+ * </ol>
+ */
+class HadoopCredentialsManager private (name: String) extends 
AbstractService(name)
+    with Logging {
+
+  def this() = this(classOf[HadoopCredentialsManager].getSimpleName)
+
+  private val userCredentialsRefMap = new ConcurrentHashMap[String, 
CredentialsRef]()
+  private val sessionCredentialsEpochMap = new ConcurrentHashMap[String, 
Long]()
+
+  private var providers: Map[String, HadoopDelegationTokenProvider] = _
+  private var renewalInterval: Long = _
+  private var renewalRetryWait: Long = _
+  private var hadoopConf: Configuration = _
+
+  private[credentials] var renewalExecutor: Option[ScheduledExecutorService] = 
None
+
+  override def initialize(conf: KyuubiConf): Unit = {
+    hadoopConf = KyuubiHadoopUtils.newHadoopConf(conf)
+    providers = HadoopCredentialsManager.loadProviders(conf)
+      .filter { case (_, provider) =>
+        val required = provider.delegationTokensRequired(hadoopConf, conf)
+        if (!required) {
+          warn(s"Service ${provider.serviceName} does not require a token." +
+            s" Check your configuration to see if security is disabled or 
not.")
+        }
+        required
+      }
+
+    if (providers.isEmpty) {
+      warn("No delegation token is required by services.")
+    } else {
+      info("Using the following builtin delegation token providers: " +
+        s"${providers.keys.mkString(", ")}.")
+    }
+
+    renewalInterval = conf.get(CREDENTIALS_RENEWAL_INTERVAL)
+    renewalRetryWait = conf.get(CREDENTIALS_RENEWAL_RETRY_WAIT)
+    super.initialize(conf)
+  }
+
+  override def start(): Unit = {
+    if (providers.nonEmpty) {
+      renewalExecutor =
+        Some(ThreadUtils.newDaemonSingleThreadScheduledExecutor("Delegation 
Token Renewal Thread"))
+    }
+    super.start()
+  }
+
+  override def stop(): Unit = {
+    renewalExecutor.foreach { executor =>
+      executor.shutdownNow()
+      try {
+        executor.awaitTermination(10, TimeUnit.SECONDS)
+      } catch {
+        case _: InterruptedException =>
+      }
+    }
+    super.stop()
+  }
+
+  /**
+   * Send credentials to SQL engine which the specified session is talking to 
if
+   * [[HadoopCredentialsManager]] has a newer credentials.
+   *
+   * @param sessionId Specify the session which is talking with SQL engine
+   * @param appUser  User identity that the SQL engine uses.
+   * @param send     Function to send encoded credentials to SQL engine
+   */
+  def sendCredentialsIfNeeded(
+      sessionId: String,
+      appUser: String,
+      send: String => Unit): Unit = {
+    val userRef = getOrCreateUserCredentialsRef(appUser)
+    val sessionEpoch = getSessionCredentialsEpoch(sessionId)
+
+    if (userRef.getEpoch > sessionEpoch) {
+      val currentEpoch = userRef.getEpoch
+      val currentCreds = userRef.getEncodedCredentials
+      info(s"Send new credentials with epoch $currentEpoch to SQL engine 
through session " +
+        s"$sessionId")
+      Try(send(currentCreds)) match {
+        case Success(_) =>
+          info(s"Update session credentials epoch from $sessionEpoch to 
$currentEpoch")
+          sessionCredentialsEpochMap.put(sessionId, currentEpoch)
+        case Failure(exception) =>
+          warn(
+            s"Failed to send new credentials to SQL engine through session 
$sessionId",
+            exception)
+      }
+    }
+  }
+
+  /**
+   * Remove session credentials epoch corresponding to `sessionId`.
+   *
+   * @param sessionId KyuubiSession id
+   */
+  def removeSessionCredentialsEpoch(sessionId: String): Unit = {
+    sessionCredentialsEpochMap.remove(sessionId)
+  }
+
+  // Visible for testing.
+  private[credentials] def getOrCreateUserCredentialsRef(appUser: String): 
CredentialsRef =
+    userCredentialsRefMap.computeIfAbsent(
+      appUser,
+      appUser => {
+        val ref = new CredentialsRef(appUser)
+        scheduleRenewal(ref, 0)
+        info(s"Created CredentialsRef for user $appUser and scheduled a 
renewal task")
+        ref
+      })
+
+  // Visible for testing.
+  private[credentials] def getSessionCredentialsEpoch(sessionId: String): Long 
= {
+    sessionCredentialsEpochMap.getOrDefault(sessionId, 
CredentialsRef.UNSET_EPOCH)
+  }
+
+  // Visible for testing.
+  private[credentials] def containsProvider(serviceName: String): Boolean = {
+    providers.contains(serviceName)
+  }
+
+  private def scheduleRenewal(userRef: CredentialsRef, delay: Long): Unit = {
+    val renewalTask = new Runnable {
+      override def run(): Unit = {
+        try {
+          val creds = new Credentials()
+          providers.values
+            .foreach(_.obtainDelegationTokens(hadoopConf, conf, 
userRef.getAppUser, creds))
+          userRef.updateCredentials(creds)
+          scheduleRenewal(userRef, renewalInterval)
+        } catch {
+          case _: InterruptedException =>
+          // Server is shutting down
+          case e: Exception =>
+            warn(
+              s"Failed to update tokens for ${userRef.getAppUser}, try again 
in" +
+                s" $renewalRetryWait ms",
+              e)
+            scheduleRenewal(userRef, renewalRetryWait)
+        }
+      }
+    }
+
+    renewalExecutor.foreach { executor =>
+      info(s"Scheduling renewal in $delay ms.")
+      executor.schedule(renewalTask, delay, TimeUnit.MILLISECONDS)
+    }
+  }
+
+}
+
+object HadoopCredentialsManager extends Logging {
+
+  private val providerEnabledConfig = "kyuubi.credentials.%s.enabled"
+
+  def loadProviders(kyuubiConf: KyuubiConf): Map[String, 
HadoopDelegationTokenProvider] = {
+    val loader =
+      ServiceLoader.load(classOf[HadoopDelegationTokenProvider], 
getClass.getClassLoader)
+    val providers = mutable.ArrayBuffer[HadoopDelegationTokenProvider]()
+
+    val iterator = loader.iterator
+    while (iterator.hasNext) {
+      try {
+        providers += iterator.next
+      } catch {
+        case t: Throwable =>
+          warn(s"Failed to load built in provider.", t)
+      }
+    }
+
+    // Filter out providers for which kyuubi.credentials.{service}.enabled is 
false.
+    providers
+      .filter { p => HadoopCredentialsManager.isServiceEnabled(kyuubiConf, 
p.serviceName) }
+      .map { p => (p.serviceName, p) }
+      .toMap
+  }
+
+  def isServiceEnabled(kyuubiConf: KyuubiConf, serviceName: String): Boolean = 
{
+    val key = providerEnabledConfig.format(serviceName)
+    kyuubiConf
+      .getOption(key)
+      .map(_.toBoolean)
+      .getOrElse(true)
+  }
+
+}
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopDelegationProvider.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopDelegationProvider.scala
new file mode 100644
index 0000000..cba5761
--- /dev/null
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopDelegationProvider.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.credentials
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+
+import org.apache.kyuubi.Logging
+import org.apache.kyuubi.config.KyuubiConf
+
+trait HadoopDelegationTokenProvider extends Logging {
+
+  /**
+   * Name of the service to provide delegation tokens. This name should be 
unique. Kyuubi will
+   * internally use this name to differentiate delegation token providers.
+   */
+  def serviceName: String
+
+  /**
+   * Returns true if delegation tokens are required for this service. By 
default, it is based on
+   * whether Hadoop security is enabled.
+   */
+  def delegationTokensRequired(hadoopConf: Configuration, kyuubiConf: 
KyuubiConf): Boolean
+
+  /**
+   * Obtain delegation tokens for this service.
+   *
+   * @param hadoopConf Configuration of current Hadoop Compatible system.
+   * @param owner      DelegationToken owner.
+   * @param creds      Credentials to add tokens and security keys to.
+   */
+  def obtainDelegationTokens(
+    hadoopConf: Configuration,
+    kyuubiConf: KyuubiConf,
+    owner: String,
+    creds: Credentials): Unit
+
+}
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index 8d416e6..973e4c2 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -94,6 +94,7 @@ class KyuubiSessionImpl(
   override def close(): Unit = {
     super.close()
     sessionManager.operationManager.removeConnection(handle)
+    
sessionManager.credentialsManager.removeSessionCredentialsEpoch(handle.identifier.toString)
     try {
       if (client != null) client.closeSession()
     } catch {
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index bd2aeab..760c57e 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -22,6 +22,7 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion
 
 import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.credentials.HadoopCredentialsManager
 import org.apache.kyuubi.metrics.MetricsConstants._
 import org.apache.kyuubi.metrics.MetricsSystem
 import org.apache.kyuubi.operation.KyuubiOperationManager
@@ -31,8 +32,10 @@ class KyuubiSessionManager private (name: String) extends 
SessionManager(name) {
   def this() = this(classOf[KyuubiSessionManager].getSimpleName)
 
   val operationManager = new KyuubiOperationManager()
+  val credentialsManager = new HadoopCredentialsManager()
 
   override def initialize(conf: KyuubiConf): Unit = {
+    addService(credentialsManager)
     super.initialize(conf)
   }
 
diff --git 
a/kyuubi-server/src/test/resources/META-INF/services/org.apache.kyuubi.credentials.HadoopDelegationTokenProvider
 
b/kyuubi-server/src/test/resources/META-INF/services/org.apache.kyuubi.credentials.HadoopDelegationTokenProvider
new file mode 100644
index 0000000..4064562
--- /dev/null
+++ 
b/kyuubi-server/src/test/resources/META-INF/services/org.apache.kyuubi.credentials.HadoopDelegationTokenProvider
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.kyuubi.credentials.ExceptionThrowingDelegationTokenProvider
+org.apache.kyuubi.credentials.UnRequiredDelegationTokenProvider
+org.apache.kyuubi.credentials.UnstableDelegationTokenProvider
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopCredentialsManagerSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopCredentialsManagerSuite.scala
new file mode 100644
index 0000000..f1a30e0
--- /dev/null
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopCredentialsManagerSuite.scala
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.credentials
+
+import java.io.IOException
+import java.util.UUID
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
+
+import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.config.KyuubiConf
+
+class HadoopCredentialsManagerSuite extends KyuubiFunSuite {
+
+  private val sessionId = UUID.randomUUID().toString
+  private val appUser = "who"
+  private val send = (_: String) => {}
+
+  private def withStartedManager(kyuubiConf: KyuubiConf)(f: 
HadoopCredentialsManager => Unit)
+      : Unit = {
+    val manager = new HadoopCredentialsManager()
+    manager.initialize(kyuubiConf)
+    manager.start()
+
+    try f(manager)
+    finally manager.stop()
+  }
+
+  test("load default providers") {
+    ExceptionThrowingDelegationTokenProvider.constructed = false
+    val providers = HadoopCredentialsManager.loadProviders(new 
KyuubiConf(false))
+    assert(providers.contains("unstable"))
+    assert(providers.contains("unrequired"))
+    // This checks that providers are loaded independently and they have no 
effect on each other
+    assert(ExceptionThrowingDelegationTokenProvider.constructed)
+    assert(!providers.contains("throw"))
+  }
+
+  test("disable a provider") {
+    val kyuubiConf =
+      new KyuubiConf(false)
+        .set("kyuubi.credentials.unstable.enabled", "false")
+    val providers = HadoopCredentialsManager.loadProviders(kyuubiConf)
+    assert(!providers.contains("unstable"))
+  }
+
+  test("filter providers when initialize") {
+    // Filter out providers if `delegationTokensRequired` returns false.
+    val manager = new HadoopCredentialsManager()
+    manager.initialize(new KyuubiConf(false))
+    assert(!manager.containsProvider("unrequired"))
+  }
+
+  test("no provider left after initialize") {
+    val kyuubiConf =
+      new KyuubiConf(false)
+        .set("kyuubi.credentials.unstable.enabled", "false")
+    withStartedManager(kyuubiConf) { manager =>
+      // All providers are filtered out either because of being disabled or
+      // because does not require a token
+      assert(manager.renewalExecutor.isEmpty)
+    }
+  }
+
+  test("schedule credentials renewal") {
+    val kyuubiConf = new KyuubiConf(false)
+      .set(KyuubiConf.CREDENTIALS_RENEWAL_INTERVAL, 1000L)
+    withStartedManager(kyuubiConf) { manager =>
+      val userRef = manager.getOrCreateUserCredentialsRef(appUser)
+      // Tolerate 100 ms delay
+      eventually(timeout(1100.milliseconds), interval(100.milliseconds)) {
+        assert(userRef.getEpoch == 1)
+      }
+    }
+  }
+
+  test("schedule credentials renewal retry when failed") {
+    val kyuubiConf = new KyuubiConf(false)
+      .set(KyuubiConf.CREDENTIALS_RENEWAL_INTERVAL, 1000L)
+      .set(KyuubiConf.CREDENTIALS_RENEWAL_RETRY_WAIT, 1000L)
+    withStartedManager(kyuubiConf) { manager =>
+      try {
+        UnstableDelegationTokenProvider.throwException = true
+
+        val userRef = manager.getOrCreateUserCredentialsRef(appUser)
+        // Tolerate 100 ms delay
+        eventually(timeout(2100.milliseconds), interval(100.milliseconds)) {
+          // 1 scheduled call and 2 scheduled retrying call
+          assert(UnstableDelegationTokenProvider.exceptionCount == 3)
+        }
+        assert(userRef.getEpoch == CredentialsRef.UNSET_EPOCH)
+      } finally {
+        UnstableDelegationTokenProvider.throwException = false
+      }
+    }
+  }
+
+  test("send credentials if needed") {
+    val kyuubiConf = new KyuubiConf(false)
+      .set(KyuubiConf.CREDENTIALS_RENEWAL_INTERVAL, 1000L)
+    withStartedManager(kyuubiConf) { manager =>
+      // Trigger UserCredentialsRef's initialization
+      val userRef = manager.getOrCreateUserCredentialsRef(appUser)
+      eventually(interval(100.milliseconds)) {
+        assert(userRef.getEpoch == 0)
+      }
+
+      manager.sendCredentialsIfNeeded(sessionId, appUser, send)
+
+      val sessionEpoch = manager.getSessionCredentialsEpoch(sessionId)
+      assert(sessionEpoch == userRef.getEpoch)
+    }
+  }
+
+  test("credentials sending failure") {
+    withStartedManager(new KyuubiConf(false)) { manager =>
+      // Trigger UserCredentialsRef's initialization
+      val userRef = manager.getOrCreateUserCredentialsRef(appUser)
+      eventually(interval(100.milliseconds)) {
+        assert(userRef.getEpoch == 0)
+      }
+
+      var called = false
+      manager.sendCredentialsIfNeeded(
+        sessionId,
+        appUser,
+        _ => {
+          called = true
+          throw new IOException
+        })
+
+      assert(called)
+      assert(manager.getSessionCredentialsEpoch(sessionId) == 
CredentialsRef.UNSET_EPOCH)
+    }
+  }
+}
+
+private class ExceptionThrowingDelegationTokenProvider extends 
HadoopDelegationTokenProvider {
+  ExceptionThrowingDelegationTokenProvider.constructed = true
+  throw new IllegalArgumentException
+
+  override def serviceName: String = "throw"
+
+  override def delegationTokensRequired(
+      hadoopConf: Configuration,
+      kyuubiConf: KyuubiConf): Boolean = true
+
+  override def obtainDelegationTokens(
+      hadoopConf: Configuration,
+      kyuubiConf: KyuubiConf,
+      owner: String,
+      creds: Credentials): Unit = {}
+
+}
+
+private object ExceptionThrowingDelegationTokenProvider {
+  var constructed = false
+}
+
+private class UnRequiredDelegationTokenProvider extends 
HadoopDelegationTokenProvider {
+
+  override def serviceName: String = "unrequired"
+
+  override def delegationTokensRequired(
+      hadoopConf: Configuration,
+      kyuubiConf: KyuubiConf): Boolean = false
+
+  override def obtainDelegationTokens(
+      hadoopConf: Configuration,
+      kyuubiConf: KyuubiConf,
+      owner: String,
+      creds: Credentials): Unit = {}
+
+}
+
+private class UnstableDelegationTokenProvider extends 
HadoopDelegationTokenProvider {
+
+  override def serviceName: String = "unstable"
+
+  override def delegationTokensRequired(
+      hadoopConf: Configuration,
+      kyuubiConf: KyuubiConf): Boolean = true
+
+  override def obtainDelegationTokens(
+      hadoopConf: Configuration,
+      kyuubiConf: KyuubiConf,
+      owner: String,
+      creds: Credentials): Unit = {
+    if (UnstableDelegationTokenProvider.throwException) {
+      UnstableDelegationTokenProvider.exceptionCount += 1
+      throw new IllegalArgumentException
+    }
+  }
+
+}
+
+private object UnstableDelegationTokenProvider {
+
+  @volatile
+  var throwException: Boolean = false
+
+  @volatile
+  var exceptionCount = 0
+
+}

Reply via email to