This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new eb4d289  [KYUUBI #1936][FOLLOWUP] Send credentials when opening 
session and wait for completion
eb4d289 is described below

commit eb4d289068db84790defe3538d3e9defe95715ac
Author: Tianlin Liao <[email protected]>
AuthorDate: Wed Mar 16 09:32:53 2022 +0800

    [KYUUBI #1936][FOLLOWUP] Send credentials when opening session and wait for 
completion
    
    ### _Why are the changes needed?_
    
    Follow up #1936
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run 
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
 locally before make a pull request
    
    Closes #2092 from lightning-L/kyuubi-1936.
    
    Closes #1936
    
    2bb2c10d [Tianlin Liao] [KYUUBI #1936][FOLLOWUP] Send credentials when 
opening session and wait for completion
    
    Authored-by: Tianlin Liao <[email protected]>
    Signed-off-by: ulysses-you <[email protected]>
---
 docs/deployment/settings.md                        |  1 +
 .../org/apache/kyuubi/config/KyuubiConf.scala      |  8 +++
 .../scala/org/apache/kyuubi/util/ThreadUtils.scala | 17 ++++++-
 .../apache/kyuubi/credentials/CredentialsRef.scala | 17 ++++++-
 .../credentials/HadoopCredentialsManager.scala     | 57 ++++++++++++----------
 .../HadoopCredentialsManagerSuite.scala            |  4 ++
 6 files changed, 77 insertions(+), 27 deletions(-)

diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 08cbc88..51b2fd8 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -163,6 +163,7 @@ Key | Default | Meaning | Type | Since
 <code>kyuubi.credentials.hive.enabled</code>|<div style='width: 
65pt;word-wrap: break-word;white-space: normal'>true</div>|<div style='width: 
170pt;word-wrap: break-word;white-space: normal'>Whether to renew Hive 
metastore delegation token</div>|<div style='width: 30pt'>boolean</div>|<div 
style='width: 20pt'>1.4.0</div>
 <code>kyuubi.credentials.renewal.interval</code>|<div style='width: 
65pt;word-wrap: break-word;white-space: normal'>PT1H</div>|<div style='width: 
170pt;word-wrap: break-word;white-space: normal'>How often Kyuubi renews one 
user's delegation tokens</div>|<div style='width: 30pt'>duration</div>|<div 
style='width: 20pt'>1.4.0</div>
 <code>kyuubi.credentials.renewal.retry.wait</code>|<div style='width: 
65pt;word-wrap: break-word;white-space: normal'>PT1M</div>|<div style='width: 
170pt;word-wrap: break-word;white-space: normal'>How long to wait before 
retrying to fetch new credentials after a failure.</div>|<div style='width: 
30pt'>duration</div>|<div style='width: 20pt'>1.4.0</div>
+<code>kyuubi.credentials.update.wait.timeout</code>|<div style='width: 
65pt;word-wrap: break-word;white-space: normal'>PT1M</div>|<div style='width: 
170pt;word-wrap: break-word;white-space: normal'>How long to wait until 
credentials are ready.</div>|<div style='width: 30pt'>duration</div>|<div 
style='width: 20pt'>1.5.0</div>
 
 
 ### Delegation
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 70dc9cb..ebf84bc 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -246,6 +246,14 @@ object KyuubiConf {
       .checkValue(t => t > 0, "must be positive integer")
       .createWithDefault(Duration.ofMinutes(1).toMillis)
 
+  val CREDENTIALS_UPDATE_WAIT_TIMEOUT: ConfigEntry[Long] =
+    buildConf("credentials.update.wait.timeout")
+      .doc("How long to wait until credentials are ready.")
+      .version("1.5.0")
+      .timeConf
+      .checkValue(t => t > 0, "must be positive integer")
+      .createWithDefault(Duration.ofMinutes(1).toMillis)
+
   val CREDENTIALS_HADOOP_FS_ENABLED: ConfigEntry[Boolean] =
     buildConf("credentials.hadoopfs.enabled")
       .doc("Whether to renew Hadoop filesystem delegation tokens")
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala
index 5809ff0..a540b95 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala
@@ -19,7 +19,10 @@ package org.apache.kyuubi.util
 
 import java.util.concurrent.{LinkedBlockingQueue, ScheduledExecutorService, 
ScheduledThreadPoolExecutor, ThreadPoolExecutor, TimeUnit}
 
-import org.apache.kyuubi.Logging
+import scala.concurrent.Awaitable
+import scala.concurrent.duration.Duration
+
+import org.apache.kyuubi.{KyuubiException, Logging}
 
 object ThreadUtils extends Logging {
 
@@ -49,4 +52,16 @@ object ThreadUtils extends Logging {
     executor.allowCoreThreadTimeOut(true)
     executor
   }
+
+  def awaitResult[T](awaitable: Awaitable[T], atMost: Duration): T = {
+    try {
+      // `awaitPermission` is not actually used anywhere so it's safe to pass 
in null here.
+      // See SPARK-13747.
+      val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
+      awaitable.result(atMost)(awaitPermission)
+    } catch {
+      case e: Exception =>
+        throw new KyuubiException("Exception thrown in awaitResult: ", e)
+    }
+  }
 }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/CredentialsRef.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/CredentialsRef.scala
index 9ccf445..396d969 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/CredentialsRef.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/CredentialsRef.scala
@@ -17,10 +17,15 @@
 
 package org.apache.kyuubi.credentials
 
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
+
 import org.apache.hadoop.security.Credentials
 
 import org.apache.kyuubi.credentials.CredentialsRef.UNSET_EPOCH
-import org.apache.kyuubi.util.KyuubiHadoopUtils
+import org.apache.kyuubi.util.{KyuubiHadoopUtils, ThreadUtils}
 
 class CredentialsRef(appUser: String) {
 
@@ -29,6 +34,12 @@ class CredentialsRef(appUser: String) {
 
   private var encodedCredentials: String = _
 
+  private val credentialFuture = new AtomicReference[Future[Unit]]()
+
+  def setFuture(future: Future[Unit]): Unit = {
+    credentialFuture.set(future)
+  }
+
   def getEpoch: Long = epoch
 
   def getAppUser: String = appUser
@@ -42,6 +53,10 @@ class CredentialsRef(appUser: String) {
     epoch += 1
   }
 
+  def waitUntilReady(timeout: Duration): Unit = {
+    Option(credentialFuture.get).foreach(ThreadUtils.awaitResult(_, timeout))
+  }
+
 }
 
 object CredentialsRef {
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
index 5040520..2ebda60 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
@@ -21,12 +21,15 @@ import java.util.ServiceLoader
 import java.util.concurrent._
 
 import scala.collection.mutable
+import scala.concurrent.Future
+import scala.concurrent.Promise
+import scala.concurrent.duration.Duration
 import scala.util.{Failure, Success, Try}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.security.Credentials
 
-import org.apache.kyuubi.{KyuubiException, Logging}
+import org.apache.kyuubi.Logging
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.service.AbstractService
@@ -84,6 +87,7 @@ class HadoopCredentialsManager private (name: String) extends 
AbstractService(na
   private var providers: Map[String, HadoopDelegationTokenProvider] = _
   private var renewalInterval: Long = _
   private var renewalRetryWait: Long = _
+  private var credentialsWaitTimeout: Long = _
   private var hadoopConf: Configuration = _
 
   private[credentials] var renewalExecutor: Option[ScheduledExecutorService] = 
None
@@ -111,6 +115,7 @@ class HadoopCredentialsManager private (name: String) 
extends AbstractService(na
 
     renewalInterval = conf.get(CREDENTIALS_RENEWAL_INTERVAL)
     renewalRetryWait = conf.get(CREDENTIALS_RENEWAL_RETRY_WAIT)
+    credentialsWaitTimeout = conf.get(CREDENTIALS_UPDATE_WAIT_TIMEOUT)
     super.initialize(conf)
   }
 
@@ -147,12 +152,12 @@ class HadoopCredentialsManager private (name: String) 
extends AbstractService(na
       sessionId: String,
       appUser: String,
       send: String => Unit,
-      onetime: Boolean = false): Unit = {
+      waitUntilCredentialsReady: Boolean = false): Unit = {
     if (renewalExecutor.isEmpty) {
       return
     }
 
-    val userRef = getOrCreateUserCredentialsRef(appUser, onetime)
+    val userRef = getOrCreateUserCredentialsRef(appUser, 
waitUntilCredentialsReady)
     val sessionEpoch = getSessionCredentialsEpoch(sessionId)
 
     if (userRef.getEpoch > sessionEpoch) {
@@ -184,19 +189,19 @@ class HadoopCredentialsManager private (name: String) 
extends AbstractService(na
   // Visible for testing.
   private[credentials] def getOrCreateUserCredentialsRef(
       appUser: String,
-      onetime: Boolean = false): CredentialsRef = {
+      waitUntilCredentialsReady: Boolean = false): CredentialsRef = {
     val ref = userCredentialsRefMap.computeIfAbsent(
       appUser,
       appUser => {
         val ref = new CredentialsRef(appUser)
-        scheduleRenewal(ref, 0, onetime)
+        val credentialsFuture: Future[Unit] = scheduleRenewal(ref, 0, 
waitUntilCredentialsReady)
+        ref.setFuture(credentialsFuture)
         info(s"Created CredentialsRef for user $appUser and scheduled a 
renewal task")
         ref
       })
 
-    // schedule renewal task when encodedCredentials are invalid
-    if (onetime && ref.getEncodedCredentials == null) {
-      scheduleRenewal(ref, 0, onetime)
+    if (waitUntilCredentialsReady) {
+      ref.waitUntilReady(Duration(credentialsWaitTimeout, 
TimeUnit.MILLISECONDS))
     }
 
     ref
@@ -212,17 +217,24 @@ class HadoopCredentialsManager private (name: String) 
extends AbstractService(na
     providers.contains(serviceName)
   }
 
+  private def updateCredentials(userRef: CredentialsRef): Unit = {
+    val creds = new Credentials()
+    providers.values
+      .foreach(_.obtainDelegationTokens(userRef.getAppUser, creds))
+    userRef.updateCredentials(creds)
+  }
+
   private def scheduleRenewal(
       userRef: CredentialsRef,
       delay: Long,
-      waitCompletion: Boolean = false): Unit = {
+      waitUntilCredentialsReady: Boolean = false): Future[Unit] = {
+    val promise = Promise[Unit]()
+
     val renewalTask = new Runnable {
       override def run(): Unit = {
         try {
-          val creds = new Credentials()
-          providers.values
-            .foreach(_.obtainDelegationTokens(userRef.getAppUser, creds))
-          userRef.updateCredentials(creds)
+          promise.trySuccess(updateCredentials(userRef))
+
           scheduleRenewal(userRef, renewalInterval)
         } catch {
           case _: InterruptedException =>
@@ -233,24 +245,19 @@ class HadoopCredentialsManager private (name: String) 
extends AbstractService(na
                 s" $renewalRetryWait ms",
               e)
             scheduleRenewal(userRef, renewalRetryWait)
-            // throw exception when one-time execution fails,
-            // so that client side can be aware of this
-            if (waitCompletion) {
-              throw new KyuubiException(s"One-time execution failed for token 
update task " +
-                s"for ${userRef.getAppUser}")
+            if (waitUntilCredentialsReady) {
+              promise.tryFailure(e)
             }
         }
       }
     }
 
-    if (waitCompletion) {
-      renewalTask.run()
-    } else {
-      renewalExecutor.foreach { executor =>
-        info(s"Scheduling renewal in $delay ms.")
-        executor.schedule(renewalTask, delay, TimeUnit.MILLISECONDS)
-      }
+    renewalExecutor.foreach { executor =>
+      info(s"Scheduling renewal in $delay ms.")
+      executor.schedule(renewalTask, delay, TimeUnit.MILLISECONDS)
     }
+
+    promise.future
   }
 
 }
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopCredentialsManagerSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopCredentialsManagerSuite.scala
index ff08422..c5647c4 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopCredentialsManagerSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopCredentialsManagerSuite.scala
@@ -99,6 +99,10 @@ class HadoopCredentialsManagerSuite extends KyuubiFunSuite {
     withStartedManager(kyuubiConf) { manager =>
       val userRef = manager.getOrCreateUserCredentialsRef(appUser, true)
       assert(userRef.getEpoch == 0)
+
+      eventually(timeout(1100.milliseconds), interval(100.milliseconds)) {
+        assert(userRef.getEpoch == 1)
+      }
     }
   }
 

Reply via email to