This is an automated email from the ASF dual-hosted git repository.

cloud-fan pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.x by this push:
     new 89a2a5323505 [SPARK-52812][CONNECT] Preserve spark.sql.sources.default 
for eager createTable(tableName, path)
89a2a5323505 is described below

commit 89a2a53235058d764e97009f4a28e6a3dc8defa3
Author: haoyangeng-db <[email protected]>
AuthorDate: Sun May 31 13:48:36 2026 +0800

    [SPARK-52812][CONNECT] Preserve spark.sql.sources.default for eager 
createTable(tableName, path)
    
    ### What changes were proposed in this pull request?
    
    SPARK-52812 (#56064) made Spark Connect `Catalog.createTable` eager by 
re-routing the two-argument `createTable(tableName, path)` overload through 
`createTable(tableName, path, "parquet")`. That hardcodes the parquet provider 
and drops the `spark.sql.sources.default` fallback that the overload previously 
relied on.
    
    This PR restores the original behavior: the two-argument overload again 
leaves the source unset so the server resolves `spark.sql.sources.default`, 
while keeping the eager execution introduced by SPARK-52812. A regression test 
is added to `CatalogSuite`.
    
    ### Why are the changes needed?
    
    The two-argument `createTable(tableName, path)` overload is documented as 
"It will use the default data source configured by spark.sql.sources.default." 
After SPARK-52812 it always used parquet regardless of that configuration, 
contradicting its own contract and the classic Catalog behavior.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, within the unreleased master branch. 
`spark.catalog.createTable(tableName, path)` on Spark Connect once again honors 
`spark.sql.sources.default` instead of always creating a parquet table. The 
eager-execution behavior from SPARK-52812 is preserved.
    
    ### How was this patch tested?
    
    Added a regression test in `CatalogSuite` that sets 
`spark.sql.sources.default` to `json`, writes JSON data, creates the table via 
the two-argument overload, and asserts the resulting table uses the json 
provider and is readable. The test fails on the previous hardcoded-parquet 
behavior.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Co-authored with Claude Code.
    
    Closes #56211 from 
haoyangeng-db/spark-52812-followup-createtable-default-source.
    
    Authored-by: haoyangeng-db <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit 7fa449ac73688e4497e0b62e1cd953fdbfab9c44)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../apache/spark/sql/connect/CatalogSuite.scala    | 21 +++++++++++++++++
 .../org/apache/spark/sql/connect/Catalog.scala     | 26 ++++++++++++++++++++--
 2 files changed, 45 insertions(+), 2 deletions(-)

diff --git 
a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/CatalogSuite.scala
 
b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/CatalogSuite.scala
index e8ccc9f083c6..5237554b3625 100644
--- 
a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/CatalogSuite.scala
+++ 
b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/CatalogSuite.scala
@@ -163,6 +163,27 @@ class CatalogSuite extends ConnectFunSuite with 
RemoteSparkSession with SQLHelpe
     }
   }
 
+  test("createTable(tableName, path) uses spark.sql.sources.default") {
+    val tableName = "default_source_table"
+    withSQLConf("spark.sql.sources.default" -> "json") {
+      withTable(tableName) {
+        withTempPath { dir =>
+          val session = spark
+          import session.implicits._
+          // Write the data as JSON. If createTable hardcoded the parquet 
provider, reading the
+          // table back would fail because the files are not parquet.
+          Seq((1, "a")).toDF("id", "value").write.json(dir.getPath)
+          spark.catalog.createTable(tableName, dir.getPath)
+          assert(spark.catalog.tableExists(tableName))
+          val ddl = spark.catalog.getCreateTableString(tableName)
+          assert(ddl.toLowerCase(java.util.Locale.ROOT).contains("using json"))
+          // Reading the table back succeeds only if it was created with the 
json provider.
+          assert(spark.table(tableName).count() == 1)
+        }
+      }
+    }
+  }
+
   test("Cache Table APIs") {
     val parquetTableName = "parquet_table"
     withTable(parquetTableName) {
diff --git 
a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/Catalog.scala 
b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/Catalog.scala
index 2324ca05d7b7..ce7a10c4026c 100644
--- 
a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/Catalog.scala
+++ 
b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/Catalog.scala
@@ -392,7 +392,15 @@ class Catalog(sparkSession: SparkSession) extends 
catalog.Catalog {
    * @since 3.5.0
    */
   override def createTable(tableName: String, path: String): DataFrame = {
-    createTable(tableName, path, "parquet")
+    // Leave the source unset so the server resolves 
spark.sql.sources.default, as documented
+    // above. Routing through createTable(tableName, path, "parquet") would 
hardcode the provider
+    // and ignore that configuration.
+    createTable(
+      tableName = tableName,
+      source = None,
+      schema = new StructType,
+      description = "",
+      options = Map("path" -> path))
   }
 
   /**
@@ -478,12 +486,26 @@ class Catalog(sparkSession: SparkSession) extends 
catalog.Catalog {
       schema: StructType,
       description: String,
       options: Map[String, String]): DataFrame = {
+    createTable(tableName, Some(source), schema, description, options)
+  }
+
+  /**
+   * Shared implementation for the public `createTable` overloads. When 
`source` is `None`, the
+   * proto's `source` field is left unset so the server resolves 
`spark.sql.sources.default`;
+   * otherwise the provided source is pinned via `setSource`.
+   */
+  private def createTable(
+      tableName: String,
+      source: Option[String],
+      schema: StructType,
+      description: String,
+      options: Map[String, String]): DataFrame = {
     sparkSession.execute { builder =>
       val createTableBuilder = builder.getCatalogBuilder.getCreateTableBuilder
         .setTableName(tableName)
-        .setSource(source)
         .setSchema(DataTypeProtoConverter.toConnectProtoType(schema))
         .setDescription(description)
+      source.foreach(createTableBuilder.setSource)
       options.foreach { case (k, v) =>
         createTableBuilder.putOptions(k, v)
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to