This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5dc65a5416b5 [SPARK-50287][SQL] Merge options of table and relation 
when creating WriteBuilder in FileTable
5dc65a5416b5 is described below

commit 5dc65a5416b51b9a41d36881cb919a9d10834511
Author: Cheng Pan <[email protected]>
AuthorDate: Mon Nov 25 21:54:32 2024 +0800

    [SPARK-50287][SQL] Merge options of table and relation when creating 
WriteBuilder in FileTable
    
    ### What changes were proposed in this pull request?
    
    Merge `options` of table and relation when creating WriteBuilder in 
FileTable.
    
    ### Why are the changes needed?
    
    Similar to SPARK-49519, which fixes the read path.
    
    ### Does this PR introduce _any_ user-facing change?
    
    FileTable's options are now taken into account on the V2 write path, but given that the 
built-in file formats use V1 by default, it has no real effect.
    
    ### How was this patch tested?
    
    UT is updated to cover the case.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #48821 from pan3793/SPARK-50287.
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../org/apache/spark/sql/v2/avro/AvroTable.scala   |  6 +++--
 .../sql/execution/datasources/v2/FileTable.scala   | 14 +++++++++++
 .../sql/execution/datasources/v2/FileWrite.scala   |  2 +-
 .../execution/datasources/v2/csv/CSVTable.scala    |  6 +++--
 .../execution/datasources/v2/json/JsonTable.scala  |  6 +++--
 .../execution/datasources/v2/orc/OrcTable.scala    |  6 +++--
 .../datasources/v2/parquet/ParquetTable.scala      |  6 +++--
 .../execution/datasources/v2/text/TextTable.scala  |  6 +++--
 .../execution/datasources/v2/FileTableSuite.scala  | 27 ++++++++++++++--------
 9 files changed, 57 insertions(+), 22 deletions(-)

diff --git 
a/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroTable.scala 
b/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroTable.scala
index 8ec711b2757f..e898253be116 100644
--- a/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroTable.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroTable.scala
@@ -42,10 +42,12 @@ case class AvroTable(
   override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
     AvroUtils.inferSchema(sparkSession, options.asScala.toMap, files)
 
-  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
+  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
     new WriteBuilder {
-      override def build(): Write = AvroWrite(paths, formatName, 
supportsDataType, info)
+      override def build(): Write =
+        AvroWrite(paths, formatName, supportsDataType, mergedWriteInfo(info))
     }
+  }
 
   override def supportsDataType(dataType: DataType): Boolean = 
AvroUtils.supportsDataType(dataType)
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
index 4eee731e0b2d..863104da80c2 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, 
Table, TableCapability}
 import org.apache.spark.sql.connector.catalog.TableCapability._
 import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.connector.write.{LogicalWriteInfo, 
LogicalWriteInfoImpl}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.streaming.{FileStreamSink, 
MetadataLogFileIndex}
@@ -159,6 +160,19 @@ abstract class FileTable(
       options.asCaseSensitiveMap().asScala
     new CaseInsensitiveStringMap(finalOptions.asJava)
   }
