This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/branch-1.5 by this push:
     new 15aba5d  [KYUUBI #1936] Send credentials when opening session and wait 
for completion
15aba5d is described below

commit 15aba5de365b65b50c3a18c932262d4bf3fab823
Author: Tianlin Liao <[email protected]>
AuthorDate: Thu Mar 10 09:40:59 2022 +0800

    [KYUUBI #1936] Send credentials when opening session and wait for completion
    
    <!--
    Thanks for sending a pull request!
    
    Here are some tips for you:
      1. If this is your first time, please read our contributor guidelines: 
https://kyuubi.readthedocs.io/en/latest/community/contributions.html
      2. If the PR is related to an issue in 
https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your 
PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
      3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., 
'[WIP][KYUUBI #XXXX] Your PR title ...'.
    -->
    
    ### _Why are the changes needed?_
    <!--
    Please clarify why the changes are needed. For instance,
      1. If you add a feature, you can talk about the use case of it.
      2. If you fix a bug, you can clarify why it is a bug.
    -->
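    To close #1936. After the engine session is opened, the server now sends the
    user's credentials to the engine and runs the token update synchronously
    instead of only scheduling it, so it completes during engine launch.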
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run 
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
 locally before making a pull request
    
    Closes #2059 from lightning-L/kyuubi-1936.
    
    Closes #1936
    
    e9c83ef6 [Tianlin Liao] [KYUUBI #1936] send credentials when opening 
session and wait for completion
    
    Authored-by: Tianlin Liao <[email protected]>
    Signed-off-by: ulysses-you <[email protected]>
    (cherry picked from commit 8e983a198ea8c57d5c62b0c19b507096dba60bb5)
    Signed-off-by: ulysses-you <[email protected]>
---
 .../credentials/HadoopCredentialsManager.scala     | 44 +++++++++++++++++-----
 .../org/apache/kyuubi/operation/LaunchEngine.scala | 17 ++++++++-
 .../HadoopCredentialsManagerSuite.scala            | 20 +++++++++-
 3 files changed, 69 insertions(+), 12 deletions(-)

diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
index ec80ebc..5040520 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
@@ -26,7 +26,7 @@ import scala.util.{Failure, Success, Try}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.security.Credentials
 
-import org.apache.kyuubi.Logging
+import org.apache.kyuubi.{KyuubiException, Logging}
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.service.AbstractService
@@ -146,12 +146,13 @@ class HadoopCredentialsManager private (name: String) 
extends AbstractService(na
   def sendCredentialsIfNeeded(
       sessionId: String,
       appUser: String,
-      send: String => Unit): Unit = {
+      send: String => Unit,
+      onetime: Boolean = false): Unit = {
     if (renewalExecutor.isEmpty) {
       return
     }
 
-    val userRef = getOrCreateUserCredentialsRef(appUser)
+    val userRef = getOrCreateUserCredentialsRef(appUser, onetime)
     val sessionEpoch = getSessionCredentialsEpoch(sessionId)
 
     if (userRef.getEpoch > sessionEpoch) {
@@ -181,16 +182,26 @@ class HadoopCredentialsManager private (name: String) 
extends AbstractService(na
   }
 
   // Visible for testing.
-  private[credentials] def getOrCreateUserCredentialsRef(appUser: String): 
CredentialsRef =
-    userCredentialsRefMap.computeIfAbsent(
+  private[credentials] def getOrCreateUserCredentialsRef(
+      appUser: String,
+      onetime: Boolean = false): CredentialsRef = {
+    val ref = userCredentialsRefMap.computeIfAbsent(
       appUser,
       appUser => {
         val ref = new CredentialsRef(appUser)
-        scheduleRenewal(ref, 0)
+        scheduleRenewal(ref, 0, onetime)
         info(s"Created CredentialsRef for user $appUser and scheduled a 
renewal task")
         ref
       })
 
+    // schedule renewal task when encodedCredentials are invalid
+    if (onetime && ref.getEncodedCredentials == null) {
+      scheduleRenewal(ref, 0, onetime)
+    }
+
+    ref
+  }
+
   // Visible for testing.
   private[credentials] def getSessionCredentialsEpoch(sessionId: String): Long 
= {
     sessionCredentialsEpochMap.getOrDefault(sessionId, 
CredentialsRef.UNSET_EPOCH)
@@ -201,7 +212,10 @@ class HadoopCredentialsManager private (name: String) 
extends AbstractService(na
     providers.contains(serviceName)
   }
 
-  private def scheduleRenewal(userRef: CredentialsRef, delay: Long): Unit = {
+  private def scheduleRenewal(
+      userRef: CredentialsRef,
+      delay: Long,
+      waitCompletion: Boolean = false): Unit = {
     val renewalTask = new Runnable {
       override def run(): Unit = {
         try {
@@ -219,13 +233,23 @@ class HadoopCredentialsManager private (name: String) 
extends AbstractService(na
                 s" $renewalRetryWait ms",
               e)
             scheduleRenewal(userRef, renewalRetryWait)
+            // throw exception when one-time execution fails,
+            // so that client side can be aware of this
+            if (waitCompletion) {
+              throw new KyuubiException(s"One-time execution failed for token 
update task " +
+                s"for ${userRef.getAppUser}")
+            }
         }
       }
     }
 
-    renewalExecutor.foreach { executor =>
-      info(s"Scheduling renewal in $delay ms.")
-      executor.schedule(renewalTask, delay, TimeUnit.MILLISECONDS)
+    if (waitCompletion) {
+      renewalTask.run()
+    } else {
+      renewalExecutor.foreach { executor =>
+        info(s"Scheduling renewal in $delay ms.")
+        executor.schedule(renewalTask, delay, TimeUnit.MILLISECONDS)
+      }
     }
   }
 
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala
index 8e087df..41a6cad 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala
@@ -18,7 +18,7 @@
 package org.apache.kyuubi.operation
 
 import org.apache.kyuubi.operation.log.OperationLog
-import org.apache.kyuubi.session.KyuubiSessionImpl
+import org.apache.kyuubi.session.{KyuubiSessionImpl, KyuubiSessionManager}
 
 class LaunchEngine(session: KyuubiSessionImpl, override val shouldRunAsync: 
Boolean)
   extends KyuubiOperation(OperationType.UNKNOWN_OPERATION, session) {
@@ -49,6 +49,7 @@ class LaunchEngine(session: KyuubiSessionImpl, override val 
shouldRunAsync: Bool
       setState(OperationState.RUNNING)
       try {
         session.openEngineSession(getOperationLog)
+        renewEngineCredentials()
         setState(OperationState.FINISHED)
       } catch onError()
     }
@@ -59,4 +60,18 @@ class LaunchEngine(session: KyuubiSessionImpl, override val 
shouldRunAsync: Bool
 
     if (!shouldRunAsync) getBackgroundHandle.get()
   }
+
+  private def renewEngineCredentials(): Unit = {
+    val sessionManager = 
session.sessionManager.asInstanceOf[KyuubiSessionManager]
+    try {
+      sessionManager.credentialsManager.sendCredentialsIfNeeded(
+        session.handle.identifier.toString,
+        session.user,
+        client.sendCredentials,
+        true)
+    } catch {
+      case e: Exception =>
+        error(s"Failed to renew engine credentials when launching engine", e)
+    }
+  }
 }
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopCredentialsManagerSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopCredentialsManagerSuite.scala
index a721c3c..ff08422 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopCredentialsManagerSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopCredentialsManagerSuite.scala
@@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.security.Credentials
 import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
 
-import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.{KyuubiException, KyuubiFunSuite}
 import org.apache.kyuubi.config.KyuubiConf
 
 class HadoopCredentialsManagerSuite extends KyuubiFunSuite {
@@ -93,6 +93,24 @@ class HadoopCredentialsManagerSuite extends KyuubiFunSuite {
     }
   }
 
+  test("execute credentials renewal task and wait for completion") {
+    val kyuubiConf = new KyuubiConf(false)
+      .set(KyuubiConf.CREDENTIALS_RENEWAL_INTERVAL, 1000L)
+    withStartedManager(kyuubiConf) { manager =>
+      val userRef = manager.getOrCreateUserCredentialsRef(appUser, true)
+      assert(userRef.getEpoch == 0)
+    }
+  }
+
+  test("throw exception when credential renewal fails") {
+    val kyuubiConf = new KyuubiConf(false)
+      .set(KyuubiConf.CREDENTIALS_RENEWAL_INTERVAL, 1000L)
+    withStartedManager(kyuubiConf) { manager =>
+      UnstableDelegationTokenProvider.throwException = true
+      
assertThrows[KyuubiException](manager.getOrCreateUserCredentialsRef(appUser, 
true))
+    }
+  }
+
   test("schedule credentials renewal retry when failed") {
     val kyuubiConf = new KyuubiConf(false)
       .set(KyuubiConf.CREDENTIALS_RENEWAL_INTERVAL, 1000L)

Reply via email to