This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new cc6d6f17bdee [SPARK-49519][SQL] Merge options of table and relation 
when constructing FileScanBuilder
cc6d6f17bdee is described below

commit cc6d6f17bdee8e856c333a177356e626bb8d9a73
Author: joey.ljy <[email protected]>
AuthorDate: Wed Sep 11 07:42:02 2024 -0700

    [SPARK-49519][SQL] Merge options of table and relation when constructing 
FileScanBuilder
    
    ### What changes were proposed in this pull request?
    Merge the options of both `DataSourceV2Relation` and `FileTable` when 
constructing `FileScanBuilder`.
    
    ### Why are the changes needed?
    Currently, the subclass of `FileTable` only uses the options from relation 
when constructing the `FileScanBuilder`, which leads to the omission of the 
contents in `FileTable.options`. For the `TableCatalog`, the `dsOptions` can be 
set into the `FileTable.options` returned by the `TableCatalog.loadTable` 
method. If only the relation options are used here, the `TableCatalog` will not 
be able to pass `dsOptions` that contains table options to `FileScan`.
    
    Merge the two options is a better option.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #47996 from liujiayi771/csv-options.
    
    Lead-authored-by: joey.ljy <[email protected]>
    Co-authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../org/apache/spark/sql/v2/avro/AvroTable.scala   |  2 +-
 .../sql/execution/datasources/v2/FileTable.scala   | 13 +++++++
 .../execution/datasources/v2/csv/CSVTable.scala    |  2 +-
 .../execution/datasources/v2/json/JsonTable.scala  |  2 +-
 .../execution/datasources/v2/orc/OrcTable.scala    |  2 +-
 .../datasources/v2/parquet/ParquetTable.scala      |  2 +-
 .../execution/datasources/v2/text/TextTable.scala  |  2 +-
 .../execution/datasources/v2/FileTableSuite.scala  | 43 ++++++++++++++++++++++
 8 files changed, 62 insertions(+), 6 deletions(-)

diff --git 
a/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroTable.scala 
b/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroTable.scala
index fe61fe3db878..8ec711b2757f 100644
--- a/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroTable.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroTable.scala
@@ -37,7 +37,7 @@ case class AvroTable(
     fallbackFileFormat: Class[_ <: FileFormat])
   extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
   override def newScanBuilder(options: CaseInsensitiveStringMap): 
AvroScanBuilder =
-    new AvroScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
+    AvroScanBuilder(sparkSession, fileIndex, schema, dataSchema, 
mergedOptions(options))
 
   override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
     AvroUtils.inferSchema(sparkSession, options.asScala.toMap, files)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
index f56f9436d943..4eee731e0b2d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
@@ -146,6 +146,19 @@ abstract class FileTable(
     val entry = options.get(DataSource.GLOB_PATHS_KEY)
     Option(entry).map(_ == "true").getOrElse(true)
   }
