This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new dd48646ab868 [SPARK-55037][CONNECT][SQL] Re-implement Observation 
Without Using the QueryExecutionListener
dd48646ab868 is described below

commit dd48646ab8680904a830afd17be7abebc682fd45
Author: Yihong He <[email protected]>
AuthorDate: Fri Jan 23 01:53:56 2026 +0800

    [SPARK-55037][CONNECT][SQL] Re-implement Observation Without Using the 
QueryExecutionListener
    
    ### What changes were proposed in this pull request?
    
    This PR proposes the following change:
    - **Re-implement Observation without using QueryExecutionListener**: 
Changed the observation completion mechanism to call 
ObservationManager.tryComplete() directly from 
SQLExecution.withNewExecutionId() instead of relying on QueryExecutionListener 
callbacks.
    
    ### Why are the changes needed?
    
    The previous implementation using QueryExecutionListener had two issues:
    - **No guarantees about when observation metrics are delivered**: Because 
QueryExecutionListener callbacks run asynchronously, an Observation object may 
not yet have received its observed metrics even after the QueryExecution has 
finished on the main thread; the metrics only arrive once the asynchronous 
listener callback has run.
    - **Delayed metric processing due to listener contention**: The processing 
of observed metrics in a QueryExecutionListener can be delayed by other slow 
registered listeners.
    
    The listener contention issue can be reproduced by copying the code below 
and running it in SparkConnectServerTestSuite using `build/sbt 
"connect/testOnly *SparkConnectServerTestSuite -- -z SPARK-55037"`:
    ```scala
    test("SPARK-55037: listener contention should not fail observation") {
        withSession { clientSession =>
          // Trigger server session creation by executing a simple query
          clientSession.sql("SELECT 1").collect()
    
          // Get server-side session to register a slow listener
          val serverSession = getServerSession(clientSession)
    
          // Register a slow QueryExecutionListener that sleeps during onSuccess
          val slowListener = new QueryExecutionListener {
            override def onSuccess(funcName: String, qe: QueryExecution, 
durationNs: Long): Unit = {
              // Simulate slow listener processing. 1000ms is sufficient to hit
              // the timeout in Observation.getRowOrEmpty, which was 100ms in
              // the previous implementation.
              Thread.sleep(1000)
            }
    
            override def onFailure(
                funcName: String,
                qe: QueryExecution,
                exception: Exception): Unit = {}
          }
    
          serverSession.listenerManager.register(slowListener)
    
          // Create a simple observation using the client session
          val observation = new Observation("test_observation")
          val clientDf = clientSession.range(10)
          val observedClientDf = clientDf.observe(
            observation,
            functions.min("id"),
            functions.max("id"),
            functions.sum("id"))
    
          // Execute the query via client session - this triggers the slow 
listener on server
          val result = observedClientDf.collect()
          assert(result.length == 10)
    
          // Verify the observation metrics are available despite the slow 
listener
          val metrics = observation.get
          assert(metrics.size == 3)
          assert(metrics("min(id)") == 0L)
          assert(metrics("max(id)") == 9L)
          assert(metrics("sum(id)") == 45L)
        }
      }
    ```
    
    By calling tryComplete() directly from SQLExecution.withNewExecutionId(), 
we ensure that observation metrics are completed synchronously and 
deterministically, without being affected by other listeners.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Yes
    
    Closes #53801 from heyihong/SPARK-55037.
    
    Authored-by: Yihong He <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../main/scala/org/apache/spark/sql/Observation.scala    |  5 ++---
 .../apache/spark/sql/classic/ObservationManager.scala    | 16 +++-------------
 .../org/apache/spark/sql/classic/SparkSession.scala      |  1 +
 .../org/apache/spark/sql/execution/SQLExecution.scala    |  4 ++++
 4 files changed, 10 insertions(+), 16 deletions(-)

diff --git a/sql/api/src/main/scala/org/apache/spark/sql/Observation.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/Observation.scala
index ffb8a0c0ba05..3e0a2515d7fe 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/Observation.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/Observation.scala
@@ -21,9 +21,8 @@ import java.util.UUID
 import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.concurrent.{Future, Promise}
