This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new aea583640f4a [SPARK-51936][SQL] ReplaceTableAsSelect should overwrite the new table instead of append
aea583640f4a is described below

commit aea583640f4aede902dc9ef3345713b72c2b2948
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Tue Apr 29 10:53:20 2025 +0800

    [SPARK-51936][SQL] ReplaceTableAsSelect should overwrite the new table instead of append
    
    ### What changes were proposed in this pull request?
    
    For file source v1, if you do
    ```
    Seq(1 -> "a").toDF().write.option("path", p).saveAsTable("t")
    Seq(2 -> "b").toDF().write.mode("overwrite").option("path", 
p).saveAsTable("t")
    ```
    At the end, the data of `t` is `[2, "b"]`, because the v1 command `CreateDataSourceTableAsSelectCommand` uses `Overwrite` mode to write the data to the file directory.
    
    With DS v2, we use the v2 command `ReplaceTableAsSelect`, which uses `AppendData` to write to the new table. If the new table still keeps the old data (which can happen for file source tables, since DROP TABLE won't delete the external location), the behavior differs from file source v1.
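    As a concrete illustration, here is a hedged sketch based on the updated test in the diff below (`testcat` and the exact CTAS form are illustrative stand-ins, not names from this patch):
    ```
    sql(s"CREATE TABLE testcat.t OPTIONS (PATH '$path') AS VALUES (0, 1)")
    sql(s"REPLACE TABLE testcat.t OPTIONS (PATH '$path') AS VALUES (2, 3)")
    // before this fix: SELECT * FROM testcat.t could also return the stale row (0, 1)
    // after this fix:  it returns only (2, 3), matching file source v1 semantics
    ```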
    
    This PR fixes this inconsistency by using `OverwriteByExpression` in `ReplaceTableAsSelect` physical commands.
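    Concretely, the shared `writeToTable` helper gains an `overwrite` flag and picks the logical write plan accordingly (excerpted from the diff below):
    ```
    val writeCommand = if (overwrite) {
      OverwriteByExpression.byPosition(relation, query, Literal.TrueLiteral, writeOptions)
    } else {
      AppendData.byPosition(relation, query, writeOptions)
    }
    ```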
    
    ### Why are the changes needed?
    
    Fixes a potential inconsistency between file source v1 and v2. For now we are fine, as file source v2 tables are not supported yet.
    This is also helpful for third-party v2 sources that may retain old data in the new table.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, file source v2 tables are not supported yet.
    
    ### How was this patch tested?
    
    Updated an existing test.

    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #50739 from cloud-fan/RTAS.
    
    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../datasources/v2/WriteToDataSourceV2Exec.scala   | 25 ++++++++++++----------
 .../spark/sql/connector/DataSourceV2Suite.scala    |  2 +-
 2 files changed, 15 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index 2d1964f6a217..4436c6b24f7c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -24,8 +24,8 @@ import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{InternalRow, ProjectingInternalRow}
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, TableSpec, UnaryNode}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Literal}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression, TableSpec, UnaryNode}
 import org.apache.spark.sql.catalyst.util.{removeInternalMetadata, CharVarcharUtils, ReplaceDataProjections, WriteDeltaProjections}
 import org.apache.spark.sql.catalyst.util.RowDeltaUtils.{DELETE_OPERATION, INSERT_OPERATION, REINSERT_OPERATION, UPDATE_OPERATION, WRITE_OPERATION, WRITE_WITH_METADATA_OPERATION}
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog, TableInfo, TableWritePrivilege}
@@ -89,7 +89,7 @@ case class CreateTableAsSelectExec(
       .build()
     val table = Option(catalog.createTable(ident, tableInfo))
      .getOrElse(catalog.loadTable(ident, Set(TableWritePrivilege.INSERT).asJava))
-    writeToTable(catalog, table, writeOptions, ident, query)
+    writeToTable(catalog, table, writeOptions, ident, query, overwrite = false)
   }
 }
 
@@ -130,7 +130,7 @@ case class AtomicCreateTableAsSelectExec(
       .build()
     val stagedTable = Option(catalog.stageCreate(ident, tableInfo)
    ).getOrElse(catalog.loadTable(ident, Set(TableWritePrivilege.INSERT).asJava))
-    writeToTable(catalog, stagedTable, writeOptions, ident, query)
+    writeToTable(catalog, stagedTable, writeOptions, ident, query, overwrite = false)
   }
 }
 
@@ -180,7 +180,7 @@ case class ReplaceTableAsSelectExec(
       .build()
     val table = Option(catalog.createTable(ident, tableInfo))
      .getOrElse(catalog.loadTable(ident, Set(TableWritePrivilege.INSERT).asJava))
-    writeToTable(catalog, table, writeOptions, ident, query)
+    writeToTable(catalog, table, writeOptions, ident, query, overwrite = true)
   }
 }
 
@@ -242,7 +242,7 @@ case class AtomicReplaceTableAsSelectExec(
     }
     val table = Option(staged).getOrElse(
       catalog.loadTable(ident, Set(TableWritePrivilege.INSERT).asJava))
-    writeToTable(catalog, table, writeOptions, ident, query)
+    writeToTable(catalog, table, writeOptions, ident, query, overwrite = true)
   }
 }
 
@@ -697,15 +697,18 @@ private[v2] trait V2CreateTableAsSelectBaseExec extends LeafV2CommandExec {
       table: Table,
       writeOptions: Map[String, String],
       ident: Identifier,
-      query: LogicalPlan): Seq[InternalRow] = {
+      query: LogicalPlan,
+      overwrite: Boolean): Seq[InternalRow] = {
     Utils.tryWithSafeFinallyAndFailureCallbacks({
      val relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
-      val append = AppendData.byPosition(relation, query, writeOptions)
-      val qe = session.sessionState.executePlan(append)
+      val writeCommand = if (overwrite) {
+        OverwriteByExpression.byPosition(relation, query, Literal.TrueLiteral, writeOptions)
+      } else {
+        AppendData.byPosition(relation, query, writeOptions)
+      }
+      val qe = session.sessionState.executePlan(writeCommand)
       qe.assertCommandExecuted()
-
       DataSourceV2Utils.commitStagedChanges(sparkContext, table, metrics)
-
       Nil
     })(catchBlock = {
       table match {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
index f5ca885b1ad6..3eeed2e41754 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
@@ -819,7 +819,7 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS
              |OPTIONS (PATH '$path')
              |AS VALUES (2, 3)
              |""".stripMargin)
-        checkAnswer(sql("SELECT * FROM test"), Seq(Row(0, 1), Row(0, 1), Row(1, 2), Row(2, 3)))
+        checkAnswer(sql("SELECT * FROM test"), Seq(Row(2, 3)))
         // Replace the table without the path options.
         sql(
           s"""

