This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new b0d297c6d13 [SPARK-39645][SQL] Make getDatabase and listDatabases compatible with 3 layer namespace
b0d297c6d13 is described below
commit b0d297c6d13715108022b5cbdcdeefeb17551678
Author: Zach Schuermann <[email protected]>
AuthorDate: Fri Jul 1 09:41:36 2022 +0800
[SPARK-39645][SQL] Make getDatabase and listDatabases compatible with 3 layer namespace
### What changes were proposed in this pull request?
Change the `getDatabase` and `listDatabases` catalog APIs to support a 3 layer
namespace. If the database exists in the sessionCatalog, we return that.
Otherwise, parse the name as a 3 layer name and use the V2 catalog. Furthermore,
the `Database` class is augmented with a `catalog` field (a nullable String). The
original constructor is retained for backwards compatibility.
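For illustration, a minimal sketch of the intended lookup order, assuming a V2
catalog plugin has already been registered under the name `testcat` (the catalog
and namespace names here are just examples, taken from the tests below):

```scala
spark.sql("CREATE NAMESPACE testcat.my_db")

// Single-part name: resolved against the session catalog first, as in Spark 3.3 and prior.
val legacyDb = spark.catalog.getDatabase("default")

// Multi-part name: parsed as `catalog.database` and resolved in the V2 catalog.
val v2Db = spark.catalog.getDatabase("testcat.my_db")
assert(v2Db.catalog == "testcat")
assert(v2Db.name == "my_db")
```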
### Why are the changes needed?
`getDatabase`/`listDatabases` don't support 3 layer namespace.
### Does this PR introduce _any_ user-facing change?
Yes. This PR introduces a backwards-compatible API change to support the 3
layer namespace (e.g. `catalog.database.table`). Additionally, the `Database`
type includes a new field, `catalog`.
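For example, existing code using the three-argument `Database` constructor keeps
compiling, and the new field is simply `null` there (a sketch with made-up values):

```scala
import org.apache.spark.sql.catalog.Database

// Old constructor: `catalog` defaults to null for backwards compatibility.
val legacy = new Database("my_db", "a description", "/some/location")
assert(legacy.catalog == null)

// New constructor: carries the catalog name.
val qualified = new Database("my_db", "testcat", "a description", "/some/location")
assert(qualified.catalog == "testcat")
```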
### How was this patch tested?
UT
Closes #36968 from schuermannator/3l-catalog.
Authored-by: Zach Schuermann <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
R/pkg/tests/fulltests/test_sparkSQL.R | 2 +-
.../org/apache/spark/sql/catalog/interface.scala | 7 +++
.../apache/spark/sql/internal/CatalogImpl.scala | 43 +++++++++++++++--
.../apache/spark/sql/internal/CatalogSuite.scala | 54 +++++++++++++++++++++-
4 files changed, 98 insertions(+), 8 deletions(-)
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index f0abc96613d..2acb7a9ceba 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -4018,7 +4018,7 @@ test_that("catalog APIs, currentDatabase, setCurrentDatabase, listDatabases", {
paste0("Error in setCurrentDatabase : analysis error - Database
",
"'zxwtyswklpf' does not exist"))
dbs <- collect(listDatabases())
- expect_equal(names(dbs), c("name", "description", "locationUri"))
+ expect_equal(names(dbs), c("name", "catalog", "description", "locationUri"))
expect_equal(which(dbs[, 1] == "default"), 1)
})
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala
index 84839d2d1fd..1f6cb678f1c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala
@@ -49,6 +49,7 @@ class CatalogMetadata(
* A database in Spark, as returned by the `listDatabases` method defined in [[Catalog]].
*
* @param name name of the database.
+ * @param catalog name of the catalog that the database belongs to.
* @param description description of the database.
* @param locationUri path (in the form of a uri) to data files.
* @since 2.0.0
@@ -56,13 +57,19 @@ class CatalogMetadata(
@Stable
class Database(
val name: String,
+ @Nullable val catalog: String,
@Nullable val description: String,
val locationUri: String)
extends DefinedByConstructorParams {
+ def this(name: String, description: String, locationUri: String) = {
+ this(name, null, description, locationUri)
+ }
+
override def toString: String = {
"Database[" +
s"name='$name', " +
+ Option(catalog).map { c => s"catalog='$c', " }.getOrElse("") +
Option(description).map { d => s"description='$d', " }.getOrElse("") +
s"path='$locationUri']"
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 98220f3b229..742ca5ccb1e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -26,10 +26,10 @@ import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, FunctionIdenti
import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedTable, ResolvedView, UnresolvedDBObjectName, UnresolvedNamespace, UnresolvedTable, UnresolvedTableOrView}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LocalRelation, RecoverPartitions, ShowTables, SubqueryAlias, TableSpec, View}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LocalRelation, RecoverPartitions, ShowNamespaces, ShowTables, SubqueryAlias, TableSpec, View}
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
-import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, SupportsNamespaces, TableCatalog}
-import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.{CatalogHelper, IdentifierHelper, TransformHelper}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, Identifier, SupportsNamespaces, TableCatalog}
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.{CatalogHelper, IdentifierHelper, MultipartIdentifierHelper, TransformHelper}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
@@ -74,7 +74,11 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
* Returns a list of databases available across all sessions.
*/
override def listDatabases(): Dataset[Database] = {
- val databases = sessionCatalog.listDatabases().map(makeDatabase)
+ val catalog = currentCatalog()
+ val plan = ShowNamespaces(UnresolvedNamespace(Seq(catalog)), None)
+ val databases = sparkSession.sessionState.executePlan(plan).toRdd.collect()
+ .map(row => catalog + "." + row.getString(0))
+ .map(getDatabase)
CatalogImpl.makeDataset(databases, sparkSession)
}
@@ -297,7 +301,36 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
* `Database` can be found.
*/
override def getDatabase(dbName: String): Database = {
- makeDatabase(dbName)
+ // `dbName` could be either a single database name (behavior in Spark 3.3 and prior) or a
+ // qualified namespace with catalog name. To maintain backwards compatibility, we first assume
+ // it's a single database name and return the database from sessionCatalog if it exists.
+ // Otherwise we try 3-part name parsing and locate the database. If the parsed identifier
+ // contains both catalog name and database name, we then search the database in the catalog.
+ if (sessionCatalog.databaseExists(dbName) || sessionCatalog.isGlobalTempViewDB(dbName)) {
+ makeDatabase(dbName)
+ } else {
+ val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(dbName)
+ val plan = UnresolvedNamespace(ident)
+ val resolved = sparkSession.sessionState.executePlan(plan).analyzed
+ resolved match {
+ case ResolvedNamespace(catalog: SupportsNamespaces, namespace) =>
+ val metadata = catalog.loadNamespaceMetadata(namespace.toArray)
+ new Database(
+ name = namespace.quoted,
+ catalog = catalog.name,
+ description = metadata.get(SupportsNamespaces.PROP_COMMENT),
+ locationUri = metadata.get(SupportsNamespaces.PROP_LOCATION))
+ // similar to databaseExists: if the catalog doesn't support namespaces, we assume it's an
+ // implicit namespace, which exists but has no metadata.
+ case ResolvedNamespace(catalog: CatalogPlugin, namespace) =>
+ new Database(
+ name = namespace.quoted,
+ catalog = catalog.name,
+ description = null,
+ locationUri = null)
+ case _ => new Database(name = dbName, description = null, locationUri = null)
+ }
+ }
}
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index 07c21fff712..6e6138c91dd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -159,6 +159,13 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
Set("default", "my_db2"))
}
+ test("list databases with current catalog") {
+ spark.catalog.setCurrentCatalog("testcat")
+ sql(s"CREATE NAMESPACE testcat.my_db")
+ sql(s"CREATE NAMESPACE testcat.my_db2")
+ assert(spark.catalog.listDatabases().collect().map(_.name).toSet == Set("my_db", "my_db2"))
+ }
+
test("list tables") {
assert(spark.catalog.listTables().collect().isEmpty)
createTable("my_table1")
@@ -356,7 +363,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
}
test("catalog classes format in Dataset.show") {
- val db = new Database("nama", "descripta", "locata")
+ val db = new Database("nama", "cataloa", "descripta", "locata")
val table = new Table("nama", "cataloa", Array("databasa"), "descripta", "typa",
isTemporary = false)
val function = new Function("nama", "databasa", "descripta", "classa", isTemporary = false)
@@ -366,7 +373,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
val tableFields = ScalaReflection.getConstructorParameterValues(table)
val functionFields = ScalaReflection.getConstructorParameterValues(function)
val columnFields = ScalaReflection.getConstructorParameterValues(column)
- assert(dbFields == Seq("nama", "descripta", "locata"))
+ assert(dbFields == Seq("nama", "cataloa", "descripta", "locata"))
assert(Seq(tableFields(0), tableFields(1), tableFields(3), tableFields(4), tableFields(5)) ==
Seq("nama", "cataloa", "descripta", "typa", false))
assert(tableFields(2).asInstanceOf[Array[String]].sameElements(Array("databasa")))
@@ -816,4 +823,47 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
assert(spark.table(tableName).collect().length == 0)
}
}
+
+ test("three layer namespace compatibility - get database") {
+ val catalogsAndDatabases =
+ Seq(("testcat", "somedb"), ("testcat", "ns.somedb"), ("spark_catalog", "somedb"))
+ catalogsAndDatabases.foreach { case (catalog, dbName) =>
+ val qualifiedDb = s"$catalog.$dbName"
+ sql(s"CREATE NAMESPACE $qualifiedDb COMMENT '$qualifiedDb' LOCATION
'/test/location'")
+ val db = spark.catalog.getDatabase(qualifiedDb)
+ assert(db.name === dbName)
+ assert(db.description === qualifiedDb)
+ assert(db.locationUri === "file:/test/location")
+ }
+
+ // test without qualifier
+ val name = "testns"
+ sql(s"CREATE NAMESPACE testcat.$name COMMENT '$name'")
+ spark.catalog.setCurrentCatalog("testcat")
+ val db = spark.catalog.getDatabase(name)
+ assert(db.name === name)
+ assert(db.description === name)
+
+ intercept[AnalysisException](spark.catalog.getDatabase("randomcat.db10"))
+ }
+
+ test("three layer namespace compatibility - get database, same in hive and
testcat") {
+ // create 'testdb' in hive and testcat
+ val dbName = "testdb"
+ sql(s"CREATE NAMESPACE spark_catalog.$dbName COMMENT 'hive database'")
+ sql(s"CREATE NAMESPACE testcat.$dbName COMMENT 'testcat namespace'")
+ sql("SET CATALOG testcat")
+ // should still return the database in Hive
+ val db = spark.catalog.getDatabase(dbName)
+ assert(db.name === dbName)
+ assert(db.description === "hive database")
+ }
+
+ test("get database when there is `default` catalog") {
+ spark.conf.set("spark.sql.catalog.default", classOf[InMemoryCatalog].getName)
+ val db = "testdb"
+ val qualified = s"default.$db"
+ sql(s"CREATE NAMESPACE $qualified")
+ assert(spark.catalog.getDatabase(qualified).name === db)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]