Repository: spark
Updated Branches:
  refs/heads/master e25b59344 -> fac6085cd


[SPARK-1397] Notify SparkListeners when stages fail or are cancelled.

[I wanted to post this for folks to comment but it depends on (and thus
includes the changes in) a currently outstanding PR, #305.  You can look at
just the second commit:
https://github.com/kayousterhout/spark-1/commit/93f08baf731b9eaf5c9792a5373560526e2bccac
to see just the changes relevant to this PR]

Previously, when stages failed or were cancelled, the SparkListener was only
notified indirectly through the SparkListenerJobEnd event, where we sometimes
passed in a single stage that failed.  This worked before job cancellation
was added, because jobs would only fail due to a single stage failure.
However, with job cancellation, multiple running stages can fail when a job
gets cancelled.  Right now, this is not handled correctly, which results in
stages that get stuck in the “Running Stages” window in the UI even though
they’re dead.
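
To make the multi-stage case concrete, here is a minimal sketch of how the
set of stages to fail on cancellation could be computed.  It is loosely
modelled on the failJobAndIndependentStages change in the diff below, but it
is not the scheduler's code: the object CancellationSketch and the method
stagesToFail are hypothetical, and the maps are simplified to immutable
collections of ids (the names jobIdToStageIds, stageIdToJobIds and
runningStages are borrowed from DAGScheduler only for readability).

```scala
// Hypothetical, simplified model; not the actual DAGScheduler data structures.
object CancellationSketch {
  /**
   * Returns the ids of running stages that should be failed when `jobId` is
   * cancelled: stages registered for this job and for no other job.
   */
  def stagesToFail(
      jobId: Int,
      jobIdToStageIds: Map[Int, Set[Int]],
      stageIdToJobIds: Map[Int, Set[Int]],
      runningStages: Set[Int]): Set[Int] = {
    jobIdToStageIds.getOrElse(jobId, Set.empty[Int]).filter { stageId =>
      // A stage shared with another job must keep running for that job.
      val onlyThisJob = stageIdToJobIds.get(stageId).exists(_ == Set(jobId))
      onlyThisJob && runningStages.contains(stageId)
    }
  }
}
```

With two independent running stages, cancelling the job yields two failed
stages, which a single stage id attached to the job-end event cannot express.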

This PR changes the SparkListenerStageCompleted event to a
SparkListenerStageEnded event, and uses this event to tell SparkListeners
when stages fail in addition to when they complete successfully.  This change
is NOT publicly backward compatible for two reasons.  First, it changes the
SparkListener interface.  We could alternatively add a new event,
SparkListenerStageFailed, and keep the existing SparkListenerStageCompleted.
However, this is less consistent with the listener events for tasks / jobs
ending, and will result in some code duplication for listeners (because
failed and completed stages are handled in similar ways).  Note that I
haven’t finished updating the JSON code to correctly handle the new event
because I’m waiting for feedback on whether this is a good or bad idea (hence
the “WIP”).
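
As a rough illustration of the "handled in similar ways" point, the sketch
below shows one callback serving both outcomes.  In the diff that follows,
the committed code keeps the SparkListenerStageCompleted name and records the
outcome in a new StageInfo.failureReason field; the sketch mirrors that shape
with hypothetical stand-in types (StageInfoSketch, StageCompletedSketch,
StageTrackerSketch) so that it runs without Spark on the classpath.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for StageInfo and the stage-completed event; only
// the fields this sketch needs are modelled.
final case class StageInfoSketch(stageId: Int, failureReason: Option[String] = None)
final case class StageCompletedSketch(stageInfo: StageInfoSketch)

// One callback handles both outcomes; the outcome is read off failureReason.
class StageTrackerSketch {
  val activeStages = mutable.Set[Int]()
  val completedStages = mutable.ListBuffer[Int]()
  val failedStages = mutable.ListBuffer[Int]()

  def onStageSubmitted(stageId: Int): Unit = activeStages += stageId

  def onStageCompleted(event: StageCompletedSketch): Unit = {
    val info = event.stageInfo
    // Shared bookkeeping: the stage is no longer running either way.
    activeStages -= info.stageId
    if (info.failureReason.isEmpty) completedStages += info.stageId
    else failedStages += info.stageId
  }
}

object StageTrackerDemo {
  def run(): StageTrackerSketch = {
    val tracker = new StageTrackerSketch
    Seq(1, 2, 3).foreach(tracker.onStageSubmitted)
    tracker.onStageCompleted(StageCompletedSketch(StageInfoSketch(1)))
    // Cancelling a job can end several running stages at once.
    Seq(2, 3).foreach { id =>
      tracker.onStageCompleted(
        StageCompletedSketch(StageInfoSketch(id, Some("Job 0 cancelled"))))
    }
    tracker
  }
}
```

Because the removal from activeStages is shared, a cancelled stage cannot be
left behind as "running" in this model.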

It is also not backwards compatible because it changes the publicly visible
JobWaiter.jobFailed() method to no longer include a stage that caused the
failure.  I think this change should definitely stay, because with
cancellation (as described above), a failure isn’t necessarily caused by a
single stage.
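
For readers matching on job results, a minimal sketch of the single-argument
shape is given below.  The real JobResult types are private[spark]; the names
JobResultSketch, JobSucceededSketch, JobFailedSketch and JobResultDemo are
hypothetical mirrors used only so the example is self-contained.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical mirror of the JobResult hierarchy after this change: a failed
// job carries only the exception, not a failed stage id.
sealed trait JobResultSketch
case object JobSucceededSketch extends JobResultSketch
final case class JobFailedSketch(exception: Exception) extends JobResultSketch

object JobResultDemo {
  // Converts a job result to a Try, in the spirit of the awaitResult() match
  // in FutureAction.scala; `value` is only evaluated on success.
  def toTry[T](result: JobResultSketch, value: => T): Try[T] = result match {
    case JobSucceededSketch => Success(value)
    case JobFailedSketch(e) => Failure(e)
  }
}
```

Callers that previously wrote JobFailed(e, _) drop the second pattern
argument, as the FutureAction and JobLogger hunks below do.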

Author: Kay Ousterhout <kayousterh...@gmail.com>

Closes #309 from kayousterhout/stage_cancellation and squashes the following commits:

5533ecd [Kay Ousterhout] Fixes in response to Mark's review
320c7c7 [Kay Ousterhout] Notify SparkListeners when stages fail or are cancelled.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fac6085c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fac6085c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fac6085c

Branch: refs/heads/master
Commit: fac6085cd774a4dba73ad1618537ef1817b2bcf3
Parents: e25b593
Author: Kay Ousterhout <kayousterh...@gmail.com>
Authored: Tue Apr 8 14:42:02 2014 -0700
Committer: Patrick Wendell <pwend...@gmail.com>
Committed: Tue Apr 8 14:42:02 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/FutureAction.scala   |   2 +-
 .../apache/spark/scheduler/DAGScheduler.scala   | 121 ++++++++++++-------
 .../org/apache/spark/scheduler/JobLogger.scala  |   8 +-
 .../org/apache/spark/scheduler/JobResult.scala  |   3 +-
 .../org/apache/spark/scheduler/JobWaiter.scala  |   2 +-
 .../apache/spark/scheduler/SparkListener.scala  |   2 +-
 .../org/apache/spark/scheduler/StageInfo.scala  |   9 ++
 .../spark/ui/jobs/JobProgressListener.scala     |  23 ++--
 .../org/apache/spark/util/JsonProtocol.scala    |  11 +-
 .../spark/scheduler/DAGSchedulerSuite.scala     |  45 ++++++-
 .../apache/spark/util/JsonProtocolSuite.scala   |   3 +-
 11 files changed, 151 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fac6085c/core/src/main/scala/org/apache/spark/FutureAction.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala
index f2decd1..2eec09c 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -141,7 +141,7 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
   private def awaitResult(): Try[T] = {
     jobWaiter.awaitResult() match {
       case JobSucceeded => scala.util.Success(resultFunc)
-      case JobFailed(e: Exception, _) => scala.util.Failure(e)
+      case JobFailed(e: Exception) => scala.util.Failure(e)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/fac6085c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index c96d743..c41d6d7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -342,22 +342,24 @@ class DAGScheduler(
   }
 
   /**
-   * Removes job and any stages that are not needed by any other job.  Returns the set of ids for
-   * stages that were removed.  The associated tasks for those stages need to be cancelled if we
-   * got here via job cancellation.
+   * Removes state for job and any stages that are not needed by any other job.  Does not
+   * handle cancelling tasks or notifying the SparkListener about finished jobs/stages/tasks.
+   *
+   * @param job The job whose state to cleanup.
+   * @param resultStage Specifies the result stage for the job; if set to None, this method
+   *                    searches resultStagesToJob to find and cleanup the appropriate result stage.
    */
-  private def removeJobAndIndependentStages(jobId: Int): Set[Int] = {
-    val registeredStages = jobIdToStageIds(jobId)
-    val independentStages = new HashSet[Int]()
-    if (registeredStages.isEmpty) {
-      logError("No stages registered for job " + jobId)
+  private def cleanupStateForJobAndIndependentStages(job: ActiveJob, resultStage: Option[Stage]) {
+    val registeredStages = jobIdToStageIds.get(job.jobId)
+    if (registeredStages.isEmpty || registeredStages.get.isEmpty) {
+      logError("No stages registered for job " + job.jobId)
     } else {
-      stageIdToJobIds.filterKeys(stageId => registeredStages.contains(stageId)).foreach {
+      stageIdToJobIds.filterKeys(stageId => registeredStages.get.contains(stageId)).foreach {
         case (stageId, jobSet) =>
-          if (!jobSet.contains(jobId)) {
+          if (!jobSet.contains(job.jobId)) {
             logError(
               "Job %d not registered for stage %d even though that stage was registered for the job"
-              .format(jobId, stageId))
+              .format(job.jobId, stageId))
           } else {
             def removeStage(stageId: Int) {
               // data structures based on Stage
@@ -394,23 +396,28 @@ class DAGScheduler(
                 .format(stageId, stageIdToStage.size))
             }
 
-            jobSet -= jobId
+            jobSet -= job.jobId
             if (jobSet.isEmpty) { // no other job needs this stage
-              independentStages += stageId
               removeStage(stageId)
             }
           }
       }
     }
-    independentStages.toSet
-  }
+    jobIdToStageIds -= job.jobId
+    jobIdToActiveJob -= job.jobId
+    activeJobs -= job
 
-  private def jobIdToStageIdsRemove(jobId: Int) {
-    if (!jobIdToStageIds.contains(jobId)) {
-      logDebug("Trying to remove unregistered job " + jobId)
+    if (resultStage.isEmpty) {
+      // Clean up result stages.
+      val resultStagesForJob = resultStageToJob.keySet.filter(
+        stage => resultStageToJob(stage).jobId == job.jobId)
+      if (resultStagesForJob.size != 1) {
+        logWarning(
+          s"${resultStagesForJob.size} result stages for job ${job.jobId} (expect exactly 1)")
+      }
+      resultStageToJob --= resultStagesForJob
     } else {
-      removeJobAndIndependentStages(jobId)
-      jobIdToStageIds -= jobId
+      resultStageToJob -= resultStage.get
     }
   }
 
@@ -460,7 +467,7 @@ class DAGScheduler(
     val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
     waiter.awaitResult() match {
       case JobSucceeded => {}
-      case JobFailed(exception: Exception, _) =>
+      case JobFailed(exception: Exception) =>
         logInfo("Failed to run " + callSite)
         throw exception
     }
@@ -606,7 +613,16 @@ class DAGScheduler(
         for (job <- activeJobs) {
           val error = new SparkException("Job cancelled because SparkContext was shut down")
           job.listener.jobFailed(error)
-          listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error, -1)))
+          // Tell the listeners that all of the running stages have ended.  Don't bother
+          // cancelling the stages because if the DAG scheduler is stopped, the entire application
+          // is in the process of getting stopped.
+          val stageFailedMessage = "Stage cancelled because SparkContext was shut down"
+          runningStages.foreach { stage =>
+            val info = stageToInfos(stage)
+            info.stageFailed(stageFailedMessage)
+            listenerBus.post(SparkListenerStageCompleted(info))
+          }
+          listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error)))
         }
         return true
     }
@@ -676,7 +692,7 @@ class DAGScheduler(
       }
     } catch {
       case e: Exception =>
-        jobResult = JobFailed(e, job.finalStage.id)
+        jobResult = JobFailed(e)
         job.listener.jobFailed(e)
     } finally {
       val s = job.finalStage
@@ -826,11 +842,8 @@ class DAGScheduler(
                   job.numFinished += 1
                   // If the whole job has finished, remove it
                   if (job.numFinished == job.numPartitions) {
-                    jobIdToActiveJob -= stage.jobId
-                    activeJobs -= job
-                    resultStageToJob -= stage
                     markStageAsFinished(stage)
-                    jobIdToStageIdsRemove(job.jobId)
+                    cleanupStateForJobAndIndependentStages(job, Some(stage))
                     listenerBus.post(SparkListenerJobEnd(job.jobId, JobSucceeded))
                   }
                   job.listener.taskSucceeded(rt.outputId, event.result)
@@ -982,7 +995,7 @@ class DAGScheduler(
     if (!jobIdToStageIds.contains(jobId)) {
       logDebug("Trying to cancel unregistered job " + jobId)
     } else {
-      failJobAndIndependentStages(jobIdToActiveJob(jobId), s"Job $jobId cancelled")
+      failJobAndIndependentStages(jobIdToActiveJob(jobId), s"Job $jobId cancelled", None)
     }
   }
 
@@ -999,7 +1012,8 @@ class DAGScheduler(
     stageToInfos(failedStage).completionTime = Some(System.currentTimeMillis())
     for (resultStage <- dependentStages) {
       val job = resultStageToJob(resultStage)
-      failJobAndIndependentStages(job, s"Job aborted due to stage failure: $reason")
+      failJobAndIndependentStages(job, s"Job aborted due to stage failure: $reason",
+        Some(resultStage))
     }
     if (dependentStages.isEmpty) {
       logInfo("Ignoring failure of " + failedStage + " because all jobs depending on it are done")
@@ -1008,28 +1022,45 @@ class DAGScheduler(
 
   /**
    * Fails a job and all stages that are only used by that job, and cleans up relevant state.
+   *
+   * @param resultStage The result stage for the job, if known. Used to cleanup state for the job
+   *                    slightly more efficiently than when not specified.
    */
-  private def failJobAndIndependentStages(job: ActiveJob, failureReason: String) {
+  private def failJobAndIndependentStages(job: ActiveJob, failureReason: String,
+      resultStage: Option[Stage]) {
     val error = new SparkException(failureReason)
     job.listener.jobFailed(error)
 
-    // Cancel all tasks in independent stages.
-    val independentStages = removeJobAndIndependentStages(job.jobId)
-    independentStages.foreach(taskScheduler.cancelTasks)
-
-    // Clean up remaining state we store for the job.
-    jobIdToActiveJob -= job.jobId
-    activeJobs -= job
-    jobIdToStageIds -= job.jobId
-    val resultStagesForJob = resultStageToJob.keySet.filter(
-      stage => resultStageToJob(stage).jobId == job.jobId)
-    if (resultStagesForJob.size != 1) {
-      logWarning(
-        s"${resultStagesForJob.size} result stages for job ${job.jobId} (expect exactly 1)")
+    // Cancel all independent, running stages.
+    val stages = jobIdToStageIds(job.jobId)
+    if (stages.isEmpty) {
+      logError("No stages registered for job " + job.jobId)
     }
-    resultStageToJob --= resultStagesForJob
+    stages.foreach { stageId =>
+      val jobsForStage = stageIdToJobIds.get(stageId)
+      if (jobsForStage.isEmpty || !jobsForStage.get.contains(job.jobId)) {
+        logError(
+          "Job %d not registered for stage %d even though that stage was registered for the job"
+            .format(job.jobId, stageId))
+      } else if (jobsForStage.get.size == 1) {
+        if (!stageIdToStage.contains(stageId)) {
+          logError(s"Missing Stage for stage with id $stageId")
+        } else {
+          // This is the only job that uses this stage, so fail the stage if it is running.
+          val stage = stageIdToStage(stageId)
+          if (runningStages.contains(stage)) {
+            taskScheduler.cancelTasks(stageId)
+            val stageInfo = stageToInfos(stage)
+            stageInfo.stageFailed(failureReason)
+            listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
+          }
+        }
+      }
+    }
+
+    cleanupStateForJobAndIndependentStages(job, resultStage)
 
-    listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error, job.finalStage.id)))
+    listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error)))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/fac6085c/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 5cecf94..7c50539 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -191,7 +191,11 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
    */
   override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
     val stageId = stageCompleted.stageInfo.stageId
-    stageLogInfo(stageId, "STAGE_ID=%d STATUS=COMPLETED".format(stageId))
+    if (stageCompleted.stageInfo.failureReason.isEmpty) {
+      stageLogInfo(stageId, s"STAGE_ID=$stageId STATUS=COMPLETED")
+    } else {
+      stageLogInfo(stageId, s"STAGE_ID=$stageId STATUS=FAILED")
+    }
   }
 
   /**
@@ -227,7 +231,7 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
     var info = "JOB_ID=" + jobId
     jobEnd.jobResult match {
       case JobSucceeded => info += " STATUS=SUCCESS"
-      case JobFailed(exception, _) =>
+      case JobFailed(exception) =>
         info += " STATUS=FAILED REASON="
         exception.getMessage.split("\\s+").foreach(info += _ + "_")
       case _ =>

http://git-wip-us.apache.org/repos/asf/spark/blob/fac6085c/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala b/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala
index 3cf4e30..047bd27 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala
@@ -24,5 +24,4 @@ private[spark] sealed trait JobResult
 
 private[spark] case object JobSucceeded extends JobResult
 
-// A failed stage ID of -1 means there is not a particular stage that caused the failure
-private[spark] case class JobFailed(exception: Exception, failedStageId: Int) extends JobResult
+private[spark] case class JobFailed(exception: Exception) extends JobResult

http://git-wip-us.apache.org/repos/asf/spark/blob/fac6085c/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
index 8007b54..e9bfee2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
@@ -64,7 +64,7 @@ private[spark] class JobWaiter[T](
 
   override def jobFailed(exception: Exception): Unit = synchronized {
     _jobFinished = true
-    jobResult = JobFailed(exception, -1)
+    jobResult = JobFailed(exception)
     this.notifyAll()
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/fac6085c/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index d4eb0ac..d42e677 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -71,7 +71,7 @@ private[spark] case object SparkListenerShutdown extends SparkListenerEvent
  */
 trait SparkListener {
   /**
-   * Called when a stage is completed, with information on the completed stage
+   * Called when a stage completes successfully or fails, with information on the completed stage.
    */
   def onStageCompleted(stageCompleted: SparkListenerStageCompleted) { }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/fac6085c/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
index 8115a7e..eec409b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -26,8 +26,17 @@ private[spark]
 class StageInfo(val stageId: Int, val name: String, val numTasks: Int, val rddInfo: RDDInfo) {
   /** When this stage was submitted from the DAGScheduler to a TaskScheduler. */
   var submissionTime: Option[Long] = None
+  /** Time when all tasks in the stage completed or when the stage was cancelled. */
   var completionTime: Option[Long] = None
+  /** If the stage failed, the reason why. */
+  var failureReason: Option[String] = None
+
   var emittedTaskSizeWarning = false
+
+  def stageFailed(reason: String) {
+    failureReason = Some(reason)
+    completionTime = Some(System.currentTimeMillis)
+  }
 }
 
 private[spark]

http://git-wip-us.apache.org/repos/asf/spark/blob/fac6085c/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 048f671..5167e20 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -74,8 +74,13 @@ private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener {
     // Remove by stageId, rather than by StageInfo, in case the StageInfo is from storage
     poolToActiveStages(stageIdToPool(stageId)).remove(stageId)
     activeStages.remove(stageId)
-    completedStages += stage
-    trimIfNecessary(completedStages)
+    if (stage.failureReason.isEmpty) {
+      completedStages += stage
+      trimIfNecessary(completedStages)
+    } else {
+      failedStages += stage
+      trimIfNecessary(failedStages)
+    }
   }
 
   /** If stages is too large, remove and garbage collect old stages */
@@ -215,20 +220,6 @@ private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener {
     }
   }
 
-  override def onJobEnd(jobEnd: SparkListenerJobEnd) = synchronized {
-    jobEnd.jobResult match {
-      case JobFailed(_, stageId) =>
-        activeStages.get(stageId).foreach { s =>
-          // Remove by stageId, rather than by StageInfo, in case the StageInfo is from storage
-          activeStages.remove(s.stageId)
-          poolToActiveStages(stageIdToPool(stageId)).remove(s.stageId)
-          failedStages += s
-          trimIfNecessary(failedStages)
-        }
-      case _ =>
-    }
-  }
-
   override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) {
     synchronized {
       val schedulingModeName =

http://git-wip-us.apache.org/repos/asf/spark/blob/fac6085c/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 2155a88..1965489 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -166,12 +166,14 @@ private[spark] object JsonProtocol {
     val rddInfo = rddInfoToJson(stageInfo.rddInfo)
     val submissionTime = stageInfo.submissionTime.map(JInt(_)).getOrElse(JNothing)
     val completionTime = stageInfo.completionTime.map(JInt(_)).getOrElse(JNothing)
+    val failureReason = stageInfo.failureReason.map(JString(_)).getOrElse(JNothing)
     ("Stage ID" -> stageInfo.stageId) ~
     ("Stage Name" -> stageInfo.name) ~
     ("Number of Tasks" -> stageInfo.numTasks) ~
     ("RDD Info" -> rddInfo) ~
     ("Submission Time" -> submissionTime) ~
     ("Completion Time" -> completionTime) ~
+    ("Failure Reason" -> failureReason) ~
     ("Emitted Task Size Warning" -> stageInfo.emittedTaskSizeWarning)
   }
 
@@ -259,9 +261,7 @@ private[spark] object JsonProtocol {
     val json = jobResult match {
       case JobSucceeded => Utils.emptyJson
       case jobFailed: JobFailed =>
-        val exception = exceptionToJson(jobFailed.exception)
-        ("Exception" -> exception) ~
-        ("Failed Stage ID" -> jobFailed.failedStageId)
+        JObject("Exception" -> exceptionToJson(jobFailed.exception))
     }
     ("Result" -> result) ~ json
   }
@@ -442,11 +442,13 @@ private[spark] object JsonProtocol {
     val rddInfo = rddInfoFromJson(json \ "RDD Info")
     val submissionTime = Utils.jsonOption(json \ "Submission Time").map(_.extract[Long])
     val completionTime = Utils.jsonOption(json \ "Completion Time").map(_.extract[Long])
+    val failureReason = Utils.jsonOption(json \ "Failure Reason").map(_.extract[String])
     val emittedTaskSizeWarning = (json \ "Emitted Task Size Warning").extract[Boolean]
 
     val stageInfo = new StageInfo(stageId, stageName, numTasks, rddInfo)
     stageInfo.submissionTime = submissionTime
     stageInfo.completionTime = completionTime
+    stageInfo.failureReason = failureReason
     stageInfo.emittedTaskSizeWarning = emittedTaskSizeWarning
     stageInfo
   }
@@ -561,8 +563,7 @@ private[spark] object JsonProtocol {
       case `jobSucceeded` => JobSucceeded
       case `jobFailed` =>
         val exception = exceptionFromJson(json \ "Exception")
-        val failedStageId = (json \ "Failed Stage ID").extract[Int]
-        new JobFailed(exception, failedStageId)
+        new JobFailed(exception)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/fac6085c/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 2e3026b..a74724d 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -64,6 +64,21 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
     override def defaultParallelism() = 2
   }
 
+  /** Length of time to wait while draining listener events. */
+  val WAIT_TIMEOUT_MILLIS = 10000
+  val sparkListener = new SparkListener() {
+    val successfulStages = new HashSet[Int]()
+    val failedStages = new HashSet[Int]()
+    override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
+      val stageInfo = stageCompleted.stageInfo
+      if (stageInfo.failureReason.isEmpty) {
+        successfulStages += stageInfo.stageId
+      } else {
+        failedStages += stageInfo.stageId
+      }
+    }
+  }
+
   var mapOutputTracker: MapOutputTrackerMaster = null
   var scheduler: DAGScheduler = null
 
@@ -89,13 +104,16 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
   /** The list of results that DAGScheduler has collected. */
   val results = new HashMap[Int, Any]()
   var failure: Exception = _
-  val listener = new JobListener() {
+  val jobListener = new JobListener() {
     override def taskSucceeded(index: Int, result: Any) = results.put(index, result)
     override def jobFailed(exception: Exception) = { failure = exception }
   }
 
   before {
     sc = new SparkContext("local", "DAGSchedulerSuite")
+    sparkListener.successfulStages.clear()
+    sparkListener.failedStages.clear()
+    sc.addSparkListener(sparkListener)
     taskSets.clear()
     cancelledStages.clear()
     cacheLocations.clear()
@@ -187,7 +205,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
       partitions: Array[Int],
       func: (TaskContext, Iterator[_]) => _ = jobComputeFunc,
       allowLocal: Boolean = false,
-      listener: JobListener = listener): Int = {
+      listener: JobListener = jobListener): Int = {
     val jobId = scheduler.nextJobId.getAndIncrement()
     runEvent(JobSubmitted(jobId, rdd, func, partitions, allowLocal, null, listener))
     return jobId
@@ -231,7 +249,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
       override def toString = "DAGSchedulerSuite Local RDD"
     }
     val jobId = scheduler.nextJobId.getAndIncrement()
-    runEvent(JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, null, listener))
+    runEvent(JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, null, jobListener))
     assert(results === Map(0 -> 42))
     assertDataStructuresEmpty
   }
@@ -262,6 +280,9 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
     submit(makeRdd(1, Nil), Array(0))
     failed(taskSets(0), "some failure")
     assert(failure.getMessage === "Job aborted due to stage failure: some failure")
+    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    assert(sparkListener.failedStages.contains(0))
+    assert(sparkListener.failedStages.size === 1)
     assertDataStructuresEmpty
   }
 
@@ -270,6 +291,9 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
     val jobId = submit(rdd, Array(0))
     cancel(jobId)
     assert(failure.getMessage === s"Job $jobId cancelled")
+    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    assert(sparkListener.failedStages.contains(0))
+    assert(sparkListener.failedStages.size === 1)
     assertDataStructuresEmpty
   }
 
@@ -354,6 +378,13 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
     val stageFailureMessage = "Exception failure in map stage"
     failed(taskSets(0), stageFailureMessage)
     assert(failure.getMessage === s"Job aborted due to stage failure: $stageFailureMessage")
+
+    // Listener bus should get told about the map stage failing, but not the reduce stage
+    // (since the reduce stage hasn't been started yet).
+    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    assert(sparkListener.failedStages.contains(1))
+    assert(sparkListener.failedStages.size === 1)
+
     assertDataStructuresEmpty
   }
 
@@ -400,6 +431,14 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
     failed(taskSets(0), stageFailureMessage)
 
     assert(cancelledStages.contains(1))
+
+    // Make sure the listeners got told about both failed stages.
+    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    assert(sparkListener.successfulStages.isEmpty)
+    assert(sparkListener.failedStages.contains(1))
+    assert(sparkListener.failedStages.contains(3))
+    assert(sparkListener.failedStages.size === 2)
+
     assert(listener1.failureMessage === s"Job aborted due to stage failure: $stageFailureMessage")
     assert(listener2.failureMessage === s"Job aborted due to stage failure: $stageFailureMessage")
     assertDataStructuresEmpty

http://git-wip-us.apache.org/repos/asf/spark/blob/fac6085c/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 7bab7da..0342a8a 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -89,7 +89,7 @@ class JsonProtocolSuite extends FunSuite {
     // JobResult
     val exception = new Exception("Out of Memory! Please restock film.")
     exception.setStackTrace(stackTrace)
-    val jobFailed = JobFailed(exception, 2)
+    val jobFailed = JobFailed(exception)
     testJobResult(JobSucceeded)
     testJobResult(jobFailed)
 
@@ -294,7 +294,6 @@ class JsonProtocolSuite extends FunSuite {
     (result1, result2) match {
       case (JobSucceeded, JobSucceeded) =>
       case (r1: JobFailed, r2: JobFailed) =>
-        assert(r1.failedStageId === r2.failedStageId)
         assertEquals(r1.exception, r2.exception)
       case _ => fail("Job results don't match in types!")
     }
