This is an automated email from the ASF dual-hosted git repository.
gengliangwang pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.2 by this push:
new a985468ed74d [SPARK-56551][SQL][FOLLOW-UP] Fix setting
`numDeletedRows` metric as -1
a985468ed74d is described below
commit a985468ed74d4090d3af19300fd3dd83441cdf33
Author: Ziya Mukhtarov <[email protected]>
AuthorDate: Mon May 11 10:55:25 2026 -0700
[SPARK-56551][SQL][FOLLOW-UP] Fix setting `numDeletedRows` metric as -1
### What changes were proposed in this pull request?
We were previously calling `SQLMetric.set(-1)` when we couldn't compute the
value of the `numDeletedRows` metric. However, this call was a no-op, so the
write summary reported the metric as 0 instead. This PR fixes the write
summary to report -1 as intended.
### Why are the changes needed?
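To fix the bug above: a reported value of 0 is indistinguishable from a DELETE
that removed no rows, whereas -1 marks the metric as not found.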
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added a new test.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.7
Closes #55576 from ZiyaZa/fix-negative-numdeletedrows.
Authored-by: Ziya Mukhtarov <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
(cherry picked from commit 759036d6088b205f12f4f6073ce741896af42b10)
Signed-off-by: Gengliang Wang <[email protected]>
---
.../datasources/v2/WriteToDataSourceV2Exec.scala | 91 ++++++++++++----------
.../sql/connector/DeleteFromTableSuiteBase.scala | 23 ++++++
2 files changed, 73 insertions(+), 41 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index ccfcdc1855f0..3cbfed40d876 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -365,24 +365,24 @@ case class ReplaceDataExec(
copy(query = newChild)
}
- override protected def getWriteSummary(query: SparkPlan):
Option[WriteSummary] = {
- if (rowLevelCommand == DELETE) {
- // DELETE ReplaceData plans filter out the deleted rows early in the
plan, and they don't
- // reach this node. We need to calculate this value as numScannedRows -
numCopiedRows.
- val numScannedRows = collectFirst(query) {
- case b: BatchScanExec if b.table.isInstanceOf[RowLevelOperationTable]
=>
- getMetricValue(b.metrics, "numOutputRows")
- }
- val numCopiedRows = getMetricValue(metrics, "numCopiedRows")
- val numDeletedRows = if (numScannedRows.exists(_ >= 0) && numCopiedRows
>= 0) {
- numScannedRows.get - numCopiedRows
- } else {
- // One of the metrics couldn't be found, also mark numDeletedRows as
not found.
- -1L
- }
- metrics("numDeletedRows").set(numDeletedRows)
+ override protected def getDeleteSummary(): Option[DeleteSummaryImpl] = {
+ // DELETE ReplaceData plans filter out the deleted rows early in the plan,
and they don't
+ // reach this node. We need to calculate this value as numScannedRows -
numCopiedRows.
+ val numScannedRows = collectFirst(query) {
+ case b: BatchScanExec if b.table.isInstanceOf[RowLevelOperationTable] =>
+ getMetricValue(b.metrics, "numOutputRows")
}
- super.getWriteSummary(query)
+ val numCopiedRows = getMetricValue(sparkMetrics, "numCopiedRows")
+ val numDeletedRows = if (numScannedRows.exists(_ >= 0) && numCopiedRows >=
0) {
+ numScannedRows.get - numCopiedRows
+ } else {
+ // One of the metrics couldn't be found, also mark numDeletedRows as not
found.
+ -1L
+ }
+
+ // SQLMetric.set is a no-op if value is -1, leaving the metric in its
invalid state.
+ sparkMetrics("numDeletedRows").set(numDeletedRows)
+ super.getDeleteSummary().map(_.copy(numDeletedRows = numDeletedRows))
}
}
@@ -496,31 +496,40 @@ trait RowLevelWriteExec extends V2ExistingTableWriteExec {
metrics.get(name).map(_.value).getOrElse(-1L)
}
- override protected def getWriteSummary(query: SparkPlan):
Option[WriteSummary] = {
+ override protected def getWriteSummary(): Option[WriteSummary] = {
rowLevelCommand match {
- case MERGE =>
- collectFirst(query) { case m: MergeRowsExec => m }.map { n =>
- val metrics = n.metrics
- MergeSummaryImpl(
- getMetricValue(metrics, "numTargetRowsCopied"),
- getMetricValue(metrics, "numTargetRowsDeleted"),
- getMetricValue(metrics, "numTargetRowsUpdated"),
- getMetricValue(metrics, "numTargetRowsInserted"),
- getMetricValue(metrics, "numTargetRowsMatchedUpdated"),
- getMetricValue(metrics, "numTargetRowsMatchedDeleted"),
- getMetricValue(metrics, "numTargetRowsNotMatchedBySourceUpdated"),
- getMetricValue(metrics, "numTargetRowsNotMatchedBySourceDeleted"))
- }
- case UPDATE =>
- Some(UpdateSummaryImpl(
- getMetricValue(sparkMetrics, "numUpdatedRows"),
- getMetricValue(sparkMetrics, "numCopiedRows")))
- case DELETE =>
- Some(DeleteSummaryImpl(
- getMetricValue(sparkMetrics, "numDeletedRows"),
- getMetricValue(sparkMetrics, "numCopiedRows")))
+ case MERGE => getMergeSummary()
+ case UPDATE => getUpdateSummary()
+ case DELETE => getDeleteSummary()
}
}
+
+ protected def getMergeSummary(): Option[MergeSummaryImpl] = {
+ collectFirst(query) { case m: MergeRowsExec => m }.map { n =>
+ val metrics = n.metrics
+ MergeSummaryImpl(
+ getMetricValue(metrics, "numTargetRowsCopied"),
+ getMetricValue(metrics, "numTargetRowsDeleted"),
+ getMetricValue(metrics, "numTargetRowsUpdated"),
+ getMetricValue(metrics, "numTargetRowsInserted"),
+ getMetricValue(metrics, "numTargetRowsMatchedUpdated"),
+ getMetricValue(metrics, "numTargetRowsMatchedDeleted"),
+ getMetricValue(metrics, "numTargetRowsNotMatchedBySourceUpdated"),
+ getMetricValue(metrics, "numTargetRowsNotMatchedBySourceDeleted"))
+ }
+ }
+
+ protected def getUpdateSummary(): Option[UpdateSummaryImpl] = {
+ Some(UpdateSummaryImpl(
+ getMetricValue(sparkMetrics, "numUpdatedRows"),
+ getMetricValue(sparkMetrics, "numCopiedRows")))
+ }
+
+ protected def getDeleteSummary(): Option[DeleteSummaryImpl] = {
+ Some(DeleteSummaryImpl(
+ getMetricValue(sparkMetrics, "numDeletedRows"),
+ getMetricValue(sparkMetrics, "numCopiedRows")))
+ }
}
/**
@@ -582,7 +591,7 @@ trait V2TableWriteExec
}
)
- val writeSummary = getWriteSummary(query)
+ val writeSummary = getWriteSummary()
logInfo(log"Data source write support ${MDC(LogKeys.BATCH_WRITE,
batchWrite)} is committing.")
writeSummary match {
case Some(summary) => batchWrite.commit(messages, summary)
@@ -610,7 +619,7 @@ trait V2TableWriteExec
Nil
}
- protected def getWriteSummary(query: SparkPlan): Option[WriteSummary] = None
+ protected def getWriteSummary(): Option[WriteSummary] = None
}
trait WritingSparkTask[W <: DataWriter[InternalRow]] extends Logging with
Serializable {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala
index f8d81ee08691..89e3ce503fed 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala
@@ -959,6 +959,29 @@ abstract class DeleteFromTableSuiteBase extends
RowLevelOperationSuiteBase {
Row(2, 200, "software")))
}
+ test("delete with NOT IN over empty subquery") {
+ withTempView("empty_subq") {
+ createAndInitTable("pk INT NOT NULL, id INT NOT NULL, dep STRING",
+ """{ "pk": 1, "id": 1, "dep": "hr" }
+ |{ "pk": 2, "id": 2, "dep": "hr" }
+ |{ "pk": 3, "id": 3, "dep": "hr" }
+ |""".stripMargin)
+
+ Seq.empty[Int].toDF("v").createOrReplaceTempView("empty_subq")
+
+ sql(
+ s"""DELETE FROM $tableNameAsString
+ |WHERE id NOT IN (SELECT v FROM empty_subq)
+ |""".stripMargin)
+
+ checkAnswer(sql(s"SELECT * FROM $tableNameAsString"), Nil)
+ // The filter gets replaced by an EmptyRelation in the ReplaceData
executed plan, which hides
+ // the executed BatchScan and prevents computing numDeletedRows using
numOutputRows of the
+ // scan node.
+ checkDeleteMetrics(numDeletedRows = if (deltaDelete) 3 else -1,
numCopiedRows = 0)
+ }
+ }
+
private def executeDeleteWithFilters(query: String): Unit = {
val executedPlan = executeAndKeepPlan {
sql(query)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]