This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 27f363e62cbe Revert "[SPARK-54310][SQL] Add `numSourceRows` metric for 
`MergeIntoExec`"
27f363e62cbe is described below

commit 27f363e62cbe183672ffaac5fc66bfc05408f0c2
Author: Amanda Liu <[email protected]>
AuthorDate: Wed Dec 3 20:41:26 2025 -0800

    Revert "[SPARK-54310][SQL] Add `numSourceRows` metric for `MergeIntoExec`"
    
    ### What changes were proposed in this pull request?
    
    Clean revert of d65234b8578ce9a01ed656367646e2bea8f0f4fb.
    
    A follow-up will handle cases where sourceSide child nodes lack the 
`numOutputRows` metric, and the new implementation will be re-targeted to a 
later Spark release.
    
    ### Why are the changes needed?
    
    The current implementation may pick up the wrong `numOutputRows` metric if 
there is an intermediary node (such as a custom Spark operator) that does not 
support the metric.
    
    This is because we target the first sourceSide child node that has 
`numOutputRows`. If a SparkExtension node transforms the source table but does 
not contain this metric, the search skips it and can continue all the way down 
to the source table, picking up the incorrect metric.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing CI, as this is a revert
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #53293 from asl3/numsourcerowsrevert.
    
    Authored-by: Amanda Liu <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit ee41857903ee2bd1675d65414b0aceeba6e6cd9b)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../spark/sql/connector/write/MergeSummary.java    |   5 -
 .../sql/connector/write/MergeSummaryImpl.scala     |   1 -
 .../datasources/v2/WriteToDataSourceV2Exec.scala   |  39 +-
 .../sql/connector/MergeIntoTableSuiteBase.scala    | 528 +++++++++------------
 4 files changed, 227 insertions(+), 346 deletions(-)

diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/MergeSummary.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/MergeSummary.java
index 37917bd7649d..e5ae57a76708 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/MergeSummary.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/MergeSummary.java
@@ -27,11 +27,6 @@ import org.apache.spark.annotation.Evolving;
 @Evolving
 public interface MergeSummary extends WriteSummary {
 
-  /**
-   * Returns the number of source rows.
-   */
-  long numSourceRows();
-
   /**
    * Returns the number of target rows copied unmodified because they did not 
match any action,
    * or -1 if not found.
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/MergeSummaryImpl.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/MergeSummaryImpl.scala
index f07f47061ee8..911749072c43 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/MergeSummaryImpl.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/MergeSummaryImpl.scala
@@ -21,7 +21,6 @@ package org.apache.spark.sql.connector.write
  * Implementation of [[MergeSummary]] that provides MERGE operation summary.
  */
 private[sql] case class MergeSummaryImpl(
-    numSourceRows: Long,
     numTargetRowsCopied: Long,
     numTargetRowsDeleted: Long,
     numTargetRowsUpdated: Long,
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index 75915d97ba4b..3e4a2f792a1c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -31,11 +31,10 @@ import 
org.apache.spark.sql.catalyst.util.RowDeltaUtils.{DELETE_OPERATION, INSER
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, 
Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog, TableInfo, 
TableWritePrivilege}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.metric.CustomMetric
-import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, 
DataWriterFactory, DeltaWrite, DeltaWriter, MergeSummaryImpl, 
PhysicalWriteInfoImpl, RowLevelOperationTable, Write, WriterCommitMessage, 
WriteSummary}
+import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, 
DataWriterFactory, DeltaWrite, DeltaWriter, MergeSummaryImpl, 
PhysicalWriteInfoImpl, Write, WriterCommitMessage, WriteSummary}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
 import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, 
SQLExecution, UnaryExecNode}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
-import org.apache.spark.sql.execution.joins.BaseJoinExec
 import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetric, 
