This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b0d297c6d13 [SPARK-39645][SQL] Make getDatabase and listDatabases 
compatible with 3 layer namespace
b0d297c6d13 is described below

commit b0d297c6d13715108022b5cbdcdeefeb17551678
Author: Zach Schuermann <[email protected]>
AuthorDate: Fri Jul 1 09:41:36 2022 +0800

    [SPARK-39645][SQL] Make getDatabase and listDatabases compatible with 3 
layer namespace
    
    ### What changes were proposed in this pull request?
    
    Change `getDatabase` and `listDatabases` catalog API to support 3 layer 
namespace. If the database exists in the sessionCatalog, we return that. 
Otherwise, parse the name as 3 layer name and use V2 catalog. Furthermore, 
`Database` class is augmented with `catalog` (a nullable String). The original 
constructor is retained for backwards compatibility.
    
    ### Why are the changes needed?
    
    `getDatabase`/`listDatabases` don't support 3 layer namespace.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. This PR introduces a backwards-compatible API change to support 3 
layer namespace (e.g. `catalog.database.table`). Additionally, the `Database` 
type includes a new field `catalog`.
    
    ### How was this patch tested?
    
    UT
    
    Closes #36968 from schuermannator/3l-catalog.
    
    Authored-by: Zach Schuermann <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 R/pkg/tests/fulltests/test_sparkSQL.R              |  2 +-
 .../org/apache/spark/sql/catalog/interface.scala   |  7 +++
 .../apache/spark/sql/internal/CatalogImpl.scala    | 43 +++++++++++++++--
 .../apache/spark/sql/internal/CatalogSuite.scala   | 54 +++++++++++++++++++++-
 4 files changed, 98 insertions(+), 8 deletions(-)

diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R 
b/R/pkg/tests/fulltests/test_sparkSQL.R
index f0abc96613d..2acb7a9ceba 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -4018,7 +4018,7 @@ test_that("catalog APIs, currentDatabase, 
setCurrentDatabase, listDatabases", {
                paste0("Error in setCurrentDatabase : analysis error - Database 
",
                "'zxwtyswklpf' does not exist"))
   dbs <- collect(listDatabases())
