This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 52fe51b6e9b3 [SPARK-54112][CONNECT] Support getSchemas for SparkConnectDatabaseMetaData
52fe51b6e9b3 is described below
commit 52fe51b6e9b366f4fdf108a5563a5a284c0bf4da
Author: Cheng Pan <[email protected]>
AuthorDate: Mon Nov 10 10:04:33 2025 -0800
[SPARK-54112][CONNECT] Support getSchemas for SparkConnectDatabaseMetaData
### What changes were proposed in this pull request?
Implement the `getSchemas` methods defined in `java.sql.DatabaseMetaData` for `SparkConnectDatabaseMetaData`.
```java
/**
 * Retrieves the schema names available in this database. The results
 * are ordered by {@code TABLE_CATALOG} and
 * {@code TABLE_SCHEM}.
 *
 * <P>The schema columns are:
 *  <OL>
 *  <LI><B>TABLE_SCHEM</B> String {@code =>} schema name
 *  <LI><B>TABLE_CATALOG</B> String {@code =>} catalog name (may be {@code null})
 *  </OL>
 *
 * @return a {@code ResultSet} object in which each row is a
 *         schema description
 * @throws SQLException if a database access error occurs
 */
ResultSet getSchemas() throws SQLException;

/**
 * Retrieves the schema names available in this database. The results
 * are ordered by {@code TABLE_CATALOG} and
 * {@code TABLE_SCHEM}.
 *
 * <P>The schema columns are:
 *  <OL>
 *  <LI><B>TABLE_SCHEM</B> String {@code =>} schema name
 *  <LI><B>TABLE_CATALOG</B> String {@code =>} catalog name (may be {@code null})
 *  </OL>
 *
 * @param catalog a catalog name; must match the catalog name as it is
 *        stored in the database; "" retrieves those without a catalog;
 *        null means the catalog name should not be used to narrow down
 *        the search.
 * @param schemaPattern a schema name; must match the schema name as it
 *        is stored in the database; null means the schema name should
 *        not be used to narrow down the search.
 * @return a {@code ResultSet} object in which each row is a
 *         schema description
 * @throws SQLException if a database access error occurs
 * @see #getSearchStringEscape
 * @since 1.6
 */
ResultSet getSchemas(String catalog, String schemaPattern) throws SQLException;
```
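As a rough illustration of how a client consumes these methods, here is a minimal sketch; the JDBC URL format below is an assumed placeholder for illustration only and is not defined by this patch:

```scala
import java.sql.DriverManager
import scala.util.Using

object GetSchemasDemo {
  def main(args: Array[String]): Unit = {
    // Assumption: the connection string below is illustrative; consult the
    // driver's documentation for the actual URL format.
    Using.resource(DriverManager.getConnection("jdbc:sc://localhost:15002")) { conn =>
      val md = conn.getMetaData
      // null catalog + "%" schema pattern: list schemas across all catalogs
      Using.resource(md.getSchemas(null, "%")) { rs =>
        while (rs.next()) {
          println(s"${rs.getString("TABLE_CATALOG")}.${rs.getString("TABLE_SCHEM")}")
        }
      }
    }
  }
}
```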
### Why are the changes needed?
Enhance API coverage of the Connect JDBC driver; for example, the
`get[Catalogs|Schemas|Tables|...]` APIs are used by SQL GUI tools such as
DBeaver to display the catalog/schema navigation tree.
### Does this PR introduce _any_ user-facing change?
No, the Connect JDBC driver is a new feature under development.
### How was this patch tested?
A new UT is added; the change was also tested via DBeaver - the catalog/schema
tree works now.
<img width="1260" height="892" alt="Xnip2025-11-01_01-33-38"
src="https://github.com/user-attachments/assets/ca678627-e07c-430a-9750-e7ea1d69aecf"
/>
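For reference, a sketch of the escape semantics the new UT exercises, assuming `conn` is an already-open `java.sql.Connection` to a Spark Connect server with the same test databases created:

```scala
import scala.util.Using

// Assumption: `conn` is an open java.sql.Connection, and the "test`cat"
// catalog with databases t_db1, t_db2, t_db_ exists, as set up by the new UT.
val md = conn.getMetaData
assert(md.getSearchStringEscape == "\\") // backslash escapes % and _
// An escaped "_" matches literally: "t\_db\_" selects only "t_db_",
// while the unescaped "t_db_" would also match "t_db1" and "t_db2".
Using.resource(md.getSchemas("test`cat", "t\\_db\\_")) { rs =>
  while (rs.next()) println(rs.getString("TABLE_SCHEM")) // prints: t_db_
}
```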
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #52819 from pan3793/SPARK-54112.
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../client/jdbc/SparkConnectDatabaseMetaData.scala | 67 +++++++++++--
.../jdbc/SparkConnectDatabaseMetaDataSuite.scala | 108 +++++++++++++++++++++
.../apache/spark/sql/connect/test/SQLHelper.scala | 12 +++
3 files changed, 181 insertions(+), 6 deletions(-)
diff --git a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaData.scala b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaData.scala
index 215c8256acbc..13dd4d57662b 100644
--- a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaData.scala
+++ b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaData.scala
@@ -20,7 +20,11 @@ package org.apache.spark.sql.connect.client.jdbc
import java.sql.{Array => _, _}
import org.apache.spark.SparkBuildInfo.{spark_version => SPARK_VERSION}
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.catalyst.util.QuotingUtils._
+import org.apache.spark.sql.connect
import org.apache.spark.sql.connect.client.jdbc.SparkConnectDatabaseMetaData._
+import org.apache.spark.sql.functions._
import org.apache.spark.util.VersionUtils
class SparkConnectDatabaseMetaData(conn: SparkConnectConnection) extends DatabaseMetaData {
@@ -97,8 +101,7 @@ class SparkConnectDatabaseMetaData(conn: SparkConnectConnection) extends Databas
override def getTimeDateFunctions: String =
throw new SQLFeatureNotSupportedException
- override def getSearchStringEscape: String =
- throw new SQLFeatureNotSupportedException
+ override def getSearchStringEscape: String = "\\"
override def getExtraNameCharacters: String = ""
@@ -277,6 +280,9 @@ class SparkConnectDatabaseMetaData(conn: SparkConnectConnection) extends Databas
override def dataDefinitionIgnoredInTransactions: Boolean = false
+ private def isNullOrWildcard(pattern: String): Boolean =
+ pattern == null || pattern == "%"
+
override def getProcedures(
catalog: String,
schemaPattern: String,
@@ -299,11 +305,60 @@ class SparkConnectDatabaseMetaData(conn: SparkConnectConnection) extends Databas
new SparkConnectResultSet(df.collectResult())
}
- override def getSchemas: ResultSet =
- throw new SQLFeatureNotSupportedException
+ override def getSchemas: ResultSet = {
+ conn.checkOpen()
- override def getSchemas(catalog: String, schemaPattern: String): ResultSet =
- throw new SQLFeatureNotSupportedException
+ getSchemas(null, null)
+ }
+
+ // Schema of the returned DataFrame is:
+ // |-- TABLE_SCHEM: string (nullable = false)
+ // |-- TABLE_CATALOG: string (nullable = false)
+ private def getSchemasDataFrame(
+ catalog: String, schemaPattern: String): connect.DataFrame = {
+
+ val schemaFilterExpr = if (isNullOrWildcard(schemaPattern)) {
+ lit(true)
+ } else {
+ $"TABLE_SCHEM".like(schemaPattern)
+ }
+
+ def internalGetSchemas(
+ catalogOpt: Option[String],
+ schemaFilterExpr: Column): connect.DataFrame = {
+ val catalog = catalogOpt.getOrElse(conn.getCatalog)
+ // Spark SQL supports LIKE clause in SHOW SCHEMAS command, but we can't use that
+ // because the LIKE pattern does not follow SQL standard.
+ conn.spark.sql(s"SHOW SCHEMAS IN ${quoteIdentifier(catalog)}")
+ .select($"namespace".as("TABLE_SCHEM"))
+ .filter(schemaFilterExpr)
+ .withColumn("TABLE_CATALOG", lit(catalog))
+ }
+
+ if (catalog == null) {
+ // search in all catalogs
+ val emptyDf = conn.spark.emptyDataFrame
+ .withColumn("TABLE_SCHEM", lit(""))
+ .withColumn("TABLE_CATALOG", lit(""))
+ conn.spark.catalog.listCatalogs().collect().map(_.name).map { c =>
+ internalGetSchemas(Some(c), schemaFilterExpr)
+ }.fold(emptyDf) { (l, r) => l.unionAll(r) }
+ } else if (catalog == "") {
+ // search only in current catalog
+ internalGetSchemas(None, schemaFilterExpr)
+ } else {
+ // search in the specific catalog
+ internalGetSchemas(Some(catalog), schemaFilterExpr)
+ }
+ }
+
+ override def getSchemas(catalog: String, schemaPattern: String): ResultSet = {
+ conn.checkOpen()
+
+ val df = getSchemasDataFrame(catalog, schemaPattern)
+ .orderBy("TABLE_CATALOG", "TABLE_SCHEM")
+ new SparkConnectResultSet(df.collectResult())
+ }
override def getTableTypes: ResultSet =
throw new SQLFeatureNotSupportedException
diff --git a/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaDataSuite.scala b/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaDataSuite.scala
index 42596b56f4c5..c3d891bc38c3 100644
--- a/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaDataSuite.scala
+++ b/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaDataSuite.scala
@@ -69,6 +69,7 @@ class SparkConnectDatabaseMetaDataSuite extends ConnectFunSuite with RemoteSpark
assert(metadata.storesLowerCaseQuotedIdentifiers === false)
assert(metadata.storesMixedCaseQuotedIdentifiers === false)
assert(metadata.getIdentifierQuoteString === "`")
+ assert(metadata.getSearchStringEscape === "\\")
assert(metadata.getExtraNameCharacters === "")
assert(metadata.supportsAlterTableWithAddColumn === true)
assert(metadata.supportsAlterTableWithDropColumn === true)
@@ -235,4 +236,111 @@ class SparkConnectDatabaseMetaDataSuite extends ConnectFunSuite with RemoteSpark
}
}
}
+
+ test("SparkConnectDatabaseMetaData getSchemas") {
+
+ def verifyGetSchemas(
+ getSchemas: () => ResultSet)(verify: Seq[(String, String)] => Unit): Unit = {
+ Using.resource(getSchemas()) { rs =>
+ val catalogDatabases = new Iterator[(String, String)] {
+ def hasNext: Boolean = rs.next()
+ def next(): (String, String) =
+ (rs.getString("TABLE_CATALOG"), rs.getString("TABLE_SCHEM"))
+ }.toSeq
+ verify(catalogDatabases)
+ }
+ }
+
+ withConnection { conn =>
+ implicit val spark: SparkSession = conn.asInstanceOf[SparkConnectConnection].spark
+
+ registerCatalog("test`cat", TEST_IN_MEMORY_CATALOG)
+
+ spark.sql("CREATE DATABASE IF NOT EXISTS `test``cat`.t_db1")
+ spark.sql("CREATE DATABASE IF NOT EXISTS `test``cat`.t_db2")
+ spark.sql("CREATE DATABASE IF NOT EXISTS `test``cat`.t_db_")
+
+ spark.sql("CREATE DATABASE IF NOT EXISTS spark_catalog.db1")
+ spark.sql("CREATE DATABASE IF NOT EXISTS spark_catalog.db2")
+ spark.sql("CREATE DATABASE IF NOT EXISTS spark_catalog.test_db3")
+
+ val metadata = conn.getMetaData
+
+ // no need to care about "test`cat" because it is memory-based and session-isolated,
+ // and is also inaccessible from another SparkSession
+ withDatabase("spark_catalog.db1", "spark_catalog.db2",
"spark_catalog.test_db3") {
+ // list schemas in all catalogs
+ val getSchemasInAllCatalogs = (() => metadata.getSchemas) ::
+ List(null, "%").map { database => () => metadata.getSchemas(null,
database) } ::: Nil
+
+ getSchemasInAllCatalogs.foreach { getSchemas =>
+ verifyGetSchemas(getSchemas) { catalogDatabases =>
+ // results are ordered by TABLE_CATALOG, TABLE_SCHEM
+ assert {
+ catalogDatabases === Seq(
+ ("spark_catalog", "db1"),
+ ("spark_catalog", "db2"),
+ ("spark_catalog", "default"),
+ ("spark_catalog", "test_db3"),
+ ("test`cat", "t_db1"),
+ ("test`cat", "t_db2"),
+ ("test`cat", "t_db_"))
+ }
+ }
+ }
+
+ // list schemas in current catalog
+ assert(conn.getCatalog === "spark_catalog")
+ val getSchemasInCurrentCatalog =
+ List(null, "%").map { database => () => metadata.getSchemas("", database) }
+ getSchemasInCurrentCatalog.foreach { getSchemas =>
+ verifyGetSchemas(getSchemas) { catalogDatabases =>
+ // results are ordered by TABLE_CATALOG, TABLE_SCHEM
+ assert {
+ catalogDatabases === Seq(
+ ("spark_catalog", "db1"),
+ ("spark_catalog", "db2"),
+ ("spark_catalog", "default"),
+ ("spark_catalog", "test_db3"))
+ }
+ }
+ }
+
+ // list schemas with schema pattern
+ verifyGetSchemas { () => metadata.getSchemas(null, "db%") } {
catalogDatabases =>
+ // results are ordered by TABLE_CATALOG, TABLE_SCHEM
+ assert {
+ catalogDatabases === Seq(
+ ("spark_catalog", "db1"),
+ ("spark_catalog", "db2"))
+ }
+ }
+
+ verifyGetSchemas { () => metadata.getSchemas(null, "db_") } {
catalogDatabases =>
+ // results are ordered by TABLE_CATALOG, TABLE_SCHEM
+ assert {
+ catalogDatabases === Seq(
+ ("spark_catalog", "db1"),
+ ("spark_catalog", "db2"))
+ }
+ }
+
+ // escape backtick in catalog, and _ in schema pattern
+ verifyGetSchemas {
+ () => metadata.getSchemas("test`cat", "t\\_db\\_")
+ } { catalogDatabases =>
+ assert(catalogDatabases === Seq(("test`cat", "t_db_")))
+ }
+
+ // skip testing escape ', % in schema pattern, because Spark SQL does not
+ // allow using those chars in schema/table names.
+ //
+ // CREATE DATABASE IF NOT EXISTS `t_db1'`;
+ //
+ // the above SQL fails with error condition:
+ // [INVALID_SCHEMA_OR_RELATION_NAME] `t_db1'` is not a valid name for tables/schemas.
+ // Valid names only contain alphabet characters, numbers and _. SQLSTATE: 42602
+ }
+ }
+ }
}
diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/test/SQLHelper.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/test/SQLHelper.scala
index b8d1062c3b3b..731550363fc0 100644
--- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/test/SQLHelper.scala
+++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/test/SQLHelper.scala
@@ -142,4 +142,16 @@ trait SQLHelper {
spark.sql(s"DROP VIEW IF EXISTS $name")
})
}
+
+ /**
+ * Drops the databases `dbNames` after calling `f`.
+ */
+ protected def withDatabase(dbNames: String*)(f: => Unit): Unit = {
+ SparkErrorUtils.tryWithSafeFinally(f) {
+ dbNames.foreach { name =>
+ spark.sql(s"DROP DATABASE IF EXISTS $name CASCADE")
+ }
+ spark.sql(s"USE default")
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]