Repository: spark
Updated Branches:
  refs/heads/master 5328c0aaa -> 8d338f64c


SPARK-2099. Report progress while task is running.

This is a sketch of a patch that allows the UI to show metrics for tasks that 
have not yet completed.  It adds a heartbeat every 2 seconds from the executors 
to the driver, reporting metrics for all of the executor's tasks.

It still needs unit tests, polish, and cluster testing, but I wanted to put it 
up to get feedback on the approach.

Author: Sandy Ryza <[email protected]>

Closes #1056 from sryza/sandy-spark-2099 and squashes the following commits:

93b9fdb [Sandy Ryza] Up heartbeat interval to 10 seconds and other tidying
132aec7 [Sandy Ryza] Heartbeat and HeartbeatResponse are already Serializable 
as case classes
38dffde [Sandy Ryza] Additional review feedback and restore test that was 
removed in BlockManagerSuite
51fa396 [Sandy Ryza] Remove hostname race, add better comments about threading, 
and some stylistic improvements
3084f10 [Sandy Ryza] Make TaskUIData a case class again
3bda974 [Sandy Ryza] Stylistic fixes
0dae734 [Sandy Ryza] SPARK-2099. Report progress while task is running.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8d338f64
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8d338f64
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8d338f64

Branch: refs/heads/master
Commit: 8d338f64c4eda45d22ae33f61ef7928011cc2846
Parents: 5328c0a
Author: Sandy Ryza <[email protected]>
Authored: Fri Aug 1 11:08:39 2014 -0700
Committer: Patrick Wendell <[email protected]>
Committed: Fri Aug 1 11:08:39 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/HeartbeatReceiver.scala    |  46 ++++++++
 .../scala/org/apache/spark/SparkContext.scala   |   4 +
 .../main/scala/org/apache/spark/SparkEnv.scala  |   8 +-
 .../org/apache/spark/executor/Executor.scala    |  55 ++++++++-
 .../org/apache/spark/executor/TaskMetrics.scala |  10 +-
 .../apache/spark/scheduler/DAGScheduler.scala   |  21 +++-
 .../apache/spark/scheduler/SparkListener.scala  |  11 ++
 .../spark/scheduler/SparkListenerBus.scala      |   2 +
 .../scala/org/apache/spark/scheduler/Task.scala |   3 +
 .../apache/spark/scheduler/TaskScheduler.scala  |  10 ++
 .../spark/scheduler/TaskSchedulerImpl.scala     |  23 ++++
 .../spark/scheduler/local/LocalBackend.scala    |   9 +-
 .../org/apache/spark/storage/BlockManager.scala |  25 +---
 .../spark/storage/BlockManagerMaster.scala      |  43 +------
 .../spark/storage/BlockManagerMasterActor.scala |  29 ++---
 .../spark/storage/BlockManagerMessages.scala    |   6 +-
 .../spark/ui/jobs/JobProgressListener.scala     | 117 ++++++++++++-------
 .../scala/org/apache/spark/ui/jobs/UIData.scala |   9 +-
 .../scala/org/apache/spark/util/AkkaUtils.scala |  66 ++++++++++-
 .../SparkContextSchedulerCreationSuite.scala    |   6 +-
 .../spark/scheduler/DAGSchedulerSuite.scala     |   5 +
 .../spark/storage/BlockManagerSuite.scala       |  23 ++--
 .../ui/jobs/JobProgressListenerSuite.scala      |  86 +++++++++++++-
 docs/configuration.md                           |   7 ++
 24 files changed, 467 insertions(+), 157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8d338f64/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala 
