This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a0c49e791 [KYUUBI #7138] Respect 
`kyuubi.session.engine.spark.initialize.sql` set by client in shared engine 
mode
9a0c49e791 is described below

commit 9a0c49e79135cd90368986176591a80d29634231
Author: liangzhaoyuan <[email protected]>
AuthorDate: Mon Aug 4 14:46:05 2025 +0800

    [KYUUBI #7138] Respect `kyuubi.session.engine.spark.initialize.sql` set by 
client in shared engine mode
    
    ### Why are the changes needed?
    <img width="1860" height="908" alt="image" 
src="https://github.com/user-attachments/assets/ec445237-be62-405f-992e-56e10156407f";
 />
    **Current Behavior:**
    
    When "kyuubi.engine.share.level = USER/GROUP/SERVER", the first client 
(Client A) calling openSession creates a Kyuubi-Spark-SQL-Engine (Spark 
Driver), where the initialization SQL configured in 
"kyuubi.session.engine.spark.initialize.sql" takes effect.
    
    Subsequent clients (e.g., Client B) connecting via openSession will reuse 
the existing Kyuubi-Spark-SQL-Engine (Spark Driver) created in step 1, where 
the initialization SQL configured in 
"kyuubi.session.engine.spark.initialize.sql" becomes ineffective.
    
    **Why This Capability Is Needed:**
    
    Currently, kyuubi.session.engine.spark.initialize.sql only applies to the 
first openSession client. All subsequent SQL operations inherit the 
initialization SQL configuration from the first client (this appears to be a 
potential bug).
    
    Client A may need to set "USE dbA" in its current SQL context, while Client 
B may need "USE dbB" in its own context - such scenarios should be supported.
    
    ### How was this patch tested?
    Tested on local Kyuubi/Spark cluster. No existing unit tests cover this 
scenario. Please point me to any relevant tests so I can add them.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #7138 from 1358035421/lc/spark_session_init_sql.
    
    Closes #7138
    
    338d8aace [Cheng Pan] remove dash
    1beecc456 [Cheng Pan] fix
    6c7f9a13e [liangzhaoyuan] update migration-guide.md
    492adb6c4 [liangzhaoyuan] fix review comments
    f0e9320be [1358035421] Merge branch 'master' into lc/spark_session_init_sql
    021455322 [liangzhaoyuan] update migration-guide.md
    b4e61cf89 [liangzhaoyuan] ut
    ca4c71253 [Cheng Pan] Update 
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
    da92544f1 [liangzhaoyuan] fix
    c1a38d584 [liangzhaoyuan] Support executing 
kyuubi.session.engine.spark.initialize.sql on session initialization
    
    Lead-authored-by: liangzhaoyuan <[email protected]>
    Co-authored-by: Cheng Pan <[email protected]>
    Co-authored-by: 1358035421 <[email protected]>
    Co-authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 docs/deployment/migration-guide.md                 |  2 +
 .../spark/session/SparkSQLSessionManager.scala     | 19 ++++---
 .../session/MultiSessionSuiteInitSQLSuite.scala    | 66 ++++++++++++++++++++++
 3 files changed, 80 insertions(+), 7 deletions(-)

diff --git a/docs/deployment/migration-guide.md 
b/docs/deployment/migration-guide.md
index 359ab26bce..ff6449ee68 100644
--- a/docs/deployment/migration-guide.md
+++ b/docs/deployment/migration-guide.md
@@ -23,6 +23,8 @@
 
 * Since Kyuubi 1.11, if the engine is running in cluster mode, Kyuubi will 
respect the `kyuubi.session.engine.startup.waitCompletion` config to determine 
whether to wait for the engine completion or not. If the engine is running in 
client mode, Kyuubi will always wait for the engine completion. And for Spark 
engine, Kyuubi will append the `spark.yarn.submit.waitAppCompletion` and 
`spark.kubernetes.submission.waitAppCompletion` configs to the engine conf 
based on the value of `kyuubi.sess [...]
 
+* Since Kyuubi 1.11, the configuration 
`kyuubi.session.engine.spark.initialize.sql` set by the client (via session 
configuration) is now correctly applied to every session in shared engines 
(USER, GROUP, SERVER). Previously, only the value set on the server side was 
applied and only for the first session when the engine started. Now, 
session-level settings provided by each client are respected.
+
 ## Upgrading from Kyuubi 1.9 to 1.10
 
 * Since Kyuubi 1.10, `beeline` is deprecated and will be removed in the 
future, please use `kyuubi-beeline` instead.
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
index 7144188a4d..a4048b78b9 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
@@ -105,7 +105,7 @@ class SparkSQLSessionManager private (name: String, spark: 
SparkSession)
     userIsolatedSparkSessionThread.foreach(_.shutdown())
   }
 
-  private def getOrNewSparkSession(user: String): SparkSession = {
+  private def getOrNewSparkSession(user: String, sessionConf: Map[String, 
String]): SparkSession = {
     if (singleSparkSession) {
       spark
     } else {
@@ -113,8 +113,8 @@ class SparkSQLSessionManager private (name: String, spark: 
SparkSession)
         // it's unnecessary to create a new spark session in connection share 
level
         // since the session is only one
         case CONNECTION => spark
-        case USER => newSparkSession(spark)
-        case GROUP | SERVER if userIsolatedSparkSession => 
newSparkSession(spark)
+        case USER => newSparkSession(spark, sessionConf)
+        case GROUP | SERVER if userIsolatedSparkSession => 
newSparkSession(spark, sessionConf)
         case GROUP | SERVER =>
           userIsolatedCacheLock.synchronized {
             if (userIsolatedCache.containsKey(user)) {
@@ -123,7 +123,7 @@ class SparkSQLSessionManager private (name: String, spark: 
SparkSession)
               userIsolatedCache.get(user)
             } else {
               userIsolatedCacheCount.put(user, (1, System.currentTimeMillis()))
-              val newSession = newSparkSession(spark)
+              val newSession = newSparkSession(spark, sessionConf)
               userIsolatedCache.put(user, newSession)
               newSession
             }
@@ -132,11 +132,16 @@ class SparkSQLSessionManager private (name: String, 
spark: SparkSession)
     }
   }
 
-  private def newSparkSession(rootSparkSession: SparkSession): SparkSession = {
+  private def newSparkSession(
+      rootSparkSession: SparkSession,
+      sessionConf: Map[String, String]): SparkSession = {
     val newSparkSession = rootSparkSession.newSession()
     KyuubiSparkUtil.initializeSparkSession(
       newSparkSession,
-      conf.get(ENGINE_SESSION_SPARK_INITIALIZE_SQL))
+      sessionConf.get(ENGINE_SESSION_SPARK_INITIALIZE_SQL.key)
+        .filter(_.nonEmpty)
+        .map(_.split(";").toSeq)
+        .getOrElse(conf.get(ENGINE_SESSION_SPARK_INITIALIZE_SQL)))
     newSparkSession
   }
 
@@ -150,7 +155,7 @@ class SparkSQLSessionManager private (name: String, spark: 
SparkSession)
       getSessionOption).getOrElse {
       val sparkSession =
         try {
-          getOrNewSparkSession(user)
+          getOrNewSparkSession(user, conf)
         } catch {
           case e: Exception => throw KyuubiSQLException(e)
         }
diff --git 
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/session/MultiSessionSuiteInitSQLSuite.scala
 
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/session/MultiSessionSuiteInitSQLSuite.scala
new file mode 100644
index 0000000000..f7b91ca315
--- /dev/null
+++ 
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/session/MultiSessionSuiteInitSQLSuite.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark.session
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.engine.spark.WithSparkSQLEngine
+import org.apache.kyuubi.operation.HiveJDBCTestHelper
+import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TExecuteStatementReq, 
TFetchResultsReq, TOpenSessionReq}
+
+class MultiSessionSuiteInitSQLSuite extends WithSparkSQLEngine with 
HiveJDBCTestHelper {
+
+  override def withKyuubiConf: Map[String, String] = {
+    Map(
+      ENGINE_SHARE_LEVEL.key -> "SERVER",
+      ENGINE_SINGLE_SPARK_SESSION.key -> "false")
+  }
+
+  override protected def jdbcUrl: String =
+    
s"jdbc:hive2://${engine.frontendServices.head.connectionUrl}/;#spark.ui.enabled=false"
+
+  test("isolated user spark session") {
+    Seq("abc", "xyz").foreach { value =>
+      withThriftClient(Some(user)) { client =>
+        val req = new TOpenSessionReq()
+        req.setUsername("user")
+        req.setPassword("anonymous")
+        req.setConfiguration(Map(
+          ENGINE_SHARE_LEVEL.key -> "SERVER",
+          ENGINE_SINGLE_SPARK_SESSION.key -> "false",
+          ENGINE_SESSION_SPARK_INITIALIZE_SQL.key -> s"SET 
varA=$value").asJava)
+        val tOpenSessionResp = client.OpenSession(req)
+        val tExecuteStatementReq = new TExecuteStatementReq()
+        
tExecuteStatementReq.setSessionHandle(tOpenSessionResp.getSessionHandle)
+        tExecuteStatementReq.setStatement("SELECT '${varA}'")
+        tExecuteStatementReq.setRunAsync(false)
+        val tExecuteStatementResp = 
client.ExecuteStatement(tExecuteStatementReq)
+
+        val operationHandle = tExecuteStatementResp.getOperationHandle
+        val tFetchResultsReq = new TFetchResultsReq()
+        tFetchResultsReq.setOperationHandle(operationHandle)
+        tFetchResultsReq.setFetchType(0)
+        tFetchResultsReq.setMaxRows(1)
+        val tFetchResultsResp = client.FetchResults(tFetchResultsReq)
+        val ret = 
tFetchResultsResp.getResults.getColumns.get(0).getStringVal.getValues.get(0)
+        assert(ret === value)
+      }
+    }
+  }
+}

Reply via email to