Repository: spark Updated Branches: refs/heads/master ea2fdc0d2 -> 641aec68e
[SPARK-23806] Broadcast.unpersist can cause fatal exception when used⦠⦠with dynamic allocation ## What changes were proposed in this pull request? ignore errors when you are waiting for a broadcast.unpersist. This is handling it the same way as doing rdd.unpersist in https://issues.apache.org/jira/browse/SPARK-22618 ## How was this patch tested? Patch was tested manually against a couple jobs that exhibit this behavior, with the change the application no longer dies due to this and just prints the warning. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Thomas Graves <[email protected]> Closes #20924 from tgravescs/SPARK-23806. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/641aec68 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/641aec68 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/641aec68 Branch: refs/heads/master Commit: 641aec68e8167546dbb922874c086c9b90198f08 Parents: ea2fdc0 Author: Thomas Graves <[email protected]> Authored: Thu Mar 29 16:37:46 2018 +0800 Committer: Wenchen Fan <[email protected]> Committed: Thu Mar 29 16:37:46 2018 +0800 ---------------------------------------------------------------------- .../spark/storage/BlockManagerMasterEndpoint.scala | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/641aec68/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 89a6a71..56b95c3 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -192,11 +192,15 @@ class BlockManagerMasterEndpoint( val requiredBlockManagers = blockManagerInfo.values.filter { info => removeFromDriver || !info.blockManagerId.isDriver } - Future.sequence( - requiredBlockManagers.map { bm => - bm.slaveEndpoint.ask[Int](removeMsg) - }.toSeq - ) + val futures = requiredBlockManagers.map { bm => + bm.slaveEndpoint.ask[Int](removeMsg).recover { + case e: IOException => + logWarning(s"Error trying to remove broadcast $broadcastId", e) + 0 // zero blocks were removed + } + }.toSeq + + Future.sequence(futures) } private def removeBlockManager(blockManagerId: BlockManagerId) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
