This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new aea583640f4a [SPARK-51936][SQL] ReplaceTableAsSelect should overwrite the new table instead of append
aea583640f4a is described below

commit aea583640f4aede902dc9ef3345713b72c2b2948
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Tue Apr 29 10:53:20 2025 +0800

    [SPARK-51936][SQL] ReplaceTableAsSelect should overwrite the new table instead of append

    ### What changes were proposed in this pull request?

    For file source v1, if you do

    ```
    Seq(1 -> "a").toDF().write.option("path", p).saveAsTable("t")
    Seq(2 -> "b").toDF().write.mode("overwrite").option("path", p).saveAsTable("t")
    ```

    the data of `t` ends up as `[2, "b"]`, because the v1 command
    `CreateDataSourceTableAsSelectCommand` uses `Overwrite` mode to write
    the data to the file directory.

    With DS v2, we use the v2 command `ReplaceTableAsSelect`, which uses
    `AppendData` to write to the new table. If the new table still contains
    the old data (which can happen for file source tables, since DROP TABLE
    does not delete the external location), the behavior differs from file
    source v1. This PR fixes the inconsistency by using
    `OverwriteByExpression` in the `ReplaceTableAsSelect` physical commands.

    ### Why are the changes needed?

    Fixes a potential inconsistency between file source v1 and v2. We are
    fine for now, as file source v2 tables are not supported yet, but the
    fix also helps third-party v2 sources that may retain old data in the
    new table.

    ### Does this PR introduce _any_ user-facing change?

    No, file source v2 tables are not supported yet.

    ### How was this patch tested?

    Updated an existing test.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #50739 from cloud-fan/RTAS.

    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../datasources/v2/WriteToDataSourceV2Exec.scala | 25 ++++++++++++++-----------
 .../spark/sql/connector/DataSourceV2Suite.scala  |  2 +-
 2 files changed, 15 insertions(+), 12 deletions(-)
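A minimal sketch of the plan-selection pattern the patch below adopts (the helper name `buildV2Write` and its shape are illustrative, not code from the patch): CTAS keeps appending, while RTAS now overwrites everything the new table can already see.

```
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

// Illustrative helper mirroring the patched writeToTable: CTAS appends,
// RTAS overwrites the (possibly pre-populated) new table.
def buildV2Write(
    relation: DataSourceV2Relation,
    query: LogicalPlan,
    writeOptions: Map[String, String],
    overwrite: Boolean): LogicalPlan = {
  if (overwrite) {
    // Literal.TrueLiteral as the delete condition means "remove all
    // existing rows first", i.e. a full overwrite of the new table.
    OverwriteByExpression.byPosition(relation, query, Literal.TrueLiteral, writeOptions)
  } else {
    AppendData.byPosition(relation, query, writeOptions)
  }
}
```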
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index 2d1964f6a217..4436c6b24f7c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -24,8 +24,8 @@ import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{InternalRow, ProjectingInternalRow}
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, TableSpec, UnaryNode}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Literal}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression, TableSpec, UnaryNode}
 import org.apache.spark.sql.catalyst.util.{removeInternalMetadata, CharVarcharUtils, ReplaceDataProjections, WriteDeltaProjections}
 import org.apache.spark.sql.catalyst.util.RowDeltaUtils.{DELETE_OPERATION, INSERT_OPERATION, REINSERT_OPERATION, UPDATE_OPERATION, WRITE_OPERATION, WRITE_WITH_METADATA_OPERATION}
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog, TableInfo, TableWritePrivilege}
@@ -89,7 +89,7 @@ case class CreateTableAsSelectExec(
       .build()
     val table = Option(catalog.createTable(ident, tableInfo))
       .getOrElse(catalog.loadTable(ident, Set(TableWritePrivilege.INSERT).asJava))
-    writeToTable(catalog, table, writeOptions, ident, query)
+    writeToTable(catalog, table, writeOptions, ident, query, overwrite = false)
   }
 }
 
@@ -130,7 +130,7 @@ case class AtomicCreateTableAsSelectExec(
       .build()
     val stagedTable = Option(catalog.stageCreate(ident, tableInfo)
     ).getOrElse(catalog.loadTable(ident, Set(TableWritePrivilege.INSERT).asJava))
-    writeToTable(catalog, stagedTable, writeOptions, ident, query)
+    writeToTable(catalog, stagedTable, writeOptions, ident, query, overwrite = false)
   }
 }
 
@@ -180,7 +180,7 @@ case class ReplaceTableAsSelectExec(
       .build()
     val table = Option(catalog.createTable(ident, tableInfo))
       .getOrElse(catalog.loadTable(ident, Set(TableWritePrivilege.INSERT).asJava))
-    writeToTable(catalog, table, writeOptions, ident, query)
+    writeToTable(catalog, table, writeOptions, ident, query, overwrite = true)
   }
 }
 
@@ -242,7 +242,7 @@ case class AtomicReplaceTableAsSelectExec(
     }
     val table = Option(staged).getOrElse(
       catalog.loadTable(ident, Set(TableWritePrivilege.INSERT).asJava))
-    writeToTable(catalog, table, writeOptions, ident, query)
+    writeToTable(catalog, table, writeOptions, ident, query, overwrite = true)
   }
 }
 
@@ -697,15 +697,18 @@ private[v2] trait V2CreateTableAsSelectBaseExec extends LeafV2CommandExec {
       table: Table,
       writeOptions: Map[String, String],
       ident: Identifier,
-      query: LogicalPlan): Seq[InternalRow] = {
+      query: LogicalPlan,
+      overwrite: Boolean): Seq[InternalRow] = {
     Utils.tryWithSafeFinallyAndFailureCallbacks({
       val relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
-      val append = AppendData.byPosition(relation, query, writeOptions)
-      val qe = session.sessionState.executePlan(append)
+      val writeCommand = if (overwrite) {
+        OverwriteByExpression.byPosition(relation, query, Literal.TrueLiteral, writeOptions)
+      } else {
+        AppendData.byPosition(relation, query, writeOptions)
+      }
+      val qe = session.sessionState.executePlan(writeCommand)
       qe.assertCommandExecuted()
-
       DataSourceV2Utils.commitStagedChanges(sparkContext, table, metrics)
-
       Nil
     })(catchBlock = {
       table match {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
index f5ca885b1ad6..3eeed2e41754 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
@@ -819,7 +819,7 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS
         |OPTIONS (PATH '$path')
         |AS VALUES (2, 3)
         |""".stripMargin)
-    checkAnswer(sql("SELECT * FROM test"), Seq(Row(0, 1), Row(0, 1), Row(1, 2), Row(2, 3)))
+    checkAnswer(sql("SELECT * FROM test"), Seq(Row(2, 3)))
     // Replace the table without the path options.
     sql(
       s"""
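Following the test change above, the user-visible behavior can be sketched as below (a hypothetical repro: the `testcat` catalog name and path are illustrative, assuming a v2 source whose replaced table still sees files at the external path):

```
// After this patch, REPLACE TABLE ... AS SELECT overwrites whatever data
// the newly created table can still see at its external path, so only the
// new rows remain instead of being appended to the old ones.
spark.sql("CREATE TABLE testcat.t OPTIONS (PATH '/tmp/t') AS VALUES (0, 1), (1, 2)")
spark.sql("CREATE OR REPLACE TABLE testcat.t OPTIONS (PATH '/tmp/t') AS VALUES (2, 3)")
spark.sql("SELECT * FROM testcat.t").show()  // expected: a single row [2, 3]
```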