Tightly couple stageIdToJobIds and jobIdToStageIds
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/9ae2d094
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/9ae2d094
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/9ae2d094

Branch: refs/heads/master
Commit: 9ae2d094a967782e3f5a624dd854059a40430ee6
Parents: 27c45e5
Author: Mark Hamstra <[email protected]>
Authored: Fri Nov 22 13:14:26 2013 -0800
Committer: Mark Hamstra <[email protected]>
Committed: Tue Dec 3 09:57:32 2013 -0800

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala | 29 ++++++++------------
 1 file changed, 12 insertions(+), 17 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/9ae2d094/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index aeac14a..01c5133 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -259,7 +259,7 @@ class DAGScheduler(
     val stage =
       new Stage(id, rdd, numTasks, shuffleDep, getParentStages(rdd, jobId), jobId, callSite)
     stageIdToStage(id) = stage
-    registerJobIdWithStages(jobId, stage)
+    updateJobIdStageIdMaps(jobId, stage)
     stageToInfos(stage) = new StageInfo(stage)
     stage
   }
@@ -348,30 +348,24 @@ class DAGScheduler(
    * Registers the given jobId among the jobs that need the given stage and
    * all of that stage's ancestors.
    */
-  private def registerJobIdWithStages(jobId: Int, stage: Stage) {
-    def registerJobIdWithStageList(stages: List[Stage]) {
+  private def updateJobIdStageIdMaps(jobId: Int, stage: Stage) {
+    def updateJobIdStageIdMapsList(stages: List[Stage]) {
       if (!stages.isEmpty) {
         val s = stages.head
         stageIdToJobIds.getOrElseUpdate(s.id, new HashSet[Int]()) += jobId
+        jobIdToStageIds.getOrElseUpdate(jobId, new HashSet[Int]()) += s.id
         val parents = getParentStages(s.rdd, jobId)
         val parentsWithoutThisJobId = parents.filter(p => !stageIdToJobIds.get(p.id).exists(_.contains(jobId)))
-        registerJobIdWithStageList(parentsWithoutThisJobId ++ stages.tail)
+        updateJobIdStageIdMapsList(parentsWithoutThisJobId ++ stages.tail)
       }
     }
-    registerJobIdWithStageList(List(stage))
+    updateJobIdStageIdMapsList(List(stage))
   }

-  private def jobIdToStageIdsAdd(jobId: Int) {
-    val stageSet = jobIdToStageIds.getOrElseUpdate(jobId, new HashSet[Int]())
-    stageIdToJobIds.foreach { case (stageId, jobSet) =>
-      if (jobSet.contains(jobId)) {
-        stageSet += stageId
-      }
-    }
-  }
-
-  // Removes job and any stages that are not needed by any other job. Returns the set of ids for stages that
-  // were removed. The associated tasks for those stages need to be cancelled if we got here via job cancellation.
+  /**
+   * Removes job and any stages that are not needed by any other job. Returns the set of ids for stages that
+   * were removed. The associated tasks for those stages need to be cancelled if we got here via job cancellation.
+   */
   private def removeJobAndIndependentStages(jobId: Int): Set[Int] = {
     val registeredStages = jobIdToStageIds(jobId)
     val independentStages = new HashSet[Int]()
@@ -555,7 +549,6 @@ class DAGScheduler(
           idToActiveJob(jobId) = job
           activeJobs += job
           resultStageToJob(finalStage) = job
-          jobIdToStageIdsAdd(jobId)
           listenerBus.post(SparkListenerJobStart(job, jobIdToStageIds(jobId).toArray, properties))
           submitStage(finalStage)
         }
@@ -605,9 +598,11 @@ class DAGScheduler(
         handleTaskCompletion(completion)

       case LocalJobCompleted(stage) =>
+        val jobId = stageIdToJobIds(stage.id).head
         stageIdToJobIds -= stage.id // clean up data structures that were populated for a local job,
         stageIdToStage -= stage.id // but that won't get cleaned up via the normal paths through
         stageToInfos -= stage // completion events or stage abort
+        jobIdToStageIds -= jobId

       case TaskSetFailed(taskSet, reason) =>
         stageIdToStage.get(taskSet.stageId).foreach { abortStage(_, reason) }
