This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f92816cc8507 [SPARK-53878][SQL][CONNECT] Fix race condition issue 
related to ObservedMetrics
f92816cc8507 is described below

commit f92816cc8507b98d2082e3acc43ef296ae5995be
Author: Kousuke Saruta <[email protected]>
AuthorDate: Mon Oct 13 13:24:21 2025 -0700

    [SPARK-53878][SQL][CONNECT] Fix race condition issue related to 
ObservedMetrics
    
    ### What changes were proposed in this pull request?
    In a Spark Connect environment, `QueryExecution#observedMetrics` can be 
called by two threads concurrently.
    
    * Thread1(ObservationManager)
    ```
    private def tryComplete(qe: QueryExecution): Unit = {
      val allMetrics = qe.observedMetrics
      qe.logical.foreach {
        case c: CollectMetrics =>
          allMetrics.get(c.name).foreach { metrics =>
            val observation = observations.remove((c.name, c.dataframeId))
            if (observation != null) {
              observation.setMetricsAndNotify(metrics)
            }
          }
        case _ =>
      }
    }
    ```
    
    * Thread2(SparkConnectPlanExecution)
    ```
    private def createObservedMetricsResponse(
        sessionId: String,
        observationAndPlanIds: Map[String, Long],
        dataframe: DataFrame): Option[ExecutePlanResponse] = {
      val observedMetrics = dataframe.queryExecution.observedMetrics.collect {
        case (name, row) if !executeHolder.observations.contains(name) =>
          val values = SparkConnectPlanExecution.toObservedMetricsValues(row)
          name -> values
      }
    ```
    
    This can cause a race condition. The following CI failure was caused by 
this issue.
    https://github.com/apache/spark/actions/runs/18422173471/job/52497913985
    
    ```
    ======================================================================
    ERROR [0.181s]: test_observe_with_map_type 
(pyspark.sql.tests.connect.test_parity_observation.DataFrameObservationParityTests.test_observe_with_map_type)
    ----------------------------------------------------------------------
    Traceback (most recent call last):
      File "/__w/spark/spark/python/pyspark/testing/utils.py", line 228, in 
wrapper
        lastValue = condition(*args, **kwargs)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/__w/spark/spark/python/pyspark/sql/tests/test_observation.py", 
line 226, in test_observe_with_map_type
        assertDataFrameEqual(df, [Row(id=id) for id in range(10)])
      File "/__w/spark/spark/python/pyspark/testing/utils.py", line 1098, in 
assertDataFrameEqual
        actual_list = actual.collect()
                      ^^^^^^^^^^^^^^^^
      File "/__w/spark/spark/python/pyspark/sql/connect/dataframe.py", line 
1817, in collect
        table, schema = self._to_table()
                        ^^^^^^^^^^^^^^^^
      File "/__w/spark/spark/python/pyspark/sql/connect/dataframe.py", line 
1830, in _to_table
        table, schema, self._execution_info = self._session.client.to_table(
                                              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 
946, in to_table
        table, schema, metrics, observed_metrics, _ = 
self._execute_and_fetch(req, observations)
                                                      
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 
1642, in _execute_and_fetch
        for response in self._execute_and_fetch_as_iterator(
      File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 
1619, in _execute_and_fetch_as_iterator
        self._handle_error(error)
      File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 
1893, in _handle_error
        self._handle_rpc_error(error)
      File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 
1966, in _handle_rpc_error
        raise convert_exception(
    pyspark.errors.exceptions.connect.IllegalArgumentException: requirement 
failed
    
    JVM stacktrace:
    java.lang.IllegalArgumentException
            at scala.Predef$.require(Predef.scala:324)
            at 
org.apache.spark.sql.catalyst.util.ArrayBasedMapData.<init>(ArrayBasedMapData.scala:31)
            at 
org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder.build(ArrayBasedMapBuilder.scala:130)
            at 
org.apache.spark.sql.catalyst.expressions.CreateMap.eval(complexTypeCreator.scala:260)
            at 
org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:162)
            at 
org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection.apply(InterpretedUnsafeProjection.scala:84)
            at 
org.apache.spark.sql.execution.AggregatingAccumulator.$anonfun$value$2(AggregatingAccumulator.scala:199)
            at 
org.apache.spark.sql.internal.SQLConf$.withExistingConf(SQLConf.scala:162)
            at 
org.apache.spark.sql.execution.AggregatingAccumulator.withSQLConf(AggregatingAccumulator.scala:106)
            at 
org.apache.spark.sql.execution.AggregatingAccumulator.value(AggregatingAccumulator.scala:188)
            at 
org.apache.spark.sql.execution.CollectMetricsExec.collectedMetrics(CollectMetricsExec.scala:59)
            at 
org.apache.spark.sql.execution.CollectMetricsExec$$anonfun$1.applyOrElse(CollectMetricsExec.scala:111)
            at 
org.apache.spark.sql.execution.CollectMetricsExec$$anonfun$1.applyOrElse(CollectMetricsExec.scala:109)
            at scala.PartialFunction$Lifted.apply(PartialFunction.scala:338)
            at scala.PartialFunction$Lifted.apply(PartialFunction.scala:334)
            at 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.$anonfun$collect$1(AdaptiveSparkPlanHelper.scala:86)
            at 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.$anonfun$collect$1$adapted(AdaptiveSparkPlanHelper.scala:86)
            at 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.foreach(AdaptiveSparkPlanHelper.scala:45)
            at 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.foreach$(AdaptiveSparkPlanHelper.scala:44)
            at 
org.apache.spark.sql.execution.CollectMetricsExec$.foreach(CollectMetricsExec.scala:101)
            at 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.collect(AdaptiveSparkPlanHelper.scala:86)
            at 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.collect$(AdaptiveSparkPlanHelper.scala:83)
            at 
org.apache.spark.sql.execution.CollectMetricsExec$.collect(CollectMetricsExec.scala:101)
            at 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.$anonfun$collectWithSubqueries$1(AdaptiveSparkPlanHelper.scala:113)
            at scala.collection.immutable.List.flatMap(List.scala:294)
            at scala.collection.immutable.List.flatMap(List.scala:79)
            at 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.collectWithSubqueries(AdaptiveSparkPlanHelper.scala:113)
            at 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.collectWithSubqueries$(AdaptiveSparkPlanHelper.scala:112)
            at 
org.apache.spark.sql.execution.CollectMetricsExec$.collectWithSubqueries(CollectMetricsExec.scala:101)
            at 
org.apache.spark.sql.execution.CollectMetricsExec$.collect(CollectMetricsExec.scala:109)
            at 
org.apache.spark.sql.execution.QueryExecution.observedMetrics(QueryExecution.scala:276)
            at 
org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.createObservedMetricsResponse(SparkConnectPlanExecution.scala:322)
            at 
org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.handlePlan(SparkConnectPlanExecution.scala:82)
            at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:224)
            at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:196)
            at 
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:394)
            at 
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
            at 
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:394)
            at 
org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
            at 
org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:113)
            at 
org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:184)
            at 
org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:103)
            at 
org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:112)
            at 
org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:393)
            at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:196)
            at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:125)
            at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:333)
    ```
    
    This test failure can be reproduced by inserting a sleep into 
