Repository: spark
Updated Branches:
  refs/heads/master ea9de658a -> 11973a7bd


Renamed stageIdToActiveJob to jobIdToActiveJob.

This data structure was misused and, as a result, was later given an incorrect 
name.

The map seems to have gotten into this tangled state when @henrydavidge 
indexed into it with a stage ID instead of a job ID, and @andrewor14 later 
renamed the data structure to reflect that misunderstanding.

This patch renames the map and removes an incorrect indexing into it. That 
incorrect indexing meant that the code @henrydavidge added to warn when a task 
size is too large (added in 
https://github.com/apache/spark/commit/57579934f0454f258615c10e69ac2adafc5b9835) 
was not always executed; this commit fixes that.
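
To see why the warning was skipped, here is a minimal, self-contained Scala 
sketch of the failure mode. MapKeyMixupDemo and this ActiveJob case class are 
hypothetical stand-ins for illustration, not Spark's real classes: indexing a 
job-keyed map with a stage ID makes a for-comprehension silently skip its body.

import scala.collection.mutable.HashMap

// Hypothetical stand-in for illustration only; not Spark's real class.
case class ActiveJob(jobId: Int)

object MapKeyMixupDemo {
  def main(args: Array[String]): Unit = {
    // Keyed by *job* ID, as the renamed jobIdToActiveJob map is.
    val jobIdToActiveJob = new HashMap[Int, ActiveJob]
    jobIdToActiveJob(0) = ActiveJob(0)

    val stageId = 7  // stage IDs and job IDs come from separate counters

    // Looking up by stage ID usually misses, so get() returns None and the
    // for-comprehension's body (think: the task-size warning) is skipped.
    for (job <- jobIdToActiveJob.get(stageId)) {
      println(s"warning would fire here for job ${job.jobId}")  // never runs
    }
  }
}

Accordingly, the patch drops the job lookup from the BeginEvent 
for-comprehension (see the diff below), so the stage lookup alone gates the 
warning.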

Author: Kay Ousterhout <kayousterh...@gmail.com>

Closes #301 from kayousterhout/fixCancellation and squashes the following commits:

bd3d3a4 [Kay Ousterhout] Renamed stageIdToActiveJob to jobIdToActiveJob.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/11973a7b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/11973a7b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/11973a7b

Branch: refs/heads/master
Commit: 11973a7bdad58fdb759033c232d87f0b279c83b4
Parents: ea9de65
Author: Kay Ousterhout <kayousterh...@gmail.com>
Authored: Wed Apr 2 10:35:52 2014 -0700
Committer: Patrick Wendell <pwend...@gmail.com>
Committed: Wed Apr 2 10:35:52 2014 -0700

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 21 ++++++++++----------
 .../spark/scheduler/DAGSchedulerSuite.scala     |  2 +-
 2 files changed, 11 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/11973a7b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 4fce47e..ef3d24d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -84,7 +84,7 @@ class DAGScheduler(
   private[scheduler] val stageIdToJobIds = new TimeStampedHashMap[Int, HashSet[Int]]
   private[scheduler] val stageIdToStage = new TimeStampedHashMap[Int, Stage]
   private[scheduler] val shuffleToMapStage = new TimeStampedHashMap[Int, Stage]
-  private[scheduler] val stageIdToActiveJob = new HashMap[Int, ActiveJob]
+  private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]
   private[scheduler] val resultStageToJob = new HashMap[Stage, ActiveJob]
   private[spark] val stageToInfos = new TimeStampedHashMap[Stage, StageInfo]
 
@@ -536,7 +536,7 @@ class DAGScheduler(
           listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties))
           runLocally(job)
         } else {
-          stageIdToActiveJob(jobId) = job
+          jobIdToActiveJob(jobId) = job
           activeJobs += job
           resultStageToJob(finalStage) = job
           listenerBus.post(
@@ -559,7 +559,7 @@ class DAGScheduler(
         // Cancel all running jobs.
         runningStages.map(_.jobId).foreach(handleJobCancellation)
         activeJobs.clear()      // These should already be empty by this point,
-        stageIdToActiveJob.clear()   // but just in case we lost track of some jobs...
+        jobIdToActiveJob.clear()   // but just in case we lost track of some jobs...
 
       case ExecutorAdded(execId, host) =>
         handleExecutorAdded(execId, host)
@@ -569,7 +569,6 @@ class DAGScheduler(
 
       case BeginEvent(task, taskInfo) =>
         for (
-          job <- stageIdToActiveJob.get(task.stageId);
           stage <- stageIdToStage.get(task.stageId);
           stageInfo <- stageToInfos.get(stage)
         ) {
@@ -697,7 +696,7 @@ class DAGScheduler(
   private def activeJobForStage(stage: Stage): Option[Int] = {
     if (stageIdToJobIds.contains(stage.id)) {
       val jobsThatUseStage: Array[Int] = stageIdToJobIds(stage.id).toArray.sorted
-      jobsThatUseStage.find(stageIdToActiveJob.contains)
+      jobsThatUseStage.find(jobIdToActiveJob.contains)
     } else {
       None
     }
@@ -750,8 +749,8 @@ class DAGScheduler(
       }
     }
 
-    val properties = if (stageIdToActiveJob.contains(jobId)) {
-      stageIdToActiveJob(stage.jobId).properties
+    val properties = if (jobIdToActiveJob.contains(jobId)) {
+      jobIdToActiveJob(stage.jobId).properties
     } else {
       // this stage will be assigned to "default" pool
       null
@@ -827,7 +826,7 @@ class DAGScheduler(
                   job.numFinished += 1
                   // If the whole job has finished, remove it
                   if (job.numFinished == job.numPartitions) {
-                    stageIdToActiveJob -= stage.jobId
+                    jobIdToActiveJob -= stage.jobId
                     activeJobs -= job
                     resultStageToJob -= stage
                     markStageAsFinished(stage)
@@ -986,11 +985,11 @@ class DAGScheduler(
       val independentStages = removeJobAndIndependentStages(jobId)
       independentStages.foreach(taskScheduler.cancelTasks)
       val error = new SparkException("Job %d cancelled".format(jobId))
-      val job = stageIdToActiveJob(jobId)
+      val job = jobIdToActiveJob(jobId)
       job.listener.jobFailed(error)
       jobIdToStageIds -= jobId
       activeJobs -= job
-      stageIdToActiveJob -= jobId
+      jobIdToActiveJob -= jobId
       listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error, job.finalStage.id)))
     }
   }
@@ -1011,7 +1010,7 @@ class DAGScheduler(
       val error = new SparkException("Job aborted: " + reason)
       job.listener.jobFailed(error)
       jobIdToStageIdsRemove(job.jobId)
-      stageIdToActiveJob -= resultStage.jobId
+      jobIdToActiveJob -= resultStage.jobId
       activeJobs -= job
       resultStageToJob -= resultStage
       listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error, failedStage.id)))

http://git-wip-us.apache.org/repos/asf/spark/blob/11973a7b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index c97543f..ce567b0 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -428,7 +428,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
     assert(scheduler.pendingTasks.isEmpty)
     assert(scheduler.activeJobs.isEmpty)
     assert(scheduler.failedStages.isEmpty)
-    assert(scheduler.stageIdToActiveJob.isEmpty)
+    assert(scheduler.jobIdToActiveJob.isEmpty)
     assert(scheduler.jobIdToStageIds.isEmpty)
     assert(scheduler.stageIdToJobIds.isEmpty)
     assert(scheduler.stageIdToStage.isEmpty)
