This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.7 by this push:
new 2b5dc8a2b [KYUUBI #4522] `use:catalog` should execute before
`use:database`
2b5dc8a2b is described below
commit 2b5dc8a2b2f507086fb47647e6ca823082b104c5
Author: senmiaoliu <[email protected]>
AuthorDate: Tue Apr 4 10:56:43 2023 +0800
[KYUUBI #4522] `use:catalog` should execute before `use:database`
### _Why are the changes needed?_
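close #4522
Previously `use:catalog` and `use:database` were handled inside a single
`normalizedConf.foreach`, so they ran in whatever order the conf was iterated
and the database could be switched before the catalog. This change applies
`use:catalog` first, then `use:database`, then the remaining conf entries.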
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run
test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #4648 from lsm1/fix/kyuubi_4522.
Closes #4522
e06046899 [senmiaoliu] use foreach
bd83d6623 [senmiaoliu] split normalizedConf
4d8445aac [senmiaoliu] avoid sort
eda34d480 [senmiaoliu] use catalog first
Authored-by: senmiaoliu <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
(cherry picked from commit f0796ec0783b386ef3530da73ac744e375986ca5)
Signed-off-by: Cheng Pan <[email protected]>
---
.../engine/flink/session/FlinkSessionImpl.scala | 45 +++++++++++++---------
.../engine/spark/session/SparkSessionImpl.scala | 41 ++++++++++++--------
.../engine/trino/session/TrinoSessionImpl.scala | 8 +++-
3 files changed, 58 insertions(+), 36 deletions(-)
diff --git
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
index 03d9ce42e..953cd7302 100644
---
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
+++
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
@@ -53,25 +53,34 @@ class FlinkSessionImpl(
override def open(): Unit = {
executor.openSession(handle.identifier.toString)
- normalizedConf.foreach {
- case ("use:catalog", catalog) =>
- val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
- try {
- tableEnv.useCatalog(catalog)
- } catch {
- case NonFatal(e) =>
+
+ val (useCatalogAndDatabaseConf, otherConf) = normalizedConf.partition {
case (k, _) =>
+ Array("use:catalog", "use:database").contains(k)
+ }
+
+ useCatalogAndDatabaseConf.get("use:catalog").foreach { catalog =>
+ val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
+ try {
+ tableEnv.useCatalog(catalog)
+ } catch {
+ case NonFatal(e) =>
+ throw e
+ }
+ }
+
+ useCatalogAndDatabaseConf.get("use:database").foreach { database =>
+ val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
+ try {
+ tableEnv.useDatabase(database)
+ } catch {
+ case NonFatal(e) =>
+ if (database != "default") {
throw e
- }
- case ("use:database", database) =>
- val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
- try {
- tableEnv.useDatabase(database)
- } catch {
- case NonFatal(e) =>
- if (database != "default") {
- throw e
- }
- }
+ }
+ }
+ }
+
+ otherConf.foreach {
case (key, value) => setModifiableConfig(key, value)
}
super.open()
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
index 78164ff5f..96fc43e85 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
@@ -54,22 +54,31 @@ class SparkSessionImpl(
private val sessionEvent = SessionEvent(this)
override def open(): Unit = {
- normalizedConf.foreach {
- case ("use:catalog", catalog) =>
- try {
- SparkCatalogShim().setCurrentCatalog(spark, catalog)
- } catch {
- case e if e.getMessage.contains("Cannot find catalog plugin class
for catalog") =>
- warn(e.getMessage())
- }
- case ("use:database", database) =>
- try {
- SparkCatalogShim().setCurrentDatabase(spark, database)
- } catch {
- case e
- if database == "default" && e.getMessage != null &&
- e.getMessage.contains("not found") =>
- }
+
+ val (useCatalogAndDatabaseConf, otherConf) = normalizedConf.partition {
case (k, _) =>
+ Array("use:catalog", "use:database").contains(k)
+ }
+
+ useCatalogAndDatabaseConf.get("use:catalog").foreach { catalog =>
+ try {
+ SparkCatalogShim().setCurrentCatalog(spark, catalog)
+ } catch {
+ case e if e.getMessage.contains("Cannot find catalog plugin class for
catalog") =>
+ warn(e.getMessage())
+ }
+ }
+
+ useCatalogAndDatabaseConf.get("use:database").foreach { database =>
+ try {
+ SparkCatalogShim().setCurrentDatabase(spark, database)
+ } catch {
+ case e
+ if database == "default" && e.getMessage != null &&
+ e.getMessage.contains("not found") =>
+ }
+ }
+
+ otherConf.foreach {
case (key, value) => setModifiableConfig(key, value)
}
KDFRegistry.registerAll(spark)
diff --git
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala
index a19d74d58..6bd3b6dae 100644
---
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala
+++
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala
@@ -53,10 +53,14 @@ class TrinoSessionImpl(
private val sessionEvent = TrinoSessionEvent(this)
override def open(): Unit = {
- normalizedConf.foreach {
+
+ val (useCatalogAndDatabaseConf, _) = normalizedConf.partition { case (k,
_) =>
+ Array("use:catalog", "use:database").contains(k)
+ }
+
+ useCatalogAndDatabaseConf.foreach {
case ("use:catalog", catalog) => catalogName = catalog
case ("use:database", database) => databaseName = database
- case _ => // do nothing
}
val httpClient = new OkHttpClient.Builder().build()