SQLMetrics}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.SchemaValidationMode.PROHIBIT_CHANGES
@@ -493,9 +492,7 @@ trait V2TableWriteExec extends V2CommandExec with 
UnaryExecNode with AdaptiveSpa
   private def getWriteSummary(query: SparkPlan): Option[WriteSummary] = {
     collectFirst(query) { case m: MergeRowsExec => m }.map { n =>
       val metrics = n.metrics
-      val numSourceRows = getNumSourceRows(n)
       MergeSummaryImpl(
-        numSourceRows,
         metrics.get("numTargetRowsCopied").map(_.value).getOrElse(-1L),
         metrics.get("numTargetRowsDeleted").map(_.value).getOrElse(-1L),
         metrics.get("numTargetRowsUpdated").map(_.value).getOrElse(-1L),
@@ -507,40 +504,6 @@ trait V2TableWriteExec extends V2CommandExec with 
UnaryExecNode with AdaptiveSpa
       )
     }
   }
-
-  private def getNumSourceRows(mergeRowsExec: MergeRowsExec): Long = {
-    def hasTargetTable(plan: SparkPlan): Boolean = {
-      collectFirst(plan) {
-        case scan @ BatchScanExec(_, _, _, _, _: RowLevelOperationTable, _) => 
scan
-      }.isDefined
-    }
-
-    def findSourceScan(join: BaseJoinExec): Option[SparkPlan] = {
-      val leftHasTarget = hasTargetTable(join.left)
-      val rightHasTarget = hasTargetTable(join.right)
-
-      val sourceSide = if (leftHasTarget) {
-        Some(join.right)
-      } else if (rightHasTarget) {
-        Some(join.left)
-      } else {
-        None
-      }
-
-      sourceSide.flatMap { side =>
-        collectFirst(side) {
-          case source if source.metrics.contains("numOutputRows") =>
-          source
-        }
-      }
-    }
-
-    (for {
-      join <- collectFirst(mergeRowsExec.child) { case j: BaseJoinExec => j }
-      sourceScan <- findSourceScan(join)
-      metric <- sourceScan.metrics.get("numOutputRows")
-    } yield metric.value).getOrElse(-1L)
-  }
 }
 
 trait WritingSparkTask[W <: DataWriter[InternalRow]] extends Logging with 
Serializable {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
index 7539506e8bfe..a36aeab1295a 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
@@ -1779,179 +1779,159 @@ abstract class MergeIntoTableSuiteBase extends 
RowLevelOperationSuiteBase
   }
 
   test("Merge metrics with matched clause") {
-    Seq("true", "false").foreach { aqeEnabled: String =>
-      withTempView("source") {
-        withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled) {
-          createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
-            """{ "pk": 1, "salary": 100, "dep": "hr" }
-              |{ "pk": 2, "salary": 200, "dep": "software" }
-              |{ "pk": 3, "salary": 300, "dep": "hr" }
-              |""".stripMargin)
-
-          val sourceDF = Seq(1, 2, 10).toDF("pk")
-          sourceDF.createOrReplaceTempView("source")
+    withTempView("source") {
+      createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+        """{ "pk": 1, "salary": 100, "dep": "hr" }
+          |{ "pk": 2, "salary": 200, "dep": "software" }
+          |{ "pk": 3, "salary": 300, "dep": "hr" }
+          |""".stripMargin)
 
-          val mergeExec = findMergeExec {
-            s"""MERGE INTO $tableNameAsString t
-               |USING source s
-               |ON t.pk = s.pk
-               |WHEN MATCHED AND salary < 200 THEN
-               | UPDATE SET salary = 1000
-               |""".stripMargin
-          }
+      val sourceDF = Seq(1, 2, 10).toDF("pk")
+      sourceDF.createOrReplaceTempView("source")
 
-          assertMetric(mergeExec, "numTargetRowsCopied", if (deltaMerge) 0 
else 2)
-          assertMetric(mergeExec, "numTargetRowsInserted", 0)
-          assertMetric(mergeExec, "numTargetRowsUpdated", 1)
-          assertMetric(mergeExec, "numTargetRowsDeleted", 0)
-          assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 1)
-          assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 0)
-          assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 0)
-          assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 0)
+      val mergeExec = findMergeExec {
+        s"""MERGE INTO $tableNameAsString t
+           |USING source s
+           |ON t.pk = s.pk
+           |WHEN MATCHED AND salary < 200 THEN
+           | UPDATE SET salary = 1000
+           |""".stripMargin
+      }
 
