This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1793a20dbdcb [SPARK-50700][SQL] `spark.sql.catalog.spark_catalog` 
supports `builtin` magic value
1793a20dbdcb is described below

commit 1793a20dbdcb10eab4fe2eecbba4ebc5258aa637
Author: Cheng Pan <[email protected]>
AuthorDate: Mon Jan 6 11:28:51 2025 +0800

    [SPARK-50700][SQL] `spark.sql.catalog.spark_catalog` supports `builtin` 
magic value
    
    ### What changes were proposed in this pull request?
    
    This PR adds a magic value `builtin`(and sets it to the default value) for 
`spark.sql.catalog.spark_catalog`.
    
    ### Why are the changes needed?
    
    Currently, `spark.sql.catalog.spark_catalog` is optional and has `None` as 
the default value. When `spark.sql.catalog.spark_catalog=a.bad.catalog.impl` is 
wrongly set in `spark-defaults.conf`, the user has no way to overwrite it in 
`spark-submit`.
    
    Note that, explicitly setting it to 
`o.a.s.sql.execution.datasources.v2.V2SessionCatalog` does not work either, 
because `V2SessionCatalog` does not have a zero-args constructor.
    ```
    spark-submit \
      --conf 
spark.sql.catalog.spark_catalog=org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog
 \
      ...
    ```
    
    To fix the above issue, similar to what we did for 
`spark.sql.hive.metastore.jars`, just use "builtin" to represent the built-in 
`V2SessionCatalog`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No change for default behavior, and users are allowed to use  
`spark.sql.catalog.spark_catalog=builtin` to set `spark_catalog` as the 
built-in `V2SessionCatalog`.
    
    ### How was this patch tested?
    
    Code in UTs like
    ```
    // unset this config to use the default v2 session catalog.
    spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
    ```
    is replaced with
    ```
    spark.conf.set(V2_SESSION_CATALOG_IMPLEMENTATION, "builtin")
    ```

    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #49332 from pan3793/SPARK-50700.
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Kent Yao <[email protected]>
---
 .../sql/connector/catalog/CatalogManager.scala     |  7 ++--
 .../org/apache/spark/sql/internal/SQLConf.scala    | 10 ++++--
 .../catalyst/analysis/ResolveSessionCatalog.scala  |  2 +-
 .../datasources/v2/V2SessionCatalog.scala          |  2 +-
 .../spark/sql/internal/DataFrameWriterImpl.scala   |  8 +++--
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 42 +++++++++++-----------
 .../spark/sql/connector/DeleteFromTests.scala      |  4 +--
 7 files changed, 43 insertions(+), 32 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
index db94659b1033..9b8584604d32 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
@@ -85,9 +85,10 @@ class CatalogManager(
    * in the fallback configuration, spark.sql.sources.useV1SourceList
    */
   private[sql] def v2SessionCatalog: CatalogPlugin = {
-    conf.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).map { _ =>
-      catalogs.getOrElseUpdate(SESSION_CATALOG_NAME, loadV2SessionCatalog())
-    }.getOrElse(defaultSessionCatalog)
+    conf.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION) match {
+      case "builtin" => defaultSessionCatalog
+      case _ => catalogs.getOrElseUpdate(SESSION_CATALOG_NAME, 
loadV2SessionCatalog())
+    }
   }
 
   private var _currentNamespace: Option[Array[String]] = None
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 6a45380b7a99..7bc4051b45d4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -4404,10 +4404,16 @@ object SQLConf {
         s"the $SESSION_CATALOG_NAME and must be consistent with it; for 
example, if a table can " +
         s"be loaded by the $SESSION_CATALOG_NAME, this catalog must also 
return the table " +
         s"metadata. To delegate operations to the $SESSION_CATALOG_NAME, 
implementations can " +
-        "extend 'CatalogExtension'.")
+        "extend 'CatalogExtension'. The value should be either 'builtin' which 
represents the " +
+        "spark's built-in V2SessionCatalog, or a fully qualified class name 
of the catalog " +
+        "implementation.")
       .version("3.0.0")
       .stringConf
