This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new a2178afd5bf2 [SPARK-54620][SQL] Add safety check in ObservationManager to avoid Observation blocking
a2178afd5bf2 is described below

commit a2178afd5bf2244baedb8079d2e1c41138616ac6
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Sat Dec 6 07:27:11 2025 -0800

    [SPARK-54620][SQL] Add safety check in ObservationManager to avoid Observation blocking
    
    ### What changes were proposed in this pull request?
    
    This patch adds a safety check to `ObservationManager.tryComplete` to
    avoid `Observation.get` blocking forever.
    
    ### Why are the changes needed?
    
    We received reports that, in some corner cases, `Observation.get` blocks
    forever. After investigation, this is not a deadlock: if the
    `CollectMetricsExec` operator is optimized away, e.g., when empty relation
    propagation replaces the part of the plan tree on top of
    `CollectMetricsExec`, Spark never fulfills the promise in `Observation`,
    so `get` calls block forever.
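    
    As a rough illustration, here is a minimal sketch of the corner case,
    adapted from the regression test added in this patch (it assumes a
    SparkSession with `spark.implicits._` in scope):
    
        import org.apache.spark.sql.{functions, Observation}
    
        val observation = Observation("row_count")
        var df = Seq.empty[(Int, Int)].toDF("v1", "v2")
        df = df.observe(observation,
          functions.count(functions.lit(1)).alias("record_cnt"))
        // The empty left side lets empty relation propagation replace the
        // join result, discarding the CollectMetricsExec subtree.
        df = df.repartition($"v1")
          .select($"v1" + 1 as "v1", $"v2" + 1 as "v2")
          .join(Seq((1, 2), (3, 4)).toDF("v1", "v2").repartition($"v2"),
            Seq("v1"), "inner")
        df.collect()
        observation.get // blocked forever before this patch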
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Previously, in some corner cases an `Observation.get` call would
    block forever. After this change, `get` returns an empty map instead.
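    
    For example, a caller can now detect the missing-metrics case (a sketch;
    `record_cnt` is the metric name chosen by the caller):
    
        val metrics = observation.get
        val rowCount = metrics.get("record_cnt") match {
          case Some(cnt: Long) => cnt
          case None => 0L // no metrics, e.g. the operator was optimized away
        }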
    
    ### How was this patch tested?
    
    Unit tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #53358 from viirya/fix_observation_blocking.
    
    Authored-by: Liang-Chi Hsieh <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
    (cherry picked from commit e6c892397596a81b038eb14f1bb055dc4f3503f9)
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../main/scala/org/apache/spark/sql/Observation.scala  | 12 +++++++++++-
 .../apache/spark/sql/classic/ObservationManager.scala  | 18 +++++++++++++++---
 .../test/scala/org/apache/spark/sql/DatasetSuite.scala | 18 ++++++++++++++++++
 3 files changed, 44 insertions(+), 4 deletions(-)

diff --git a/sql/api/src/main/scala/org/apache/spark/sql/Observation.scala b/sql/api/src/main/scala/org/apache/spark/sql/Observation.scala
index 59c27d1e5630..2a25b4bd4430 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/Observation.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/Observation.scala
@@ -70,6 +70,9 @@ class Observation(val name: String) {
   * first action. Only the result of the first action is available. Subsequent actions do not
    * modify the result.
    *
+   * Note that if no metrics were recorded, an empty map may be returned. This can happen
+   * when the operators used for observation are optimized away.
+   *
    * @return
    *   the observed metrics as a `Map[String, Any]`
    * @throws InterruptedException
@@ -78,7 +81,11 @@ class Observation(val name: String) {
   @throws[InterruptedException]
   def get: Map[String, Any] = {
     val row = getRow
-    row.getValuesMap(row.schema.map(_.name))
+    if (row == null || row.schema == null) {
+      Map.empty
+    } else {
+      row.getValuesMap(row.schema.map(_.name))
+    }
   }
 
   /**
@@ -86,6 +93,9 @@ class Observation(val name: String) {
   * first action. Only the result of the first action is available. Subsequent actions do not
    * modify the result.
    *
+   * Note that if no metrics were recorded, an empty map may be returned. This can happen
+   * when the operators used for observation are optimized away.
+   *
    * @return
    *   the observed metrics as a `java.util.Map[String, Object]`
    * @throws InterruptedException
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/ObservationManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/ObservationManager.scala
index 3edd789b685f..308651b449fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/classic/ObservationManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/ObservationManager.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.classic
 
 import java.util.concurrent.ConcurrentHashMap
 
-import org.apache.spark.sql.Observation
+import org.apache.spark.sql.{Observation, Row}
 import org.apache.spark.sql.catalyst.plans.logical.CollectMetrics
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.util.QueryExecutionListener
@@ -56,10 +56,22 @@ private[sql] class ObservationManager(session: SparkSession) {
     val allMetrics = qe.observedMetrics
     qe.logical.foreach {
       case c: CollectMetrics =>
-        allMetrics.get(c.name).foreach { metrics =>
+        val keyExists = observations.containsKey((c.name, c.dataframeId))
+        val metrics = allMetrics.get(c.name)
+        if (keyExists && metrics.isEmpty) {
+          // If the key exists but no metrics were collected, it means for some reason the
+          // metrics could not be collected. This can happen, e.g., if the CollectMetricsExec
+          // was optimized away.
           val observation = observations.remove((c.name, c.dataframeId))
           if (observation != null) {
-            observation.setMetricsAndNotify(metrics)
+            observation.setMetricsAndNotify(Row.empty)
+          }
+        } else {
+          metrics.foreach { collected =>
+            val observation = observations.remove((c.name, c.dataframeId))
+            if (observation != null) {
+              observation.setMetricsAndNotify(collected)
+            }
           }
         }
       case _ =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 941fd2205424..6df8d66ee7f2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -2878,6 +2878,24 @@ class DatasetSuite extends QueryTest
     checkDataset(Seq(seqMutableSet).toDS(), seqMutableSet)
     checkDataset(Seq(mapMutableSet).toDS(), mapMutableSet)
   }
+
+  test("SPARK-54620: Observation should not blocking forever") {
+    val observation = Observation("row_count")
+
+    var df = Seq.empty[(Int, Int)].toDF("v1", "v2")
+    df = df.observe(observation,
+      functions.count(functions.lit(1)).alias("record_cnt"))
+    df = df.repartition($"v1")
+      .select($"v1" + 1 as "v1", $"v2" + 1 as "v2")
+      .join(
+        Seq((1, 2), (3, 4)).toDF("v1", "v2").repartition($"v2"),
+        Seq("v1"),
+        "inner")
+    df.collect()
+
+    val metrics = observation.get
+    assert(metrics.isEmpty)
+  }
 }
 
 /**