b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
new file mode 100644
index 0000000..24ccce2
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import akka.actor.Actor
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.scheduler.TaskScheduler
+
+/**
+ * A heartbeat from executors to the driver. This is a shared message used by 
several internal
+ * components to convey liveness or execution information for in-progress 
tasks.
+ */
+private[spark] case class Heartbeat(
+    executorId: String,
+    taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
+    blockManagerId: BlockManagerId)
+
+private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)
+
+/**
+ * Lives in the driver to receive heartbeats from executors..
+ */
+private[spark] class HeartbeatReceiver(scheduler: TaskScheduler) extends Actor 
{
+  override def receive = {
+    case Heartbeat(executorId, taskMetrics, blockManagerId) =>
+      val response = HeartbeatResponse(
+        !scheduler.executorHeartbeatReceived(executorId, taskMetrics, 
blockManagerId))
+      sender ! response
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/8d338f64/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 0e51356..5f75c1d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -36,6 +36,7 @@ import org.apache.hadoop.mapred.{FileInputFormat, 
InputFormat, JobConf, Sequence
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => 
NewHadoopJob}
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => 
NewFileInputFormat}
 import org.apache.mesos.MesosNativeLibrary
+import akka.actor.Props
 
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.broadcast.Broadcast
@@ -307,6 +308,8 @@ class SparkContext(config: SparkConf) extends Logging {
 
   // Create and start the scheduler
   private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, 
master)
+  private val heartbeatReceiver = env.actorSystem.actorOf(
+    Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")
   @volatile private[spark] var dagScheduler: DAGScheduler = _
   try {
     dagScheduler = new DAGScheduler(this)
@@ -992,6 +995,7 @@ class SparkContext(config: SparkConf) extends Logging {
     if (dagSchedulerCopy != null) {
       env.metricsSystem.report()
       metadataCleaner.cancel()
+      env.actorSystem.stop(heartbeatReceiver)
       cleaner.foreach(_.stop())
       dagSchedulerCopy.stop()
       taskScheduler = null

http://git-wip-us.apache.org/repos/asf/spark/blob/8d338f64/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala 
b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 6ee731b..92c809d 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -193,13 +193,7 @@ object SparkEnv extends Logging {
         logInfo("Registering " + name)
         actorSystem.actorOf(Props(newActor), name = name)
       } else {
-        val driverHost: String = conf.get("spark.driver.host", "localhost")
-        val driverPort: Int = conf.getInt("spark.driver.port", 7077)
-        Utils.checkHost(driverHost, "Expected hostname")
-        val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name"
-        val timeout = AkkaUtils.lookupTimeout(conf)
-        logInfo(s"Connecting to $name: $url")
-        Await.result(actorSystem.actorSelection(url).resolveOne(timeout), 
timeout)
+        AkkaUtils.makeDriverRef(name, conf, actorSystem)
       }
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8d338f64/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala 
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 99d650a..1bb1b4a 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -23,7 +23,7 @@ import java.nio.ByteBuffer
 import java.util.concurrent._
 
 import scala.collection.JavaConversions._
-import scala.collection.mutable.HashMap
+import scala.collection.mutable.{ArrayBuffer, HashMap}
 
 import org.apache.spark._
 import org.apache.spark.scheduler._
@@ -48,6 +48,8 @@ private[spark] class Executor(
 
   private val EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new Array[Byte](0))
 
+  @volatile private var isStopped = false
+
   // No ip or host:port - just hostname
   Utils.checkHost(slaveHostname, "Expected executed slave to be a hostname")
   // must not have port specified.
@@ -107,6 +109,8 @@ private[spark] class Executor(
   // Maintains the list of running tasks.
   private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
 
+  startDriverHeartbeater()
+
   def launchTask(
       context: ExecutorBackend, taskId: Long, taskName: String, 
serializedTask: ByteBuffer) {
     val tr = new TaskRunner(context, taskId, taskName, serializedTask)
@@ -121,8 +125,10 @@ private[spark] class Executor(
     }
   }
 
-  def stop(): Unit = {
+  def stop() {
     env.metricsSystem.report()
+    isStopped = true
+    threadPool.shutdown()
   }
 
   /** Get the Yarn approved local directories. */
@@ -141,11 +147,12 @@ private[spark] class Executor(
   }
 
   class TaskRunner(
-      execBackend: ExecutorBackend, taskId: Long, taskName: String, 
serializedTask: ByteBuffer)
+      execBackend: ExecutorBackend, val taskId: Long, taskName: String, 
serializedTask: ByteBuffer)
     extends Runnable {
 
     @volatile private var killed = false
-    @volatile private var task: Task[Any] = _
+    @volatile var task: Task[Any] = _
+    @volatile var attemptedTask: Option[Task[Any]] = None
 
     def kill(interruptThread: Boolean) {
       logInfo(s"Executor is trying to kill $taskName (TID $taskId)")
@@ -162,7 +169,6 @@ private[spark] class Executor(
       val ser = SparkEnv.get.closureSerializer.newInstance()
       logInfo(s"Running $taskName (TID $taskId)")
       execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
-      var attemptedTask: Option[Task[Any]] = None
       var taskStart: Long = 0
       def gcTime = 
ManagementFactory.getGarbageCollectorMXBeans.map(_.getCollectionTime).sum
       val startGCTime = gcTime
@@ -204,7 +210,6 @@ private[spark] class Executor(
         val afterSerialization = System.currentTimeMillis()
 
         for (m <- task.metrics) {
-          m.hostname = Utils.localHostName()
           m.executorDeserializeTime = taskStart - startTime
           m.executorRunTime = taskFinish - taskStart
           m.jvmGCTime = gcTime - startGCTime
@@ -354,4 +359,42 @@ private[spark] class Executor(
       }
     }
   }
+
+  def startDriverHeartbeater() {
+    val interval = conf.getInt("spark.executor.heartbeatInterval", 10000)
+    val timeout = AkkaUtils.lookupTimeout(conf)
+    val retryAttempts = AkkaUtils.numRetries(conf)
+    val retryIntervalMs = AkkaUtils.retryWaitMs(conf)
+    val heartbeatReceiverRef = AkkaUtils.makeDriverRef("HeartbeatReceiver", 
conf, env.actorSystem)
+
+    val t = new Thread() {
+      override def run() {
+        // Sleep a random interval so the heartbeats don't end up in sync
+        Thread.sleep(interval + (math.random * interval).asInstanceOf[Int])
+
+        while (!isStopped) {
+          val tasksMetrics = new ArrayBuffer[(Long, TaskMetrics)]()
+          for (taskRunner <- runningTasks.values()) {
+            if (!taskRunner.attemptedTask.isEmpty) {
+              Option(taskRunner.task).flatMap(_.metrics).foreach { metrics =>
+                tasksMetrics += ((taskRunner.taskId, metrics))
+              }
+            }
+          }
+
+          val message = Heartbeat(executorId, tasksMetrics.toArray, 
env.blockManager.blockManagerId)
+          val response = AkkaUtils.askWithReply[HeartbeatResponse](message, 
heartbeatReceiverRef,
+            retryAttempts, retryIntervalMs, timeout)
+          if (response.reregisterBlockManager) {
+            logWarning("Told to re-register on heartbeat")
+            env.blockManager.reregister()
+          }
+          Thread.sleep(interval)
+        }
+      }
+    }
+    t.setDaemon(true)
+    t.setName("Driver Heartbeater")
+    t.start()
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/8d338f64/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala 
b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 21fe643..56cd872 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -23,6 +23,14 @@ import org.apache.spark.storage.{BlockId, BlockStatus}
 /**
  * :: DeveloperApi ::
  * Metrics tracked during the execution of a task.
+ *
+ * This class is used to house metrics both for in-progress and completed 
tasks. In executors,
+ * both the task thread and the heartbeat thread write to the TaskMetrics. The 
heartbeat thread
+ * reads it to send in-progress metrics, and the task thread reads it to send 
metrics along with
+ * the completed task.
+ *
+ * So, when adding new fields, take into consideration that the whole object 
can be serialized for
+ * shipping off at any time to consumers of the SparkListener interface.
  */
 @DeveloperApi
 class TaskMetrics extends Serializable {
@@ -143,7 +151,7 @@ class ShuffleReadMetrics extends Serializable {
   /**
    * Absolute time when this task finished reading shuffle data
    */
-  var shuffleFinishTime: Long = _
+  var shuffleFinishTime: Long = -1
 
   /**
    * Number of blocks fetched in this shuffle by this task (remote or local)

http://git-wip-us.apache.org/repos/asf/spark/blob/8d338f64/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 50186d0..c7e3d7c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -29,7 +29,6 @@ import scala.reflect.ClassTag
 import scala.util.control.NonFatal
 
 import akka.actor._
-import akka.actor.OneForOneStrategy
 import akka.actor.SupervisorStrategy.Stop
 import akka.pattern.ask
 import akka.util.Timeout
@@ -39,8 +38,9 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.partial.{ApproximateActionListener, 
ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerMaster, 
RDDBlockId}
+import org.apache.spark.storage._
 import org.apache.spark.util.{CallSite, SystemClock, Clock, Utils}
+import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
 
 /**
  * The high-level scheduling layer that implements stage-oriented scheduling. 
It computes a DAG of
@@ -154,6 +154,23 @@ class DAGScheduler(
     eventProcessActor ! CompletionEvent(task, reason, result, accumUpdates, 
taskInfo, taskMetrics)
   }
 
+  /**
+   * Update metrics for in-progress tasks and let the master know that the 
BlockManager is still
+   * alive. Return true if the driver knows about the given block manager. 
Otherwise, return false,
+   * indicating that the block manager should re-register.
+   */
+  def executorHeartbeatReceived(
+      execId: String,
+      taskMetrics: Array[(Long, Int, TaskMetrics)], // (taskId, stageId, 
metrics)
+      blockManagerId: BlockManagerId): Boolean = {
+    listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, taskMetrics))
+    implicit val timeout = Timeout(600 seconds)
+
+    Await.result(
+      blockManagerMaster.driverActor ? BlockManagerHeartbeat(blockManagerId),
+      timeout.duration).asInstanceOf[Boolean]
+  }
+
   // Called by TaskScheduler when an executor fails.
   def executorLost(execId: String) {
     eventProcessActor ! ExecutorLost(execId)

http://git-wip-us.apache.org/repos/asf/spark/blob/8d338f64/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala 
b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 82163ea..d01d318 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -76,6 +76,12 @@ case class SparkListenerBlockManagerRemoved(blockManagerId: 
BlockManagerId)
 case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent
 
 @DeveloperApi
+case class SparkListenerExecutorMetricsUpdate(
+    execId: String,
+    taskMetrics: Seq[(Long, Int, TaskMetrics)])
+  extends SparkListenerEvent
+
+@DeveloperApi
 case class SparkListenerApplicationStart(appName: String, time: Long, 
sparkUser: String)
   extends SparkListenerEvent
 
@@ -158,6 +164,11 @@ trait SparkListener {
    * Called when the application ends
    */
   def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) { }
+
+  /**
+   * Called when the driver receives task metrics from an executor in a 
heartbeat.
+   */
+  def onExecutorMetricsUpdate(executorMetricsUpdate: 
SparkListenerExecutorMetricsUpdate) { }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/8d338f64/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala 
b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index ed9fb24..e79ffd7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -68,6 +68,8 @@ private[spark] trait SparkListenerBus extends Logging {
         foreachListener(_.onApplicationStart(applicationStart))
       case applicationEnd: SparkListenerApplicationEnd =>
         foreachListener(_.onApplicationEnd(applicationEnd))
+      case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
+        foreachListener(_.onExecutorMetricsUpdate(metricsUpdate))
       case SparkListenerShutdown =>
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/8d338f64/core/src/main/scala/org/apache/spark/scheduler/Task.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala 
b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 5871ede..5c5e421 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -26,6 +26,8 @@ import org.apache.spark.TaskContext
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.serializer.SerializerInstance
 import org.apache.spark.util.ByteBufferInputStream
+import org.apache.spark.util.Utils
+
 
 /**
  * A unit of execution. We have two kinds of Task's in Spark:
@@ -44,6 +46,7 @@ private[spark] abstract class Task[T](val stageId: Int, var 
partitionId: Int) ex
 
   final def run(attemptId: Long): T = {
     context = new TaskContext(stageId, partitionId, attemptId, runningLocally 
= false)
+    context.taskMetrics.hostname = Utils.localHostName();
     taskThread = Thread.currentThread()
     if (_killed) {
       kill(interruptThread = false)

http://git-wip-us.apache.org/repos/asf/spark/blob/8d338f64/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 819c352..1a0b877 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -18,6 +18,8 @@
 package org.apache.spark.scheduler
 
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.storage.BlockManagerId
 
 /**
  * Low-level task scheduler interface, currently implemented exclusively by 
TaskSchedulerImpl.
@@ -54,4 +56,12 @@ private[spark] trait TaskScheduler {
 
   // Get the default level of parallelism to use in the cluster, as a hint for 
sizing jobs.
   def defaultParallelism(): Int
+
+  /**
+   * Update metrics for in-progress tasks and let the master know that the 
BlockManager is still
+   * alive. Return true if the driver knows about the given block manager. 
Otherwise, return false,
+   * indicating that the block manager should re-register.
+   */
+  def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, 
TaskMetrics)],
+    blockManagerId: BlockManagerId): Boolean
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/8d338f64/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index be3673c..d2f764f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -32,6 +32,9 @@ import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.util.Utils
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.storage.BlockManagerId
+import akka.actor.Props
 
 /**
  * Schedules tasks for multiple types of clusters by acting through a 
SchedulerBackend.
@@ -320,6 +323,26 @@ private[spark] class TaskSchedulerImpl(
     }
   }
 
+  /**
+   * Update metrics for in-progress tasks and let the master know that the 
BlockManager is still
+   * alive. Return true if the driver knows about the given block manager. 
Otherwise, return false,
+   * indicating that the block manager should re-register.
+   */
+  override def executorHeartbeatReceived(
+      execId: String,
+      taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
+      blockManagerId: BlockManagerId): Boolean = {
+    val metricsWithStageIds = taskMetrics.flatMap {
+      case (id, metrics) => {
+        taskIdToTaskSetId.get(id)
+          .flatMap(activeTaskSets.get)
+          .map(_.stageId)
+          .map(x => (id, x, metrics))
+      }
+    }
+    dagScheduler.executorHeartbeatReceived(execId, metricsWithStageIds, 
blockManagerId)
+  }
+
   def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long) {
     taskSetManager.handleTaskGettingResult(tid)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/8d338f64/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala 
b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 5b89759..3d1cf31 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -23,8 +23,9 @@ import akka.actor.{Actor, ActorRef, Props}
 
 import org.apache.spark.{Logging, SparkEnv, TaskState}
 import org.apache.spark.TaskState.TaskState
-import org.apache.spark.executor.{Executor, ExecutorBackend}
+import org.apache.spark.executor.{TaskMetrics, Executor, ExecutorBackend}
 import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, 
WorkerOffer}
+import org.apache.spark.storage.BlockManagerId
 
 private case class ReviveOffers()
 
@@ -32,6 +33,8 @@ private case class StatusUpdate(taskId: Long, state: 
TaskState, serializedData:
 
 private case class KillTask(taskId: Long, interruptThread: Boolean)
 
+private case class StopExecutor()
+
 /**
  * Calls to LocalBackend are all serialized through LocalActor. Using an actor 
makes the calls on
  * LocalBackend asynchronous, which is necessary to prevent deadlock between 
LocalBackend
@@ -63,6 +66,9 @@ private[spark] class LocalActor(
 
     case KillTask(taskId, interruptThread) =>
       executor.killTask(taskId, interruptThread)
+
+    case StopExecutor =>
+      executor.stop()
   }
 
   def reviveOffers() {
@@ -91,6 +97,7 @@ private[spark] class LocalBackend(scheduler: 
TaskSchedulerImpl, val totalCores:
   }
 
   override def stop() {
+    localActor ! StopExecutor
   }
 
   override def reviveOffers() {

http://git-wip-us.apache.org/repos/asf/spark/blob/8d338f64/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index d746526..c0a0601 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -116,15 +116,6 @@ private[spark] class BlockManager(
   private var asyncReregisterTask: Future[Unit] = null
   private val asyncReregisterLock = new Object
 
-  private def heartBeat(): Unit = {
-    if (!master.sendHeartBeat(blockManagerId)) {
-      reregister()
-    }
-  }
-
-  private val heartBeatFrequency = BlockManager.getHeartBeatFrequency(conf)
-  private var heartBeatTask: Cancellable = null
-
   private val metadataCleaner = new MetadataCleaner(
     MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks, conf)
   private val broadcastCleaner = new MetadataCleaner(
@@ -161,11 +152,6 @@ private[spark] class BlockManager(
   private def initialize(): Unit = {
     master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
     BlockManagerWorker.startBlockManagerWorker(this)
-    if (!BlockManager.getDisableHeartBeatsForTesting(conf)) {
-      heartBeatTask = actorSystem.scheduler.schedule(0.seconds, 
heartBeatFrequency.milliseconds) {
-        Utils.tryOrExit { heartBeat() }
-      }
-    }
   }
 
   /**
@@ -195,7 +181,7 @@ private[spark] class BlockManager(
    *
    * Note that this method must be called without any BlockInfo locks held.
    */
-  private def reregister(): Unit = {
+  def reregister(): Unit = {
     // TODO: We might need to rate limit re-registering.
     logInfo("BlockManager re-registering with master")
     master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
@@ -1065,9 +1051,6 @@ private[spark] class BlockManager(
   }
 
   def stop(): Unit = {
-    if (heartBeatTask != null) {
-      heartBeatTask.cancel()
-    }
     connectionManager.stop()
     shuffleBlockManager.stop()
     diskBlockManager.stop()
@@ -1095,12 +1078,6 @@ private[spark] object BlockManager extends Logging {
     (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
   }
 
-  def getHeartBeatFrequency(conf: SparkConf): Long =
-    conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60000) / 4
-
-  def getDisableHeartBeatsForTesting(conf: SparkConf): Boolean =
-    conf.getBoolean("spark.test.disableBlockManagerHeartBeat", false)
-
   /**
    * Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an 
*unsafe* Sun API that
    * might cause errors if one attempts to read from the unmapped buffer, but 
it's better than

http://git-wip-us.apache.org/repos/asf/spark/blob/8d338f64/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 7897fad..6693077 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -21,7 +21,6 @@ import scala.concurrent.{Await, Future}
 import scala.concurrent.ExecutionContext.Implicits.global
 
 import akka.actor._
-import akka.pattern.ask
 
 import org.apache.spark.{Logging, SparkConf, SparkException}
 import org.apache.spark.storage.BlockManagerMessages._
@@ -29,8 +28,8 @@ import org.apache.spark.util.AkkaUtils
 
 private[spark]
 class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends 
Logging {
-  val AKKA_RETRY_ATTEMPTS: Int = conf.getInt("spark.akka.num.retries", 3)
-  val AKKA_RETRY_INTERVAL_MS: Int = conf.getInt("spark.akka.retry.wait", 3000)
+  private val AKKA_RETRY_ATTEMPTS: Int = AkkaUtils.numRetries(conf)
+  private val AKKA_RETRY_INTERVAL_MS: Int = AkkaUtils.retryWaitMs(conf)
 
   val DRIVER_AKKA_ACTOR_NAME = "BlockManagerMaster"
 
@@ -42,15 +41,6 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: 
SparkConf) extends Log
     logInfo("Removed " + execId + " successfully in removeExecutor")
   }
 
-  /**
-   * Send the driver actor a heart beat from the slave. Returns true if 
everything works out,
-   * false if the driver does not know about the given block manager, which 
means the block
-   * manager should re-register.
-   */
-  def sendHeartBeat(blockManagerId: BlockManagerId): Boolean = {
-    askDriverWithReply[Boolean](HeartBeat(blockManagerId))
-  }
-
   /** Register the BlockManager's id with the driver. */
   def registerBlockManager(blockManagerId: BlockManagerId, maxMemSize: Long, 
slaveActor: ActorRef) {
     logInfo("Trying to register BlockManager")
@@ -223,33 +213,8 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: 
SparkConf) extends Log
    * throw a SparkException if this fails.
    */
   private def askDriverWithReply[T](message: Any): T = {
-    // TODO: Consider removing multiple attempts
-    if (driverActor == null) {
-      throw new SparkException("Error sending message to BlockManager as 
driverActor is null " +
-        "[message = " + message + "]")
-    }
-    var attempts = 0
-    var lastException: Exception = null
-    while (attempts < AKKA_RETRY_ATTEMPTS) {
-      attempts += 1
-      try {
-        val future = driverActor.ask(message)(timeout)
-        val result = Await.result(future, timeout)
-        if (result == null) {
-          throw new SparkException("BlockManagerMaster returned null")
-        }
-        return result.asInstanceOf[T]
-      } catch {
-        case ie: InterruptedException => throw ie
-        case e: Exception =>
-          lastException = e
-          logWarning("Error sending message to BlockManagerMaster in " + 
attempts + " attempts", e)
-      }
-      Thread.sleep(AKKA_RETRY_INTERVAL_MS)
-    }
-
-    throw new SparkException(
-      "Error sending message to BlockManagerMaster [message = " + message + 
"]", lastException)
+    AkkaUtils.askWithReply(message, driverActor, AKKA_RETRY_ATTEMPTS, 
AKKA_RETRY_INTERVAL_MS,
+      timeout)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/8d338f64/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index de1cc55..94f5a4b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -52,25 +52,24 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: 
SparkConf, listenerBus
 
   private val akkaTimeout = AkkaUtils.askTimeout(conf)
 
-  val slaveTimeout = conf.get("spark.storage.blockManagerSlaveTimeoutMs",
-    "" + (BlockManager.getHeartBeatFrequency(conf) * 3)).toLong
+  val slaveTimeout = conf.getLong("spark.storage.blockManagerSlaveTimeoutMs",
+    math.max(conf.getInt("spark.executor.heartbeatInterval", 10000) * 3, 
45000))
 
-  val checkTimeoutInterval = 
conf.get("spark.storage.blockManagerTimeoutIntervalMs",
-    "60000").toLong
+  val checkTimeoutInterval = 
conf.getLong("spark.storage.blockManagerTimeoutIntervalMs",
+    60000)
 
   var timeoutCheckingTask: Cancellable = null
 
   override def preStart() {
-    if (!BlockManager.getDisableHeartBeatsForTesting(conf)) {
-      import context.dispatcher
-      timeoutCheckingTask = context.system.scheduler.schedule(0.seconds,
-        checkTimeoutInterval.milliseconds, self, ExpireDeadHosts)
-    }
+    import context.dispatcher
+    timeoutCheckingTask = context.system.scheduler.schedule(0.seconds,
+      checkTimeoutInterval.milliseconds, self, ExpireDeadHosts)
     super.preStart()
   }
 
   def receive = {
     case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) =>
+      logInfo("received a register")
       register(blockManagerId, maxMemSize, slaveActor)
       sender ! true
 
@@ -129,8 +128,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: 
SparkConf, listenerBus
     case ExpireDeadHosts =>
       expireDeadHosts()
 
-    case HeartBeat(blockManagerId) =>
-      sender ! heartBeat(blockManagerId)
+    case BlockManagerHeartbeat(blockManagerId) =>
+      sender ! heartbeatReceived(blockManagerId)
 
     case other =>
       logWarning("Got unknown message: " + other)
@@ -216,7 +215,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: 
SparkConf, listenerBus
     val minSeenTime = now - slaveTimeout
     val toRemove = new mutable.HashSet[BlockManagerId]
     for (info <- blockManagerInfo.values) {
-      if (info.lastSeenMs < minSeenTime) {
+      if (info.lastSeenMs < minSeenTime && info.blockManagerId.executorId != 
"<driver>") {
         logWarning("Removing BlockManager " + info.blockManagerId + " with no 
recent heart beats: "
           + (now - info.lastSeenMs) + "ms exceeds " + slaveTimeout + "ms")
         toRemove += info.blockManagerId
@@ -230,7 +229,11 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: 
SparkConf, listenerBus
     blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
   }
 
-  private def heartBeat(blockManagerId: BlockManagerId): Boolean = {
+  /**
+   * Return true if the driver knows about the given block manager. Otherwise, 
return false,
+   * indicating that the block manager should re-register.
+   */
+  private def heartbeatReceived(blockManagerId: BlockManagerId): Boolean = {
     if (!blockManagerInfo.contains(blockManagerId)) {
       blockManagerId.executorId == "<driver>" && !isLocal
     } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/8d338f64/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index 2b53bf3..10b6528 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -21,7 +21,7 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput}
 
 import akka.actor.ActorRef
 
-private[storage] object BlockManagerMessages {
+private[spark] object BlockManagerMessages {
   
//////////////////////////////////////////////////////////////////////////////////
   // Messages from the master to slaves.
   
//////////////////////////////////////////////////////////////////////////////////
@@ -53,8 +53,6 @@ private[storage] object BlockManagerMessages {
       sender: ActorRef)
     extends ToBlockManagerMaster
 
-  case class HeartBeat(blockManagerId: BlockManagerId) extends 
ToBlockManagerMaster
-
   class UpdateBlockInfo(
       var blockManagerId: BlockManagerId,
       var blockId: BlockId,
@@ -124,5 +122,7 @@ private[storage] object BlockManagerMessages {
   case class GetMatchingBlockIds(filter: BlockId => Boolean, askSlaves: 
Boolean = true)
     extends ToBlockManagerMaster
 
+  case class BlockManagerHeartbeat(blockManagerId: BlockManagerId) extends 
ToBlockManagerMaster
+
   case object ExpireDeadHosts extends ToBlockManagerMaster
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/8d338f64/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index efb527b..da2f5d3 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -130,32 +130,16 @@ class JobProgressListener(conf: SparkConf) extends 
SparkListener with Logging {
         new StageUIData
       })
 
-      // create executor summary map if necessary
-      val executorSummaryMap = stageData.executorSummary
-      executorSummaryMap.getOrElseUpdate(key = info.executorId, op = new 
ExecutorSummary)
-
-      executorSummaryMap.get(info.executorId).foreach { y =>
-        // first update failed-task, succeed-task
-        taskEnd.reason match {
-          case Success =>
-            y.succeededTasks += 1
-          case _ =>
-            y.failedTasks += 1
-        }
-
-        // update duration
-        y.taskTime += info.duration
-
-        val metrics = taskEnd.taskMetrics
-        if (metrics != null) {
-          metrics.inputMetrics.foreach { y.inputBytes += _.bytesRead }
-          metrics.shuffleReadMetrics.foreach { y.shuffleRead += 
_.remoteBytesRead }
-          metrics.shuffleWriteMetrics.foreach { y.shuffleWrite += 
_.shuffleBytesWritten }
-          y.memoryBytesSpilled += metrics.memoryBytesSpilled
-          y.diskBytesSpilled += metrics.diskBytesSpilled
-        }
+      val execSummaryMap = stageData.executorSummary
+      val execSummary = execSummaryMap.getOrElseUpdate(info.executorId, new 
ExecutorSummary)
+
+      taskEnd.reason match {
+        case Success =>
+          execSummary.succeededTasks += 1
+        case _ =>
+          execSummary.failedTasks += 1
       }
-
+      execSummary.taskTime += info.duration
       stageData.numActiveTasks -= 1
 
       val (errorMessage, metrics): (Option[String], Option[TaskMetrics]) =
@@ -171,28 +155,75 @@ class JobProgressListener(conf: SparkConf) extends 
SparkListener with Logging {
             (Some(e.toErrorString), None)
         }
 
+      if (!metrics.isEmpty) {
+        val oldMetrics = 
stageData.taskData.get(info.taskId).flatMap(_.taskMetrics)
+        updateAggregateMetrics(stageData, info.executorId, metrics.get, 
oldMetrics)
+      }
 
-      val taskRunTime = metrics.map(_.executorRunTime).getOrElse(0L)
-      stageData.executorRunTime += taskRunTime
-      val inputBytes = 
metrics.flatMap(_.inputMetrics).map(_.bytesRead).getOrElse(0L)
-      stageData.inputBytes += inputBytes
-
-      val shuffleRead = 
metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead).getOrElse(0L)
-      stageData.shuffleReadBytes += shuffleRead
-
-      val shuffleWrite =
-        
metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleBytesWritten).getOrElse(0L)
-      stageData.shuffleWriteBytes += shuffleWrite
-
-      val memoryBytesSpilled = metrics.map(_.memoryBytesSpilled).getOrElse(0L)
-      stageData.memoryBytesSpilled += memoryBytesSpilled
+      val taskData = stageData.taskData.getOrElseUpdate(info.taskId, new 
TaskUIData(info))
+      taskData.taskInfo = info
+      taskData.taskMetrics = metrics
+      taskData.errorMessage = errorMessage
+    }
+  }
 
-      val diskBytesSpilled = metrics.map(_.diskBytesSpilled).getOrElse(0L)
-      stageData.diskBytesSpilled += diskBytesSpilled
+  /**
+   * Upon receiving new metrics for a task, updates the per-stage and 
per-executor-per-stage
+   * aggregate metrics by calculating deltas between the currently recorded 
metrics and the new
+   * metrics.
+   */
+  def updateAggregateMetrics(
+      stageData: StageUIData,
+      execId: String,
+      taskMetrics: TaskMetrics,
+      oldMetrics: Option[TaskMetrics]) {
+    val execSummary = stageData.executorSummary.getOrElseUpdate(execId, new 
ExecutorSummary)
+
+    val shuffleWriteDelta =
+      (taskMetrics.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L)
+      - 
oldMetrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleBytesWritten).getOrElse(0L))
+    stageData.shuffleWriteBytes += shuffleWriteDelta
+    execSummary.shuffleWrite += shuffleWriteDelta
+
+    val shuffleReadDelta =
+      (taskMetrics.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L)
+      - 
oldMetrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead).getOrElse(0L))
+    stageData.shuffleReadBytes += shuffleReadDelta
+    execSummary.shuffleRead += shuffleReadDelta
+
+    val diskSpillDelta =
+      taskMetrics.diskBytesSpilled - 
oldMetrics.map(_.diskBytesSpilled).getOrElse(0L)
+    stageData.diskBytesSpilled += diskSpillDelta
+    execSummary.diskBytesSpilled += diskSpillDelta
+
+    val memorySpillDelta =
+      taskMetrics.memoryBytesSpilled - 
oldMetrics.map(_.memoryBytesSpilled).getOrElse(0L)
+    stageData.memoryBytesSpilled += memorySpillDelta
+    execSummary.memoryBytesSpilled += memorySpillDelta
+
+    val timeDelta =
+      taskMetrics.executorRunTime - 
oldMetrics.map(_.executorRunTime).getOrElse(0L)
+    stageData.executorRunTime += timeDelta
+  }
 
-      stageData.taskData(info.taskId) = new TaskUIData(info, metrics, 
errorMessage)
+  override def onExecutorMetricsUpdate(executorMetricsUpdate: 
SparkListenerExecutorMetricsUpdate) {
+    for ((taskId, sid, taskMetrics) <- executorMetricsUpdate.taskMetrics) {
+      val stageData = stageIdToData.getOrElseUpdate(sid, {
+        logWarning("Metrics update for task in unknown stage " + sid)
+        new StageUIData
+      })
+      val taskData = stageData.taskData.get(taskId)
+      taskData.map { t =>
+        if (!t.taskInfo.finished) {
+          updateAggregateMetrics(stageData, executorMetricsUpdate.execId, 
taskMetrics,
+            t.taskMetrics)
+
+          // Overwrite task metrics
+          t.taskMetrics = Some(taskMetrics)
+        }
+      }
     }
-  }  // end of onTaskEnd
+  }
 
   override def onEnvironmentUpdate(environmentUpdate: 
SparkListenerEnvironmentUpdate) {
     synchronized {

http://git-wip-us.apache.org/repos/asf/spark/blob/8d338f64/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
index be11a11..2f96f79 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -55,8 +55,11 @@ private[jobs] object UIData {
     var executorSummary = new HashMap[String, ExecutorSummary]
   }
 
+  /**
+   * These are kept mutable and reused throughout a task's lifetime to avoid 
excessive reallocation.
+   */
   case class TaskUIData(
-      taskInfo: TaskInfo,
-      taskMetrics: Option[TaskMetrics] = None,
-      errorMessage: Option[String] = None)
+      var taskInfo: TaskInfo,
+      var taskMetrics: Option[TaskMetrics] = None,
+      var errorMessage: Option[String] = None)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/8d338f64/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala 
b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 9930c71..feafd65 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -18,13 +18,16 @@
 package org.apache.spark.util
 
 import scala.collection.JavaConversions.mapAsJavaMap
+import scala.concurrent.Await
 import scala.concurrent.duration.{Duration, FiniteDuration}
 
-import akka.actor.{ActorSystem, ExtendedActorSystem}
+import akka.actor.{Actor, ActorRef, ActorSystem, ExtendedActorSystem}
+import akka.pattern.ask
+
 import com.typesafe.config.ConfigFactory
 import org.apache.log4j.{Level, Logger}
 
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.{SparkException, Logging, SecurityManager, SparkConf}
 
 /**
  * Various utility classes for working with Akka.
@@ -124,4 +127,63 @@ private[spark] object AkkaUtils extends Logging {
 
   /** Space reserved for extra data in an Akka message besides serialized task 
or task result. */
   val reservedSizeBytes = 200 * 1024
+
+  /** Returns the configured number of times to retry connecting */
+  def numRetries(conf: SparkConf): Int = {
+    conf.getInt("spark.akka.num.retries", 3)
+  }
+
+  /** Returns the configured number of milliseconds to wait on each retry */
+  def retryWaitMs(conf: SparkConf): Int = {
+    conf.getInt("spark.akka.retry.wait", 3000)
+  }
+
+  /**
+   * Send a message to the given actor and get its result within a default 
timeout, or
+   * throw a SparkException if this fails.
+   */
+  def askWithReply[T](
+      message: Any,
+      actor: ActorRef,
+      retryAttempts: Int,
+      retryInterval: Int,
+      timeout: FiniteDuration): T = {
+    // TODO: Consider removing multiple attempts
+    if (actor == null) {
+      throw new SparkException("Error sending message as driverActor is null " 
+
+        "[message = " + message + "]")
+    }
+    var attempts = 0
+    var lastException: Exception = null
+    while (attempts < retryAttempts) {
+      attempts += 1
+      try {
+        val future = actor.ask(message)(timeout)
+        val result = Await.result(future, timeout)
+        if (result == null) {
+          throw new SparkException("Actor returned null")
+        }
+        return result.asInstanceOf[T]
+      } catch {
+        case ie: InterruptedException => throw ie
+        case e: Exception =>
+          lastException = e
+          logWarning("Error sending message in " + attempts + " attempts", e)
+      }
+      Thread.sleep(retryInterval)
+    }
+
+    throw new SparkException(
+      "Error sending message [message = " + message + "]", lastException)
+  }
+
+  def makeDriverRef(name: String, conf: SparkConf, actorSystem: ActorSystem): 
ActorRef = {
+    val driverHost: String = conf.get("spark.driver.host", "localhost")
+    val driverPort: Int = conf.getInt("spark.driver.port", 7077)
+    Utils.checkHost(driverHost, "Expected hostname")
+    val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name"
+    val timeout = AkkaUtils.lookupTimeout(conf)
+    logInfo(s"Connecting to $name: $url")
+    Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/8d338f64/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala 
b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
index 4b727e5..495a0d4 100644
--- 
a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark
 
-import org.scalatest.{FunSuite, PrivateMethodTester}
+import org.scalatest.{BeforeAndAfterEach, FunSuite, PrivateMethodTester}
 
 import org.apache.spark.scheduler.{TaskScheduler, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.{SimrSchedulerBackend, 
SparkDeploySchedulerBackend}
@@ -25,12 +25,12 @@ import 
org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, Me
 import org.apache.spark.scheduler.local.LocalBackend
 
 class SparkContextSchedulerCreationSuite
-  extends FunSuite with PrivateMethodTester with LocalSparkContext with 
Logging {
+  extends FunSuite with PrivateMethodTester with Logging with 
BeforeAndAfterEach {
 
   def createTaskScheduler(master: String): TaskSchedulerImpl = {
     // Create local SparkContext to setup a SparkEnv. We don't actually want 
to start() the
     // real schedulers, so we don't want to create a full SparkContext with 
the desired scheduler.
-    sc = new SparkContext("local", "test")
+    val sc = new SparkContext("local", "test")
     val createTaskSchedulerMethod = 
PrivateMethod[TaskScheduler]('createTaskScheduler)
     val sched = SparkContext invokePrivate createTaskSchedulerMethod(sc, 
master)
     sched.asInstanceOf[TaskSchedulerImpl]

http://git-wip-us.apache.org/repos/asf/spark/blob/8d338f64/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 9021662..0ce13d0 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
 import org.apache.spark.util.CallSite
+import org.apache.spark.executor.TaskMetrics
 
 class BuggyDAGEventProcessActor extends Actor {
   val state = 0
@@ -77,6 +78,8 @@ class DAGSchedulerSuite extends 
TestKit(ActorSystem("DAGSchedulerSuite")) with F
     override def schedulingMode: SchedulingMode = SchedulingMode.NONE
     override def start() = {}
     override def stop() = {}
+    override def executorHeartbeatReceived(execId: String, taskMetrics: 
Array[(Long, TaskMetrics)],
+      blockManagerId: BlockManagerId): Boolean = true
     override def submitTasks(taskSet: TaskSet) = {
       // normally done by TaskSetManager
       taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch)
@@ -342,6 +345,8 @@ class DAGSchedulerSuite extends 
TestKit(ActorSystem("DAGSchedulerSuite")) with F
       }
       override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
       override def defaultParallelism() = 2
+      override def executorHeartbeatReceived(execId: String, taskMetrics: 
Array[(Long, TaskMetrics)],
+        blockManagerId: BlockManagerId): Boolean = true
     }
     val noKillScheduler = new DAGScheduler(
       sc,

http://git-wip-us.apache.org/repos/asf/spark/blob/8d338f64/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 58ea0cc..0ac0269 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -19,22 +19,28 @@ package org.apache.spark.storage
 
 import java.nio.{ByteBuffer, MappedByteBuffer}
 import java.util.Arrays
+import java.util.concurrent.TimeUnit
 
 import akka.actor._
+import akka.pattern.ask
+import akka.util.Timeout
+
 import org.mockito.Mockito.{mock, when}
 import org.scalatest.{BeforeAndAfter, FunSuite, PrivateMethodTester}
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.concurrent.Timeouts._
 import org.scalatest.Matchers
-import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf}
 import org.apache.spark.executor.DataReadMethod
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
+import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
 import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, 
Utils}
 
 import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.Await
+import scala.concurrent.duration._
 import scala.language.implicitConversions
 import scala.language.postfixOps
 
@@ -73,7 +79,6 @@ class BlockManagerSuite extends FunSuite with Matchers with 
BeforeAndAfter
     oldArch = System.setProperty("os.arch", "amd64")
     conf.set("os.arch", "amd64")
     conf.set("spark.test.useCompressedOops", "true")
-    conf.set("spark.storage.disableBlockManagerHeartBeat", "true")
     conf.set("spark.driver.port", boundPort.toString)
     conf.set("spark.storage.unrollFraction", "0.4")
     conf.set("spark.storage.unrollMemoryThreshold", "512")
@@ -341,7 +346,6 @@ class BlockManagerSuite extends FunSuite with Matchers with 
BeforeAndAfter
   }
 
   test("reregistration on heart beat") {
-    val heartBeat = PrivateMethod[Unit]('heartBeat)
     store = makeBlockManager(2000)
     val a1 = new Array[Byte](400)
 
@@ -353,13 +357,15 @@ class BlockManagerSuite extends FunSuite with Matchers 
with BeforeAndAfter
     master.removeExecutor(store.blockManagerId.executorId)
     assert(master.getLocations("a1").size == 0, "a1 was not removed from 
master")
 
-    store invokePrivate heartBeat()
-    assert(master.getLocations("a1").size > 0, "a1 was not reregistered with 
master")
+    implicit val timeout = Timeout(30, TimeUnit.SECONDS)
+    val reregister = !Await.result(
+      master.driverActor ? BlockManagerHeartbeat(store.blockManagerId),
+      timeout.duration).asInstanceOf[Boolean]
+    assert(reregister == true)
   }
 
   test("reregistration on block update") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 
2000, conf,
-      securityMgr, mapOutputTracker)
+    store = makeBlockManager(2000)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
 
@@ -377,7 +383,6 @@ class BlockManagerSuite extends FunSuite with Matchers with 
BeforeAndAfter
   }
 
   test("reregistration doesn't dead lock") {
-    val heartBeat = PrivateMethod[Unit]('heartBeat)
     store = makeBlockManager(2000)
     val a1 = new Array[Byte](400)
     val a2 = List(new Array[Byte](400))
@@ -397,7 +402,7 @@ class BlockManagerSuite extends FunSuite with Matchers with 
BeforeAndAfter
       }
       val t3 = new Thread {
         override def run() {
-          store invokePrivate heartBeat()
+          store.reregister()
         }
       }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8d338f64/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala 
b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 86a271e..cb82525 100644
--- 
a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -21,7 +21,8 @@ import org.scalatest.FunSuite
 import org.scalatest.Matchers
 
 import org.apache.spark._
-import org.apache.spark.executor.{ShuffleReadMetrics, TaskMetrics}
+import org.apache.spark.{LocalSparkContext, SparkConf, Success}
+import org.apache.spark.executor.{ShuffleWriteMetrics, ShuffleReadMetrics, 
TaskMetrics}
 import org.apache.spark.scheduler._
 import org.apache.spark.util.Utils
 
@@ -129,4 +130,87 @@ class JobProgressListenerSuite extends FunSuite with 
LocalSparkContext with Matc
     assert(listener.stageIdToData(task.stageId).numCompleteTasks === 1)
     assert(listener.stageIdToData(task.stageId).numFailedTasks === failCount)
   }
+
+  test("test update metrics") {
+    val conf = new SparkConf()
+    val listener = new JobProgressListener(conf)
+
+    val taskType = Utils.getFormattedClassName(new ShuffleMapTask(0))
+    val execId = "exe-1"
+
+    def makeTaskMetrics(base: Int) = {
+      val taskMetrics = new TaskMetrics()
+      val shuffleReadMetrics = new ShuffleReadMetrics()
+      val shuffleWriteMetrics = new ShuffleWriteMetrics()
+      taskMetrics.updateShuffleReadMetrics(shuffleReadMetrics)
+      taskMetrics.shuffleWriteMetrics = Some(shuffleWriteMetrics)
+      shuffleReadMetrics.remoteBytesRead = base + 1
+      shuffleReadMetrics.remoteBlocksFetched = base + 2
+      shuffleWriteMetrics.shuffleBytesWritten = base + 3
+      taskMetrics.executorRunTime = base + 4
+      taskMetrics.diskBytesSpilled = base + 5
+      taskMetrics.memoryBytesSpilled = base + 6
+      taskMetrics
+    }
+
+    def makeTaskInfo(taskId: Long, finishTime: Int = 0) = {
+      val taskInfo = new TaskInfo(taskId, 0, 1, 0L, execId, "host1", 
TaskLocality.NODE_LOCAL,
+        false)
+      taskInfo.finishTime = finishTime
+      taskInfo
+    }
+
+    listener.onTaskStart(SparkListenerTaskStart(0, makeTaskInfo(1234L)))
+    listener.onTaskStart(SparkListenerTaskStart(0, makeTaskInfo(1235L)))
+    listener.onTaskStart(SparkListenerTaskStart(1, makeTaskInfo(1236L)))
+    listener.onTaskStart(SparkListenerTaskStart(1, makeTaskInfo(1237L)))
+
+    
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(execId, 
Array(
+      (1234L, 0, makeTaskMetrics(0)),
+      (1235L, 0, makeTaskMetrics(100)),
+      (1236L, 1, makeTaskMetrics(200)))))
+
+    var stage0Data = listener.stageIdToData.get(0).get
+    var stage1Data = listener.stageIdToData.get(1).get
+    assert(stage0Data.shuffleReadBytes == 102)
+    assert(stage1Data.shuffleReadBytes == 201)
+    assert(stage0Data.shuffleWriteBytes == 106)
+    assert(stage1Data.shuffleWriteBytes == 203)
+    assert(stage0Data.executorRunTime == 108)
+    assert(stage1Data.executorRunTime == 204)
+    assert(stage0Data.diskBytesSpilled == 110)
+    assert(stage1Data.diskBytesSpilled == 205)
+    assert(stage0Data.memoryBytesSpilled == 112)
+    assert(stage1Data.memoryBytesSpilled == 206)
+    
assert(stage0Data.taskData.get(1234L).get.taskMetrics.get.shuffleReadMetrics.get
+      .totalBlocksFetched == 2)
+    
assert(stage0Data.taskData.get(1235L).get.taskMetrics.get.shuffleReadMetrics.get
+      .totalBlocksFetched == 102)
+    
assert(stage1Data.taskData.get(1236L).get.taskMetrics.get.shuffleReadMetrics.get
+      .totalBlocksFetched == 202)
+
+    // task that was included in a heartbeat
+    listener.onTaskEnd(SparkListenerTaskEnd(0, taskType, Success, 
makeTaskInfo(1234L, 1),
+      makeTaskMetrics(300)))
+    // task that wasn't included in a heartbeat
+    listener.onTaskEnd(SparkListenerTaskEnd(1, taskType, Success, 
makeTaskInfo(1237L, 1),
+      makeTaskMetrics(400)))
+
+    stage0Data = listener.stageIdToData.get(0).get
+    stage1Data = listener.stageIdToData.get(1).get
+    assert(stage0Data.shuffleReadBytes == 402)
+    assert(stage1Data.shuffleReadBytes == 602)
+    assert(stage0Data.shuffleWriteBytes == 406)
+    assert(stage1Data.shuffleWriteBytes == 606)
+    assert(stage0Data.executorRunTime == 408)
+    assert(stage1Data.executorRunTime == 608)
+    assert(stage0Data.diskBytesSpilled == 410)
+    assert(stage1Data.diskBytesSpilled == 610)
+    assert(stage0Data.memoryBytesSpilled == 412)
+    assert(stage1Data.memoryBytesSpilled == 612)
+    
assert(stage0Data.taskData.get(1234L).get.taskMetrics.get.shuffleReadMetrics.get
+      .totalBlocksFetched == 302)
+    
assert(stage1Data.taskData.get(1237L).get.taskMetrics.get.shuffleReadMetrics.get
+      .totalBlocksFetched == 402)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/8d338f64/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index ea69057..2a71d7b 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -541,6 +541,13 @@ Apart from these, the following properties are also 
available, and may be useful
     output directories. We recommend that users do not disable this except if 
trying to achieve compatibility with
     previous versions of Spark. Simply use Hadoop's FileSystem API to delete 
output directories by hand.</td>
 </tr>
+<tr>
+    <td>spark.executor.heartbeatInterval</td>
+    <td>10000</td>
+    <td>Interval (milliseconds) between each executor's heartbeats to the 
driver.  Heartbeats let
+    the driver know that the executor is still alive and update it with 
metrics for in-progress
+    tasks.</td>
+</tr>
 </table>
 
 #### Networking

Reply via email to