+
+  /**
+   * Merge the options of FileTable and the table operation while respecting 
the
+   * keys of the table operation.
+   *
+   * @param options The options of the table operation.
+   * @return
+   */
+  protected def mergedOptions(options: CaseInsensitiveStringMap): 
CaseInsensitiveStringMap = {
+    val finalOptions = this.options.asCaseSensitiveMap().asScala ++
+      options.asCaseSensitiveMap().asScala
+    new CaseInsensitiveStringMap(finalOptions.asJava)
+  }
 }
 
 object FileTable {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala
index 8b4fd3af6ded..4c201ca66cf6 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala
@@ -38,7 +38,7 @@ case class CSVTable(
     fallbackFileFormat: Class[_ <: FileFormat])
   extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
   override def newScanBuilder(options: CaseInsensitiveStringMap): 
CSVScanBuilder =
-    CSVScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
+    CSVScanBuilder(sparkSession, fileIndex, schema, dataSchema, 
mergedOptions(options))
 
   override def inferSchema(files: Seq[FileStatus]): Option[StructType] = {
     val parsedOptions = new CSVOptions(
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala
index c567e87e7d76..54244c4d95e7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala
@@ -38,7 +38,7 @@ case class JsonTable(
     fallbackFileFormat: Class[_ <: FileFormat])
   extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
   override def newScanBuilder(options: CaseInsensitiveStringMap): 
JsonScanBuilder =
-    new JsonScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
+    JsonScanBuilder(sparkSession, fileIndex, schema, dataSchema, 
mergedOptions(options))
 
   override def inferSchema(files: Seq[FileStatus]): Option[StructType] = {
     val parsedOptions = new JSONOptionsInRead(
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala
index ca4b83b3c58f..1037370967c8 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala
@@ -38,7 +38,7 @@ case class OrcTable(
   extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
 
   override def newScanBuilder(options: CaseInsensitiveStringMap): 
OrcScanBuilder =
-    new OrcScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
+    OrcScanBuilder(sparkSession, fileIndex, schema, dataSchema, 
mergedOptions(options))
 
   override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
     OrcUtils.inferSchema(sparkSession, files, options.asScala.toMap)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala
index e593ad7d0c0c..8463a05569c0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala
@@ -38,7 +38,7 @@ case class ParquetTable(
   extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
 
   override def newScanBuilder(options: CaseInsensitiveStringMap): 
ParquetScanBuilder =
-    new ParquetScanBuilder(sparkSession, fileIndex, schema, dataSchema, 
options)
+    ParquetScanBuilder(sparkSession, fileIndex, schema, dataSchema, 
mergedOptions(options))
 
   override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
     ParquetUtils.inferSchema(sparkSession, options.asScala.toMap, files)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextTable.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextTable.scala
index 046bdcb69846..87ae34532f88 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextTable.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextTable.scala
@@ -34,7 +34,7 @@ case class TextTable(
     fallbackFileFormat: Class[_ <: FileFormat])
   extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
   override def newScanBuilder(options: CaseInsensitiveStringMap): 
TextScanBuilder =
-    TextScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
+    TextScanBuilder(sparkSession, fileIndex, schema, dataSchema, 
mergedOptions(options))
 
   override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
     Some(StructType(Array(StructField("value", StringType))))
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala
index 4160516deece..0316f09e42ce 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala
@@ -23,8 +23,15 @@ import org.apache.hadoop.fs.FileStatus
 import org.apache.spark.sql.{QueryTest, SparkSession}
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
+import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.datasources.FileFormat
 import org.apache.spark.sql.execution.datasources.text.TextFileFormat
+import org.apache.spark.sql.execution.datasources.v2.csv.CSVScanBuilder
+import org.apache.spark.sql.execution.datasources.v2.json.JsonScanBuilder
+import org.apache.spark.sql.execution.datasources.v2.orc.OrcScanBuilder
+import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScanBuilder
+import org.apache.spark.sql.execution.datasources.v2.text.TextScanBuilder
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -53,6 +60,8 @@ class DummyFileTable(
 
 class FileTableSuite extends QueryTest with SharedSparkSession {
 
+  private val allFileBasedDataSources = Seq("orc", "parquet", "csv", "json", 
"text")
+
   test("Data type validation should check data schema only") {
     withTempPath { dir =>
       val df = spark.createDataFrame(Seq(("a", 1), ("b", 2))).toDF("v", "p")
@@ -85,4 +94,38 @@ class FileTableSuite extends QueryTest with 
SharedSparkSession {
       assert(table.dataSchema == expectedDataSchema)
     }
   }
+
+  allFileBasedDataSources.foreach { format =>
+    test(s"SPARK-49519: Merge options of table and relation when constructing 
FileScanBuilder" +
+      s" - $format") {
+      withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") {
+        val userSpecifiedSchema = StructType(Seq(StructField("c1", 
StringType)))
+
+        DataSource.lookupDataSourceV2(format, spark.sessionState.conf) match {
+          case Some(provider) =>
+            val dsOptions = new CaseInsensitiveStringMap(
+              Map("k1" -> "v1", "k2" -> "ds_v2").asJava)
+            val table = provider.getTable(
+              userSpecifiedSchema,
+              Array.empty,
+              dsOptions.asCaseSensitiveMap())
+            val tableOptions = new CaseInsensitiveStringMap(
+              Map("k2" -> "table_v2", "k3" -> "v3").asJava)
+            val mergedOptions = 
table.asInstanceOf[FileTable].newScanBuilder(tableOptions) match {
+              case csv: CSVScanBuilder => csv.options
+              case json: JsonScanBuilder => json.options
+              case orc: OrcScanBuilder => orc.options
+              case parquet: ParquetScanBuilder => parquet.options
+              case text: TextScanBuilder => text.options
+            }
+            assert(mergedOptions.size() == 3)
+            assert("v1".equals(mergedOptions.get("k1")))
+            assert("table_v2".equals(mergedOptions.get("k2")))
+            assert("v3".equals(mergedOptions.get("k3")))
+          case _ =>
+            throw new IllegalArgumentException(s"Failed to get table provider 
for $format")
+        }
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to