Updated Branches:
  refs/heads/branch-0.8 0fcb2348a -> 616ea6f32

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d66c01f2/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
new file mode 100644
index 0000000..a0233a7
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import org.apache.spark.Logging
+import org.apache.zookeeper._
+
+import akka.serialization.Serialization
+
+class ZooKeeperPersistenceEngine(serialization: Serialization)
+  extends PersistenceEngine
+  with SparkZooKeeperWatcher
+  with Logging
+{
+  val WORKING_DIR = System.getProperty("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
+
+  val zk = new SparkZooKeeperSession(this)
+
+  zk.connect()
+
+  override def zkSessionCreated() {
+    zk.mkdirRecursive(WORKING_DIR)
+  }
+
+  override def zkDown() {
+    logError("PersistenceEngine disconnected from ZooKeeper -- ZK looks down.")
+  }
+
+  override def addApplication(app: ApplicationInfo) {
+    serializeIntoFile(WORKING_DIR + "/app_" + app.id, app)
+  }
+
+  override def removeApplication(app: ApplicationInfo) {
+    zk.delete(WORKING_DIR + "/app_" + app.id)
+  }
+
+  override def addWorker(worker: WorkerInfo) {
+    serializeIntoFile(WORKING_DIR + "/worker_" + worker.id, worker)
+  }
+
+  override def removeWorker(worker: WorkerInfo) {
+    zk.delete(WORKING_DIR + "/worker_" + worker.id)
+  }
+
+  override def close() {
+    zk.close()
+  }
+
+  override def readPersistedData(): (Seq[ApplicationInfo], Seq[WorkerInfo]) = {
+    val sortedFiles = zk.getChildren(WORKING_DIR).toList.sorted
+    val appFiles = sortedFiles.filter(_.startsWith("app_"))
+    val apps = appFiles.map(deserializeFromFile[ApplicationInfo])
+    val workerFiles = sortedFiles.filter(_.startsWith("worker_"))
+    val workers = workerFiles.map(deserializeFromFile[WorkerInfo])
+    (apps, workers)
+  }
+
+  private def serializeIntoFile(path: String, value: Serializable) {
+    val serializer = serialization.findSerializerFor(value)
+    val serialized = serializer.toBinary(value)
+    zk.create(path, serialized, CreateMode.PERSISTENT)
+  }
+
+  def deserializeFromFile[T <: Serializable](filename: String)(implicit m: Manifest[T]): T = {
+    val fileData = zk.getData("/spark/master_status/" + filename)
+    val clazz = m.erasure.asInstanceOf[Class[T]]
+    val serializer = serialization.serializerFor(clazz)
+    serializer.fromBinary(fileData).asInstanceOf[T]
+  }
+}
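
For reference, serializeIntoFile and deserializeFromFile above lean entirely on Akka's Serialization extension. A minimal, self-contained sketch of that round trip (assuming an Akka 2.x-era API; the ActorSystem name and sample value are placeholders, not part of the patch):

import akka.actor.ActorSystem
import akka.serialization.SerializationExtension

object SerializationRoundTrip {
  def main(args: Array[String]) {
    val system = ActorSystem("serialization-sketch")
    val serialization = SerializationExtension(system)

    // Serialize: pick a serializer for the value's runtime class, then convert to bytes,
    // mirroring serializeIntoFile before its zk.create call.
    val original = "spark://host1:7077"
    val bytes = serialization.findSerializerFor(original).toBinary(original)

    // Deserialize: look the serializer up by class and convert back,
    // mirroring deserializeFromFile after zk.getData.
    val restored = serialization.serializerFor(classOf[String]).fromBinary(bytes).asInstanceOf[String]
    println(restored == original)  // should print true

    system.shutdown()
  }
}

The Serialization instance the engine receives is presumably obtained the same way, via SerializationExtension on the Master's actor system.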

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d66c01f2/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index e3dc30e..8fabc95 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -43,7 +43,8 @@ private[spark] class ExecutorRunner(
     val workerId: String,
     val host: String,
     val sparkHome: File,
-    val workDir: File)
+    val workDir: File,
+    var state: ExecutorState.Value)
   extends Logging {
 
   val fullId = appId + "/" + execId
@@ -83,7 +84,8 @@ private[spark] class ExecutorRunner(
         process.destroy()
         process.waitFor()
       }
-      worker ! ExecutorStateChanged(appId, execId, ExecutorState.KILLED, None, None)
+      state = ExecutorState.KILLED
+      worker ! ExecutorStateChanged(appId, execId, state, None, None)
       Runtime.getRuntime.removeShutdownHook(shutdownHook)
     }
   }
@@ -180,9 +182,9 @@ private[spark] class ExecutorRunner(
      // long-lived processes only. However, in the future, we might restart the executor a few
       // times on the same machine.
       val exitCode = process.waitFor()
+      state = ExecutorState.FAILED
       val message = "Command exited with code " + exitCode
-      worker ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, Some(message),
-                                    Some(exitCode))
+      worker ! ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode))
     } catch {
       case interrupted: InterruptedException =>
         logInfo("Runner thread for executor " + fullId + " interrupted")
@@ -192,8 +194,9 @@ private[spark] class ExecutorRunner(
         if (process != null) {
           process.destroy()
         }
+        state = ExecutorState.FAILED
         val message = e.getClass + ": " + e.getMessage
-        worker ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, Some(message), None)
+        worker ! ExecutorStateChanged(appId, execId, state, Some(message), None)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d66c01f2/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 09530be..216d9d4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -23,26 +23,28 @@ import java.io.File
 
 import scala.collection.mutable.HashMap
 
-import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
+import akka.actor._
 import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
 import akka.util.duration._
 
-import org.apache.spark.{Logging}
-import org.apache.spark.deploy.ExecutorState
+import org.apache.spark.Logging
+import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.deploy.worker.ui.WorkerWebUI
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.util.{Utils, AkkaUtils}
 
-
+/**
+  * @param masterUrls Each url should look like spark://host:port.
+  */
 private[spark] class Worker(
     host: String,
     port: Int,
     webUiPort: Int,
     cores: Int,
     memory: Int,
-    masterUrl: String,
+    masterUrls: Array[String],
     workDirPath: String = null)
   extends Actor with Logging {
 
@@ -54,8 +56,18 @@ private[spark] class Worker(
   // Send a heartbeat every (heartbeat timeout) / 4 milliseconds
  val HEARTBEAT_MILLIS = System.getProperty("spark.worker.timeout", "60").toLong * 1000 / 4
 
+  val REGISTRATION_TIMEOUT = 20.seconds
+  val REGISTRATION_RETRIES = 3
+
+  // Index into masterUrls that we're currently trying to register with.
+  var masterIndex = 0
+
+  val masterLock: Object = new Object()
   var master: ActorRef = null
-  var masterWebUiUrl : String = ""
+  var activeMasterUrl: String = ""
+  var activeMasterWebUiUrl : String = ""
+  @volatile var registered = false
+  @volatile var connected = false
   val workerId = generateWorkerId()
   var sparkHome: File = null
   var workDir: File = null
@@ -95,6 +107,7 @@ private[spark] class Worker(
   }
 
   override def preStart() {
+    assert(!registered)
     logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
       host, port, cores, Utils.megabytesToString(memory)))
     sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse("."))
@@ -103,44 +116,98 @@ private[spark] class Worker(
     webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
 
     webUi.start()
-    connectToMaster()
+    registerWithMaster()
 
     metricsSystem.registerSource(workerSource)
     metricsSystem.start()
   }
 
-  def connectToMaster() {
-    logInfo("Connecting to master " + masterUrl)
-    master = context.actorFor(Master.toAkkaUrl(masterUrl))
-    master ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort.get, publicAddress)
-    context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
-    context.watch(master) // Doesn't work with remote actors, but useful for testing
+  def changeMaster(url: String, uiUrl: String) {
+    masterLock.synchronized {
+      activeMasterUrl = url
+      activeMasterWebUiUrl = uiUrl
+      master = context.actorFor(Master.toAkkaUrl(activeMasterUrl))
+      context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+      context.watch(master) // Doesn't work with remote actors, but useful for testing
+      connected = true
+    }
+  }
+
+  def tryRegisterAllMasters() {
+    for (masterUrl <- masterUrls) {
+      logInfo("Connecting to master " + masterUrl + "...")
+      val actor = context.actorFor(Master.toAkkaUrl(masterUrl))
+      actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort.get,
+        publicAddress)
+    }
+  }
+
+  def registerWithMaster() {
+    tryRegisterAllMasters()
+
+    var retries = 0
+    lazy val retryTimer: Cancellable =
+      context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
+        retries += 1
+        if (registered) {
+          retryTimer.cancel()
+        } else if (retries >= REGISTRATION_RETRIES) {
+          logError("All masters are unresponsive! Giving up.")
+          System.exit(1)
+        } else {
+          tryRegisterAllMasters()
+        }
+      }
+    retryTimer // start timer
   }
 
   override def receive = {
-    case RegisteredWorker(url) =>
-      masterWebUiUrl = url
-      logInfo("Successfully registered with master")
-      context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis) {
-        master ! Heartbeat(workerId)
+    case RegisteredWorker(masterUrl, masterWebUiUrl) =>
+      logInfo("Successfully registered with master " + masterUrl)
+      registered = true
+      changeMaster(masterUrl, masterWebUiUrl)
+      context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)
+
+    case SendHeartbeat =>
+      masterLock.synchronized {
+        if (connected) { master ! Heartbeat(workerId) }
       }
 
+    case MasterChanged(masterUrl, masterWebUiUrl) =>
+      logInfo("Master has changed, new master is at " + masterUrl)
+      context.unwatch(master)
+      changeMaster(masterUrl, masterWebUiUrl)
+
+      val execs = executors.values.
+        map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
+      sender ! WorkerSchedulerStateResponse(workerId, execs.toList)
+
     case RegisterWorkerFailed(message) =>
-      logError("Worker registration failed: " + message)
-      System.exit(1)
-
-    case LaunchExecutor(appId, execId, appDesc, cores_, memory_, execSparkHome_) =>
-      logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
-      val manager = new ExecutorRunner(
-        appId, execId, appDesc, cores_, memory_, self, workerId, host, new File(execSparkHome_), workDir)
-      executors(appId + "/" + execId) = manager
-      manager.start()
-      coresUsed += cores_
-      memoryUsed += memory_
-      master ! ExecutorStateChanged(appId, execId, ExecutorState.RUNNING, None, None)
+      if (!registered) {
+        logError("Worker registration failed: " + message)
+        System.exit(1)
+      }
+
+    case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_, execSparkHome_) =>
+      if (masterUrl != activeMasterUrl) {
+        logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
+      } else {
+        logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
+        val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
+          self, workerId, host, new File(execSparkHome_), workDir, ExecutorState.RUNNING)
+        executors(appId + "/" + execId) = manager
+        manager.start()
+        coresUsed += cores_
+        memoryUsed += memory_
+        masterLock.synchronized {
+          master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
+        }
+      }
 
     case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
-      master ! ExecutorStateChanged(appId, execId, state, message, exitStatus)
+      masterLock.synchronized {
+        master ! ExecutorStateChanged(appId, execId, state, message, exitStatus)
+      }
       val fullId = appId + "/" + execId
       if (ExecutorState.isFinished(state)) {
         val executor = executors(fullId)
@@ -153,32 +220,39 @@ private[spark] class Worker(
         memoryUsed -= executor.memory
       }
 
-    case KillExecutor(appId, execId) =>
-      val fullId = appId + "/" + execId
-      executors.get(fullId) match {
-        case Some(executor) =>
-          logInfo("Asked to kill executor " + fullId)
-          executor.kill()
-        case None =>
-          logInfo("Asked to kill unknown executor " + fullId)
+    case KillExecutor(masterUrl, appId, execId) =>
+      if (masterUrl != activeMasterUrl) {
+        logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor " + execId)
+      } else {
+        val fullId = appId + "/" + execId
+        executors.get(fullId) match {
+          case Some(executor) =>
+            logInfo("Asked to kill executor " + fullId)
+            executor.kill()
+          case None =>
+            logInfo("Asked to kill unknown executor " + fullId)
+        }
       }
 
-    case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
+    case Terminated(actor_) if actor_ == master =>
+      masterDisconnected()
+
+    case RemoteClientDisconnected(transport, address) if address == master.path.address =>
+      masterDisconnected()
+
+    case RemoteClientShutdown(transport, address) if address == master.path.address =>
       masterDisconnected()
 
     case RequestWorkerState => {
      sender ! WorkerStateResponse(host, port, workerId, executors.values.toList,
-        finishedExecutors.values.toList, masterUrl, cores, memory,
-        coresUsed, memoryUsed, masterWebUiUrl)
+        finishedExecutors.values.toList, activeMasterUrl, cores, memory,
+        coresUsed, memoryUsed, activeMasterWebUiUrl)
     }
   }
 
   def masterDisconnected() {
-    // TODO: It would be nice to try to reconnect to the master, but just shut down for now.
-    // (Note that if reconnecting we would also need to assign IDs differently.)
-    logError("Connection to master failed! Shutting down.")
-    executors.values.foreach(_.kill())
-    System.exit(1)
+    logError("Connection to master failed! Waiting for master to reconnect...")
+    connected = false
   }
 
   def generateWorkerId(): String = {
@@ -196,17 +270,18 @@ private[spark] object Worker {
   def main(argStrings: Array[String]) {
     val args = new WorkerArguments(argStrings)
    val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
-      args.memory, args.master, args.workDir)
+      args.memory, args.masters, args.workDir)
     actorSystem.awaitTermination()
   }
 
  def startSystemAndActor(host: String, port: Int, webUiPort: Int, cores: Int, memory: Int,
-    masterUrl: String, workDir: String, workerNumber: Option[Int] = None): (ActorSystem, Int) = {
+    masterUrls: Array[String], workDir: String, workerNumber: Option[Int] = None)
+    : (ActorSystem, Int) = {
     // The LocalSparkCluster runs multiple local sparkWorkerX actor systems
     val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
    val actor = actorSystem.actorOf(Props(new Worker(host, boundPort, webUiPort, cores, memory,
-      masterUrl, workDir)), name = "Worker")
+      masterUrls, workDir)), name = "Worker")
     (actorSystem, boundPort)
   }
 

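The registerWithMaster() change above needs a timer that can cancel itself from inside its own callback, which is why retryTimer is a lazy val that is only forced after it is fully defined. A stripped-down illustration of that shape using plain java.util.concurrent (the constants, messages, and names here are illustrative only, not Spark's):

import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

object RegistrationRetrySketch {
  val registered = new AtomicBoolean(false)   // flipped elsewhere once any master answers
  val scheduler = Executors.newSingleThreadScheduledExecutor()

  def tryRegisterAllMasters() { println("sending RegisterWorker to every configured master") }

  def main(args: Array[String]) {
    tryRegisterAllMasters()                   // the first attempt happens immediately
    var retries = 0
    lazy val retryTask: ScheduledFuture[_] =
      scheduler.scheduleAtFixedRate(new Runnable {
        def run() {
          retries += 1
          if (registered.get) {
            retryTask.cancel(false)           // stop retrying once registration succeeded
            scheduler.shutdown()
          } else if (retries >= 3) {
            println("All masters are unresponsive! Giving up.")
            scheduler.shutdown()
          } else {
            tryRegisterAllMasters()
          }
        }
      }, 20, 20, TimeUnit.SECONDS)
    retryTask                                 // force the lazy val so the timer actually starts
  }
}
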
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d66c01f2/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
index 0ae89a8..3ed528e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
@@ -29,7 +29,7 @@ private[spark] class WorkerArguments(args: Array[String]) {
   var webUiPort = 8081
   var cores = inferDefaultCores()
   var memory = inferDefaultMemory()
-  var master: String = null
+  var masters: Array[String] = null
   var workDir: String = null
   
   // Check for settings in environment variables 
@@ -86,14 +86,14 @@ private[spark] class WorkerArguments(args: Array[String]) {
       printUsageAndExit(0)
 
     case value :: tail =>
-      if (master != null) {  // Two positional arguments were given
+      if (masters != null) {  // Two positional arguments were given
         printUsageAndExit(1)
       }
-      master = value
+      masters = value.stripPrefix("spark://").split(",").map("spark://" + _)
       parse(tail)
 
     case Nil =>
-      if (master == null) {  // No positional argument was given
+      if (masters == null) {  // No positional argument was given
         printUsageAndExit(1)
       }
 

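The new positional-argument handling splits a comma-separated master list into individual URLs. A quick illustration of what that one-liner produces (host names are placeholders):

object MasterListParsing {
  def main(args: Array[String]) {
    val value = "spark://host1:7077,host2:7077,host3:7077"
    // Same expression as in WorkerArguments: drop the single leading scheme,
    // split on commas, then put the scheme back on each element.
    val masters = value.stripPrefix("spark://").split(",").map("spark://" + _)
    masters.foreach(println)
    // prints spark://host1:7077, spark://host2:7077, spark://host3:7077 (one per line)
  }
}

Only the first spark:// prefix is stripped before the split, so the expected input is a single scheme followed by comma-separated host:port pairs, matching the spark://host1:port1,host2:port2 form used in the documentation later in this commit.
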
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d66c01f2/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index 95d6007..800f1ca 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -105,7 +105,7 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
 
     val logText = <node>{Utils.offsetBytes(path, startByte, endByte)}</node>
 
-    val linkToMaster = <p><a href={worker.masterWebUiUrl}>Back to Master</a></p>
+    val linkToMaster = <p><a href={worker.activeMasterWebUiUrl}>Back to Master</a></p>
 
    val range = <span>Bytes {startByte.toString} - {endByte.toString} of {logLength}</span>
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d66c01f2/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 9c49768..cb88159 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -26,7 +26,7 @@ import org.apache.spark.util.Utils
 private[spark] class SparkDeploySchedulerBackend(
     scheduler: ClusterScheduler,
     sc: SparkContext,
-    master: String,
+    masters: Array[String],
     appName: String)
   extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem)
   with ClientListener
@@ -52,7 +52,7 @@ private[spark] class SparkDeploySchedulerBackend(
     val appDesc = new ApplicationDescription(appName, maxCores, executorMemory, command, sparkHome,
         "http://" + sc.ui.appUIAddress)
 
-    client = new Client(sc.env.actorSystem, master, appDesc, this)
+    client = new Client(sc.env.actorSystem, masters, appDesc, this)
     client.start()
   }
 
@@ -71,8 +71,14 @@ private[spark] class SparkDeploySchedulerBackend(
 
   override def disconnected() {
     if (!stopping) {
-      logError("Disconnected from Spark cluster!")
-      scheduler.error("Disconnected from Spark cluster")
+      logWarning("Disconnected from Spark cluster! Waiting for reconnection...")
+    }
+  }
+
+  override def dead() {
+    if (!stopping) {
+      logError("Spark cluster looks dead, giving up.")
+      scheduler.error("Spark cluster looks down")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d66c01f2/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
index 453394d..fcd1b51 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
@@ -35,7 +35,7 @@ private[spark] object UIWorkloadGenerator {
 
   def main(args: Array[String]) {
     if (args.length < 2) {
-      println("usage: ./spark-class spark.ui.UIWorkloadGenerator [master] [FIFO|FAIR]")
+      println("usage: ./spark-class org.apache.spark.ui.UIWorkloadGenerator [master] [FIFO|FAIR]")
       System.exit(1)
     }
     val master = args(0)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d66c01f2/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index 05f8545..0b38e23 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -25,7 +25,7 @@ import net.liftweb.json.JsonAST.JValue
 import org.scalatest.FunSuite
 
 import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
-import org.apache.spark.deploy.master.{ApplicationInfo, WorkerInfo}
+import org.apache.spark.deploy.master.{ApplicationInfo, RecoveryState, WorkerInfo}
 import org.apache.spark.deploy.worker.ExecutorRunner
 
 class JsonProtocolSuite extends FunSuite {
@@ -53,7 +53,8 @@ class JsonProtocolSuite extends FunSuite {
     val workers = Array[WorkerInfo](createWorkerInfo(), createWorkerInfo())
     val activeApps = Array[ApplicationInfo](createAppInfo())
     val completedApps = Array[ApplicationInfo]()
-    val stateResponse = new MasterStateResponse("host", 8080, workers, activeApps, completedApps)
+    val stateResponse = new MasterStateResponse("host", 8080, workers, activeApps, completedApps,
+      RecoveryState.ALIVE)
     val output = JsonProtocol.writeMasterState(stateResponse)
     assertValidJson(output)
   }
@@ -79,7 +80,7 @@ class JsonProtocolSuite extends FunSuite {
   }
   def createExecutorRunner() : ExecutorRunner = {
    new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host",
-      new File("sparkHome"), new File("workDir"))
+      new File("sparkHome"), new File("workDir"), ExecutorState.RUNNING)
   }
 
   def assertValidJson(json: JValue) {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d66c01f2/docker/README.md
----------------------------------------------------------------------
diff --git a/docker/README.md b/docker/README.md
new file mode 100644
index 0000000..bf59e77
--- /dev/null
+++ b/docker/README.md
@@ -0,0 +1,5 @@
+Spark docker files
+===========
+
+Drawn from Matt Massie's docker files (https://github.com/massie/dockerfiles),
+as well as some updates from Andre Schumacher (https://github.com/AndreSchumacher/docker).
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d66c01f2/docker/build
----------------------------------------------------------------------
diff --git a/docker/build b/docker/build
new file mode 100755
index 0000000..253a2fc
--- /dev/null
+++ b/docker/build
@@ -0,0 +1,22 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+docker images > /dev/null || { echo Please install docker in non-sudo mode. ; exit; }
+
+./spark-test/build
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d66c01f2/docker/spark-test/README.md
----------------------------------------------------------------------
diff --git a/docker/spark-test/README.md b/docker/spark-test/README.md
new file mode 100644
index 0000000..addea27
--- /dev/null
+++ b/docker/spark-test/README.md
@@ -0,0 +1,10 @@
+Spark Docker files usable for testing and development purposes.
+
+These images are intended to be run like so:
+docker run -v $SPARK_HOME:/opt/spark spark-test-master
+docker run -v $SPARK_HOME:/opt/spark spark-test-worker <master_ip>
+
+Using this configuration, the containers will have their Spark directories
+mounted to your actual SPARK_HOME, allowing you to modify and recompile
+your Spark source and have them immediately usable in the docker images
+(without rebuilding them).

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d66c01f2/docker/spark-test/base/Dockerfile
----------------------------------------------------------------------
diff --git a/docker/spark-test/base/Dockerfile b/docker/spark-test/base/Dockerfile
new file mode 100644
index 0000000..6096277
--- /dev/null
+++ b/docker/spark-test/base/Dockerfile
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM ubuntu:precise
+
+RUN echo "deb http://archive.ubuntu.com/ubuntu precise main universe" > /etc/apt/sources.list
+
+# Upgrade package index
+RUN apt-get update
+
+# install a few other useful packages plus Open Jdk 7
+RUN apt-get install -y less openjdk-7-jre-headless net-tools vim-tiny sudo openssh-server
+
+ENV SCALA_VERSION 2.9.3
+ENV SPARK_VERSION 0.8.1
+ENV CDH_VERSION cdh4
+ENV SCALA_HOME /opt/scala-$SCALA_VERSION
+ENV SPARK_HOME /opt/spark
+ENV PATH $SPARK_HOME:$SCALA_HOME/bin:$PATH
+
+# Install Scala
+ADD http://www.scala-lang.org/files/archive/scala-$SCALA_VERSION.tgz /
+RUN (cd / && gunzip < scala-$SCALA_VERSION.tgz)|(cd /opt && tar -xvf -)
+RUN rm /scala-$SCALA_VERSION.tgz

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d66c01f2/docker/spark-test/build
----------------------------------------------------------------------
diff --git a/docker/spark-test/build b/docker/spark-test/build
new file mode 100755
index 0000000..6f9e197
--- /dev/null
+++ b/docker/spark-test/build
@@ -0,0 +1,22 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+docker build -t spark-test-base spark-test/base/
+docker build -t spark-test-master spark-test/master/
+docker build -t spark-test-worker spark-test/worker/

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d66c01f2/docker/spark-test/master/Dockerfile
----------------------------------------------------------------------
diff --git a/docker/spark-test/master/Dockerfile b/docker/spark-test/master/Dockerfile
new file mode 100644
index 0000000..f729534
--- /dev/null
+++ b/docker/spark-test/master/Dockerfile
@@ -0,0 +1,21 @@
+# Spark Master
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM spark-test-base
+ADD default_cmd /root/
+CMD ["/root/default_cmd"]

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d66c01f2/docker/spark-test/master/default_cmd
----------------------------------------------------------------------
diff --git a/docker/spark-test/master/default_cmd b/docker/spark-test/master/default_cmd
new file mode 100755
index 0000000..a5b1303
--- /dev/null
+++ b/docker/spark-test/master/default_cmd
@@ -0,0 +1,22 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+IP=$(ip -o -4 addr list eth0 | perl -n -e 'if (m{inet\s([\d\.]+)\/\d+\s}xms) { print $1 }')
+echo "CONTAINER_IP=$IP"
+/opt/spark/spark-class org.apache.spark.deploy.master.Master -i $IP

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d66c01f2/docker/spark-test/worker/Dockerfile
----------------------------------------------------------------------
diff --git a/docker/spark-test/worker/Dockerfile b/docker/spark-test/worker/Dockerfile
new file mode 100644
index 0000000..890febe
--- /dev/null
+++ b/docker/spark-test/worker/Dockerfile
@@ -0,0 +1,22 @@
+# Spark Worker
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM spark-test-base
+ENV SPARK_WORKER_PORT 8888
+ADD default_cmd /root/
+ENTRYPOINT ["/root/default_cmd"]

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d66c01f2/docker/spark-test/worker/default_cmd
----------------------------------------------------------------------
diff --git a/docker/spark-test/worker/default_cmd b/docker/spark-test/worker/default_cmd
new file mode 100755
index 0000000..ab6336f
--- /dev/null
+++ b/docker/spark-test/worker/default_cmd
@@ -0,0 +1,22 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+IP=$(ip -o -4 addr list eth0 | perl -n -e 'if (m{inet\s([\d\.]+)\/\d+\s}xms) { print $1 }')
+echo "CONTAINER_IP=$IP"
+/opt/spark/spark-class org.apache.spark.deploy.worker.Worker $1

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d66c01f2/docs/spark-standalone.md
----------------------------------------------------------------------
diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md
index 81cdbef..17066ef 100644
--- a/docs/spark-standalone.md
+++ b/docs/spark-standalone.md
@@ -3,6 +3,9 @@ layout: global
 title: Spark Standalone Mode
 ---
 
+* This will become a table of contents (this text will be scraped).
+{:toc}
+
 In addition to running on the Mesos or YARN cluster managers, Spark also 
provides a simple standalone deploy mode. You can launch a standalone cluster 
either manually, by starting a master and workers by hand, or use our provided 
[launch scripts](#cluster-launch-scripts). It is also possible to run these 
daemons on a single machine for testing.
 
 # Installing Spark Standalone to a Cluster
@@ -169,3 +172,75 @@ In addition, detailed log output for each job is also written to the work direct
 
 You can run Spark alongside your existing Hadoop cluster by just launching it 
as a separate service on the same machines. To access Hadoop data from Spark, 
just use a hdfs:// URL (typically `hdfs://<namenode>:9000/path`, but you can 
find the right URL on your Hadoop Namenode's web UI). Alternatively, you can 
set up a separate cluster for Spark, and still have it access HDFS over the 
network; this will be slower than disk-local access, but may not be a concern 
if you are still running in the same local area network (e.g. you place a few 
Spark machines on each rack that you have Hadoop on).
 
+
+# High Availability
+
+By default, standalone scheduling clusters are resilient to Worker failures 
(insofar as Spark itself is resilient to losing work by moving it to other 
workers). However, the scheduler uses a Master to make scheduling decisions, 
and this (by default) creates a single point of failure: if the Master crashes, 
no new applications can be created. In order to circumvent this, we have two 
high availability schemes, detailed below.
+
+## Standby Masters with ZooKeeper
+
+**Overview**
+
+Utilizing ZooKeeper to provide leader election and some state storage, you can 
launch multiple Masters in your cluster connected to the same ZooKeeper 
instance. One will be elected "leader" and the others will remain in standby 
mode. If the current leader dies, another Master will be elected, recover the 
old Master's state, and then resume scheduling. The entire recovery process 
(from the time the first leader goes down) should take between 1 and 2 
minutes. Note that this delay only affects scheduling _new_ applications -- 
applications that were already running during Master failover are unaffected.
+
+Learn more about getting started with ZooKeeper 
[here](http://zookeeper.apache.org/doc/trunk/zookeeperStarted.html).
+
+**Configuration**
+
+In order to enable this recovery mode, you can set SPARK_DAEMON_JAVA_OPTS in 
spark-env using this configuration:
+
+<table class="table">
+  <tr><th style="width:21%">System property</th><th>Meaning</th></tr>
+  <tr>
+    <td><code>spark.deploy.recoveryMode</code></td>
+    <td>Set to ZOOKEEPER to enable standby Master recovery mode (default: 
NONE).</td>
+  </tr>
+  <tr>
+    <td><code>spark.deploy.zookeeper.url</code></td>
+    <td>The ZooKeeper cluster url (e.g., 
192.168.1.100:2181,192.168.1.101:2181).</td>
+  </tr>
+  <tr>
+    <td><code>spark.deploy.zookeeper.dir</code></td>
+    <td>The directory in ZooKeeper to store recovery state (default: 
/spark).</td>
+  </tr>
+</table>
+
+Possible gotcha: If you have multiple Masters in your cluster but fail to 
correctly configure the Masters to use ZooKeeper, the Masters will fail to 
discover each other and think they're all leaders. This will not lead to a 
healthy cluster state (as all Masters will schedule independently).
+
+**Details**
+
+After you have a ZooKeeper cluster set up, enabling high availability is 
straightforward. Simply start multiple Master processes on different nodes with 
the same ZooKeeper configuration (ZooKeeper URL and directory). Masters can be 
added and removed at any time.
+
+In order to schedule new applications or add Workers to the cluster, they need 
to know the IP address of the current leader. This can be accomplished by 
simply passing in a list of Masters where you used to pass in a single one. For 
example, you might start your SparkContext pointing to 
``spark://host1:port1,host2:port2``. This would cause your SparkContext to try 
registering with both Masters -- if ``host1`` goes down, this configuration 
would still be correct as we'd find the new leader, ``host2``.
+
+There's an important distinction to be made between "registering with a 
Master" and normal operation. When starting up, an application or Worker needs 
to be able to find and register with the current lead Master. Once it 
successfully registers, though, it is "in the system" (i.e., stored in 
ZooKeeper). If failover occurs, the new leader will contact all previously 
registered applications and Workers to inform them of the change in leadership, 
so they need not even have known of the existence of the new Master at startup.
+
+Due to this property, new Masters can be created at any time, and the only 
thing you need to worry about is that _new_ applications and Workers can find 
it to register with in case it becomes the leader. Once registered, you're 
taken care of.
+
+## Single-Node Recovery with Local File System
+
+**Overview**
+
+ZooKeeper is the best way to go for production-level high availability, but if 
you just want to be able to restart the Master if it goes down, FILESYSTEM mode 
can take care of it. When applications and Workers register, they have enough 
state written to the provided directory so that they can be recovered upon a 
restart of the Master process.
+
+**Configuration**
+
+In order to enable this recovery mode, you can set SPARK_DAEMON_JAVA_OPTS in 
spark-env using this configuration:
+
+<table class="table">
+  <tr><th style="width:21%">System property</th><th>Meaning</th></tr>
+  <tr>
+    <td><code>spark.deploy.recoveryMode</code></td>
+    <td>Set to FILESYSTEM to enable single-node recovery mode (default: 
NONE).</td>
+  </tr>
+  <tr>
+    <td><code>spark.deploy.recoveryDirectory</code></td>
+    <td>The directory in which Spark will store recovery state, accessible 
from the Master's perspective.</td>
+  </tr>
+</table>
+
+**Details**
+
+* This solution can be used in tandem with a process monitor/manager like 
[monit](http://mmonit.com/monit/), or just to enable manual recovery via 
restart.
+* While filesystem recovery seems straightforwardly better than not doing any 
recovery at all, this mode may be suboptimal for certain development or 
experimental purposes. In particular, killing a master via stop-master.sh does 
not clean up its recovery state, so whenever you start a new Master, it will 
enter recovery mode. This could increase the startup time by up to 1 minute if 
it needs to wait for all previously-registered Workers/clients to timeout.
+* While it's not officially supported, you could mount an NFS directory as the 
recovery directory. If the original Master node dies completely, you could then 
start a Master on a different node, which would correctly recover all 
previously registered Workers/applications (equivalent to ZooKeeper recovery). 
Future applications will have to be able to find the new Master, however, in 
order to register.

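As the new High Availability section above describes, an application opts into Master failover simply by listing every Master in its URL rather than just one. A minimal sketch of such an application (host names, ports, and the app name are placeholders, not values from this patch):

import org.apache.spark.SparkContext

object MultiMasterApp {
  def main(args: Array[String]) {
    // Both masters are listed; the context registers with whichever is the current leader.
    val sc = new SparkContext("spark://host1:7077,host2:7077", "ha-example")
    println(sc.parallelize(1 to 1000).count())
    sc.stop()
  }
}

Workers accept the same comma-separated list as their positional argument, per the WorkerArguments change earlier in this commit.
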
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d66c01f2/docs/streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index c7df172..835b257 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -122,12 +122,12 @@ Spark Streaming features windowed computations, which allow you to apply transfo
 <table class="table">
 <tr><th style="width:30%">Transformation</th><th>Meaning</th></tr>
 <tr>
-  <td> <b>window</b>(<i>windowDuration</i>, </i>slideDuration</i>) </td>
+  <td> <b>window</b>(<i>windowDuration</i>, <i>slideDuration</i>) </td>
   <td> Return a new DStream which is computed based on windowed batches of the 
source DStream. <i>windowDuration</i> is the width of the window and 
<i>slideTime</i> is the frequency during which the window is calculated. Both 
times must be multiples of the batch interval.
   </td>
 </tr>
 <tr>
-  <td> <b>countByWindow</b>(<i>windowDuration</i>, </i>slideDuration</i>) </td>
+  <td> <b>countByWindow</b>(<i>windowDuration</i>, <i>slideDuration</i>) </td>
   <td> Return a sliding count of elements in the stream. <i>windowDuration</i> 
and <i>slideDuration</i> are exactly as defined in <code>window()</code>.
   </td>
 </tr>
@@ -161,7 +161,6 @@ Spark Streaming features windowed computations, which allow you to apply transfo
  <i>windowDuration</i> and <i>slideDuration</i> are exactly as defined in 
<code>window()</code>.
 </td>
 </tr>
-
 </table>
 
 A complete list of DStream operations is available in the API documentation of 
[DStream](api/streaming/index.html#org.apache.spark.streaming.DStream) and 
[PairDStreamFunctions](api/streaming/index.html#org.apache.spark.streaming.PairDStreamFunctions).

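The window and countByWindow rows touched above both take a window duration and a slide duration. A small sketch of how they are used (the master URL, host, port, and durations are placeholders; the batch interval is 1 second so both arguments are multiples of it):

import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowSketch {
  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[2]", "window-sketch", Seconds(1))
    val lines = ssc.socketTextStream("localhost", 9999)

    // Recompute over the last 30 seconds of data, every 10 seconds.
    val recentLines = lines.window(Seconds(30), Seconds(10))
    val recentCount = lines.countByWindow(Seconds(30), Seconds(10))

    recentLines.print()
    recentCount.print()
    ssc.start()
  }
}
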
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d66c01f2/docs/tuning.md
----------------------------------------------------------------------
diff --git a/docs/tuning.md b/docs/tuning.md
index 28d88a2..f491ae9 100644
--- a/docs/tuning.md
+++ b/docs/tuning.md
@@ -175,7 +175,7 @@ To further tune garbage collection, we first need to understand some basic infor
 * Java Heap space is divided in to two regions Young and Old. The Young 
generation is meant to hold short-lived objects
   while the Old generation is intended for objects with longer lifetimes.
 
-* The Young generation is further divided into three regions [Eden, Survivor1, Survivor2].
+* The Young generation is further divided into three regions \[Eden, Survivor1, Survivor2\].
 
 * A simplified description of the garbage collection procedure: When Eden is 
full, a minor GC is run on Eden and objects
   that are alive from Eden and Survivor1 are copied to Survivor2. The Survivor 
regions are swapped. If an object is old

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d66c01f2/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 049b636..a04e209 100644
--- a/pom.xml
+++ b/pom.xml
@@ -346,6 +346,17 @@
         <scope>test</scope>
       </dependency>
       <dependency>
+        <groupId>org.apache.zookeeper</groupId>
+        <artifactId>zookeeper</artifactId>
+        <version>3.4.5</version>
+        <exclusions>
+          <exclusion>
+            <groupId>org.jboss.netty</groupId>
+            <artifactId>netty</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+      <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-client</artifactId>
         <version>${hadoop.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d66c01f2/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 0eda2d3..0843170 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -216,6 +216,7 @@ object SparkBuild extends Build {
       "net.java.dev.jets3t" % "jets3t" % "0.7.1",
       "org.apache.avro" % "avro" % "1.7.4",
       "org.apache.avro" % "avro-ipc" % "1.7.4" excludeAll(excludeNetty),
+      "org.apache.zookeeper" % "zookeeper" % "3.4.5" excludeAll(excludeNetty),
       "com.codahale.metrics" % "metrics-core" % "3.0.0",
       "com.codahale.metrics" % "metrics-jvm" % "3.0.0",
       "com.codahale.metrics" % "metrics-json" % "3.0.0",
