Cleaned up job cancellation handling
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/27c45e52
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/27c45e52
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/27c45e52

Branch: refs/heads/master
Commit: 27c45e523620d801d547f167a5a33d71ee3af7b5
Parents: 686a420
Author: Mark Hamstra <[email protected]>
Authored: Fri Nov 22 11:14:39 2013 -0800
Committer: Mark Hamstra <[email protected]>
Committed: Tue Dec 3 09:57:32 2013 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 12 +++++-------
 1 file changed, 5 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/27c45e52/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b8b3ac0..aeac14a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -371,7 +371,7 @@ class DAGScheduler(
   }
 
   // Removes job and any stages that are not needed by any other job. Returns the set of ids for stages that
-  // were removed and whose associated tasks may need to be cancelled.
+  // were removed. The associated tasks for those stages need to be cancelled if we got here via job cancellation.
   private def removeJobAndIndependentStages(jobId: Int): Set[Int] = {
     val registeredStages = jobIdToStageIds(jobId)
     val independentStages = new HashSet[Int]()
@@ -562,8 +562,6 @@ class DAGScheduler(
 
       case JobCancelled(jobId) =>
         handleJobCancellation(jobId)
-        idToActiveJob.get(jobId).foreach(job => activeJobs -= job)
-        idToActiveJob -= jobId
 
       case JobGroupCancelled(groupId) =>
         // Cancel all jobs belonging to this job group.
@@ -571,14 +569,12 @@
        val activeInGroup = activeJobs.filter(groupId == _.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
        val jobIds = activeInGroup.map(_.jobId)
        jobIds.foreach { handleJobCancellation }
-       activeJobs --= activeInGroup
-       idToActiveJob --= jobIds
 
      case AllJobsCancelled =>
        // Cancel all running jobs.
        running.map(_.jobId).foreach { handleJobCancellation }
-       activeJobs.clear()
-       idToActiveJob.clear()
+       activeJobs.clear()      // These should already be empty by this point,
+       idToActiveJob.clear()   // but just in case we lost track of some jobs...
 
      case ExecutorGained(execId, host) =>
        handleExecutorGained(execId, host)
@@ -998,6 +994,8 @@ class DAGScheduler(
       job.listener.jobFailed(error)
       listenerBus.post(SparkListenerJobEnd(job, JobFailed(error, Some(job.finalStage))))
       jobIdToStageIds -= jobId
+      activeJobs -= job
+      idToActiveJob -= jobId
     }
   }
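For reference, the effect of the patch is to move the activeJobs/idToActiveJob
bookkeeping out of the individual event-handler cases and into
handleJobCancellation itself, so every cancellation path (single job, job
group, all jobs) cleans up through the same code. The sketch below shows what
the consolidated handler plausibly looks like after this commit; it is a
reconstruction from the hunks above, not a verbatim quote, and everything
beyond the diffed lines (the removeJobAndIndependentStages call,
taskSched.cancelTasks, the log and exception messages) is an assumption about
the surrounding code of that era.

  // Hypothetical reconstruction -- a method inside DAGScheduler, assuming its
  // usual fields (jobIdToStageIds, idToActiveJob, activeJobs, listenerBus,
  // taskSched) are in scope.
  private def handleJobCancellation(jobId: Int) {
    if (!jobIdToStageIds.contains(jobId)) {
      // Nothing registered under this id; assumed log message.
      logDebug("Trying to cancel unregistered job " + jobId)
    } else {
      // Drop the stages only this job needs and kill their running tasks.
      val independentStages = removeJobAndIndependentStages(jobId)
      independentStages.foreach { taskSched.cancelTasks }
      // Fail the job, notify listeners, then clean up all bookkeeping here,
      // once, instead of at each call site (JobCancelled, JobGroupCancelled,
      // AllJobsCancelled) as before this patch.
      val error = new SparkException("Job %d cancelled".format(jobId))
      val job = idToActiveJob(jobId)
      job.listener.jobFailed(error)
      listenerBus.post(SparkListenerJobEnd(job, JobFailed(error, Some(job.finalStage))))
      jobIdToStageIds -= jobId
      activeJobs -= job
      idToActiveJob -= jobId
    }
  }

With cleanup centralized this way, the clear() calls kept under
AllJobsCancelled become a defensive backstop rather than the primary cleanup,
which is what the new inline comments in the diff note.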
