This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 70a4a1e7ac05 [SPARK-54988][SQL] Simplify the implementation of ObservationManager.tryComplete
70a4a1e7ac05 is described below
commit 70a4a1e7ac0555e6939034d8a2f996c14a58c656
Author: Yihong He <[email protected]>
AuthorDate: Mon Jan 12 10:28:25 2026 +0800
[SPARK-54988][SQL] Simplify the implementation of ObservationManager.tryComplete
### What changes were proposed in this pull request?
This PR optimizes the tryComplete method in ObservationManager by:
1. Making allMetrics a lazy val to defer metric collection until needed
2. Simplifying the conditional logic using getOrElse(c.name, Row.empty)
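The `getOrElse` consolidation can be illustrated in isolation. This is a minimal sketch, not the actual Spark classes: `Row` and `resolveMetrics` below are stand-ins invented for the example.

```scala
object GetOrElseSketch {
  // Stand-in for org.apache.spark.sql.Row; Row.empty is modeled as a Row
  // with no values.
  case class Row(values: Seq[Any])
  val emptyRow: Row = Row(Seq.empty)

  // One expression replaces the old if/else: a present entry is returned
  // as-is, a missing entry falls back to the empty row.
  def resolveMetrics(allMetrics: Map[String, Row], name: String): Row =
    allMetrics.getOrElse(name, emptyRow)
}
```

Both the "metrics present" and "metrics missing" cases flow through the same `setMetricsAndNotify` call site, which is what lets the two branches collapse into one.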
### Why are the changes needed?
- Performance: The current implementation eagerly collects observed metrics
even when the logical plan doesn't contain CollectMetrics nodes. Using lazy val
allows tryComplete to complete faster when metric collection is unnecessary.
- Code simplification: The original code had separate branches for handling
missing vs. present metrics. This consolidates them into a single, cleaner code
path.
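The performance point rests on how Scala evaluates `lazy val`: the initializer runs only on first access. A minimal sketch under assumed names (`tryComplete`, `collected`, and the metric map are invented for illustration, not Spark APIs):

```scala
object LazyValSketch {
  // Records whether the (potentially expensive) metric collection ran.
  var collected: Boolean = false

  def tryComplete(hasCollectMetrics: Boolean): Boolean = {
    // The initializer only executes if some branch actually reads allMetrics,
    // mirroring how qe.observedMetrics is deferred in the patch.
    lazy val allMetrics: Map[String, Int] = {
      collected = true
      Map("m" -> 1)
    }
    if (hasCollectMetrics) allMetrics.nonEmpty else false
  }
}
```

When no `CollectMetrics` node is matched, `allMetrics` is never dereferenced, so the collection cost is skipped entirely.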
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing unit tests for ObservationManager and observation functionality
cover this change. The refactoring maintains identical behavior.
### Was this patch authored or co-authored using generative AI tooling?
Yes
Closes #53752 from heyihong/SPARK-54988.
Authored-by: Yihong He <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/classic/ObservationManager.scala | 27 ++++++++--------------
1 file changed, 9 insertions(+), 18 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/ObservationManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/ObservationManager.scala
index b5ec18d5ff12..fbe7034cc247 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/classic/ObservationManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/ObservationManager.scala
@@ -54,27 +54,18 @@ private[sql] class ObservationManager(session: SparkSession) {
     })
 
   private def tryComplete(qe: QueryExecution): Unit = {
-    val allMetrics = qe.observedMetrics
+    // Use lazy val to defer collecting the observed metrics until it is needed so that tryComplete
+    // can finish faster (e.g., when the logical plan doesn't contain CollectMetrics).
+    lazy val allMetrics = qe.observedMetrics
     qe.logical.foreachWithSubqueriesAndPruning(
       _.containsPattern(TreePattern.COLLECT_METRICS)) {
       case c: CollectMetrics =>
-        val keyExists = observations.containsKey((c.name, c.dataframeId))
-        val metrics = allMetrics.get(c.name)
-        if (keyExists && metrics.isEmpty) {
-          // If the key exists but no metrics were collected, it means for some reason the metrics
-          // could not be collected. This can happen e.g., if the CollectMetricsExec was optimized
-          // away.
-          val observation = observations.remove((c.name, c.dataframeId))
-          if (observation != null) {
-            observation.setMetricsAndNotify(Row.empty)
-          }
-        } else {
-          metrics.foreach { metrics =>
-            val observation = observations.remove((c.name, c.dataframeId))
-            if (observation != null) {
-              observation.setMetricsAndNotify(metrics)
-            }
-          }
+        val observation = observations.remove((c.name, c.dataframeId))
+        if (observation != null) {
+          // If the key exists but no metrics were collected, it means for some reason the
+          // metrics could not be collected. This can happen e.g., if the CollectMetricsExec
+          // was optimized away.
+          observation.setMetricsAndNotify(allMetrics.getOrElse(c.name, Row.empty))
         }
       case _ =>
     }
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]