This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/branch-1.5 by this push:
new 7d7a38a [KYUUBI #1936][FOLLOWUP] Send credentials when opening
session and wait for completion
7d7a38a is described below
commit 7d7a38a7307980610ade367241c9b819969ef249
Author: Tianlin Liao <[email protected]>
AuthorDate: Wed Mar 16 09:32:53 2022 +0800
[KYUUBI #1936][FOLLOWUP] Send credentials when opening session and wait for
completion
### _Why are the changes needed?_
Follow up #1936
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
locally before make a pull request
Closes #2092 from lightning-L/kyuubi-1936.
Closes #1936
2bb2c10d [Tianlin Liao] [KYUUBI #1936][FOLLOWUP] Send credentials when
opening session and wait for completion
Authored-by: Tianlin Liao <[email protected]>
Signed-off-by: ulysses-you <[email protected]>
(cherry picked from commit eb4d289068db84790defe3538d3e9defe95715ac)
Signed-off-by: ulysses-you <[email protected]>
---
docs/deployment/settings.md | 1 +
.../org/apache/kyuubi/config/KyuubiConf.scala | 8 +++
.../scala/org/apache/kyuubi/util/ThreadUtils.scala | 17 ++++++-
.../apache/kyuubi/credentials/CredentialsRef.scala | 17 ++++++-
.../credentials/HadoopCredentialsManager.scala | 57 ++++++++++++----------
.../HadoopCredentialsManagerSuite.scala | 4 ++
6 files changed, 77 insertions(+), 27 deletions(-)
diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 5a78291..d84577d 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -161,6 +161,7 @@ Key | Default | Meaning | Type | Since
<code>kyuubi.credentials.hive.enabled</code>|<div style='width:
65pt;word-wrap: break-word;white-space: normal'>true</div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>Whether to renew Hive
metastore delegation token</div>|<div style='width: 30pt'>boolean</div>|<div
style='width: 20pt'>1.4.0</div>
<code>kyuubi.credentials.renewal.interval</code>|<div style='width:
65pt;word-wrap: break-word;white-space: normal'>PT1H</div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>How often Kyuubi renews one
user's delegation tokens</div>|<div style='width: 30pt'>duration</div>|<div
style='width: 20pt'>1.4.0</div>
<code>kyuubi.credentials.renewal.retry.wait</code>|<div style='width:
65pt;word-wrap: break-word;white-space: normal'>PT1M</div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>How long to wait before
retrying to fetch new credentials after a failure.</div>|<div style='width:
30pt'>duration</div>|<div style='width: 20pt'>1.4.0</div>
+<code>kyuubi.credentials.update.wait.timeout</code>|<div style='width:
65pt;word-wrap: break-word;white-space: normal'>PT1M</div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>How long to wait until
credentials are ready.</div>|<div style='width: 30pt'>duration</div>|<div
style='width: 20pt'>1.5.0</div>
### Delegation
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index af08056..73ce963 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -231,6 +231,14 @@ object KyuubiConf {
.checkValue(t => t > 0, "must be positive integer")
.createWithDefault(Duration.ofMinutes(1).toMillis)
+ val CREDENTIALS_UPDATE_WAIT_TIMEOUT: ConfigEntry[Long] =
+ buildConf("credentials.update.wait.timeout")
+ .doc("How long to wait until credentials are ready.")
+ .version("1.5.0")
+ .timeConf
+ .checkValue(t => t > 0, "must be positive integer")
+ .createWithDefault(Duration.ofMinutes(1).toMillis)
+
val CREDENTIALS_HADOOP_FS_ENABLED: ConfigEntry[Boolean] =
buildConf("credentials.hadoopfs.enabled")
.doc("Whether to renew Hadoop filesystem delegation tokens")
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala
index 5809ff0..a540b95 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala
@@ -19,7 +19,10 @@ package org.apache.kyuubi.util
import java.util.concurrent.{LinkedBlockingQueue, ScheduledExecutorService,
ScheduledThreadPoolExecutor, ThreadPoolExecutor, TimeUnit}
-import org.apache.kyuubi.Logging
+import scala.concurrent.Awaitable
+import scala.concurrent.duration.Duration
+
+import org.apache.kyuubi.{KyuubiException, Logging}
object ThreadUtils extends Logging {
@@ -49,4 +52,16 @@ object ThreadUtils extends Logging {
executor.allowCoreThreadTimeOut(true)
executor
}
+
+ def awaitResult[T](awaitable: Awaitable[T], atMost: Duration): T = {
+ try {
+ // `awaitPermission` is not actually used anywhere so it's safe to pass
in null here.
+ // See SPARK-13747.
+ val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
+ awaitable.result(atMost)(awaitPermission)
+ } catch {
+ case e: Exception =>
+ throw new KyuubiException("Exception thrown in awaitResult: ", e)
+ }
+ }
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/CredentialsRef.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/CredentialsRef.scala
index 9ccf445..396d969 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/CredentialsRef.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/CredentialsRef.scala
@@ -17,10 +17,15 @@
package org.apache.kyuubi.credentials
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
+
import org.apache.hadoop.security.Credentials
import org.apache.kyuubi.credentials.CredentialsRef.UNSET_EPOCH
-import org.apache.kyuubi.util.KyuubiHadoopUtils
+import org.apache.kyuubi.util.{KyuubiHadoopUtils, ThreadUtils}
class CredentialsRef(appUser: String) {
@@ -29,6 +34,12 @@ class CredentialsRef(appUser: String) {
private var encodedCredentials: String = _
+ private val credentialFuture = new AtomicReference[Future[Unit]]()
+
+ def setFuture(future: Future[Unit]): Unit = {
+ credentialFuture.set(future)
+ }
+
def getEpoch: Long = epoch
def getAppUser: String = appUser
@@ -42,6 +53,10 @@ class CredentialsRef(appUser: String) {
epoch += 1
}
+ def waitUntilReady(timeout: Duration): Unit = {
+ Option(credentialFuture.get).foreach(ThreadUtils.awaitResult(_, timeout))
+ }
+
}
object CredentialsRef {
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
index 5040520..2ebda60 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
@@ -21,12 +21,15 @@ import java.util.ServiceLoader
import java.util.concurrent._
import scala.collection.mutable
+import scala.concurrent.Future
+import scala.concurrent.Promise
+import scala.concurrent.duration.Duration
import scala.util.{Failure, Success, Try}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials
-import org.apache.kyuubi.{KyuubiException, Logging}
+import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.service.AbstractService
@@ -84,6 +87,7 @@ class HadoopCredentialsManager private (name: String) extends
AbstractService(na
private var providers: Map[String, HadoopDelegationTokenProvider] = _
private var renewalInterval: Long = _
private var renewalRetryWait: Long = _
+ private var credentialsWaitTimeout: Long = _
private var hadoopConf: Configuration = _
private[credentials] var renewalExecutor: Option[ScheduledExecutorService] =
None
@@ -111,6 +115,7 @@ class HadoopCredentialsManager private (name: String)
extends AbstractService(na
renewalInterval = conf.get(CREDENTIALS_RENEWAL_INTERVAL)
renewalRetryWait = conf.get(CREDENTIALS_RENEWAL_RETRY_WAIT)
+ credentialsWaitTimeout = conf.get(CREDENTIALS_UPDATE_WAIT_TIMEOUT)
super.initialize(conf)
}
@@ -147,12 +152,12 @@ class HadoopCredentialsManager private (name: String)
extends AbstractService(na
sessionId: String,
appUser: String,
send: String => Unit,
- onetime: Boolean = false): Unit = {
+ waitUntilCredentialsReady: Boolean = false): Unit = {
if (renewalExecutor.isEmpty) {
return
}
- val userRef = getOrCreateUserCredentialsRef(appUser, onetime)
+ val userRef = getOrCreateUserCredentialsRef(appUser,
waitUntilCredentialsReady)
val sessionEpoch = getSessionCredentialsEpoch(sessionId)
if (userRef.getEpoch > sessionEpoch) {
@@ -184,19 +189,19 @@ class HadoopCredentialsManager private (name: String)
extends AbstractService(na
// Visible for testing.
private[credentials] def getOrCreateUserCredentialsRef(
appUser: String,
- onetime: Boolean = false): CredentialsRef = {
+ waitUntilCredentialsReady: Boolean = false): CredentialsRef = {
val ref = userCredentialsRefMap.computeIfAbsent(
appUser,
appUser => {
val ref = new CredentialsRef(appUser)
- scheduleRenewal(ref, 0, onetime)
+ val credentialsFuture: Future[Unit] = scheduleRenewal(ref, 0,
waitUntilCredentialsReady)
+ ref.setFuture(credentialsFuture)
info(s"Created CredentialsRef for user $appUser and scheduled a
renewal task")
ref
})
- // schedule renewal task when encodedCredentials are invalid
- if (onetime && ref.getEncodedCredentials == null) {
- scheduleRenewal(ref, 0, onetime)
+ if (waitUntilCredentialsReady) {
+ ref.waitUntilReady(Duration(credentialsWaitTimeout,
TimeUnit.MILLISECONDS))
}
ref
@@ -212,17 +217,24 @@ class HadoopCredentialsManager private (name: String)
extends AbstractService(na
providers.contains(serviceName)
}
+ private def updateCredentials(userRef: CredentialsRef): Unit = {
+ val creds = new Credentials()
+ providers.values
+ .foreach(_.obtainDelegationTokens(userRef.getAppUser, creds))
+ userRef.updateCredentials(creds)
+ }
+
private def scheduleRenewal(
userRef: CredentialsRef,
delay: Long,
- waitCompletion: Boolean = false): Unit = {
+ waitUntilCredentialsReady: Boolean = false): Future[Unit] = {
+ val promise = Promise[Unit]()
+
val renewalTask = new Runnable {
override def run(): Unit = {
try {
- val creds = new Credentials()
- providers.values
- .foreach(_.obtainDelegationTokens(userRef.getAppUser, creds))
- userRef.updateCredentials(creds)
+ promise.trySuccess(updateCredentials(userRef))
+
scheduleRenewal(userRef, renewalInterval)
} catch {
case _: InterruptedException =>
@@ -233,24 +245,19 @@ class HadoopCredentialsManager private (name: String)
extends AbstractService(na
s" $renewalRetryWait ms",
e)
scheduleRenewal(userRef, renewalRetryWait)
- // throw exception when one-time execution fails,
- // so that client side can be aware of this
- if (waitCompletion) {
- throw new KyuubiException(s"One-time execution failed for token
update task " +
- s"for ${userRef.getAppUser}")
+ if (waitUntilCredentialsReady) {
+ promise.tryFailure(e)
}
}
}
}
- if (waitCompletion) {
- renewalTask.run()
- } else {
- renewalExecutor.foreach { executor =>
- info(s"Scheduling renewal in $delay ms.")
- executor.schedule(renewalTask, delay, TimeUnit.MILLISECONDS)
- }
+ renewalExecutor.foreach { executor =>
+ info(s"Scheduling renewal in $delay ms.")
+ executor.schedule(renewalTask, delay, TimeUnit.MILLISECONDS)
}
+
+ promise.future
}
}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopCredentialsManagerSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopCredentialsManagerSuite.scala
index ff08422..c5647c4 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopCredentialsManagerSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopCredentialsManagerSuite.scala
@@ -99,6 +99,10 @@ class HadoopCredentialsManagerSuite extends KyuubiFunSuite {
withStartedManager(kyuubiConf) { manager =>
val userRef = manager.getOrCreateUserCredentialsRef(appUser, true)
assert(userRef.getEpoch == 0)
+
+ eventually(timeout(1100.milliseconds), interval(100.milliseconds)) {
+ assert(userRef.getEpoch == 1)
+ }
}
}