-      .createOptional
+      .transform {
+        case builtin if builtin.equalsIgnoreCase("builtin") => "builtin"
+        case fullClassName => fullClassName
+      }
+      .createWithDefault("builtin")
 
   object MapKeyDedupPolicy extends Enumeration {
     val EXCEPTION, LAST_WIN = Value
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index fa28a2cb9ead..87ea3071f490 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -726,7 +726,7 @@ class ResolveSessionCatalog(val catalogManager: 
CatalogManager)
 
   private def supportsV1Command(catalog: CatalogPlugin): Boolean = {
     isSessionCatalog(catalog) && (
-      SQLConf.get.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isEmpty ||
+      SQLConf.get.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION) == 
"builtin" ||
         catalog.isInstanceOf[CatalogExtension])
   }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index e9927cdcc7a3..0a533645648e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -83,7 +83,7 @@ class V2SessionCatalog(catalog: SessionCatalog)
   }
 
   private def hasCustomSessionCatalog: Boolean = {
-    catalog.conf.contains(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION.key)
+    catalog.conf.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION) != 
"builtin"
   }
 
   override def loadTable(ident: Identifier): Table = {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala
index 0069062e6307..5a96db5e34bb 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala
@@ -383,6 +383,11 @@ final class DataFrameWriterImpl[T] private[sql](ds: 
Dataset[T]) extends DataFram
     }
   }
 
