This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 1793a20dbdcb [SPARK-50700][SQL] `spark.sql.catalog.spark_catalog`
supports `builtin` magic value
1793a20dbdcb is described below
commit 1793a20dbdcb10eab4fe2eecbba4ebc5258aa637
Author: Cheng Pan <[email protected]>
AuthorDate: Mon Jan 6 11:28:51 2025 +0800
[SPARK-50700][SQL] `spark.sql.catalog.spark_catalog` supports `builtin`
magic value
### What changes were proposed in this pull request?
This PR adds a magic value `builtin` (and sets it to the default value) for
`spark.sql.catalog.spark_catalog`.
### Why are the changes needed?
Currently, `spark.sql.catalog.spark_catalog` is optional and has `None` as
the default value. When `spark.sql.catalog.spark_catalog=a.bad.catalog.impl` is
wrongly set in `spark-defaults.conf`, the user has no way to overwrite it in
`spark-submit`.
Note that, explicitly setting it to
`o.a.s.sql.execution.datasources.v2.V2SessionCatalog` does not work either,
because `V2SessionCatalog` does not have a zero-args constructor.
```
spark-submit \
--conf
spark.sql.catalog.spark_catalog=org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog
\
...
```
To fix the above issue, similar to what we did for
`spark.sql.hive.metastore.jars`, just use "builtin" to represent the built-in
`V2SessionCatalog`.
### Does this PR introduce _any_ user-facing change?
No change to default behavior, and users are allowed to use
`spark.sql.catalog.spark_catalog=builtin` to set `spark_catalog` as the
built-in `V2SessionCatalog`.
### How was this patch tested?
Code in UTs like
```
// unset this config to use the default v2 session catalog.
spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
```
are replaced with
```
spark.conf.set(V2_SESSION_CATALOG_IMPLEMENTATION, "builtin")
```
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #49332 from pan3793/SPARK-50700.
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
---
.../sql/connector/catalog/CatalogManager.scala | 7 ++--
.../org/apache/spark/sql/internal/SQLConf.scala | 10 ++++--
.../catalyst/analysis/ResolveSessionCatalog.scala | 2 +-
.../datasources/v2/V2SessionCatalog.scala | 2 +-
.../spark/sql/internal/DataFrameWriterImpl.scala | 8 +++--
.../spark/sql/connector/DataSourceV2SQLSuite.scala | 42 +++++++++++-----------
.../spark/sql/connector/DeleteFromTests.scala | 4 +--
7 files changed, 43 insertions(+), 32 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
index db94659b1033..9b8584604d32 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
@@ -85,9 +85,10 @@ class CatalogManager(
* in the fallback configuration, spark.sql.sources.useV1SourceList
*/
private[sql] def v2SessionCatalog: CatalogPlugin = {
- conf.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).map { _ =>
- catalogs.getOrElseUpdate(SESSION_CATALOG_NAME, loadV2SessionCatalog())
- }.getOrElse(defaultSessionCatalog)
+ conf.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION) match {
+ case "builtin" => defaultSessionCatalog
+ case _ => catalogs.getOrElseUpdate(SESSION_CATALOG_NAME,
loadV2SessionCatalog())
+ }
}
private var _currentNamespace: Option[Array[String]] = None
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 6a45380b7a99..7bc4051b45d4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -4404,10 +4404,16 @@ object SQLConf {
s"the $SESSION_CATALOG_NAME and must be consistent with it; for
example, if a table can " +
s"be loaded by the $SESSION_CATALOG_NAME, this catalog must also
return the table " +
s"metadata. To delegate operations to the $SESSION_CATALOG_NAME,
implementations can " +
- "extend 'CatalogExtension'.")
+ "extend 'CatalogExtension'. The value should be either 'builtin' which
represents " +
+ "Spark's built-in V2SessionCatalog, or a fully qualified class name
of the catalog " +
+ "implementation.")
.version("3.0.0")
.stringConf
- .createOptional
+ .transform {
+ case builtin if builtin.equalsIgnoreCase("builtin") => "builtin"
+ case fullClassName => fullClassName
+ }
+ .createWithDefault("builtin")
object MapKeyDedupPolicy extends Enumeration {
val EXCEPTION, LAST_WIN = Value
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index fa28a2cb9ead..87ea3071f490 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -726,7 +726,7 @@ class ResolveSessionCatalog(val catalogManager:
CatalogManager)
private def supportsV1Command(catalog: CatalogPlugin): Boolean = {
isSessionCatalog(catalog) && (
- SQLConf.get.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isEmpty ||
+ SQLConf.get.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION) ==
"builtin" ||
catalog.isInstanceOf[CatalogExtension])
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index e9927cdcc7a3..0a533645648e 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -83,7 +83,7 @@ class V2SessionCatalog(catalog: SessionCatalog)
}
private def hasCustomSessionCatalog: Boolean = {
- catalog.conf.contains(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION.key)
+ catalog.conf.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION) !=
"builtin"
}
override def loadTable(ident: Identifier): Table = {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala
b/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala
index 0069062e6307..5a96db5e34bb 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala
@@ -383,6 +383,11 @@ final class DataFrameWriterImpl[T] private[sql](ds:
Dataset[T]) extends DataFram
}
}
+ private def hasCustomSessionCatalog: Boolean = {
+ df.sparkSession.sessionState.conf
+ .getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION) != "builtin"
+ }
+
/**
* Saves the content of the `DataFrame` as the specified table.
*
@@ -426,8 +431,7 @@ final class DataFrameWriterImpl[T] private[sql](ds:
Dataset[T]) extends DataFram
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
val session = df.sparkSession
- val canUseV2 = lookupV2Provider().isDefined ||
(df.sparkSession.sessionState.conf.getConf(
- SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isDefined &&
+ val canUseV2 = lookupV2Provider().isDefined || (hasCustomSessionCatalog &&
!df.sparkSession.sessionState.catalogManager.catalog(CatalogManager.SESSION_CATALOG_NAME)
.isInstanceOf[CatalogExtension])
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 87d0a1ff4e7b..8d255e9efda5 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -278,8 +278,8 @@ class DataSourceV2SQLSuiteV1Filter
test("CreateTable: without USING clause") {
withSQLConf(SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT.key -> "false") {
- // unset this config to use the default v2 session catalog.
- spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
+ // use the default v2 session catalog.
+ spark.conf.set(V2_SESSION_CATALOG_IMPLEMENTATION, "builtin")
val testCatalog = catalog("testcat").asTableCatalog
sql("CREATE TABLE testcat.t1 (id int)")
@@ -785,8 +785,8 @@ class DataSourceV2SQLSuiteV1Filter
}
test("CreateTableAsSelect: v2 session catalog can load v1 source table") {
- // unset this config to use the default v2 session catalog.
- spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
+ // use the default v2 session catalog.
+ spark.conf.set(V2_SESSION_CATALOG_IMPLEMENTATION, "builtin")
val df = spark.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L,
"c"))).toDF("id", "data")
df.createOrReplaceTempView("source")
@@ -846,8 +846,8 @@ class DataSourceV2SQLSuiteV1Filter
// TODO: ignored by SPARK-31707, restore the test after create table syntax
unification
ignore("CreateTableAsSelect: without USING clause") {
- // unset this config to use the default v2 session catalog.
- spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
+ // use the default v2 session catalog.
+ spark.conf.set(V2_SESSION_CATALOG_IMPLEMENTATION, "builtin")
val testCatalog = catalog("testcat").asTableCatalog
sql("CREATE TABLE testcat.t1 AS SELECT 1 i")
@@ -1086,11 +1086,11 @@ class DataSourceV2SQLSuiteV1Filter
Seq(true, false).foreach { useV1Table =>
val format = if (useV1Table) "json" else v2Format
if (useV1Table) {
- // unset this config to use the default v2 session catalog.
- spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
+ // use the default v2 session catalog.
+ spark.conf.set(V2_SESSION_CATALOG_IMPLEMENTATION, "builtin")
} else {
spark.conf.set(
- V2_SESSION_CATALOG_IMPLEMENTATION.key,
classOf[InMemoryTableSessionCatalog].getName)
+ V2_SESSION_CATALOG_IMPLEMENTATION,
classOf[InMemoryTableSessionCatalog].getName)
}
withTable("t") {
@@ -1815,8 +1815,8 @@ class DataSourceV2SQLSuiteV1Filter
}
test("SPARK-46972: asymmetrical replacement for char/varchar in
V2SessionCatalog.createTable") {
- // unset this config to use the default v2 session catalog.
- spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
+ // use the default v2 session catalog.
+ spark.conf.set(V2_SESSION_CATALOG_IMPLEMENTATION, "builtin")
withTable("t") {
sql(s"CREATE TABLE t(c char(1), v varchar(2)) USING $v2Source")
}
@@ -2533,8 +2533,8 @@ class DataSourceV2SQLSuiteV1Filter
}
test("SPARK-30001: session catalog name can be specified in SQL statements")
{
- // unset this config to use the default v2 session catalog.
- spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
+ // use the default v2 session catalog.
+ spark.conf.set(V2_SESSION_CATALOG_IMPLEMENTATION, "builtin")
withTable("t") {
sql("CREATE TABLE t USING json AS SELECT 1 AS i")
@@ -2598,8 +2598,8 @@ class DataSourceV2SQLSuiteV1Filter
}
test("SPARK-30094: current namespace is used during table resolution") {
- // unset this config to use the default v2 session catalog.
- spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
+ // use the default v2 session catalog.
+ spark.conf.set(V2_SESSION_CATALOG_IMPLEMENTATION, "builtin")
withTable("spark_catalog.default.t", "testcat.ns.t") {
sql("CREATE TABLE t USING parquet AS SELECT 1")
@@ -2613,8 +2613,8 @@ class DataSourceV2SQLSuiteV1Filter
}
test("SPARK-30284: CREATE VIEW should track the current catalog and
namespace") {
- // unset this config to use the default v2 session catalog.
- spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
+ // use the default v2 session catalog.
+ spark.conf.set(V2_SESSION_CATALOG_IMPLEMENTATION, "builtin")
val sessionCatalogName = CatalogManager.SESSION_CATALOG_NAME
sql("CREATE NAMESPACE testcat.ns1.ns2")
@@ -2651,8 +2651,8 @@ class DataSourceV2SQLSuiteV1Filter
}
test("COMMENT ON NAMESPACE") {
- // unset this config to use the default v2 session catalog.
- spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
+ // use the default v2 session catalog.
+ spark.conf.set(V2_SESSION_CATALOG_IMPLEMENTATION, "builtin")
// Session catalog is used.
sql("CREATE NAMESPACE ns")
checkNamespaceComment("ns", "minor revision")
@@ -2685,8 +2685,8 @@ class DataSourceV2SQLSuiteV1Filter
}
test("COMMENT ON TABLE") {
- // unset this config to use the default v2 session catalog.
- spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
+ // use the default v2 session catalog.
+ spark.conf.set(V2_SESSION_CATALOG_IMPLEMENTATION, "builtin")
// Session catalog is used.
withTable("t") {
sql("CREATE TABLE t(k int) USING json")
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTests.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTests.scala
index fd022580db42..26f64ceb33fe 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTests.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTests.scala
@@ -100,8 +100,8 @@ trait DeleteFromTests extends DatasourceV2SQLBase {
}
test("DeleteFrom: DELETE is only supported with v2 tables") {
- // unset this config to use the default v2 session catalog.
- spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
+ // use the default v2 session catalog.
+ spark.conf.set(V2_SESSION_CATALOG_IMPLEMENTATION, "builtin")
val v1Table = "tbl"
withTable(v1Table) {
sql(s"CREATE TABLE $v1Table" +
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]