This is an automated email from the ASF dual-hosted git repository.

cloud-fan pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.2 by this push:
     new 57a77e593170 [SPARK-57041][SQL] Fix deadlock between waitForSubqueries 
and lazy val initialization in subquery
57a77e593170 is described below

commit 57a77e593170d5e2eef4d9334fc0bd2d6d5b6bb0
Author: Eric Yang <[email protected]>
AuthorDate: Tue May 26 18:57:24 2026 +0800

    [SPARK-57041][SQL] Fix deadlock between waitForSubqueries and lazy val 
initialization in subquery
    
    ### What changes were proposed in this pull request?
    Replace `this.synchronized` with synchronization on a dedicated 
`prepareLock` object in `waitForSubqueries()` and `prepare()`.
    
    ### Why are the changes needed?
      `waitForSubqueries()` holds the plan object's monitor (via 
`synchronized`) while blocking on subquery futures. A concurrent thread 
initializing a Scala lazy val on the same plan object (e.g. 
`FileSourceScanLike.metadata` via AQE's `onUpdatePlan` → 
`SparkPlanInfo.fromSparkPlan`) also needs that monitor, causing a deadlock.
    
    See the Jira https://issues.apache.org/jira/browse/SPARK-57041 for the 
thread dump when the deadlock happens.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added a unit test to `SparkPlanSuite`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    Yes. Claude Code.
    
    Closes #56095 from jiwen624/SPARK-waitForSubqueries-deadlock-fix.
    
    Authored-by: Eric Yang <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit 66272e9cf69c30961684462be0f49c65c1f82015)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../org/apache/spark/sql/execution/SparkPlan.scala |   8 +-
 .../spark/sql/execution/SparkPlanSuite.scala       | 106 ++++++++++++++++++++-
 2 files changed, 109 insertions(+), 5 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index cf2d0218d0fd..4f8f66eb5969 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -277,6 +277,8 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with 
Logging with Serializ
   @transient
   private val runningSubqueries = new ArrayBuffer[ExecSubqueryExpression]
 
+  @transient private val prepareLock = new Object() // lock for prepare()/waitForSubqueries()
+
   /**
    * Finds scalar subquery expressions in this plan node and starts evaluating 
them.
    */
@@ -293,7 +295,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with 
Logging with Serializ
   /**
    * Blocks the thread until all subqueries finish evaluation and update the 
results.
    */
