This is an automated email from the ASF dual-hosted git repository.
sarutak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 51209b11b0ea [SPARK-53807][CORE][FOLLOW-UP] Fix deadlock between
unlock and releaseAllLocksForTask for write lock in BlockInfoManager
51209b11b0ea is described below
commit 51209b11b0ea6104799dd8e877893ecee865d33f
Author: Yi Wu <[email protected]>
AuthorDate: Wed Mar 11 08:20:21 2026 +0900
[SPARK-53807][CORE][FOLLOW-UP] Fix deadlock between unlock and
releaseAllLocksForTask for write lock in BlockInfoManager
### What changes were proposed in this pull request?
This PR introduces `registerWriteLockForTask` / `unregisterWriteLockForTask` helpers that use `ConcurrentHashMap.compute` / `computeIfPresent` instead of operating directly on the synchronized set under a block lock. In particular, with `ConcurrentHashMap.computeIfPresent`, we decouple the lock contention between `blockIds.remove(blockId)` in `unlock` and `writeLocks.forEach` in `releaseAllLocksForTask` (where `blockIds` and `writeLocks` are actually the same synchronized Set instance from `writ [...]
This PR also includes two small changes:
* Change from `writeLocksByTask.get(taskAttemptId)` to `writeLocksByTask.get(info.writerTask)` in `unlock` for safety, in case `unlock` is invoked outside the task context. This change should be safe, as we are holding the block's lock at that point.
* Relax the assertion `assert(info.writerTask == taskAttemptId)` in `releaseAllLocksForTask` to an `if` condition, since a concurrent `unlock` could break this assertion after the fix.
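As a standalone sketch of the helper pattern (the `WriteLockRegistry` object and the simplified `Long`/`String` key types are illustrative, not the actual Spark code), the key property is that `compute` creates the per-task set atomically with respect to other map operations on the same key, while `computeIfPresent` becomes a no-op once the task's entry has been removed:

```scala
import java.util.{Collections, HashSet => JHashSet}
import java.util.concurrent.ConcurrentHashMap

object WriteLockRegistry {
  private val locksByTask = new ConcurrentHashMap[Long, java.util.Set[String]]()

  // compute creates the per-task set on first use and adds the block id,
  // all atomically with respect to other map operations on the same key.
  def register(taskId: Long, blockId: String): Unit =
    locksByTask.compute(taskId, (_, blockIds) => {
      val set =
        if (blockIds == null) Collections.synchronizedSet(new JHashSet[String])
        else blockIds
      set.add(blockId)
      set
    })

  // computeIfPresent is a no-op when the task's entry is already gone
  // (e.g. removed by releaseAllLocksForTask), so the caller never blocks
  // on a set that another thread is iterating under its monitor.
  def unregister(taskId: Long, blockId: String): Unit =
    locksByTask.computeIfPresent(taskId, (_, blockIds) => {
      blockIds.remove(blockId)
      blockIds
    })

  def contains(taskId: Long, blockId: String): Boolean =
    Option(locksByTask.get(taskId)).exists(_.contains(blockId))
}
```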
### Why are the changes needed?
A race condition still exists between `unlock` and `releaseAllLocksForTask` for write locks in `BlockInfoManager`, and it can result in a deadlock in the following way:
* Thread 1 (`releaseAllLocksForTask`): `writeLocks.forEach { ... }` acquires the set's intrinsic monitor for the entire iteration, then tries `blockInfo(blockId)` to acquire the block's `ReentrantLock`.
* Thread 2 (`unlock`): holds the block's `ReentrantLock` (inside `blockInfo`), then tries `blockIds.remove(blockId)`, which needs the same set's intrinsic monitor.
The example thread stacks:
```
"pool-1-thread-1-ScalaTest-running-BlockInfoManagerSuite" prio=5 Id=17
RUNNABLE
at
java.management17.0.14/sun.management.ThreadImpl.dumpThreads0(Native Method)
at
java.management17.0.14/sun.management.ThreadImpl.dumpAllThreads(ThreadImpl.java:528)
at
java.management17.0.14/sun.management.ThreadImpl.dumpAllThreads(ThreadImpl.java:516)
at app//org.apache.spark.util.Utils$.getThreadDump(Utils.scala:2141)
at
app//org.apache.spark.storage.BlockInfoManager.$anonfun$releaseAllLocksForTask$2(BlockInfoManager.scala:500)
at
app//org.apache.spark.storage.BlockInfoManager$$Lambda$452/0x00000098012db008.accept(Unknown
Source)
at java.base17.0.14/java.lang.Iterable.forEach(Iterable.java:75)
at
java.base17.0.14/java.util.Collections$SynchronizedCollection.forEach(Collections.java:2131)
- **locked java.util.Collections$SynchronizedSet3e7652b2**
at
app//org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:495)
at
app//org.apache.spark.storage.BlockInfoManagerSuite.$anonfun$new$99(BlockInfoManagerSuite.scala:461)
at
app//org.apache.spark.storage.BlockInfoManagerSuite$$Lambda$439/0x00000098012c7c28.apply$mcV$sp(Unknown
Source)
at
app//scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
at
app//org.apache.spark.storage.BlockInfoManagerSuite.withTaskId(BlockInfoManagerSuite.scala:66)
at
app//org.apache.spark.storage.BlockInfoManagerSuite.$anonfun$new$98(BlockInfoManagerSuite.scala:452)
...
at java.base17.0.14/java.lang.Thread.run(Thread.java:840)
Number of locked synchronizers = 1
- java.util.concurrent.ThreadPoolExecutor$Worker7ee8290b
```
```
"scala-execution-context-global-21" daemon prio=5 Id=21 BLOCKED on
java.util.Collections$SynchronizedSet3e7652b2owned by
"pool-1-thread-1-ScalaTest-running-BlockInfoManagerSuite"
at
java.base17.0.14/java.util.Collections$SynchronizedCollection.remove(Collections.java:2107)
- blocked on java.util.Collections$SynchronizedSet3e7652b2
at
app//org.apache.spark.storage.BlockInfoManager.$anonfun$unlock$3(BlockInfoManager.scala:404)
at
app//org.apache.spark.storage.BlockInfoManager.$anonfun$unlock$3$adapted(BlockInfoManager.scala:399)
at
app//org.apache.spark.storage.BlockInfoManager$$Lambda$437/0x00000098012cc000.apply(Unknown
Source)
at
app//org.apache.spark.storage.BlockInfoWrapper.withLock(BlockInfoManager.scala:104)
at
app//org.apache.spark.storage.BlockInfoManager.blockInfo(BlockInfoManager.scala:280)
at
app//org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:399)
....
at
java.base17.0.14/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
Number of locked synchronizers = 1
- java.util.concurrent.locks.ReentrantLock$NonfairSync7c2179cd
```
The deadlock can also be easily reproduced by inserting `Thread.sleep(300)` right before `blockIds.remove(blockId)` in `unlock` and then running the SPARK-53807 unit test in `BlockInfoManagerSuite`.
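A minimal standalone version of that repro (plain JDK collections rather than the Spark classes; the sleep plays the role of the injected `Thread.sleep(300)`) shows a remover thread parking on the set's monitor while another thread is still inside `forEach`:

```scala
import java.util.{Collections, HashSet => JHashSet}
import java.util.concurrent.CountDownLatch

object ForEachBlocksRemoveDemo {
  // Observes the state of a thread calling set.remove(...) while another
  // thread sleeps inside set.forEach, holding the set's intrinsic monitor.
  def removerStateWhileIterating(): Thread.State = {
    val set = Collections.synchronizedSet(new JHashSet[String])
    set.add("rdd_0_0")
    val inLoop = new CountDownLatch(1)
    val iteratingThread = new Thread(() => set.forEach { _ =>
      inLoop.countDown()
      Thread.sleep(500) // stand-in for work done under the monitor
    })
    val remover = new Thread(() => { set.remove("rdd_0_0"); () })
    iteratingThread.start()
    inLoop.await()
    remover.start()
    // Poll until the remover is parked on the set's intrinsic monitor
    // (bounded so we give up before the iterating thread wakes).
    var state = remover.getState
    val deadline = System.nanoTime + 400L * 1000 * 1000
    while (state != Thread.State.BLOCKED && System.nanoTime < deadline) {
      Thread.sleep(5)
      state = remover.getState
    }
    iteratingThread.join()
    remover.join()
    state
  }
}
```

In the real deadlock the remover additionally holds the block's `ReentrantLock`, which the iterating thread then tries to take; this sketch only demonstrates the monitor-pinning half.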
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manually tested. Before the fix, the SPARK-53807 unit test easily fails with the reproduction steps above. After the fix, the test passes as expected.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #54720 from Ngone51/SPARK-SPARK-53807-followup.
Authored-by: Yi Wu <[email protected]>
Signed-off-by: Kousuke Saruta <[email protected]>
---
.../apache/spark/storage/BlockInfoManager.scala | 42 +++++++++++++++++-----
1 file changed, 34 insertions(+), 8 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
index f551d23dd6d4..417705e40599 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
@@ -179,6 +179,33 @@ private[storage] class BlockInfoManager(trackingCacheVisibility: Boolean = false
    */
   private[this] val writeLocksByTask = new ConcurrentHashMap[TaskAttemptId, util.Set[BlockId]]
 
+  /**
+   * Register a write lock on `blockId` for `taskAttemptId`. This method creates a new entry
+   * for the task if none exist yet.
+   */
+  private def registerWriteLockForTask(taskAttemptId: TaskAttemptId, blockId: BlockId): Unit = {
+    writeLocksByTask.compute(taskAttemptId, (_, blockIds) => {
+      val newBlockIds = if (blockIds == null) {
+        util.Collections.synchronizedSet(new util.HashSet[BlockId])
+      } else {
+        blockIds
+      }
+      newBlockIds.add(blockId)
+      newBlockIds
+    })
+  }
+
+  /**
+   * Unregister a write lock on `blockId` for `taskAttemptId`. The entry for the task is
+   * cleaned up later by `releaseAllLocksForTask`.
+   */
+  private def unregisterWriteLockForTask(taskAttemptId: TaskAttemptId, blockId: BlockId): Unit = {
+    writeLocksByTask.computeIfPresent(taskAttemptId, (_, blockIds) => {
+      blockIds.remove(blockId)
+      blockIds
+    })
+  }
+
   /**
    * Tracks the set of blocks that each task has locked for reading, along with the number of times
    * that a block has been locked (since our read locks are re-entrant).
@@ -333,7 +360,7 @@ private[storage] class BlockInfoManager(trackingCacheVisibility: Boolean = false
       val acquire = info.writerTask == BlockInfo.NO_WRITER && info.readerCount == 0
       if (acquire) {
         info.writerTask = taskAttemptId
-        writeLocksByTask.get(taskAttemptId).add(blockId)
+        registerWriteLockForTask(taskAttemptId, blockId)
         logTrace(s"Task $taskAttemptId acquired write lock for $blockId")
       }
       acquire
@@ -396,10 +423,10 @@ private[storage] class BlockInfoManager(trackingCacheVisibility: Boolean = false
     logTrace(s"Task $taskAttemptId releasing lock for $blockId")
     blockInfo(blockId) { (info, condition) =>
       if (info.writerTask != BlockInfo.NO_WRITER) {
-        val blockIds = writeLocksByTask.get(taskAttemptId)
+        val blockIds = writeLocksByTask.get(info.writerTask)
         if (blockIds != null) {
+          unregisterWriteLockForTask(info.writerTask, blockId)
           info.writerTask = BlockInfo.NO_WRITER
-          blockIds.remove(blockId)
         }
       } else {
         // There can be a race between unlock and releaseAllLocksForTask which causes negative
@@ -492,11 +519,10 @@ private[storage] class BlockInfoManager(trackingCacheVisibility: Boolean = false
     val writeLocks = Option(writeLocksByTask.remove(taskAttemptId)).getOrElse(util.Set.of())
     writeLocks.forEach { blockId =>
       blockInfo(blockId) { (info, condition) =>
-        // Check the existence of `blockId` because `unlock` may have already removed it
-        // concurrently.
-        if (writeLocks.contains(blockId)) {
+        // Check the existence of `blockId` and also if it is still held by this task because
+        // `unlock` may have already released it concurrently. See SPARK-53807 for details.
+        if (writeLocks.contains(blockId) && info.writerTask == taskAttemptId) {
           blocksWithReleasedLocks += blockId
-          assert(info.writerTask == taskAttemptId)
           info.writerTask = BlockInfo.NO_WRITER
           condition.signalAll()
         }
@@ -595,7 +621,7 @@ private[storage] class BlockInfoManager(trackingCacheVisibility: Boolean = false
         }
         info.readerCount = 0
         info.writerTask = BlockInfo.NO_WRITER
-        writeLocksByTask.get(taskAttemptId).remove(blockId)
+        unregisterWriteLockForTask(taskAttemptId, blockId)
       }
       condition.signalAll()
     }
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]