This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.7 by this push:
     new 06bc924df [KYUUBI #4731] Support batch session conf advisor
06bc924df is described below

commit 06bc924df569cc7485cd09c6cb712e5e8cfb4e26
Author: fwang12 <[email protected]>
AuthorDate: Thu Apr 20 17:47:52 2023 +0800

    [KYUUBI #4731] Support batch session conf advisor
    
    ### _Why are the changes needed?_
    
    As title.
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run 
test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests)
 locally before make a pull request
    
    Closes #4731 from turboFei/batch_session_conf_overlay.
    
    Closes #4731
    
    5cf73d1db [fwang12] save
    
    Authored-by: fwang12 <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
    (cherry picked from commit ec8596c9eee9b31c8d715ee4f53d53cd8b441304)
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../kyuubi/session/KyuubiBatchSessionImpl.scala    | 23 +++++++++++++++++-----
 1 file changed, 18 insertions(+), 5 deletions(-)

diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
index 228890a1e..94859a08c 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
@@ -68,14 +68,27 @@ class KyuubiBatchSessionImpl(
   override val sessionIdleTimeoutThreshold: Long =
     sessionManager.getConf.get(KyuubiConf.BATCH_SESSION_IDLE_TIMEOUT)
 
-  // TODO: Support batch conf advisor
   override val normalizedConf: Map[String, String] = {
     sessionConf.getBatchConf(batchRequest.getBatchType) ++
       sessionManager.validateBatchConf(batchRequest.getConf.asScala.toMap)
   }
 
+  private val optimizedConf: Map[String, String] = {
+    val confOverlay = sessionManager.sessionConfAdvisor.getConfOverlay(
+      user,
+      normalizedConf.asJava)
+    if (confOverlay != null) {
+      val overlayConf = new KyuubiConf(false)
+      confOverlay.asScala.foreach { case (k, v) => overlayConf.set(k, v) }
+      normalizedConf ++ overlayConf.getBatchConf(batchRequest.getBatchType)
+    } else {
+      warn(s"the server plugin return null value for user: $user, ignore it")
+      normalizedConf
+    }
+  }
+
   override lazy val name: Option[String] = Option(batchRequest.getName).orElse(
-    normalizedConf.get(KyuubiConf.SESSION_NAME.key))
+    optimizedConf.get(KyuubiConf.SESSION_NAME.key))
 
   // whether the resource file is from uploading
   private[kyuubi] val isResourceUploaded: Boolean = batchRequest.getConf
@@ -88,7 +101,7 @@ class KyuubiBatchSessionImpl(
       name.orNull,
       batchRequest.getResource,
       batchRequest.getClassName,
-      normalizedConf,
+      optimizedConf,
       batchRequest.getArgs.asScala,
       recoveryMetadata)
 
@@ -115,7 +128,7 @@ class KyuubiBatchSessionImpl(
   override def checkSessionAccessPathURIs(): Unit = {
     KyuubiApplicationManager.checkApplicationAccessPaths(
       batchRequest.getBatchType,
-      normalizedConf,
+      optimizedConf,
       sessionManager.getConf)
     if (batchRequest.getResource != SparkProcessBuilder.INTERNAL_RESOURCE
       && !isResourceUploaded) {
@@ -140,7 +153,7 @@ class KyuubiBatchSessionImpl(
         resource = batchRequest.getResource,
         className = batchRequest.getClassName,
         requestName = name.orNull,
-        requestConf = normalizedConf,
+        requestConf = optimizedConf,
         requestArgs = batchRequest.getArgs.asScala,
         createTime = createTime,
         engineType = batchRequest.getBatchType,

Reply via email to