[ https://issues.apache.org/jira/browse/SPARK-52520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jihoon Son updated SPARK-52520:
-------------------------------
    Description: 
A deadlock was found while running a query, which got stuck while scheduling
the next stages to run. Based on the stack traces, I believe this is what
happened (note that I was using the Spark-RAPIDS plugin):
 * Thread A (Thread-5 in the stack traces below) acquired a lock on
ExplainUtils while running AdaptiveSparkPlanExec. While holding that lock, it
tried to materialize all subqueries in the plan and ended up waiting to lock
GpuFileSourceScanExec.
 * Thread B (gpu-broadcast-exchange-127 in the traces below) acquired a lock
on the GpuFileSourceScanExec instance while evaluating its selectedPartitions
lazy val (in Scala 2.12, a lazy val initializer synchronizes on the enclosing
instance). Evaluating the lazy val queried the Delta log to list files, which
triggered evaluating a DataFrame (because of Delta). Evaluating the DataFrame
required explaining the query plan, which in turn required locking
ExplainUtils. A minimal sketch of this lock cycle follows the list.
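
For illustration, here is a minimal, self-contained Scala sketch of that lock
cycle. ExplainUtilsLike and ScanNodeLike are hypothetical stand-ins, not the
real Spark classes, and the latches only make the interleaving deterministic.
It hinges on the Scala 2.12 behavior that a lazy val initializer synchronizes
on the enclosing instance, which is why the traces show the scan node itself
being locked:

{code:scala}
import java.util.concurrent.CountDownLatch

// Stands in for org.apache.spark.sql.execution.ExplainUtils$ (a Scala
// `object`, so there is a single JVM-wide monitor).
object ExplainUtilsLike

// Stands in for GpuFileSourceScanExec. Entering the lazy val initializer
// locks `this` (the scan node), as selectedPartitions$lzycompute does in
// the traces below.
class ScanNodeLike(scanLocked: CountDownLatch, explainLocked: CountDownLatch) {
  lazy val selectedPartitions: Seq[String] = { // locks `this` on entry
    scanLocked.countDown()
    explainLocked.await() // let Thread-A take the ExplainUtils monitor first
    // The Delta file listing runs a DataFrame action, which explains the
    // plan and therefore needs the ExplainUtils monitor -> blocks forever.
    ExplainUtilsLike.synchronized { Seq("part=0") }
  }
}

object DeadlockSketch extends App {
  val scanLocked = new CountDownLatch(1)
  val explainLocked = new CountDownLatch(1)
  val scan = new ScanNodeLike(scanLocked, explainLocked)

  // Thread-A: holds the ExplainUtils monitor (processPlan), then needs the
  // scan node's monitor while materializing subqueries.
  val a = new Thread(() => ExplainUtilsLike.synchronized {
    explainLocked.countDown()
    scanLocked.await()    // make sure Thread-B already holds the scan node
    scan.synchronized {}  // blocks forever: Thread-B holds the scan node
  }, "Thread-A")

  // Thread-B: evaluates the lazy val, taking the same two locks in the
  // opposite order -- a classic lock-order inversion.
  val b = new Thread(() => scan.selectedPartitions, "Thread-B")

  a.start(); b.start()
  a.join(); b.join()      // never returns; jstack reports the deadlock
}
{code}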

Even though I am seeing this issue with the Spark-RAPIDS plugin, I believe
that Apache Spark has the same issue, as the function call hierarchy is
almost identical. The FileSourceScanExec class in Apache Spark also has the
[selectedPartitions lazy
val|https://github.com/apache/spark/blob/v3.4.3/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L251-L262],
which is computed in the same way as in the [Spark-RAPIDS
plugin|https://github.com/NVIDIA/spark-rapids/blob/v25.04.0/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala#L125-L136].
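
For reference, this is roughly why the whole scan node is the unit of
locking: for a class-level lazy val, the Scala 2.12 compiler generates
double-checked locking on the enclosing instance, along these lines (field
and method names simplified; listFiles stands in for the actual file
listing):

{code:scala}
// Simplified shape of what scalac 2.12 emits for a class-level lazy val.
class Scan {
  @volatile private var initialized = false
  private var cached: Array[String] = _

  private def selectedPartitions$lzycompute(): Array[String] =
    this.synchronized {       // <-- the GpuFileSourceScanExec monitor
      if (!initialized) {
        cached = listFiles()  // the (slow) file listing runs while the
        initialized = true    //     scan node's monitor is held
      }
      cached
    }

  def selectedPartitions: Array[String] =
    if (initialized) cached else selectedPartitions$lzycompute()

  private def listFiles(): Array[String] = Array("part=0")
}
{code}

So whichever thread happens to initialize selectedPartitions holds the scan
node's monitor for the entire duration of the file listing.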
 

I noticed that this problem doesn't exist in Spark 3.5.x, as the
synchronization on ExplainUtils was removed in
https://issues.apache.org/jira/browse/SPARK-48610. Can we consider
backporting that fix to the 3.4 branch as well?
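
For context, my understanding of that change (paraphrased below; this shows
the shape of the fix, not the actual Spark diff): processPlan used to record
operator IDs in shared state, so the whole traversal ran while holding the
ExplainUtils$ monitor; after the refactor the IDs live in a map local to the
call, so no lock is taken and the cycle above cannot form:

{code:scala}
import java.util.IdentityHashMap

// Toy plan node standing in for QueryPlan; everything here is illustrative.
final case class Node(name: String, children: Seq[Node] = Nil)

object ExplainShape {
  // branch-3.4 shape: operator IDs live in shared state, so the traversal
  // must hold a global monitor (the ExplainUtils$ lock in the traces).
  private val sharedIds = new IdentityHashMap[Node, Int]()
  def processPlanLocked(root: Node): String = synchronized {
    sharedIds.clear()
    assignIds(root, sharedIds)
    render(root, sharedIds)
  }

  // 3.5.x shape after SPARK-48610: IDs live in a map local to the call, so
  // concurrent explains never contend on a shared monitor.
  def processPlanLockFree(root: Node): String = {
    val ids = new IdentityHashMap[Node, Int]()
    assignIds(root, ids)
    render(root, ids)
  }

  private def assignIds(n: Node, ids: IdentityHashMap[Node, Int]): Unit = {
    ids.put(n, ids.size + 1)
    n.children.foreach(assignIds(_, ids))
  }

  private def render(n: Node, ids: IdentityHashMap[Node, Int]): String =
    (s"(${ids.get(n)}) ${n.name}" +: n.children.map(render(_, ids)))
      .mkString("\n")
}
{code}
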

Here are more details of the deadlock.

 
Found one Java-level deadlock:
=============================
"Thread-5":
  waiting to lock monitor 0x00007f3cbc005280 (object 0x0000000306482908, a org.apache.spark.sql.rapids.GpuFileSourceScanExec),
  which is held by "gpu-broadcast-exchange-127"
"gpu-broadcast-exchange-127":
  waiting to lock monitor 0x00007f3adc00d580 (object 0x0000000300fd86e8, a org.apache.spark.sql.execution.ExplainUtils$),
  which is held by "Thread-5"

Java stack information for the threads listed above:
===================================================
"Thread-5":
at org.apache.spark.sql.catalyst.plans.QueryPlan.subqueries$lzycompute(QueryPlan.scala:450)
 - waiting to lock <0x0000000306482908> (a org.apache.spark.sql.rapids.GpuFileSourceScanExec)
at org.apache.spark.sql.catalyst.plans.QueryPlan.subqueries(QueryPlan.scala:450)
at org.apache.spark.sql.catalyst.plans.QueryPlan.innerChildren(QueryPlan.scala:533)
...
at scala.collection.mutable.ArrayBuffer.foldLeft(ArrayBuffer.scala:49)
at org.apache.spark.sql.execution.ExplainUtils$.processPlan(ExplainUtils.scala:87)
 - locked <0x0000000300fd86e8> (a org.apache.spark.sql.execution.ExplainUtils$)
at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:221)
at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:266)
at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:235)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.onUpdatePlan(AdaptiveSparkPlanExec.scala:730)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$2(AdaptiveSparkPlanExec.scala:255)
...
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:242)
 - locked <0x000000030d336ba0> (a java.lang.Object)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:387)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:360)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1$$Lambda$2041/0x0000000840f12c40.apply(Unknown Source)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2051/0x0000000840f24840.apply(Unknown Source)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2042/0x0000000840f11840.apply(Unknown Source)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
...
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
 - locked <0x0000000317df3270> (a org.apache.spark.sql.execution.QueryExecution)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:133)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:856)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:387)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:360)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
...

"gpu-broadcast-exchange-127":
at org.apache.spark.sql.execution.ExplainUtils$.processPlan(ExplainUtils.scala:80)
 - waiting to lock <0x0000000300fd86e8> (a org.apache.spark.sql.execution.ExplainUtils$)
at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:221)
at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:266)
at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:235)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:112)
at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2051/0x0000000840f24840.apply(Unknown Source)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2042/0x0000000840f11840.apply(Unknown Source)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4206)
at org.apache.spark.sql.Dataset.collect(Dataset.scala:3459)
at org.apache.spark.sql.delta.stats.DataSkippingReaderBase.convertDataFrameToAddFiles(DataSkippingReader.scala:1091)
at org.apache.spark.sql.delta.stats.DataSkippingReaderBase.convertDataFrameToAddFiles$(DataSkippingReader.scala:1090)
at org.apache.spark.sql.delta.Snapshot.convertDataFrameToAddFiles(Snapshot.scala:66)
at org.apache.spark.sql.delta.stats.DataSkippingReaderBase.$anonfun$getDataSkippedFiles$2(DataSkippingReader.scala:850)
at org.apache.spark.sql.delta.stats.DataSkippingReaderBase$$Lambda$5257/0x0000000841a9e040.apply(Unknown Source)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:140)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:138)
at org.apache.spark.sql.delta.Snapshot.recordFrameProfile(Snapshot.scala:66)
at org.apache.spark.sql.delta.stats.DataSkippingReaderBase.$anonfun$getDataSkippedFiles$1(DataSkippingReader.scala:848)
at org.apache.spark.sql.delta.stats.DataSkippingReaderBase$$Lambda$5201/0x0000000841a71040.apply(Unknown Source)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:140)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:138)
...
at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:128)
at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:117)
at org.apache.spark.sql.delta.Snapshot.recordOperation(Snapshot.scala:66)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:132)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:122)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:112)
at org.apache.spark.sql.delta.Snapshot.recordDeltaOperation(Snapshot.scala:66)
at org.apache.spark.sql.delta.stats.DataSkippingReaderBase.filesForScan(DataSkippingReader.scala:922)
at org.apache.spark.sql.delta.stats.DataSkippingReaderBase.filesForScan$(DataSkippingReader.scala:867)
at org.apache.spark.sql.delta.Snapshot.filesForScan(Snapshot.scala:66)
at org.apache.spark.sql.delta.stats.PreparedDeltaFileIndex.matchingFiles(PrepareDeltaScan.scala:362)
at org.apache.spark.sql.delta.files.TahoeFileIndex.listAddFiles(TahoeFileIndex.scala:72)
at org.apache.spark.sql.delta.files.TahoeFileIndex.listFiles(TahoeFileIndex.scala:64)
at org.apache.spark.sql.rapids.GpuFileSourceScanExec.selectedPartitions$lzycompute(GpuFileSourceScanExec.scala:129)
 - locked <0x0000000306482908> (a org.apache.spark.sql.rapids.GpuFileSourceScanExec)
at org.apache.spark.sql.rapids.GpuFileSourceScanExec.selectedPartitions(GpuFileSourceScanExec.scala:125)
at org.apache.spark.sql.rapids.GpuFileSourceScanExec.dynamicallySelectedPartitions$lzycompute(GpuFileSourceScanExec.scala:160)
 - locked <0x0000000306482908> (a org.apache.spark.sql.rapids.GpuFileSourceScanExec)
at org.apache.spark.sql.rapids.GpuFileSourceScanExec.dynamicallySelectedPartitions(GpuFileSourceScanExec.scala:141)
at org.apache.spark.sql.rapids.GpuFileSourceScanExec.$anonfun$createNonBucketedReadRDD$1(GpuFileSourceScanExec.scala:530)
at org.apache.spark.sql.rapids.GpuFileSourceScanExec$$Lambda$5684/0x0000000841c32840.apply(Unknown Source)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.rapids.GpuFileSourceScanExec.createNonBucketedReadRDD(GpuFileSourceScanExec.scala:527)
at org.apache.spark.sql.rapids.GpuFileSourceScanExec.inputRDD$lzycompute(GpuFileSourceScanExec.scala:350)
 - locked <0x0000000306482908> (a org.apache.spark.sql.rapids.GpuFileSourceScanExec)
at org.apache.spark.sql.rapids.GpuFileSourceScanExec.inputRDD(GpuFileSourceScanExec.scala:328)
at org.apache.spark.sql.rapids.GpuFileSourceScanExec.internalDoExecuteColumnar(GpuFileSourceScanExec.scala:440)
at com.nvidia.spark.rapids.GpuExec.doExecuteColumnar(GpuExec.scala:192)
at com.nvidia.spark.rapids.GpuExec.doExecuteColumnar$(GpuExec.scala:190)
at org.apache.spark.sql.rapids.GpuFileSourceScanExec.doExecuteColumnar(GpuFileSourceScanExec.scala:67)
...
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:218)
at org.apache.spark.sql.rapids.execution.GpuBroadcastExchangeExecBase.$anonfun$relationFuture$2(GpuBroadcastExchangeExec.scala:400)
at org.apache.spark.sql.rapids.execution.GpuBroadcastExchangeExecBase$$Lambda$5677/0x0000000841c2f840.apply(Unknown Source)
at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
at org.apache.spark.sql.rapids.execution.GpuBroadcastExchangeExecBase.$anonfun$relationFuture$1(GpuBroadcastExchangeExec.scala:399)
at org.apache.spark.sql.rapids.execution.GpuBroadcastExchangeExecBase$$Lambda$5671/0x0000000841c2bc40.apply(Unknown Source)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:217)
at org.apache.spark.sql.execution.SQLExecution$$$Lambda$5672/0x0000000841c2c040.call(Unknown Source)
at java.util.concurrent.FutureTask.run([email protected]/FutureTask.java:264)
at java.util.concurrent.ThreadPoolExecutor.runWorker([email protected]/ThreadPoolExecutor.java:1128)
at java.util.concurrent.ThreadPoolExecutor$Worker.run([email protected]/ThreadPoolExecutor.java:628)
at java.lang.Thread.run([email protected]/Thread.java:829)
 

> Deadlock in the executor with delta input and AQE on
> ----------------------------------------------------
>
>                 Key: SPARK-52520
>                 URL: https://issues.apache.org/jira/browse/SPARK-52520
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.4.3, 3.4.4
>            Reporter: Jihoon Son
>            Priority: Major
>


