This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 5dc65a5416b5 [SPARK-50287][SQL] Merge options of table and relation
when creating WriteBuilder in FileTable
5dc65a5416b5 is described below
commit 5dc65a5416b51b9a41d36881cb919a9d10834511
Author: Cheng Pan <[email protected]>
AuthorDate: Mon Nov 25 21:54:32 2024 +0800
[SPARK-50287][SQL] Merge options of table and relation when creating
WriteBuilder in FileTable
### What changes were proposed in this pull request?
Merge `options` of table and relation when creating WriteBuilder in
FileTable.
### Why are the changes needed?
Similar to SPARK-49519 which fixes the read path.
### Does this PR introduce _any_ user-facing change?
FileTable's options are now taken into account on the V2 write path, but given that the
built-in file formats use V1 by default, it has no real effect.
### How was this patch tested?
UT is updated to cover the case.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #48821 from pan3793/SPARK-50287.
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../org/apache/spark/sql/v2/avro/AvroTable.scala | 6 +++--
.../sql/execution/datasources/v2/FileTable.scala | 14 +++++++++++
.../sql/execution/datasources/v2/FileWrite.scala | 2 +-
.../execution/datasources/v2/csv/CSVTable.scala | 6 +++--
.../execution/datasources/v2/json/JsonTable.scala | 6 +++--
.../execution/datasources/v2/orc/OrcTable.scala | 6 +++--
.../datasources/v2/parquet/ParquetTable.scala | 6 +++--
.../execution/datasources/v2/text/TextTable.scala | 6 +++--
.../execution/datasources/v2/FileTableSuite.scala | 27 ++++++++++++++--------
9 files changed, 57 insertions(+), 22 deletions(-)
diff --git
a/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroTable.scala
b/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroTable.scala
index 8ec711b2757f..e898253be116 100644
--- a/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroTable.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroTable.scala
@@ -42,10 +42,12 @@ case class AvroTable(
override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
AvroUtils.inferSchema(sparkSession, options.asScala.toMap, files)
- override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
+ override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
new WriteBuilder {
- override def build(): Write = AvroWrite(paths, formatName,
supportsDataType, info)
+ override def build(): Write =
+ AvroWrite(paths, formatName, supportsDataType, mergedWriteInfo(info))
}
+ }
override def supportsDataType(dataType: DataType): Boolean =
AvroUtils.supportsDataType(dataType)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
index 4eee731e0b2d..863104da80c2 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite,
Table, TableCapability}
import org.apache.spark.sql.connector.catalog.TableCapability._
import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.connector.write.{LogicalWriteInfo,
LogicalWriteInfoImpl}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.streaming.{FileStreamSink,
MetadataLogFileIndex}
@@ -159,6 +160,19 @@ abstract class FileTable(
options.asCaseSensitiveMap().asScala
new CaseInsensitiveStringMap(finalOptions.asJava)
}
+
+ /**
+ * Merge the options of FileTable and the LogicalWriteInfo while respecting
the
+ * keys of the options carried by LogicalWriteInfo.
+ */
+ protected def mergedWriteInfo(writeInfo: LogicalWriteInfo): LogicalWriteInfo
= {
+ LogicalWriteInfoImpl(
+ writeInfo.queryId(),
+ writeInfo.schema(),
+ mergedOptions(writeInfo.options()),
+ writeInfo.rowIdSchema(),
+ writeInfo.metadataSchema())
+ }
}
object FileTable {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala
index f4cabcb69d08..77e1ade44780 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala
@@ -49,7 +49,7 @@ trait FileWrite extends Write {
private val schema = info.schema()
private val queryId = info.queryId()
- private val options = info.options()
+ val options = info.options()
override def description(): String = formatName
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala
index 4c201ca66cf6..df8df37b711f 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala
@@ -49,10 +49,12 @@ case class CSVTable(
CSVDataSource(parsedOptions).inferSchema(sparkSession, files,
parsedOptions)
}
- override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
+ override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
new WriteBuilder {
- override def build(): Write = CSVWrite(paths, formatName,
supportsDataType, info)
+ override def build(): Write =
+ CSVWrite(paths, formatName, supportsDataType, mergedWriteInfo(info))
}
+ }
override def supportsDataType(dataType: DataType): Boolean = dataType match {
case _: AtomicType => true
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala
index 54244c4d95e7..1c1d3393b95a 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala
@@ -49,10 +49,12 @@ case class JsonTable(
sparkSession, files, parsedOptions)
}
- override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
+ override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
new WriteBuilder {
- override def build(): Write = JsonWrite(paths, formatName,
supportsDataType, info)
+ override def build(): Write =
+ JsonWrite(paths, formatName, supportsDataType, mergedWriteInfo(info))
}
+ }
override def supportsDataType(dataType: DataType): Boolean = dataType match {
case _: AtomicType => true
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala
index 1037370967c8..81c347ae9c59 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala
@@ -43,10 +43,12 @@ case class OrcTable(
override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
OrcUtils.inferSchema(sparkSession, files, options.asScala.toMap)
- override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
+ override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
new WriteBuilder {
- override def build(): Write = OrcWrite(paths, formatName,
supportsDataType, info)
+ override def build(): Write =
+ OrcWrite(paths, formatName, supportsDataType, mergedWriteInfo(info))
}
+ }
override def supportsDataType(dataType: DataType): Boolean = dataType match {
case _: AtomicType => true
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala
index 8463a05569c0..28c5a62f91ec 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala
@@ -43,10 +43,12 @@ case class ParquetTable(
override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
ParquetUtils.inferSchema(sparkSession, options.asScala.toMap, files)
- override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
+ override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
new WriteBuilder {
- override def build(): Write = ParquetWrite(paths, formatName,
supportsDataType, info)
+ override def build(): Write =
+ ParquetWrite(paths, formatName, supportsDataType,
mergedWriteInfo(info))
}
+ }
override def supportsDataType(dataType: DataType): Boolean = dataType match {
case _: AtomicType => true
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextTable.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextTable.scala
index 87ae34532f88..d8880b84c621 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextTable.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextTable.scala
@@ -39,10 +39,12 @@ case class TextTable(
override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
Some(StructType(Array(StructField("value", StringType))))
- override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
+ override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
new WriteBuilder {
- override def build(): Write = TextWrite(paths, formatName,
supportsDataType, info)
+ override def build(): Write =
+ TextWrite(paths, formatName, supportsDataType, mergedWriteInfo(info))
}
+ }
override def supportsDataType(dataType: DataType): Boolean = dataType ==
StringType
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala
index 0316f09e42ce..0d18e3bf809e 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala
@@ -22,7 +22,7 @@ import org.apache.hadoop.fs.FileStatus
import org.apache.spark.sql.{QueryTest, SparkSession}
import org.apache.spark.sql.connector.read.ScanBuilder
-import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
+import org.apache.spark.sql.connector.write.{LogicalWriteInfo,
LogicalWriteInfoImpl, WriteBuilder}
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.text.TextFileFormat
@@ -96,8 +96,8 @@ class FileTableSuite extends QueryTest with
SharedSparkSession {
}
allFileBasedDataSources.foreach { format =>
- test(s"SPARK-49519: Merge options of table and relation when constructing
FileScanBuilder" +
- s" - $format") {
+ test("SPARK-49519, SPARK-50287: Merge options of table and relation when "
+
+ s"constructing ScanBuilder and WriteBuilder in FileFormat - $format") {
withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") {
val userSpecifiedSchema = StructType(Seq(StructField("c1",
StringType)))
@@ -108,20 +108,29 @@ class FileTableSuite extends QueryTest with
SharedSparkSession {
val table = provider.getTable(
userSpecifiedSchema,
Array.empty,
- dsOptions.asCaseSensitiveMap())
+ dsOptions.asCaseSensitiveMap()).asInstanceOf[FileTable]
val tableOptions = new CaseInsensitiveStringMap(
Map("k2" -> "table_v2", "k3" -> "v3").asJava)
- val mergedOptions =
table.asInstanceOf[FileTable].newScanBuilder(tableOptions) match {
+
+ val mergedReadOptions = table.newScanBuilder(tableOptions) match {
case csv: CSVScanBuilder => csv.options
case json: JsonScanBuilder => json.options
case orc: OrcScanBuilder => orc.options
case parquet: ParquetScanBuilder => parquet.options
case text: TextScanBuilder => text.options
}
- assert(mergedOptions.size() == 3)
- assert("v1".equals(mergedOptions.get("k1")))
- assert("table_v2".equals(mergedOptions.get("k2")))
- assert("v3".equals(mergedOptions.get("k3")))
+ assert(mergedReadOptions.size === 3)
+ assert(mergedReadOptions.get("k1") === "v1")
+ assert(mergedReadOptions.get("k2") === "table_v2")
+ assert(mergedReadOptions.get("k3") === "v3")
+
+ val writeInfo = LogicalWriteInfoImpl("query-id",
userSpecifiedSchema, tableOptions)
+ val mergedWriteOptions = table.newWriteBuilder(writeInfo).build()
+ .asInstanceOf[FileWrite].options
+ assert(mergedWriteOptions.size === 3)
+ assert(mergedWriteOptions.get("k1") === "v1")
+ assert(mergedWriteOptions.get("k2") === "table_v2")
+ assert(mergedWriteOptions.get("k3") === "v3")
case _ =>
throw new IllegalArgumentException(s"Failed to get table provider
for $format")
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]