This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 8149b80a1f5a [SPARK-51747][SQL] Data source cached plan should respect 
options
8149b80a1f5a is described below

commit 8149b80a1f5acb36b305748cbe6f480492d7a742
Author: Amanda Liu <[email protected]>
AuthorDate: Wed Apr 9 20:10:42 2025 -0700

    [SPARK-51747][SQL] Data source cached plan should respect options
    
    ### What changes were proposed in this pull request?
    
    Data source cached plan should respect options, such as CSV delimiter. 
Before this, DataSourceStrategy caches the first plan and reuses it in the 
future, ignoring updated options. This change returns a **new plan** if options 
are changed.
    
    ### Why are the changes needed?
    
    For example:
    
    ```
    spark.sql("CREATE TABLE t(a string, b string) USING CSV".stripMargin)
    spark.sql("INSERT INTO TABLE t VALUES ('a;b', 'c')")
    
    spark.sql("SELECT * FROM t").show()
    spark.sql("SELECT * FROM t WITH ('delimiter' = ';')")
    ```
    
    Expected output:
    
     ```
    +----+----+
    |col1|col2|
    +----+----+
    | a;b|   c|
    +----+----+
    
    +----+----+
    |col1|col2|
    +----+----+
    |   a| b,c|
    +----+----+
     ```
    
    Output before this PR:
    
     ```
    +----+----+
    |col1|col2|
    +----+----+
    | a;b|   c|
    +----+----+
    
    +----+----+
    |col1|col2|
    +----+----+
    | a;b|   c|
    +----+----+
    ```
    
    The PR is needed to get the expected result.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, corrects the caching behavior from DataSourceStrategy
    
    ### How was this patch tested?
    
    Added test in DDLSuite.scala
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #50538 from asl3/asl3/datasourcestrategycacheoptions.
    
    Lead-authored-by: Amanda Liu <[email protected]>
    Co-authored-by: Gengliang Wang <[email protected]>
    Signed-off-by: Gengliang Wang <[email protected]>
    (cherry picked from commit d2a864f988c792e9c211d012f8aa8815d1142703)
    Signed-off-by: Gengliang Wang <[email protected]>
---
 .../execution/datasources/DataSourceStrategy.scala | 44 +++++++++++++++-------
 .../spark/sql/execution/command/DDLSuite.scala     | 29 ++++++++++++++
 2 files changed, 59 insertions(+), 14 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 32decd9c429d..cb744cec103f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -21,6 +21,7 @@ import java.util.Locale
 
 import scala.collection.immutable.ListMap
 import scala.collection.mutable
+import scala.jdk.CollectionConverters._
 
 import org.apache.hadoop.fs.Path
 
@@ -256,20 +257,35 @@ class FindDataSourceTable(sparkSession: SparkSession) 
extends Rule[LogicalPlan]
       QualifiedTableName(table.identifier.catalog.get, table.database, 
table.identifier.table)
     val catalog = sparkSession.sessionState.catalog
     val dsOptions = DataSourceUtils.generateDatasourceOptions(extraOptions, 
table)
-    catalog.getCachedPlan(qualifiedTableName, () => {
-      val dataSource =
-        DataSource(
-          sparkSession,
-          // In older version(prior to 2.1) of Spark, the table schema can be 
empty and should be
-          // inferred at runtime. We should still support it.
-          userSpecifiedSchema = if (table.schema.isEmpty) None else 
Some(table.schema),
-          partitionColumns = table.partitionColumnNames,
-          bucketSpec = table.bucketSpec,
-          className = table.provider.get,
-          options = dsOptions,
-          catalogTable = Some(table))
-      LogicalRelation(dataSource.resolveRelation(checkFilesExist = false), 
table)
-    })
+    catalog.getCachedTable(qualifiedTableName) match {
+      case null =>
+        val dataSource =
+          DataSource(
+            sparkSession,
+            // In older version(prior to 2.1) of Spark, the table schema can 
be empty and should be
+            // inferred at runtime. We should still support it.
+            userSpecifiedSchema = if (table.schema.isEmpty) None else 
Some(table.schema),
+            partitionColumns = table.partitionColumnNames,
+            bucketSpec = table.bucketSpec,
+            className = table.provider.get,
+            options = dsOptions,
+            catalogTable = Some(table))
+        val plan = LogicalRelation(dataSource.resolveRelation(checkFilesExist 
= false), table)
+        catalog.cacheTable(qualifiedTableName, plan)
+        plan
+
+      // If the cached table relation's options differ from the new options:
+      // 1. Create a new HadoopFsRelation with updated options
+      // 2. Return a new LogicalRelation with the updated HadoopFsRelation
+      // This ensures the relation reflects any changes in data source options
+      case r @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _, _)
+        if new CaseInsensitiveStringMap(fsRelation.options.asJava) !=
+          new CaseInsensitiveStringMap(dsOptions.asJava) =>
+        val newFsRelation = fsRelation.copy(options = dsOptions)(sparkSession)
+        r.copy(relation = newFsRelation)
+
+      case other => other
+    }
   }
 
   private def getStreamingRelation(
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 21bdbd40caa8..28dac5e1a250 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -1376,6 +1376,35 @@ abstract class DDLSuite extends QueryTest with 
DDLSuiteBase {
     }
   }
 
+  test("SPARK-51747: Data source cached plan should respect options") {
+    withTable("t") {
+      spark.sql("CREATE TABLE t(a string, b string) USING CSV".stripMargin)
+      spark.sql("INSERT INTO TABLE t VALUES ('a;b', 'c')")
+      spark.sql("INSERT INTO TABLE t VALUES ('hello; world', 'test')")
+
+      // check initial contents of table
+      checkAnswer(spark.table("t"), Row("a;b", "c") :: Row("hello; world", 
"test") :: Nil)
+
+      // no option
+      checkAnswer(
+        spark.sql("SELECT * FROM t"),
+        Row("a;b", "c") :: Row("hello; world", "test") :: Nil
+      )
+
+      // respect delimiter option
+      checkAnswer(
+        spark.sql("SELECT * FROM t WITH ('delimiter' = ';')"),
+        Row("a", "b,c") :: Row("hello", " world,test") :: Nil
+      )
+
+      // respect lineSep option
+      checkAnswer(
+        spark.sql("SELECT * FROM t WITH ('lineSep' = ';')"),
+        Row("a", null) :: Row("b", "c\n") :: Row("hello", null) :: Row(" 
world", "test\n") :: Nil
+      )
+    }
+  }
+
   test("SPARK-18009 calling toLocalIterator on commands") {
     import scala.jdk.CollectionConverters._
     val df = sql("show databases")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to