This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new d4050d7 [SPARK-36782][CORE] Avoid blocking
dispatcher-BlockManagerMaster during UpdateBlockInfo
d4050d7 is described below
commit d4050d7ee9df72dec238c3f6c9160343951fc5d6
Author: Fabian A.J. Thiele <[email protected]>
AuthorDate: Thu Sep 23 12:56:49 2021 +0800
[SPARK-36782][CORE] Avoid blocking dispatcher-BlockManagerMaster during
UpdateBlockInfo
### What changes were proposed in this pull request?
Delegate the potentially blocking call to `mapOutputTracker.updateMapOutput`
within `UpdateBlockInfo` from the `dispatcher-BlockManagerMaster` thread to the
thread pool, to avoid blocking the endpoint. This code path is only taken for
`ShuffleIndexBlockId`; other block types are still handled on the
`dispatcher-BlockManagerMaster` thread itself.
Change `updateBlockInfo` to return `Future[Boolean]` instead of `Boolean`. The
response is sent to the RPC caller upon successful completion of the future.
Introduce a unit test that forces `MapOutputTracker` to use a broadcast as
part of `MapOutputTracker.serializeOutputStatuses` when running the
decommission tests.
### Why are the changes needed?
[SPARK-36782](https://issues.apache.org/jira/browse/SPARK-36782) describes
a deadlock occurring if the `dispatcher-BlockManagerMaster` is allowed to block
while waiting for write access to data structures.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit test as introduced in this PR.
---
Ping eejbyfeldt for notice.
Closes #34043 from f-thiele/SPARK-36782.
Lead-authored-by: Fabian A.J. Thiele <[email protected]>
Co-authored-by: Emil Ejbyfeldt <[email protected]>
Co-authored-by: Fabian A.J. Thiele <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
(cherry picked from commit 4ea54e8672757c0dbe3dd57c81763afdffcbcc1b)
Signed-off-by: Gengliang Wang <[email protected]>
---
.../spark/storage/BlockManagerMasterEndpoint.scala | 37 ++++++++++++----------
.../BlockManagerDecommissionIntegrationSuite.scala | 13 +++++++-
2 files changed, 33 insertions(+), 17 deletions(-)
diff --git
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 322ffae..2225822 100644
---
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -117,12 +117,15 @@ class BlockManagerMasterEndpoint(
case _updateBlockInfo @
UpdateBlockInfo(blockManagerId, blockId, storageLevel,
deserializedSize, size) =>
- val isSuccess = updateBlockInfo(blockManagerId, blockId, storageLevel,
deserializedSize, size)
- context.reply(isSuccess)
- // SPARK-30594: we should not post `SparkListenerBlockUpdated` when
updateBlockInfo
- // returns false since the block info would be updated again later.
- if (isSuccess) {
-
listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))
+ val response = updateBlockInfo(blockManagerId, blockId, storageLevel,
deserializedSize, size)
+
+ response.foreach { isSuccess =>
+ // SPARK-30594: we should not post `SparkListenerBlockUpdated` when
updateBlockInfo
+ // returns false since the block info would be updated again later.
+ if (isSuccess) {
+
listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))
+ }
+ context.reply(isSuccess)
}
case GetLocations(blockId) =>
@@ -573,23 +576,25 @@ class BlockManagerMasterEndpoint(
blockId: BlockId,
storageLevel: StorageLevel,
memSize: Long,
- diskSize: Long): Boolean = {
+ diskSize: Long): Future[Boolean] = {
logDebug(s"Updating block info on master ${blockId} for ${blockManagerId}")
if (blockId.isShuffle) {
blockId match {
case ShuffleIndexBlockId(shuffleId, mapId, _) =>
// We need to update this at index file because there exists the
index-only block
- logDebug(s"Received shuffle index block update for ${shuffleId}
${mapId}, updating.")
- mapOutputTracker.updateMapOutput(shuffleId, mapId, blockManagerId)
- return true
+ return Future {
+ logDebug(s"Received shuffle index block update for ${shuffleId}
${mapId}, updating.")
+ mapOutputTracker.updateMapOutput(shuffleId, mapId, blockManagerId)
+ true
+ }
case ShuffleDataBlockId(shuffleId: Int, mapId: Long, reduceId: Int) =>
logDebug(s"Received shuffle data block update for ${shuffleId}
${mapId}, ignore.")
- return true
+ return Future.successful(true)
case _ =>
logDebug(s"Unexpected shuffle block type ${blockId}" +
s"as ${blockId.getClass().getSimpleName()}")
- return false
+ return Future.successful(false)
}
}
@@ -597,15 +602,15 @@ class BlockManagerMasterEndpoint(
if (blockManagerId.isDriver && !isLocal) {
// We intentionally do not register the master (except in local mode),
// so we should not indicate failure.
- return true
+ return Future.successful(true)
} else {
- return false
+ return Future.successful(false)
}
}
if (blockId == null) {
blockManagerInfo(blockManagerId).updateLastSeenMs()
- return true
+ return Future.successful(true)
}
blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel,
memSize, diskSize)
@@ -637,7 +642,7 @@ class BlockManagerMasterEndpoint(
if (locations.size == 0) {
blockLocations.remove(blockId)
}
- true
+ Future.successful(true)
}
private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
diff --git
a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
index 708d71b..ddec2ee 100644
---
a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
+++
b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
@@ -99,12 +99,22 @@ class BlockManagerDecommissionIntegrationSuite extends
SparkFunSuite with LocalS
runDecomTest(true, true, JobEnded)
}
+ test(s"SPARK-36782 not deadlock if MapOutput uses broadcast") {
+ runDecomTest(false, true, JobEnded, forceMapOutputBroadcast = true)
+ }
+
private def runDecomTest(
persist: Boolean,
shuffle: Boolean,
- whenToDecom: String): Unit = {
+ whenToDecom: String,
+ forceMapOutputBroadcast: Boolean = false): Unit = {
val migrateDuring = whenToDecom != JobEnded
val master = s"local-cluster[${numExecs}, 1, 1024]"
+ val minBroadcastSize = if (forceMapOutputBroadcast) {
+ 0
+ } else {
+ config.SHUFFLE_MAPOUTPUT_MIN_SIZE_FOR_BROADCAST.defaultValue.get
+ }
val conf = new SparkConf().setAppName("test").setMaster(master)
.set(config.DECOMMISSION_ENABLED, true)
.set(config.STORAGE_DECOMMISSION_ENABLED, true)
@@ -115,6 +125,7 @@ class BlockManagerDecommissionIntegrationSuite extends
SparkFunSuite with LocalS
// Just replicate blocks quickly during testing, there isn't another
// workload we need to worry about.
.set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 10L)
+ .set(config.SHUFFLE_MAPOUTPUT_MIN_SIZE_FOR_BROADCAST, minBroadcastSize)
if (whenToDecom == TaskStarted) {
// We are using accumulators below, make sure those are reported
frequently.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]