This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new a2178afd5bf2 [SPARK-54620][SQL] Add safety check in ObservationManager
to avoid Observation blocking
a2178afd5bf2 is described below
commit a2178afd5bf2244baedb8079d2e1c41138616ac6
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Sat Dec 6 07:27:11 2025 -0800
[SPARK-54620][SQL] Add safety check in ObservationManager to avoid
Observation blocking
### What changes were proposed in this pull request?
This patch adds a safety check into `ObservationManager.tryComplete` to
avoid Observation blocking.
### Why are the changes needed?
We got reports that for some corner cases `Observation.get` will be blocked
forever. It is not a deadlock case after investigation. If the
`CollectMetricsExec` operator was optimized away, e.g., the executed plan was
optimized to have some empty relation propagation on top of the plan tree of
`CollectMetricsExec`, Spark won't fulfill the promise in `Observation` and
`get` calls will be blocked forever.
### Does this PR introduce _any_ user-facing change?
Yes. Previously, for some corner cases, an `Observation.get` call would be
blocked forever. After this change, `get` will return an empty map.
### How was this patch tested?
Unit tests
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #53358 from viirya/fix_observation_blocking.
Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit e6c892397596a81b038eb14f1bb055dc4f3503f9)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../main/scala/org/apache/spark/sql/Observation.scala | 12 +++++++++++-
.../apache/spark/sql/classic/ObservationManager.scala | 18 +++++++++++++++---
.../test/scala/org/apache/spark/sql/DatasetSuite.scala | 18 ++++++++++++++++++
3 files changed, 44 insertions(+), 4 deletions(-)
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/Observation.scala
b/sql/api/src/main/scala/org/apache/spark/sql/Observation.scala
index 59c27d1e5630..2a25b4bd4430 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/Observation.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/Observation.scala
@@ -70,6 +70,9 @@ class Observation(val name: String) {
* first action. Only the result of the first action is available.
Subsequent actions do not
* modify the result.
*
+ * Note that if no metrics were recorded, an empty map is probably returned.
It possibly happens
+ * when the operators used for observation are optimized away.
+ *
* @return
* the observed metrics as a `Map[String, Any]`
* @throws InterruptedException
@@ -78,7 +81,11 @@ class Observation(val name: String) {
@throws[InterruptedException]
def get: Map[String, Any] = {
val row = getRow
- row.getValuesMap(row.schema.map(_.name))
+ if (row == null || row.schema == null) {
+ Map.empty
+ } else {
+ row.getValuesMap(row.schema.map(_.name))
+ }
}
/**
@@ -86,6 +93,9 @@ class Observation(val name: String) {
* first action. Only the result of the first action is available.
Subsequent actions do not
* modify the result.
*
+ * Note that if no metrics were recorded, an empty map is probably returned.
It possibly happens
+ * when the operators used for observation are optimized away.
+ *
* @return
* the observed metrics as a `java.util.Map[String, Object]`
* @throws InterruptedException
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/classic/ObservationManager.scala
b/sql/core/src/main/scala/org/apache/spark/sql/classic/ObservationManager.scala
index 3edd789b685f..308651b449fd 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/classic/ObservationManager.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/classic/ObservationManager.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.classic
import java.util.concurrent.ConcurrentHashMap
-import org.apache.spark.sql.Observation
+import org.apache.spark.sql.{Observation, Row}
import org.apache.spark.sql.catalyst.plans.logical.CollectMetrics
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener
@@ -56,10 +56,22 @@ private[sql] class ObservationManager(session:
SparkSession) {
val allMetrics = qe.observedMetrics
qe.logical.foreach {
case c: CollectMetrics =>
- allMetrics.get(c.name).foreach { metrics =>
+ val keyExists = observations.containsKey((c.name, c.dataframeId))
+ val metrics = allMetrics.get(c.name)
+ if (keyExists && metrics.isEmpty) {
+ // If the key exists but no metrics were collected, it means for
some reason the metrics
+ // could not be collected. This can happen e.g., if the
CollectMetricsExec was optimized
+ // away.
val observation = observations.remove((c.name, c.dataframeId))
if (observation != null) {
- observation.setMetricsAndNotify(metrics)
+ observation.setMetricsAndNotify(Row.empty)
+ }
+ } else {
+ metrics.foreach { metrics =>
+ val observation = observations.remove((c.name, c.dataframeId))
+ if (observation != null) {
+ observation.setMetricsAndNotify(metrics)
+ }
}
}
case _ =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 941fd2205424..6df8d66ee7f2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -2878,6 +2878,24 @@ class DatasetSuite extends QueryTest
checkDataset(Seq(seqMutableSet).toDS(), seqMutableSet)
checkDataset(Seq(mapMutableSet).toDS(), mapMutableSet)
}
+
+ test("SPARK-54620: Observation should not blocking forever") {
+ val observation = Observation("row_count")
+
+ var df = Seq.empty[(Int, Int)].toDF("v1", "v2")
+ df = df.observe(observation,
+ functions.count(functions.lit(1)).alias("record_cnt"))
+ df = df.repartition($"v1")
+ .select($"v1" + 1 as "v1", $"v2" + 1 as "v2")
+ .join(
+ Seq((1, 2), (3, 4)).toDF("v1", "v2").repartition($"v2"),
+ Seq("v1"),
+ "inner")
+ df.collect()
+
+ val metrics = observation.get
+ assert(metrics.isEmpty)
+ }
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]