Repository: spark
Updated Branches:
  refs/heads/branch-0.9 3fba7b7bc -> 6665df6b7


[SPARK-4006] Block Manager - Double Register Crash

This issue affects all versions from 0.7 up to and including 1.1.

In long-running contexts, we encountered a block manager registering twice 
with no remove in between. The cause is unknown; we assume a temporary 
network issue.

However, because the second registration uses a BlockManagerId on a different 
port, blockManagerInfo.contains() returns false while the 
blockManagerIdByExecutor lookup returns Some. A conditional statement catches 
this inconsistency and calls System.exit(1), which is a serious robustness 
problem for us.
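
To illustrate the inconsistency described above, here is a minimal sketch. The 
BlockManagerId case class is a simplified, hypothetical stand-in (the real 
class carries more fields), and DoubleRegisterDemo and lookup are names 
invented for this example only.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for Spark's BlockManagerId.
final case class BlockManagerId(executorId: String, host: String, port: Int)

object DoubleRegisterDemo {
  // Returns (is the full id already known?, id currently recorded for the executor).
  def lookup(
      infoKeys: mutable.Set[BlockManagerId],
      byExecutor: mutable.Map[String, BlockManagerId],
      id: BlockManagerId): (Boolean, Option[BlockManagerId]) =
    (infoKeys.contains(id), byExecutor.get(id.executorId))

  def main(args: Array[String]): Unit = {
    val first = BlockManagerId("exec-1", "host-a", 40001)
    val second = first.copy(port = 40002) // same executor, different port

    // State after the first registration, with no remove before the second.
    val infoKeys = mutable.Set(first)
    val byExecutor = mutable.Map(first.executorId -> first)

    // The full id is unknown, yet the executor already has an id on record.
    val (known, existing) = lookup(infoKeys, byExecutor, second)
    println(s"contains=$known, byExecutor=$existing")
  }
}
```

Because equality on the id includes the port, the first check misses while the 
per-executor lookup still hits, which is the branch that used to exit.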

The fix: when this happens, remove the old id from both maps during register. 
This mimics the behavior of expireDeadHosts() by cleaning up the maps locally 
before adding the new entries.
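
The replace-on-double-register behavior can be sketched in isolation as 
follows. This is an assumption-laden simplification, not the patched actor: 
RegistrySketch, BlockManagerInfo(id, maxMemSize), and the reduced 
removeExecutor are hypothetical, and the real removal path also cleans up 
block locations, which is omitted here.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins; the real classes carry more state.
final case class BlockManagerId(executorId: String, host: String, port: Int)
final case class BlockManagerInfo(id: BlockManagerId, maxMemSize: Long)

class RegistrySketch {
  val blockManagerInfo = mutable.HashMap.empty[BlockManagerId, BlockManagerInfo]
  val blockManagerIdByExecutor = mutable.HashMap.empty[String, BlockManagerId]

  // Drop the executor's current id from both maps, if one is recorded.
  def removeExecutor(executorId: String): Unit =
    blockManagerIdByExecutor.remove(executorId).foreach { oldId =>
      blockManagerInfo.remove(oldId)
    }

  def register(id: BlockManagerId, maxMemSize: Long): Unit =
    if (!blockManagerInfo.contains(id)) {
      // A stale id for the same executor is assumed dead and replaced
      // instead of terminating the process.
      if (blockManagerIdByExecutor.contains(id.executorId)) {
        removeExecutor(id.executorId)
      }
      blockManagerIdByExecutor(id.executorId) = id
      blockManagerInfo(id) = BlockManagerInfo(id, maxMemSize)
    }
}
```

After two registrations for the same executor on different ports, both maps 
hold only the newer id, so they stay consistent with each other.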

Also added logging for register and unregister.

https://issues.apache.org/jira/browse/SPARK-4006

Author: Tal Sliwowicz <[email protected]>

Closes #2854 from tsliwowicz/branch-0.9.2-block-mgr-removal and squashes the 
following commits:

95ae4db [Tal Sliwowicz] [SPARK-4006] In long running contexts, we encountered 
the situation of double registe...
81d69f0 [Tal Sliwowicz] fixed comment
efd93f2 [Tal Sliwowicz] In long running contexts, we encountered the situation 
of double register without a remove in between. The cause for that is unknown, 
and assumed a temp network issue.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6665df6b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6665df6b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6665df6b

Branch: refs/heads/branch-0.9
Commit: 6665df6b79a707357bd2b5be77223665077f17a2
Parents: 3fba7b7
Author: Tal Sliwowicz <[email protected]>
Authored: Fri Jan 9 12:06:48 2015 -0800
Committer: Andrew Or <[email protected]>
Committed: Fri Jan 9 12:06:48 2015 -0800

----------------------------------------------------------------------
 .../spark/storage/BlockManagerMasterActor.scala    | 17 +++++++++++------
 1 file changed, 11 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6665df6b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 2c1a4e2..8a82dc1 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -160,6 +160,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act
         blockLocations.remove(locations)
       }
     }
+    logInfo(s"Removing block manager $blockManagerId")
   }
 
   private def expireDeadHosts() {
@@ -225,14 +226,18 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act
   private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
     if (!blockManagerInfo.contains(id)) {
       blockManagerIdByExecutor.get(id.executorId) match {
-        case Some(manager) =>
-          // A block manager of the same executor already exists.
-          // This should never happen. Let's just quit.
-          logError("Got two different block manager registrations on " + id.executorId)
-          System.exit(1)
+        case Some(oldId) =>
+          // A block manager of the same executor already exists, so remove it (assumed dead)
+          logError("Got two different block manager registrations on same executor - "
+              + s" will replace old one $oldId with new one $id")
+          removeExecutor(id.executorId)  
         case None =>
-          blockManagerIdByExecutor(id.executorId) = id
       }
+      logInfo("Registering block manager %s with %s RAM, %s".format(
+        id.hostPort, Utils.bytesToString(maxMemSize), id))
+      
+      blockManagerIdByExecutor(id.executorId) = id
+      
       blockManagerInfo(id) = new BlockManagerMasterActor.BlockManagerInfo(
         id, System.currentTimeMillis(), maxMemSize, slaveActor)
     }

