Fine-tuned Akka defaults and restored tracking of disassociated events,
since they are delivered when a remote TCP socket is closed. Also gave the
transport failure detector a larger heartbeat interval, since it is mostly
unneeded now that we rely on remote death watch instead.
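
(Illustration only, not part of the commit: a minimal sketch of how the tuned
defaults could be overridden via JVM system properties before the actor system
is created. The property names come from AkkaUtils.scala below; the values
shown are arbitrary examples.)

    // Hypothetical override of the Akka failure-detector knobs that
    // AkkaUtils.scala reads via System.getProperty.
    object AkkaTuningOverrideExample {
      def main(args: Array[String]): Unit = {
        // Acceptable heartbeat pause for the watch failure detector (seconds).
        System.setProperty("spark.akka.pauses", "120")
        // Phi-accrual threshold of the failure detector.
        System.setProperty("spark.akka.failure-detector.threshold", "12.0")
        // Heartbeat interval for the watch failure detector (seconds).
        System.setProperty("spark.akka.heartbeat.interval", "5")
        // ...then create the actor system, e.g. via AkkaUtils.createActorSystem.
      }
    }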


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/77929cfe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/77929cfe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/77929cfe

Branch: refs/heads/master
Commit: 77929cfeed95905106f5b3891e8de1b1c312d119
Parents: 95d8dbc
Author: Prashant Sharma <[email protected]>
Authored: Fri Nov 22 19:46:39 2013 +0530
Committer: Prashant Sharma <[email protected]>
Committed: Mon Nov 25 14:13:21 2013 +0530

----------------------------------------------------------------------
 .../org/apache/spark/deploy/master/Master.scala | 23 +++++++++++++++-----
 .../org/apache/spark/deploy/worker/Worker.scala | 12 ++++++----
 .../executor/CoarseGrainedExecutorBackend.scala | 11 +++++-----
 .../cluster/CoarseGrainedSchedulerBackend.scala |  3 +++
 .../scala/org/apache/spark/util/AkkaUtils.scala | 12 +++++-----
 5 files changed, 40 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/77929cfe/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index a7cfc25..25f5927 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -17,8 +17,9 @@
 
 package org.apache.spark.deploy.master
 
-import java.util.Date
 import java.text.SimpleDateFormat
+import java.util.concurrent.TimeUnit
+import java.util.Date
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.concurrent.Await
@@ -28,6 +29,7 @@ import scala.concurrent.duration.{Duration, FiniteDuration}
 import akka.actor._
 import akka.pattern.ask
 import akka.remote._
+import akka.serialization.SerializationExtension
 import akka.util.Timeout
 
 import org.apache.spark.{Logging, SparkException}
@@ -40,11 +42,6 @@ import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.deploy.DeployMessages.RegisterWorkerFailed
 import org.apache.spark.deploy.DeployMessages.KillExecutor
 import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
-import scala.Some
-import akka.actor.Terminated
-import akka.serialization.SerializationExtension
-import java.util.concurrent.TimeUnit
-
 
 private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
   import context.dispatcher
@@ -102,6 +99,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   override def preStart() {
     logInfo("Starting Spark master at " + masterUrl)
     // Listen for remote client disconnection events, since they don't go through Akka's watch()
+    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
     webUi.start()
     masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort.get
     context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)
@@ -267,11 +265,20 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     case Terminated(actor) => {
       // The disconnected actor could've been either a worker or an app; remove whichever of
       // those we have an entry for in the corresponding actor hashmap
+      logInfo(s"$actor got terminated, removing it.")
       actorToWorker.get(actor).foreach(removeWorker)
       actorToApp.get(actor).foreach(finishApplication)
       if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
     }
 
+    case DisassociatedEvent(_, address, _) => {
+      // The disconnected client could've been either a worker or an app; remove whichever it was
+      logInfo(s"$address got disassociated, removing it.")
+      addressToWorker.get(address).foreach(removeWorker)
+      addressToApp.get(address).foreach(finishApplication)
+      if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
+    }
+
     case RequestMasterState => {
       sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray,
         state)
@@ -431,6 +438,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
         exec.id, ExecutorState.LOST, Some("worker lost"), None)
       exec.application.removeExecutor(exec)
     }
+    context.stop(worker.actor)
+    context.unwatch(worker.actor)
     persistenceEngine.removeWorker(worker)
   }
 
@@ -493,6 +502,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
         app.driver ! ApplicationRemoved(state.toString)
       }
       persistenceEngine.removeApplication(app)
+      context.stop(app.driver)
+      context.unwatch(app.driver)
       schedule()
     }
   }
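
(Illustration only, not part of the commit: a minimal standalone actor showing
the subscribe-and-handle pattern the hunks above restore, against the Akka 2.2
remoting API. The actor name is hypothetical.)

    import akka.actor._
    import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}

    class DisassociationWatcher extends Actor with ActorLogging {
      override def preStart(): Unit = {
        // Deliver remoting lifecycle events (including DisassociatedEvent,
        // fired when the remote TCP socket closes) to this actor's mailbox,
        // mirroring Master.preStart() above.
        context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
      }

      def receive = {
        case DisassociatedEvent(_, remoteAddress, _) =>
          // Clean up any state keyed by the remote address, as Master does
          // for workers and applications.
          log.info(s"$remoteAddress got disassociated, removing it.")
        case _: RemotingLifecycleEvent => // ignore other lifecycle events
      }
    }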

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/77929cfe/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 9472c9a..3a7d0b8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -17,14 +17,16 @@
 
 package org.apache.spark.deploy.worker
 
+import java.io.File
 import java.text.SimpleDateFormat
 import java.util.Date
-import java.io.File
 
 import scala.collection.mutable.HashMap
 import scala.concurrent.duration._
 
 import akka.actor._
+import akka.remote.{ DisassociatedEvent, RemotingLifecycleEvent}
+
 import org.apache.spark.Logging
 import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
@@ -36,10 +38,8 @@ import org.apache.spark.deploy.DeployMessages.WorkerStateResponse
 import org.apache.spark.deploy.DeployMessages.RegisterWorkerFailed
 import org.apache.spark.deploy.DeployMessages.KillExecutor
 import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
-import scala.Some
 import org.apache.spark.deploy.DeployMessages.Heartbeat
 import org.apache.spark.deploy.DeployMessages.RegisteredWorker
-import akka.remote.DisassociatedEvent
 import org.apache.spark.deploy.DeployMessages.LaunchExecutor
 import org.apache.spark.deploy.DeployMessages.RegisterWorker
 
@@ -124,7 +124,7 @@ private[spark] class Worker(
     logInfo("Spark home: " + sparkHome)
     createWorkDir()
     webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
-
+    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
     webUi.start()
     registerWithMaster()
 
@@ -249,6 +249,10 @@ private[spark] class Worker(
       logInfo(s"$actor_ terminated !")
       masterDisconnected()
 
+    case x: DisassociatedEvent =>
+      logInfo(s"$x Disassociated !")
+      masterDisconnected()
+
     case RequestWorkerState => {
       sender ! WorkerStateResponse(host, port, workerId, executors.values.toList,
         finishedExecutors.values.toList, activeMasterUrl, cores, memory,

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/77929cfe/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index a98ec06..2818a77 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -26,11 +26,6 @@ import org.apache.spark.Logging
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.util.{Utils, AkkaUtils}
-import akka.remote.DisassociatedEvent
-import akka.remote.AssociationErrorEvent
-import akka.remote.DisassociatedEvent
-import akka.actor.Terminated
-
 
 private[spark] class CoarseGrainedExecutorBackend(
     driverUrl: String,
@@ -82,7 +77,11 @@ private[spark] class CoarseGrainedExecutorBackend(
       }
 
     case Terminated(actor) =>
-      logError(s"Driver $actor terminated or disconnected! Shutting down.")
+      logError(s"Driver $actor terminated, Shutting down.")
+      System.exit(1)
+
+    case x: DisassociatedEvent =>
+      logError(s"Driver $x disassociated! Shutting down.")
       System.exit(1)
 
     case StopExecutor =>

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/77929cfe/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 821c30a..e316f6b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -121,6 +121,9 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
       case Terminated(actor) =>
         actorToExecutorId.get(actor).foreach(removeExecutor(_, "Akka actor terminated"))
 
+      case DisassociatedEvent(_, address, _) =>
+        addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client disassociated"))
+
     }
 
     // Make fake resource offers on all executors

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/77929cfe/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 2a83138..90a5387 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -44,9 +44,11 @@ private[spark] object AkkaUtils {
     val akkaFrameSize = System.getProperty("spark.akka.frameSize", "10").toInt
     val lifecycleEvents = if (System.getProperty("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off"
 
-    val akkaHeartBeatPauses = System.getProperty("spark.akka.pauses", "30").toInt
-    val akkaFailureDetector = System.getProperty("spark.akka.failure-detector.threshold", "30").toInt
-    val akkaHeartBeatInterval = System.getProperty("spark.akka.heartbeat.interval", "3").toInt
+    val akkaHeartBeatPauses = System.getProperty("spark.akka.pauses", "60").toInt
+    val akkaFailureDetector = System.getProperty("spark.akka.failure-detector.threshold", "12.0").toDouble
+    // Since we have our own heartbeat mechanism and TCP already tracks connections,
+    // using this makes very little sense. So setting it to a relatively large value suffices.
+    val akkaHeartBeatInterval = System.getProperty("spark.akka.heartbeat.interval", "3").toInt
 
     val akkaConf = ConfigFactory.parseString(
       s"""
@@ -56,8 +58,8 @@ private[spark] object AkkaUtils {
      |akka.remote.watch-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s
      |akka.remote.watch-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s
      |akka.remote.watch-failure-detector.threshold = $akkaFailureDetector
-      |akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s
-      |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s
+      |akka.remote.transport-failure-detector.heartbeat-interval = 30 s
+      |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = ${akkaHeartBeatPauses + 10} s
      |akka.remote.transport-failure-detector.threshold = $akkaFailureDetector
      |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
      |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