-          checkAnswer(
-            sql(s"SELECT * FROM $tableNameAsString"),
-            Seq(
-              Row(1, 1000, "hr"), // updated
-              Row(2, 200, "software"),
-              Row(3, 300, "hr")))
-        }
+      assertMetric(mergeExec, "numTargetRowsCopied", if (deltaMerge) 0 else 2)
+      assertMetric(mergeExec, "numTargetRowsInserted", 0)
+      assertMetric(mergeExec, "numTargetRowsUpdated", 1)
+      assertMetric(mergeExec, "numTargetRowsDeleted", 0)
+      assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 1)
+      assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 0)
+      assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 0)
+      assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 0)
 
-        val mergeSummary = getMergeSummary()
-        assert(mergeSummary.numSourceRows === 3L)
-        assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 
2L))
-        assert(mergeSummary.numTargetRowsInserted === 0L)
-        assert(mergeSummary.numTargetRowsUpdated === 1L)
-        assert(mergeSummary.numTargetRowsDeleted === 0L)
-        assert(mergeSummary.numTargetRowsMatchedUpdated === 1L)
-        assert(mergeSummary.numTargetRowsMatchedDeleted === 0L)
-        assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L)
-        assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L)
+      checkAnswer(
+        sql(s"SELECT * FROM $tableNameAsString"),
+        Seq(
+          Row(1, 1000, "hr"), // updated
+          Row(2, 200, "software"),
+          Row(3, 300, "hr")))
 
-        sql(s"DROP TABLE $tableNameAsString")
-      }
+      val mergeSummary = getMergeSummary()
+      assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 2L))
+      assert(mergeSummary.numTargetRowsInserted === 0L)
+      assert(mergeSummary.numTargetRowsUpdated === 1L)
+      assert(mergeSummary.numTargetRowsDeleted === 0L)
+      assert(mergeSummary.numTargetRowsMatchedUpdated === 1L)
+      assert(mergeSummary.numTargetRowsMatchedDeleted === 0L)
+      assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L)
+      assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L)
     }
   }
 
   test("Merge metrics with matched and not matched clause") {
-    Seq("true", "false").foreach { aqeEnabled: String =>
-      withTempView("source") {
-        createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
-          """{ "pk": 1, "salary": 100, "dep": "hr" }
-            |{ "pk": 2, "salary": 200, "dep": "software" }
-            |{ "pk": 3, "salary": 300, "dep": "hr" }
-            |""".stripMargin)
+    withTempView("source") {
+      createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+        """{ "pk": 1, "salary": 100, "dep": "hr" }
+          |{ "pk": 2, "salary": 200, "dep": "software" }
+          |{ "pk": 3, "salary": 300, "dep": "hr" }
+          |""".stripMargin)
 
-        val sourceDF = Seq(
-          (4, 100, "marketing"),
-          (5, 400, "executive"),
-          (6, 100, "hr")
-        ).toDF("pk", "salary", "dep")
-        sourceDF.createOrReplaceTempView("source")
+      val sourceDF = Seq(
+        (4, 100, "marketing"),
+        (5, 400, "executive"),
+        (6, 100, "hr")
+      ).toDF("pk", "salary", "dep")
+      sourceDF.createOrReplaceTempView("source")
 
-        val mergeExec = findMergeExec {
-          s"""MERGE INTO $tableNameAsString t
-             |USING source s
-             |ON t.pk = s.pk
-             |WHEN MATCHED THEN
-             | UPDATE SET salary = 9999
-             |WHEN NOT MATCHED AND salary > 200 THEN
-             | INSERT *
-             |""".stripMargin
-        }
+      val mergeExec = findMergeExec {
+        s"""MERGE INTO $tableNameAsString t
+           |USING source s
+           |ON t.pk = s.pk
+           |WHEN MATCHED THEN
+           | UPDATE SET salary = 9999
+           |WHEN NOT MATCHED AND salary > 200 THEN
+           | INSERT *
+           |""".stripMargin
+      }
 
-        assertMetric(mergeExec, "numTargetRowsCopied", 0)
-        assertMetric(mergeExec, "numTargetRowsInserted", 1)
-        assertMetric(mergeExec, "numTargetRowsUpdated", 0)
-        assertMetric(mergeExec, "numTargetRowsDeleted", 0)
-        assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 0)
-        assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 0)
-        assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 0)
-        assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 0)
+      assertMetric(mergeExec, "numTargetRowsCopied", 0)
+      assertMetric(mergeExec, "numTargetRowsInserted", 1)
+      assertMetric(mergeExec, "numTargetRowsUpdated", 0)
+      assertMetric(mergeExec, "numTargetRowsDeleted", 0)
+      assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 0)
+      assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 0)
+      assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 0)
+      assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 0)
 
