This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new d4050d7  [SPARK-36782][CORE] Avoid blocking dispatcher-BlockManagerMaster during UpdateBlockInfo
d4050d7 is described below

commit d4050d7ee9df72dec238c3f6c9160343951fc5d6
Author: Fabian A.J. Thiele <[email protected]>
AuthorDate: Thu Sep 23 12:56:49 2021 +0800

    [SPARK-36782][CORE] Avoid blocking dispatcher-BlockManagerMaster during UpdateBlockInfo
    
    ### What changes were proposed in this pull request?
    Delegate the potentially blocking call to `mapOutputTracker.updateMapOutput` within
    `UpdateBlockInfo` handling from the `dispatcher-BlockManagerMaster` thread to the
    thread pool, to avoid blocking the endpoint. This code path is only taken for
    `ShuffleIndexBlockId`; other block types are still handled on the
    `dispatcher-BlockManagerMaster` itself.
    
    Change `updateBlockInfo` to return `Future[Boolean]` instead of `Boolean`. The
    response is sent to the RPC caller upon successful completion of the future.
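    
    As a rough sketch of the pattern (hypothetical names, not the actual Spark endpoint;
    only the `Future` completion and `context.reply` ordering mirror this change):
    
    ```scala
    import java.util.concurrent.Executors
    import scala.concurrent.{ExecutionContext, Future}
    
    object NonBlockingReplySketch {
      // Illustrative thread pool standing in for the endpoint's thread pool.
      private implicit val ec: ExecutionContext =
        ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))
    
      // Stand-in for RpcCallContext.
      final case class RpcContext(reply: Any => Unit)
    
      def handleUpdateBlockInfo(context: RpcContext): Unit = {
        // Potentially blocking work runs on the pool, not on the dispatcher thread.
        val response: Future[Boolean] = Future {
          // e.g. mapOutputTracker.updateMapOutput(shuffleId, mapId, blockManagerId)
          true
        }
        // Reply only once the future completes; the dispatcher returns immediately.
        response.foreach(isSuccess => context.reply(isSuccess))
      }
    }
    ```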
    
    Introduce a unit test that forces `MapOutputTracker` to perform a broadcast in
    `MapOutputTracker.serializeOutputStatuses` when running the decommission tests.
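    
    A minimal illustration of how the broadcast path is forced (mirrors the suite change
    below; the master string and executor count are illustrative, and the typed
    `set(ConfigEntry, value)` overload is Spark-internal, usable here because the test
    suite lives in the org.apache.spark package):
    
    ```scala
    import org.apache.spark.SparkConf
    import org.apache.spark.internal.config
    
    // Setting the minimum map-output size for broadcast to 0 makes
    // MapOutputTracker.serializeOutputStatuses take the broadcast path.
    val conf = new SparkConf()
      .setAppName("test")
      .setMaster("local-cluster[3, 1, 1024]")
      .set(config.SHUFFLE_MAPOUTPUT_MIN_SIZE_FOR_BROADCAST, 0L)
    ```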
    
    ### Why are the changes needed?
    [SPARK-36782](https://issues.apache.org/jira/browse/SPARK-36782) describes a
    deadlock that occurs if the `dispatcher-BlockManagerMaster` thread is allowed to
    block while waiting for write access to data structures.
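    
    The shape of the problem, as a hypothetical standalone sketch (not the actual Spark
    code): a single-threaded dispatcher blocks on a lock whose holder is itself waiting
    for the dispatcher to process another message, so neither side can make progress:
    
    ```scala
    import java.util.concurrent.{CountDownLatch, Executors}
    import java.util.concurrent.locks.ReentrantReadWriteLock
    
    object DispatcherDeadlockSketch extends App {
      val dispatcher = Executors.newSingleThreadExecutor()  // the lone dispatcher thread
      val lock = new ReentrantReadWriteLock()
      val workerNeedsDispatcher = new CountDownLatch(1)
    
      // A worker grabs the write lock and then waits for the dispatcher's help.
      new Thread(() => {
        lock.writeLock().lock()
        try workerNeedsDispatcher.await()    // never released: the dispatcher is stuck below
        finally lock.writeLock().unlock()
      }).start()
      Thread.sleep(100)                      // let the worker acquire the lock first
    
      // Message 1: the dispatcher blocks waiting for the write lock.
      dispatcher.submit(new Runnable {
        def run(): Unit = { lock.writeLock().lock(); lock.writeLock().unlock() }
      })
      // Message 2: would unblock the worker, but never runs -> deadlock.
      dispatcher.submit(new Runnable {
        def run(): Unit = workerNeedsDispatcher.countDown()
      })
    }
    ```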
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Unit test introduced in this PR.
    
    ---
    
    Ping eejbyfeldt for notice.
    
    Closes #34043 from f-thiele/SPARK-36782.
    
    Lead-authored-by: Fabian A.J. Thiele <[email protected]>
    Co-authored-by: Emil Ejbyfeldt <[email protected]>
    Co-authored-by: Fabian A.J. Thiele <[email protected]>
    Signed-off-by: Gengliang Wang <[email protected]>
    (cherry picked from commit 4ea54e8672757c0dbe3dd57c81763afdffcbcc1b)
    Signed-off-by: Gengliang Wang <[email protected]>
---
 .../spark/storage/BlockManagerMasterEndpoint.scala | 37 ++++++++++++----------
 .../BlockManagerDecommissionIntegrationSuite.scala | 13 +++++++-
 2 files changed, 33 insertions(+), 17 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 322ffae..2225822 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -117,12 +117,15 @@ class BlockManagerMasterEndpoint(
 
     case _updateBlockInfo @
         UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
-      val isSuccess = updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size)
-      context.reply(isSuccess)
-      // SPARK-30594: we should not post `SparkListenerBlockUpdated` when updateBlockInfo
-      // returns false since the block info would be updated again later.
-      if (isSuccess) {
-        listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))
+      val response = updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size)
+
+      response.foreach { isSuccess =>
+        // SPARK-30594: we should not post `SparkListenerBlockUpdated` when updateBlockInfo
+        // returns false since the block info would be updated again later.
+        if (isSuccess) {
+          listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))
+        }
+        context.reply(isSuccess)
       }
 
     case GetLocations(blockId) =>
@@ -573,23 +576,25 @@ class BlockManagerMasterEndpoint(
       blockId: BlockId,
       storageLevel: StorageLevel,
       memSize: Long,
-      diskSize: Long): Boolean = {
+      diskSize: Long): Future[Boolean] = {
     logDebug(s"Updating block info on master ${blockId} for ${blockManagerId}")
 
     if (blockId.isShuffle) {
       blockId match {
         case ShuffleIndexBlockId(shuffleId, mapId, _) =>
           // We need to update this at index file because there exists the index-only block
-          logDebug(s"Received shuffle index block update for ${shuffleId} ${mapId}, updating.")
-          mapOutputTracker.updateMapOutput(shuffleId, mapId, blockManagerId)
-          return true
+          return Future {
+            logDebug(s"Received shuffle index block update for ${shuffleId} ${mapId}, updating.")
+            mapOutputTracker.updateMapOutput(shuffleId, mapId, blockManagerId)
+            true
+          }
         case ShuffleDataBlockId(shuffleId: Int, mapId: Long, reduceId: Int) =>
           logDebug(s"Received shuffle data block update for ${shuffleId} ${mapId}, ignore.")
-          return true
+          return Future.successful(true)
         case _ =>
           logDebug(s"Unexpected shuffle block type ${blockId}" +
             s"as ${blockId.getClass().getSimpleName()}")
-          return false
+          return Future.successful(false)
       }
     }
 
@@ -597,15 +602,15 @@ class BlockManagerMasterEndpoint(
       if (blockManagerId.isDriver && !isLocal) {
         // We intentionally do not register the master (except in local mode),
         // so we should not indicate failure.
-        return true
+        return Future.successful(true)
       } else {
-        return false
+        return Future.successful(false)
       }
     }
 
     if (blockId == null) {
       blockManagerInfo(blockManagerId).updateLastSeenMs()
-      return true
+      return Future.successful(true)
     }
 
     blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)
@@ -637,7 +642,7 @@ class BlockManagerMasterEndpoint(
     if (locations.size == 0) {
       blockLocations.remove(blockId)
     }
-    true
+    Future.successful(true)
   }
 
   private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
index 708d71b..ddec2ee 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
@@ -99,12 +99,22 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     runDecomTest(true, true, JobEnded)
   }
 
+  test(s"SPARK-36782 not deadlock if MapOutput uses broadcast") {
+    runDecomTest(false, true, JobEnded, forceMapOutputBroadcast = true)
+  }
+
   private def runDecomTest(
       persist: Boolean,
       shuffle: Boolean,
-      whenToDecom: String): Unit = {
+      whenToDecom: String,
+      forceMapOutputBroadcast: Boolean = false): Unit = {
     val migrateDuring = whenToDecom != JobEnded
     val master = s"local-cluster[${numExecs}, 1, 1024]"
+    val minBroadcastSize = if (forceMapOutputBroadcast) {
+      0
+    } else {
+      config.SHUFFLE_MAPOUTPUT_MIN_SIZE_FOR_BROADCAST.defaultValue.get
+    }
     val conf = new SparkConf().setAppName("test").setMaster(master)
       .set(config.DECOMMISSION_ENABLED, true)
       .set(config.STORAGE_DECOMMISSION_ENABLED, true)
@@ -115,6 +125,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
       // Just replicate blocks quickly during testing, there isn't another
       // workload we need to worry about.
       .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 10L)
+      .set(config.SHUFFLE_MAPOUTPUT_MIN_SIZE_FOR_BROADCAST, minBroadcastSize)
 
     if (whenToDecom == TaskStarted) {
       // We are using accumulators below, make sure those are reported frequently.

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