+  private def hasCustomSessionCatalog: Boolean = {
+    df.sparkSession.sessionState.conf
+      .getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION) != "builtin"
+  }
+
   /**
    * Saves the content of the `DataFrame` as the specified table.
    *
@@ -426,8 +431,7 @@ final class DataFrameWriterImpl[T] private[sql](ds: 
Dataset[T]) extends DataFram
     import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 
     val session = df.sparkSession
-    val canUseV2 = lookupV2Provider().isDefined || 
(df.sparkSession.sessionState.conf.getConf(
-        SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isDefined &&
+    val canUseV2 = lookupV2Provider().isDefined || (hasCustomSessionCatalog &&
         
!df.sparkSession.sessionState.catalogManager.catalog(CatalogManager.SESSION_CATALOG_NAME)
           .isInstanceOf[CatalogExtension])
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 87d0a1ff4e7b..8d255e9efda5 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -278,8 +278,8 @@ class DataSourceV2SQLSuiteV1Filter
 
   test("CreateTable: without USING clause") {
     withSQLConf(SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT.key -> "false") {
-      // unset this config to use the default v2 session catalog.
-      spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
+      // use the default v2 session catalog.
+      spark.conf.set(V2_SESSION_CATALOG_IMPLEMENTATION, "builtin")
       val testCatalog = catalog("testcat").asTableCatalog
 
       sql("CREATE TABLE testcat.t1 (id int)")
@@ -785,8 +785,8 @@ class DataSourceV2SQLSuiteV1Filter
   }
 
   test("CreateTableAsSelect: v2 session catalog can load v1 source table") {
-    // unset this config to use the default v2 session catalog.
-    spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
+    // use the default v2 session catalog.
+    spark.conf.set(V2_SESSION_CATALOG_IMPLEMENTATION, "builtin")
 
     val df = spark.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, 
"c"))).toDF("id", "data")
     df.createOrReplaceTempView("source")
@@ -846,8 +846,8 @@ class DataSourceV2SQLSuiteV1Filter
 
   // TODO: ignored by SPARK-31707, restore the test after create table syntax 
unification
   ignore("CreateTableAsSelect: without USING clause") {
-    // unset this config to use the default v2 session catalog.
-    spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
+    // use the default v2 session catalog.
+    spark.conf.set(V2_SESSION_CATALOG_IMPLEMENTATION, "builtin")
     val testCatalog = catalog("testcat").asTableCatalog
 
     sql("CREATE TABLE testcat.t1 AS SELECT 1 i")
@@ -1086,11 +1086,11 @@ class DataSourceV2SQLSuiteV1Filter
     Seq(true, false).foreach { useV1Table =>
       val format = if (useV1Table) "json" else v2Format
       if (useV1Table) {
-        // unset this config to use the default v2 session catalog.
-        spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
+        // use the default v2 session catalog.
+        spark.conf.set(V2_SESSION_CATALOG_IMPLEMENTATION, "builtin")
       } else {
         spark.conf.set(
-          V2_SESSION_CATALOG_IMPLEMENTATION.key, 
classOf[InMemoryTableSessionCatalog].getName)
+          V2_SESSION_CATALOG_IMPLEMENTATION, 
classOf[InMemoryTableSessionCatalog].getName)
       }
 
       withTable("t") {
@@ -1815,8 +1815,8 @@ class DataSourceV2SQLSuiteV1Filter
   }
 
   test("SPARK-46972: asymmetrical replacement for char/varchar in 
V2SessionCatalog.createTable") {
-    // unset this config to use the default v2 session catalog.
-    spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
+    // use the default v2 session catalog.
+    spark.conf.set(V2_SESSION_CATALOG_IMPLEMENTATION, "builtin")
     withTable("t") {
       sql(s"CREATE TABLE t(c char(1), v varchar(2)) USING $v2Source")
     }
@@ -2533,8 +2533,8 @@ class DataSourceV2SQLSuiteV1Filter
   }
 
   test("SPARK-30001: session catalog name can be specified in SQL statements") 
{
-    // unset this config to use the default v2 session catalog.
-    spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
+    // use the default v2 session catalog.
+    spark.conf.set(V2_SESSION_CATALOG_IMPLEMENTATION, "builtin")
 
     withTable("t") {
       sql("CREATE TABLE t USING json AS SELECT 1 AS i")
@@ -2598,8 +2598,8 @@ class DataSourceV2SQLSuiteV1Filter
   }
 
   test("SPARK-30094: current namespace is used during table resolution") {
-    // unset this config to use the default v2 session catalog.
-    spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
+    // use the default v2 session catalog.
+    spark.conf.set(V2_SESSION_CATALOG_IMPLEMENTATION, "builtin")
 
     withTable("spark_catalog.default.t", "testcat.ns.t") {
       sql("CREATE TABLE t USING parquet AS SELECT 1")
@@ -2613,8 +2613,8 @@ class DataSourceV2SQLSuiteV1Filter
   }
 
   test("SPARK-30284: CREATE VIEW should track the current catalog and 
namespace") {
-    // unset this config to use the default v2 session catalog.
-    spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
+    // use the default v2 session catalog.
+    spark.conf.set(V2_SESSION_CATALOG_IMPLEMENTATION, "builtin")
     val sessionCatalogName = CatalogManager.SESSION_CATALOG_NAME
 
     sql("CREATE NAMESPACE testcat.ns1.ns2")
@@ -2651,8 +2651,8 @@ class DataSourceV2SQLSuiteV1Filter
   }
 
   test("COMMENT ON NAMESPACE") {
-    // unset this config to use the default v2 session catalog.
-    spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
+    // use the default v2 session catalog.
+    spark.conf.set(V2_SESSION_CATALOG_IMPLEMENTATION, "builtin")
     // Session catalog is used.
     sql("CREATE NAMESPACE ns")
     checkNamespaceComment("ns", "minor revision")
@@ -2685,8 +2685,8 @@ class DataSourceV2SQLSuiteV1Filter
   }
 
   test("COMMENT ON TABLE") {
-    // unset this config to use the default v2 session catalog.
-    spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
+    // use the default v2 session catalog.
+    spark.conf.set(V2_SESSION_CATALOG_IMPLEMENTATION, "builtin")
     // Session catalog is used.
     withTable("t") {
       sql("CREATE TABLE t(k int) USING json")
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTests.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTests.scala
index fd022580db42..26f64ceb33fe 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTests.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTests.scala
@@ -100,8 +100,8 @@ trait DeleteFromTests extends DatasourceV2SQLBase {
   }
 
   test("DeleteFrom: DELETE is only supported with v2 tables") {
-    // unset this config to use the default v2 session catalog.
-    spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
+    // use the default v2 session catalog.
+    spark.conf.set(V2_SESSION_CATALOG_IMPLEMENTATION, "builtin")
     val v1Table = "tbl"
     withTable(v1Table) {
       sql(s"CREATE TABLE $v1Table" +


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to