Local jobs now post SparkListenerJobEnd, and DAGScheduler data structure
cleanup always occurs before SparkListenerJobEnd is posted.
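
For illustration, a minimal sketch of what this enables, assuming the
current SparkListener API (JobEndCounter and LocalJobEndExample are
illustrative names, not part of the commit): a registered listener now
observes a job-end event even for jobs that the DAGScheduler runs
locally, e.g. first() and take(), which submit single-partition jobs
with allowLocal = true.

    import org.apache.spark.SparkContext
    import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}

    // Counts job-end events; with this patch, locally run jobs count too.
    class JobEndCounter extends SparkListener {
      @volatile var jobEnds = 0
      override def onJobEnd(jobEnd: SparkListenerJobEnd) {
        jobEnds += 1
      }
    }

    object LocalJobEndExample {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "LocalJobEndExample")
        val counter = new JobEndCounter
        sc.addSparkListener(counter)
        sc.parallelize(1 to 10).first()  // single-partition job, run locally
        // The listener bus is asynchronous; once this job's end event is
        // delivered, counter.jobEnds reaches 1.
        sc.stop()
      }
    }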


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/c9fcd909
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/c9fcd909
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/c9fcd909

Branch: refs/heads/master
Commit: c9fcd909d0f86b08935a132409888b30e989bca4
Parents: 9ae2d09
Author: Mark Hamstra <[email protected]>
Authored: Sun Nov 24 17:49:14 2013 -0800
Committer: Mark Hamstra <[email protected]>
Committed: Tue Dec 3 09:57:32 2013 -0800

----------------------------------------------------------------------
 .../org/apache/spark/scheduler/DAGScheduler.scala  | 17 ++++++++++-------
 .../apache/spark/scheduler/DAGSchedulerEvent.scala |  2 +-
 2 files changed, 11 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c9fcd909/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 01c5133..b371a24 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -597,12 +597,13 @@ class DAGScheduler(
         listenerBus.post(SparkListenerTaskEnd(task, reason, taskInfo, taskMetrics))
         handleTaskCompletion(completion)
 
-      case LocalJobCompleted(stage) =>
-        val jobId = stageIdToJobIds(stage.id).head
+      case LocalJobCompleted(job, result) =>
+        val stage = job.finalStage
         stageIdToJobIds -= stage.id    // clean up data structures that were populated for a local job,
         stageIdToStage -= stage.id     // but that won't get cleaned up via the normal paths through
         stageToInfos -= stage          // completion events or stage abort
-        jobIdToStageIds -= jobId
+        jobIdToStageIds -= job.jobId
+        listenerBus.post(SparkListenerJobEnd(job, result))
 
       case TaskSetFailed(taskSet, reason) =>
         stageIdToStage.get(taskSet.stageId).foreach { abortStage(_, reason) }
@@ -672,6 +673,7 @@ class DAGScheduler(
 
   // Broken out for easier testing in DAGSchedulerSuite.
   protected def runLocallyWithinThread(job: ActiveJob) {
+    var jobResult: JobResult = JobSucceeded
     try {
       SparkEnv.set(env)
       val rdd = job.finalStage.rdd
@@ -686,9 +688,10 @@ class DAGScheduler(
       }
     } catch {
       case e: Exception =>
+        jobResult = JobFailed(e, Some(job.finalStage))
         job.listener.jobFailed(e)
     } finally {
-      eventProcessActor ! LocalJobCompleted(job.finalStage)
+      eventProcessActor ! LocalJobCompleted(job, jobResult)
     }
   }
 
@@ -835,8 +838,8 @@ class DAGScheduler(
                     activeJobs -= job
                     resultStageToJob -= stage
                     markStageAsFinished(stage)
-                    listenerBus.post(SparkListenerJobEnd(job, JobSucceeded))
                     jobIdToStageIdsRemove(job.jobId)
+                    listenerBus.post(SparkListenerJobEnd(job, JobSucceeded))
                   }
                   job.listener.taskSucceeded(rt.outputId, event.result)
                 }
@@ -987,10 +990,10 @@ class DAGScheduler(
       val error = new SparkException("Job %d cancelled".format(jobId))
       val job = idToActiveJob(jobId)
       job.listener.jobFailed(error)
-      listenerBus.post(SparkListenerJobEnd(job, JobFailed(error, Some(job.finalStage))))
       jobIdToStageIds -= jobId
       activeJobs -= job
       idToActiveJob -= jobId
+      listenerBus.post(SparkListenerJobEnd(job, JobFailed(error, Some(job.finalStage))))
     }
   }
 
@@ -1009,11 +1012,11 @@ class DAGScheduler(
       val job = resultStageToJob(resultStage)
       val error = new SparkException("Job aborted: " + reason)
       job.listener.jobFailed(error)
-      listenerBus.post(SparkListenerJobEnd(job, JobFailed(error, Some(failedStage))))
       jobIdToStageIdsRemove(job.jobId)
       idToActiveJob -= resultStage.jobId
       activeJobs -= job
       resultStageToJob -= resultStage
+      listenerBus.post(SparkListenerJobEnd(job, JobFailed(error, Some(failedStage))))
     }
     if (dependentStages.isEmpty) {
       logInfo("Ignoring failure of " + failedStage + " because all jobs 
depending on it are done")

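Each hunk above applies the same reordering: the listenerBus.post of
SparkListenerJobEnd now runs only after the per-job bookkeeping touched
by that handler (jobIdToStageIds, idToActiveJob, activeJobs, etc.) has
been cleaned up, so a listener that receives the event can rely on that
state already being gone. A sketch of how a test might use this
ordering; JobEndSignal is a hypothetical helper, not part of the commit:

    import java.util.concurrent.Semaphore

    import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}

    // Releases a permit when a job-end event arrives. Because cleanup now
    // precedes the post, the permit also signals that the DAGScheduler has
    // dropped its per-job state for the finished job.
    class JobEndSignal extends SparkListener {
      val done = new Semaphore(0)
      override def onJobEnd(jobEnd: SparkListenerJobEnd) {
        done.release()
      }
    }

A test (for instance in DAGSchedulerSuite) could acquire() on the
semaphore before asserting that the scheduler's per-job maps are empty,
without racing the event-processing actor.
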
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c9fcd909/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index bf8dfb5..aa496b7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -65,7 +65,7 @@ private[scheduler] case class CompletionEvent(
     taskMetrics: TaskMetrics)
   extends DAGSchedulerEvent
 
-private[scheduler] case class LocalJobCompleted(stage: Stage) extends DAGSchedulerEvent
+private[scheduler] case class LocalJobCompleted(job: ActiveJob, result: JobResult) extends DAGSchedulerEvent
 
 private[scheduler] case class ExecutorGained(execId: String, host: String) extends DAGSchedulerEvent
 
