Repository: spark
Updated Branches:
  refs/heads/master f799700ee -> 6b4852252


[SPARK-4006] In long running contexts, we encountered the situation of double 
register without a remove in between. The cause for that is unknown, and assumed 
a temp network issue.

However, since the second register carries a BlockManagerId on a different 
port, blockManagerInfo.contains(id) returns false, while 
blockManagerIdByExecutor.get(id.executorId) returns Some. This inconsistency 
hits a match branch that calls System.exit(1), which is a huge robustness issue 
for us.
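
A minimal sketch of the inconsistency described above, assuming a simplified 
case class in place of the real BlockManagerId. The names DoubleRegisterSketch, 
Id and lookupsFor are hypothetical and exist only for this illustration; the 
two maps are reduced to the parts that matter here.

```scala
import scala.collection.mutable

object DoubleRegisterSketch {
  // Hypothetical stand-in for BlockManagerId. Case-class equality compares
  // every field, so the same executor on a new port is a different map key.
  final case class Id(executorId: String, host: String, port: Int)

  // State after `first` has registered and `second` arrives without a remove
  // in between. Returns (is `second` known by id?, id recorded for its executor).
  def lookupsFor(first: Id, second: Id): (Boolean, Option[Id]) = {
    val blockManagerInfo = mutable.HashMap[Id, Long](first -> 0L)
    val blockManagerIdByExecutor =
      mutable.HashMap[String, Id](first.executorId -> first)
    (blockManagerInfo.contains(second),
      blockManagerIdByExecutor.get(second.executorId))
  }
}
```

With two ids that differ only in port, the first element is false and the 
second is Some(first), which is the combination that used to reach 
System.exit(1).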

The fix: when this happens, simply remove the old id from both maps during 
register. This mimics the behavior of expireDeadHosts() by cleaning up the maps 
locally before trying to add the new entries.
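
The register path after the fix can be sketched roughly as follows. This is a 
simplified illustration, not the actor code: RegisterSketch, Id, infoById and 
idByExecutor are hypothetical names, the stored value is just the memory size 
instead of a BlockManagerInfo, and removeExecutor here only touches the two 
maps.

```scala
import scala.collection.mutable

object RegisterSketch {
  // Hypothetical stand-in for BlockManagerId.
  final case class Id(executorId: String, host: String, port: Int)

  val infoById = mutable.HashMap.empty[Id, Long]        // plays blockManagerInfo
  val idByExecutor = mutable.HashMap.empty[String, Id]  // plays blockManagerIdByExecutor

  // Drop whatever id is recorded for the executor from both maps.
  def removeExecutor(executorId: String): Unit =
    idByExecutor.remove(executorId).foreach(old => infoById.remove(old))

  def register(id: Id, maxMemSize: Long): Unit =
    if (!infoById.contains(id)) {
      // Same executor already known under another id: treat the old one as
      // dead and clean it up instead of exiting.
      if (idByExecutor.contains(id.executorId)) removeExecutor(id.executorId)
      idByExecutor(id.executorId) = id
      infoById(id) = maxMemSize
    }
}
```

Registering the same executor twice on different ports leaves exactly one 
entry in each map, both pointing at the newer id.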

Also added some logging for register and unregister.

This is just like https://github.com/apache/spark/pull/2854 except it's on 
master

Author: Tal Sliwowicz <[email protected]>

Closes #2886 from tsliwowicz/master-block-mgr-removal and squashes the 
following commits:

094d508 [Tal Sliwowicz] some more white space change undone
41a2217 [Tal Sliwowicz] some more whitspaces change undone
7bcfc3d [Tal Sliwowicz] whitspaces fix
df9d98f [Tal Sliwowicz] Code review comments fixed
f48bce9 [Tal Sliwowicz] In long running contexts, we encountered the situation 
of double register without a remove in between. The cause for that is unknown, 
and assumed a temp network issue.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6b485225
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6b485225
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6b485225

Branch: refs/heads/master
Commit: 6b485225271a3c616c4fa1231c20090a95c86f32
Parents: f799700
Author: Tal Sliwowicz <[email protected]>
Authored: Thu Oct 23 10:51:06 2014 -0700
Committer: Andrew Or <[email protected]>
Committed: Thu Oct 23 10:53:53 2014 -0700

----------------------------------------------------------------------
 .../spark/storage/BlockManagerMasterActor.scala | 25 ++++++++++----------
 1 file changed, 13 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6b485225/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 088f06e..5e375a2 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -203,6 +203,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
       }
     }
     listenerBus.post(SparkListenerBlockManagerRemoved(System.currentTimeMillis(), blockManagerId))
+    logInfo(s"Removing block manager $blockManagerId")
   }
 
   private def expireDeadHosts() {
@@ -327,20 +328,20 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
     val time = System.currentTimeMillis()
     if (!blockManagerInfo.contains(id)) {
       blockManagerIdByExecutor.get(id.executorId) match {
-        case Some(manager) =>
-          // A block manager of the same executor already exists.
-          // This should never happen. Let's just quit.
-          logError("Got two different block manager registrations on " + id.executorId)
-          System.exit(1)
+        case Some(oldId) =>
+          // A block manager of the same executor already exists, so remove it (assumed dead)
+          logError("Got two different block manager registrations on same executor - "
+              + s" will replace old one $oldId with new one $id")
+          removeExecutor(id.executorId)  
         case None =>
-          blockManagerIdByExecutor(id.executorId) = id
       }
-
-      logInfo("Registering block manager %s with %s RAM".format(
-        id.hostPort, Utils.bytesToString(maxMemSize)))
-
-      blockManagerInfo(id) =
-        new BlockManagerInfo(id, time, maxMemSize, slaveActor)
+      logInfo("Registering block manager %s with %s RAM, %s".format(
+        id.hostPort, Utils.bytesToString(maxMemSize), id))
+      
+      blockManagerIdByExecutor(id.executorId) = id
+      
+      blockManagerInfo(id) = new BlockManagerInfo(
+        id, System.currentTimeMillis(), maxMemSize, slaveActor)
     }
     listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize))
   }