-        checkAnswer(
-          sql(s"SELECT * FROM $tableNameAsString"),
-          Seq(
-            Row(1, 100, "hr"),
-            Row(2, 200, "software"),
-            Row(3, 300, "hr"),
-            Row(5, 400, "executive"))) // inserted
-
-        val mergeSummary = getMergeSummary()
-        // TODO SPARK-52578: Handle this case when optimizer removes Join due 
to no matching pks
-        assert(mergeSummary.numSourceRows === (if (deltaMerge) 3L else -1L))
-        assert(mergeSummary.numTargetRowsCopied === 0L)
-        assert(mergeSummary.numTargetRowsInserted === 1L)
-        assert(mergeSummary.numTargetRowsUpdated === 0L)
-        assert(mergeSummary.numTargetRowsDeleted === 0L)
-        assert(mergeSummary.numTargetRowsMatchedUpdated === 0L)
-        assert(mergeSummary.numTargetRowsMatchedDeleted === 0L)
-        assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L)
-        assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L)
+      checkAnswer(
+        sql(s"SELECT * FROM $tableNameAsString"),
+        Seq(
+          Row(1, 100, "hr"),
+          Row(2, 200, "software"),
+          Row(3, 300, "hr"),
+          Row(5, 400, "executive"))) // inserted
 
-        sql(s"DROP TABLE $tableNameAsString")
-      }
+      val mergeSummary = getMergeSummary()
+      assert(mergeSummary.numTargetRowsCopied === 0L)
+      assert(mergeSummary.numTargetRowsInserted === 1L)
+      assert(mergeSummary.numTargetRowsUpdated === 0L)
+      assert(mergeSummary.numTargetRowsDeleted === 0L)
+      assert(mergeSummary.numTargetRowsMatchedUpdated === 0L)
+      assert(mergeSummary.numTargetRowsMatchedDeleted === 0L)
+      assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L)
+      assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L)
     }
   }
 
   test("Merge metrics with matched and not matched by source clauses: update") 
{
-    Seq("true", "false").foreach { aqeEnabled: String =>
-      withTempView("source") {
-        withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled) {
-          createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
-            """{ "pk": 1, "salary": 100, "dep": "hr" }
-              |{ "pk": 2, "salary": 200, "dep": "software" }
-              |{ "pk": 3, "salary": 300, "dep": "hr" }
-              |{ "pk": 4, "salary": 400, "dep": "marketing" }
-              |{ "pk": 5, "salary": 500, "dep": "executive" }
-              |""".stripMargin)
-
-          val sourceDF = Seq(1, 2, 10).toDF("pk")
-          sourceDF.createOrReplaceTempView("source")
+    withTempView("source") {
+      createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+        """{ "pk": 1, "salary": 100, "dep": "hr" }
+          |{ "pk": 2, "salary": 200, "dep": "software" }
+          |{ "pk": 3, "salary": 300, "dep": "hr" }
+          |{ "pk": 4, "salary": 400, "dep": "marketing" }
+          |{ "pk": 5, "salary": 500, "dep": "executive" }
+          |""".stripMargin)
 
-          val mergeExec = findMergeExec {
-            s"""MERGE INTO $tableNameAsString t
-               |USING source s
-               |ON t.pk = s.pk
-               |WHEN MATCHED AND salary < 200 THEN
-               | UPDATE SET salary = 1000
-               |WHEN NOT MATCHED BY SOURCE AND salary > 400 THEN
-               | UPDATE SET salary = -1
-               |""".stripMargin
-          }
+      val sourceDF = Seq(1, 2, 10).toDF("pk")
+      sourceDF.createOrReplaceTempView("source")
 
-          assertMetric(mergeExec, "numTargetRowsCopied", if (deltaMerge) 0 
else 3)
-          assertMetric(mergeExec, "numTargetRowsInserted", 0)
-          assertMetric(mergeExec, "numTargetRowsUpdated", 2)
-          assertMetric(mergeExec, "numTargetRowsDeleted", 0)
-          assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 1)
-          assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 0)
-          assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 1)
-          assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 0)
+      val mergeExec = findMergeExec {
+        s"""MERGE INTO $tableNameAsString t
+           |USING source s
+           |ON t.pk = s.pk
+           |WHEN MATCHED AND salary < 200 THEN
+           | UPDATE SET salary = 1000
+           |WHEN NOT MATCHED BY SOURCE AND salary > 400 THEN
+           | UPDATE SET salary = -1
+           |""".stripMargin
+      }
 
