Repository: spark
Updated Branches:
  refs/heads/master b3ec51bfd -> fb60bec34


[SPARK-2298] Encode stage attempt in SparkListener & UI.

Simple way to reproduce this in the UI:

```scala
val f = new java.io.File("/tmp/test")
f.delete()
sc.parallelize(1 to 2, 2).map(x => (x, x)).repartition(3).mapPartitionsWithContext { case (context, iter) =>
  if (context.partitionId == 0) {
    val f = new java.io.File("/tmp/test")
    if (!f.exists) {
      f.mkdir()
      System.exit(0);
    }
  }
  iter
}.count()
```
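
For listener authors, the visible part of this change is that `SparkListenerTaskStart`, `SparkListenerTaskEnd` and `StageInfo` now carry a stage attempt id (with -1 used when the DAGScheduler can no longer resolve the attempt). Below is a minimal sketch of a listener keyed on (stageId, attemptId), mirroring what the updated JobProgressListener does; the `StageAttemptLogger` class itself is hypothetical and not part of this patch:

```scala
import scala.collection.mutable

import org.apache.spark.scheduler._

// Hypothetical listener that keeps per-(stageId, stageAttemptId) bookkeeping,
// the same keying scheme JobProgressListener switches to in this patch.
class StageAttemptLogger extends SparkListener {
  private val runningTasks = mutable.HashMap[(Int, Int), Int]().withDefaultValue(0)

  override def onTaskStart(taskStart: SparkListenerTaskStart) {
    runningTasks((taskStart.stageId, taskStart.stageAttemptId)) += 1
  }

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    // Attempt id -1 means the scheduler no longer knew the stage; drop the event,
    // just as JobProgressListener.onTaskEnd does.
    if (taskEnd.stageAttemptId != -1) {
      runningTasks((taskEnd.stageId, taskEnd.stageAttemptId)) -= 1
    }
  }

  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
    val info = stageCompleted.stageInfo
    println(s"Stage ${info.stageId} (attempt ${info.attemptId}) completed")
  }
}

// Register it before running the reproduction above:
// sc.addSparkListener(new StageAttemptLogger)
```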

Author: Reynold Xin <r...@apache.org>

Closes #1545 from rxin/stage-attempt and squashes the following commits:

3ee1d2a [Reynold Xin] - Rename attempt to retry in UI. - Properly report stage 
failure in FetchFailed.
40a6bd5 [Reynold Xin] Updated test suites.
c414c36 [Reynold Xin] Fixed the hanging in JobCancellationSuite.
b3e2eed [Reynold Xin] Oops previous code didn't compile.
0f36075 [Reynold Xin] Mark unknown stage attempt with id -1 and drop that in 
JobProgressListener.
6c08b07 [Reynold Xin] Addressed code review feedback.
4e5faa2 [Reynold Xin] [SPARK-2298] Encode stage attempt in SparkListener & UI.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fb60bec3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fb60bec3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fb60bec3

Branch: refs/heads/master
Commit: fb60bec34e0b20ae95b4b865a79744916e0a5737
Parents: b3ec51b
Author: Reynold Xin <r...@apache.org>
Authored: Wed Aug 20 15:37:27 2014 -0700
Committer: Patrick Wendell <pwend...@gmail.com>
Committed: Wed Aug 20 15:37:27 2014 -0700

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   |  77 +--
 .../apache/spark/scheduler/SparkListener.scala  |  11 +-
 .../org/apache/spark/scheduler/Stage.scala      |   8 +-
 .../org/apache/spark/scheduler/StageInfo.scala  |  11 +-
 .../spark/scheduler/TaskSchedulerImpl.scala     |   8 +-
 .../org/apache/spark/scheduler/TaskSet.scala    |   4 -
 .../apache/spark/ui/jobs/ExecutorTable.scala    |   6 +-
 .../spark/ui/jobs/JobProgressListener.scala     |  40 +-
 .../org/apache/spark/ui/jobs/StagePage.scala    |  11 +-
 .../org/apache/spark/ui/jobs/StageTable.scala   |  14 +-
 .../org/apache/spark/util/JsonProtocol.scala    |  12 +-
 .../storage/StorageStatusListenerSuite.scala    |  17 +-
 .../ui/jobs/JobProgressListenerSuite.scala      |  68 +--
 .../spark/ui/storage/StorageTabSuite.scala      |  16 +-
 .../apache/spark/util/JsonProtocolSuite.scala   | 476 +++++++++++++++----
 15 files changed, 555 insertions(+), 224 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fb60bec3/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b86cfbf..3413198 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -164,7 +164,7 @@ class DAGScheduler(
    */
   def executorHeartbeatReceived(
       execId: String,
-      taskMetrics: Array[(Long, Int, TaskMetrics)], // (taskId, stageId, metrics)
+      taskMetrics: Array[(Long, Int, Int, TaskMetrics)], // (taskId, stageId, stageAttempt, metrics)
       blockManagerId: BlockManagerId): Boolean = {
     listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, taskMetrics))
     implicit val timeout = Timeout(600 seconds)
@@ -677,7 +677,10 @@ class DAGScheduler(
   }
 
   private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) {
-    listenerBus.post(SparkListenerTaskStart(task.stageId, taskInfo))
+    // Note that there is a chance that this task is launched after the stage is cancelled.
+    // In that case, we wouldn't have the stage anymore in stageIdToStage.
+    val stageAttemptId = stageIdToStage.get(task.stageId).map(_.latestInfo.attemptId).getOrElse(-1)
+    listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
     submitWaitingStages()
   }
 
@@ -695,8 +698,8 @@ class DAGScheduler(
       // is in the process of getting stopped.
      val stageFailedMessage = "Stage cancelled because SparkContext was shut down"
       runningStages.foreach { stage =>
-        stage.info.stageFailed(stageFailedMessage)
-        listenerBus.post(SparkListenerStageCompleted(stage.info))
+        stage.latestInfo.stageFailed(stageFailedMessage)
+        listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
       }
       listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error)))
     }
@@ -781,7 +784,16 @@ class DAGScheduler(
     logDebug("submitMissingTasks(" + stage + ")")
     // Get our pending tasks and remember them in our pendingTasks entry
     stage.pendingTasks.clear()
-    var tasks = ArrayBuffer[Task[_]]()
+
+    // First figure out the indexes of partition ids to compute.
+    val partitionsToCompute: Seq[Int] = {
+      if (stage.isShuffleMap) {
+        (0 until stage.numPartitions).filter(id => stage.outputLocs(id) == Nil)
+      } else {
+        val job = stage.resultOfJob.get
+        (0 until job.numPartitions).filter(id => !job.finished(id))
+      }
+    }
 
     val properties = if (jobIdToActiveJob.contains(jobId)) {
       jobIdToActiveJob(stage.jobId).properties
@@ -795,7 +807,8 @@ class DAGScheduler(
     // serializable. If tasks are not serializable, a 
SparkListenerStageCompleted event
     // will be posted, which should always come after a corresponding 
SparkListenerStageSubmitted
     // event.
-    listenerBus.post(SparkListenerStageSubmitted(stage.info, properties))
+    stage.latestInfo = StageInfo.fromStage(stage, 
Some(partitionsToCompute.size))
+    listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
 
     // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it 
multiple times.
     // Broadcasted binary for the task, used to dispatch tasks to executors. 
Note that we broadcast
@@ -826,20 +839,19 @@ class DAGScheduler(
         return
     }
 
-    if (stage.isShuffleMap) {
-      for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) {
-        val locs = getPreferredLocs(stage.rdd, p)
-        val part = stage.rdd.partitions(p)
-        tasks += new ShuffleMapTask(stage.id, taskBinary, part, locs)
+    val tasks: Seq[Task[_]] = if (stage.isShuffleMap) {
+      partitionsToCompute.map { id =>
+        val locs = getPreferredLocs(stage.rdd, id)
+        val part = stage.rdd.partitions(id)
+        new ShuffleMapTask(stage.id, taskBinary, part, locs)
       }
     } else {
-      // This is a final stage; figure out its job's missing partitions
       val job = stage.resultOfJob.get
-      for (id <- 0 until job.numPartitions if !job.finished(id)) {
+      partitionsToCompute.map { id =>
         val p: Int = job.partitions(id)
         val part = stage.rdd.partitions(p)
         val locs = getPreferredLocs(stage.rdd, p)
-        tasks += new ResultTask(stage.id, taskBinary, part, locs, id)
+        new ResultTask(stage.id, taskBinary, part, locs, id)
       }
     }
 
@@ -869,11 +881,11 @@ class DAGScheduler(
       logDebug("New pending tasks: " + stage.pendingTasks)
       taskScheduler.submitTasks(
         new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), 
stage.jobId, properties))
-      stage.info.submissionTime = Some(clock.getTime())
+      stage.latestInfo.submissionTime = Some(clock.getTime())
     } else {
       // Because we posted SparkListenerStageSubmitted earlier, we should post
       // SparkListenerStageCompleted here in case there are no tasks to run.
-      listenerBus.post(SparkListenerStageCompleted(stage.info))
+      listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
       logDebug("Stage " + stage + " is actually done; %b %d %d".format(
         stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
       runningStages -= stage
@@ -892,8 +904,9 @@ class DAGScheduler(
     // The success case is dealt with separately below, since we need to 
compute accumulator
     // updates before posting.
     if (event.reason != Success) {
-      listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, 
event.taskInfo,
-        event.taskMetrics))
+      val attemptId = stageIdToStage.get(task.stageId).map(_.latestInfo.attemptId).getOrElse(-1)
+      listenerBus.post(SparkListenerTaskEnd(stageId, attemptId, taskType, event.reason,
+        event.taskInfo, event.taskMetrics))
     }
 
     if (!stageIdToStage.contains(task.stageId)) {
@@ -902,14 +915,19 @@ class DAGScheduler(
     }
     val stage = stageIdToStage(task.stageId)
 
-    def markStageAsFinished(stage: Stage) = {
-      val serviceTime = stage.info.submissionTime match {
+    def markStageAsFinished(stage: Stage, errorMessage: Option[String] = None) 
= {
+      val serviceTime = stage.latestInfo.submissionTime match {
         case Some(t) => "%.03f".format((clock.getTime() - t) / 1000.0)
         case _ => "Unknown"
       }
-      logInfo("%s (%s) finished in %s s".format(stage, stage.name, 
serviceTime))
-      stage.info.completionTime = Some(clock.getTime())
-      listenerBus.post(SparkListenerStageCompleted(stage.info))
+      if (errorMessage.isEmpty) {
+        logInfo("%s (%s) finished in %s s".format(stage, stage.name, 
serviceTime))
+        stage.latestInfo.completionTime = Some(clock.getTime())
+      } else {
+        stage.latestInfo.stageFailed(errorMessage.get)
+        logInfo("%s (%s) failed in %s s".format(stage, stage.name, 
serviceTime))
+      }
+      listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
       runningStages -= stage
     }
     event.reason match {
@@ -924,7 +942,7 @@ class DAGScheduler(
                 val name = acc.name.get
                 val stringPartialValue = 
Accumulators.stringifyPartialValue(partialValue)
                 val stringValue = Accumulators.stringifyValue(acc.value)
-                stage.info.accumulables(id) = AccumulableInfo(id, name, 
stringValue)
+                stage.latestInfo.accumulables(id) = AccumulableInfo(id, name, 
stringValue)
                 event.taskInfo.accumulables +=
                   AccumulableInfo(id, name, Some(stringPartialValue), 
stringValue)
               }
@@ -935,8 +953,8 @@ class DAGScheduler(
               logError(s"Failed to update accumulators for $task", e)
           }
         }
-        listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, 
event.taskInfo,
-          event.taskMetrics))
+        listenerBus.post(SparkListenerTaskEnd(stageId, 
stage.latestInfo.attemptId, taskType,
+          event.reason, event.taskInfo, event.taskMetrics))
         stage.pendingTasks -= task
         task match {
           case rt: ResultTask[_, _] =>
@@ -1029,6 +1047,7 @@ class DAGScheduler(
       case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
         // Mark the stage that the reducer was in as unrunnable
         val failedStage = stageIdToStage(task.stageId)
+        markStageAsFinished(failedStage, Some("Fetch failure"))
         runningStages -= failedStage
         // TODO: Cancel running tasks in the stage
         logInfo("Marking " + failedStage + " (" + failedStage.name +
@@ -1142,7 +1161,7 @@ class DAGScheduler(
     }
     val dependentJobs: Seq[ActiveJob] =
       activeJobs.filter(job => stageDependsOn(job.finalStage, 
failedStage)).toSeq
-    failedStage.info.completionTime = Some(clock.getTime())
+    failedStage.latestInfo.completionTime = Some(clock.getTime())
     for (job <- dependentJobs) {
       failJobAndIndependentStages(job, s"Job aborted due to stage failure: 
$reason")
     }
@@ -1182,8 +1201,8 @@ class DAGScheduler(
           if (runningStages.contains(stage)) {
             try { // cancelTasks will fail if a SchedulerBackend does not 
implement killTask
               taskScheduler.cancelTasks(stageId, shouldInterruptThread)
-              stage.info.stageFailed(failureReason)
-              listenerBus.post(SparkListenerStageCompleted(stage.info))
+              stage.latestInfo.stageFailed(failureReason)
+              listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
             } catch {
               case e: UnsupportedOperationException =>
                 logInfo(s"Could not cancel tasks for stage $stageId", e)

http://git-wip-us.apache.org/repos/asf/spark/blob/fb60bec3/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala 
b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index d01d318..86ca844 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -39,7 +39,8 @@ case class SparkListenerStageSubmitted(stageInfo: StageInfo, 
properties: Propert
 case class SparkListenerStageCompleted(stageInfo: StageInfo) extends 
SparkListenerEvent
 
 @DeveloperApi
-case class SparkListenerTaskStart(stageId: Int, taskInfo: TaskInfo) extends 
SparkListenerEvent
+case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: 
TaskInfo)
+  extends SparkListenerEvent
 
 @DeveloperApi
 case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends 
SparkListenerEvent
@@ -47,6 +48,7 @@ case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) 
extends SparkListe
 @DeveloperApi
 case class SparkListenerTaskEnd(
     stageId: Int,
+    stageAttemptId: Int,
     taskType: String,
     reason: TaskEndReason,
     taskInfo: TaskInfo,
@@ -75,10 +77,15 @@ case class SparkListenerBlockManagerRemoved(blockManagerId: 
BlockManagerId)
 @DeveloperApi
 case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent
 
+/**
+ * Periodic updates from executors.
+ * @param execId executor id
+ * @param taskMetrics sequence of (task id, stage id, stage attempt, metrics)
+ */
 @DeveloperApi
 case class SparkListenerExecutorMetricsUpdate(
     execId: String,
-    taskMetrics: Seq[(Long, Int, TaskMetrics)])
+    taskMetrics: Seq[(Long, Int, Int, TaskMetrics)])
   extends SparkListenerEvent
 
 @DeveloperApi
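
Existing listeners that consume `SparkListenerExecutorMetricsUpdate` need to destructure the extra attempt element in each metrics tuple. A rough, hypothetical handler (not part of this patch) showing the new tuple shape:

```scala
import org.apache.spark.scheduler._

// Illustrative only: the taskMetrics tuples now carry the stage attempt id as
// the third element, so pattern matches need four fields instead of three.
class MetricsUpdatePrinter extends SparkListener {
  override def onExecutorMetricsUpdate(update: SparkListenerExecutorMetricsUpdate) {
    for ((taskId, stageId, stageAttemptId, metrics) <- update.taskMetrics) {
      println(s"executor ${update.execId}: task $taskId of stage $stageId " +
        s"(attempt $stageAttemptId) ran ${metrics.executorRunTime} ms so far")
    }
  }
}
```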

http://git-wip-us.apache.org/repos/asf/spark/blob/fb60bec3/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala 
b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index 8009054..071568c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -43,6 +43,9 @@ import org.apache.spark.util.CallSite
  * stage, the callSite gives the user code that created the RDD being 
shuffled. For a result
  * stage, the callSite gives the user code that executes the associated action 
(e.g. count()).
  *
+ * A single stage can consist of multiple attempts. In that case, the 
latestInfo field will
+ * be updated for each attempt.
+ *
  */
 private[spark] class Stage(
     val id: Int,
@@ -71,8 +74,8 @@ private[spark] class Stage(
   val name = callSite.shortForm
   val details = callSite.longForm
 
-  /** Pointer to the [StageInfo] object, set by DAGScheduler. */
-  var info: StageInfo = StageInfo.fromStage(this)
+  /** Pointer to the latest [StageInfo] object, set by DAGScheduler. */
+  var latestInfo: StageInfo = StageInfo.fromStage(this)
 
   def isAvailable: Boolean = {
     if (!isShuffleMap) {
@@ -116,6 +119,7 @@ private[spark] class Stage(
     }
   }
 
+  /** Return a new attempt id, starting with 0. */
   def newAttemptId(): Int = {
     val id = nextAttemptId
     nextAttemptId += 1

http://git-wip-us.apache.org/repos/asf/spark/blob/fb60bec3/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala 
b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
index 2a407e4..c6dc336 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -29,6 +29,7 @@ import org.apache.spark.storage.RDDInfo
 @DeveloperApi
 class StageInfo(
     val stageId: Int,
+    val attemptId: Int,
     val name: String,
     val numTasks: Int,
     val rddInfos: Seq[RDDInfo],
@@ -56,9 +57,15 @@ private[spark] object StageInfo {
    * shuffle dependencies. Therefore, all ancestor RDDs related to this 
Stage's RDD through a
    * sequence of narrow dependencies should also be associated with this Stage.
    */
-  def fromStage(stage: Stage): StageInfo = {
+  def fromStage(stage: Stage, numTasks: Option[Int] = None): StageInfo = {
     val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd)
     val rddInfos = Seq(RDDInfo.fromRdd(stage.rdd)) ++ ancestorRddInfos
-    new StageInfo(stage.id, stage.name, stage.numTasks, rddInfos, 
stage.details)
+    new StageInfo(
+      stage.id,
+      stage.attemptId,
+      stage.name,
+      numTasks.getOrElse(stage.numTasks),
+      rddInfos,
+      stage.details)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/fb60bec3/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 6c0d1b2..ad051e5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -333,12 +333,12 @@ private[spark] class TaskSchedulerImpl(
       execId: String,
       taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
       blockManagerId: BlockManagerId): Boolean = {
-    val metricsWithStageIds = taskMetrics.flatMap {
-      case (id, metrics) => {
+
+    val metricsWithStageIds: Array[(Long, Int, Int, TaskMetrics)] = 
synchronized {
+      taskMetrics.flatMap { case (id, metrics) =>
         taskIdToTaskSetId.get(id)
           .flatMap(activeTaskSets.get)
-          .map(_.stageId)
-          .map(x => (id, x, metrics))
+          .map(taskSetMgr => (id, taskSetMgr.stageId, 
taskSetMgr.taskSet.attempt, metrics))
       }
     }
     dagScheduler.executorHeartbeatReceived(execId, metricsWithStageIds, 
blockManagerId)

http://git-wip-us.apache.org/repos/asf/spark/blob/fb60bec3/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
index 613fa78..c3ad325 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
@@ -31,9 +31,5 @@ private[spark] class TaskSet(
     val properties: Properties) {
     val id: String = stageId + "." + attempt
 
-  def kill(interruptThread: Boolean) {
-    tasks.foreach(_.kill(interruptThread))
-  }
-
   override def toString: String = "TaskSet " + id
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/fb60bec3/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index 0cc51c8..2987dc0 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -24,8 +24,8 @@ import org.apache.spark.ui.{ToolTips, UIUtils}
 import org.apache.spark.ui.jobs.UIData.StageUIData
 import org.apache.spark.util.Utils
 
-/** Page showing executor summary */
-private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) {
+/** Stage summary grouped by executors. */
+private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: 
JobProgressTab) {
   private val listener = parent.listener
 
   def toNodeSeq: Seq[Node] = {
@@ -65,7 +65,7 @@ private[ui] class ExecutorTable(stageId: Int, parent: 
JobProgressTab) {
       executorIdToAddress.put(executorId, address)
     }
 
-    listener.stageIdToData.get(stageId) match {
+    listener.stageIdToData.get((stageId, stageAttemptId)) match {
       case Some(stageData: StageUIData) =>
         stageData.executorSummary.toSeq.sortBy(_._1).map { case (k, v) =>
           <tr>

http://git-wip-us.apache.org/repos/asf/spark/blob/fb60bec3/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 74cd637..f7f918f 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -43,12 +43,16 @@ class JobProgressListener(conf: SparkConf) extends 
SparkListener with Logging {
   // How many stages to remember
   val retainedStages = conf.getInt("spark.ui.retainedStages", 
DEFAULT_RETAINED_STAGES)
 
-  val activeStages = HashMap[Int, StageInfo]()
+  // Map from stageId to StageInfo
+  val activeStages = new HashMap[Int, StageInfo]
+
+  // Map from (stageId, attemptId) to StageUIData
+  val stageIdToData = new HashMap[(Int, Int), StageUIData]
+
   val completedStages = ListBuffer[StageInfo]()
   val failedStages = ListBuffer[StageInfo]()
 
-  val stageIdToData = new HashMap[Int, StageUIData]
-
+  // Map from pool name to a hash map (map from stage id to StageInfo).
   val poolToActiveStages = HashMap[String, HashMap[Int, StageInfo]]()
 
   val executorIdToBlockManagerId = HashMap[String, BlockManagerId]()
@@ -59,9 +63,8 @@ class JobProgressListener(conf: SparkConf) extends 
SparkListener with Logging {
 
   override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = 
synchronized {
     val stage = stageCompleted.stageInfo
-    val stageId = stage.stageId
-    val stageData = stageIdToData.getOrElseUpdate(stageId, {
-      logWarning("Stage completed for unknown stage " + stageId)
+    val stageData = stageIdToData.getOrElseUpdate((stage.stageId, 
stage.attemptId), {
+      logWarning("Stage completed for unknown stage " + stage.stageId)
       new StageUIData
     })
 
@@ -69,8 +72,10 @@ class JobProgressListener(conf: SparkConf) extends 
SparkListener with Logging {
       stageData.accumulables(id) = info
     }
 
-    poolToActiveStages.get(stageData.schedulingPool).foreach(_.remove(stageId))
-    activeStages.remove(stageId)
+    poolToActiveStages.get(stageData.schedulingPool).foreach { hashMap =>
+      hashMap.remove(stage.stageId)
+    }
+    activeStages.remove(stage.stageId)
     if (stage.failureReason.isEmpty) {
       completedStages += stage
       trimIfNecessary(completedStages)
@@ -84,7 +89,7 @@ class JobProgressListener(conf: SparkConf) extends 
SparkListener with Logging {
   private def trimIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
     if (stages.size > retainedStages) {
       val toRemove = math.max(retainedStages / 10, 1)
-      stages.take(toRemove).foreach { s => stageIdToData.remove(s.stageId) }
+      stages.take(toRemove).foreach { s => stageIdToData.remove((s.stageId, 
s.attemptId)) }
       stages.trimStart(toRemove)
     }
   }
@@ -98,21 +103,21 @@ class JobProgressListener(conf: SparkConf) extends 
SparkListener with Logging {
       p => p.getProperty("spark.scheduler.pool", DEFAULT_POOL_NAME)
     }.getOrElse(DEFAULT_POOL_NAME)
 
-    val stageData = stageIdToData.getOrElseUpdate(stage.stageId, new 
StageUIData)
+    val stageData = stageIdToData.getOrElseUpdate((stage.stageId, 
stage.attemptId), new StageUIData)
     stageData.schedulingPool = poolName
 
     stageData.description = Option(stageSubmitted.properties).flatMap {
       p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
     }
 
-    val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashMap[Int, 
StageInfo]())
+    val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashMap[Int, 
StageInfo])
     stages(stage.stageId) = stage
   }
 
   override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
     val taskInfo = taskStart.taskInfo
     if (taskInfo != null) {
-      val stageData = stageIdToData.getOrElseUpdate(taskStart.stageId, {
+      val stageData = stageIdToData.getOrElseUpdate((taskStart.stageId, 
taskStart.stageAttemptId), {
         logWarning("Task start for unknown stage " + taskStart.stageId)
         new StageUIData
       })
@@ -128,8 +133,11 @@ class JobProgressListener(conf: SparkConf) extends 
SparkListener with Logging {
 
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
     val info = taskEnd.taskInfo
-    if (info != null) {
-      val stageData = stageIdToData.getOrElseUpdate(taskEnd.stageId, {
+    // If the stage attempt id is -1, it means the DAGScheduler had no idea which attempt this
+    // task completion event is for. Let's just drop it here. This means we might have some
+    // speculative tasks on the web UI that are never marked as complete.
+    if (info != null && taskEnd.stageAttemptId != -1) {
+      val stageData = stageIdToData.getOrElseUpdate((taskEnd.stageId, 
taskEnd.stageAttemptId), {
         logWarning("Task end for unknown stage " + taskEnd.stageId)
         new StageUIData
       })
@@ -222,8 +230,8 @@ class JobProgressListener(conf: SparkConf) extends 
SparkListener with Logging {
   }
 
   override def onExecutorMetricsUpdate(executorMetricsUpdate: 
SparkListenerExecutorMetricsUpdate) {
-    for ((taskId, sid, taskMetrics) <- executorMetricsUpdate.taskMetrics) {
-      val stageData = stageIdToData.getOrElseUpdate(sid, {
+    for ((taskId, sid, sAttempt, taskMetrics) <- 
executorMetricsUpdate.taskMetrics) {
+      val stageData = stageIdToData.getOrElseUpdate((sid, sAttempt), {
         logWarning("Metrics update for task in unknown stage " + sid)
         new StageUIData
       })

http://git-wip-us.apache.org/repos/asf/spark/blob/fb60bec3/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index d4eb027..db01be5 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -34,7 +34,8 @@ private[ui] class StagePage(parent: JobProgressTab) extends 
WebUIPage("stage") {
   def render(request: HttpServletRequest): Seq[Node] = {
     listener.synchronized {
       val stageId = request.getParameter("id").toInt
-      val stageDataOption = listener.stageIdToData.get(stageId)
+      val stageAttemptId = request.getParameter("attempt").toInt
+      val stageDataOption = listener.stageIdToData.get((stageId, 
stageAttemptId))
 
       if (stageDataOption.isEmpty || stageDataOption.get.taskData.isEmpty) {
         val content =
@@ -42,14 +43,15 @@ private[ui] class StagePage(parent: JobProgressTab) extends 
WebUIPage("stage") {
             <h4>Summary Metrics</h4> No tasks have started yet
             <h4>Tasks</h4> No tasks have started yet
           </div>
-        return UIUtils.headerSparkPage("Details for Stage %s".format(stageId), 
content, parent)
+        return UIUtils.headerSparkPage(
+          s"Details for Stage $stageId (Attempt $stageAttemptId)", content, 
parent)
       }
 
       val stageData = stageDataOption.get
       val tasks = stageData.taskData.values.toSeq.sortBy(_.taskInfo.launchTime)
 
       val numCompleted = tasks.count(_.taskInfo.finished)
-      val accumulables = listener.stageIdToData(stageId).accumulables
+      val accumulables = listener.stageIdToData((stageId, 
stageAttemptId)).accumulables
       val hasInput = stageData.inputBytes > 0
       val hasShuffleRead = stageData.shuffleReadBytes > 0
       val hasShuffleWrite = stageData.shuffleWriteBytes > 0
@@ -211,7 +213,8 @@ private[ui] class StagePage(parent: JobProgressTab) extends 
WebUIPage("stage") {
           def quantileRow(data: Seq[Node]): Seq[Node] = <tr>{data}</tr>
           Some(UIUtils.listingTable(quantileHeaders, quantileRow, listings, 
fixedWidth = true))
         }
-      val executorTable = new ExecutorTable(stageId, parent)
+
+      val executorTable = new ExecutorTable(stageId, stageAttemptId, parent)
 
       val maybeAccumulableTable: Seq[Node] =
         if (accumulables.size > 0) { <h4>Accumulators</h4> ++ accumulableTable 
} else Seq()

http://git-wip-us.apache.org/repos/asf/spark/blob/fb60bec3/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 16ad0df..2e67310 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -97,8 +97,8 @@ private[ui] class StageTableBase(
     }
     // scalastyle:on
 
-    val nameLinkUri ="%s/stages/stage?id=%s"
-      .format(UIUtils.prependBaseUri(parent.basePath), s.stageId)
+    val nameLinkUri ="%s/stages/stage?id=%s&attempt=%s"
+      .format(UIUtils.prependBaseUri(parent.basePath), s.stageId, s.attemptId)
     val nameLink = <a href={nameLinkUri}>{s.name}</a>
 
     val cachedRddInfos = s.rddInfos.filter(_.numCachedPartitions > 0)
@@ -121,7 +121,7 @@ private[ui] class StageTableBase(
     }
 
     val stageDesc = for {
-      stageData <- listener.stageIdToData.get(s.stageId)
+      stageData <- listener.stageIdToData.get((s.stageId, s.attemptId))
       desc <- stageData.description
     } yield {
       <div><em>{desc}</em></div>
@@ -131,7 +131,7 @@ private[ui] class StageTableBase(
   }
 
   protected def stageRow(s: StageInfo): Seq[Node] = {
-    val stageDataOption = listener.stageIdToData.get(s.stageId)
+    val stageDataOption = listener.stageIdToData.get((s.stageId, s.attemptId))
     if (stageDataOption.isEmpty) {
       return <td>{s.stageId}</td><td>No data available for this stage</td>
     }
@@ -154,7 +154,11 @@ private[ui] class StageTableBase(
     val shuffleWrite = stageData.shuffleWriteBytes
     val shuffleWriteWithUnit = if (shuffleWrite > 0) 
Utils.bytesToString(shuffleWrite) else ""
 
-    <td>{s.stageId}</td> ++
+    {if (s.attemptId > 0) {
+      <td>{s.stageId} (retry {s.attemptId})</td>
+    } else {
+      <td>{s.stageId}</td>
+    }} ++
     {if (isFairScheduler) {
       <td>
         <a href={"%s/stages/pool?poolname=%s"

http://git-wip-us.apache.org/repos/asf/spark/blob/fb60bec3/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala 
b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 1e18ec6..db73847 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -96,6 +96,7 @@ private[spark] object JsonProtocol {
     val taskInfo = taskStart.taskInfo
     ("Event" -> Utils.getFormattedClassName(taskStart)) ~
     ("Stage ID" -> taskStart.stageId) ~
+    ("Stage Attempt ID" -> taskStart.stageAttemptId) ~
     ("Task Info" -> taskInfoToJson(taskInfo))
   }
 
@@ -112,6 +113,7 @@ private[spark] object JsonProtocol {
     val taskMetricsJson = if (taskMetrics != null) 
taskMetricsToJson(taskMetrics) else JNothing
     ("Event" -> Utils.getFormattedClassName(taskEnd)) ~
     ("Stage ID" -> taskEnd.stageId) ~
+    ("Stage Attempt ID" -> taskEnd.stageAttemptId) ~
     ("Task Type" -> taskEnd.taskType) ~
     ("Task End Reason" -> taskEndReason) ~
     ("Task Info" -> taskInfoToJson(taskInfo)) ~
@@ -187,6 +189,7 @@ private[spark] object JsonProtocol {
     val completionTime = 
stageInfo.completionTime.map(JInt(_)).getOrElse(JNothing)
     val failureReason = 
stageInfo.failureReason.map(JString(_)).getOrElse(JNothing)
     ("Stage ID" -> stageInfo.stageId) ~
+    ("Stage Attempt ID" -> stageInfo.attemptId) ~
     ("Stage Name" -> stageInfo.name) ~
     ("Number of Tasks" -> stageInfo.numTasks) ~
     ("RDD Info" -> rddInfo) ~
@@ -419,8 +422,9 @@ private[spark] object JsonProtocol {
 
   def taskStartFromJson(json: JValue): SparkListenerTaskStart = {
     val stageId = (json \ "Stage ID").extract[Int]
+    val stageAttemptId = (json \ "Stage Attempt 
ID").extractOpt[Int].getOrElse(0)
     val taskInfo = taskInfoFromJson(json \ "Task Info")
-    SparkListenerTaskStart(stageId, taskInfo)
+    SparkListenerTaskStart(stageId, stageAttemptId, taskInfo)
   }
 
   def taskGettingResultFromJson(json: JValue): SparkListenerTaskGettingResult 
= {
@@ -430,11 +434,12 @@ private[spark] object JsonProtocol {
 
   def taskEndFromJson(json: JValue): SparkListenerTaskEnd = {
     val stageId = (json \ "Stage ID").extract[Int]
+    val stageAttemptId = (json \ "Stage Attempt 
ID").extractOpt[Int].getOrElse(0)
     val taskType = (json \ "Task Type").extract[String]
     val taskEndReason = taskEndReasonFromJson(json \ "Task End Reason")
     val taskInfo = taskInfoFromJson(json \ "Task Info")
     val taskMetrics = taskMetricsFromJson(json \ "Task Metrics")
-    SparkListenerTaskEnd(stageId, taskType, taskEndReason, taskInfo, 
taskMetrics)
+    SparkListenerTaskEnd(stageId, stageAttemptId, taskType, taskEndReason, 
taskInfo, taskMetrics)
   }
 
   def jobStartFromJson(json: JValue): SparkListenerJobStart = {
@@ -492,6 +497,7 @@ private[spark] object JsonProtocol {
 
   def stageInfoFromJson(json: JValue): StageInfo = {
     val stageId = (json \ "Stage ID").extract[Int]
+    val attemptId = (json \ "Attempt ID").extractOpt[Int].getOrElse(0)
     val stageName = (json \ "Stage Name").extract[String]
     val numTasks = (json \ "Number of Tasks").extract[Int]
     val rddInfos = (json \ "RDD 
Info").extract[List[JValue]].map(rddInfoFromJson(_))
@@ -504,7 +510,7 @@ private[spark] object JsonProtocol {
       case None => Seq[AccumulableInfo]()
     }
 
-    val stageInfo = new StageInfo(stageId, stageName, numTasks, rddInfos, 
details)
+    val stageInfo = new StageInfo(stageId, attemptId, stageName, numTasks, 
rddInfos, details)
     stageInfo.submissionTime = submissionTime
     stageInfo.completionTime = completionTime
     stageInfo.failureReason = failureReason
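
The `extractOpt[Int].getOrElse(0)` pattern above is what keeps pre-patch event logs readable: an old record simply lacks the attempt field and deserializes as attempt 0. A minimal sketch of that behaviour, assuming json4s (which JsonProtocol itself uses):

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods._

implicit val formats = DefaultFormats

// A task-start record written before this patch has no "Stage Attempt ID" field.
val oldRecord = parse("""{"Event":"SparkListenerTaskStart","Stage ID":111}""")

// extractOpt returns None for the missing field, so the attempt id defaults to 0.
val stageAttemptId = (oldRecord \ "Stage Attempt ID").extractOpt[Int].getOrElse(0)
assert(stageAttemptId == 0)
```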

http://git-wip-us.apache.org/repos/asf/spark/blob/fb60bec3/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
index 51fb646..7671cb9 100644
--- 
a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
@@ -69,10 +69,10 @@ class StorageStatusListenerSuite extends FunSuite {
     // Task end with no updated blocks
     assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
     assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
-    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, 
taskInfo1, taskMetrics))
+    listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, 
taskInfo1, taskMetrics))
     assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
     assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
-    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, 
taskInfo2, taskMetrics))
+    listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, 
taskInfo2, taskMetrics))
     assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
     assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
   }
@@ -92,13 +92,13 @@ class StorageStatusListenerSuite extends FunSuite {
     // Task end with new blocks
     assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
     assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
-    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, 
taskInfo1, taskMetrics1))
+    listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, 
taskInfo1, taskMetrics1))
     assert(listener.executorIdToStorageStatus("big").numBlocks === 2)
     assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
     
assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 
1)))
     
assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 
2)))
     assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
-    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, 
taskInfo2, taskMetrics2))
+    listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, 
taskInfo2, taskMetrics2))
     assert(listener.executorIdToStorageStatus("big").numBlocks === 2)
     assert(listener.executorIdToStorageStatus("fat").numBlocks === 1)
     
assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 
1)))
@@ -111,13 +111,14 @@ class StorageStatusListenerSuite extends FunSuite {
     val droppedBlock3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.NONE, 0L, 
0L, 0L))
     taskMetrics1.updatedBlocks = Some(Seq(droppedBlock1, droppedBlock3))
     taskMetrics2.updatedBlocks = Some(Seq(droppedBlock2, droppedBlock3))
-    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, 
taskInfo1, taskMetrics1))
+
+    listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, 
taskInfo1, taskMetrics1))
     assert(listener.executorIdToStorageStatus("big").numBlocks === 1)
     assert(listener.executorIdToStorageStatus("fat").numBlocks === 1)
     
assert(!listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 
1)))
     
assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 
2)))
     
assert(listener.executorIdToStorageStatus("fat").containsBlock(RDDBlockId(4, 
0)))
-    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, 
taskInfo2, taskMetrics2))
+    listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, 
taskInfo2, taskMetrics2))
     assert(listener.executorIdToStorageStatus("big").numBlocks === 1)
     assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
     
assert(!listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 
1)))
@@ -135,8 +136,8 @@ class StorageStatusListenerSuite extends FunSuite {
     val block3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.DISK_ONLY, 0L, 
300L, 0L))
     taskMetrics1.updatedBlocks = Some(Seq(block1, block2))
     taskMetrics2.updatedBlocks = Some(Seq(block3))
-    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, 
taskInfo1, taskMetrics1))
-    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, 
taskInfo1, taskMetrics2))
+    listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, 
taskInfo1, taskMetrics1))
+    listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, 
taskInfo1, taskMetrics2))
     assert(listener.executorIdToStorageStatus("big").numBlocks === 3)
 
     // Unpersist RDD

http://git-wip-us.apache.org/repos/asf/spark/blob/fb60bec3/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala 
b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 147ec0b..3370dd4 100644
--- 
a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -34,12 +34,12 @@ class JobProgressListenerSuite extends FunSuite with 
LocalSparkContext with Matc
     val listener = new JobProgressListener(conf)
 
     def createStageStartEvent(stageId: Int) = {
-      val stageInfo = new StageInfo(stageId, stageId.toString, 0, null, "")
+      val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "")
       SparkListenerStageSubmitted(stageInfo)
     }
 
     def createStageEndEvent(stageId: Int) = {
-      val stageInfo = new StageInfo(stageId, stageId.toString, 0, null, "")
+      val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "")
       SparkListenerStageCompleted(stageInfo)
     }
 
@@ -70,33 +70,37 @@ class JobProgressListenerSuite extends FunSuite with 
LocalSparkContext with Matc
     taskInfo.finishTime = 1
     var task = new ShuffleMapTask(0)
     val taskType = Utils.getFormattedClassName(task)
-    listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, 
taskInfo, taskMetrics))
-    assert(listener.stageIdToData.getOrElse(0, 
fail()).executorSummary.getOrElse("exe-1", fail())
-      .shuffleRead === 1000)
+    listener.onTaskEnd(
+      SparkListenerTaskEnd(task.stageId, 0, taskType, Success, taskInfo, 
taskMetrics))
+    assert(listener.stageIdToData.getOrElse((0, 0), fail())
+      .executorSummary.getOrElse("exe-1", fail()).shuffleRead === 1000)
 
     // finish a task with unknown executor-id, nothing should happen
     taskInfo =
       new TaskInfo(1234L, 0, 1, 1000L, "exe-unknown", "host1", 
TaskLocality.NODE_LOCAL, true)
     taskInfo.finishTime = 1
     task = new ShuffleMapTask(0)
-    listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, 
taskInfo, taskMetrics))
+    listener.onTaskEnd(
+      SparkListenerTaskEnd(task.stageId, 0, taskType, Success, taskInfo, 
taskMetrics))
     assert(listener.stageIdToData.size === 1)
 
     // finish this task, should get updated duration
     taskInfo = new TaskInfo(1235L, 0, 1, 0L, "exe-1", "host1", 
TaskLocality.NODE_LOCAL, false)
     taskInfo.finishTime = 1
     task = new ShuffleMapTask(0)
-    listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, 
taskInfo, taskMetrics))
-    assert(listener.stageIdToData.getOrElse(0, 
fail()).executorSummary.getOrElse("exe-1", fail())
-      .shuffleRead === 2000)
+    listener.onTaskEnd(
+      SparkListenerTaskEnd(task.stageId, 0, taskType, Success, taskInfo, 
taskMetrics))
+    assert(listener.stageIdToData.getOrElse((0, 0), fail())
+      .executorSummary.getOrElse("exe-1", fail()).shuffleRead === 2000)
 
     // finish this task, should get updated duration
     taskInfo = new TaskInfo(1236L, 0, 2, 0L, "exe-2", "host1", 
TaskLocality.NODE_LOCAL, false)
     taskInfo.finishTime = 1
     task = new ShuffleMapTask(0)
-    listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, 
taskInfo, taskMetrics))
-    assert(listener.stageIdToData.getOrElse(0, 
fail()).executorSummary.getOrElse("exe-2", fail())
-      .shuffleRead === 1000)
+    listener.onTaskEnd(
+      SparkListenerTaskEnd(task.stageId, 0, taskType, Success, taskInfo, 
taskMetrics))
+    assert(listener.stageIdToData.getOrElse((0, 0), fail())
+      .executorSummary.getOrElse("exe-2", fail()).shuffleRead === 1000)
   }
 
   test("test task success vs failure counting for different task end reasons") 
{
@@ -119,16 +123,18 @@ class JobProgressListenerSuite extends FunSuite with 
LocalSparkContext with Matc
       UnknownReason)
     var failCount = 0
     for (reason <- taskFailedReasons) {
-      listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, reason, 
taskInfo, metrics))
+      listener.onTaskEnd(
+        SparkListenerTaskEnd(task.stageId, 0, taskType, reason, taskInfo, 
metrics))
       failCount += 1
-      assert(listener.stageIdToData(task.stageId).numCompleteTasks === 0)
-      assert(listener.stageIdToData(task.stageId).numFailedTasks === failCount)
+      assert(listener.stageIdToData((task.stageId, 0)).numCompleteTasks === 0)
+      assert(listener.stageIdToData((task.stageId, 0)).numFailedTasks === 
failCount)
     }
 
     // Make sure we count success as success.
-    listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, 
taskInfo, metrics))
-    assert(listener.stageIdToData(task.stageId).numCompleteTasks === 1)
-    assert(listener.stageIdToData(task.stageId).numFailedTasks === failCount)
+    listener.onTaskEnd(
+      SparkListenerTaskEnd(task.stageId, 1, taskType, Success, taskInfo, 
metrics))
+    assert(listener.stageIdToData((task.stageId, 1)).numCompleteTasks === 1)
+    assert(listener.stageIdToData((task.stageId, 0)).numFailedTasks === 
failCount)
   }
 
   test("test update metrics") {
@@ -163,18 +169,18 @@ class JobProgressListenerSuite extends FunSuite with 
LocalSparkContext with Matc
       taskInfo
     }
 
-    listener.onTaskStart(SparkListenerTaskStart(0, makeTaskInfo(1234L)))
-    listener.onTaskStart(SparkListenerTaskStart(0, makeTaskInfo(1235L)))
-    listener.onTaskStart(SparkListenerTaskStart(1, makeTaskInfo(1236L)))
-    listener.onTaskStart(SparkListenerTaskStart(1, makeTaskInfo(1237L)))
+    listener.onTaskStart(SparkListenerTaskStart(0, 0, makeTaskInfo(1234L)))
+    listener.onTaskStart(SparkListenerTaskStart(0, 0, makeTaskInfo(1235L)))
+    listener.onTaskStart(SparkListenerTaskStart(1, 0, makeTaskInfo(1236L)))
+    listener.onTaskStart(SparkListenerTaskStart(1, 0, makeTaskInfo(1237L)))
 
     
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(execId, 
Array(
-      (1234L, 0, makeTaskMetrics(0)),
-      (1235L, 0, makeTaskMetrics(100)),
-      (1236L, 1, makeTaskMetrics(200)))))
+      (1234L, 0, 0, makeTaskMetrics(0)),
+      (1235L, 0, 0, makeTaskMetrics(100)),
+      (1236L, 1, 0, makeTaskMetrics(200)))))
 
-    var stage0Data = listener.stageIdToData.get(0).get
-    var stage1Data = listener.stageIdToData.get(1).get
+    var stage0Data = listener.stageIdToData.get((0, 0)).get
+    var stage1Data = listener.stageIdToData.get((1, 0)).get
     assert(stage0Data.shuffleReadBytes == 102)
     assert(stage1Data.shuffleReadBytes == 201)
     assert(stage0Data.shuffleWriteBytes == 106)
@@ -195,14 +201,14 @@ class JobProgressListenerSuite extends FunSuite with 
LocalSparkContext with Matc
       .totalBlocksFetched == 202)
 
     // task that was included in a heartbeat
-    listener.onTaskEnd(SparkListenerTaskEnd(0, taskType, Success, 
makeTaskInfo(1234L, 1),
+    listener.onTaskEnd(SparkListenerTaskEnd(0, 0, taskType, Success, 
makeTaskInfo(1234L, 1),
       makeTaskMetrics(300)))
     // task that wasn't included in a heartbeat
-    listener.onTaskEnd(SparkListenerTaskEnd(1, taskType, Success, 
makeTaskInfo(1237L, 1),
+    listener.onTaskEnd(SparkListenerTaskEnd(1, 0, taskType, Success, 
makeTaskInfo(1237L, 1),
       makeTaskMetrics(400)))
 
-    stage0Data = listener.stageIdToData.get(0).get
-    stage1Data = listener.stageIdToData.get(1).get
+    stage0Data = listener.stageIdToData.get((0, 0)).get
+    stage1Data = listener.stageIdToData.get((1, 0)).get
     assert(stage0Data.shuffleReadBytes == 402)
     assert(stage1Data.shuffleReadBytes == 602)
     assert(stage0Data.shuffleWriteBytes == 406)

http://git-wip-us.apache.org/repos/asf/spark/blob/fb60bec3/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala 
b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
index 6e68dcb..b860177 100644
--- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
@@ -53,7 +53,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
     assert(storageListener.rddInfoList.isEmpty)
 
     // 2 RDDs are known, but none are cached
-    val stageInfo0 = new StageInfo(0, "0", 100, Seq(rddInfo0, rddInfo1), 
"details")
+    val stageInfo0 = new StageInfo(0, 0, "0", 100, Seq(rddInfo0, rddInfo1), 
"details")
     bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
     assert(storageListener._rddInfoMap.size === 2)
     assert(storageListener.rddInfoList.isEmpty)
@@ -63,7 +63,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
     val rddInfo3Cached = rddInfo3
     rddInfo2Cached.numCachedPartitions = 1
     rddInfo3Cached.numCachedPartitions = 1
-    val stageInfo1 = new StageInfo(1, "0", 100, Seq(rddInfo2Cached, 
rddInfo3Cached), "details")
+    val stageInfo1 = new StageInfo(1, 0, "0", 100, Seq(rddInfo2Cached, 
rddInfo3Cached), "details")
     bus.postToAll(SparkListenerStageSubmitted(stageInfo1))
     assert(storageListener._rddInfoMap.size === 4)
     assert(storageListener.rddInfoList.size === 2)
@@ -71,7 +71,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
     // Submitting RDDInfos with duplicate IDs does nothing
     val rddInfo0Cached = new RDDInfo(0, "freedom", 100, 
StorageLevel.MEMORY_ONLY)
     rddInfo0Cached.numCachedPartitions = 1
-    val stageInfo0Cached = new StageInfo(0, "0", 100, Seq(rddInfo0), "details")
+    val stageInfo0Cached = new StageInfo(0, 0, "0", 100, Seq(rddInfo0), 
"details")
     bus.postToAll(SparkListenerStageSubmitted(stageInfo0Cached))
     assert(storageListener._rddInfoMap.size === 4)
     assert(storageListener.rddInfoList.size === 2)
@@ -87,7 +87,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
     val rddInfo1Cached = rddInfo1
     rddInfo0Cached.numCachedPartitions = 1
     rddInfo1Cached.numCachedPartitions = 1
-    val stageInfo0 = new StageInfo(0, "0", 100, Seq(rddInfo0Cached, 
rddInfo1Cached), "details")
+    val stageInfo0 = new StageInfo(0, 0, "0", 100, Seq(rddInfo0Cached, 
rddInfo1Cached), "details")
     bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
     assert(storageListener._rddInfoMap.size === 2)
     assert(storageListener.rddInfoList.size === 2)
@@ -106,7 +106,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
     val myRddInfo0 = rddInfo0
     val myRddInfo1 = rddInfo1
     val myRddInfo2 = rddInfo2
-    val stageInfo0 = new StageInfo(0, "0", 100, Seq(myRddInfo0, myRddInfo1, 
myRddInfo2), "details")
+    val stageInfo0 = new StageInfo(0, 0, "0", 100, Seq(myRddInfo0, myRddInfo1, 
myRddInfo2), "details")
     bus.postToAll(SparkListenerBlockManagerAdded(bm1, 1000L))
     bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
     assert(storageListener._rddInfoMap.size === 3)
@@ -116,7 +116,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
     assert(!storageListener._rddInfoMap(2).isCached)
 
     // Task end with no updated blocks. This should not change anything.
-    bus.postToAll(SparkListenerTaskEnd(0, "obliteration", Success, taskInfo, 
new TaskMetrics))
+    bus.postToAll(SparkListenerTaskEnd(0, 0, "obliteration", Success, 
taskInfo, new TaskMetrics))
     assert(storageListener._rddInfoMap.size === 3)
     assert(storageListener.rddInfoList.size === 0)
 
@@ -128,7 +128,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
       (RDDBlockId(0, 102), BlockStatus(memAndDisk, 400L, 0L, 200L)),
       (RDDBlockId(1, 20), BlockStatus(memAndDisk, 0L, 240L, 0L))
     ))
-    bus.postToAll(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo, 
metrics1))
+    bus.postToAll(SparkListenerTaskEnd(1, 0, "obliteration", Success, 
taskInfo, metrics1))
     assert(storageListener._rddInfoMap(0).memSize === 800L)
     assert(storageListener._rddInfoMap(0).diskSize === 400L)
     assert(storageListener._rddInfoMap(0).tachyonSize === 200L)
@@ -150,7 +150,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
       (RDDBlockId(2, 40), BlockStatus(none, 0L, 0L, 0L)), // doesn't actually 
exist
       (RDDBlockId(4, 80), BlockStatus(none, 0L, 0L, 0L)) // doesn't actually 
exist
     ))
-    bus.postToAll(SparkListenerTaskEnd(2, "obliteration", Success, taskInfo, 
metrics2))
+    bus.postToAll(SparkListenerTaskEnd(2, 0, "obliteration", Success, 
taskInfo, metrics2))
     assert(storageListener._rddInfoMap(0).memSize === 400L)
     assert(storageListener._rddInfoMap(0).diskSize === 400L)
     assert(storageListener._rddInfoMap(0).tachyonSize === 200L)

http://git-wip-us.apache.org/repos/asf/spark/blob/fb60bec3/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala 
b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 97ffb07..2fd3b9c 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -35,13 +35,13 @@ class JsonProtocolSuite extends FunSuite {
     val stageSubmitted =
       SparkListenerStageSubmitted(makeStageInfo(100, 200, 300, 400L, 500L), 
properties)
     val stageCompleted = SparkListenerStageCompleted(makeStageInfo(101, 201, 
301, 401L, 501L))
-    val taskStart = SparkListenerTaskStart(111, makeTaskInfo(222L, 333, 1, 
444L, false))
+    val taskStart = SparkListenerTaskStart(111, 0, makeTaskInfo(222L, 333, 1, 
444L, false))
     val taskGettingResult =
       SparkListenerTaskGettingResult(makeTaskInfo(1000L, 2000, 5, 3000L, true))
-    val taskEnd = SparkListenerTaskEnd(1, "ShuffleMapTask", Success,
+    val taskEnd = SparkListenerTaskEnd(1, 0, "ShuffleMapTask", Success,
       makeTaskInfo(123L, 234, 67, 345L, false),
       makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = 
false))
-    val taskEndWithHadoopInput = SparkListenerTaskEnd(1, "ShuffleMapTask", 
Success,
+    val taskEndWithHadoopInput = SparkListenerTaskEnd(1, 0, "ShuffleMapTask", 
Success,
       makeTaskInfo(123L, 234, 67, 345L, false),
       makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = true))
     val jobStart = SparkListenerJobStart(10, Seq[Int](1, 2, 3, 4), properties)
@@ -397,7 +397,8 @@ class JsonProtocolSuite extends FunSuite {
 
   private def assertJsonStringEquals(json1: String, json2: String) {
     val formatJsonString = (json: String) => json.replaceAll("[\\s|]", "")
-    assert(formatJsonString(json1) === formatJsonString(json2))
+    assert(formatJsonString(json1) === formatJsonString(json2),
+      s"input ${formatJsonString(json1)} got ${formatJsonString(json2)}")
   }
 
   private def assertSeqEquals[T](seq1: Seq[T], seq2: Seq[T], assertEquals: (T, 
T) => Unit) {
@@ -485,7 +486,7 @@ class JsonProtocolSuite extends FunSuite {
 
   private def makeStageInfo(a: Int, b: Int, c: Int, d: Long, e: Long) = {
     val rddInfos = (0 until a % 5).map { i => makeRddInfo(a + i, b + i, c + i, 
d + i, e + i) }
-    val stageInfo = new StageInfo(a, "greetings", b, rddInfos, "details")
+    val stageInfo = new StageInfo(a, 0, "greetings", b, rddInfos, "details")
     val (acc1, acc2) = (makeAccumulableInfo(1), makeAccumulableInfo(2))
     stageInfo.accumulables(acc1.id) = acc1
     stageInfo.accumulables(acc2.id) = acc2
@@ -558,84 +559,246 @@ class JsonProtocolSuite extends FunSuite {
 
   private val stageSubmittedJsonString =
     """
-      {"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage 
ID":100,"Stage Name":
-      "greetings","Number of Tasks":200,"RDD Info":[],"Details":"details",
-      
"Accumulables":[{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"},
-      
{"ID":1,"Name":"Accumulable1","Update":"delta1","Value":"val1"}]},"Properties":
-      {"France":"Paris","Germany":"Berlin","Russia":"Moscow","Ukraine":"Kiev"}}
+      |{
+      |  "Event": "SparkListenerStageSubmitted",
+      |  "Stage Info": {
+      |    "Stage ID": 100,
+      |    "Stage Attempt ID": 0,
+      |    "Stage Name": "greetings",
+      |    "Number of Tasks": 200,
+      |    "RDD Info": [],
+      |    "Details": "details",
+      |    "Accumulables": [
+      |      {
+      |        "ID": 2,
+      |        "Name": "Accumulable2",
+      |        "Update": "delta2",
+      |        "Value": "val2"
+      |      },
+      |      {
+      |        "ID": 1,
+      |        "Name": "Accumulable1",
+      |        "Update": "delta1",
+      |        "Value": "val1"
+      |      }
+      |    ]
+      |  },
+      |  "Properties": {
+      |    "France": "Paris",
+      |    "Germany": "Berlin",
+      |    "Russia": "Moscow",
+      |    "Ukraine": "Kiev"
+      |  }
+      |}
     """
 
   private val stageCompletedJsonString =
     """
-      {"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":101,"Stage Name":
-      "greetings","Number of Tasks":201,"RDD Info":[{"RDD ID":101,"Name":"mayor","Storage
-      Level":{"Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":true,
-      "Replication":1},"Number of Partitions":201,"Number of Cached Partitions":301,
-      "Memory Size":401,"Tachyon Size":0,"Disk Size":501}],"Details":"details",
-      "Accumulables":[{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"},
-      {"ID":1,"Name":"Accumulable1","Update":"delta1","Value":"val1"}]}}
+      |{
+      |  "Event": "SparkListenerStageCompleted",
+      |  "Stage Info": {
+      |    "Stage ID": 101,
+      |    "Stage Attempt ID": 0,
+      |    "Stage Name": "greetings",
+      |    "Number of Tasks": 201,
+      |    "RDD Info": [
+      |      {
+      |        "RDD ID": 101,
+      |        "Name": "mayor",
+      |        "Storage Level": {
+      |          "Use Disk": true,
+      |          "Use Memory": true,
+      |          "Use Tachyon": false,
+      |          "Deserialized": true,
+      |          "Replication": 1
+      |        },
+      |        "Number of Partitions": 201,
+      |        "Number of Cached Partitions": 301,
+      |        "Memory Size": 401,
+      |        "Tachyon Size": 0,
+      |        "Disk Size": 501
+      |      }
+      |    ],
+      |    "Details": "details",
+      |    "Accumulables": [
+      |      {
+      |        "ID": 2,
+      |        "Name": "Accumulable2",
+      |        "Update": "delta2",
+      |        "Value": "val2"
+      |      },
+      |      {
+      |        "ID": 1,
+      |        "Name": "Accumulable1",
+      |        "Update": "delta1",
+      |        "Value": "val1"
+      |      }
+      |    ]
+      |  }
+      |}
     """
 
   private val taskStartJsonString =
     """
-      |{"Event":"SparkListenerTaskStart","Stage ID":111,"Task Info":{"Task ID":222,
-      |"Index":333,"Attempt":1,"Launch Time":444,"Executor ID":"executor","Host":"your kind sir",
-      |"Locality":"NODE_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,
-      |"Failed":false,"Accumulables":[{"ID":1,"Name":"Accumulable1","Update":"delta1",
-      |"Value":"val1"},{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"},
-      |{"ID":3,"Name":"Accumulable3","Update":"delta3","Value":"val3"}]}}
+      |{
+      |  "Event": "SparkListenerTaskStart",
+      |  "Stage ID": 111,
+      |  "Stage Attempt ID": 0,
+      |  "Task Info": {
+      |    "Task ID": 222,
+      |    "Index": 333,
+      |    "Attempt": 1,
+      |    "Launch Time": 444,
+      |    "Executor ID": "executor",
+      |    "Host": "your kind sir",
+      |    "Locality": "NODE_LOCAL",
+      |    "Speculative": false,
+      |    "Getting Result Time": 0,
+      |    "Finish Time": 0,
+      |    "Failed": false,
+      |    "Accumulables": [
+      |      {
+      |        "ID": 1,
+      |        "Name": "Accumulable1",
+      |        "Update": "delta1",
+      |        "Value": "val1"
+      |      },
+      |      {
+      |        "ID": 2,
+      |        "Name": "Accumulable2",
+      |        "Update": "delta2",
+      |        "Value": "val2"
+      |      },
+      |      {
+      |        "ID": 3,
+      |        "Name": "Accumulable3",
+      |        "Update": "delta3",
+      |        "Value": "val3"
+      |      }
+      |    ]
+      |  }
+      |}
     """.stripMargin
 
   private val taskGettingResultJsonString =
     """
-      |{"Event":"SparkListenerTaskGettingResult","Task Info":
-      |  {"Task ID":1000,"Index":2000,"Attempt":5,"Launch Time":3000,"Executor ID":"executor",
-      |   "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":true,"Getting Result Time":0,
-      |   "Finish Time":0,"Failed":false,
-      |   "Accumulables":[{"ID":1,"Name":"Accumulable1","Update":"delta1",
-      |   "Value":"val1"},{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"},
-      |   {"ID":3,"Name":"Accumulable3","Update":"delta3","Value":"val3"}]
+      |{
+      |  "Event": "SparkListenerTaskGettingResult",
+      |  "Task Info": {
+      |    "Task ID": 1000,
+      |    "Index": 2000,
+      |    "Attempt": 5,
+      |    "Launch Time": 3000,
+      |    "Executor ID": "executor",
+      |    "Host": "your kind sir",
+      |    "Locality": "NODE_LOCAL",
+      |    "Speculative": true,
+      |    "Getting Result Time": 0,
+      |    "Finish Time": 0,
+      |    "Failed": false,
+      |    "Accumulables": [
+      |      {
+      |        "ID": 1,
+      |        "Name": "Accumulable1",
+      |        "Update": "delta1",
+      |        "Value": "val1"
+      |      },
+      |      {
+      |        "ID": 2,
+      |        "Name": "Accumulable2",
+      |        "Update": "delta2",
+      |        "Value": "val2"
+      |      },
+      |      {
+      |        "ID": 3,
+      |        "Name": "Accumulable3",
+      |        "Update": "delta3",
+      |        "Value": "val3"
+      |      }
+      |    ]
       |  }
       |}
     """.stripMargin
 
   private val taskEndJsonString =
     """
-      |{"Event":"SparkListenerTaskEnd","Stage ID":1,"Task Type":"ShuffleMapTask",
-      |"Task End Reason":{"Reason":"Success"},
-      |"Task Info":{
-      |  "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor ID":"executor",
-      |  "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false,
-      |  "Getting Result Time":0,"Finish Time":0,"Failed":false,
-      |  "Accumulables":[{"ID":1,"Name":"Accumulable1","Update":"delta1",
-      |  "Value":"val1"},{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"},
-      |  {"ID":3,"Name":"Accumulable3","Update":"delta3","Value":"val3"}]
-      |},
-      |"Task Metrics":{
-      |  "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run Time":400,
-      |  "Result Size":500,"JVM GC Time":600,"Result Serialization Time":700,
-      |  "Memory Bytes Spilled":800,"Disk Bytes Spilled":0,
-      |  "Shuffle Read Metrics":{
-      |    "Shuffle Finish Time":900,
-      |    "Remote Blocks Fetched":800,
-      |    "Local Blocks Fetched":700,
-      |    "Fetch Wait Time":900,
-      |    "Remote Bytes Read":1000
+      |{
+      |  "Event": "SparkListenerTaskEnd",
+      |  "Stage ID": 1,
+      |  "Stage Attempt ID": 0,
+      |  "Task Type": "ShuffleMapTask",
+      |  "Task End Reason": {
+      |    "Reason": "Success"
       |  },
-      |  "Shuffle Write Metrics":{
-      |    "Shuffle Bytes Written":1200,
-      |    "Shuffle Write Time":1500
+      |  "Task Info": {
+      |    "Task ID": 123,
+      |    "Index": 234,
+      |    "Attempt": 67,
+      |    "Launch Time": 345,
+      |    "Executor ID": "executor",
+      |    "Host": "your kind sir",
+      |    "Locality": "NODE_LOCAL",
+      |    "Speculative": false,
+      |    "Getting Result Time": 0,
+      |    "Finish Time": 0,
+      |    "Failed": false,
+      |    "Accumulables": [
+      |      {
+      |        "ID": 1,
+      |        "Name": "Accumulable1",
+      |        "Update": "delta1",
+      |        "Value": "val1"
+      |      },
+      |      {
+      |        "ID": 2,
+      |        "Name": "Accumulable2",
+      |        "Update": "delta2",
+      |        "Value": "val2"
+      |      },
+      |      {
+      |        "ID": 3,
+      |        "Name": "Accumulable3",
+      |        "Update": "delta3",
+      |        "Value": "val3"
+      |      }
+      |    ]
       |  },
-      |  "Updated Blocks":[
-      |    {"Block ID":"rdd_0_0",
-      |      "Status":{
-      |        "Storage Level":{
-      |          "Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":false,
-      |          "Replication":2
-      |        },
-      |        "Memory Size":0,"Tachyon Size":0,"Disk Size":0
+      |  "Task Metrics": {
+      |    "Host Name": "localhost",
+      |    "Executor Deserialize Time": 300,
+      |    "Executor Run Time": 400,
+      |    "Result Size": 500,
+      |    "JVM GC Time": 600,
+      |    "Result Serialization Time": 700,
+      |    "Memory Bytes Spilled": 800,
+      |    "Disk Bytes Spilled": 0,
+      |    "Shuffle Read Metrics": {
+      |      "Shuffle Finish Time": 900,
+      |      "Remote Blocks Fetched": 800,
+      |      "Local Blocks Fetched": 700,
+      |      "Fetch Wait Time": 900,
+      |      "Remote Bytes Read": 1000
+      |    },
+      |    "Shuffle Write Metrics": {
+      |      "Shuffle Bytes Written": 1200,
+      |      "Shuffle Write Time": 1500
+      |    },
+      |    "Updated Blocks": [
+      |      {
+      |        "Block ID": "rdd_0_0",
+      |        "Status": {
+      |          "Storage Level": {
+      |            "Use Disk": true,
+      |            "Use Memory": true,
+      |            "Use Tachyon": false,
+      |            "Deserialized": false,
+      |            "Replication": 2
+      |          },
+      |          "Memory Size": 0,
+      |          "Tachyon Size": 0,
+      |          "Disk Size": 0
+      |        }
       |      }
-      |    }
       |    ]
       |  }
       |}
@@ -643,80 +806,187 @@ class JsonProtocolSuite extends FunSuite {
 
   private val taskEndWithHadoopInputJsonString =
     """
-      |{"Event":"SparkListenerTaskEnd","Stage ID":1,"Task Type":"ShuffleMapTask",
-      |"Task End Reason":{"Reason":"Success"},
-      |"Task Info":{
-      |  "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor ID":"executor",
-      |  "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false,
-      |  "Getting Result Time":0,"Finish Time":0,"Failed":false,
-      |  "Accumulables":[{"ID":1,"Name":"Accumulable1","Update":"delta1",
-      |  "Value":"val1"},{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"},
-      |  {"ID":3,"Name":"Accumulable3","Update":"delta3","Value":"val3"}]
-      |},
-      |"Task Metrics":{
-      |  "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run Time":400,
-      |  "Result Size":500,"JVM GC Time":600,"Result Serialization Time":700,
-      |  "Memory Bytes Spilled":800,"Disk Bytes Spilled":0,
-      |  "Shuffle Write Metrics":{"Shuffle Bytes Written":1200,"Shuffle Write Time":1500},
-      |  "Input Metrics":{"Data Read Method":"Hadoop","Bytes Read":2100},
-      |  "Updated Blocks":[
-      |    {"Block ID":"rdd_0_0",
-      |      "Status":{
-      |        "Storage Level":{
-      |          "Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":false,
-      |          "Replication":2
-      |        },
-      |        "Memory Size":0,"Tachyon Size":0,"Disk Size":0
+      |{
+      |  "Event": "SparkListenerTaskEnd",
+      |  "Stage ID": 1,
+      |  "Stage Attempt ID": 0,
+      |  "Task Type": "ShuffleMapTask",
+      |  "Task End Reason": {
+      |    "Reason": "Success"
+      |  },
+      |  "Task Info": {
+      |    "Task ID": 123,
+      |    "Index": 234,
+      |    "Attempt": 67,
+      |    "Launch Time": 345,
+      |    "Executor ID": "executor",
+      |    "Host": "your kind sir",
+      |    "Locality": "NODE_LOCAL",
+      |    "Speculative": false,
+      |    "Getting Result Time": 0,
+      |    "Finish Time": 0,
+      |    "Failed": false,
+      |    "Accumulables": [
+      |      {
+      |        "ID": 1,
+      |        "Name": "Accumulable1",
+      |        "Update": "delta1",
+      |        "Value": "val1"
+      |      },
+      |      {
+      |        "ID": 2,
+      |        "Name": "Accumulable2",
+      |        "Update": "delta2",
+      |        "Value": "val2"
+      |      },
+      |      {
+      |        "ID": 3,
+      |        "Name": "Accumulable3",
+      |        "Update": "delta3",
+      |        "Value": "val3"
       |      }
-      |    }
-      |  ]}
+      |    ]
+      |  },
+      |  "Task Metrics": {
+      |    "Host Name": "localhost",
+      |    "Executor Deserialize Time": 300,
+      |    "Executor Run Time": 400,
+      |    "Result Size": 500,
+      |    "JVM GC Time": 600,
+      |    "Result Serialization Time": 700,
+      |    "Memory Bytes Spilled": 800,
+      |    "Disk Bytes Spilled": 0,
+      |    "Shuffle Write Metrics": {
+      |      "Shuffle Bytes Written": 1200,
+      |      "Shuffle Write Time": 1500
+      |    },
+      |    "Input Metrics": {
+      |      "Data Read Method": "Hadoop",
+      |      "Bytes Read": 2100
+      |    },
+      |    "Updated Blocks": [
+      |      {
+      |        "Block ID": "rdd_0_0",
+      |        "Status": {
+      |          "Storage Level": {
+      |            "Use Disk": true,
+      |            "Use Memory": true,
+      |            "Use Tachyon": false,
+      |            "Deserialized": false,
+      |            "Replication": 2
+      |          },
+      |          "Memory Size": 0,
+      |          "Tachyon Size": 0,
+      |          "Disk Size": 0
+      |        }
+      |      }
+      |    ]
+      |  }
       |}
     """
 
   private val jobStartJsonString =
     """
-      {"Event":"SparkListenerJobStart","Job ID":10,"Stage IDs":[1,2,3,4],"Properties":
-      {"France":"Paris","Germany":"Berlin","Russia":"Moscow","Ukraine":"Kiev"}}
+      |{
+      |  "Event": "SparkListenerJobStart",
+      |  "Job ID": 10,
+      |  "Stage IDs": [
+      |    1,
+      |    2,
+      |    3,
+      |    4
+      |  ],
+      |  "Properties": {
+      |    "France": "Paris",
+      |    "Germany": "Berlin",
+      |    "Russia": "Moscow",
+      |    "Ukraine": "Kiev"
+      |  }
+      |}
     """
 
   private val jobEndJsonString =
     """
-      {"Event":"SparkListenerJobEnd","Job ID":20,"Job Result":{"Result":"JobSucceeded"}}
+      |{
+      |  "Event": "SparkListenerJobEnd",
+      |  "Job ID": 20,
+      |  "Job Result": {
+      |    "Result": "JobSucceeded"
+      |  }
+      |}
     """
 
   private val environmentUpdateJsonString =
     """
-      {"Event":"SparkListenerEnvironmentUpdate","JVM Information":{"GC speed":"9999 objects/s",
-      "Java home":"Land of coffee"},"Spark Properties":{"Job throughput":"80000 jobs/s,
-      regardless of job type"},"System Properties":{"Username":"guest","Password":"guest"},
-      "Classpath Entries":{"Super library":"/tmp/super_library"}}
+      |{
+      |  "Event": "SparkListenerEnvironmentUpdate",
+      |  "JVM Information": {
+      |    "GC speed": "9999 objects/s",
+      |    "Java home": "Land of coffee"
+      |  },
+      |  "Spark Properties": {
+      |    "Job throughput": "80000 jobs/s, regardless of job type"
+      |  },
+      |  "System Properties": {
+      |    "Username": "guest",
+      |    "Password": "guest"
+      |  },
+      |  "Classpath Entries": {
+      |    "Super library": "/tmp/super_library"
+      |  }
+      |}
     """
 
   private val blockManagerAddedJsonString =
     """
-      {"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"Stars",
-      "Host":"In your multitude...","Port":300,"Netty Port":400},"Maximum Memory":500}
+      |{
+      |  "Event": "SparkListenerBlockManagerAdded",
+      |  "Block Manager ID": {
+      |    "Executor ID": "Stars",
+      |    "Host": "In your multitude...",
+      |    "Port": 300,
+      |    "Netty Port": 400
+      |  },
+      |  "Maximum Memory": 500
+      |}
     """
 
   private val blockManagerRemovedJsonString =
     """
-      {"Event":"SparkListenerBlockManagerRemoved","Block Manager ID":{"Executor ID":"Scarce",
-      "Host":"to be counted...","Port":100,"Netty Port":200}}
+      |{
+      |  "Event": "SparkListenerBlockManagerRemoved",
+      |  "Block Manager ID": {
+      |    "Executor ID": "Scarce",
+      |    "Host": "to be counted...",
+      |    "Port": 100,
+      |    "Netty Port": 200
+      |  }
+      |}
     """
 
   private val unpersistRDDJsonString =
     """
-      {"Event":"SparkListenerUnpersistRDD","RDD ID":12345}
+      |{
+      |  "Event": "SparkListenerUnpersistRDD",
+      |  "RDD ID": 12345
+      |}
     """
 
   private val applicationStartJsonString =
     """
-      {"Event":"SparkListenerApplicationStart","App Name":"The winner of all","Timestamp":42,
-      "User":"Garfield"}
+      |{
+      |  "Event": "SparkListenerApplicationStart",
+      |  "App Name": "The winner of all",
+      |  "Timestamp": 42,
+      |  "User": "Garfield"
+      |}
     """
 
   private val applicationEndJsonString =
     """
-      {"Event":"SparkListenerApplicationEnd","Timestamp":42}
+      |{
+      |  "Event": "SparkListenerApplicationEnd",
+      |  "Timestamp": 42
+      |}
     """
 }
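
For context, here is a minimal sketch of how a custom listener could read the stage attempt that these events now carry. The field names (`stageAttemptId` on the task events, `attemptId` on `StageInfo`) are assumptions inferred from the positional constructor arguments and the "Stage Attempt ID" JSON key exercised in the suite above, not a statement of the final API.

```scala
import org.apache.spark.scheduler._

// Sketch only: assumes SparkListenerTaskEnd exposes stageAttemptId and
// StageInfo exposes attemptId, mirroring the constructors used in the tests.
class StageAttemptLogger extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // Report which attempt of the stage this task belonged to.
    println(s"task ${taskEnd.taskInfo.taskId} ended in stage " +
      s"${taskEnd.stageId} (attempt ${taskEnd.stageAttemptId})")
  }

  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    val info = stageCompleted.stageInfo
    println(s"stage ${info.stageId} attempt ${info.attemptId} completed")
  }
}
```

Registering it with `sc.addSparkListener(new StageAttemptLogger)` would report retried stages separately per attempt, matching what the UI now shows.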