-import scala.concurrent.duration.{Duration, DurationInt}
+import scala.concurrent.duration.Duration
 import scala.jdk.CollectionConverters.MapHasAsJava
-import scala.util.Try
 
 import org.apache.spark.util.SparkThreadUtils
 
@@ -130,7 +129,7 @@ class Observation(val name: String) {
    *   the observed metrics as a `Row`, or None if the metrics are not 
available.
    */
   private[sql] def getRowOrEmpty: Option[Row] = {
-    Try(SparkThreadUtils.awaitResult(future, 100.millis)).toOption
+    future.value.flatMap(_.toOption)
   }
 
   /**
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/classic/ObservationManager.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/classic/ObservationManager.scala
index fbe7034cc247..972f572218ce 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/classic/ObservationManager.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/classic/ObservationManager.scala
@@ -22,14 +22,12 @@ import org.apache.spark.sql.{Observation, Row}
 import org.apache.spark.sql.catalyst.plans.logical.CollectMetrics
 import org.apache.spark.sql.catalyst.trees.TreePattern
 import org.apache.spark.sql.execution.QueryExecution
-import org.apache.spark.sql.util.QueryExecutionListener
 
 /**
  * This class keeps track of registered Observations that await query 
completion.
  */
 private[sql] class ObservationManager(session: SparkSession) {
   private val observations = new ConcurrentHashMap[(String, Long), Observation]
-  session.listenerManager.register(Listener)
 
   def register(observation: Observation, ds: Dataset[_]): Unit = {
     if (ds.isStreaming) {
@@ -53,10 +51,10 @@ private[sql] class ObservationManager(session: 
SparkSession) {
       observation
     })
 
-  private def tryComplete(qe: QueryExecution): Unit = {
+  private[sql] def tryComplete(qe: QueryExecution): Unit = {
     // Use lazy val to defer collecting the observed metrics until it is 
needed so that tryComplete
     // can finish faster (e.g., when the logical plan doesn't contain 
CollectMetrics).
-    lazy val allMetrics = qe.observedMetrics
+    lazy val lazyObservedMetrics = qe.observedMetrics
     qe.logical.foreachWithSubqueriesAndPruning(
       _.containsPattern(TreePattern.COLLECT_METRICS)) {
       case c: CollectMetrics =>
@@ -65,17 +63,9 @@ private[sql] class ObservationManager(session: SparkSession) 
{
           // If the key exists but no metrics were collected, it means for 
some reason the
           // metrics could not be collected. This can happen e.g., if the 
CollectMetricsExec
           // was optimized away.
-          observation.setMetricsAndNotify(allMetrics.getOrElse(c.name, 
Row.empty))
+          
observation.setMetricsAndNotify(lazyObservedMetrics.getOrElse(c.name, 
Row.empty))
         }
       case _ =>
     }
   }
-
-  private object Listener extends QueryExecutionListener {
-    override def onSuccess(funcName: String, qe: QueryExecution, durationNs: 
Long): Unit =
-      tryComplete(qe)
-
-    override def onFailure(funcName: String, qe: QueryExecution, exception: 
Exception): Unit =
-      tryComplete(qe)
-  }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala
index 3b6f268ff20f..c4620f13afc6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala
@@ -907,6 +907,7 @@ class SparkSession private(
       override protected def conf: SQLConf = sessionState.conf
     }
 
+  @transient
   private[sql] lazy val observationManager = new ObservationManager(this)
 
   override private[sql] def isUsable: Boolean = !sparkContext.isStopped
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 2ac04e0ef20d..45fa6c60f465 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -239,6 +239,10 @@ object SQLExecution extends Logging {
               event.qe = queryExecution
               event.executionFailure = ex
               sc.listenerBus.post(event)
+
+              // ObservationManager.tryComplete is called here to ensure the 
observation is completed,
+              // but it is not high priority, so it is fine to call it later.
+              sparkSession.observationManager.tryComplete(queryExecution)
             }
           }
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to