-          checkAnswer(
-            sql(s"SELECT * FROM $tableNameAsString"),
-            Seq(
-              Row(1, 1000, "hr"), // updated
-              Row(2, 200, "software"),
-              Row(3, 300, "hr"),
-              Row(4, 400, "marketing"),
-              Row(5, -1, "executive"))) // updated
-        }
+      assertMetric(mergeExec, "numTargetRowsCopied", if (deltaMerge) 0 else 3)
+      assertMetric(mergeExec, "numTargetRowsInserted", 0)
+      assertMetric(mergeExec, "numTargetRowsUpdated", 2)
+      assertMetric(mergeExec, "numTargetRowsDeleted", 0)
+      assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 1)
+      assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 0)
+      assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 1)
+      assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 0)
 
-        val mergeSummary = getMergeSummary()
-        assert(mergeSummary.numSourceRows === 3L)
-        assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 
3L))
-        assert(mergeSummary.numTargetRowsInserted === 0L)
-        assert(mergeSummary.numTargetRowsUpdated === 2L)
-        assert(mergeSummary.numTargetRowsDeleted === 0L)
-        assert(mergeSummary.numTargetRowsMatchedUpdated === 1L)
-        assert(mergeSummary.numTargetRowsMatchedDeleted === 0L)
-        assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 1L)
-        assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L)
+      checkAnswer(
+        sql(s"SELECT * FROM $tableNameAsString"),
+        Seq(
+          Row(1, 1000, "hr"), // updated
+          Row(2, 200, "software"),
+          Row(3, 300, "hr"),
+          Row(4, 400, "marketing"),
+          Row(5, -1, "executive"))) // updated
 
-        sql(s"DROP TABLE $tableNameAsString")
-      }
+      val mergeSummary = getMergeSummary()
+      assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 3L))
+      assert(mergeSummary.numTargetRowsInserted === 0L)
+      assert(mergeSummary.numTargetRowsUpdated === 2L)
+      assert(mergeSummary.numTargetRowsDeleted === 0L)
+      assert(mergeSummary.numTargetRowsMatchedUpdated === 1L)
+      assert(mergeSummary.numTargetRowsMatchedDeleted === 0L)
+      assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 1L)
+      assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L)
     }
   }
 
@@ -2000,7 +1980,6 @@ abstract class MergeIntoTableSuiteBase extends 
RowLevelOperationSuiteBase
       )
 
       val mergeSummary = getMergeSummary()
-      assert(mergeSummary.numSourceRows === 3L)
       assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 3L))
       assert(mergeSummary.numTargetRowsInserted === 0L)
       assert(mergeSummary.numTargetRowsUpdated === 0L)
