This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 9a0c49e791 [KYUUBI #7138] Respect
`kyuubi.session.engine.spark.initialize.sql` set by cllient in shared engine
mode
9a0c49e791 is described below
commit 9a0c49e79135cd90368986176591a80d29634231
Author: liangzhaoyuan <[email protected]>
AuthorDate: Mon Aug 4 14:46:05 2025 +0800
[KYUUBI #7138] Respect `kyuubi.session.engine.spark.initialize.sql` set by
cllient in shared engine mode
### Why are the changes needed?
<img width="1860" height="908" alt="image"
src="https://github.com/user-attachments/assets/ec445237-be62-405f-992e-56e10156407f"
/>
**Current Behavior:**
When "kyuubi.engine.share.level = USER/GROUP/SERVER", the first client
(Client A) calling openSession creates a Kyuubi-Spark-SQL-Engine (Spark
Driver), where the initialization SQL configured in
"kyuubi.session.engine.spark.initialize.sql" takes effect.
Subsequent clients (e.g., Client B) connecting via openSession will reuse
the existing Kyuubi-Spark-SQL-Engine (Spark Driver) created in step 1, where
the initialization SQL configured in
"kyuubi.session.engine.spark.initialize.sql" becomes ineffective.
**Why This Capability Is Needed:**
Currently, kyuubi.session.engine.spark.initialize.sql only applies to the
first openSession client. All subsequent SQL operations inherit the
initialization SQL configuration from the first client (this appears to be a
potential bug).
Client A may need to set "USE dbA" in its current SQL context, while Client
B may need "USE dbB" in its own context - such scenarios should be supported.
### How was this patch tested?
Tested on local Kyuubi/Spark cluster. No existing unit tests cover this
scenario. Please point me to any relevant tests so I can add them
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #7138 from 1358035421/lc/spark_session_init_sql.
Closes #7138
338d8aace [Cheng Pan] remove dash
1beecc456 [Cheng Pan] fix
6c7f9a13e [liangzhaoyuan] update migration-guide.md
492adb6c4 [liangzhaoyuan] fix review comments
f0e9320be [1358035421] Merge branch 'master' into lc/spark_session_init_sql
021455322 [liangzhaoyuan] update migration-guide.md
b4e61cf89 [liangzhaoyuan] ut
ca4c71253 [Cheng Pan] Update
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
da92544f1 [liangzhaoyuan] fix
c1a38d584 [liangzhaoyuan] Support executing
kyuubi.session.engine.spark.initialize.sql on session initialization
Lead-authored-by: liangzhaoyuan <[email protected]>
Co-authored-by: Cheng Pan <[email protected]>
Co-authored-by: 1358035421 <[email protected]>
Co-authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
docs/deployment/migration-guide.md | 2 +
.../spark/session/SparkSQLSessionManager.scala | 19 ++++---
.../session/MultiSessionSuiteInitSQLSuite.scala | 66 ++++++++++++++++++++++
3 files changed, 80 insertions(+), 7 deletions(-)
diff --git a/docs/deployment/migration-guide.md
b/docs/deployment/migration-guide.md
index 359ab26bce..ff6449ee68 100644
--- a/docs/deployment/migration-guide.md
+++ b/docs/deployment/migration-guide.md
@@ -23,6 +23,8 @@
* Since Kyuubi 1.11, if the engine is running in cluster mode, Kyuubi will
respect the `kyuubi.session.engine.startup.waitCompletion` config to determine
whether to wait for the engine completion or not. If the engine is running in
client mode, Kyuubi will always wait for the engine completion. And for Spark
engine, Kyuubi will append the `spark.yarn.submit.waitAppCompletion` and
`spark.kubernetes.submission.waitAppCompletion` configs to the engine conf
based on the value of `kyuubi.sess [...]
+* Since Kyuubi 1.11, the configuration
`kyuubi.session.engine.spark.initialize.sql` set by the client (via session
configuration) is now correctly applied to every session in shared engines
(USER, GROUP, SERVER). Previously, only the value set on the server side was
applied and only for the first session when the engine started. Now,
session-level settings provided by each client are respected.
+
## Upgrading from Kyuubi 1.9 to 1.10
* Since Kyuubi 1.10, `beeline` is deprecated and will be removed in the
future, please use `kyuubi-beeline` instead.
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
index 7144188a4d..a4048b78b9 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
@@ -105,7 +105,7 @@ class SparkSQLSessionManager private (name: String, spark:
SparkSession)
userIsolatedSparkSessionThread.foreach(_.shutdown())
}
- private def getOrNewSparkSession(user: String): SparkSession = {
+ private def getOrNewSparkSession(user: String, sessionConf: Map[String,
String]): SparkSession = {
if (singleSparkSession) {
spark
} else {
@@ -113,8 +113,8 @@ class SparkSQLSessionManager private (name: String, spark:
SparkSession)
// it's unnecessary to create a new spark session in connection share
level
// since the session is only one
case CONNECTION => spark
- case USER => newSparkSession(spark)
- case GROUP | SERVER if userIsolatedSparkSession =>
newSparkSession(spark)
+ case USER => newSparkSession(spark, sessionConf)
+ case GROUP | SERVER if userIsolatedSparkSession =>
newSparkSession(spark, sessionConf)
case GROUP | SERVER =>
userIsolatedCacheLock.synchronized {
if (userIsolatedCache.containsKey(user)) {
@@ -123,7 +123,7 @@ class SparkSQLSessionManager private (name: String, spark:
SparkSession)
userIsolatedCache.get(user)
} else {
userIsolatedCacheCount.put(user, (1, System.currentTimeMillis()))
- val newSession = newSparkSession(spark)
+ val newSession = newSparkSession(spark, sessionConf)
userIsolatedCache.put(user, newSession)
newSession
}
@@ -132,11 +132,16 @@ class SparkSQLSessionManager private (name: String,
spark: SparkSession)
}
}
- private def newSparkSession(rootSparkSession: SparkSession): SparkSession = {
+ private def newSparkSession(
+ rootSparkSession: SparkSession,
+ sessionConf: Map[String, String]): SparkSession = {
val newSparkSession = rootSparkSession.newSession()
KyuubiSparkUtil.initializeSparkSession(
newSparkSession,
- conf.get(ENGINE_SESSION_SPARK_INITIALIZE_SQL))
+ sessionConf.get(ENGINE_SESSION_SPARK_INITIALIZE_SQL.key)
+ .filter(_.nonEmpty)
+ .map(_.split(";").toSeq)
+ .getOrElse(conf.get(ENGINE_SESSION_SPARK_INITIALIZE_SQL)))
newSparkSession
}
@@ -150,7 +155,7 @@ class SparkSQLSessionManager private (name: String, spark:
SparkSession)
getSessionOption).getOrElse {
val sparkSession =
try {
- getOrNewSparkSession(user)
+ getOrNewSparkSession(user, conf)
} catch {
case e: Exception => throw KyuubiSQLException(e)
}
diff --git
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/session/MultiSessionSuiteInitSQLSuite.scala
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/session/MultiSessionSuiteInitSQLSuite.scala
new file mode 100644
index 0000000000..f7b91ca315
--- /dev/null
+++
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/session/MultiSessionSuiteInitSQLSuite.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark.session
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.engine.spark.WithSparkSQLEngine
+import org.apache.kyuubi.operation.HiveJDBCTestHelper
+import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TExecuteStatementReq,
TFetchResultsReq, TOpenSessionReq}
+
+class MultiSessionSuiteInitSQLSuite extends WithSparkSQLEngine with
HiveJDBCTestHelper {
+
+ override def withKyuubiConf: Map[String, String] = {
+ Map(
+ ENGINE_SHARE_LEVEL.key -> "SERVER",
+ ENGINE_SINGLE_SPARK_SESSION.key -> "false")
+ }
+
+ override protected def jdbcUrl: String =
+
s"jdbc:hive2://${engine.frontendServices.head.connectionUrl}/;#spark.ui.enabled=false"
+
+ test("isolated user spark session") {
+ Seq("abc", "xyz").foreach { value =>
+ withThriftClient(Some(user)) { client =>
+ val req = new TOpenSessionReq()
+ req.setUsername("user")
+ req.setPassword("anonymous")
+ req.setConfiguration(Map(
+ ENGINE_SHARE_LEVEL.key -> "SERVER",
+ ENGINE_SINGLE_SPARK_SESSION.key -> "false",
+ ENGINE_SESSION_SPARK_INITIALIZE_SQL.key -> s"SET
varA=$value").asJava)
+ val tOpenSessionResp = client.OpenSession(req)
+ val tExecuteStatementReq = new TExecuteStatementReq()
+
tExecuteStatementReq.setSessionHandle(tOpenSessionResp.getSessionHandle)
+ tExecuteStatementReq.setStatement("SELECT '${varA}'")
+ tExecuteStatementReq.setRunAsync(false)
+ val tExecuteStatementResp =
client.ExecuteStatement(tExecuteStatementReq)
+
+ val operationHandle = tExecuteStatementResp.getOperationHandle
+ val tFetchResultsReq = new TFetchResultsReq()
+ tFetchResultsReq.setOperationHandle(operationHandle)
+ tFetchResultsReq.setFetchType(0)
+ tFetchResultsReq.setMaxRows(1)
+ val tFetchResultsResp = client.FetchResults(tFetchResultsReq)
+ val ret =
tFetchResultsResp.getResults.getColumns.get(0).getStringVal.getValues.get(0)
+ assert(ret === value)
+ }
+ }
+ }
+}