This is an automated email from the ASF dual-hosted git repository.

sarutak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 51209b11b0ea [SPARK-53807][CORE][FOLLOW-UP] Fix deadlock between 
unlock and releaseAllLocksForTask for write lock in BlockInfoManager
51209b11b0ea is described below

commit 51209b11b0ea6104799dd8e877893ecee865d33f
Author: Yi Wu <[email protected]>
AuthorDate: Wed Mar 11 08:20:21 2026 +0900

    [SPARK-53807][CORE][FOLLOW-UP] Fix deadlock between unlock and 
releaseAllLocksForTask for write lock in BlockInfoManager
    
    ### What changes were proposed in this pull request?
    
    This PR introduces `registerWriteLockForTask` / `unregisterWriteLockForTask` helpers that use `ConcurrentHashMap.compute` / `computeIfPresent` instead of operating directly on the synchronized set under a block lock. In particular, with `ConcurrentHashMap.computeIfPresent`, we decouple the lock contention between `blockIds.remove(blockId)` in `unlock` and `writeLocks.forEach` in `releaseAllLocksForTask` (where `blockIds` and `writeLocks` are actually the same synchronized Set instance from `writ [...]
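
    For illustration only, here is a minimal standalone Java sketch of the same pattern (class and identifier names are hypothetical, not Spark's code): `compute` creates the per-task set atomically on first registration, and `computeIfPresent` makes unregistration a no-op once the task's entry has been removed.

    ```java
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    public class WriteLockRegistry {
        // Mirrors writeLocksByTask: task attempt id -> synchronized set of block ids.
        final ConcurrentHashMap<Long, Set<String>> writeLocksByTask = new ConcurrentHashMap<>();

        void registerWriteLockForTask(long taskAttemptId, String blockId) {
            // compute() runs atomically per key and creates the set on first use.
            writeLocksByTask.compute(taskAttemptId, (k, blockIds) -> {
                Set<String> s = (blockIds == null)
                    ? Collections.synchronizedSet(new HashSet<>()) : blockIds;
                s.add(blockId);
                return s;
            });
        }

        void unregisterWriteLockForTask(long taskAttemptId, String blockId) {
            // computeIfPresent() is a no-op when the task's entry is already gone,
            // e.g. after releaseAllLocksForTask has removed it from the map.
            writeLocksByTask.computeIfPresent(taskAttemptId, (k, blockIds) -> {
                blockIds.remove(blockId);
                return blockIds;
            });
        }

        public static void main(String[] args) {
            WriteLockRegistry r = new WriteLockRegistry();
            r.registerWriteLockForTask(1L, "rdd_0_0");
            r.registerWriteLockForTask(1L, "rdd_0_1");
            r.unregisterWriteLockForTask(1L, "rdd_0_0");
            r.unregisterWriteLockForTask(2L, "rdd_0_0"); // no entry for task 2: no-op
            System.out.println(r.writeLocksByTask.get(1L));
        }
    }
    ```

    Because `releaseAllLocksForTask` removes the whole task entry from the map up front, a concurrent unregister finds no mapping and never has to touch the set's intrinsic monitor.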
    
    This PR also makes two small related changes:
    
    * Change from `writeLocksByTask.get(taskAttemptId)` to `writeLocksByTask.get(info.writerTask)` in `unlock`, for safety in case `unlock` is invoked outside of the task context. This change should be safe as we are holding the block's lock at that point.
    
    * Relax the assertion `assert(info.writerTask == taskAttemptId)` in `releaseAllLocksForTask` to an `if` condition, since a concurrent `unlock` could break this assertion after the fix.
    
    ### Why are the changes needed?
    
    A race condition still exists between `unlock` and `releaseAllLocksForTask` for write locks in `BlockInfoManager`, and it can result in a deadlock as follows:
    
    * Thread 1 (`releaseAllLocksForTask`): `writeLocks.forEach { ... }` acquires the set's intrinsic monitor for the entire iteration, then tries `blockInfo(blockId)` to acquire the block's `ReentrantLock`.
    * Thread 2 (`unlock`): holds the block's `ReentrantLock` (inside `blockInfo`), then tries `blockIds.remove(blockId)`, which needs the same set's intrinsic monitor.
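
    The key enabler is that a `Collections.synchronizedSet` wrapper holds the set's intrinsic monitor for the entire `forEach` iteration, so any lock acquired inside the loop nests under that monitor. This small standalone Java check (independent of Spark) makes the behavior observable:

    ```java
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    public class SyncSetMonitorDemo {
        public static void main(String[] args) {
            Set<String> blocks = Collections.synchronizedSet(new HashSet<>());
            blocks.add("rdd_0_0");
            // SynchronizedCollection.forEach wraps the whole iteration in
            // synchronized (mutex), and the mutex is the wrapper object itself.
            blocks.forEach(b ->
                System.out.println("monitor held during forEach: " + Thread.holdsLock(blocks)));
            System.out.println("monitor held after forEach: " + Thread.holdsLock(blocks));
        }
    }
    ```

    A thread that acquires the block's `ReentrantLock` first and then calls `remove` on this set takes the same two locks in the opposite order, which is the classic lock-ordering deadlock seen in the stacks below.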
    
    Example thread stacks:
    
    ```
    "pool-1-thread-1-ScalaTest-running-BlockInfoManagerSuite" prio=5 Id=17 RUNNABLE
            at java.management@17.0.14/sun.management.ThreadImpl.dumpThreads0(Native Method)
            at java.management@17.0.14/sun.management.ThreadImpl.dumpAllThreads(ThreadImpl.java:528)
            at java.management@17.0.14/sun.management.ThreadImpl.dumpAllThreads(ThreadImpl.java:516)
            at app//org.apache.spark.util.Utils$.getThreadDump(Utils.scala:2141)
            at app//org.apache.spark.storage.BlockInfoManager.$anonfun$releaseAllLocksForTask$2(BlockInfoManager.scala:500)
            at app//org.apache.spark.storage.BlockInfoManager$$Lambda$452/0x00000098012db008.accept(Unknown Source)
            at java.base@17.0.14/java.lang.Iterable.forEach(Iterable.java:75)
            at java.base@17.0.14/java.util.Collections$SynchronizedCollection.forEach(Collections.java:2131)
            -  locked java.util.Collections$SynchronizedSet@3e7652b2
            at app//org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:495)
            at app//org.apache.spark.storage.BlockInfoManagerSuite.$anonfun$new$99(BlockInfoManagerSuite.scala:461)
            at app//org.apache.spark.storage.BlockInfoManagerSuite$$Lambda$439/0x00000098012c7c28.apply$mcV$sp(Unknown Source)
            at app//scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
            at app//org.apache.spark.storage.BlockInfoManagerSuite.withTaskId(BlockInfoManagerSuite.scala:66)
            at app//org.apache.spark.storage.BlockInfoManagerSuite.$anonfun$new$98(BlockInfoManagerSuite.scala:452)
            ...
            at java.base@17.0.14/java.lang.Thread.run(Thread.java:840)

            Number of locked synchronizers = 1
            - java.util.concurrent.ThreadPoolExecutor$Worker@7ee8290b
    ```
    
    ```
    "scala-execution-context-global-21" daemon prio=5 Id=21 BLOCKED on java.util.Collections$SynchronizedSet@3e7652b2 owned by "pool-1-thread-1-ScalaTest-running-BlockInfoManagerSuite"
            at java.base@17.0.14/java.util.Collections$SynchronizedCollection.remove(Collections.java:2107)
            -  blocked on java.util.Collections$SynchronizedSet@3e7652b2
            at app//org.apache.spark.storage.BlockInfoManager.$anonfun$unlock$3(BlockInfoManager.scala:404)
            at app//org.apache.spark.storage.BlockInfoManager.$anonfun$unlock$3$adapted(BlockInfoManager.scala:399)
            at app//org.apache.spark.storage.BlockInfoManager$$Lambda$437/0x00000098012cc000.apply(Unknown Source)
            at app//org.apache.spark.storage.BlockInfoWrapper.withLock(BlockInfoManager.scala:104)
            at app//org.apache.spark.storage.BlockInfoManager.blockInfo(BlockInfoManager.scala:280)
            at app//org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:399)
            ....
            at java.base@17.0.14/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)

            Number of locked synchronizers = 1
            - java.util.concurrent.locks.ReentrantLock$NonfairSync@7c2179cd
    ```
    
    The deadlock can also be reproduced easily by inserting `Thread.sleep(300)` right before `blockIds.remove(blockId)` in `unlock` and then running the SPARK-53807 unit test in `BlockInfoManagerSuite`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Manually tested. Before the fix, the SPARK-53807 unit test fails easily with the reproduction step above. After the fix, the test passes as expected.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #54720 from Ngone51/SPARK-SPARK-53807-followup.
    
    Authored-by: Yi Wu <[email protected]>
    Signed-off-by: Kousuke Saruta <[email protected]>
---
 .../apache/spark/storage/BlockInfoManager.scala    | 42 +++++++++++++++++-----
 1 file changed, 34 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
index f551d23dd6d4..417705e40599 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
@@ -179,6 +179,33 @@ private[storage] class BlockInfoManager(trackingCacheVisibility: Boolean = false
    */
   private[this] val writeLocksByTask = new ConcurrentHashMap[TaskAttemptId, util.Set[BlockId]]
 
+  /**
+   * Register a write lock on `blockId` for `taskAttemptId`. This method creates a new entry
+   * for the task if none exist yet.
+   */
+  private def registerWriteLockForTask(taskAttemptId: TaskAttemptId, blockId: BlockId): Unit = {
+    writeLocksByTask.compute(taskAttemptId, (_, blockIds) => {
+      val newBlockIds = if (blockIds == null) {
+        util.Collections.synchronizedSet(new util.HashSet[BlockId])
+      } else {
+        blockIds
+      }
+      newBlockIds.add(blockId)
+      newBlockIds
+    })
+  }
+
+  /**
+   * Unregister a write lock on `blockId` for `taskAttemptId`. The entry for the task is
+   * cleaned up later by `releaseAllLocksForTask`.
+   */
+  private def unregisterWriteLockForTask(taskAttemptId: TaskAttemptId, blockId: BlockId): Unit = {
+    writeLocksByTask.computeIfPresent(taskAttemptId, (_, blockIds) => {
+      blockIds.remove(blockId)
+      blockIds
+    })
+  }
+
   /**
    * Tracks the set of blocks that each task has locked for reading, along with the number of times
    * that a block has been locked (since our read locks are re-entrant).
@@ -333,7 +360,7 @@ private[storage] class BlockInfoManager(trackingCacheVisibility: Boolean = false
       val acquire = info.writerTask == BlockInfo.NO_WRITER && info.readerCount == 0
       if (acquire) {
         info.writerTask = taskAttemptId
-        writeLocksByTask.get(taskAttemptId).add(blockId)
+        registerWriteLockForTask(taskAttemptId, blockId)
         logTrace(s"Task $taskAttemptId acquired write lock for $blockId")
       }
       acquire
@@ -396,10 +423,10 @@ private[storage] class BlockInfoManager(trackingCacheVisibility: Boolean = false
     logTrace(s"Task $taskAttemptId releasing lock for $blockId")
     blockInfo(blockId) { (info, condition) =>
       if (info.writerTask != BlockInfo.NO_WRITER) {
-        val blockIds = writeLocksByTask.get(taskAttemptId)
+        val blockIds = writeLocksByTask.get(info.writerTask)
         if (blockIds != null) {
+          unregisterWriteLockForTask(info.writerTask, blockId)
           info.writerTask = BlockInfo.NO_WRITER
-          blockIds.remove(blockId)
         }
       } else {
         // There can be a race between unlock and releaseAllLocksForTask which causes negative
@@ -492,11 +519,10 @@ private[storage] class BlockInfoManager(trackingCacheVisibility: Boolean = false
     val writeLocks = Option(writeLocksByTask.remove(taskAttemptId)).getOrElse(util.Set.of())
     writeLocks.forEach { blockId =>
       blockInfo(blockId) { (info, condition) =>
-        // Check the existence of `blockId` because `unlock` may have already removed it
-        // concurrently.
-        if (writeLocks.contains(blockId)) {
+        // Check the existence of `blockId` and also if it is still held by this task because
+        // `unlock` may have already released it concurrently. See SPARK-53807 for details.
+        if (writeLocks.contains(blockId) && info.writerTask == taskAttemptId) {
           blocksWithReleasedLocks += blockId
-          assert(info.writerTask == taskAttemptId)
           info.writerTask = BlockInfo.NO_WRITER
           condition.signalAll()
         }
@@ -595,7 +621,7 @@ private[storage] class BlockInfoManager(trackingCacheVisibility: Boolean = false
         }
         info.readerCount = 0
         info.writerTask = BlockInfo.NO_WRITER
-        writeLocksByTask.get(taskAttemptId).remove(blockId)
+        unregisterWriteLockForTask(taskAttemptId, blockId)
       }
       condition.signalAll()
     }

