This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/branch-1.5 by this push:
new 15aba5d [KYUUBI #1936] Send credentials when opening session and wait
for completion
15aba5d is described below
commit 15aba5de365b65b50c3a18c932262d4bf3fab823
Author: Tianlin Liao <[email protected]>
AuthorDate: Thu Mar 10 09:40:59 2022 +0800
[KYUUBI #1936] Send credentials when opening session and wait for completion
<!--
Thanks for sending a pull request!
Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://kyuubi.readthedocs.io/en/latest/community/contributions.html
2. If the PR is related to an issue in
https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your
PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
3. If the PR is unfinished, add '[WIP]' in your PR title, e.g.,
'[WIP][KYUUBI #XXXX] Your PR title ...'.
-->
### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
1. If you add a feature, you can talk about the use case of it.
2. If you fix a bug, you can clarify why it is a bug.
-->
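To close #1936: send the Hadoop credentials to the engine right after the
engine session is opened, and run that one-time credentials renewal
synchronously so the engine-launch operation waits for it to complete.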
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #2059 from lightning-L/kyuubi-1936.
Closes #1936
e9c83ef6 [Tianlin Liao] [KYUUBI #1936] send credentials when opening
session and wait for completion
Authored-by: Tianlin Liao <[email protected]>
Signed-off-by: ulysses-you <[email protected]>
(cherry picked from commit 8e983a198ea8c57d5c62b0c19b507096dba60bb5)
Signed-off-by: ulysses-you <[email protected]>
---
.../credentials/HadoopCredentialsManager.scala | 44 +++++++++++++++++-----
.../org/apache/kyuubi/operation/LaunchEngine.scala | 17 ++++++++-
.../HadoopCredentialsManagerSuite.scala | 20 +++++++++-
3 files changed, 69 insertions(+), 12 deletions(-)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
index ec80ebc..5040520 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
@@ -26,7 +26,7 @@ import scala.util.{Failure, Success, Try}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials
-import org.apache.kyuubi.Logging
+import org.apache.kyuubi.{KyuubiException, Logging}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.service.AbstractService
@@ -146,12 +146,13 @@ class HadoopCredentialsManager private (name: String)
extends AbstractService(na
def sendCredentialsIfNeeded(
sessionId: String,
appUser: String,
- send: String => Unit): Unit = {
+ send: String => Unit,
+ onetime: Boolean = false): Unit = {
if (renewalExecutor.isEmpty) {
return
}
- val userRef = getOrCreateUserCredentialsRef(appUser)
+ val userRef = getOrCreateUserCredentialsRef(appUser, onetime)
val sessionEpoch = getSessionCredentialsEpoch(sessionId)
if (userRef.getEpoch > sessionEpoch) {
@@ -181,16 +182,26 @@ class HadoopCredentialsManager private (name: String)
extends AbstractService(na
}
// Visible for testing.
- private[credentials] def getOrCreateUserCredentialsRef(appUser: String):
CredentialsRef =
- userCredentialsRefMap.computeIfAbsent(
+ private[credentials] def getOrCreateUserCredentialsRef(
+ appUser: String,
+ onetime: Boolean = false): CredentialsRef = {
+ val ref = userCredentialsRefMap.computeIfAbsent(
appUser,
appUser => {
val ref = new CredentialsRef(appUser)
- scheduleRenewal(ref, 0)
+ scheduleRenewal(ref, 0, onetime)
info(s"Created CredentialsRef for user $appUser and scheduled a
renewal task")
ref
})
+ // schedule renewal task when encodedCredentials are invalid
+ if (onetime && ref.getEncodedCredentials == null) {
+ scheduleRenewal(ref, 0, onetime)
+ }
+
+ ref
+ }
+
// Visible for testing.
private[credentials] def getSessionCredentialsEpoch(sessionId: String): Long
= {
sessionCredentialsEpochMap.getOrDefault(sessionId,
CredentialsRef.UNSET_EPOCH)
@@ -201,7 +212,10 @@ class HadoopCredentialsManager private (name: String)
extends AbstractService(na
providers.contains(serviceName)
}
- private def scheduleRenewal(userRef: CredentialsRef, delay: Long): Unit = {
+ private def scheduleRenewal(
+ userRef: CredentialsRef,
+ delay: Long,
+ waitCompletion: Boolean = false): Unit = {
val renewalTask = new Runnable {
override def run(): Unit = {
try {
@@ -219,13 +233,23 @@ class HadoopCredentialsManager private (name: String)
extends AbstractService(na
s" $renewalRetryWait ms",
e)
scheduleRenewal(userRef, renewalRetryWait)
+ // throw exception when one-time execution fails,
+ // so that client side can be aware of this
+ if (waitCompletion) {
+ throw new KyuubiException(s"One-time execution failed for token
update task " +
+ s"for ${userRef.getAppUser}")
+ }
}
}
}
- renewalExecutor.foreach { executor =>
- info(s"Scheduling renewal in $delay ms.")
- executor.schedule(renewalTask, delay, TimeUnit.MILLISECONDS)
+ if (waitCompletion) {
+ renewalTask.run()
+ } else {
+ renewalExecutor.foreach { executor =>
+ info(s"Scheduling renewal in $delay ms.")
+ executor.schedule(renewalTask, delay, TimeUnit.MILLISECONDS)
+ }
}
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala
index 8e087df..41a6cad 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala
@@ -18,7 +18,7 @@
package org.apache.kyuubi.operation
import org.apache.kyuubi.operation.log.OperationLog
-import org.apache.kyuubi.session.KyuubiSessionImpl
+import org.apache.kyuubi.session.{KyuubiSessionImpl, KyuubiSessionManager}
class LaunchEngine(session: KyuubiSessionImpl, override val shouldRunAsync:
Boolean)
extends KyuubiOperation(OperationType.UNKNOWN_OPERATION, session) {
@@ -49,6 +49,7 @@ class LaunchEngine(session: KyuubiSessionImpl, override val
shouldRunAsync: Bool
setState(OperationState.RUNNING)
try {
session.openEngineSession(getOperationLog)
+ renewEngineCredentials()
setState(OperationState.FINISHED)
} catch onError()
}
@@ -59,4 +60,18 @@ class LaunchEngine(session: KyuubiSessionImpl, override val
shouldRunAsync: Bool
if (!shouldRunAsync) getBackgroundHandle.get()
}
+
+ private def renewEngineCredentials(): Unit = {
+ val sessionManager =
session.sessionManager.asInstanceOf[KyuubiSessionManager]
+ try {
+ sessionManager.credentialsManager.sendCredentialsIfNeeded(
+ session.handle.identifier.toString,
+ session.user,
+ client.sendCredentials,
+ true)
+ } catch {
+ case e: Exception =>
+ error(s"Failed to renew engine credentials when launching engine", e)
+ }
+ }
}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopCredentialsManagerSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopCredentialsManagerSuite.scala
index a721c3c..ff08422 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopCredentialsManagerSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopCredentialsManagerSuite.scala
@@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
-import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.{KyuubiException, KyuubiFunSuite}
import org.apache.kyuubi.config.KyuubiConf
class HadoopCredentialsManagerSuite extends KyuubiFunSuite {
@@ -93,6 +93,24 @@ class HadoopCredentialsManagerSuite extends KyuubiFunSuite {
}
}
+ test("execute credentials renewal task and wait for completion") {
+ val kyuubiConf = new KyuubiConf(false)
+ .set(KyuubiConf.CREDENTIALS_RENEWAL_INTERVAL, 1000L)
+ withStartedManager(kyuubiConf) { manager =>
+ val userRef = manager.getOrCreateUserCredentialsRef(appUser, true)
+ assert(userRef.getEpoch == 0)
+ }
+ }
+
+ test("throw exception when credential renewal fails") {
+ val kyuubiConf = new KyuubiConf(false)
+ .set(KyuubiConf.CREDENTIALS_RENEWAL_INTERVAL, 1000L)
+ withStartedManager(kyuubiConf) { manager =>
+ UnstableDelegationTokenProvider.throwException = true
+
assertThrows[KyuubiException](manager.getOrCreateUserCredentialsRef(appUser,
true))
+ }
+ }
+
test("schedule credentials renewal retry when failed") {
val kyuubiConf = new KyuubiConf(false)
.set(KyuubiConf.CREDENTIALS_RENEWAL_INTERVAL, 1000L)