@@ -2013,130 +1992,116 @@ abstract class MergeIntoTableSuiteBase extends 
RowLevelOperationSuiteBase
   }
 
   test("Merge metrics with matched, not matched, and not matched by source 
clauses: update") {
-    Seq("true", "false").foreach { aqeEnabled: String =>
-      withTempView("source") {
-        withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled) {
-          createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
-            """{ "pk": 1, "salary": 100, "dep": "hr" }
-              |{ "pk": 2, "salary": 200, "dep": "software" }
-              |{ "pk": 3, "salary": 300, "dep": "hr" }
-              |{ "pk": 4, "salary": 400, "dep": "marketing" }
-              |{ "pk": 5, "salary": 500, "dep": "executive" }
-              |""".stripMargin)
-
-          val sourceDF = Seq(1, 2, 6, 10).toDF("pk")
-          sourceDF.createOrReplaceTempView("source")
+    withTempView("source") {
+      createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+        """{ "pk": 1, "salary": 100, "dep": "hr" }
+          |{ "pk": 2, "salary": 200, "dep": "software" }
+          |{ "pk": 3, "salary": 300, "dep": "hr" }
+          |{ "pk": 4, "salary": 400, "dep": "marketing" }
+          |{ "pk": 5, "salary": 500, "dep": "executive" }
+          |""".stripMargin)
 
-          val mergeExec = findMergeExec {
-            s"""MERGE INTO $tableNameAsString t
-               |USING source s
-               |ON t.pk = s.pk
-               |WHEN MATCHED AND salary < 200 THEN
-               | UPDATE SET salary = 1000
-               |WHEN NOT MATCHED AND s.pk < 10 THEN
-               | INSERT (pk, salary, dep) VALUES (s.pk, -1, "dummy")
-               |WHEN NOT MATCHED BY SOURCE AND salary > 400 THEN
-               | UPDATE SET salary = -1
-               |""".stripMargin
-          }
+      val sourceDF = Seq(1, 2, 6, 10).toDF("pk")
+      sourceDF.createOrReplaceTempView("source")
 
-          assertMetric(mergeExec, "numTargetRowsCopied", if (deltaMerge) 0 
else 3)
-          assertMetric(mergeExec, "numTargetRowsInserted", 1)
-          assertMetric(mergeExec, "numTargetRowsUpdated", 2)
-          assertMetric(mergeExec, "numTargetRowsDeleted", 0)
-          assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 1)
-          assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 0)
-          assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 1)
-          assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 0)
+      val mergeExec = findMergeExec {
+        s"""MERGE INTO $tableNameAsString t
+           |USING source s
+           |ON t.pk = s.pk
+           |WHEN MATCHED AND salary < 200 THEN
+           | UPDATE SET salary = 1000
+           |WHEN NOT MATCHED AND s.pk < 10 THEN
+           | INSERT (pk, salary, dep) VALUES (s.pk, -1, "dummy")
+           |WHEN NOT MATCHED BY SOURCE AND salary > 400 THEN
+           | UPDATE SET salary = -1
+           |""".stripMargin
+      }
 
-          checkAnswer(
-            sql(s"SELECT * FROM $tableNameAsString"),
-            Seq(
-              Row(1, 1000, "hr"), // updated
-              Row(2, 200, "software"),
-              Row(3, 300, "hr"),
-              Row(4, 400, "marketing"),
-              Row(5, -1, "executive"), // updated
-              Row(6, -1, "dummy"))) // inserted
-        }
+      assertMetric(mergeExec, "numTargetRowsCopied", if (deltaMerge) 0 else 3)
+      assertMetric(mergeExec, "numTargetRowsInserted", 1)
+      assertMetric(mergeExec, "numTargetRowsUpdated", 2)
+      assertMetric(mergeExec, "numTargetRowsDeleted", 0)
+      assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 1)
+      assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 0)
+      assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 1)
+      assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 0)
 
-        val mergeSummary = getMergeSummary()
-        assert(mergeSummary.numSourceRows === 4L)
-        assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 
3L))
-        assert(mergeSummary.numTargetRowsInserted === 1L)
-        assert(mergeSummary.numTargetRowsUpdated === 2L)
-        assert(mergeSummary.numTargetRowsDeleted === 0L)
-        assert(mergeSummary.numTargetRowsMatchedUpdated === 1L)
-        assert(mergeSummary.numTargetRowsMatchedDeleted === 0L)
-        assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 1L)
-        assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L)
+      checkAnswer(
+        sql(s"SELECT * FROM $tableNameAsString"),
+        Seq(
+          Row(1, 1000, "hr"), // updated
+          Row(2, 200, "software"),
+          Row(3, 300, "hr"),
+          Row(4, 400, "marketing"),
+          Row(5, -1, "executive"), // updated
+          Row(6, -1, "dummy"))) // inserted
 
