[ 
https://issues.apache.org/jira/browse/SPARK-52520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17985755#comment-17985755
 ] 

Kent Yao commented on SPARK-52520:
----------------------------------

Hi [~jihoonson], 
Unfortunately, 3.4 has reached EOL, so there will be no further 3.4 releases. 
You can either backport the PR to your own fork or upgrade Spark to 3.5.2 or 
later.

> Deadlock in the executor with delta input and AQE on
> ----------------------------------------------------
>
>                 Key: SPARK-52520
>                 URL: https://issues.apache.org/jira/browse/SPARK-52520
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.4.3, 3.4.4
>            Reporter: Jihoon Son
>            Priority: Major
>
> A deadlock was found while running a query, which got stuck while scheduling 
> next stages to run. Per stack traces, I believe this is what happened. Note 
> that I used the Spark-RAPIDS plugin.
>  * Thread A (Thread-5 in the stack traces below) acquired a lock on 
> ExplainUtils while running AdaptiveSparkPlanExec. With the lock acquired, it 
> tried to materialize all subqueries in the plan. It was waiting to lock 
> GpuFileSourceScanExec. 
>  * Thread B (gpu-broadcast-exchange-127 in the stack traces below) acquired 
> a lock on GpuFileSourceScanExec while evaluating selectedPartitions, which is 
> a lazy val. While evaluating the lazy val, it tried to update the Delta log, 
> which triggered evaluation of a data frame (because of Delta). Evaluating the 
> data frame required explaining the query plan, which in turn required locking 
> ExplainUtils.
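
For readers following along, the two bullets above describe a classic
lock-order inversion: one thread takes the ExplainUtils monitor and then wants
the scan node's monitor, while the other takes them in the opposite order (a
Scala 2 lazy val initializer synchronizes on its enclosing instance, which is
why the trace shows the scan node itself as the locked object). Below is a
minimal sketch of that pattern in plain Java. It is not Spark code: the class
name LockOrderSketch, the helper names, and the two plain Object locks are
hypothetical stand-ins chosen only for illustration.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;

// Hypothetical sketch, not Spark code: two plain monitors stand in for the
// ExplainUtils object and the scan node instance from the report.
class LockOrderSketch {

    // Daemon thread that locks `first`, waits at the rendezvous, then locks `second`.
    static Thread worker(String name, Object first, Object second, CountDownLatch rendezvous) {
        Thread t = new Thread(() -> {
            synchronized (first) {
                rendezvous.countDown();
                try {
                    rendezvous.await(); // returns at once when the latch count is 0
                } catch (InterruptedException e) {
                    return;
                }
                synchronized (second) {
                    // Both monitors are held here; nothing else to do in this sketch.
                }
            }
        }, name);
        t.setDaemon(true); // a stuck worker must not keep the JVM alive
        return t;
    }

    // Returns true if the two workers end up deadlocked on the two monitors.
    static boolean deadlocks(boolean consistentOrder) {
        Object explainLock = new Object(); // stand-in for the ExplainUtils monitor
        Object scanLock = new Object();    // stand-in for the scan node monitor
        // In the inverted case the latch forces both threads to hold their first
        // monitor before either one asks for its second.
        CountDownLatch rendezvous = new CountDownLatch(consistentOrder ? 0 : 2);
        Thread a = worker("thread-a", explainLock, scanLock, rendezvous);
        Thread b = consistentOrder
                ? worker("thread-b", explainLock, scanLock, rendezvous)
                : worker("thread-b", scanLock, explainLock, rendezvous);
        a.start();
        b.start();

        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        long deadline = System.nanoTime() + 5_000_000_000L; // give up after 5 seconds
        while (System.nanoTime() < deadline) {
            long[] ids = mx.findMonitorDeadlockedThreads(); // null when nothing is deadlocked
            if (ids != null) {
                for (long id : ids) {
                    if (id == a.getId()) {
                        return true;
                    }
                }
            }
            if (!a.isAlive() && !b.isAlive()) {
                return false; // both workers finished normally
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        throw new IllegalStateException("timed out without a verdict");
    }

    public static void main(String[] args) {
        System.out.println("inverted order deadlocks:   " + deadlocks(false));
        System.out.println("consistent order deadlocks: " + deadlocks(true));
    }
}
```

With a consistent acquisition order the cycle cannot form. Removing one of the
two locks from the picture has the same effect, which is in line with the
observation in the report that dropping the synchronization on ExplainUtils
makes the problem go away.
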
> Even though I am seeing this issue with the Spark-RAPIDS plugin, I believe 
> that Apache Spark has the same issue as the function call hierarchy is almost 
> identical. The FileSourceScanExec class in Apache Spark also has the 
> [selectedPartitions lazy 
> val|https://github.com/apache/spark/blob/v3.4.3/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L251-L262],
>  which is computed in the same way as in the [Spark-RAPIDS 
> plugin|https://github.com/NVIDIA/spark-rapids/blob/v25.04.0/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala#L125-L136].
>  
> I noticed that this problem doesn't exist in Spark 3.5.x as the 
> synchronization on ExplainUtils was removed in 
> https://issues.apache.org/jira/browse/SPARK-48610. Can we consider 
> backporting this fix to the 3.4 branch as well?
> Here are more details of the deadlock.
>  
> Found one Java-level deadlock:
> =============================
> "Thread-5":
>   waiting to lock monitor 0x00007f3cbc005280 (object 0x0000000306482908, a 
> org.apache.spark.sql.rapids.GpuFileSourceScanExec),
>   which is held by "gpu-broadcast-exchange-127"
> "gpu-broadcast-exchange-127":
>   waiting to lock monitor 0x00007f3adc00d580 (object 0x0000000300fd86e8, a 
> org.apache.spark.sql.execution.ExplainUtils$),
>   which is held by "Thread-5"
>  
> Java stack information for the threads listed above:
> ===================================================
> "Thread-5":
> at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.subqueries$lzycompute(QueryPlan.scala:450)
>  - waiting to lock <0x0000000306482908> (a 
> org.apache.spark.sql.rapids.GpuFileSourceScanExec)
> at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.subqueries(QueryPlan.scala:450)
> at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.innerChildren(QueryPlan.scala:533)
> ...
> at scala.collection.mutable.ArrayBuffer.foldLeft(ArrayBuffer.scala:49)
> at 
> org.apache.spark.sql.execution.ExplainUtils$.processPlan(ExplainUtils.scala:87)
>  - locked <0x0000000300fd86e8> (a 
> org.apache.spark.sql.execution.ExplainUtils$)
> at 
> org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:221)
> at 
> org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:266)
> at 
> org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:235)
> at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.onUpdatePlan(AdaptiveSparkPlanExec.scala:730)
> at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$2(AdaptiveSparkPlanExec.scala:255)
> ...
> at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
> at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:242)
>  - locked <0x000000030d336ba0> (a java.lang.Object)
> at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:387)
> at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:360)
> at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
> at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1$$Lambda$2041/0x0000000840f12c40.apply(Unknown
>  Source)
> at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
> at 
> org.apache.spark.sql.execution.SQLExecution$$$Lambda$2051/0x0000000840f24840.apply(Unknown
>  Source)
> at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
> at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
> at 
> org.apache.spark.sql.execution.SQLExecution$$$Lambda$2042/0x0000000840f11840.apply(Unknown
>  Source)
> at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
> at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
> at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
> at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
> ...
> at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
> at 
> org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
> at 
> org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
>  - locked <0x0000000317df3270> (a 
> org.apache.spark.sql.execution.QueryExecution)
> at 
> org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
> at 
> org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:133)
> at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:856)
> at 
> org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:387)
> at 
> org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:360)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
> ...
>  
> "gpu-broadcast-exchange-127":
> at 
> org.apache.spark.sql.execution.ExplainUtils$.processPlan(ExplainUtils.scala:80)
>  - waiting to lock <0x0000000300fd86e8> (a 
> org.apache.spark.sql.execution.ExplainUtils$)
> at 
> org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:221)
> at 
> org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:266)
> at 
> org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:235)
> at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:112)
> at 
> org.apache.spark.sql.execution.SQLExecution$$$Lambda$2051/0x0000000840f24840.apply(Unknown
>  Source)
> at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
> at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
> at 
> org.apache.spark.sql.execution.SQLExecution$$$Lambda$2042/0x0000000840f11840.apply(Unknown
>  Source)
> at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
> at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
> at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4206)
> at org.apache.spark.sql.Dataset.collect(Dataset.scala:3459)
> at 
> org.apache.spark.sql.delta.stats.DataSkippingReaderBase.convertDataFrameToAddFiles(DataSkippingReader.scala:1091)
> at 
> org.apache.spark.sql.delta.stats.DataSkippingReaderBase.convertDataFrameToAddFiles$(DataSkippingReader.scala:1090)
> at 
> org.apache.spark.sql.delta.Snapshot.convertDataFrameToAddFiles(Snapshot.scala:66)
> at 
> org.apache.spark.sql.delta.stats.DataSkippingReaderBase.$anonfun$getDataSkippedFiles$2(DataSkippingReader.scala:850)
> at 
> org.apache.spark.sql.delta.stats.DataSkippingReaderBase$$Lambda$5257/0x0000000841a9e040.apply(Unknown
>  Source)
> at 
> org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:140)
> at 
> org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:138)
> at org.apache.spark.sql.delta.Snapshot.recordFrameProfile(Snapshot.scala:66)
> at 
> org.apache.spark.sql.delta.stats.DataSkippingReaderBase.$anonfun$getDataSkippedFiles$1(DataSkippingReader.scala:848)
> at 
> org.apache.spark.sql.delta.stats.DataSkippingReaderBase$$Lambda$5201/0x0000000841a71040.apply(Unknown
>  Source)
> at 
> org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:140)
> at 
> org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:138)
> ...
> at 
> com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:128)
> at 
> com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:117)
> at org.apache.spark.sql.delta.Snapshot.recordOperation(Snapshot.scala:66)
> at 
> org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:132)
> at 
> org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:122)
> at 
> org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:112)
> at org.apache.spark.sql.delta.Snapshot.recordDeltaOperation(Snapshot.scala:66)
> at 
> org.apache.spark.sql.delta.stats.DataSkippingReaderBase.filesForScan(DataSkippingReader.scala:922)
> at 
> org.apache.spark.sql.delta.stats.DataSkippingReaderBase.filesForScan$(DataSkippingReader.scala:867)
> at org.apache.spark.sql.delta.Snapshot.filesForScan(Snapshot.scala:66)
> at 
> org.apache.spark.sql.delta.stats.PreparedDeltaFileIndex.matchingFiles(PrepareDeltaScan.scala:362)
> at 
> org.apache.spark.sql.delta.files.TahoeFileIndex.listAddFiles(TahoeFileIndex.scala:72)
> at 
> org.apache.spark.sql.delta.files.TahoeFileIndex.listFiles(TahoeFileIndex.scala:64)
> at 
> org.apache.spark.sql.rapids.GpuFileSourceScanExec.selectedPartitions$lzycompute(GpuFileSourceScanExec.scala:129)
>  - locked <0x0000000306482908> (a 
> org.apache.spark.sql.rapids.GpuFileSourceScanExec)
> at 
> org.apache.spark.sql.rapids.GpuFileSourceScanExec.selectedPartitions(GpuFileSourceScanExec.scala:125)
> at 
> org.apache.spark.sql.rapids.GpuFileSourceScanExec.dynamicallySelectedPartitions$lzycompute(GpuFileSourceScanExec.scala:160)
>  - locked <0x0000000306482908> (a 
> org.apache.spark.sql.rapids.GpuFileSourceScanExec)
> at 
> org.apache.spark.sql.rapids.GpuFileSourceScanExec.dynamicallySelectedPartitions(GpuFileSourceScanExec.scala:141)
> at 
> org.apache.spark.sql.rapids.GpuFileSourceScanExec.$anonfun$createNonBucketedReadRDD$1(GpuFileSourceScanExec.scala:530)
> at 
> org.apache.spark.sql.rapids.GpuFileSourceScanExec$$Lambda$5684/0x0000000841c32840.apply(Unknown
>  Source)
> at scala.Option.getOrElse(Option.scala:189)
> at 
> org.apache.spark.sql.rapids.GpuFileSourceScanExec.createNonBucketedReadRDD(GpuFileSourceScanExec.scala:527)
> at 
> org.apache.spark.sql.rapids.GpuFileSourceScanExec.inputRDD$lzycompute(GpuFileSourceScanExec.scala:350)
>  - locked <0x0000000306482908> (a 
> org.apache.spark.sql.rapids.GpuFileSourceScanExec)
> at 
> org.apache.spark.sql.rapids.GpuFileSourceScanExec.inputRDD(GpuFileSourceScanExec.scala:328)
> at 
> org.apache.spark.sql.rapids.GpuFileSourceScanExec.internalDoExecuteColumnar(GpuFileSourceScanExec.scala:440)
> at com.nvidia.spark.rapids.GpuExec.doExecuteColumnar(GpuExec.scala:192)
> at com.nvidia.spark.rapids.GpuExec.doExecuteColumnar$(GpuExec.scala:190)
> at 
> org.apache.spark.sql.rapids.GpuFileSourceScanExec.doExecuteColumnar(GpuFileSourceScanExec.scala:67)
> ...
> at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
> at 
> org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:218)
> at 
> org.apache.spark.sql.rapids.execution.GpuBroadcastExchangeExecBase.$anonfun$relationFuture$2(GpuBroadcastExchangeExec.scala:400)
> at 
> org.apache.spark.sql.rapids.execution.GpuBroadcastExchangeExecBase$$Lambda$5677/0x0000000841c2f840.apply(Unknown
>  Source)
> at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
> at 
> org.apache.spark.sql.rapids.execution.GpuBroadcastExchangeExecBase.$anonfun$relationFuture$1(GpuBroadcastExchangeExec.scala:399)
> at 
> org.apache.spark.sql.rapids.execution.GpuBroadcastExchangeExecBase$$Lambda$5671/0x0000000841c2bc40.apply(Unknown
>  Source)
> at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:217)
> at 
> org.apache.spark.sql.execution.SQLExecution$$$Lambda$5672/0x0000000841c2c040.call(Unknown
>  Source)
> at java.util.concurrent.FutureTask.run([email protected]/FutureTask.java:264)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker([email protected]/ThreadPoolExecutor.java:1128)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run([email protected]/ThreadPoolExecutor.java:628)
> at java.lang.Thread.run([email protected]/Thread.java:829)
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)