This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 4bbaf3777e9 [SPARK-38675][CORE] Fix race during unlock in BlockInfoManager
4bbaf3777e9 is described below

commit 4bbaf3777e9cd90151ec526a05dd67aab22da403
Author: Herman van Hovell <her...@databricks.com>
AuthorDate: Thu Jun 2 16:48:11 2022 +0800

    [SPARK-38675][CORE] Fix race during unlock in BlockInfoManager
    
    ### What changes were proposed in this pull request?
    This PR fixes a race in the `BlockInfoManager` between `unlock` and `releaseAllLocksForTask`, resulting in a negative reader count for a block (which trips an assert). This happens when the following events take place (a simplified sketch in code follows the list):
    
    1. [THREAD 1] calls `releaseAllLocksForTask`. This starts by collecting all the blocks to be unlocked for this task.
    2. [THREAD 2] calls `unlock` for a read lock for the same task (this means the block is also in the list collected in step 1). It then proceeds to unlock the block by decrementing the reader count.
    3. [THREAD 1] now starts to release the collected locks. It does this by decrementing the reader counts for blocks by the number of acquired read locks. The problem is that step 2 made the lock counts for blocks incorrect, so we decrement by one (or a few) too many. This trips the negative reader count assert.
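
    A minimal, hypothetical Scala sketch of this interleaving (`readerCount` and `readLocksByTask` here are simplified stand-ins for the real `BlockInfoManager` state, not its actual API):

    ```scala
    import scala.collection.mutable

    // Hypothetical, simplified model of the race: sequential code standing in
    // for the interleaving of the two threads described above.
    object RaceSketch extends App {
      var readerCount = 2                                 // block read-locked twice by task 0
      val readLocksByTask = mutable.Map(0L -> mutable.Map("block" -> 2))

      // [THREAD 1] step 1: releaseAllLocksForTask collects the task's locks.
      val collected = readLocksByTask.remove(0L).get

      // [THREAD 2] step 2: a concurrent unlock decrements the shared reader count.
      readerCount -= 1

      // [THREAD 1] step 3: release using the now-stale snapshot, over-decrementing.
      collected.foreach { case (_, n) => readerCount -= n }

      println(readerCount)                                // -1: trips the readerCount assert
    }
    ```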
    
    We fix this by adding a check to `unlock` that makes sure `releaseAllLocksForTask` is not already in the process of releasing the task's locks. We do this by checking whether the multiset of read locks associated with the task is still present.
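
    Continuing the simplified sketch above (hypothetical names; the real change is in the patch below), the guard looks roughly like this:

    ```scala
    // Only decrement if the task's read-lock multiset still exists; if it is
    // gone, releaseAllLocksForTask already cleaned this lock up.
    def unlockGuarded(taskId: Long, blockId: String): Unit = {
      readLocksByTask.get(taskId) match {
        case Some(countsForTask) =>
          readerCount -= 1
          countsForTask(blockId) -= 1
        case None => // releaseAllLocksForTask already released this task's locks
      }
    }
    ```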
    
    ### Why are the changes needed?
    It is a bug. Not fixing this can cause negative reader counts for blocks, and this causes task failures.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added a regression test in BlockInfoManager suite.
    
    Closes #35991 from hvanhovell/SPARK-38675.
    
    Authored-by: Herman van Hovell <her...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 078b505d2f0a0a4958dec7da816a7d672820b637)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../apache/spark/storage/BlockInfoManager.scala    | 15 +++++++----
 .../spark/storage/BlockInfoManagerSuite.scala      | 31 ++++++++++++++++++++++
 2 files changed, 41 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
index 5392c20eefb..9eb1418fd16 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
@@ -360,12 +360,17 @@ private[storage] class BlockInfoManager extends Logging {
         info.writerTask = BlockInfo.NO_WRITER
         writeLocksByTask.get(taskAttemptId).remove(blockId)
       } else {
-        assert(info.readerCount > 0, s"Block $blockId is not locked for reading")
-        info.readerCount -= 1
+        // There can be a race between unlock and releaseAllLocksForTask which causes negative
+        // reader counts. We need to check whether the task's readLocksByTask multiset is still
+        // present; if it is not, then releaseAllLocksForTask has already cleaned up the read lock.
         val countsForTask = readLocksByTask.get(taskAttemptId)
-        val newPinCountForTask: Int = countsForTask.remove(blockId, 1) - 1
-        assert(newPinCountForTask >= 0,
-          s"Task $taskAttemptId release lock on block $blockId more times than 
it acquired it")
+        if (countsForTask != null) {
+          assert(info.readerCount > 0, s"Block $blockId is not locked for reading")
+          info.readerCount -= 1
+          val newPinCountForTask: Int = countsForTask.remove(blockId, 1) - 1
+          assert(newPinCountForTask >= 0,
+            s"Task $taskAttemptId release lock on block $blockId more times 
than it acquired it")
+        }
       }
       condition.signalAll()
     }
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
index 8ffc6798526..887644a8264 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
@@ -360,4 +360,35 @@ class BlockInfoManagerSuite extends SparkFunSuite with BeforeAndAfterEach {
     blockInfoManager.releaseAllLocksForTask(0)
     assert(blockInfoManager.getNumberOfMapEntries === initialNumMapEntries - 1)
   }
+
+  test("SPARK-38675 - concurrent unlock and releaseAllLocksForTask calls 
should not fail") {
+    // Create block
+    val blockId = TestBlockId("block")
+    assert(blockInfoManager.lockNewBlockForWriting(blockId, newBlockInfo()))
+    blockInfoManager.unlock(blockId)
+
+    // Without the fix the block below fails about 50% of the time. By executing it
+    // 10 times we increase the chance of failing to ~99.9%.
+    (0 to 10).foreach { task =>
+      withTaskId(task) {
+        blockInfoManager.registerTask(task)
+
+        // Acquire read locks
+        (0 to 50).foreach { _ =>
+          assert(blockInfoManager.lockForReading(blockId).isDefined)
+        }
+
+        // Asynchronously release read locks.
+        val futures = (0 to 50).map { _ =>
+          Future(blockInfoManager.unlock(blockId, Option(0L)))
+        }
+
+        // Remove all locks and hopefully don't hit an assertion error
+        blockInfoManager.releaseAllLocksForTask(task)
+
+        // Wait until all futures complete for the next iteration
+        futures.foreach(ThreadUtils.awaitReady(_, 100.millis))
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