-        sql(s"DROP TABLE $tableNameAsString")
-      }
+      val mergeSummary = getMergeSummary()
+      assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 3L))
+      assert(mergeSummary.numTargetRowsInserted === 1L)
+      assert(mergeSummary.numTargetRowsUpdated === 2L)
+      assert(mergeSummary.numTargetRowsDeleted === 0L)
+      assert(mergeSummary.numTargetRowsMatchedUpdated === 1L)
+      assert(mergeSummary.numTargetRowsMatchedDeleted === 0L)
+      assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 1L)
+      assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L)
     }
   }
 
   test("Merge metrics with matched, not matched, and not matched by source 
clauses: delete") {
-    Seq("true", "false").foreach { aqeEnabled: String =>
-      withTempView("source") {
-        withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled) {
-          createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
-            """{ "pk": 1, "salary": 100, "dep": "hr" }
-              |{ "pk": 2, "salary": 200, "dep": "software" }
-              |{ "pk": 3, "salary": 300, "dep": "hr" }
-              |{ "pk": 4, "salary": 400, "dep": "marketing" }
-              |{ "pk": 5, "salary": 500, "dep": "executive" }
-              |""".stripMargin)
-
-          val sourceDF = Seq(1, 2, 6, 10).toDF("pk")
-          sourceDF.createOrReplaceTempView("source")
+    withTempView("source") {
+      createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+        """{ "pk": 1, "salary": 100, "dep": "hr" }
+          |{ "pk": 2, "salary": 200, "dep": "software" }
+          |{ "pk": 3, "salary": 300, "dep": "hr" }
+          |{ "pk": 4, "salary": 400, "dep": "marketing" }
+          |{ "pk": 5, "salary": 500, "dep": "executive" }
+          |""".stripMargin)
 
-          val mergeExec = findMergeExec {
-            s"""MERGE INTO $tableNameAsString t
-               |USING source s
-               |ON t.pk = s.pk
-               |WHEN MATCHED AND salary < 200 THEN
-               | DELETE
-               |WHEN NOT MATCHED AND s.pk < 10 THEN
-               | INSERT (pk, salary, dep) VALUES (s.pk, -1, "dummy")
-               |WHEN NOT MATCHED BY SOURCE AND salary > 400 THEN
-               | DELETE
-               |""".stripMargin
-          }
+      val sourceDF = Seq(1, 2, 6, 10).toDF("pk")
+      sourceDF.createOrReplaceTempView("source")
 
-          assertMetric(mergeExec, "numTargetRowsCopied", if (deltaMerge) 0 
else 3)
-          assertMetric(mergeExec, "numTargetRowsInserted", 1)
-          assertMetric(mergeExec, "numTargetRowsUpdated", 0)
-          assertMetric(mergeExec, "numTargetRowsDeleted", 2)
-          assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 0)
-          assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 1)
-          assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 0)
-          assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 1)
+      val mergeExec = findMergeExec {
+        s"""MERGE INTO $tableNameAsString t
+           |USING source s
+           |ON t.pk = s.pk
+           |WHEN MATCHED AND salary < 200 THEN
+           | DELETE
+           |WHEN NOT MATCHED AND s.pk < 10 THEN
+           | INSERT (pk, salary, dep) VALUES (s.pk, -1, "dummy")
+           |WHEN NOT MATCHED BY SOURCE AND salary > 400 THEN
+           | DELETE
+           |""".stripMargin
+      }
 
-          checkAnswer(
-            sql(s"SELECT * FROM $tableNameAsString"),
-            Seq(
-              // Row(1, 100, "hr") deleted
-              Row(2, 200, "software"),
-              Row(3, 300, "hr"),
-              Row(4, 400, "marketing"),
-              // Row(5, 500, "executive") deleted
-              Row(6, -1, "dummy"))) // inserted
-        }
+      assertMetric(mergeExec, "numTargetRowsCopied", if (deltaMerge) 0 else 3)
+      assertMetric(mergeExec, "numTargetRowsInserted", 1)
+      assertMetric(mergeExec, "numTargetRowsUpdated", 0)
+      assertMetric(mergeExec, "numTargetRowsDeleted", 2)
+      assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 0)
+      assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 1)
+      assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 0)
+      assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 1)
 