-  protected def waitForSubqueries(): Unit = synchronized {
+  protected def waitForSubqueries(): Unit = prepareLock.synchronized {
     // fill in the result of subqueries
     runningSubqueries.foreach { sub =>
       sub.updateResult()
@@ -312,7 +314,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with 
Logging with Serializ
   final def prepare(): Unit = {
     // doPrepare() may depend on it's children, we should call prepare() on 
all the children first.
     children.foreach(_.prepare())
-    synchronized {
+    prepareLock.synchronized {
       if (!prepared) {
         prepareSubqueries()
         doPrepare()
@@ -329,7 +331,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with 
Logging with Serializ
    * @note `prepare` method has already walked down the tree, so the 
implementation doesn't have
    * to call children's `prepare` methods.
    *
-   * This will only be called once, protected by `this`.
+   * This will only be called once, protected by [[prepareLock]].
    */
   protected def doPrepare(): Unit = {}
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
index 991a92dc8976..b167dd13dcbd 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
@@ -17,16 +17,26 @@
 
 package org.apache.spark.sql.execution
 
+import java.lang.management.ManagementFactory
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+
 import org.apache.spark.{SparkEnv, SparkException, 
SparkUnsupportedOperationException}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, Literal}
+import org.apache.spark.sql.catalyst.expressions.{
+  Attribute, AttributeReference, Expression, ExprId, Literal}
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, 
ExprCode}
 import org.apache.spark.sql.catalyst.plans.logical.Deduplicate
+import org.apache.spark.sql.catalyst.trees.LeafLike
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.IntegerType
+import org.apache.spark.sql.types.{DataType, IntegerType}
 import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.util.ThreadUtils
 
 class SparkPlanSuite extends SharedSparkSession {
 
@@ -168,6 +178,60 @@ class SparkPlanSuite extends SharedSparkSession {
       }
     }
   }
+
+  test("SPARK-57041: waitForSubqueries must not hold the plan's monitor " +
+    "while awaiting subquery results") {
+    // Handshake with the blocking subquery: `entered` fires once updateResult() is running,
+    // `release` lets updateResult() return.
+    val entered = new CountDownLatch(1)
+    val release = new CountDownLatch(1)
+
+    val blockingExpr = BlockingSubquery(
+      TestSubqueryExec(LocalTableScanExec(Nil, Nil, None)), ExprId(0), entered, release)
+    val plan = TestPlanWithSubquery(blockingExpr)
+
+    val pool = ThreadUtils.newDaemonSingleThreadExecutor("test-wait-for-subqueries")
+    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
+
+    plan.testPrepare()
+    // Thread A: calls waitForSubqueries(), which parks inside updateResult().
+    val waiter = Future { plan.testWaitForSubqueries() }
+
+    try {
+      assert(entered.await(10, TimeUnit.SECONDS),
+        "Thread A did not enter updateResult() within 10s")
+
+      // Thread B: does nothing except acquire and release the plan's own monitor.
+      val contender = new Thread(() => plan.synchronized {})
+      contender.setDaemon(true)
+      contender.start()
+
+      val threadBean = ManagementFactory.getThreadMXBean
+      val giveUpAt = System.currentTimeMillis() + 5000L
+
+      // Polls thread B until it terminates, is observed BLOCKED, or the 5s budget runs out.
+      // Returns true only when the BLOCKED state was observed.
+      @scala.annotation.tailrec
+      def sawContenderBlocked(): Boolean = {
+        if (!contender.isAlive || System.currentTimeMillis() > giveUpAt) {
+          false
+        } else {
+          val info = Option(threadBean.getThreadInfo(contender.getId))
+          info.map(_.getThreadState).orNull match {
+            case Thread.State.BLOCKED => true
+            // No thread info available: re-check liveness and the deadline right away.
+            case null => sawContenderBlocked()
+            case _ =>
+              Thread.sleep(1)
+              sawContenderBlocked()
+          }
+        }
+      }
+      val contenderBlocked = sawContenderBlocked()
+
+      // Let thread A finish before asserting, so a failure does not leave it parked.
+      release.countDown()
+      ThreadUtils.awaitResult(waiter, Duration(10, "seconds"))
+      contender.join(5000L)
+
+      assert(!contenderBlocked,
+        "Deadlock: plan.this.synchronized could not be acquired while waitForSubqueries() was " +
+        "blocking on a subquery future. waitForSubqueries() must not hold the plan's monitor.")
+    } finally {
+      // Idempotent: guarantees thread A is released even when an assertion above failed.
+      release.countDown()
+      pool.shutdown()
+    }
+  }
 }
 
 case class ColumnarOp(child: SparkPlan) extends UnaryExecNode {
@@ -179,3 +243,41 @@ case class ColumnarOp(child: SparkPlan) extends 
UnaryExecNode {
   override protected def withNewChildInternal(newChild: SparkPlan): ColumnarOp 
=
     copy(child = newChild)
 }
+
+/**
+ * Minimal [[BaseSubqueryExec]] with a single child, used to build the subquery expression for
+ * the SPARK-57041 test. Execution is delegated to the child.
+ */
+private case class TestSubqueryExec(child: SparkPlan) extends BaseSubqueryExec {
+
+  override def name: String = "TestSubqueryExec"
+
+  override def children: Seq[SparkPlan] = child :: Nil
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    child.execute()
+  }
+
+  override protected def withNewChildrenInternal(
+      newChildren: IndexedSeq[SparkPlan]): TestSubqueryExec = {
+    val replacement = newChildren.head
+    copy(child = replacement)
+  }
+}
+
+/**
+ * Subquery expression whose `updateResult()` counts down `enteredLatch` and then waits on
+ * `releaseLatch` for at most 30 seconds. This lets a test keep a thread inside
+ * `updateResult()` until the test decides to release it.
+ *
+ * @param plan         the subquery plan this expression wraps
+ * @param exprId       expression id of this subquery expression
+ * @param enteredLatch counted down as soon as `updateResult()` starts
+ * @param releaseLatch awaited by `updateResult()` before it returns
+ */
+private case class BlockingSubquery(
+    plan: BaseSubqueryExec,
+    exprId: ExprId,
+    enteredLatch: CountDownLatch,
+    releaseLatch: CountDownLatch)
+  extends ExecSubqueryExpression with LeafLike[Expression] {
+
+  override def nullable: Boolean = true
+
+  override def dataType: DataType = IntegerType
+
+  // Always yields null.
+  override def eval(input: InternalRow): Any = null
+
+  // Code generation is not supported by this test-only expression.
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    throw new UnsupportedOperationException("test only")
+  }
+
+  override def withNewPlan(plan: BaseSubqueryExec): ExecSubqueryExpression = {
+    copy(plan = plan)
+  }
+
+  override def updateResult(): Unit = {
+    // Signal arrival first so the test knows this thread is now inside updateResult().
+    enteredLatch.countDown()
+    // The Boolean timeout result is discarded: the method returns either way.
+    releaseLatch.await(30, TimeUnit.SECONDS)
+    ()
+  }
+}
+
+/**
+ * Leaf plan that carries a subquery expression as its constructor argument and has no output.
+ * It forwards `prepare()` and the protected `waitForSubqueries()` through test-only wrappers
+ * so a test can call them directly. It cannot be executed.
+ */
+private case class TestPlanWithSubquery(subqueryExpr: ExecSubqueryExpression)
+  extends LeafExecNode {
+
+  def testPrepare(): Unit = prepare()
+
+  def testWaitForSubqueries(): Unit = waitForSubqueries()
+
+  override def output: Seq[Attribute] = Seq.empty
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    throw new UnsupportedOperationException("test only")
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to