This is an automated email from the ASF dual-hosted git repository.
cloud-fan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 66272e9cf69c [SPARK-57041][SQL] Fix deadlock between waitForSubqueries
and lazy val initialization in subquery
66272e9cf69c is described below
commit 66272e9cf69c30961684462be0f49c65c1f82015
Author: Eric Yang <[email protected]>
AuthorDate: Tue May 26 18:57:24 2026 +0800
[SPARK-57041][SQL] Fix deadlock between waitForSubqueries and lazy val
initialization in subquery
### What changes were proposed in this pull request?
Replace `this.synchronized` with synchronization on a dedicated private
`prepareLock` object in `waitForSubqueries()` and `prepare()`.
### Why are the changes needed?
`waitForSubqueries()` holds the plan object's monitor (via `synchronized`)
while blocking on subquery futures. A concurrent thread initializing a Scala lazy
val on the same plan object (e.g. `FileSourceScanLike.metadata` via AQE's
`onUpdatePlan` → `SparkPlanInfo.fromSparkPlan`) also needs that monitor,
causing a deadlock.
See the Jira https://issues.apache.org/jira/browse/SPARK-57041 for the
thread dump when the deadlock happens.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added a unit test to `SparkPlanSuite`.
### Was this patch authored or co-authored using generative AI tooling?
Yes. Claude Code.
Closes #56095 from jiwen624/SPARK-waitForSubqueries-deadlock-fix.
Authored-by: Eric Yang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../org/apache/spark/sql/execution/SparkPlan.scala | 8 +-
.../spark/sql/execution/SparkPlanSuite.scala | 106 ++++++++++++++++++++-
2 files changed, 109 insertions(+), 5 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index cf2d0218d0fd..4f8f66eb5969 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -277,6 +277,8 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with
Logging with Serializ
@transient
private val runningSubqueries = new ArrayBuffer[ExecSubqueryExpression]
+ @transient private val prepareLock = new Object()
+
/**
* Finds scalar subquery expressions in this plan node and starts evaluating
them.
*/
@@ -293,7 +295,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with
Logging with Serializ
/**
* Blocks the thread until all subqueries finish evaluation and update the
results.
*/
- protected def waitForSubqueries(): Unit = synchronized {
+ protected def waitForSubqueries(): Unit = prepareLock.synchronized {
// fill in the result of subqueries
runningSubqueries.foreach { sub =>
sub.updateResult()
@@ -312,7 +314,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with
Logging with Serializ
final def prepare(): Unit = {
// doPrepare() may depend on it's children, we should call prepare() on
all the children first.
children.foreach(_.prepare())
- synchronized {
+ prepareLock.synchronized {
if (!prepared) {
prepareSubqueries()
doPrepare()
@@ -329,7 +331,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with
Logging with Serializ
* @note `prepare` method has already walked down the tree, so the
implementation doesn't have
* to call children's `prepare` methods.
*
- * This will only be called once, protected by `this`.
+ * This will only be called once, protected by [[prepareLock]].
*/
protected def doPrepare(): Unit = {}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
index 991a92dc8976..b167dd13dcbd 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
@@ -17,16 +17,26 @@
package org.apache.spark.sql.execution
+import java.lang.management.ManagementFactory
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+
import org.apache.spark.{SparkEnv, SparkException,
SparkUnsupportedOperationException}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, Literal}
+import org.apache.spark.sql.catalyst.expressions.{
+ Attribute, AttributeReference, Expression, ExprId, Literal}
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext,
ExprCode}
import org.apache.spark.sql.catalyst.plans.logical.Deduplicate
+import org.apache.spark.sql.catalyst.trees.LeafLike
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.IntegerType
+import org.apache.spark.sql.types.{DataType, IntegerType}
import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.util.ThreadUtils
class SparkPlanSuite extends SharedSparkSession {
@@ -168,6 +178,60 @@ class SparkPlanSuite extends SharedSparkSession {
}
}
}
+
+ test("SPARK-57041: waitForSubqueries must not hold the plan's monitor " +
+ "while awaiting subquery results") {
+ val enteredLatch = new CountDownLatch(1)
+ val releaseLatch = new CountDownLatch(1)
+
+ val subqueryExec = TestSubqueryExec(LocalTableScanExec(Nil, Nil, None))
+ val subqueryExpr = BlockingSubquery(subqueryExec, ExprId(0), enteredLatch,
releaseLatch)
+ val plan = TestPlanWithSubquery(subqueryExpr)
+
+ val executor =
ThreadUtils.newDaemonSingleThreadExecutor("test-wait-for-subqueries")
+ implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(executor)
+
+ plan.testPrepare()
+ val futureA = Future { plan.testWaitForSubqueries() }
+
+ try {
+ assert(enteredLatch.await(10, TimeUnit.SECONDS),
+ "Thread A did not enter updateResult() within 10s")
+
+ val threadB = new Thread(() => plan.synchronized {})
+ threadB.setDaemon(true)
+ threadB.start()
+
+ val bean = ManagementFactory.getThreadMXBean
+ val deadline = System.currentTimeMillis() + 5000L
+ var threadBBlocked = false
+ var waiting = true
+ while (waiting) {
+ if (!threadB.isAlive || System.currentTimeMillis() > deadline) {
+ waiting = false
+ } else {
+ val state =
Option(bean.getThreadInfo(threadB.getId)).map(_.getThreadState).orNull
+ if (state == Thread.State.BLOCKED) {
+ threadBBlocked = true
+ waiting = false
+ } else if (state != null) {
+ Thread.sleep(1)
+ }
+ }
+ }
+
+ releaseLatch.countDown()
+ ThreadUtils.awaitResult(futureA, Duration(10, "seconds"))
+ threadB.join(5000L)
+
+ assert(!threadBBlocked,
+ "Deadlock: plan.this.synchronized could not be acquired while
waitForSubqueries() was " +
+ "blocking on a subquery future. waitForSubqueries() must not hold the
plan's monitor.")
+ } finally {
+ releaseLatch.countDown()
+ executor.shutdown()
+ }
+ }
}
case class ColumnarOp(child: SparkPlan) extends UnaryExecNode {
@@ -179,3 +243,41 @@ case class ColumnarOp(child: SparkPlan) extends
UnaryExecNode {
override protected def withNewChildInternal(newChild: SparkPlan): ColumnarOp
=
copy(child = newChild)
}
+
+private case class TestSubqueryExec(child: SparkPlan) extends BaseSubqueryExec
{
+ override def name: String = "TestSubqueryExec"
+ override def children: Seq[SparkPlan] = Seq(child)
+ override protected def doExecute(): RDD[InternalRow] = child.execute()
+ override protected def withNewChildrenInternal(
+ newChildren: IndexedSeq[SparkPlan]): TestSubqueryExec = copy(child =
newChildren.head)
+}
+
+private case class BlockingSubquery(
+ plan: BaseSubqueryExec,
+ exprId: ExprId,
+ enteredLatch: CountDownLatch,
+ releaseLatch: CountDownLatch)
+ extends ExecSubqueryExpression with LeafLike[Expression] {
+
+ override def dataType: DataType = IntegerType
+ override def nullable: Boolean = true
+ override def eval(input: InternalRow): Any = null
+ override protected def doGenCode(ctx: CodegenContext, ev: ExprCode):
ExprCode =
+ throw new UnsupportedOperationException("test only")
+ override def withNewPlan(plan: BaseSubqueryExec): ExecSubqueryExpression =
+ copy(plan = plan)
+
+ override def updateResult(): Unit = {
+ enteredLatch.countDown()
+ releaseLatch.await(30, TimeUnit.SECONDS)
+ }
+}
+
+private case class TestPlanWithSubquery(subqueryExpr: ExecSubqueryExpression)
+ extends LeafExecNode {
+ override def output: Seq[Attribute] = Nil
+ override protected def doExecute(): RDD[InternalRow] =
+ throw new UnsupportedOperationException("test only")
+ def testPrepare(): Unit = prepare()
+ def testWaitForSubqueries(): Unit = waitForSubqueries()
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]