-        val mergeSummary = getMergeSummary()
-        assert(mergeSummary.numSourceRows === 4L)
-        assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 
3L))
-        assert(mergeSummary.numTargetRowsInserted === 1L)
-        assert(mergeSummary.numTargetRowsUpdated === 0L)
-        assert(mergeSummary.numTargetRowsDeleted === 2L)
-        assert(mergeSummary.numTargetRowsMatchedUpdated === 0L)
-        assert(mergeSummary.numTargetRowsMatchedDeleted === 1L)
-        assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L)
-        assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 1L)
+      checkAnswer(
+        sql(s"SELECT * FROM $tableNameAsString"),
+        Seq(
+          // Row(1, 100, "hr") deleted
+          Row(2, 200, "software"),
+          Row(3, 300, "hr"),
+          Row(4, 400, "marketing"),
+          // Row(5, 500, "executive") deleted
+          Row(6, -1, "dummy"))) // inserted
 
-        sql(s"DROP TABLE $tableNameAsString")
-      }
+      val mergeSummary = getMergeSummary()
+      assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 3L))
+      assert(mergeSummary.numTargetRowsInserted === 1L)
+      assert(mergeSummary.numTargetRowsUpdated === 0L)
+      assert(mergeSummary.numTargetRowsDeleted === 2L)
+      assert(mergeSummary.numTargetRowsMatchedUpdated === 0L)
+      assert(mergeSummary.numTargetRowsMatchedDeleted === 1L)
+      assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L)
+      assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 1L)
     }
   }
 
@@ -2169,7 +2134,6 @@ abstract class MergeIntoTableSuiteBase extends 
RowLevelOperationSuiteBase
           )
 
           val mergeMetrics = getMergeSummary()
-          assert(mergeMetrics.numSourceRows === 4L)
           assert(mergeMetrics.numTargetRowsCopied === (if (deltaMerge) 0L else 
3L))
           assert(mergeMetrics.numTargetRowsInserted === 1L)
           assert(mergeMetrics.numTargetRowsUpdated === 0L)
@@ -2185,46 +2149,6 @@ abstract class MergeIntoTableSuiteBase extends 
RowLevelOperationSuiteBase
     }
   }
 
-  test("Merge metrics with numSourceRows for empty source") {
-    Seq("true", "false").foreach { aqeEnabled: String =>
-      withTempView("source") {
-        withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled) {
-          createAndInitTable(
-            "pk INT NOT NULL, salary INT, dep STRING",
-            """{ "pk": 1, "salary": 100, "dep": "hr" }
-              |{ "pk": 2, "salary": 200, "dep": "software" }
-              |{ "pk": 3, "salary": 300, "dep": "hr" }
-              |""".stripMargin)
-
-          // source is empty
-          Seq.empty[Int].toDF("pk").createOrReplaceTempView("source")
-
-          sql(s"""MERGE INTO $tableNameAsString t
-               |USING source s
-               |ON t.pk = s.pk
-               |WHEN MATCHED THEN
-               | UPDATE SET salary = 1000
-               |WHEN NOT MATCHED BY SOURCE THEN
-               | DELETE
-               |""".stripMargin)
-
-          val mergeSummary = getMergeSummary()
-          assert(mergeSummary.numSourceRows === -1L) // if no numOutputRows, 
should be -1
-          assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 
0L))
-          assert(mergeSummary.numTargetRowsInserted === 0L)
-          assert(mergeSummary.numTargetRowsUpdated === 0L)
-          assert(mergeSummary.numTargetRowsDeleted === 3L)
-          assert(mergeSummary.numTargetRowsMatchedUpdated === 0L)
-          assert(mergeSummary.numTargetRowsMatchedDeleted === 0L)
-          assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L)
-          assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 3L)
-
-          sql(s"DROP TABLE $tableNameAsString")
-        }
-      }
-    }
-  }
-
   test("Merge schema evolution new column with set explicit column") {
     Seq((true, true), (false, true), (true, false)).foreach {
       case (withSchemaEvolution, schemaEvolutionEnabled) =>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to