[
https://issues.apache.org/jira/browse/SPARK-52520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17985755#comment-17985755
]
Kent Yao commented on SPARK-52520:
----------------------------------
Hi [~jihoonson],
Unfortunately, 3.4 is EOL, so there won't be any new 3.4 releases. You can
either backport the PR to your own fork or upgrade Spark to 3.5.2 or later.
> Deadlock in the executor with delta input and AQE on
> ----------------------------------------------------
>
> Key: SPARK-52520
> URL: https://issues.apache.org/jira/browse/SPARK-52520
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.4.3, 3.4.4
> Reporter: Jihoon Son
> Priority: Major
>
> A deadlock occurred while running a query: the query got stuck while
> scheduling the next stages to run. Based on the stack traces, I believe the
> sequence below is what happened. Note that I was using the Spark-RAPIDS
> plugin.
> * Thread A (Thread-5 in the stack traces below) acquired the lock on
> ExplainUtils while running AdaptiveSparkPlanExec. Holding that lock, it
> tried to materialize all subqueries in the plan, and ended up waiting to
> lock GpuFileSourceScanExec.
> * Thread B (gpu-broadcast-exchange-127 below) acquired the lock on
> GpuFileSourceScanExec while evaluating selectedPartitions, which is a lazy
> val. The evaluation updated the Delta log, which triggered evaluating a
> DataFrame (because of Delta). Evaluating that DataFrame required explaining
> the query plan, which in turn required locking ExplainUtils.
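The two bullets above form a classic lock-ordering cycle: one thread holds the ExplainUtils monitor and waits for the scan node's monitor, while the other holds the scan node's monitor and waits for ExplainUtils. A minimal JVM sketch of the same shape (Java for brevity; `LockCycleDemo`, `explainLock`, and `scanLock` are hypothetical stand-ins, not Spark code). It uses `tryLock` with a timeout so the demo can observe the block and back out, whereas the plain monitors in the report wait forever:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class LockCycleDemo {
    // Stand-ins for the two monitors in the stack traces (names are ours):
    // explainLock ~ the ExplainUtils$ object monitor,
    // scanLock    ~ the file-scan node's instance monitor.
    static final ReentrantLock explainLock = new ReentrantLock();
    static final ReentrantLock scanLock = new ReentrantLock();

    // Returns true if the "gpu-broadcast-exchange" side could not take the
    // ExplainUtils-style lock while the "Thread-5" side was holding it.
    public static boolean demo() {
        CountDownLatch aHolds = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);

        // Thread A plays Thread-5: it takes explainLock and, while holding
        // it, would next need scanLock (here it just parks until released).
        Thread a = new Thread(() -> {
            explainLock.lock();
            try {
                aHolds.countDown();
                release.await();
            } catch (InterruptedException ignored) {
            } finally {
                explainLock.unlock();
            }
        });
        a.start();

        boolean blocked = false;
        try {
            aHolds.await();
            // Main thread plays gpu-broadcast-exchange-127: it already owns
            // scanLock and now needs explainLock. With plain monitors this
            // wait would be unbounded, i.e. a deadlock.
            scanLock.lock();
            try {
                blocked = !explainLock.tryLock(100, TimeUnit.MILLISECONDS);
                if (!blocked) explainLock.unlock();
            } finally {
                scanLock.unlock();
            }
            release.countDown();
            a.join();
        } catch (InterruptedException e) {
            release.countDown();
            return false;
        }
        return blocked;
    }
}
```

`demo()` returns true deterministically, because thread A does not release `explainLock` until after the timed acquisition attempt has failed.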
> Even though I am seeing this issue with the Spark-RAPIDS plugin, I believe
> Apache Spark has the same problem, as the call hierarchy is almost
> identical. The FileSourceScanExec class in Apache Spark also has the
> [selectedPartitions lazy
> val|https://github.com/apache/spark/blob/v3.4.3/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L251-L262],
> which is computed in the same way as in the [Spark-RAPIDS
> plugin|https://github.com/NVIDIA/spark-rapids/blob/v25.04.0/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala#L125-L136].
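For context on why evaluating `selectedPartitions` takes a lock on the scan node at all: under Scala 2's default scheme, a lazy val initializer runs while holding the enclosing instance's monitor. A rough Java sketch of that desugaring (`ScanNodeSketch` and its members are hypothetical stand-ins, not the actual generated code):

```java
// Rough Java equivalent of what a Scala 2 lazy val desugars to: the
// initializer runs inside synchronized(this), so everything it calls
// (here a stand-in for the file listing, which per the stack traces ends
// up in DataFrame.collect and explainString) executes while the instance
// monitor is held.
public class ScanNodeSketch {
    private volatile boolean initialized = false;
    private Object selectedPartitions;

    public Object selectedPartitions() {
        if (!initialized) {
            synchronized (this) {                 // the monitor Thread B holds
                if (!initialized) {
                    selectedPartitions = listFiles(); // may take other locks
                    initialized = true;
                }
            }
        }
        return selectedPartitions;
    }

    private Object listFiles() {
        return new Object[0]; // stand-in for TahoeFileIndex.listFiles
    }
}
```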
>
> I noticed that this problem doesn't exist in Spark 3.5.x as the
> synchronization on ExplainUtils was removed in
> https://issues.apache.org/jira/browse/SPARK-48610. Can we consider
> backporting this fix to the 3.4 branch as well?
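For illustration only (this is a guess at the general shape of the remedy, not the actual SPARK-48610 patch): if the IDs assigned while explaining a plan live in per-call local state rather than shared synchronized state, `processPlan` needs no lock on a global object, so no cycle with the scan node's monitor can form. `ExplainSketch` and its `processPlan` below are hypothetical:

```java
import java.util.IdentityHashMap;
import java.util.Map;

// Hypothetical sketch: per-call operator numbering with no shared state,
// hence no lock and no possible lock-ordering cycle.
public class ExplainSketch {
    public static String processPlan(Object plan) {
        Map<Object, Integer> ids = new IdentityHashMap<>(); // per-call, lock-free
        int[] next = {1};
        // Walk the plan assigning IDs locally (a single node shown here).
        ids.computeIfAbsent(plan, p -> next[0]++);
        return "(" + ids.get(plan) + ") " + plan;
    }
}
```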
> Here are more details of the deadlock.
>
> Found one Java-level deadlock:
> =============================
> "Thread-5":
> waiting to lock monitor 0x00007f3cbc005280 (object 0x0000000306482908, a
> org.apache.spark.sql.rapids.GpuFileSourceScanExec),
> which is held by "gpu-broadcast-exchange-127"
> "gpu-broadcast-exchange-127":
> waiting to lock monitor 0x00007f3adc00d580 (object 0x0000000300fd86e8, a
> org.apache.spark.sql.execution.ExplainUtils$),
> which is held by "Thread-5"
>
> Java stack information for the threads listed above:
> ===================================================
> "Thread-5":
> at
> org.apache.spark.sql.catalyst.plans.QueryPlan.subqueries$lzycompute(QueryPlan.scala:450)
> - waiting to lock <0x0000000306482908> (a
> org.apache.spark.sql.rapids.GpuFileSourceScanExec)
> at
> org.apache.spark.sql.catalyst.plans.QueryPlan.subqueries(QueryPlan.scala:450)
> at
> org.apache.spark.sql.catalyst.plans.QueryPlan.innerChildren(QueryPlan.scala:533)
> ...
> at scala.collection.mutable.ArrayBuffer.foldLeft(ArrayBuffer.scala:49)
> at
> org.apache.spark.sql.execution.ExplainUtils$.processPlan(ExplainUtils.scala:87)
> - locked <0x0000000300fd86e8> (a
> org.apache.spark.sql.execution.ExplainUtils$)
> at
> org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:221)
> at
> org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:266)
> at
> org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:235)
> at
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.onUpdatePlan(AdaptiveSparkPlanExec.scala:730)
> at
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$2(AdaptiveSparkPlanExec.scala:255)
> ...
> at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
> at
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:242)
> - locked <0x000000030d336ba0> (a java.lang.Object)
> at
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:387)
> at
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:360)
> at
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
> at
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1$$Lambda$2041/0x0000000840f12c40.apply(Unknown
> Source)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
> at
> org.apache.spark.sql.execution.SQLExecution$$$Lambda$2051/0x0000000840f24840.apply(Unknown
> Source)
> at
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
> at
> org.apache.spark.sql.execution.SQLExecution$$$Lambda$2042/0x0000000840f11840.apply(Unknown
> Source)
> at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
> at
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
> at
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
> ...
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
> at
> org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
> at
> org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
> - locked <0x0000000317df3270> (a
> org.apache.spark.sql.execution.QueryExecution)
> at
> org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
> at
> org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:133)
> at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:856)
> at
> org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:387)
> at
> org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:360)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
> ...
>
> "gpu-broadcast-exchange-127":
> at
> org.apache.spark.sql.execution.ExplainUtils$.processPlan(ExplainUtils.scala:80)
> - waiting to lock <0x0000000300fd86e8> (a
> org.apache.spark.sql.execution.ExplainUtils$)
> at
> org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:221)
> at
> org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:266)
> at
> org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:235)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:112)
> at
> org.apache.spark.sql.execution.SQLExecution$$$Lambda$2051/0x0000000840f24840.apply(Unknown
> Source)
> at
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
> at
> org.apache.spark.sql.execution.SQLExecution$$$Lambda$2042/0x0000000840f11840.apply(Unknown
> Source)
> at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
> at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4206)
> at org.apache.spark.sql.Dataset.collect(Dataset.scala:3459)
> at
> org.apache.spark.sql.delta.stats.DataSkippingReaderBase.convertDataFrameToAddFiles(DataSkippingReader.scala:1091)
> at
> org.apache.spark.sql.delta.stats.DataSkippingReaderBase.convertDataFrameToAddFiles$(DataSkippingReader.scala:1090)
> at
> org.apache.spark.sql.delta.Snapshot.convertDataFrameToAddFiles(Snapshot.scala:66)
> at
> org.apache.spark.sql.delta.stats.DataSkippingReaderBase.$anonfun$getDataSkippedFiles$2(DataSkippingReader.scala:850)
> at
> org.apache.spark.sql.delta.stats.DataSkippingReaderBase$$Lambda$5257/0x0000000841a9e040.apply(Unknown
> Source)
> at
> org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:140)
> at
> org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:138)
> at org.apache.spark.sql.delta.Snapshot.recordFrameProfile(Snapshot.scala:66)
> at
> org.apache.spark.sql.delta.stats.DataSkippingReaderBase.$anonfun$getDataSkippedFiles$1(DataSkippingReader.scala:848)
> at
> org.apache.spark.sql.delta.stats.DataSkippingReaderBase$$Lambda$5201/0x0000000841a71040.apply(Unknown
> Source)
> at
> org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:140)
> at
> org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:138)
> ...
> at
> com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:128)
> at
> com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:117)
> at org.apache.spark.sql.delta.Snapshot.recordOperation(Snapshot.scala:66)
> at
> org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:132)
> at
> org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:122)
> at
> org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:112)
> at org.apache.spark.sql.delta.Snapshot.recordDeltaOperation(Snapshot.scala:66)
> at
> org.apache.spark.sql.delta.stats.DataSkippingReaderBase.filesForScan(DataSkippingReader.scala:922)
> at
> org.apache.spark.sql.delta.stats.DataSkippingReaderBase.filesForScan$(DataSkippingReader.scala:867)
> at org.apache.spark.sql.delta.Snapshot.filesForScan(Snapshot.scala:66)
> at
> org.apache.spark.sql.delta.stats.PreparedDeltaFileIndex.matchingFiles(PrepareDeltaScan.scala:362)
> at
> org.apache.spark.sql.delta.files.TahoeFileIndex.listAddFiles(TahoeFileIndex.scala:72)
> at
> org.apache.spark.sql.delta.files.TahoeFileIndex.listFiles(TahoeFileIndex.scala:64)
> at
> org.apache.spark.sql.rapids.GpuFileSourceScanExec.selectedPartitions$lzycompute(GpuFileSourceScanExec.scala:129)
> - locked <0x0000000306482908> (a
> org.apache.spark.sql.rapids.GpuFileSourceScanExec)
> at
> org.apache.spark.sql.rapids.GpuFileSourceScanExec.selectedPartitions(GpuFileSourceScanExec.scala:125)
> at
> org.apache.spark.sql.rapids.GpuFileSourceScanExec.dynamicallySelectedPartitions$lzycompute(GpuFileSourceScanExec.scala:160)
> - locked <0x0000000306482908> (a
> org.apache.spark.sql.rapids.GpuFileSourceScanExec)
> at
> org.apache.spark.sql.rapids.GpuFileSourceScanExec.dynamicallySelectedPartitions(GpuFileSourceScanExec.scala:141)
> at
> org.apache.spark.sql.rapids.GpuFileSourceScanExec.$anonfun$createNonBucketedReadRDD$1(GpuFileSourceScanExec.scala:530)
> at
> org.apache.spark.sql.rapids.GpuFileSourceScanExec$$Lambda$5684/0x0000000841c32840.apply(Unknown
> Source)
> at scala.Option.getOrElse(Option.scala:189)
> at
> org.apache.spark.sql.rapids.GpuFileSourceScanExec.createNonBucketedReadRDD(GpuFileSourceScanExec.scala:527)
> at
> org.apache.spark.sql.rapids.GpuFileSourceScanExec.inputRDD$lzycompute(GpuFileSourceScanExec.scala:350)
> - locked <0x0000000306482908> (a
> org.apache.spark.sql.rapids.GpuFileSourceScanExec)
> at
> org.apache.spark.sql.rapids.GpuFileSourceScanExec.inputRDD(GpuFileSourceScanExec.scala:328)
> at
> org.apache.spark.sql.rapids.GpuFileSourceScanExec.internalDoExecuteColumnar(GpuFileSourceScanExec.scala:440)
> at com.nvidia.spark.rapids.GpuExec.doExecuteColumnar(GpuExec.scala:192)
> at com.nvidia.spark.rapids.GpuExec.doExecuteColumnar$(GpuExec.scala:190)
> at
> org.apache.spark.sql.rapids.GpuFileSourceScanExec.doExecuteColumnar(GpuFileSourceScanExec.scala:67)
> ...
> at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
> at
> org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:218)
> at
> org.apache.spark.sql.rapids.execution.GpuBroadcastExchangeExecBase.$anonfun$relationFuture$2(GpuBroadcastExchangeExec.scala:400)
> at
> org.apache.spark.sql.rapids.execution.GpuBroadcastExchangeExecBase$$Lambda$5677/0x0000000841c2f840.apply(Unknown
> Source)
> at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
> at
> org.apache.spark.sql.rapids.execution.GpuBroadcastExchangeExecBase.$anonfun$relationFuture$1(GpuBroadcastExchangeExec.scala:399)
> at
> org.apache.spark.sql.rapids.execution.GpuBroadcastExchangeExecBase$$Lambda$5671/0x0000000841c2bc40.apply(Unknown
> Source)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:217)
> at
> org.apache.spark.sql.execution.SQLExecution$$$Lambda$5672/0x0000000841c2c040.call(Unknown
> Source)
> at java.util.concurrent.FutureTask.run([email protected]/FutureTask.java:264)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker([email protected]/ThreadPoolExecutor.java:1128)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run([email protected]/ThreadPoolExecutor.java:628)
> at java.lang.Thread.run([email protected]/Thread.java:829)
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)