[ 
https://issues.apache.org/jira/browse/SPARK-53878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun reassigned SPARK-53878:
-------------------------------------

    Assignee: Kousuke Saruta

> Fix race condition issue related to ObservedMetrics
> ---------------------------------------------------
>
>                 Key: SPARK-53878
>                 URL: https://issues.apache.org/jira/browse/SPARK-53878
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Connect, SQL
>    Affects Versions: 4.1.0
>            Reporter: Kousuke Saruta
>            Assignee: Kousuke Saruta
>            Priority: Major
>              Labels: pull-request-available
>
> In Spark Connect environment, QueryExecution#observedMetrics can be called by 
> two threads concurrently.
>  
> Thread1(ObservationManager)
> {code:java}
> private def tryComplete(qe: QueryExecution): Unit = {
>   val allMetrics = qe.observedMetrics
>   qe.logical.foreach {
>     case c: CollectMetrics =>
>       allMetrics.get(c.name).foreach { metrics =>
>         val observation = observations.remove((c.name, c.dataframeId))
>         if (observation != null) {
>           observation.setMetricsAndNotify(metrics)
>         }
>       }
>     case _ =>
>   }
> }
> {code}
> Thread2(SparkConnectPlanExecution)
> {code:java}
> private def createObservedMetricsResponse(
>     sessionId: String,
>     observationAndPlanIds: Map[String, Long],
>     dataframe: DataFrame): Option[ExecutePlanResponse] = {
>   val observedMetrics = dataframe.queryExecution.observedMetrics.collect {
>     case (name, row) if !executeHolder.observations.contains(name) =>
>       val values = SparkConnectPlanExecution.toObservedMetricsValues(row)
>       name -> values
>   } {code}
> This can cause race condition issues. We can see CI failure caused by this 
> issue.
> https://github.com/apache/spark/actions/runs/18422173471/job/52497913985
> {code}
> ======================================================================
> ERROR [0.181s]: test_observe_with_map_type 
> (pyspark.sql.tests.connect.test_parity_observation.DataFrameObservationParityTests.test_observe_with_map_type)
> ----------------------------------------------------------------------
> Traceback (most recent call last):
>   File "/__w/spark/spark/python/pyspark/testing/utils.py", line 228, in 
> wrapper
>     lastValue = condition(*args, **kwargs)
>                 ^^^^^^^^^^^^^^^^^^^^^^^^^^
>   File "/__w/spark/spark/python/pyspark/sql/tests/test_observation.py", line 
> 226, in test_observe_with_map_type
>     assertDataFrameEqual(df, [Row(id=id) for id in range(10)])
>   File "/__w/spark/spark/python/pyspark/testing/utils.py", line 1098, in 
> assertDataFrameEqual
>     actual_list = actual.collect()
>                   ^^^^^^^^^^^^^^^^
>   File "/__w/spark/spark/python/pyspark/sql/connect/dataframe.py", line 1817, 
> in collect
>     table, schema = self._to_table()
>                     ^^^^^^^^^^^^^^^^
>   File "/__w/spark/spark/python/pyspark/sql/connect/dataframe.py", line 1830, 
> in _to_table
>     table, schema, self._execution_info = self._session.client.to_table(
>                                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
>   File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 
> 946, in to_table
>     table, schema, metrics, observed_metrics, _ = 
> self._execute_and_fetch(req, observations)
>                                                   
> ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
>   File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 
> 1642, in _execute_and_fetch
>     for response in self._execute_and_fetch_as_iterator(
>   File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 
> 1619, in _execute_and_fetch_as_iterator
>     self._handle_error(error)
>   File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 
> 1893, in _handle_error
>     self._handle_rpc_error(error)
>   File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 
> 1966, in _handle_rpc_error
>     raise convert_exception(
> pyspark.errors.exceptions.connect.IllegalArgumentException: requirement failed
> JVM stacktrace:
> java.lang.IllegalArgumentException
>       at scala.Predef$.require(Predef.scala:324)
>       at 
> org.apache.spark.sql.catalyst.util.ArrayBasedMapData.<init>(ArrayBasedMapData.scala:31)
>       at 
> org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder.build(ArrayBasedMapBuilder.scala:130)
>       at 
> org.apache.spark.sql.catalyst.expressions.CreateMap.eval(complexTypeCreator.scala:260)
>       at 
> org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:162)
>       at 
> org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection.apply(InterpretedUnsafeProjection.scala:84)
>       at 
> org.apache.spark.sql.execution.AggregatingAccumulator.$anonfun$value$2(AggregatingAccumulator.scala:199)
>       at 
> org.apache.spark.sql.internal.SQLConf$.withExistingConf(SQLConf.scala:162)
>       at 
> org.apache.spark.sql.execution.AggregatingAccumulator.withSQLConf(AggregatingAccumulator.scala:106)
>       at 
> org.apache.spark.sql.execution.AggregatingAccumulator.value(AggregatingAccumulator.scala:188)
>       at 
> org.apache.spark.sql.execution.CollectMetricsExec.collectedMetrics(CollectMetricsExec.scala:59)
>       at 
> org.apache.spark.sql.execution.CollectMetricsExec$$anonfun$1.applyOrElse(CollectMetricsExec.scala:111)
>       at 
> org.apache.spark.sql.execution.CollectMetricsExec$$anonfun$1.applyOrElse(CollectMetricsExec.scala:109)
>       at scala.PartialFunction$Lifted.apply(PartialFunction.scala:338)
>       at scala.PartialFunction$Lifted.apply(PartialFunction.scala:334)
>       at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.$anonfun$collect$1(AdaptiveSparkPlanHelper.scala:86)
>       at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.$anonfun$collect$1$adapted(AdaptiveSparkPlanHelper.scala:86)
>       at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.foreach(AdaptiveSparkPlanHelper.scala:45)
>       at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.foreach$(AdaptiveSparkPlanHelper.scala:44)
>       at 
> org.apache.spark.sql.execution.CollectMetricsExec$.foreach(CollectMetricsExec.scala:101)
>       at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.collect(AdaptiveSparkPlanHelper.scala:86)
>       at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.collect$(AdaptiveSparkPlanHelper.scala:83)
>       at 
> org.apache.spark.sql.execution.CollectMetricsExec$.collect(CollectMetricsExec.scala:101)
>       at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.$anonfun$collectWithSubqueries$1(AdaptiveSparkPlanHelper.scala:113)
>       at scala.collection.immutable.List.flatMap(List.scala:294)
>       at scala.collection.immutable.List.flatMap(List.scala:79)
>       at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.collectWithSubqueries(AdaptiveSparkPlanHelper.scala:113)
>       at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.collectWithSubqueries$(AdaptiveSparkPlanHelper.scala:112)
>       at 
> org.apache.spark.sql.execution.CollectMetricsExec$.collectWithSubqueries(CollectMetricsExec.scala:101)
>       at 
> org.apache.spark.sql.execution.CollectMetricsExec$.collect(CollectMetricsExec.scala:109)
>       at 
> org.apache.spark.sql.execution.QueryExecution.observedMetrics(QueryExecution.scala:276)
>       at 
> org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.createObservedMetricsResponse(SparkConnectPlanExecution.scala:322)
>       at 
> org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.handlePlan(SparkConnectPlanExecution.scala:82)
>       at 
> org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:224)
>       at 
> org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:196)
>       at 
> org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:394)
>       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
>       at 
> org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:394)
>       at 
> org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
>       at 
> org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:113)
>       at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:184)
>       at 
> org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:103)
>       at 
> org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:112)
>       at 
> org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:393)
>       at 
> org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:196)
>       at 
> org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:125)
>       at 
> org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:333)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to