This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 27f363e62cbe Revert "[SPARK-54310][SQL] Add `numSourceRows` metric for
`MergeIntoExec`"
27f363e62cbe is described below
commit 27f363e62cbe183672ffaac5fc66bfc05408f0c2
Author: Amanda Liu <[email protected]>
AuthorDate: Wed Dec 3 20:41:26 2025 -0800
Revert "[SPARK-54310][SQL] Add `numSourceRows` metric for `MergeIntoExec`"
### What changes were proposed in this pull request?
Clean revert of d65234b8578ce9a01ed656367646e2bea8f0f4fb.
A follow-up will handle source-side child nodes that lack a
`numOutputRows` metric, and the new implementation will be re-targeted to a
later Spark release.
### Why are the changes needed?
The current implementation may report the wrong `numOutputRows` metric if
there is an intermediate node (such as a custom Spark operator) that does not
support the metric.
This is because we take the first node on the source side of the join that has
`numOutputRows`. If a Spark extension node lacks this metric but transforms
the source table, the search skips past it, possibly all the way down to the
source table scan, and reports a row count that does not reflect the rows
actually fed into the join.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing CI, as this is a revert
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #53293 from asl3/numsourcerowsrevert.
Authored-by: Amanda Liu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit ee41857903ee2bd1675d65414b0aceeba6e6cd9b)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/connector/write/MergeSummary.java | 5 -
.../sql/connector/write/MergeSummaryImpl.scala | 1 -
.../datasources/v2/WriteToDataSourceV2Exec.scala | 39 +-
.../sql/connector/MergeIntoTableSuiteBase.scala | 528 +++++++++------------
4 files changed, 227 insertions(+), 346 deletions(-)
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/MergeSummary.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/MergeSummary.java
index 37917bd7649d..e5ae57a76708 100644
---
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/MergeSummary.java
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/MergeSummary.java
@@ -27,11 +27,6 @@ import org.apache.spark.annotation.Evolving;
@Evolving
public interface MergeSummary extends WriteSummary {
- /**
- * Returns the number of source rows.
- */
- long numSourceRows();
-
/**
* Returns the number of target rows copied unmodified because they did not
match any action,
* or -1 if not found.
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/MergeSummaryImpl.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/MergeSummaryImpl.scala
index f07f47061ee8..911749072c43 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/MergeSummaryImpl.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/MergeSummaryImpl.scala
@@ -21,7 +21,6 @@ package org.apache.spark.sql.connector.write
* Implementation of [[MergeSummary]] that provides MERGE operation summary.
*/
private[sql] case class MergeSummaryImpl(
- numSourceRows: Long,
numTargetRowsCopied: Long,
numTargetRowsDeleted: Long,
numTargetRowsUpdated: Long,
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index 75915d97ba4b..3e4a2f792a1c 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -31,11 +31,10 @@ import
org.apache.spark.sql.catalyst.util.RowDeltaUtils.{DELETE_OPERATION, INSER
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column,
Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog, TableInfo,
TableWritePrivilege}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.metric.CustomMetric
-import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter,
DataWriterFactory, DeltaWrite, DeltaWriter, MergeSummaryImpl,
PhysicalWriteInfoImpl, RowLevelOperationTable, Write, WriterCommitMessage,
WriteSummary}
+import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter,
DataWriterFactory, DeltaWrite, DeltaWriter, MergeSummaryImpl,
PhysicalWriteInfoImpl, Write, WriterCommitMessage, WriteSummary}
import org.apache.spark.sql.errors.{QueryCompilationErrors,
QueryExecutionErrors}
import org.apache.spark.sql.execution.{QueryExecution, SparkPlan,
SQLExecution, UnaryExecNode}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
-import org.apache.spark.sql.execution.joins.BaseJoinExec
import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetric,
SQLMetrics}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.SchemaValidationMode.PROHIBIT_CHANGES
@@ -493,9 +492,7 @@ trait V2TableWriteExec extends V2CommandExec with
UnaryExecNode with AdaptiveSpa
private def getWriteSummary(query: SparkPlan): Option[WriteSummary] = {
collectFirst(query) { case m: MergeRowsExec => m }.map { n =>
val metrics = n.metrics
- val numSourceRows = getNumSourceRows(n)
MergeSummaryImpl(
- numSourceRows,
metrics.get("numTargetRowsCopied").map(_.value).getOrElse(-1L),
metrics.get("numTargetRowsDeleted").map(_.value).getOrElse(-1L),
metrics.get("numTargetRowsUpdated").map(_.value).getOrElse(-1L),
@@ -507,40 +504,6 @@ trait V2TableWriteExec extends V2CommandExec with
UnaryExecNode with AdaptiveSpa
)
}
}
-
- private def getNumSourceRows(mergeRowsExec: MergeRowsExec): Long = {
- def hasTargetTable(plan: SparkPlan): Boolean = {
- collectFirst(plan) {
- case scan @ BatchScanExec(_, _, _, _, _: RowLevelOperationTable, _) =>
scan
- }.isDefined
- }
-
- def findSourceScan(join: BaseJoinExec): Option[SparkPlan] = {
- val leftHasTarget = hasTargetTable(join.left)
- val rightHasTarget = hasTargetTable(join.right)
-
- val sourceSide = if (leftHasTarget) {
- Some(join.right)
- } else if (rightHasTarget) {
- Some(join.left)
- } else {
- None
- }
-
- sourceSide.flatMap { side =>
- collectFirst(side) {
- case source if source.metrics.contains("numOutputRows") =>
- source
- }
- }
- }
-
- (for {
- join <- collectFirst(mergeRowsExec.child) { case j: BaseJoinExec => j }
- sourceScan <- findSourceScan(join)
- metric <- sourceScan.metrics.get("numOutputRows")
- } yield metric.value).getOrElse(-1L)
- }
}
trait WritingSparkTask[W <: DataWriter[InternalRow]] extends Logging with
Serializable {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
index 7539506e8bfe..a36aeab1295a 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
@@ -1779,179 +1779,159 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
}
test("Merge metrics with matched clause") {
- Seq("true", "false").foreach { aqeEnabled: String =>
- withTempView("source") {
- withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled) {
- createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
- """{ "pk": 1, "salary": 100, "dep": "hr" }
- |{ "pk": 2, "salary": 200, "dep": "software" }
- |{ "pk": 3, "salary": 300, "dep": "hr" }
- |""".stripMargin)
-
- val sourceDF = Seq(1, 2, 10).toDF("pk")
- sourceDF.createOrReplaceTempView("source")
+ withTempView("source") {
+ createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+ """{ "pk": 1, "salary": 100, "dep": "hr" }
+ |{ "pk": 2, "salary": 200, "dep": "software" }
+ |{ "pk": 3, "salary": 300, "dep": "hr" }
+ |""".stripMargin)
- val mergeExec = findMergeExec {
- s"""MERGE INTO $tableNameAsString t
- |USING source s
- |ON t.pk = s.pk
- |WHEN MATCHED AND salary < 200 THEN
- | UPDATE SET salary = 1000
- |""".stripMargin
- }
+ val sourceDF = Seq(1, 2, 10).toDF("pk")
+ sourceDF.createOrReplaceTempView("source")
- assertMetric(mergeExec, "numTargetRowsCopied", if (deltaMerge) 0
else 2)
- assertMetric(mergeExec, "numTargetRowsInserted", 0)
- assertMetric(mergeExec, "numTargetRowsUpdated", 1)
- assertMetric(mergeExec, "numTargetRowsDeleted", 0)
- assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 1)
- assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 0)
- assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 0)
- assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 0)
+ val mergeExec = findMergeExec {
+ s"""MERGE INTO $tableNameAsString t
+ |USING source s
+ |ON t.pk = s.pk
+ |WHEN MATCHED AND salary < 200 THEN
+ | UPDATE SET salary = 1000
+ |""".stripMargin
+ }
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(1, 1000, "hr"), // updated
- Row(2, 200, "software"),
- Row(3, 300, "hr")))
- }
+ assertMetric(mergeExec, "numTargetRowsCopied", if (deltaMerge) 0 else 2)
+ assertMetric(mergeExec, "numTargetRowsInserted", 0)
+ assertMetric(mergeExec, "numTargetRowsUpdated", 1)
+ assertMetric(mergeExec, "numTargetRowsDeleted", 0)
+ assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 1)
+ assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 0)
+ assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 0)
+ assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 0)
- val mergeSummary = getMergeSummary()
- assert(mergeSummary.numSourceRows === 3L)
- assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else
2L))
- assert(mergeSummary.numTargetRowsInserted === 0L)
- assert(mergeSummary.numTargetRowsUpdated === 1L)
- assert(mergeSummary.numTargetRowsDeleted === 0L)
- assert(mergeSummary.numTargetRowsMatchedUpdated === 1L)
- assert(mergeSummary.numTargetRowsMatchedDeleted === 0L)
- assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L)
- assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L)
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, 1000, "hr"), // updated
+ Row(2, 200, "software"),
+ Row(3, 300, "hr")))
- sql(s"DROP TABLE $tableNameAsString")
- }
+ val mergeSummary = getMergeSummary()
+ assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 2L))
+ assert(mergeSummary.numTargetRowsInserted === 0L)
+ assert(mergeSummary.numTargetRowsUpdated === 1L)
+ assert(mergeSummary.numTargetRowsDeleted === 0L)
+ assert(mergeSummary.numTargetRowsMatchedUpdated === 1L)
+ assert(mergeSummary.numTargetRowsMatchedDeleted === 0L)
+ assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L)
+ assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L)
}
}
test("Merge metrics with matched and not matched clause") {
- Seq("true", "false").foreach { aqeEnabled: String =>
- withTempView("source") {
- createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
- """{ "pk": 1, "salary": 100, "dep": "hr" }
- |{ "pk": 2, "salary": 200, "dep": "software" }
- |{ "pk": 3, "salary": 300, "dep": "hr" }
- |""".stripMargin)
+ withTempView("source") {
+ createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+ """{ "pk": 1, "salary": 100, "dep": "hr" }
+ |{ "pk": 2, "salary": 200, "dep": "software" }
+ |{ "pk": 3, "salary": 300, "dep": "hr" }
+ |""".stripMargin)
- val sourceDF = Seq(
- (4, 100, "marketing"),
- (5, 400, "executive"),
- (6, 100, "hr")
- ).toDF("pk", "salary", "dep")
- sourceDF.createOrReplaceTempView("source")
+ val sourceDF = Seq(
+ (4, 100, "marketing"),
+ (5, 400, "executive"),
+ (6, 100, "hr")
+ ).toDF("pk", "salary", "dep")
+ sourceDF.createOrReplaceTempView("source")
- val mergeExec = findMergeExec {
- s"""MERGE INTO $tableNameAsString t
- |USING source s
- |ON t.pk = s.pk
- |WHEN MATCHED THEN
- | UPDATE SET salary = 9999
- |WHEN NOT MATCHED AND salary > 200 THEN
- | INSERT *
- |""".stripMargin
- }
+ val mergeExec = findMergeExec {
+ s"""MERGE INTO $tableNameAsString t
+ |USING source s
+ |ON t.pk = s.pk
+ |WHEN MATCHED THEN
+ | UPDATE SET salary = 9999
+ |WHEN NOT MATCHED AND salary > 200 THEN
+ | INSERT *
+ |""".stripMargin
+ }
- assertMetric(mergeExec, "numTargetRowsCopied", 0)
- assertMetric(mergeExec, "numTargetRowsInserted", 1)
- assertMetric(mergeExec, "numTargetRowsUpdated", 0)
- assertMetric(mergeExec, "numTargetRowsDeleted", 0)
- assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 0)
- assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 0)
- assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 0)
- assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 0)
+ assertMetric(mergeExec, "numTargetRowsCopied", 0)
+ assertMetric(mergeExec, "numTargetRowsInserted", 1)
+ assertMetric(mergeExec, "numTargetRowsUpdated", 0)
+ assertMetric(mergeExec, "numTargetRowsDeleted", 0)
+ assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 0)
+ assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 0)
+ assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 0)
+ assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 0)
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(1, 100, "hr"),
- Row(2, 200, "software"),
- Row(3, 300, "hr"),
- Row(5, 400, "executive"))) // inserted
-
- val mergeSummary = getMergeSummary()
- // TODO SPARK-52578: Handle this case when optimizer removes Join due
to no matching pks
- assert(mergeSummary.numSourceRows === (if (deltaMerge) 3L else -1L))
- assert(mergeSummary.numTargetRowsCopied === 0L)
- assert(mergeSummary.numTargetRowsInserted === 1L)
- assert(mergeSummary.numTargetRowsUpdated === 0L)
- assert(mergeSummary.numTargetRowsDeleted === 0L)
- assert(mergeSummary.numTargetRowsMatchedUpdated === 0L)
- assert(mergeSummary.numTargetRowsMatchedDeleted === 0L)
- assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L)
- assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L)
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, 100, "hr"),
+ Row(2, 200, "software"),
+ Row(3, 300, "hr"),
+ Row(5, 400, "executive"))) // inserted
- sql(s"DROP TABLE $tableNameAsString")
- }
+ val mergeSummary = getMergeSummary()
+ assert(mergeSummary.numTargetRowsCopied === 0L)
+ assert(mergeSummary.numTargetRowsInserted === 1L)
+ assert(mergeSummary.numTargetRowsUpdated === 0L)
+ assert(mergeSummary.numTargetRowsDeleted === 0L)
+ assert(mergeSummary.numTargetRowsMatchedUpdated === 0L)
+ assert(mergeSummary.numTargetRowsMatchedDeleted === 0L)
+ assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L)
+ assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L)
}
}
test("Merge metrics with matched and not matched by source clauses: update")
{
- Seq("true", "false").foreach { aqeEnabled: String =>
- withTempView("source") {
- withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled) {
- createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
- """{ "pk": 1, "salary": 100, "dep": "hr" }
- |{ "pk": 2, "salary": 200, "dep": "software" }
- |{ "pk": 3, "salary": 300, "dep": "hr" }
- |{ "pk": 4, "salary": 400, "dep": "marketing" }
- |{ "pk": 5, "salary": 500, "dep": "executive" }
- |""".stripMargin)
-
- val sourceDF = Seq(1, 2, 10).toDF("pk")
- sourceDF.createOrReplaceTempView("source")
+ withTempView("source") {
+ createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+ """{ "pk": 1, "salary": 100, "dep": "hr" }
+ |{ "pk": 2, "salary": 200, "dep": "software" }
+ |{ "pk": 3, "salary": 300, "dep": "hr" }
+ |{ "pk": 4, "salary": 400, "dep": "marketing" }
+ |{ "pk": 5, "salary": 500, "dep": "executive" }
+ |""".stripMargin)
- val mergeExec = findMergeExec {
- s"""MERGE INTO $tableNameAsString t
- |USING source s
- |ON t.pk = s.pk
- |WHEN MATCHED AND salary < 200 THEN
- | UPDATE SET salary = 1000
- |WHEN NOT MATCHED BY SOURCE AND salary > 400 THEN
- | UPDATE SET salary = -1
- |""".stripMargin
- }
+ val sourceDF = Seq(1, 2, 10).toDF("pk")
+ sourceDF.createOrReplaceTempView("source")
- assertMetric(mergeExec, "numTargetRowsCopied", if (deltaMerge) 0
else 3)
- assertMetric(mergeExec, "numTargetRowsInserted", 0)
- assertMetric(mergeExec, "numTargetRowsUpdated", 2)
- assertMetric(mergeExec, "numTargetRowsDeleted", 0)
- assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 1)
- assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 0)
- assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 1)
- assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 0)
+ val mergeExec = findMergeExec {
+ s"""MERGE INTO $tableNameAsString t
+ |USING source s
+ |ON t.pk = s.pk
+ |WHEN MATCHED AND salary < 200 THEN
+ | UPDATE SET salary = 1000
+ |WHEN NOT MATCHED BY SOURCE AND salary > 400 THEN
+ | UPDATE SET salary = -1
+ |""".stripMargin
+ }
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(1, 1000, "hr"), // updated
- Row(2, 200, "software"),
- Row(3, 300, "hr"),
- Row(4, 400, "marketing"),
- Row(5, -1, "executive"))) // updated
- }
+ assertMetric(mergeExec, "numTargetRowsCopied", if (deltaMerge) 0 else 3)
+ assertMetric(mergeExec, "numTargetRowsInserted", 0)
+ assertMetric(mergeExec, "numTargetRowsUpdated", 2)
+ assertMetric(mergeExec, "numTargetRowsDeleted", 0)
+ assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 1)
+ assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 0)
+ assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 1)
+ assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 0)
- val mergeSummary = getMergeSummary()
- assert(mergeSummary.numSourceRows === 3L)
- assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else
3L))
- assert(mergeSummary.numTargetRowsInserted === 0L)
- assert(mergeSummary.numTargetRowsUpdated === 2L)
- assert(mergeSummary.numTargetRowsDeleted === 0L)
- assert(mergeSummary.numTargetRowsMatchedUpdated === 1L)
- assert(mergeSummary.numTargetRowsMatchedDeleted === 0L)
- assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 1L)
- assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L)
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, 1000, "hr"), // updated
+ Row(2, 200, "software"),
+ Row(3, 300, "hr"),
+ Row(4, 400, "marketing"),
+ Row(5, -1, "executive"))) // updated
- sql(s"DROP TABLE $tableNameAsString")
- }
+ val mergeSummary = getMergeSummary()
+ assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 3L))
+ assert(mergeSummary.numTargetRowsInserted === 0L)
+ assert(mergeSummary.numTargetRowsUpdated === 2L)
+ assert(mergeSummary.numTargetRowsDeleted === 0L)
+ assert(mergeSummary.numTargetRowsMatchedUpdated === 1L)
+ assert(mergeSummary.numTargetRowsMatchedDeleted === 0L)
+ assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 1L)
+ assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L)
}
}
@@ -2000,7 +1980,6 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
)
val mergeSummary = getMergeSummary()
- assert(mergeSummary.numSourceRows === 3L)
assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 3L))
assert(mergeSummary.numTargetRowsInserted === 0L)
assert(mergeSummary.numTargetRowsUpdated === 0L)
@@ -2013,130 +1992,116 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
}
test("Merge metrics with matched, not matched, and not matched by source
clauses: update") {
- Seq("true", "false").foreach { aqeEnabled: String =>
- withTempView("source") {
- withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled) {
- createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
- """{ "pk": 1, "salary": 100, "dep": "hr" }
- |{ "pk": 2, "salary": 200, "dep": "software" }
- |{ "pk": 3, "salary": 300, "dep": "hr" }
- |{ "pk": 4, "salary": 400, "dep": "marketing" }
- |{ "pk": 5, "salary": 500, "dep": "executive" }
- |""".stripMargin)
-
- val sourceDF = Seq(1, 2, 6, 10).toDF("pk")
- sourceDF.createOrReplaceTempView("source")
+ withTempView("source") {
+ createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+ """{ "pk": 1, "salary": 100, "dep": "hr" }
+ |{ "pk": 2, "salary": 200, "dep": "software" }
+ |{ "pk": 3, "salary": 300, "dep": "hr" }
+ |{ "pk": 4, "salary": 400, "dep": "marketing" }
+ |{ "pk": 5, "salary": 500, "dep": "executive" }
+ |""".stripMargin)
- val mergeExec = findMergeExec {
- s"""MERGE INTO $tableNameAsString t
- |USING source s
- |ON t.pk = s.pk
- |WHEN MATCHED AND salary < 200 THEN
- | UPDATE SET salary = 1000
- |WHEN NOT MATCHED AND s.pk < 10 THEN
- | INSERT (pk, salary, dep) VALUES (s.pk, -1, "dummy")
- |WHEN NOT MATCHED BY SOURCE AND salary > 400 THEN
- | UPDATE SET salary = -1
- |""".stripMargin
- }
+ val sourceDF = Seq(1, 2, 6, 10).toDF("pk")
+ sourceDF.createOrReplaceTempView("source")
- assertMetric(mergeExec, "numTargetRowsCopied", if (deltaMerge) 0
else 3)
- assertMetric(mergeExec, "numTargetRowsInserted", 1)
- assertMetric(mergeExec, "numTargetRowsUpdated", 2)
- assertMetric(mergeExec, "numTargetRowsDeleted", 0)
- assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 1)
- assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 0)
- assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 1)
- assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 0)
+ val mergeExec = findMergeExec {
+ s"""MERGE INTO $tableNameAsString t
+ |USING source s
+ |ON t.pk = s.pk
+ |WHEN MATCHED AND salary < 200 THEN
+ | UPDATE SET salary = 1000
+ |WHEN NOT MATCHED AND s.pk < 10 THEN
+ | INSERT (pk, salary, dep) VALUES (s.pk, -1, "dummy")
+ |WHEN NOT MATCHED BY SOURCE AND salary > 400 THEN
+ | UPDATE SET salary = -1
+ |""".stripMargin
+ }
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(1, 1000, "hr"), // updated
- Row(2, 200, "software"),
- Row(3, 300, "hr"),
- Row(4, 400, "marketing"),
- Row(5, -1, "executive"), // updated
- Row(6, -1, "dummy"))) // inserted
- }
+ assertMetric(mergeExec, "numTargetRowsCopied", if (deltaMerge) 0 else 3)
+ assertMetric(mergeExec, "numTargetRowsInserted", 1)
+ assertMetric(mergeExec, "numTargetRowsUpdated", 2)
+ assertMetric(mergeExec, "numTargetRowsDeleted", 0)
+ assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 1)
+ assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 0)
+ assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 1)
+ assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 0)
- val mergeSummary = getMergeSummary()
- assert(mergeSummary.numSourceRows === 4L)
- assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else
3L))
- assert(mergeSummary.numTargetRowsInserted === 1L)
- assert(mergeSummary.numTargetRowsUpdated === 2L)
- assert(mergeSummary.numTargetRowsDeleted === 0L)
- assert(mergeSummary.numTargetRowsMatchedUpdated === 1L)
- assert(mergeSummary.numTargetRowsMatchedDeleted === 0L)
- assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 1L)
- assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L)
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, 1000, "hr"), // updated
+ Row(2, 200, "software"),
+ Row(3, 300, "hr"),
+ Row(4, 400, "marketing"),
+ Row(5, -1, "executive"), // updated
+ Row(6, -1, "dummy"))) // inserted
- sql(s"DROP TABLE $tableNameAsString")
- }
+ val mergeSummary = getMergeSummary()
+ assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 3L))
+ assert(mergeSummary.numTargetRowsInserted === 1L)
+ assert(mergeSummary.numTargetRowsUpdated === 2L)
+ assert(mergeSummary.numTargetRowsDeleted === 0L)
+ assert(mergeSummary.numTargetRowsMatchedUpdated === 1L)
+ assert(mergeSummary.numTargetRowsMatchedDeleted === 0L)
+ assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 1L)
+ assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L)
}
}
test("Merge metrics with matched, not matched, and not matched by source
clauses: delete") {
- Seq("true", "false").foreach { aqeEnabled: String =>
- withTempView("source") {
- withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled) {
- createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
- """{ "pk": 1, "salary": 100, "dep": "hr" }
- |{ "pk": 2, "salary": 200, "dep": "software" }
- |{ "pk": 3, "salary": 300, "dep": "hr" }
- |{ "pk": 4, "salary": 400, "dep": "marketing" }
- |{ "pk": 5, "salary": 500, "dep": "executive" }
- |""".stripMargin)
-
- val sourceDF = Seq(1, 2, 6, 10).toDF("pk")
- sourceDF.createOrReplaceTempView("source")
+ withTempView("source") {
+ createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+ """{ "pk": 1, "salary": 100, "dep": "hr" }
+ |{ "pk": 2, "salary": 200, "dep": "software" }
+ |{ "pk": 3, "salary": 300, "dep": "hr" }
+ |{ "pk": 4, "salary": 400, "dep": "marketing" }
+ |{ "pk": 5, "salary": 500, "dep": "executive" }
+ |""".stripMargin)
- val mergeExec = findMergeExec {
- s"""MERGE INTO $tableNameAsString t
- |USING source s
- |ON t.pk = s.pk
- |WHEN MATCHED AND salary < 200 THEN
- | DELETE
- |WHEN NOT MATCHED AND s.pk < 10 THEN
- | INSERT (pk, salary, dep) VALUES (s.pk, -1, "dummy")
- |WHEN NOT MATCHED BY SOURCE AND salary > 400 THEN
- | DELETE
- |""".stripMargin
- }
+ val sourceDF = Seq(1, 2, 6, 10).toDF("pk")
+ sourceDF.createOrReplaceTempView("source")
- assertMetric(mergeExec, "numTargetRowsCopied", if (deltaMerge) 0
else 3)
- assertMetric(mergeExec, "numTargetRowsInserted", 1)
- assertMetric(mergeExec, "numTargetRowsUpdated", 0)
- assertMetric(mergeExec, "numTargetRowsDeleted", 2)
- assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 0)
- assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 1)
- assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 0)
- assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 1)
+ val mergeExec = findMergeExec {
+ s"""MERGE INTO $tableNameAsString t
+ |USING source s
+ |ON t.pk = s.pk
+ |WHEN MATCHED AND salary < 200 THEN
+ | DELETE
+ |WHEN NOT MATCHED AND s.pk < 10 THEN
+ | INSERT (pk, salary, dep) VALUES (s.pk, -1, "dummy")
+ |WHEN NOT MATCHED BY SOURCE AND salary > 400 THEN
+ | DELETE
+ |""".stripMargin
+ }
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- // Row(1, 100, "hr") deleted
- Row(2, 200, "software"),
- Row(3, 300, "hr"),
- Row(4, 400, "marketing"),
- // Row(5, 500, "executive") deleted
- Row(6, -1, "dummy"))) // inserted
- }
+ assertMetric(mergeExec, "numTargetRowsCopied", if (deltaMerge) 0 else 3)
+ assertMetric(mergeExec, "numTargetRowsInserted", 1)
+ assertMetric(mergeExec, "numTargetRowsUpdated", 0)
+ assertMetric(mergeExec, "numTargetRowsDeleted", 2)
+ assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 0)
+ assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 1)
+ assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 0)
+ assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 1)
- val mergeSummary = getMergeSummary()
- assert(mergeSummary.numSourceRows === 4L)
- assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else
3L))
- assert(mergeSummary.numTargetRowsInserted === 1L)
- assert(mergeSummary.numTargetRowsUpdated === 0L)
- assert(mergeSummary.numTargetRowsDeleted === 2L)
- assert(mergeSummary.numTargetRowsMatchedUpdated === 0L)
- assert(mergeSummary.numTargetRowsMatchedDeleted === 1L)
- assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L)
- assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 1L)
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ // Row(1, 100, "hr") deleted
+ Row(2, 200, "software"),
+ Row(3, 300, "hr"),
+ Row(4, 400, "marketing"),
+ // Row(5, 500, "executive") deleted
+ Row(6, -1, "dummy"))) // inserted
- sql(s"DROP TABLE $tableNameAsString")
- }
+ val mergeSummary = getMergeSummary()
+ assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 3L))
+ assert(mergeSummary.numTargetRowsInserted === 1L)
+ assert(mergeSummary.numTargetRowsUpdated === 0L)
+ assert(mergeSummary.numTargetRowsDeleted === 2L)
+ assert(mergeSummary.numTargetRowsMatchedUpdated === 0L)
+ assert(mergeSummary.numTargetRowsMatchedDeleted === 1L)
+ assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L)
+ assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 1L)
}
}
@@ -2169,7 +2134,6 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
)
val mergeMetrics = getMergeSummary()
- assert(mergeMetrics.numSourceRows === 4L)
assert(mergeMetrics.numTargetRowsCopied === (if (deltaMerge) 0L else
3L))
assert(mergeMetrics.numTargetRowsInserted === 1L)
assert(mergeMetrics.numTargetRowsUpdated === 0L)
@@ -2185,46 +2149,6 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
}
}
- test("Merge metrics with numSourceRows for empty source") {
- Seq("true", "false").foreach { aqeEnabled: String =>
- withTempView("source") {
- withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled) {
- createAndInitTable(
- "pk INT NOT NULL, salary INT, dep STRING",
- """{ "pk": 1, "salary": 100, "dep": "hr" }
- |{ "pk": 2, "salary": 200, "dep": "software" }
- |{ "pk": 3, "salary": 300, "dep": "hr" }
- |""".stripMargin)
-
- // source is empty
- Seq.empty[Int].toDF("pk").createOrReplaceTempView("source")
-
- sql(s"""MERGE INTO $tableNameAsString t
- |USING source s
- |ON t.pk = s.pk
- |WHEN MATCHED THEN
- | UPDATE SET salary = 1000
- |WHEN NOT MATCHED BY SOURCE THEN
- | DELETE
- |""".stripMargin)
-
- val mergeSummary = getMergeSummary()
- assert(mergeSummary.numSourceRows === -1L) // if no numOutputRows,
should be -1
- assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else
0L))
- assert(mergeSummary.numTargetRowsInserted === 0L)
- assert(mergeSummary.numTargetRowsUpdated === 0L)
- assert(mergeSummary.numTargetRowsDeleted === 3L)
- assert(mergeSummary.numTargetRowsMatchedUpdated === 0L)
- assert(mergeSummary.numTargetRowsMatchedDeleted === 0L)
- assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L)
- assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 3L)
-
- sql(s"DROP TABLE $tableNameAsString")
- }
- }
- }
- }
-
test("Merge schema evolution new column with set explicit column") {
Seq((true, true), (false, true), (true, false)).foreach {
case (withSchemaEvolution, schemaEvolutionEnabled) =>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]