This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new cbb4e7da692 [SPARK-39646][SQL] Make setCurrentDatabase compatible with 3 layer namespace
cbb4e7da692 is described below
commit cbb4e7da6924f62a6b5272a5684faac9f132fcfd
Author: Zach Schuermann <[email protected]>
AuthorDate: Sat Jul 2 08:26:40 2022 +0800
[SPARK-39646][SQL] Make setCurrentDatabase compatible with 3 layer namespace
### What changes were proposed in this pull request?
Change `setCurrentDatabase` catalog API to support 3 layer namespace. We
use `sparkSession.sessionState.catalogManager.currentNamespace` for the
currentDatabase now.
### Why are the changes needed?
`setCurrentDatabase` doesn't support 3 layer namespace.
### Does this PR introduce _any_ user-facing change?
Yes. This PR introduces a backwards-compatible API change to support 3
layer namespace (e.g. catalog.database.table) for `setCurrentDatabase`.
### How was this patch tested?
UT
Closes #36969 from schuermannator/3l-setCurrentDatabse.
Authored-by: Zach Schuermann <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
R/pkg/tests/fulltests/test_sparkSQL.R | 4 +--
.../apache/spark/sql/internal/CatalogImpl.scala | 9 ++++---
.../apache/spark/sql/internal/CatalogSuite.scala | 29 ++++++++++++++++++++++
3 files changed, 37 insertions(+), 5 deletions(-)
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index 2acb7a9ceba..0f984d0022a 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -4015,8 +4015,8 @@ test_that("catalog APIs, currentDatabase, setCurrentDatabase, listDatabases", {
expect_equal(currentDatabase(), "default")
expect_error(setCurrentDatabase("default"), NA)
expect_error(setCurrentDatabase("zxwtyswklpf"),
-               paste0("Error in setCurrentDatabase : analysis error - Database ",
-                      "'zxwtyswklpf' does not exist"))
+               paste0("Error in setCurrentDatabase : no such database - Database ",
+                      "'zxwtyswklpf' not found"))
dbs <- collect(listDatabases())
expect_equal(names(dbs), c("name", "catalog", "description", "locationUri"))
expect_equal(which(dbs[, 1] == "default"), 1)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 742ca5ccb1e..c276fbb677c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -59,15 +59,18 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
/**
* Returns the current default database in this session.
*/
- override def currentDatabase: String = sessionCatalog.getCurrentDatabase
+  override def currentDatabase: String =
+    sparkSession.sessionState.catalogManager.currentNamespace.toSeq.quoted
/**
* Sets the current default database in this session.
*/
@throws[AnalysisException]("database does not exist")
override def setCurrentDatabase(dbName: String): Unit = {
- requireDatabaseExists(dbName)
- sessionCatalog.setCurrentDatabase(dbName)
+    // we assume dbName will not include the catalog prefix. e.g. if you call
+    // setCurrentDatabase("catalog.db") it will search for a database catalog.db in the catalog.
+    val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(dbName)
+    sparkSession.sessionState.catalogManager.setCurrentNamespace(ident.toArray)
}
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index 6e6138c91dd..7e4933b3407 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -866,4 +866,33 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
sql(s"CREATE NAMESPACE $qualified")
assert(spark.catalog.getDatabase(qualified).name === db)
}
+
+ test("three layer namespace compatibility - set current database") {
+ spark.catalog.setCurrentCatalog("testcat")
+ // namespace with the same name as catalog
+ sql("CREATE NAMESPACE testcat.testcat.my_db")
+ spark.catalog.setCurrentDatabase("testcat.my_db")
+ assert(spark.catalog.currentDatabase == "testcat.my_db")
+ // sessionCatalog still reports 'default' as current database
+ assert(sessionCatalog.getCurrentDatabase == "default")
+ val e = intercept[AnalysisException] {
+ spark.catalog.setCurrentDatabase("my_db")
+ }.getMessage
+ assert(e.contains("my_db"))
+
+ // check that we can fall back to old sessionCatalog
+ createDatabase("hive_db")
+ val err = intercept[AnalysisException] {
+ spark.catalog.setCurrentDatabase("hive_db")
+ }.getMessage
+ assert(err.contains("hive_db"))
+ spark.catalog.setCurrentCatalog("spark_catalog")
+ spark.catalog.setCurrentDatabase("hive_db")
+ assert(spark.catalog.currentDatabase == "hive_db")
+ assert(sessionCatalog.getCurrentDatabase == "hive_db")
+ val e3 = intercept[AnalysisException] {
+ spark.catalog.setCurrentDatabase("unknown_db")
+ }.getMessage
+ assert(e3.contains("unknown_db"))
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]