This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 681542b5644a [SPARK-54537][CONNECT] Fix SparkConnectDatabaseMetaData
getSchemas/getTables on catalogs missing namespace capability
681542b5644a is described below
commit 681542b5644ab53445dba3188d9f9c451aeb43ba
Author: Cheng Pan <[email protected]>
AuthorDate: Thu Nov 27 16:47:33 2025 -0800
[SPARK-54537][CONNECT] Fix SparkConnectDatabaseMetaData
getSchemas/getTables on catalogs missing namespace capability
### What changes were proposed in this pull request?
For DSv2 catalog implementation that does not mix in the interface
`SupportsNamespaces`, `SHOW SCHEMAS IN foo_catalog` fails with
`MISSING_CATALOG_ABILITY.NAMESPACES`; we should catch this and return an empty
result instead of failing on `SparkConnectDatabaseMetaData#getSchemas|getTables`
### Why are the changes needed?
Fix a bug in the Connect JDBC driver.
### Does this PR introduce _any_ user-facing change?
No, the Connect JDBC driver is an unreleased feature.
### How was this patch tested?
UT is adjusted to cover the changes.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #53246 from pan3793/SPARK-54537.
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 0a76a2138286b9b54fbde2a71a30e7d387c823d2)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../client/jdbc/SparkConnectDatabaseMetaData.scala | 28 +++++++++++++---------
.../jdbc/SparkConnectDatabaseMetaDataSuite.scala | 15 ++++++++++++
2 files changed, 32 insertions(+), 11 deletions(-)
diff --git
a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaData.scala
b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaData.scala
index 490fb7b6472e..7a37c272daf2 100644
---
a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaData.scala
+++
b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaData.scala
@@ -326,23 +326,29 @@ class SparkConnectDatabaseMetaData(conn:
SparkConnectConnection) extends Databas
case Some(schemaPattern) => $"TABLE_SCHEM".like(schemaPattern)
}
+ lazy val emptyDf = conn.spark.emptyDataFrame
+ .withColumn("TABLE_SCHEM", lit(""))
+ .withColumn("TABLE_CATALOG", lit(""))
+
def internalGetSchemas(
catalogOpt: Option[String],
schemaFilterExpr: Column): connect.DataFrame = {
val catalog = catalogOpt.getOrElse(conn.getCatalog)
- // Spark SQL supports LIKE clause in SHOW SCHEMAS command, but we can't
use that
- // because the LIKE pattern does not follow SQL standard.
- conn.spark.sql(s"SHOW SCHEMAS IN ${quoteIdentifier(catalog)}")
- .select($"namespace".as("TABLE_SCHEM"))
- .filter(schemaFilterExpr)
- .withColumn("TABLE_CATALOG", lit(catalog))
+ try {
+ // Spark SQL supports LIKE clause in SHOW SCHEMAS command, but we
can't use that
+ // because the LIKE pattern does not follow SQL standard.
+ conn.spark.sql(s"SHOW SCHEMAS IN ${quoteIdentifier(catalog)}")
+ .select($"namespace".as("TABLE_SCHEM"))
+ .filter(schemaFilterExpr)
+ .withColumn("TABLE_CATALOG", lit(catalog))
+ } catch {
+ case st: SparkThrowable if st.getCondition ==
"MISSING_CATALOG_ABILITY.NAMESPACES" =>
+ emptyDf
+ }
}
if (catalog == null) {
// search in all catalogs
- val emptyDf = conn.spark.emptyDataFrame
- .withColumn("TABLE_SCHEM", lit(""))
- .withColumn("TABLE_CATALOG", lit(""))
conn.spark.catalog.listCatalogs().collect().map(_.name).map { c =>
internalGetSchemas(Some(c), schemaFilterExpr)
}.fold(emptyDf) { (l, r) => l.unionAll(r) }
@@ -402,7 +408,7 @@ class SparkConnectDatabaseMetaData(conn:
SparkConnectConnection) extends Databas
$"TABLE_NAME".like(tableNamePattern)
}
- val emptyDf = conn.spark.emptyDataFrame
+ lazy val emptyDf = conn.spark.emptyDataFrame
.withColumn("TABLE_CAT", lit(""))
.withColumn("TABLE_SCHEM", lit(""))
.withColumn("TABLE_NAME", lit(""))
@@ -490,7 +496,7 @@ class SparkConnectDatabaseMetaData(conn:
SparkConnectConnection) extends Databas
$"COLUMN_NAME".like(columnNamePattern)
}
- val emptyDf = conn.spark.emptyDataFrame
+ lazy val emptyDf = conn.spark.emptyDataFrame
.withColumn("TABLE_CAT", lit(""))
.withColumn("TABLE_SCHEM", lit(""))
.withColumn("TABLE_NAME", lit(""))
diff --git
a/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaDataSuite.scala
b/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaDataSuite.scala
index 255537af4bbb..4d66392109e7 100644
---
a/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaDataSuite.scala
+++
b/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaDataSuite.scala
@@ -34,6 +34,8 @@ class SparkConnectDatabaseMetaDataSuite extends
ConnectFunSuite with RemoteSpark
// catalyst test jar is inaccessible here, but presents at the testing
connect server classpath
private val TEST_IN_MEMORY_CATALOG =
"org.apache.spark.sql.connector.catalog.InMemoryCatalog"
+ private val TEST_BASIC_IN_MEMORY_CATALOG =
+ "org.apache.spark.sql.connector.catalog.BasicInMemoryTableCatalog"
private def registerCatalog(
name: String, className: String)(implicit spark: SparkSession): Unit = {
@@ -254,6 +256,12 @@ class SparkConnectDatabaseMetaDataSuite extends
ConnectFunSuite with RemoteSpark
withConnection { conn =>
implicit val spark: SparkSession =
conn.asInstanceOf[SparkConnectConnection].spark
+ // this catalog does not support namespace
+ registerCatalog("test_noop", TEST_BASIC_IN_MEMORY_CATALOG)
+ // Spark loads catalog plugins lazily, we must initialize it first,
+ // otherwise it won't be listed by SHOW CATALOGS
+ conn.setCatalog("test_noop")
+
registerCatalog("test`cat", TEST_IN_MEMORY_CATALOG)
spark.sql("CREATE DATABASE IF NOT EXISTS `test``cat`.t_db1")
@@ -290,6 +298,7 @@ class SparkConnectDatabaseMetaDataSuite extends
ConnectFunSuite with RemoteSpark
}
// list schemas in current catalog
+ conn.setCatalog("spark_catalog")
assert(conn.getCatalog === "spark_catalog")
val getSchemasInCurrentCatalog =
List(null, "%").map { database => () => metadata.getSchemas("",
database) }
@@ -405,6 +414,12 @@ class SparkConnectDatabaseMetaDataSuite extends
ConnectFunSuite with RemoteSpark
withConnection { conn =>
implicit val spark: SparkSession =
conn.asInstanceOf[SparkConnectConnection].spark
+ // this catalog does not support namespace
+ registerCatalog("test_noop", TEST_BASIC_IN_MEMORY_CATALOG)
+ // Spark loads catalog plugins lazily, we must initialize it first,
+ // otherwise it won't be listed by SHOW CATALOGS
+ conn.setCatalog("test_noop")
+
// this catalog does not support view
registerCatalog("testcat", TEST_IN_MEMORY_CATALOG)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]