`ArrayBasedMapBuilder.scala` as follows.
    ```
       private def reset(): Unit = {
         keyToIndex.clear()
         keys.clear()
    +    Thread.sleep(10)
         values.clear()
       }
    ```
    
    And then, run the test as follows.
    ```
    $ python/run-tests --modules=pyspark-connect --parallelism=1 --testnames 
pyspark.sql.tests.connect.test_parity_observation  
--python-executables=python3.11
    ```
    
    To fix this issue, this PR proposes to protect 
`QueryExecution#observedMetrics` using a synchronized block.
    
    ### Why are the changes needed?
    Bug fix.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Ran the problematic test with the sleep inserted as described above, and 
confirmed that the test passed.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #52575 from sarutak/fix-observed-metrics-issue.
    
    Authored-by: Kousuke Saruta <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../scala/org/apache/spark/sql/execution/QueryExecution.scala     | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 6d27740bcea9..b46172001a87 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
 import java.io.{BufferedWriter, OutputStreamWriter}
 import java.util.UUID
 import java.util.concurrent.atomic.AtomicLong
+import javax.annotation.concurrent.GuardedBy
 
 import scala.util.control.NonFatal
 
@@ -272,8 +273,13 @@ class QueryExecution(
    */
   def toRdd: RDD[InternalRow] = lazyToRdd.get
 
+  private val observedMetricsLock = new Object
+
   /** Get the metrics observed during the execution of the query plan. */
-  def observedMetrics: Map[String, Row] = 
CollectMetricsExec.collect(executedPlan)
+  @GuardedBy("observedMetricsLock")
+  def observedMetrics: Map[String, Row] = observedMetricsLock.synchronized {
+    CollectMetricsExec.collect(executedPlan)
+  }
 
   protected def preparations: Seq[Rule[SparkPlan]] = {
     QueryExecution.preparations(sparkSession,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to