-  expect_equal(names(dbs), c("name", "description", "locationUri"))
+  expect_equal(names(dbs), c("name", "catalog", "description", "locationUri"))
   expect_equal(which(dbs[, 1] == "default"), 1)
 })
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala
index 84839d2d1fd..1f6cb678f1c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala
@@ -49,6 +49,7 @@ class CatalogMetadata(
  * A database in Spark, as returned by the `listDatabases` method defined in 
[[Catalog]].
  *
  * @param name name of the database.
+ * @param catalog name of the catalog that the database belongs to.
  * @param description description of the database.
  * @param locationUri path (in the form of a uri) to data files.
  * @since 2.0.0
@@ -56,13 +57,19 @@ class CatalogMetadata(
 @Stable
 class Database(
     val name: String,
+    @Nullable val catalog: String,
     @Nullable val description: String,
     val locationUri: String)
   extends DefinedByConstructorParams {
 
+  def this(name: String, description: String, locationUri: String) = {
+    this(name, null, description, locationUri)
+  }
+
   override def toString: String = {
     "Database[" +
       s"name='$name', " +
+      Option(catalog).map { c => s"catalog='$c', " }.getOrElse("") +
       Option(description).map { d => s"description='$d', " }.getOrElse("") +
       s"path='$locationUri']"
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 98220f3b229..742ca5ccb1e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -26,10 +26,10 @@ import 
org.apache.spark.sql.catalyst.{DefinedByConstructorParams, FunctionIdenti
 import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, 
ResolvedTable, ResolvedView, UnresolvedDBObjectName, UnresolvedNamespace, 
UnresolvedTable, UnresolvedTableOrView}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, 
LocalRelation, RecoverPartitions, ShowTables, SubqueryAlias, TableSpec, View}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, 
LocalRelation, RecoverPartitions, ShowNamespaces, ShowTables, SubqueryAlias, 
TableSpec, View}
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
-import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, 
SupportsNamespaces, TableCatalog}
-import 
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.{CatalogHelper, 
IdentifierHelper, TransformHelper}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, 
Identifier, SupportsNamespaces, TableCatalog}
+import 
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.{CatalogHelper, 
IdentifierHelper, MultipartIdentifierHelper, TransformHelper}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
@@ -74,7 +74,11 @@ class CatalogImpl(sparkSession: SparkSession) extends 
Catalog {
    * Returns a list of databases available across all sessions.
    */
   override def listDatabases(): Dataset[Database] = {
-    val databases = sessionCatalog.listDatabases().map(makeDatabase)
+    val catalog = currentCatalog()
+    val plan = ShowNamespaces(UnresolvedNamespace(Seq(catalog)), None)
+    val databases = sparkSession.sessionState.executePlan(plan).toRdd.collect()
+      .map(row => catalog + "." + row.getString(0))
+      .map(getDatabase)
     CatalogImpl.makeDataset(databases, sparkSession)
   }
 
@@ -297,7 +301,36 @@ class CatalogImpl(sparkSession: SparkSession) extends 
Catalog {
    * `Database` can be found.
    */
   override def getDatabase(dbName: String): Database = {
-    makeDatabase(dbName)
+    // `dbName` could be either a single database name (behavior in Spark 3.3 
and prior) or a
+    // qualified namespace with catalog name. To maintain backwards 
compatibility, we first assume
+    // it's a single database name and return the database from sessionCatalog 
if it exists.
+    // Otherwise we try 3-part name parsing and locate the database. If the 
parsed identifier
+    // contains both catalog name and database name, we then search the 
database in the catalog.
+    if (sessionCatalog.databaseExists(dbName) || 
sessionCatalog.isGlobalTempViewDB(dbName)) {
+      makeDatabase(dbName)
+    } else {
+      val ident = 
sparkSession.sessionState.sqlParser.parseMultipartIdentifier(dbName)
+      val plan = UnresolvedNamespace(ident)
+      val resolved = sparkSession.sessionState.executePlan(plan).analyzed
+      resolved match {
+        case ResolvedNamespace(catalog: SupportsNamespaces, namespace) =>
+          val metadata = catalog.loadNamespaceMetadata(namespace.toArray)
+          new Database(
+            name = namespace.quoted,
+            catalog = catalog.name,
+            description = metadata.get(SupportsNamespaces.PROP_COMMENT),
+            locationUri = metadata.get(SupportsNamespaces.PROP_LOCATION))
+        // similar to databaseExists: if the catalog doesn't support 
namespaces, we assume it's an
+        // implicit namespace, which exists but has no metadata.
+        case ResolvedNamespace(catalog: CatalogPlugin, namespace) =>
+          new Database(
+            name = namespace.quoted,
+            catalog = catalog.name,
+            description = null,
+            locationUri = null)
+        case _ => new Database(name = dbName, description = null, locationUri 
= null)
+      }
+    }
   }
 
   /**
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index 07c21fff712..6e6138c91dd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -159,6 +159,13 @@ class CatalogSuite extends SharedSparkSession with 
AnalysisTest with BeforeAndAf
       Set("default", "my_db2"))
   }
 
+  test("list databases with current catalog") {
+    spark.catalog.setCurrentCatalog("testcat")
+    sql(s"CREATE NAMESPACE testcat.my_db")
+    sql(s"CREATE NAMESPACE testcat.my_db2")
+    assert(spark.catalog.listDatabases().collect().map(_.name).toSet == 
Set("my_db", "my_db2"))
+  }
+
   test("list tables") {
     assert(spark.catalog.listTables().collect().isEmpty)
     createTable("my_table1")
@@ -356,7 +363,7 @@ class CatalogSuite extends SharedSparkSession with 
AnalysisTest with BeforeAndAf
   }
 
   test("catalog classes format in Dataset.show") {
-    val db = new Database("nama", "descripta", "locata")
+    val db = new Database("nama", "cataloa", "descripta", "locata")
     val table = new Table("nama", "cataloa", Array("databasa"), "descripta", 
"typa",
       isTemporary = false)
     val function = new Function("nama", "databasa", "descripta", "classa", 
isTemporary = false)
@@ -366,7 +373,7 @@ class CatalogSuite extends SharedSparkSession with 
AnalysisTest with BeforeAndAf
     val tableFields = ScalaReflection.getConstructorParameterValues(table)
     val functionFields = 
ScalaReflection.getConstructorParameterValues(function)
     val columnFields = ScalaReflection.getConstructorParameterValues(column)
-    assert(dbFields == Seq("nama", "descripta", "locata"))
+    assert(dbFields == Seq("nama", "cataloa", "descripta", "locata"))
     assert(Seq(tableFields(0), tableFields(1), tableFields(3), tableFields(4), 
tableFields(5)) ==
       Seq("nama", "cataloa", "descripta", "typa", false))
     
assert(tableFields(2).asInstanceOf[Array[String]].sameElements(Array("databasa")))
@@ -816,4 +823,47 @@ class CatalogSuite extends SharedSparkSession with 
AnalysisTest with BeforeAndAf
       assert(spark.table(tableName).collect().length == 0)
     }
   }
+
+  test("three layer namespace compatibility - get database") {
+    val catalogsAndDatabases =
+      Seq(("testcat", "somedb"), ("testcat", "ns.somedb"), ("spark_catalog", 
"somedb"))
+    catalogsAndDatabases.foreach { case (catalog, dbName) =>
+      val qualifiedDb = s"$catalog.$dbName"
+      sql(s"CREATE NAMESPACE $qualifiedDb COMMENT '$qualifiedDb' LOCATION 
'/test/location'")
+      val db = spark.catalog.getDatabase(qualifiedDb)
+      assert(db.name === dbName)
+      assert(db.description === qualifiedDb)
+      assert(db.locationUri === "file:/test/location")
+    }
+
+    // test without qualifier
+    val name = "testns"
+    sql(s"CREATE NAMESPACE testcat.$name COMMENT '$name'")
+    spark.catalog.setCurrentCatalog("testcat")
+    val db = spark.catalog.getDatabase(name)
+    assert(db.name === name)
+    assert(db.description === name)
+
+    intercept[AnalysisException](spark.catalog.getDatabase("randomcat.db10"))
+  }
+
+  test("three layer namespace compatibility - get database, same in hive and 
testcat") {
+    // create 'testdb' in hive and testcat
+    val dbName = "testdb"
+    sql(s"CREATE NAMESPACE spark_catalog.$dbName COMMENT 'hive database'")
+    sql(s"CREATE NAMESPACE testcat.$dbName COMMENT 'testcat namespace'")
+    sql("SET CATALOG testcat")
+    // should still return the database in Hive
+    val db = spark.catalog.getDatabase(dbName)
+    assert(db.name === dbName)
+    assert(db.description === "hive database")
+  }
+
+  test("get database when there is `default` catalog") {
+    spark.conf.set("spark.sql.catalog.default", 
classOf[InMemoryCatalog].getName)
+    val db = "testdb"
+    val qualified = s"default.$db"
+    sql(s"CREATE NAMESPACE $qualified")
+    assert(spark.catalog.getDatabase(qualified).name === db)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to