Local jobs post SparkListenerJobEnd, and DAGScheduler data structure cleanup always occurs before any posting of SparkListenerJobEnd.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/c9fcd909 Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/c9fcd909 Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/c9fcd909 Branch: refs/heads/master Commit: c9fcd909d0f86b08935a132409888b30e989bca4 Parents: 9ae2d09 Author: Mark Hamstra <[email protected]> Authored: Sun Nov 24 17:49:14 2013 -0800 Committer: Mark Hamstra <[email protected]> Committed: Tue Dec 3 09:57:32 2013 -0800 ---------------------------------------------------------------------- .../org/apache/spark/scheduler/DAGScheduler.scala | 17 ++++++++++------- .../apache/spark/scheduler/DAGSchedulerEvent.scala | 2 +- 2 files changed, 11 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c9fcd909/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 01c5133..b371a24 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -597,12 +597,13 @@ class DAGScheduler( listenerBus.post(SparkListenerTaskEnd(task, reason, taskInfo, taskMetrics)) handleTaskCompletion(completion) - case LocalJobCompleted(stage) => - val jobId = stageIdToJobIds(stage.id).head + case LocalJobCompleted(job, result) => + val stage = job.finalStage stageIdToJobIds -= stage.id // clean up data structures that were populated for a local job, stageIdToStage -= stage.id // but that won't get cleaned up via the normal paths through stageToInfos -= stage // completion events or stage abort - jobIdToStageIds -= jobId + jobIdToStageIds -= job.jobId + listenerBus.post(SparkListenerJobEnd(job, result)) case TaskSetFailed(taskSet, reason) => stageIdToStage.get(taskSet.stageId).foreach { abortStage(_, reason) } @@ -672,6 +673,7 @@ class DAGScheduler( // Broken out for easier testing in DAGSchedulerSuite. protected def runLocallyWithinThread(job: ActiveJob) { + var jobResult: JobResult = JobSucceeded try { SparkEnv.set(env) val rdd = job.finalStage.rdd @@ -686,9 +688,10 @@ class DAGScheduler( } } catch { case e: Exception => + jobResult = JobFailed(e, Some(job.finalStage)) job.listener.jobFailed(e) } finally { - eventProcessActor ! LocalJobCompleted(job.finalStage) + eventProcessActor ! LocalJobCompleted(job, jobResult) } } @@ -835,8 +838,8 @@ class DAGScheduler( activeJobs -= job resultStageToJob -= stage markStageAsFinished(stage) - listenerBus.post(SparkListenerJobEnd(job, JobSucceeded)) jobIdToStageIdsRemove(job.jobId) + listenerBus.post(SparkListenerJobEnd(job, JobSucceeded)) } job.listener.taskSucceeded(rt.outputId, event.result) } @@ -987,10 +990,10 @@ class DAGScheduler( val error = new SparkException("Job %d cancelled".format(jobId)) val job = idToActiveJob(jobId) job.listener.jobFailed(error) - listenerBus.post(SparkListenerJobEnd(job, JobFailed(error, Some(job.finalStage)))) jobIdToStageIds -= jobId activeJobs -= job idToActiveJob -= jobId + listenerBus.post(SparkListenerJobEnd(job, JobFailed(error, Some(job.finalStage)))) } } @@ -1009,11 +1012,11 @@ class DAGScheduler( val job = resultStageToJob(resultStage) val error = new SparkException("Job aborted: " + reason) job.listener.jobFailed(error) - listenerBus.post(SparkListenerJobEnd(job, JobFailed(error, Some(failedStage)))) jobIdToStageIdsRemove(job.jobId) idToActiveJob -= resultStage.jobId activeJobs -= job resultStageToJob -= resultStage + listenerBus.post(SparkListenerJobEnd(job, JobFailed(error, Some(failedStage)))) } if (dependentStages.isEmpty) { logInfo("Ignoring failure of " + failedStage + " because all jobs depending on it are done") http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c9fcd909/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala index bf8dfb5..aa496b7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala @@ -65,7 +65,7 @@ private[scheduler] case class CompletionEvent( taskMetrics: TaskMetrics) extends DAGSchedulerEvent -private[scheduler] case class LocalJobCompleted(stage: Stage) extends DAGSchedulerEvent +private[scheduler] case class LocalJobCompleted(job: ActiveJob, result: JobResult) extends DAGSchedulerEvent private[scheduler] case class ExecutorGained(execId: String, host: String) extends DAGSchedulerEvent