+
+  /**
+   * Merge the options of FileTable and the LogicalWriteInfo while respecting 
the
+   * keys of the options carried by LogicalWriteInfo.
+   */
+  protected def mergedWriteInfo(writeInfo: LogicalWriteInfo): LogicalWriteInfo 
= {
+    LogicalWriteInfoImpl(
+      writeInfo.queryId(),
+      writeInfo.schema(),
+      mergedOptions(writeInfo.options()),
+      writeInfo.rowIdSchema(),
+      writeInfo.metadataSchema())
+  }
 }
 
 object FileTable {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala
index f4cabcb69d08..77e1ade44780 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala
@@ -49,7 +49,7 @@ trait FileWrite extends Write {
 
   private val schema = info.schema()
   private val queryId = info.queryId()
-  private val options = info.options()
+  val options = info.options()
 
   override def description(): String = formatName
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala
index 4c201ca66cf6..df8df37b711f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala
@@ -49,10 +49,12 @@ case class CSVTable(
     CSVDataSource(parsedOptions).inferSchema(sparkSession, files, 
parsedOptions)
   }
 
-  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
+  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
     new WriteBuilder {
-      override def build(): Write = CSVWrite(paths, formatName, 
supportsDataType, info)
+      override def build(): Write =
+        CSVWrite(paths, formatName, supportsDataType, mergedWriteInfo(info))
     }
+  }
 
   override def supportsDataType(dataType: DataType): Boolean = dataType match {
     case _: AtomicType => true
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala
index 54244c4d95e7..1c1d3393b95a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala
@@ -49,10 +49,12 @@ case class JsonTable(
       sparkSession, files, parsedOptions)
   }
 
-  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
+  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
     new WriteBuilder {
-      override def build(): Write = JsonWrite(paths, formatName, 
supportsDataType, info)
+      override def build(): Write =
+        JsonWrite(paths, formatName, supportsDataType, mergedWriteInfo(info))
     }
+  }
 
   override def supportsDataType(dataType: DataType): Boolean = dataType match {
     case _: AtomicType => true
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala
index 1037370967c8..81c347ae9c59 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala
@@ -43,10 +43,12 @@ case class OrcTable(
   override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
     OrcUtils.inferSchema(sparkSession, files, options.asScala.toMap)
 
-  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
+  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
     new WriteBuilder {
-      override def build(): Write = OrcWrite(paths, formatName, 
supportsDataType, info)
+      override def build(): Write =
+        OrcWrite(paths, formatName, supportsDataType, mergedWriteInfo(info))
     }
+  }
 
   override def supportsDataType(dataType: DataType): Boolean = dataType match {
     case _: AtomicType => true
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala
index 8463a05569c0..28c5a62f91ec 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala
@@ -43,10 +43,12 @@ case class ParquetTable(
   override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
     ParquetUtils.inferSchema(sparkSession, options.asScala.toMap, files)
 
-  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
+  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
     new WriteBuilder {
-      override def build(): Write = ParquetWrite(paths, formatName, 
supportsDataType, info)
+      override def build(): Write =
+        ParquetWrite(paths, formatName, supportsDataType, 
mergedWriteInfo(info))
     }
+  }
 
   override def supportsDataType(dataType: DataType): Boolean = dataType match {
     case _: AtomicType => true
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextTable.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextTable.scala
index 87ae34532f88..d8880b84c621 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextTable.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextTable.scala
@@ -39,10 +39,12 @@ case class TextTable(
   override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
     Some(StructType(Array(StructField("value", StringType))))
 
-  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
+  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
     new WriteBuilder {
-      override def build(): Write = TextWrite(paths, formatName, 
supportsDataType, info)
+      override def build(): Write =
+        TextWrite(paths, formatName, supportsDataType, mergedWriteInfo(info))
     }
+  }
 
   override def supportsDataType(dataType: DataType): Boolean = dataType == 
StringType
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala
index 0316f09e42ce..0d18e3bf809e 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala
@@ -22,7 +22,7 @@ import org.apache.hadoop.fs.FileStatus
 
 import org.apache.spark.sql.{QueryTest, SparkSession}
 import org.apache.spark.sql.connector.read.ScanBuilder
-import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
+import org.apache.spark.sql.connector.write.{LogicalWriteInfo, 
LogicalWriteInfoImpl, WriteBuilder}
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.datasources.FileFormat
 import org.apache.spark.sql.execution.datasources.text.TextFileFormat
@@ -96,8 +96,8 @@ class FileTableSuite extends QueryTest with 
SharedSparkSession {
   }
 
   allFileBasedDataSources.foreach { format =>
-    test(s"SPARK-49519: Merge options of table and relation when constructing 
FileScanBuilder" +
-      s" - $format") {
+    test("SPARK-49519, SPARK-50287: Merge options of table and relation when " 
+
+      s"constructing ScanBuilder and WriteBuilder in FileFormat - $format") {
       withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") {
         val userSpecifiedSchema = StructType(Seq(StructField("c1", 
StringType)))
 
@@ -108,20 +108,29 @@ class FileTableSuite extends QueryTest with 
SharedSparkSession {
             val table = provider.getTable(
               userSpecifiedSchema,
               Array.empty,
-              dsOptions.asCaseSensitiveMap())
+              dsOptions.asCaseSensitiveMap()).asInstanceOf[FileTable]
             val tableOptions = new CaseInsensitiveStringMap(
               Map("k2" -> "table_v2", "k3" -> "v3").asJava)
-            val mergedOptions = 
table.asInstanceOf[FileTable].newScanBuilder(tableOptions) match {
+
+            val mergedReadOptions = table.newScanBuilder(tableOptions) match {
               case csv: CSVScanBuilder => csv.options
               case json: JsonScanBuilder => json.options
               case orc: OrcScanBuilder => orc.options
               case parquet: ParquetScanBuilder => parquet.options
               case text: TextScanBuilder => text.options
             }
-            assert(mergedOptions.size() == 3)
-            assert("v1".equals(mergedOptions.get("k1")))
-            assert("table_v2".equals(mergedOptions.get("k2")))
-            assert("v3".equals(mergedOptions.get("k3")))
+            assert(mergedReadOptions.size === 3)
+            assert(mergedReadOptions.get("k1") === "v1")
+            assert(mergedReadOptions.get("k2") === "table_v2")
+            assert(mergedReadOptions.get("k3") === "v3")
+
+            val writeInfo = LogicalWriteInfoImpl("query-id", 
userSpecifiedSchema, tableOptions)
+            val mergedWriteOptions = table.newWriteBuilder(writeInfo).build()
+              .asInstanceOf[FileWrite].options
+            assert(mergedWriteOptions.size === 3)
+            assert(mergedWriteOptions.get("k1") === "v1")
+            assert(mergedWriteOptions.get("k2") === "table_v2")
+            assert(mergedWriteOptions.get("k3") === "v3")
           case _ =>
             throw new IllegalArgumentException(s"Failed to get table provider 
for $format")
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to