Repository: spark
Updated Branches:
  refs/heads/branch-2.3 1c39dfaef -> 38c0bd7db


[SPARK-23806] Broadcast.unpersist can cause fatal exception when used…

… with dynamic allocation

## What changes were proposed in this pull request?

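Ignore errors raised while waiting for a broadcast.unpersist to complete: an
IOException from an executor is logged as a warning and counted as zero blocks
removed. This handles the failure the same way as the rdd.unpersist fix in
https://issues.apache.org/jira/browse/SPARK-22618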
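
The recover-then-sequence pattern can be sketched outside Spark as follows. This is a minimal illustration, not the code in this patch: `askRemove`, the executor names, and the `RecoverSketch` object are hypothetical stand-ins for the RPC call to each block manager, and the warning goes to stderr rather than through Spark's logging.

```scala
import java.io.IOException
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object RecoverSketch {
  // Hypothetical stand-in for asking one executor to remove its broadcast blocks.
  // The executor named "lost" simulates one that dynamic allocation already removed.
  def askRemove(executor: String): Future[Int] =
    if (executor == "lost") {
      Future.failed(new IOException(s"connection to $executor closed"))
    } else {
      Future.successful(2) // pretend two blocks were removed
    }

  // Recover each future on its own, so a single IOException
  // cannot fail the combined Future.sequence result.
  def removeAll(executors: Seq[String]): Future[Seq[Int]] = {
    val futures = executors.map { executor =>
      askRemove(executor).recover {
        case e: IOException =>
          Console.err.println(s"Error trying to remove broadcast on $executor: ${e.getMessage}")
          0 // zero blocks were removed
      }
    }
    Future.sequence(futures)
  }

  def main(args: Array[String]): Unit = {
    val removed = Await.result(removeAll(Seq("exec-1", "lost", "exec-2")), 5.seconds)
    println(removed)
  }
}
```

Without the per-future `recover`, the failure from the "lost" executor would fail the whole sequence, which is the situation that surfaced as a fatal exception in the caller. Only `IOException` is recovered here, so other failures still propagate.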

## How was this patch tested?

The patch was tested manually against a couple of jobs that exhibit this
behavior. With the change, the application no longer dies from this error and
only prints the warning.


Author: Thomas Graves <[email protected]>

Closes #20924 from tgravescs/SPARK-23806.

(cherry picked from commit 641aec68e8167546dbb922874c086c9b90198f08)
Signed-off-by: Wenchen Fan <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/38c0bd7d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/38c0bd7d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/38c0bd7d

Branch: refs/heads/branch-2.3
Commit: 38c0bd7db86ca2b7e167b89338028863bcc26906
Parents: 1c39dfa
Author: Thomas Graves <[email protected]>
Authored: Thu Mar 29 16:37:46 2018 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Thu Mar 29 16:38:07 2018 +0800

----------------------------------------------------------------------
 .../spark/storage/BlockManagerMasterEndpoint.scala    | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/38c0bd7d/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 89a6a71..56b95c3 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -192,11 +192,15 @@ class BlockManagerMasterEndpoint(
     val requiredBlockManagers = blockManagerInfo.values.filter { info =>
       removeFromDriver || !info.blockManagerId.isDriver
     }
-    Future.sequence(
-      requiredBlockManagers.map { bm =>
-        bm.slaveEndpoint.ask[Int](removeMsg)
-      }.toSeq
-    )
+    val futures = requiredBlockManagers.map { bm =>
+      bm.slaveEndpoint.ask[Int](removeMsg).recover {
+        case e: IOException =>
+          logWarning(s"Error trying to remove broadcast $broadcastId", e)
+          0 // zero blocks were removed
+      }
+    }.toSeq
+
+    Future.sequence(futures)
   }
 
   private def removeBlockManager(blockManagerId: BlockManagerId) {

