Cleaned up job cancellation handling

Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/27c45e52
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/27c45e52
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/27c45e52

Branch: refs/heads/master
Commit: 27c45e523620d801d547f167a5a33d71ee3af7b5
Parents: 686a420
Author: Mark Hamstra <[email protected]>
Authored: Fri Nov 22 11:14:39 2013 -0800
Committer: Mark Hamstra <[email protected]>
Committed: Tue Dec 3 09:57:32 2013 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 12 +++++-------
 1 file changed, 5 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/27c45e52/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b8b3ac0..aeac14a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -371,7 +371,7 @@ class DAGScheduler(
   }
 
   // Removes job and any stages that are not needed by any other job.  Returns the set of ids for stages that
-  // were removed and whose associated tasks may need to be cancelled.
+  // were removed.  The associated tasks for those stages need to be cancelled if we got here via job cancellation.
   private def removeJobAndIndependentStages(jobId: Int): Set[Int] = {
     val registeredStages = jobIdToStageIds(jobId)
     val independentStages = new HashSet[Int]()
@@ -562,8 +562,6 @@ class DAGScheduler(
 
       case JobCancelled(jobId) =>
         handleJobCancellation(jobId)
-        idToActiveJob.get(jobId).foreach(job => activeJobs -= job)
-        idToActiveJob -= jobId
 
       case JobGroupCancelled(groupId) =>
         // Cancel all jobs belonging to this job group.
@@ -571,14 +569,12 @@ class DAGScheduler(
         val activeInGroup = activeJobs.filter(groupId == _.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
         val jobIds = activeInGroup.map(_.jobId)
         jobIds.foreach { handleJobCancellation }
-        activeJobs --= activeInGroup
-        idToActiveJob --= jobIds
 
       case AllJobsCancelled =>
         // Cancel all running jobs.
         running.map(_.jobId).foreach { handleJobCancellation }
-        activeJobs.clear()
-        idToActiveJob.clear()
+        activeJobs.clear()      // These should already be empty by this point,
+        idToActiveJob.clear()   // but just in case we lost track of some jobs...
 
       case ExecutorGained(execId, host) =>
         handleExecutorGained(execId, host)
@@ -998,6 +994,8 @@ class DAGScheduler(
       job.listener.jobFailed(error)
       listenerBus.post(SparkListenerJobEnd(job, JobFailed(error, Some(job.finalStage))))
       jobIdToStageIds -= jobId
+      activeJobs -= job
+      idToActiveJob -= jobId
     }
   }
 

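For context, the net effect of this diff: the activeJobs / idToActiveJob bookkeeping now happens on the shared job-failure path that handleJobCancellation reaches, so every cancellation route (single job, job group, all jobs) cleans up in one place instead of each event-loop case removing the entries itself. Below is a minimal, hypothetical Scala sketch of that pattern with simplified names (CancellationSketch, ActiveJob, cancelJobGroup are illustrative, not the actual DAGScheduler API):

import scala.collection.mutable.{HashMap, HashSet}

// Hypothetical, stripped-down model of the bookkeeping pattern in this
// commit: all cancellation paths funnel through handleJobCancellation,
// which owns the activeJobs/idToActiveJob cleanup, rather than each
// event case removing the entries itself.
object CancellationSketch {
  case class ActiveJob(jobId: Int, group: String)

  val activeJobs = new HashSet[ActiveJob]()
  val idToActiveJob = new HashMap[Int, ActiveJob]()

  def handleJobCancellation(jobId: Int): Unit = {
    idToActiveJob.get(jobId).foreach { job =>
      // ... fail the job and drop its independent stages ...
      activeJobs -= job        // cleanup lives here, on every path
      idToActiveJob -= jobId
    }
  }

  // Group cancellation just maps group members onto the shared handler.
  def cancelJobGroup(groupId: String): Unit =
    activeJobs.filter(_.group == groupId).map(_.jobId).foreach(handleJobCancellation)

  def main(args: Array[String]): Unit = {
    val job = ActiveJob(1, "etl")
    activeJobs += job
    idToActiveJob(1) = job
    cancelJobGroup("etl")
    assert(activeJobs.isEmpty && idToActiveJob.isEmpty)
    println("bookkeeping cleared in exactly one place")
  }
}

With the removals deleted from the JobCancelled and JobGroupCancelled cases and the cleanup added to the failure path, the clear() calls under AllJobsCancelled become, as the new comment says, a just-in-case safety net rather than the primary cleanup.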