This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new cc6d6f17bdee [SPARK-49519][SQL] Merge options of table and relation
when constructing FileScanBuilder
cc6d6f17bdee is described below
commit cc6d6f17bdee8e856c333a177356e626bb8d9a73
Author: joey.ljy <[email protected]>
AuthorDate: Wed Sep 11 07:42:02 2024 -0700
[SPARK-49519][SQL] Merge options of table and relation when constructing
FileScanBuilder
### What changes were proposed in this pull request?
Merge the options of both `DataSourceV2Relation` and `FileTable` when
constructing `FileScanBuilder`.
### Why are the changes needed?
Currently, the subclasses of `FileTable` only use the options from the relation
when constructing the `FileScanBuilder`, which leads to the omission of the
contents in `FileTable.options`. For the `TableCatalog`, the `dsOptions` can be
set into the `FileTable.options` returned by the `TableCatalog.loadTable`
method. If only the relation options are used here, the `TableCatalog` will not
be able to pass `dsOptions` that contains table options to `FileScan`.
Merging the two sets of options is a better approach.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #47996 from liujiayi771/csv-options.
Lead-authored-by: joey.ljy <[email protected]>
Co-authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../org/apache/spark/sql/v2/avro/AvroTable.scala | 2 +-
.../sql/execution/datasources/v2/FileTable.scala | 13 +++++++
.../execution/datasources/v2/csv/CSVTable.scala | 2 +-
.../execution/datasources/v2/json/JsonTable.scala | 2 +-
.../execution/datasources/v2/orc/OrcTable.scala | 2 +-
.../datasources/v2/parquet/ParquetTable.scala | 2 +-
.../execution/datasources/v2/text/TextTable.scala | 2 +-
.../execution/datasources/v2/FileTableSuite.scala | 43 ++++++++++++++++++++++
8 files changed, 62 insertions(+), 6 deletions(-)
diff --git
a/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroTable.scala
b/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroTable.scala
index fe61fe3db878..8ec711b2757f 100644
--- a/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroTable.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroTable.scala
@@ -37,7 +37,7 @@ case class AvroTable(
fallbackFileFormat: Class[_ <: FileFormat])
extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
override def newScanBuilder(options: CaseInsensitiveStringMap):
AvroScanBuilder =
- new AvroScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
+ AvroScanBuilder(sparkSession, fileIndex, schema, dataSchema,
mergedOptions(options))
override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
AvroUtils.inferSchema(sparkSession, options.asScala.toMap, files)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
index f56f9436d943..4eee731e0b2d 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
@@ -146,6 +146,19 @@ abstract class FileTable(
val entry = options.get(DataSource.GLOB_PATHS_KEY)
Option(entry).map(_ == "true").getOrElse(true)
}
+
+ /**
+ * Merge the options of FileTable and the table operation while respecting
the
+ * keys of the table operation.
+ *
+ * @param options The options of the table operation.
+ * @return
+ */
+ protected def mergedOptions(options: CaseInsensitiveStringMap):
CaseInsensitiveStringMap = {
+ val finalOptions = this.options.asCaseSensitiveMap().asScala ++
+ options.asCaseSensitiveMap().asScala
+ new CaseInsensitiveStringMap(finalOptions.asJava)
+ }
}
object FileTable {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala
index 8b4fd3af6ded..4c201ca66cf6 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala
@@ -38,7 +38,7 @@ case class CSVTable(
fallbackFileFormat: Class[_ <: FileFormat])
extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
override def newScanBuilder(options: CaseInsensitiveStringMap):
CSVScanBuilder =
- CSVScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
+ CSVScanBuilder(sparkSession, fileIndex, schema, dataSchema,
mergedOptions(options))
override def inferSchema(files: Seq[FileStatus]): Option[StructType] = {
val parsedOptions = new CSVOptions(
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala
index c567e87e7d76..54244c4d95e7 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala
@@ -38,7 +38,7 @@ case class JsonTable(
fallbackFileFormat: Class[_ <: FileFormat])
extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
override def newScanBuilder(options: CaseInsensitiveStringMap):
JsonScanBuilder =
- new JsonScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
+ JsonScanBuilder(sparkSession, fileIndex, schema, dataSchema,
mergedOptions(options))
override def inferSchema(files: Seq[FileStatus]): Option[StructType] = {
val parsedOptions = new JSONOptionsInRead(
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala
index ca4b83b3c58f..1037370967c8 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala
@@ -38,7 +38,7 @@ case class OrcTable(
extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
override def newScanBuilder(options: CaseInsensitiveStringMap):
OrcScanBuilder =
- new OrcScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
+ OrcScanBuilder(sparkSession, fileIndex, schema, dataSchema,
mergedOptions(options))
override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
OrcUtils.inferSchema(sparkSession, files, options.asScala.toMap)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala
index e593ad7d0c0c..8463a05569c0 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala
@@ -38,7 +38,7 @@ case class ParquetTable(
extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
override def newScanBuilder(options: CaseInsensitiveStringMap):
ParquetScanBuilder =
- new ParquetScanBuilder(sparkSession, fileIndex, schema, dataSchema,
options)
+ ParquetScanBuilder(sparkSession, fileIndex, schema, dataSchema,
mergedOptions(options))
override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
ParquetUtils.inferSchema(sparkSession, options.asScala.toMap, files)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextTable.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextTable.scala
index 046bdcb69846..87ae34532f88 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextTable.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextTable.scala
@@ -34,7 +34,7 @@ case class TextTable(
fallbackFileFormat: Class[_ <: FileFormat])
extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
override def newScanBuilder(options: CaseInsensitiveStringMap):
TextScanBuilder =
- TextScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
+ TextScanBuilder(sparkSession, fileIndex, schema, dataSchema,
mergedOptions(options))
override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
Some(StructType(Array(StructField("value", StringType))))
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala
index 4160516deece..0316f09e42ce 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala
@@ -23,8 +23,15 @@ import org.apache.hadoop.fs.FileStatus
import org.apache.spark.sql.{QueryTest, SparkSession}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
+import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.text.TextFileFormat
+import org.apache.spark.sql.execution.datasources.v2.csv.CSVScanBuilder
+import org.apache.spark.sql.execution.datasources.v2.json.JsonScanBuilder
+import org.apache.spark.sql.execution.datasources.v2.orc.OrcScanBuilder
+import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScanBuilder
+import org.apache.spark.sql.execution.datasources.v2.text.TextScanBuilder
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -53,6 +60,8 @@ class DummyFileTable(
class FileTableSuite extends QueryTest with SharedSparkSession {
+ private val allFileBasedDataSources = Seq("orc", "parquet", "csv", "json",
"text")
+
test("Data type validation should check data schema only") {
withTempPath { dir =>
val df = spark.createDataFrame(Seq(("a", 1), ("b", 2))).toDF("v", "p")
@@ -85,4 +94,38 @@ class FileTableSuite extends QueryTest with
SharedSparkSession {
assert(table.dataSchema == expectedDataSchema)
}
}
+
+ allFileBasedDataSources.foreach { format =>
+ test(s"SPARK-49519: Merge options of table and relation when constructing
FileScanBuilder" +
+ s" - $format") {
+ withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") {
+ val userSpecifiedSchema = StructType(Seq(StructField("c1",
StringType)))
+
+ DataSource.lookupDataSourceV2(format, spark.sessionState.conf) match {
+ case Some(provider) =>
+ val dsOptions = new CaseInsensitiveStringMap(
+ Map("k1" -> "v1", "k2" -> "ds_v2").asJava)
+ val table = provider.getTable(
+ userSpecifiedSchema,
+ Array.empty,
+ dsOptions.asCaseSensitiveMap())
+ val tableOptions = new CaseInsensitiveStringMap(
+ Map("k2" -> "table_v2", "k3" -> "v3").asJava)
+ val mergedOptions =
table.asInstanceOf[FileTable].newScanBuilder(tableOptions) match {
+ case csv: CSVScanBuilder => csv.options
+ case json: JsonScanBuilder => json.options
+ case orc: OrcScanBuilder => orc.options
+ case parquet: ParquetScanBuilder => parquet.options
+ case text: TextScanBuilder => text.options
+ }
+ assert(mergedOptions.size() == 3)
+ assert("v1".equals(mergedOptions.get("k1")))
+ assert("table_v2".equals(mergedOptions.get("k2")))
+ assert("v3".equals(mergedOptions.get("k3")))
+ case _ =>
+ throw new IllegalArgumentException(s"Failed to get table provider
for $format")
+ }
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]