Synchronous, inline cleanup after runLocally
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/f55d0b93 Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/f55d0b93 Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/f55d0b93 Branch: refs/heads/master Commit: f55d0b935d7c148f49b15932938e91150b64466f Parents: c9fcd90 Author: Mark Hamstra <[email protected]> Authored: Tue Nov 26 14:06:59 2013 -0800 Committer: Mark Hamstra <[email protected]> Committed: Tue Dec 3 09:57:32 2013 -0800 ---------------------------------------------------------------------- .../org/apache/spark/scheduler/DAGScheduler.scala | 15 ++++++--------- .../apache/spark/scheduler/DAGSchedulerEvent.scala | 2 -- .../apache/spark/scheduler/DAGSchedulerSuite.scala | 2 -- 3 files changed, 6 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f55d0b93/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index b371a24..b849867 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -597,14 +597,6 @@ class DAGScheduler( listenerBus.post(SparkListenerTaskEnd(task, reason, taskInfo, taskMetrics)) handleTaskCompletion(completion) - case LocalJobCompleted(job, result) => - val stage = job.finalStage - stageIdToJobIds -= stage.id // clean up data structures that were populated for a local job, - stageIdToStage -= stage.id // but that won't get cleaned up via the normal paths through - stageToInfos -= stage // completion events or stage abort - jobIdToStageIds -= job.jobId - listenerBus.post(SparkListenerJobEnd(job, result)) - case TaskSetFailed(taskSet, reason) => stageIdToStage.get(taskSet.stageId).foreach { abortStage(_, reason) } @@ -691,7 +683,12 @@ class DAGScheduler( jobResult = JobFailed(e, Some(job.finalStage)) job.listener.jobFailed(e) } finally { - eventProcessActor ! LocalJobCompleted(job, jobResult) + val s = job.finalStage + stageIdToJobIds -= s.id // clean up data structures that were populated for a local job, + stageIdToStage -= s.id // but that won't get cleaned up via the normal paths through + stageToInfos -= s // completion events or stage abort + jobIdToStageIds -= job.jobId + listenerBus.post(SparkListenerJobEnd(job, jobResult)) } } http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f55d0b93/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala index aa496b7..add1187 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala @@ -65,8 +65,6 @@ private[scheduler] case class CompletionEvent( taskMetrics: TaskMetrics) extends DAGSchedulerEvent -private[scheduler] case class LocalJobCompleted(job: ActiveJob, result: JobResult) extends DAGSchedulerEvent - private[scheduler] case class ExecutorGained(execId: String, host: String) extends DAGSchedulerEvent private[scheduler] case class ExecutorLost(execId: String) extends DAGSchedulerEvent http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f55d0b93/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 8ce8c68..706d84a 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -219,8 +219,6 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont } val jobId = scheduler.nextJobId.getAndIncrement() runEvent(JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, null, listener)) - assert(scheduler.stageToInfos.size === 1) - runEvent(LocalJobCompleted(scheduler.stageToInfos.keys.head)) assert(results === Map(0 -> 42)) assertDataStructuresEmpty }
