Jihoon Son created SPARK-52520:
----------------------------------

             Summary: Deadlock in the executor with delta input and AQE on
                 Key: SPARK-52520
                 URL: https://issues.apache.org/jira/browse/SPARK-52520
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.4.4, 3.4.3
            Reporter: Jihoon Son


A deadlock was found while running a query: the query got stuck while scheduling the 
next stages to run. Based on the stack traces, I believe the following happened. Note 
that I was using the Spark-RAPIDS plugin.
 * Thread A (Thread-5 in the stack traces below) acquired the lock on ExplainUtils 
while running AdaptiveSparkPlanExec. Holding that lock, it tried to materialize all 
subqueries in the plan and ended up waiting to lock GpuFileSourceScanExec.
 * Thread B (gpu-broadcast-exchange-127 in the stack traces below) acquired the lock 
on GpuFileSourceScanExec while evaluating its selectedPartitions lazy val. Inside the 
lazy val initializer, it had to read the Delta log (listing files for the scan), which 
triggered evaluating a DataFrame (because of Delta). Evaluating that DataFrame required 
explaining the query plan, which in turn required locking ExplainUtils. See the sketch 
right after this list.
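
To make the cycle concrete, below is a minimal, self-contained sketch of the same lock 
ordering. The names are hypothetical (ExplainLock stands in for ExplainUtils$, Scan for 
the file-source scan node); it is not Spark code, but running it hangs in the same ABBA 
pattern, because on Scala 2.12 a lazy val initializer runs while the thread holds the 
enclosing instance's monitor.

{code:scala}
// Minimal sketch with hypothetical names; not Spark code.
// ExplainLock stands in for ExplainUtils$, Scan for the file-source scan node.
object ExplainLock

class Scan {
  lazy val selectedPartitions: Seq[String] = {
    // Stand-in for the Delta file listing: it evaluates a DataFrame, whose
    // explain needs the ExplainLock monitor, while this thread holds `this`
    // (a Scala 2.12 lazy val initializer runs under the instance's monitor).
    ExplainLock.synchronized { Seq("part-0") }
  }
}

object DeadlockSketch extends App {
  val scan = new Scan

  // Thread A (the AQE path): holds ExplainLock, then walks the plan, which needs `scan`.
  val threadA = new Thread(() => ExplainLock.synchronized {
    Thread.sleep(100)                  // give Thread B time to start the lazy val
    scan.synchronized { /* materialize subqueries / explain */ }
  })

  // Thread B (the broadcast-exchange path): starts the lazy val, so it holds
  // `scan`, then blocks on ExplainLock inside the initializer.
  val threadB = new Thread(() => { scan.selectedPartitions; () })

  threadA.start(); threadB.start()
  threadA.join(); threadB.join()       // hangs: a classic ABBA lock-ordering deadlock
}
{code}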

Even though I am seeing this issue with the Spark-RAPIDS plugin, I believe Apache Spark 
has the same issue, since the function call hierarchy is almost identical. The 
FileSourceScanExec class in Apache Spark also has the [selectedPartitions lazy val|https://github.com/apache/spark/blob/v3.4.3/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L251-L262], which is computed in the same way as in the [Spark-RAPIDS plugin|https://github.com/NVIDIA/spark-rapids/blob/v25.04.0/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala#L125-L136].
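
In both implementations, the reason the thread dump shows the scan node's monitor being 
held at selectedPartitions$lzycompute is simply how Scala 2.12 (which Spark 3.4 builds 
against) compiles lazy vals. A tiny runnable check, using a hypothetical class name 
rather than the real scan node:

{code:scala}
// A lazy val initializer runs while the thread holds the enclosing instance's
// monitor, which is what "- locked <...> (a ...FileSourceScanExec)" reflects.
class LazyScan {
  lazy val selectedPartitions: Boolean = Thread.holdsLock(this)
}

object LazyValMonitorCheck extends App {
  println(new LazyScan().selectedPartitions)  // prints "true" on Scala 2.12
}
{code}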
 

I noticed that this problem doesn't exist in Spark 3.5.x because the synchronization 
on ExplainUtils was removed in 
https://issues.apache.org/jira/browse/SPARK-48610. We should consider 
backporting that fix to the 3.4 branch as well.
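
As far as I can tell, that fix keeps the operator-ID bookkeeping local to each 
processPlan call instead of stamping a shared tag under a lock on the ExplainUtils 
object. A rough conceptual sketch of that idea (hypothetical types, not the actual 
patch):

{code:scala}
import java.util.IdentityHashMap

// Conceptual sketch only: assign operator IDs into state owned by this call,
// so concurrent explain calls no longer need a shared ExplainUtils monitor.
object OperatorIdSketch {
  final case class Node(name: String, children: Seq[Node])

  def assignIds(root: Node): IdentityHashMap[Node, Int] = {
    val ids = new IdentityHashMap[Node, Int]()  // per-call map, never shared
    var nextId = 0
    def visit(node: Node): Unit = {
      if (!ids.containsKey(node)) {
        nextId += 1
        ids.put(node, nextId)
        node.children.foreach(visit)
      }
    }
    visit(root)
    ids
  }
}
{code}

With per-call state like this, the explain running on Thread B would no longer wait on 
a monitor held by Thread A, which breaks the cycle above.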

Here are more details of the deadlock.

 
Found one Java-level deadlock:
=============================
"Thread-5":
  waiting to lock monitor 0x00007f3cbc005280 (object 0x0000000306482908, a org.apache.spark.sql.rapids.GpuFileSourceScanExec),
  which is held by "gpu-broadcast-exchange-127"
"gpu-broadcast-exchange-127":
  waiting to lock monitor 0x00007f3adc00d580 (object 0x0000000300fd86e8, a org.apache.spark.sql.execution.ExplainUtils$),
  which is held by "Thread-5"
 
Java stack information for the threads listed above:
===================================================
"Thread-5":
at org.apache.spark.sql.catalyst.plans.QueryPlan.subqueries$lzycompute(QueryPlan.scala:450)
- waiting to lock <0x0000000306482908> (a org.apache.spark.sql.rapids.GpuFileSourceScanExec)
at org.apache.spark.sql.catalyst.plans.QueryPlan.subqueries(QueryPlan.scala:450)
at org.apache.spark.sql.catalyst.plans.QueryPlan.innerChildren(QueryPlan.scala:533)
at org.apache.spark.sql.execution.ExplainUtils$.$anonfun$generateOperatorIDs$1(ExplainUtils.scala:155)
at org.apache.spark.sql.execution.ExplainUtils$.$anonfun$generateOperatorIDs$1$adapted(ExplainUtils.scala:141)
at org.apache.spark.sql.execution.ExplainUtils$$$Lambda$2707/0x00000008411aa840.apply(Unknown Source)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:295)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:294)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:294)
at org.apache.spark.sql.catalyst.trees.TreeNode$$Lambda$1702/0x0000000840cac040.apply(Unknown Source)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        ...
at scala.collection.mutable.ArrayBuffer.foldLeft(ArrayBuffer.scala:49)
at org.apache.spark.sql.execution.ExplainUtils$.processPlan(ExplainUtils.scala:87)
- locked <0x0000000300fd86e8> (a org.apache.spark.sql.execution.ExplainUtils$)
at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:221)
at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:266)
at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:235)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.onUpdatePlan(AdaptiveSparkPlanExec.scala:730)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$2(AdaptiveSparkPlanExec.scala:255)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$Lambda$4804/0x000000084191f840.apply$mcVJ$sp(Unknown Source)
at scala.runtime.java8.JFunction1$mcVJ$sp.apply(JFunction1$mcVJ$sp.java:23)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:255)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$Lambda$4778/0x000000084190d840.apply(Unknown Source)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:242)
- locked <0x000000030d336ba0> (a java.lang.Object)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:387)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:360)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1$$Lambda$2041/0x0000000840f12c40.apply(Unknown Source)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2051/0x0000000840f24840.apply(Unknown Source)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2042/0x0000000840f11840.apply(Unknown Source)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
at org.apache.spark.sql.catalyst.trees.TreeNode$$Lambda$2031/0x0000000840f10c40.apply(Unknown Source)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
- locked <0x0000000317df3270> (a org.apache.spark.sql.execution.QueryExecution)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:133)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:856)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:387)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:360)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
...
 
"gpu-broadcast-exchange-127":
at org.apache.spark.sql.execution.ExplainUtils$.processPlan(ExplainUtils.scala:80)
- waiting to lock <0x0000000300fd86e8> (a org.apache.spark.sql.execution.ExplainUtils$)
at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:221)
at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:266)
at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:235)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:112)
at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2051/0x0000000840f24840.apply(Unknown Source)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2042/0x0000000840f11840.apply(Unknown Source)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4206)
at org.apache.spark.sql.Dataset.collect(Dataset.scala:3459)
at org.apache.spark.sql.delta.stats.DataSkippingReaderBase.convertDataFrameToAddFiles(DataSkippingReader.scala:1091)
at org.apache.spark.sql.delta.stats.DataSkippingReaderBase.convertDataFrameToAddFiles$(DataSkippingReader.scala:1090)
at org.apache.spark.sql.delta.Snapshot.convertDataFrameToAddFiles(Snapshot.scala:66)
at org.apache.spark.sql.delta.stats.DataSkippingReaderBase.$anonfun$getDataSkippedFiles$2(DataSkippingReader.scala:850)
at org.apache.spark.sql.delta.stats.DataSkippingReaderBase$$Lambda$5257/0x0000000841a9e040.apply(Unknown Source)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:140)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:138)
at org.apache.spark.sql.delta.Snapshot.recordFrameProfile(Snapshot.scala:66)
at org.apache.spark.sql.delta.stats.DataSkippingReaderBase.$anonfun$getDataSkippedFiles$1(DataSkippingReader.scala:848)
at org.apache.spark.sql.delta.stats.DataSkippingReaderBase$$Lambda$5201/0x0000000841a71040.apply(Unknown Source)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:140)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:138)
at org.apache.spark.sql.delta.Snapshot.recordFrameProfile(Snapshot.scala:66)
at org.apache.spark.sql.delta.stats.DataSkippingReaderBase.getDataSkippedFiles(DataSkippingReader.scala:828)
at org.apache.spark.sql.delta.stats.DataSkippingReaderBase.getDataSkippedFiles$(DataSkippingReader.scala:824)
at org.apache.spark.sql.delta.Snapshot.getDataSkippedFiles(Snapshot.scala:66)
at org.apache.spark.sql.delta.stats.DataSkippingReaderBase.$anonfun$filesForScan$5(DataSkippingReader.scala:946)
at org.apache.spark.sql.delta.stats.DataSkippingReaderBase$$Lambda$5166/0x0000000841a58c40.apply(Unknown Source)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:140)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:138)
at org.apache.spark.sql.delta.Snapshot.recordFrameProfile(Snapshot.scala:66)
at org.apache.spark.sql.delta.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$1(DeltaLogging.scala:133)
at org.apache.spark.sql.delta.metering.DeltaLogging$$Lambda$2890/0x0000000841256440.apply(Unknown Source)
at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:128)
at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:117)
at org.apache.spark.sql.delta.Snapshot.recordOperation(Snapshot.scala:66)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:132)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:122)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:112)
at org.apache.spark.sql.delta.Snapshot.recordDeltaOperation(Snapshot.scala:66)
at org.apache.spark.sql.delta.stats.DataSkippingReaderBase.filesForScan(DataSkippingReader.scala:922)
at org.apache.spark.sql.delta.stats.DataSkippingReaderBase.filesForScan$(DataSkippingReader.scala:867)
at org.apache.spark.sql.delta.Snapshot.filesForScan(Snapshot.scala:66)
at org.apache.spark.sql.delta.stats.PreparedDeltaFileIndex.matchingFiles(PrepareDeltaScan.scala:362)
at org.apache.spark.sql.delta.files.TahoeFileIndex.listAddFiles(TahoeFileIndex.scala:72)
at org.apache.spark.sql.delta.files.TahoeFileIndex.listFiles(TahoeFileIndex.scala:64)
at org.apache.spark.sql.rapids.GpuFileSourceScanExec.selectedPartitions$lzycompute(GpuFileSourceScanExec.scala:129)
- locked <0x0000000306482908> (a org.apache.spark.sql.rapids.GpuFileSourceScanExec)
at org.apache.spark.sql.rapids.GpuFileSourceScanExec.selectedPartitions(GpuFileSourceScanExec.scala:125)
at org.apache.spark.sql.rapids.GpuFileSourceScanExec.dynamicallySelectedPartitions$lzycompute(GpuFileSourceScanExec.scala:160)
- locked <0x0000000306482908> (a org.apache.spark.sql.rapids.GpuFileSourceScanExec)
at org.apache.spark.sql.rapids.GpuFileSourceScanExec.dynamicallySelectedPartitions(GpuFileSourceScanExec.scala:141)
at org.apache.spark.sql.rapids.GpuFileSourceScanExec.$anonfun$createNonBucketedReadRDD$1(GpuFileSourceScanExec.scala:530)
at org.apache.spark.sql.rapids.GpuFileSourceScanExec$$Lambda$5684/0x0000000841c32840.apply(Unknown Source)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.rapids.GpuFileSourceScanExec.createNonBucketedReadRDD(GpuFileSourceScanExec.scala:527)
at org.apache.spark.sql.rapids.GpuFileSourceScanExec.inputRDD$lzycompute(GpuFileSourceScanExec.scala:350)
- locked <0x0000000306482908> (a org.apache.spark.sql.rapids.GpuFileSourceScanExec)
at org.apache.spark.sql.rapids.GpuFileSourceScanExec.inputRDD(GpuFileSourceScanExec.scala:328)
at org.apache.spark.sql.rapids.GpuFileSourceScanExec.internalDoExecuteColumnar(GpuFileSourceScanExec.scala:440)
at com.nvidia.spark.rapids.GpuExec.doExecuteColumnar(GpuExec.scala:192)
at com.nvidia.spark.rapids.GpuExec.doExecuteColumnar$(GpuExec.scala:190)
at org.apache.spark.sql.rapids.GpuFileSourceScanExec.doExecuteColumnar(GpuFileSourceScanExec.scala:67)
...
at com.nvidia.spark.rapids.GpuProjectExec.doExecuteColumnar(basicPhysicalOperators.scala:647)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:222)
at org.apache.spark.sql.execution.SparkPlan$$Lambda$5680/0x0000000841c31040.apply(Unknown Source)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
at org.apache.spark.sql.execution.SparkPlan$$Lambda$3411/0x000000084142c440.apply(Unknown Source)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:218)
at org.apache.spark.sql.rapids.execution.GpuBroadcastExchangeExecBase.$anonfun$relationFuture$2(GpuBroadcastExchangeExec.scala:400)
at org.apache.spark.sql.rapids.execution.GpuBroadcastExchangeExecBase$$Lambda$5677/0x0000000841c2f840.apply(Unknown Source)
at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
at org.apache.spark.sql.rapids.execution.GpuBroadcastExchangeExecBase.$anonfun$relationFuture$1(GpuBroadcastExchangeExec.scala:399)
at org.apache.spark.sql.rapids.execution.GpuBroadcastExchangeExecBase$$Lambda$5671/0x0000000841c2bc40.apply(Unknown Source)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:217)
at org.apache.spark.sql.execution.SQLExecution$$$Lambda$5672/0x0000000841c2c040.call(Unknown Source)
at java.util.concurrent.FutureTask.run([email protected]/FutureTask.java:264)
at java.util.concurrent.ThreadPoolExecutor.runWorker([email protected]/ThreadPoolExecutor.java:1128)
at java.util.concurrent.ThreadPoolExecutor$Worker.run([email protected]/ThreadPoolExecutor.java:628)
at java.lang.Thread.run([email protected]/Thread.java:829)