Refactoring to make job removal, stage removal, task cancellation clearer

Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/686a420d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/686a420d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/686a420d

Branch: refs/heads/master
Commit: 686a420ddc33407050d9019711cbe801fc352fa3
Parents: 205566e
Author: Mark Hamstra <[email protected]>
Authored: Fri Nov 22 10:20:09 2013 -0800
Committer: Mark Hamstra <[email protected]>
Committed: Tue Dec 3 09:57:32 2013 -0800

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 76 ++++++++++----------
 1 file changed, 39 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/686a420d/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 6f9d4d5..b8b3ac0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -370,9 +370,11 @@ class DAGScheduler(
     }
   }
 
-  // Removes job and applies p to any stages that aren't needed by any other jobs
-  private def forIndependentStagesOfRemovedJob(jobId: Int)(p: Int => Unit) {
+  // Removes job and any stages that are not needed by any other job.  Returns the set of ids for stages that
+  // were removed and whose associated tasks may need to be cancelled.
+  private def removeJobAndIndependentStages(jobId: Int): Set[Int] = {
     val registeredStages = jobIdToStageIds(jobId)
+    val independentStages = new HashSet[Int]()
     if (registeredStages.isEmpty) {
       logError("No stages registered for job " + jobId)
     } else {
@@ -382,49 +384,51 @@ class DAGScheduler(
             logError("Job %d not registered for stage %d even though that 
stage was registered for the job"
               .format(jobId, stageId))
           } else {
+            def removeStage(stageId: Int) {
+              // data structures based on Stage
+              stageIdToStage.get(stageId).foreach { s =>
+                if (running.contains(s)) {
+                  logDebug("Removing running stage %d".format(stageId))
+                  running -= s
+                }
+                stageToInfos -= s
+                shuffleToMapStage.keys.filter(shuffleToMapStage(_) == 
s).foreach(shuffleToMapStage.remove)
+                if (pendingTasks.contains(s) && !pendingTasks(s).isEmpty) {
+                  logDebug("Removing pending status for stage 
%d".format(stageId))
+                }
+                pendingTasks -= s
+                if (waiting.contains(s)) {
+                  logDebug("Removing stage %d from waiting 
set.".format(stageId))
+                  waiting -= s
+                }
+                if (failed.contains(s)) {
+                  logDebug("Removing stage %d from failed 
set.".format(stageId))
+                  failed -= s
+                }
+              }
+              // data structures based on StageId
+              stageIdToStage -= stageId
+              stageIdToJobIds -= stageId
+
+              logDebug("After removal of stage %d, remaining stages = 
%d".format(stageId, stageIdToStage.size))
+            }
+
             jobSet -= jobId
             if (jobSet.isEmpty) { // no other job needs this stage
-              p(stageId)
+              independentStages += stageId
+              removeStage(stageId)
             }
           }
       }
     }
-  }
-
-  private def removeStage(stageId: Int) {
-    // data structures based on Stage
-    stageIdToStage.get(stageId).foreach { s =>
-      if (running.contains(s)) {
-        logDebug("Removing running stage %d".format(stageId))
-        running -= s
-      }
-      stageToInfos -= s
-      shuffleToMapStage.keys.filter(shuffleToMapStage(_) == 
s).foreach(shuffleToMapStage.remove(_))
-      if (pendingTasks.contains(s) && !pendingTasks(s).isEmpty) {
-        logDebug("Removing pending status for stage %d".format(stageId))
-      }
-      pendingTasks -= s
-      if (waiting.contains(s)) {
-        logDebug("Removing stage %d from waiting set.".format(stageId))
-        waiting -= s
-      }
-      if (failed.contains(s)) {
-        logDebug("Removing stage %d from failed set.".format(stageId))
-        failed -= s
-      }
-    }
-    // data structures based on StageId
-    stageIdToStage -= stageId
-    stageIdToJobIds -= stageId
-
-    logDebug("After removal of stage %d, remaining stages = 
%d".format(stageId, stageIdToStage.size))
+    independentStages.toSet
   }
 
   private def jobIdToStageIdsRemove(jobId: Int) {
     if (!jobIdToStageIds.contains(jobId)) {
       logDebug("Trying to remove unregistered job " + jobId)
     } else {
-      forIndependentStagesOfRemovedJob(jobId) { removeStage }
+      removeJobAndIndependentStages(jobId)
       jobIdToStageIds -= jobId
     }
   }
@@ -987,10 +991,8 @@ class DAGScheduler(
     if (!jobIdToStageIds.contains(jobId)) {
       logDebug("Trying to cancel unregistered job " + jobId)
     } else {
-      forIndependentStagesOfRemovedJob(jobId) { stageId =>
-        taskSched.cancelTasks(stageId)
-        removeStage(stageId)
-      }
+      val independentStages = removeJobAndIndependentStages(jobId)
+      independentStages.foreach { taskSched.cancelTasks }
       val error = new SparkException("Job %d cancelled".format(jobId))
       val job = idToActiveJob(jobId)
       job.listener.jobFailed(error)

Reply via email to