Updated Branches:
  refs/heads/master 793020961 -> 0ff38c222

Merge pull request #494 from tyro89/worker_registration_issue

Issue with failed worker registrations

I've been going through the Spark source after having some odd issues with 
workers dying and not coming back. After some digging (I'm very new to Scala 
and Spark) I believe I've found a worker registration issue. It looks to me 
like a failed registration follows the same code path as a successful 
registration, which ends up with workers believing they are connected (since they 
received a `RegisteredWorker` event) even though they are not registered on the 
Master.

This is a quick fix that I hope addresses this issue (assuming I didn't 
completely misread the code and I'm about to look like a silly person :P)

I'm opening this PR now to start a chat with you guys while I do some more 
testing on my side :)

Author: Erik Selin <erik.se...@jadedpixel.com>

== Merge branch commits ==

commit 973012f8a2dcf1ac1e68a69a2086a1b9a50f401b
Author: Erik Selin <erik.se...@jadedpixel.com>
Date:   Tue Jan 28 23:36:12 2014 -0500

    break logWarning into two lines to respect line character limit.

commit e3754dc5b94730f37e9806974340e6dd93400f85
Author: Erik Selin <erik.se...@jadedpixel.com>
Date:   Tue Jan 28 21:16:21 2014 -0500

    add log warning when worker registration fails due to an attempt to 
re-register on the same address.

commit 14baca241fa7823e1213cfc12a3ff2a9b865b1ed
Author: Erik Selin <erik.se...@jadedpixel.com>
Date:   Wed Jan 22 21:23:26 2014 -0500

    address code style comment

commit 71c0d7e6f59cd378d4e24994c21140ab893954ee
Author: Erik Selin <erik.se...@jadedpixel.com>
Date:   Wed Jan 22 16:01:42 2014 -0500

    Make a failed registration not persist, not send a `RegisteredWorker` 
event and not run `schedule` but rather send a `RegisterWorkerFailed` message 
to the worker attempting to register.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/0ff38c22
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/0ff38c22
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/0ff38c22

Branch: refs/heads/master
Commit: 0ff38c22205f14770ecca1e66378e7c207ca2d1d
Parents: 7930209
Author: Erik Selin <erik.se...@jadedpixel.com>
Authored: Wed Jan 29 12:44:54 2014 -0800
Committer: Patrick Wendell <pwend...@gmail.com>
Committed: Wed Jan 29 12:44:54 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/deploy/master/Master.scala  | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0ff38c22/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index d49401f..3800557 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -176,10 +176,16 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
       } else {
         val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
           sender, workerWebUiPort, publicAddress)
-        registerWorker(worker)
-        persistenceEngine.addWorker(worker)
-        sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
-        schedule()
+        if (registerWorker(worker)) {
+          persistenceEngine.addWorker(worker)
+          sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
+          schedule()
+        } else {
+          val workerAddress = worker.actor.path.address
+          logWarning("Worker registration failed. Attempted to re-register 
worker at same address: " +
+            workerAddress)
+          sender ! RegisterWorkerFailed("Attempted to re-register worker at 
same address: " + workerAddress)
+        }
       }
     }
 
@@ -511,7 +517,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
       exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
   }
 
-  def registerWorker(worker: WorkerInfo): Unit = {
+  def registerWorker(worker: WorkerInfo): Boolean = {
     // There may be one or more refs to dead workers on this same node (w/ 
different ID's),
     // remove them.
     workers.filter { w =>
@@ -523,13 +529,14 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     val workerAddress = worker.actor.path.address
     if (addressToWorker.contains(workerAddress)) {
       logInfo("Attempted to re-register worker at same address: " + 
workerAddress)
-      return
+      return false
     }
 
     workers += worker
     idToWorker(worker.id) = worker
     actorToWorker(worker.actor) = worker
     addressToWorker(workerAddress) = worker
+    true
   }
 
   def removeWorker(worker: WorkerInfo) {

Reply via email to