This is an automated email from the ASF dual-hosted git repository.
cloud-fan pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.x by this push:
new b848c2753184 [SPARK-56933][SQL] Cache SQL metrics in MergeRowsExec interpreted iterator
b848c2753184 is described below
commit b848c2753184560978a5178f07b647abe90465f2
Author: Szehon Ho <[email protected]>
AuthorDate: Thu May 21 10:29:01 2026 +0800
[SPARK-56933][SQL] Cache SQL metrics in MergeRowsExec interpreted iterator
### What changes were proposed in this pull request?
Cache `SQLMetric` references in `MergeRowIterator` and update them directly
in the hot loop. Previously, every increment called `longMetric("…")`, which
performs a `metrics(name)` map lookup (up to 2–3 lookups per delete/update
row). The metrics are now `lazy val` fields, so a partition only resolves the
metrics it actually increments.
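To make the lookup-count difference concrete, here is a self-contained Scala sketch that does not depend on Spark. `LongMetric`, `MetricRegistry`, `UncachedDeletes`, `CachedDeletes`, and `MetricCachingDemo` are hypothetical stand-ins for `SQLMetric`, `SparkPlan.longMetric`, and the before/after shapes of the delete path in `MergeRowIterator`; only the metric names come from this change.

```scala
// Hypothetical stand-in for Spark's SQLMetric: a mutable long counter.
final class LongMetric {
  private var v = 0L
  def +=(n: Long): Unit = v += n
  def value: Long = v
}

// Hypothetical stand-in for SparkPlan.longMetric(name): a map lookup that
// also counts how many lookups were performed.
final class MetricRegistry(names: Seq[String]) {
  private val metrics: Map[String, LongMetric] =
    names.map(n => n -> new LongMetric).toMap
  var lookups: Int = 0

  def longMetric(name: String): LongMetric = {
    lookups += 1
    metrics(name)
  }

  // Reads a value without counting it as a hot-loop lookup.
  def valueOf(name: String): Long = metrics(name).value
}

// Before: every increment performs two map lookups.
final class UncachedDeletes(reg: MetricRegistry) {
  def incrementDeleteMetric(sourcePresent: Boolean): Unit = {
    reg.longMetric("numTargetRowsDeleted") += 1
    if (sourcePresent) reg.longMetric("numTargetRowsMatchedDeleted") += 1
    else reg.longMetric("numTargetRowsNotMatchedBySourceDeleted") += 1
  }
}

// After: references live in lazy vals, resolved at most once per instance
// (i.e. per partition) and only when first incremented.
final class CachedDeletes(reg: MetricRegistry) {
  private lazy val deleted = reg.longMetric("numTargetRowsDeleted")
  private lazy val matchedDeleted = reg.longMetric("numTargetRowsMatchedDeleted")
  private lazy val notMatchedBySourceDeleted =
    reg.longMetric("numTargetRowsNotMatchedBySourceDeleted")

  def incrementDeleteMetric(sourcePresent: Boolean): Unit = {
    deleted += 1
    if (sourcePresent) matchedDeleted += 1 else notMatchedBySourceDeleted += 1
  }
}

object MetricCachingDemo {
  val Names: Seq[String] = Seq(
    "numTargetRowsDeleted",
    "numTargetRowsMatchedDeleted",
    "numTargetRowsNotMatchedBySourceDeleted")

  // Simulates `rows` matched deletes in one partition.
  // Returns (number of map lookups, final numTargetRowsDeleted value).
  def run(rows: Int, cached: Boolean): (Int, Long) = {
    val reg = new MetricRegistry(Names)
    if (cached) {
      val it = new CachedDeletes(reg)
      (1 to rows).foreach(_ => it.incrementDeleteMetric(sourcePresent = true))
    } else {
      val it = new UncachedDeletes(reg)
      (1 to rows).foreach(_ => it.incrementDeleteMetric(sourcePresent = true))
    }
    (reg.lookups, reg.valueOf("numTargetRowsDeleted"))
  }
}
```

With 1,000 simulated matched deletes, the uncached variant performs 2,000 lookups and the cached one performs 2; `numTargetRowsNotMatchedBySourceDeleted` is never resolved because no row increments it.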
This matches the pattern used elsewhere (e.g. `FilterEvaluatorFactory`
passes a `SQLMetric` into the partition evaluator). The whole-stage codegen
path is unchanged; it already resolves metrics once via `metricTerm`.
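The factory/evaluator shape mentioned above can be sketched roughly as follows: the operator resolves a metric once and hands the reference to each per-partition evaluator, which increments it directly. The class names are hypothetical and this is not the actual `FilterEvaluatorFactory` signature; `Counter` stands in for `SQLMetric`.

```scala
// Hypothetical stand-in for SQLMetric.
final class Counter {
  private var n = 0L
  def +=(d: Long): Unit = n += d
  def value: Long = n
}

// Hypothetical factory: holds a metric reference that was resolved once,
// outside the per-row loop.
final class FilterLikeEvaluatorFactory(numOutputRows: Counter, predicate: Int => Boolean) {
  def createEvaluator(): FilterLikeEvaluator =
    new FilterLikeEvaluator(numOutputRows, predicate)
}

// Hypothetical per-partition evaluator: increments the passed-in reference
// directly, with no name-based lookup per row.
final class FilterLikeEvaluator(numOutputRows: Counter, predicate: Int => Boolean) {
  def eval(rows: Iterator[Int]): Iterator[Int] =
    rows.filter { r =>
      val keep = predicate(r)
      if (keep) numOutputRows += 1
      keep
    }
}
```

Unlike the `lazy val` fields in `MergeRowIterator`, this shape resolves the reference eagerly, whether or not any row ends up incrementing it.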
`codegenBenchmark` in `SqlBasedBenchmark` now accepts optional
`warmupTime`, `minTime`, and per-case `numIters`. `MergeRowsExecBenchmark` uses
7s warmup and a 7s timed window for all whole-stage on/off cases.
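Per the `codegenBenchmark` scaladoc in the `SqlBasedBenchmark` diff, a non-zero per-case `numIters` runs exactly that many timed iterations, while zero falls back to `minNumIters` and `minTime`. The following is a simplified model of that policy, assuming a fixed simulated cost per run; `IterationPolicy` and `timedIterations` are hypothetical and are not Spark's `Benchmark` code.

```scala
import scala.concurrent.duration._

// Hypothetical model of the documented iteration policy for one
// codegenBenchmark case; not Spark's Benchmark implementation. Each run is
// assumed to take a fixed, simulated `costPerRun`; warm-up (which runs for
// `warmupTime` before any timed iteration) is not modeled here.
object IterationPolicy {
  def timedIterations(
      numIters: Int,
      minNumIters: Int,
      minTime: FiniteDuration,
      costPerRun: FiniteDuration): Int = {
    // A non-zero per-case numIters means "run exactly this many" and ignores minTime.
    val minIters = if (numIters != 0) numIters else minNumIters
    val minDurationNanos = if (numIters != 0) 0L else minTime.toNanos
    var iters = 0
    var totalNanos = 0L
    // Otherwise keep running until both minNumIters and minTime are satisfied.
    while (iters < minIters || totalNanos < minDurationNanos) {
      totalNanos += costPerRun.toNanos
      iters += 1
    }
    iters
  }
}
```

For example, with `minNumIters = 3`, `minTime = 7.seconds`, and a hypothetical 546 ms per run, `timedIterations(0, 3, 7.seconds, 546.millis)` gives 13 timed runs, whereas the previous fixed `numIters = 2` for the whole-stage-off case always ran exactly 2.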
### Why are the changes needed?
`MergeRowsExec` updates multiple MERGE metrics per output row on the
interpreted path (`doExecute` / `MergeRowIterator`). For delete-heavy workloads
with little projection work, repeated map lookups were a noticeable fraction of
per-row cost. Production MERGE typically runs with whole-stage codegen enabled,
but the interpreted path is still used when codegen is disabled or unsupported.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing `MergeRowsExec` / MERGE tests (CI).
**Local benchmark** (`MergeRowsExecBenchmark`, 20M rows, Apple M4 Max, JDK
21). **Both sides used the same benchmark harness** (7s `warmupTime`, 7s
`minTime`, `minNumIters = 3`, `wholestageOffNumIters = 0` /
`wholestageOnNumIters = 0` via the extended `codegenBenchmark`). Compared
`MergeRowsExec` **without** the cache (`1ad4fa420cd`, parent of the cache
commit) vs **with** the cache (this PR), checking out only that file between
runs.
```bash
SPARK_LOCAL_HOSTNAME=127.0.0.1 build/sbt -batch \
-Dspark.driver.host=127.0.0.1 -Dspark.driver.bindAddress=127.0.0.1 \
"sql/Test/runMain org.apache.spark.sql.execution.benchmark.MergeRowsExecBenchmark"
```
**Whole-stage off (interpreted path)** — best time (ms):
| Case | Without cache | With cache (this PR) | Change |
|------|--------------:|---------------------:|-------:|
| matched update only | 5475 | 5238 | −4% |
| not matched insert only | 7612 | 7337 | −4% |
| matched update + not matched insert | 5795 | 4315 | −26% |
| matched delete | 2914 | 546 | −81% |
| conditional clauses | 3872 | 1251 | −68% |
| matched + not matched + not matched by source | 3813 | 1119 | −71% |
| split update (delete + insert) | 1844 | 1400 | −24% |
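As an arithmetic check, the Change column is the relative difference in best time, `(with - without) / without`, rounded to a whole percent. The sketch below recomputes it from the numbers in the table; `ChangeColumn` is a hypothetical helper, not part of the benchmark.

```scala
object ChangeColumn {
  // (case, best ms without cache, best ms with cache), taken from the table above.
  val rows: Seq[(String, Long, Long)] = Seq(
    ("matched update only", 5475L, 5238L),
    ("not matched insert only", 7612L, 7337L),
    ("matched update + not matched insert", 5795L, 4315L),
    ("matched delete", 2914L, 546L),
    ("conditional clauses", 3872L, 1251L),
    ("matched + not matched + not matched by source", 3813L, 1119L),
    ("split update (delete + insert)", 1844L, 1400L))

  /** Relative change in best time, rounded to a whole percent (negative = faster). */
  def changePercent(without: Long, withCache: Long): Long =
    math.round((withCache - without) * 100.0 / without)

  /** How many times faster the cached best time is. */
  def speedup(without: Long, withCache: Long): Double =
    without.toDouble / withCache

  def main(args: Array[String]): Unit =
    rows.foreach { case (name, a, b) =>
      println(f"$name%-48s ${changePercent(a, b)}%4d%%  ${speedup(a, b)}%.2fx")
    }
}
```

For matched delete, for instance, (546 - 2914) / 2914 is about -0.81, i.e. the best time drops by roughly 5.3x.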
Matched-update-only and insert-only improve only modestly (about 4%) on the
interpreted path in this run; the largest wins are on delete-heavy and
multi-metric cases.
**Whole-stage on (codegen)** — unchanged within noise (e.g. matched delete
best ~13 ms; matched update only ~333–338 ms).
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #55967 from szehon-ho/spark-cache-merge-rows-metrics.
Authored-by: Szehon Ho <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit f08523bd969e5f1b20db09fdcb94a53e4408c0d8)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../execution/datasources/v2/MergeRowsExec.scala | 46 ++++++++++++++--------
.../benchmark/MergeRowsExecBenchmark.scala | 28 +++++++++----
.../execution/benchmark/SqlBasedBenchmark.scala | 37 ++++++++++++++---
3 files changed, 82 insertions(+), 29 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala
index 67212de165e9..887f6d832c82 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala
@@ -518,6 +518,19 @@ case class MergeRowsExec(
private val notMatchedBySourceInstructions: Seq[InstructionExec])
extends Iterator[InternalRow] {
+ // Resolve each metric at most once per partition, on first use; longMetric(name) is a map
+ // lookup. See SPARK-56933.
+ private lazy val numTargetRowsCopied = longMetric("numTargetRowsCopied")
+ private lazy val numTargetRowsInserted = longMetric("numTargetRowsInserted")
+ private lazy val numTargetRowsDeleted = longMetric("numTargetRowsDeleted")
+ private lazy val numTargetRowsUpdated = longMetric("numTargetRowsUpdated")
+ private lazy val numTargetRowsMatchedUpdated = longMetric("numTargetRowsMatchedUpdated")
+ private lazy val numTargetRowsMatchedDeleted = longMetric("numTargetRowsMatchedDeleted")
+ private lazy val numTargetRowsNotMatchedBySourceUpdated =
+ longMetric("numTargetRowsNotMatchedBySourceUpdated")
+ private lazy val numTargetRowsNotMatchedBySourceDeleted =
+ longMetric("numTargetRowsNotMatchedBySourceDeleted")
+
var cachedExtraRow: InternalRow = _
override def hasNext: Boolean = cachedExtraRow != null ||
rowIterator.hasNext
@@ -579,28 +592,27 @@ case class MergeRowsExec(
null
}
- }
- // For group based merge, copy is inserted if row matches no other case
- private def incrementCopyMetric(): Unit = longMetric("numTargetRowsCopied") += 1
+ private def incrementCopyMetric(): Unit = numTargetRowsCopied += 1
- private def incrementInsertMetric(): Unit = longMetric("numTargetRowsInserted") += 1
+ private def incrementInsertMetric(): Unit = numTargetRowsInserted += 1
- private def incrementDeleteMetric(sourcePresent: Boolean): Unit = {
- longMetric("numTargetRowsDeleted") += 1
- if (sourcePresent) {
- longMetric("numTargetRowsMatchedDeleted") += 1
- } else {
- longMetric("numTargetRowsNotMatchedBySourceDeleted") += 1
+ private def incrementDeleteMetric(sourcePresent: Boolean): Unit = {
+ numTargetRowsDeleted += 1
+ if (sourcePresent) {
+ numTargetRowsMatchedDeleted += 1
+ } else {
+ numTargetRowsNotMatchedBySourceDeleted += 1
+ }
}
- }
- private def incrementUpdateMetric(sourcePresent: Boolean): Unit = {
- longMetric("numTargetRowsUpdated") += 1
- if (sourcePresent) {
- longMetric("numTargetRowsMatchedUpdated") += 1
- } else {
- longMetric("numTargetRowsNotMatchedBySourceUpdated") += 1
+ private def incrementUpdateMetric(sourcePresent: Boolean): Unit = {
+ numTargetRowsUpdated += 1
+ if (sourcePresent) {
+ numTargetRowsMatchedUpdated += 1
+ } else {
+ numTargetRowsNotMatchedBySourceUpdated += 1
+ }
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MergeRowsExecBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MergeRowsExecBenchmark.scala
index 8ddbca46b739..0fcac326d923 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MergeRowsExecBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MergeRowsExecBenchmark.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.benchmark
+import scala.concurrent.duration._
+
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterThan, IsNotNull, Literal}
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
@@ -43,6 +45,18 @@ object MergeRowsExecBenchmark extends SqlBasedBenchmark with ClassicConversions
private val N = 20 << 20
+ /** Longer warm-up and timed window for stable interpreted (whole-stage off) results. */
+ private def mergeRowsBenchmark(name: String, cardinality: Long)(f: => Unit): Unit = {
+ codegenBenchmark(
+ name,
+ cardinality,
+ warmupTime = 7.seconds,
+ minTime = 7.seconds,
+ minNumIters = 3,
+ wholestageOffNumIters = 0,
+ wholestageOnNumIters = 0)(f)
+ }
+
/**
* Creates a DataFrame simulating the join output from a MERGE operation.
*
@@ -110,7 +124,7 @@ object MergeRowsExecBenchmark extends SqlBasedBenchmark with ClassicConversions
a(0), a(5), a(6), a(3)
)))
- codegenBenchmark("merge - matched update only", N) {
+ mergeRowsBenchmark("merge - matched update only", N) {
val df = buildMergeRowsDF(inputDF, matchedInstr)
assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[MergeRowsExec]))
df.noop()
@@ -126,7 +140,7 @@ object MergeRowsExecBenchmark extends SqlBasedBenchmark with ClassicConversions
a(4), a(5), a(6), a(7)
)))
- codegenBenchmark("merge - not matched insert only", N) {
+ mergeRowsBenchmark("merge - not matched insert only", N) {
val df = buildMergeRowsDF(inputDF, Seq.empty, notMatchedInstr)
assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[MergeRowsExec]))
df.noop()
@@ -144,7 +158,7 @@ object MergeRowsExecBenchmark extends SqlBasedBenchmark with ClassicConversions
a(4), a(5), a(6), a(7)
)))
- codegenBenchmark("merge - matched update + not matched insert", N) {
+ mergeRowsBenchmark("merge - matched update + not matched insert", N) {
val df = buildMergeRowsDF(inputDF, matchedInstr, notMatchedInstr)
assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[MergeRowsExec]))
df.noop()
@@ -156,7 +170,7 @@ object MergeRowsExecBenchmark extends SqlBasedBenchmark with ClassicConversions
val matchedInstr = Seq(Discard(TrueLiteral))
- codegenBenchmark("merge - matched delete", N) {
+ mergeRowsBenchmark("merge - matched delete", N) {
val df = buildMergeRowsDF(inputDF, matchedInstr)
assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[MergeRowsExec]))
df.noop()
@@ -177,7 +191,7 @@ object MergeRowsExecBenchmark extends SqlBasedBenchmark with ClassicConversions
Keep(Insert, GreaterThan(a(5), Literal(500)), Seq(a(4), a(5), a(6), a(7)))
)
- codegenBenchmark("merge - conditional clauses", N) {
+ mergeRowsBenchmark("merge - conditional clauses", N) {
val df = buildMergeRowsDF(inputDF, matchedInstr, notMatchedInstr)
assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[MergeRowsExec]))
df.noop()
@@ -199,7 +213,7 @@ object MergeRowsExecBenchmark extends SqlBasedBenchmark with ClassicConversions
)))
val notMatchedBySourceInstr = Seq(Discard(TrueLiteral))
- codegenBenchmark("merge - matched + not matched + not matched by source", N) {
+ mergeRowsBenchmark("merge - matched + not matched + not matched by source", N) {
val df = buildMergeRowsDF(inputDF, matchedInstr, notMatchedInstr,
notMatchedBySourceInstr)
assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[MergeRowsExec]))
df.noop()
@@ -216,7 +230,7 @@ object MergeRowsExecBenchmark extends SqlBasedBenchmark with ClassicConversions
Seq(a(0), a(5), a(6), a(3))
))
- codegenBenchmark("merge - split update (delete + insert)", N) {
+ mergeRowsBenchmark("merge - split update (delete + insert)", N) {
val df = buildMergeRowsDF(inputDF, matchedInstr)
assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[MergeRowsExec]))
df.noop()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SqlBasedBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SqlBasedBenchmark.scala
index 78d6b0158035..6c60721599bb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SqlBasedBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SqlBasedBenchmark.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.benchmark
+import scala.concurrent.duration._
+
import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
import org.apache.spark.internal.config.MAX_RESULT_SIZE
import org.apache.spark.internal.config.UI.UI_ENABLED
@@ -46,17 +48,42 @@ trait SqlBasedBenchmark extends BenchmarkBase with SQLHelper {
.getOrCreate()
}
- /** Runs function `f` with whole stage codegen on and off. */
- final def codegenBenchmark(name: String, cardinality: Long)(f: => Unit): Unit = {
- val benchmark = new Benchmark(name, cardinality, output = output)
+ /**
+ * Runs function `f` with whole stage codegen on and off.
+ *
+ * @param minNumIters minimum timed iterations per case when the corresponding
+ *   `wholestageOffNumIters` or `wholestageOnNumIters` is zero.
+ * @param warmupTime JIT warm-up duration per case before timed iterations.
+ * @param minTime minimum total timed duration per case when the corresponding
+ *   `wholestageOffNumIters` or `wholestageOnNumIters` is zero.
+ * @param wholestageOffNumIters if non-zero, run exactly this many timed iterations
+ *   for the wholestage-off case; otherwise use `minNumIters` and `minTime`.
+ * @param wholestageOnNumIters if non-zero, run exactly this many timed iterations
+ *   for the wholestage-on case; otherwise use `minNumIters` and `minTime`.
+ */
+ final def codegenBenchmark(
+ name: String,
+ cardinality: Long,
+ minNumIters: Int = 2,
+ warmupTime: FiniteDuration = 2.seconds,
+ minTime: FiniteDuration = 2.seconds,
+ wholestageOffNumIters: Int = 2,
+ wholestageOnNumIters: Int = 5)(f: => Unit): Unit = {
+ val benchmark = new Benchmark(
+ name,
+ cardinality,
+ minNumIters = minNumIters,
+ warmupTime = warmupTime,
+ minTime = minTime,
+ output = output)
- benchmark.addCase(s"$name wholestage off", numIters = 2) { _ =>
+ benchmark.addCase(s"$name wholestage off", numIters = wholestageOffNumIters) { _ =>
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
f
}
}
- benchmark.addCase(s"$name wholestage on", numIters = 5) { _ =>
+ benchmark.addCase(s"$name wholestage on", numIters = wholestageOnNumIters) { _ =>
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") {
f
}