Repository: spark
Updated Branches:
  refs/heads/master 7b6c85636 -> 1a0955250


[SPARK-9851] Support submitting map stages individually in DAGScheduler

This patch adds support for submitting map stages in a DAG individually so that 
we can make downstream decisions after seeing statistics about their output, as 
part of SPARK-9850. I also added more comments to many of the key classes in 
DAGScheduler. By itself, the patch is not super useful except maybe to switch 
between a shuffle and a broadcast join, but with the other subtasks of SPARK-9850 
we'll be able to make more interesting decisions.

The main entry point is SparkContext.submitMapStage, which lets you run a map 
stage and see stats about the map output sizes. Other stats could also be 
collected through accumulators. See AdaptiveSchedulingSuite for a short example.
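
As a rough sketch of the intended usage (not taken from the patch: the RDD, the
partitioner and the size threshold below are made up, and because submitMapStage is
currently private[spark], code like this would have to live under org.apache.spark,
as the new test suites do):

    import org.apache.spark.{HashPartitioner, ShuffleDependency, SparkContext}

    // Build a shuffle dependency by hand, as the new tests do, and run only its
    // map stage; get() blocks until the MapOutputStatistics are available.
    val sc = new SparkContext("local", "submitMapStage-sketch")
    val pairs = sc.parallelize(1 to 1000, 10).map(x => (x % 100, x))
    val dep = new ShuffleDependency[Int, Int, Int](pairs, new HashPartitioner(4))
    val stats = sc.submitMapStage(dep).get()

    // Example of a downstream decision based on the per-partition output sizes
    // (the 10 MB threshold is purely illustrative).
    val totalBytes = stats.bytesByPartitionId.sum
    val useBroadcastJoin = totalBytes < 10L * 1024 * 1024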

Author: Matei Zaharia <ma...@databricks.com>

Closes #8180 from mateiz/spark-9851.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1a095525
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1a095525
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1a095525

Branch: refs/heads/master
Commit: 1a0955250bb65cd6f5818ad60efb62ea4b45d18e
Parents: 7b6c856
Author: Matei Zaharia <ma...@databricks.com>
Authored: Mon Sep 14 21:47:40 2015 -0400
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Mon Sep 14 21:47:40 2015 -0400

----------------------------------------------------------------------
 .../org/apache/spark/MapOutputStatistics.scala  |  27 ++
 .../org/apache/spark/MapOutputTracker.scala     |  49 +++-
 .../scala/org/apache/spark/SparkContext.scala   |  17 ++
 .../org/apache/spark/scheduler/ActiveJob.scala  |  34 ++-
 .../apache/spark/scheduler/DAGScheduler.scala   | 251 ++++++++++++++++---
 .../spark/scheduler/DAGSchedulerEvent.scala     |  10 +
 .../apache/spark/scheduler/ResultStage.scala    |  17 +-
 .../spark/scheduler/ShuffleMapStage.scala       |  13 +-
 .../org/apache/spark/scheduler/Stage.scala      |  26 +-
 .../scala/org/apache/spark/FailureSuite.scala   |  21 ++
 .../scheduler/AdaptiveSchedulingSuite.scala     |  65 +++++
 .../spark/scheduler/DAGSchedulerSuite.scala     | 243 +++++++++++++++++-
 12 files changed, 710 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1a095525/core/src/main/scala/org/apache/spark/MapOutputStatistics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/MapOutputStatistics.scala 
b/core/src/main/scala/org/apache/spark/MapOutputStatistics.scala
new file mode 100644
index 0000000..f8a6f1d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/MapOutputStatistics.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+/**
+ * Holds statistics about the output sizes in a map stage. May become a 
DeveloperApi in the future.
+ *
+ * @param shuffleId ID of the shuffle
+ * @param bytesByPartitionId approximate number of output bytes for each map 
output partition
+ *   (may be inexact due to use of compressed map statuses)
+ */
+private[spark] class MapOutputStatistics(val shuffleId: Int, val 
bytesByPartitionId: Array[Long])

http://git-wip-us.apache.org/repos/asf/spark/blob/1a095525/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala 
b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index a387592..94eb8da 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -18,6 +18,7 @@
 package org.apache.spark
 
 import java.io._
+import java.util.Arrays
 import java.util.concurrent.ConcurrentHashMap
 import java.util.zip.{GZIPInputStream, GZIPOutputStream}
 
@@ -132,13 +133,43 @@ private[spark] abstract class MapOutputTracker(conf: 
SparkConf) extends Logging
    *         describing the shuffle blocks that are stored at that block 
manager.
    */
   def getMapSizesByExecutorId(shuffleId: Int, reduceId: Int)
-  : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
+      : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
     logDebug(s"Fetching outputs for shuffle $shuffleId, reduce $reduceId")
-    val startTime = System.currentTimeMillis
+    val statuses = getStatuses(shuffleId)
+    // Synchronize on the returned array because, on the driver, it gets 
mutated in place
+    statuses.synchronized {
+      return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, statuses)
+    }
+  }
 
+  /**
+   * Return statistics about all of the outputs for a given shuffle.
+   */
+  def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
+    val statuses = getStatuses(dep.shuffleId)
+    // Synchronize on the returned array because, on the driver, it gets 
mutated in place
+    statuses.synchronized {
+      val totalSizes = new Array[Long](dep.partitioner.numPartitions)
+      for (s <- statuses) {
+        for (i <- 0 until totalSizes.length) {
+          totalSizes(i) += s.getSizeForBlock(i)
+        }
+      }
+      new MapOutputStatistics(dep.shuffleId, totalSizes)
+    }
+  }
+
+  /**
+   * Get or fetch the array of MapStatuses for a given shuffle ID. NOTE: 
clients MUST synchronize
+   * on this array when reading it, because on the driver, we may be changing 
it in place.
+   *
+   * (It would be nice to remove this restriction in the future.)
+   */
+  private def getStatuses(shuffleId: Int): Array[MapStatus] = {
     val statuses = mapStatuses.get(shuffleId).orNull
     if (statuses == null) {
       logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching 
them")
+      val startTime = System.currentTimeMillis
       var fetchedStatuses: Array[MapStatus] = null
       fetching.synchronized {
         // Someone else is fetching it; wait for them to be done
@@ -160,7 +191,7 @@ private[spark] abstract class MapOutputTracker(conf: 
SparkConf) extends Logging
       }
 
       if (fetchedStatuses == null) {
-        // We won the race to fetch the output locs; do so
+        // We won the race to fetch the statuses; do so
         logInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint)
         // This try-finally prevents hangs due to timeouts:
         try {
@@ -175,22 +206,18 @@ private[spark] abstract class MapOutputTracker(conf: 
SparkConf) extends Logging
           }
         }
       }
-      logDebug(s"Fetching map output location for shuffle $shuffleId, reduce 
$reduceId took " +
+      logDebug(s"Fetching map output statuses for shuffle $shuffleId took " +
         s"${System.currentTimeMillis - startTime} ms")
 
       if (fetchedStatuses != null) {
-        fetchedStatuses.synchronized {
-          return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, 
fetchedStatuses)
-        }
+        return fetchedStatuses
       } else {
         logError("Missing all output locations for shuffle " + shuffleId)
         throw new MetadataFetchFailedException(
-          shuffleId, reduceId, "Missing all output locations for shuffle " + 
shuffleId)
+          shuffleId, -1, "Missing all output locations for shuffle " + 
shuffleId)
       }
     } else {
-      statuses.synchronized {
-        return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, 
statuses)
-      }
+      return statuses
     }
   }
 

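
For context, a small illustrative helper (not part of this patch; the name and the
target size are made up) showing the kind of decision that the per-partition sizes
returned by getStatistics enable, such as picking a reducer count for a downstream
stage. Like MapOutputStatistics itself, it would need to live under org.apache.spark:

    import org.apache.spark.MapOutputStatistics

    // Pick roughly enough reducers that each one reads about targetBytesPerReducer
    // bytes of map output (illustrative only).
    def suggestedNumReducers(stats: MapOutputStatistics, targetBytesPerReducer: Long): Int = {
      val totalBytes = stats.bytesByPartitionId.sum
      math.max(1, math.ceil(totalBytes.toDouble / targetBytesPerReducer).toInt)
    }
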
http://git-wip-us.apache.org/repos/asf/spark/blob/1a095525/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e27b3c4..dee6091 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1985,6 +1985,23 @@ class SparkContext(config: SparkConf) extends Logging 
with ExecutorAllocationCli
   }
 
   /**
+   * Submit a map stage for execution. This is currently an internal API only, 
but might be
+   * promoted to DeveloperApi in the future.
+   */
+  private[spark] def submitMapStage[K, V, C](dependency: ShuffleDependency[K, 
V, C])
+      : SimpleFutureAction[MapOutputStatistics] = {
+    assertNotStopped()
+    val callSite = getCallSite()
+    var result: MapOutputStatistics = null
+    val waiter = dagScheduler.submitMapStage(
+      dependency,
+      (r: MapOutputStatistics) => { result = r },
+      callSite,
+      localProperties.get)
+    new SimpleFutureAction[MapOutputStatistics](waiter, result)
+  }
+
+  /**
    * Cancel active jobs for the specified group. See 
[[org.apache.spark.SparkContext.setJobGroup]]
    * for more information.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/1a095525/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala 
b/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala
index 50a6937..a3d2db3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala
@@ -23,18 +23,42 @@ import org.apache.spark.TaskContext
 import org.apache.spark.util.CallSite
 
 /**
- * Tracks information about an active job in the DAGScheduler.
+ * A running job in the DAGScheduler. Jobs can be of two types: a result job, 
which computes a
+ * ResultStage to execute an action, or a map-stage job, which computes the 
map outputs for a
+ * ShuffleMapStage before any downstream stages are submitted. The latter is 
used for adaptive
+ * query planning, to look at map output statistics before submitting later 
stages. We distinguish
+ * between these two types of jobs using the finalStage field of this class.
+ *
+ * Jobs are only tracked for "leaf" stages that clients directly submitted, 
through DAGScheduler's
+ * submitJob or submitMapStage methods. However, either type of job may cause 
the execution of
+ * other earlier stages (for RDDs in the DAG it depends on), and multiple jobs 
may share some of
+ * these previous stages. These dependencies are managed inside DAGScheduler.
+ *
+ * @param jobId A unique ID for this job.
+ * @param finalStage The stage that this job computes (either a ResultStage 
for an action or a
+ *   ShuffleMapStage for submitMapStage).
+ * @param callSite Where this job was initiated in the user's program (shown 
on UI).
+ * @param listener A listener to notify if tasks in this job finish or the job 
fails.
+ * @param properties Scheduling properties attached to the job, such as fair 
scheduler pool name.
  */
 private[spark] class ActiveJob(
     val jobId: Int,
-    val finalStage: ResultStage,
-    val func: (TaskContext, Iterator[_]) => _,
-    val partitions: Array[Int],
+    val finalStage: Stage,
     val callSite: CallSite,
     val listener: JobListener,
     val properties: Properties) {
 
-  val numPartitions = partitions.length
+  /**
+   * Number of partitions we need to compute for this job. Note that result 
stages may not need
+   * to compute all partitions in their target RDD, for actions like first() 
and lookup().
+   */
+  val numPartitions = finalStage match {
+    case r: ResultStage => r.partitions.length
+    case m: ShuffleMapStage => m.rdd.partitions.length
+  }
+
+  /** Which partitions of the stage have finished */
   val finished = Array.fill[Boolean](numPartitions)(false)
+
   var numFinished = 0
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1a095525/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 09e963f..b4f90e8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -45,17 +45,65 @@ import 
org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
  * The high-level scheduling layer that implements stage-oriented scheduling. 
It computes a DAG of
  * stages for each job, keeps track of which RDDs and stage outputs are 
materialized, and finds a
  * minimal schedule to run the job. It then submits stages as TaskSets to an 
underlying
- * TaskScheduler implementation that runs them on the cluster.
+ * TaskScheduler implementation that runs them on the cluster. A TaskSet 
contains fully independent
+ * tasks that can run right away based on the data that's already on the 
cluster (e.g. map output
+ * files from previous stages), though it may fail if this data becomes 
unavailable.
  *
- * In addition to coming up with a DAG of stages, this class also determines 
the preferred
+ * Spark stages are created by breaking the RDD graph at shuffle boundaries. 
RDD operations with
+ * "narrow" dependencies, like map() and filter(), are pipelined together into 
one set of tasks
+ * in each stage, but operations with shuffle dependencies require multiple 
stages (one to write a
+ * set of map output files, and another to read those files after a barrier). 
In the end, every
+ * stage will have only shuffle dependencies on other stages, and may compute 
multiple operations
+ * inside it. The actual pipelining of these operations happens in the 
RDD.compute() functions of
+ * various RDDs (MappedRDD, FilteredRDD, etc).
+ *
+ * In addition to coming up with a DAG of stages, the DAGScheduler also 
determines the preferred
  * locations to run each task on, based on the current cache status, and 
passes these to the
  * low-level TaskScheduler. Furthermore, it handles failures due to shuffle 
output files being
  * lost, in which case old stages may need to be resubmitted. Failures 
*within* a stage that are
  * not caused by shuffle file loss are handled by the TaskScheduler, which 
will retry each task
  * a small number of times before cancelling the whole stage.
  *
+ * When looking through this code, there are several key concepts:
+ *
+ *  - Jobs (represented by [[ActiveJob]]) are the top-level work items 
submitted to the scheduler.
+ *    For example, when the user calls an action, like count(), a job will be 
submitted through
+ *    submitJob. Each Job may require the execution of multiple stages to 
build intermediate data.
+ *
+ *  - Stages ([[Stage]]) are sets of tasks that compute intermediate results 
in jobs, where each
+ *    task computes the same function on partitions of the same RDD. Stages 
are separated at shuffle
+ *    boundaries, which introduce a barrier (where we must wait for the 
previous stage to finish to
+ *    fetch outputs). There are two types of stages: [[ResultStage]], for the 
final stage that
+ *    executes an action, and [[ShuffleMapStage]], which writes map output 
files for a shuffle.
+ *    Stages are often shared across multiple jobs, if these jobs reuse the 
same RDDs.
+ *
+ *  - Tasks are individual units of work, each sent to one machine.
+ *
+ *  - Cache tracking: the DAGScheduler figures out which RDDs are cached to 
avoid recomputing them
+ *    and likewise remembers which shuffle map stages have already produced 
output files to avoid
+ *    redoing the map side of a shuffle.
+ *
+ *  - Preferred locations: the DAGScheduler also computes where to run each 
task in a stage based
+ *    on the preferred locations of its underlying RDDs, or the location of 
cached or shuffle data.
+ *
+ *  - Cleanup: all data structures are cleared when the running jobs that 
depend on them finish,
+ *    to prevent memory leaks in a long-running application.
+ *
+ * To recover from failures, the same stage might need to run multiple times, 
which are called
+ * "attempts". If the TaskScheduler reports that a task failed because a map 
output file from a
+ * previous stage was lost, the DAGScheduler resubmits that lost stage. This 
is detected through a
+ * CompletionEvent with FetchFailed, or an ExecutorLost event. The 
DAGScheduler will wait a small
+ * amount of time to see whether other nodes or tasks fail, then resubmit 
TaskSets for any lost
+ * stage(s) that compute the missing tasks. As part of this process, we might 
also have to create
+ * Stage objects for old (finished) stages where we previously cleaned up the 
Stage object. Since
+ * tasks from the old attempt of a stage could still be running, care must be 
taken to map any
+ * events received to the correct Stage object.
+ *
  * Here's a checklist to use when making or reviewing changes to this class:
  *
+ *  - All data structures should be cleared when the jobs involving them end 
to avoid indefinite
+ *    accumulation of state in long-running programs.
+ *
  *  - When adding a new data structure, update 
`DAGSchedulerSuite.assertDataStructuresEmpty` to
  *    include the new structure. This will help to catch memory leaks.
  */
@@ -295,12 +343,12 @@ class DAGScheduler(
    */
   private def newResultStage(
       rdd: RDD[_],
-      numTasks: Int,
+      func: (TaskContext, Iterator[_]) => _,
+      partitions: Array[Int],
       jobId: Int,
       callSite: CallSite): ResultStage = {
     val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
-    val stage: ResultStage = new ResultStage(id, rdd, numTasks, parentStages, 
jobId, callSite)
-
+    val stage = new ResultStage(id, rdd, func, partitions, parentStages, 
jobId, callSite)
     stageIdToStage(id) = stage
     updateJobIdStageIdMaps(jobId, stage)
     stage
@@ -500,12 +548,25 @@ class DAGScheduler(
     jobIdToStageIds -= job.jobId
     jobIdToActiveJob -= job.jobId
     activeJobs -= job
-    job.finalStage.resultOfJob = None
+    job.finalStage match {
+      case r: ResultStage =>
+        r.resultOfJob = None
+      case m: ShuffleMapStage =>
+        m.mapStageJobs = m.mapStageJobs.filter(_ != job)
+    }
   }
 
   /**
-   * Submit a job to the job scheduler and get a JobWaiter object back. The 
JobWaiter object
+   * Submit an action job to the scheduler and get a JobWaiter object back. 
The JobWaiter object
    * can be used to block until the job finishes executing or can be used 
to cancel the job.
+   *
+   * @param rdd target RDD to run tasks on
+   * @param func a function to run on each partition of the RDD
+   * @param partitions set of partitions to run on; some jobs may not want to 
compute on all
+   *   partitions of the target RDD, e.g. for operations like first()
+   * @param callSite where in the user program this job was called
+   * @param resultHandler callback to pass each result to
+   * @param properties scheduler properties to attach to this job, e.g. fair 
scheduler pool name
    */
   def submitJob[T, U](
       rdd: RDD[T],
@@ -524,6 +585,7 @@ class DAGScheduler(
 
     val jobId = nextJobId.getAndIncrement()
     if (partitions.size == 0) {
+      // Return immediately if the job is running 0 tasks
       return new JobWaiter[U](this, jobId, 0, resultHandler)
     }
 
@@ -536,6 +598,18 @@ class DAGScheduler(
     waiter
   }
 
+  /**
+   * Run an action job on the given RDD and pass all the results to the 
resultHandler function as
+   * they arrive. Throws an exception if the job fails, or returns normally if 
successful.
+   *
+   * @param rdd target RDD to run tasks on
+   * @param func a function to run on each partition of the RDD
+   * @param partitions set of partitions to run on; some jobs may not want to 
compute on all
+   *   partitions of the target RDD, e.g. for operations like first()
+   * @param callSite where in the user program this job was called
+   * @param resultHandler callback to pass each result to
+   * @param properties scheduler properties to attach to this job, e.g. fair 
scheduler pool name
+   */
   def runJob[T, U](
       rdd: RDD[T],
       func: (TaskContext, Iterator[T]) => U,
@@ -559,6 +633,17 @@ class DAGScheduler(
     }
   }
 
+  /**
+   * Run an approximate job on the given RDD and pass all the results to an 
ApproximateEvaluator
+   * as they arrive. Returns a partial result object from the evaluator.
+   *
+   * @param rdd target RDD to run tasks on
+   * @param func a function to run on each partition of the RDD
+   * @param evaluator [[ApproximateEvaluator]] to receive the partial results
+   * @param callSite where in the user program this job was called
+   * @param timeout maximum time to wait for the job, in milliseconds
+   * @param properties scheduler properties to attach to this job, e.g. fair 
scheduler pool name
+   */
   def runApproximateJob[T, U, R](
       rdd: RDD[T],
       func: (TaskContext, Iterator[T]) => U,
@@ -576,6 +661,41 @@ class DAGScheduler(
   }
 
   /**
+   * Submit a shuffle map stage to run independently and get a JobWaiter 
object back. The waiter
+   * can be used to block until the job finishes executing or can be used 
to cancel the job.
+   * This method is used for adaptive query planning, to run map stages and 
look at statistics
+   * about their outputs before submitting downstream stages.
+   *
+   * @param dependency the ShuffleDependency to run a map stage for
+   * @param callback function called with the result of the job, which in this 
case will be a
+   *   single MapOutputStatistics object showing how much data was produced 
for each partition
+   * @param callSite where in the user program this job was submitted
+   * @param properties scheduler properties to attach to this job, e.g. fair 
scheduler pool name
+   */
+  def submitMapStage[K, V, C](
+      dependency: ShuffleDependency[K, V, C],
+      callback: MapOutputStatistics => Unit,
+      callSite: CallSite,
+      properties: Properties): JobWaiter[MapOutputStatistics] = {
+
+    val rdd = dependency.rdd
+    val jobId = nextJobId.getAndIncrement()
+    if (rdd.partitions.length == 0) {
+      throw new SparkException("Can't run submitMapStage on RDD with 0 
partitions")
+    }
+
+    // We create a JobWaiter with only one "task", which will be marked as 
complete when the whole
+    // map stage has completed, and will be passed the MapOutputStatistics for 
that stage.
+    // This makes it easier to avoid race conditions between the user code and 
the map output
+    // tracker that might result if we told the user the stage had finished, 
but then they queried
+    // the map output tracker after some node failures had caused the output 
statistics to be lost.
+    val waiter = new JobWaiter(this, jobId, 1, (i: Int, r: 
MapOutputStatistics) => callback(r))
+    eventProcessLoop.post(MapStageSubmitted(
+      jobId, dependency, callSite, waiter, 
SerializationUtils.clone(properties)))
+    waiter
+  }
+
+  /**
    * Cancel a job that is running or waiting in the queue.
    */
   def cancelJob(jobId: Int): Unit = {
@@ -583,6 +703,9 @@ class DAGScheduler(
     eventProcessLoop.post(JobCancelled(jobId))
   }
 
+  /**
+   * Cancel all jobs in the given job group ID.
+   */
   def cancelJobGroup(groupId: String): Unit = {
     logInfo("Asked to cancel job group " + groupId)
     eventProcessLoop.post(JobGroupCancelled(groupId))
@@ -720,31 +843,77 @@ class DAGScheduler(
     try {
       // New stage creation may throw an exception if, for example, jobs are 
run on a
       // HadoopRDD whose underlying HDFS files have been deleted.
-      finalStage = newResultStage(finalRDD, partitions.length, jobId, callSite)
+      finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
     } catch {
       case e: Exception =>
         logWarning("Creating new stage failed due to exception - job: " + 
jobId, e)
         listener.jobFailed(e)
         return
     }
-    if (finalStage != null) {
-      val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, 
listener, properties)
-      clearCacheLocs()
-      logInfo("Got job %s (%s) with %d output partitions".format(
-        job.jobId, callSite.shortForm, partitions.length))
-      logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
-      logInfo("Parents of final stage: " + finalStage.parents)
-      logInfo("Missing parents: " + getMissingParentStages(finalStage))
-      val jobSubmissionTime = clock.getTimeMillis()
-      jobIdToActiveJob(jobId) = job
-      activeJobs += job
-      finalStage.resultOfJob = Some(job)
-      val stageIds = jobIdToStageIds(jobId).toArray
-      val stageInfos = stageIds.flatMap(id => 
stageIdToStage.get(id).map(_.latestInfo))
-      listenerBus.post(
-        SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, 
properties))
-      submitStage(finalStage)
+
+    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
+    clearCacheLocs()
+    logInfo("Got job %s (%s) with %d output partitions".format(
+      job.jobId, callSite.shortForm, partitions.length))
+    logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
+    logInfo("Parents of final stage: " + finalStage.parents)
+    logInfo("Missing parents: " + getMissingParentStages(finalStage))
+
+    val jobSubmissionTime = clock.getTimeMillis()
+    jobIdToActiveJob(jobId) = job
+    activeJobs += job
+    finalStage.resultOfJob = Some(job)
+    val stageIds = jobIdToStageIds(jobId).toArray
+    val stageInfos = stageIds.flatMap(id => 
stageIdToStage.get(id).map(_.latestInfo))
+    listenerBus.post(
+      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, 
properties))
+    submitStage(finalStage)
+
+    submitWaitingStages()
+  }
+
+  private[scheduler] def handleMapStageSubmitted(jobId: Int,
+      dependency: ShuffleDependency[_, _, _],
+      callSite: CallSite,
+      listener: JobListener,
+      properties: Properties) {
+    // Submitting this map stage might still require the creation of some 
parent stages, so make
+    // sure that happens.
+    var finalStage: ShuffleMapStage = null
+    try {
+      // New stage creation may throw an exception if, for example, jobs are 
run on a
+      // HadoopRDD whose underlying HDFS files have been deleted.
+      finalStage = getShuffleMapStage(dependency, jobId)
+    } catch {
+      case e: Exception =>
+        logWarning("Creating new stage failed due to exception - job: " + 
jobId, e)
+        listener.jobFailed(e)
+        return
+    }
+
+    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
+    clearCacheLocs()
+    logInfo("Got map stage job %s (%s) with %d output partitions".format(
+      jobId, callSite.shortForm, dependency.rdd.partitions.size))
+    logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
+    logInfo("Parents of final stage: " + finalStage.parents)
+    logInfo("Missing parents: " + getMissingParentStages(finalStage))
+
+    val jobSubmissionTime = clock.getTimeMillis()
+    jobIdToActiveJob(jobId) = job
+    activeJobs += job
+    finalStage.mapStageJobs = job :: finalStage.mapStageJobs
+    val stageIds = jobIdToStageIds(jobId).toArray
+    val stageInfos = stageIds.flatMap(id => 
stageIdToStage.get(id).map(_.latestInfo))
+    listenerBus.post(
+      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, 
properties))
+    submitStage(finalStage)
+
+    // If the whole stage has already finished, tell the listener and remove it
+    if (!finalStage.outputLocs.contains(Nil)) {
+      markMapStageJobAsFinished(job, 
mapOutputTracker.getStatistics(dependency))
     }
+
     submitWaitingStages()
   }
 
@@ -814,7 +983,7 @@ class DAGScheduler(
         case s: ResultStage =>
           val job = s.resultOfJob.get
           partitionsToCompute.map { id =>
-            val p = job.partitions(id)
+            val p = s.partitions(id)
             (id, getPreferredLocs(stage.rdd, p))
           }.toMap
       }
@@ -844,7 +1013,7 @@ class DAGScheduler(
         case stage: ShuffleMapStage =>
           closureSerializer.serialize((stage.rdd, stage.shuffleDep): 
AnyRef).array()
         case stage: ResultStage =>
-          closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func): 
AnyRef).array()
+          closureSerializer.serialize((stage.rdd, stage.func): AnyRef).array()
       }
 
       taskBinary = sc.broadcast(taskBinaryBytes)
@@ -875,7 +1044,7 @@ class DAGScheduler(
         case stage: ResultStage =>
           val job = stage.resultOfJob.get
           partitionsToCompute.map { id =>
-            val p: Int = job.partitions(id)
+            val p: Int = stage.partitions(id)
             val part = stage.rdd.partitions(p)
             val locs = taskIdToLocations(id)
             new ResultTask(stage.id, stage.latestInfo.attemptId,
@@ -1052,13 +1221,21 @@ class DAGScheduler(
                 logInfo("Resubmitting " + shuffleStage + " (" + 
shuffleStage.name +
                   ") because some of its tasks had failed: " +
                   shuffleStage.outputLocs.zipWithIndex.filter(_._1.isEmpty)
-                      .map(_._2).mkString(", "))
+                    .map(_._2).mkString(", "))
                 submitStage(shuffleStage)
+              } else {
+                // Mark any map-stage jobs waiting on this stage as finished
+                if (shuffleStage.mapStageJobs.nonEmpty) {
+                  val stats = 
mapOutputTracker.getStatistics(shuffleStage.shuffleDep)
+                  for (job <- shuffleStage.mapStageJobs) {
+                    markMapStageJobAsFinished(job, stats)
+                  }
+                }
               }
 
               // Note: newly runnable stages will be submitted below when we 
submit waiting stages
             }
-          }
+        }
 
       case Resubmitted =>
         logInfo("Resubmitted " + task + ", so marking it as still running")
@@ -1412,6 +1589,17 @@ class DAGScheduler(
     Nil
   }
 
+  /** Mark a map stage job as finished with the given output stats, and report 
to its listener. */
+  def markMapStageJobAsFinished(job: ActiveJob, stats: MapOutputStatistics): 
Unit = {
+    // In map stage jobs, we only create a single "task", which is to finish 
all of the stage
+    // (including reusing any previous map outputs, etc); so we just mark task 
0 as done
+    job.finished(0) = true
+    job.numFinished += 1
+    job.listener.taskSucceeded(0, stats)
+    cleanupStateForJobAndIndependentStages(job)
+    listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), 
JobSucceeded))
+  }
+
   def stop() {
     logInfo("Stopping DAGScheduler")
     messageScheduler.shutdownNow()
@@ -1445,6 +1633,9 @@ private[scheduler] class 
DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
     case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, 
properties) =>
       dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, 
listener, properties)
 
+    case MapStageSubmitted(jobId, dependency, callSite, listener, properties) 
=>
+      dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, 
listener, properties)
+
     case StageCancelled(stageId) =>
       dagScheduler.handleStageCancellation(stageId)
 

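
One non-obvious detail above is the !finalStage.outputLocs.contains(Nil) check in
handleMapStageSubmitted: outputLocs holds one list of map outputs per partition, and
an empty list marks a partition whose output is still missing. A toy illustration of
that condition (types simplified here; the real field holds lists of MapStatus):

    // One entry per map partition; Nil means that partition's output is unavailable.
    val outputLocs: Seq[List[String]] = Seq(List("hostA"), Nil, List("hostB"))
    val stageAlreadyAvailable = !outputLocs.contains(Nil)  // false: partition 1 missing
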
http://git-wip-us.apache.org/repos/asf/spark/blob/1a095525/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index f72a52e..dda3b6c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -35,6 +35,7 @@ import org.apache.spark.util.CallSite
  */
 private[scheduler] sealed trait DAGSchedulerEvent
 
+/** A result-yielding job was submitted on a target RDD */
 private[scheduler] case class JobSubmitted(
     jobId: Int,
     finalRDD: RDD[_],
@@ -45,6 +46,15 @@ private[scheduler] case class JobSubmitted(
     properties: Properties = null)
   extends DAGSchedulerEvent
 
+/** A map stage was submitted to run as a separate job */
+private[scheduler] case class MapStageSubmitted(
+  jobId: Int,
+  dependency: ShuffleDependency[_, _, _],
+  callSite: CallSite,
+  listener: JobListener,
+  properties: Properties = null)
+  extends DAGSchedulerEvent
+
 private[scheduler] case class StageCancelled(stageId: Int) extends 
DAGSchedulerEvent
 
 private[scheduler] case class JobCancelled(jobId: Int) extends 
DAGSchedulerEvent

http://git-wip-us.apache.org/repos/asf/spark/blob/1a095525/core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala 
b/core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala
index bf81b9a..c0451da 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala
@@ -17,23 +17,30 @@
 
 package org.apache.spark.scheduler
 
+import org.apache.spark.TaskContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.util.CallSite
 
 /**
- * The ResultStage represents the final stage in a job.
+ * ResultStages apply a function on some partitions of an RDD to compute the 
result of an action.
+ * The ResultStage object captures the function to execute, `func`, which will 
be applied to each
+ * partition, and the set of partition IDs, `partitions`. Some stages may not 
run on all partitions
+ * of the RDD, for actions like first() and lookup().
  */
 private[spark] class ResultStage(
     id: Int,
     rdd: RDD[_],
-    numTasks: Int,
+    val func: (TaskContext, Iterator[_]) => _,
+    val partitions: Array[Int],
     parents: List[Stage],
     firstJobId: Int,
     callSite: CallSite)
-  extends Stage(id, rdd, numTasks, parents, firstJobId, callSite) {
+  extends Stage(id, rdd, partitions.length, parents, firstJobId, callSite) {
 
-  // The active job for this result stage. Will be empty if the job has 
already finished
-  // (e.g., because the job was cancelled).
+  /**
+   * The active job for this result stage. Will be empty if the job has 
already finished
+   * (e.g., because the job was cancelled).
+   */
   var resultOfJob: Option[ActiveJob] = None
 
   override def toString: String = "ResultStage " + id

http://git-wip-us.apache.org/repos/asf/spark/blob/1a095525/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala 
b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
index 48d8d8e..7d92960 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
@@ -23,7 +23,15 @@ import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.CallSite
 
 /**
- * The ShuffleMapStage represents the intermediate stages in a job.
+ * ShuffleMapStages are intermediate stages in the execution DAG that produce 
data for a shuffle.
+ * They occur right before each shuffle operation, and might contain multiple 
pipelined operations
+ * before that (e.g. map and filter). When executed, they save map output 
files that can later be
+ * fetched by reduce tasks. The `shuffleDep` field describes the shuffle each 
stage is part of,
+ * and variables like `outputLocs` and `numAvailableOutputs` track how many 
map outputs are ready.
+ *
+ * ShuffleMapStages can also be submitted independently as jobs with 
DAGScheduler.submitMapStage.
+ * For such stages, the ActiveJobs that submitted them are tracked in 
`mapStageJobs`. Note that
+ * there can be multiple ActiveJobs trying to compute the same shuffle map 
stage.
  */
 private[spark] class ShuffleMapStage(
     id: Int,
@@ -37,6 +45,9 @@ private[spark] class ShuffleMapStage(
 
   override def toString: String = "ShuffleMapStage " + id
 
+  /** Running map-stage jobs that were submitted to execute this stage 
independently (if any) */
+  var mapStageJobs: List[ActiveJob] = Nil
+
   var numAvailableOutputs: Int = 0
 
   def isAvailable: Boolean = numAvailableOutputs == numPartitions

http://git-wip-us.apache.org/repos/asf/spark/blob/1a095525/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala 
b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index c086535..b37eccb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -24,27 +24,33 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.util.CallSite
 
 /**
- * A stage is a set of independent tasks all computing the same function that 
need to run as part
+ * A stage is a set of parallel tasks all computing the same function that 
need to run as part
  * of a Spark job, where all the tasks have the same shuffle dependencies. 
Each DAG of tasks run
  * by the scheduler is split up into stages at the boundaries where shuffle 
occurs, and then the
  * DAGScheduler runs these stages in topological order.
  *
  * Each Stage can either be a shuffle map stage, in which case its tasks' 
results are input for
- * another stage, or a result stage, in which case its tasks directly compute 
the action that
- * initiated a job (e.g. count(), save(), etc). For shuffle map stages, we 
also track the nodes
- * that each output partition is on.
+ * other stage(s), or a result stage, in which case its tasks directly compute 
a Spark action
+ * (e.g. count(), save(), etc) by running a function on an RDD. For shuffle 
map stages, we also
+ * track the nodes that each output partition is on.
  *
  * Each Stage also has a firstJobId, identifying the job that first submitted 
the stage.  When FIFO
  * scheduling is used, this allows Stages from earlier jobs to be computed 
first or recovered
  * faster on failure.
  *
- * The callSite provides a location in user code which relates to the stage. 
For a shuffle map
- * stage, the callSite gives the user code that created the RDD being 
shuffled. For a result
- * stage, the callSite gives the user code that executes the associated action 
(e.g. count()).
- *
- * A single stage can consist of multiple attempts. In that case, the 
latestInfo field will
- * be updated for each attempt.
+ * Finally, a single stage can be re-executed in multiple attempts due to 
fault recovery. In that
+ * case, the Stage object will track multiple StageInfo objects to pass to 
listeners or the web UI.
+ * The latest one will be accessible through latestInfo.
  *
+ * @param id Unique stage ID
+ * @param rdd RDD that this stage runs on: for a shuffle map stage, it's the 
RDD we run map tasks
+ *   on, while for a result stage, it's the target RDD that we ran an action on
+ * @param numTasks Total number of tasks in stage; result stages in particular 
may not need to
+ *   compute all partitions, e.g. for first(), lookup(), and take().
+ * @param parents List of stages that this stage depends on (through shuffle 
dependencies).
+ * @param firstJobId ID of the first job this stage was part of, for FIFO 
scheduling.
+ * @param callSite Location in the user program associated with this stage: 
either where the target
+ *   RDD was created, for a shuffle map stage, or where the action for a 
result stage was called.
  */
 private[scheduler] abstract class Stage(
     val id: Int,

http://git-wip-us.apache.org/repos/asf/spark/blob/1a095525/core/src/test/scala/org/apache/spark/FailureSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FailureSuite.scala 
b/core/src/test/scala/org/apache/spark/FailureSuite.scala
index aa50a49..f58756e 100644
--- a/core/src/test/scala/org/apache/spark/FailureSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala
@@ -217,6 +217,27 @@ class FailureSuite extends SparkFunSuite with 
LocalSparkContext {
     FailureSuiteState.clear()
   }
 
+  // Run a 3-task map stage where one task fails once.
+  test("failure in tasks in a submitMapStage") {
+    sc = new SparkContext("local[1,2]", "test")
+    val rdd = sc.makeRDD(1 to 3, 3).map { x =>
+      FailureSuiteState.synchronized {
+        FailureSuiteState.tasksRun += 1
+        if (x == 1 && FailureSuiteState.tasksFailed == 0) {
+          FailureSuiteState.tasksFailed += 1
+          throw new Exception("Intentional task failure")
+        }
+      }
+      (x, x)
+    }
+    val dep = new ShuffleDependency[Int, Int, Int](rdd, new HashPartitioner(2))
+    sc.submitMapStage(dep).get()
+    FailureSuiteState.synchronized {
+      assert(FailureSuiteState.tasksRun === 4)
+    }
+    FailureSuiteState.clear()
+  }
+
   // TODO: Need to add tests with shuffle fetch failures.
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1a095525/core/src/test/scala/org/apache/spark/scheduler/AdaptiveSchedulingSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/AdaptiveSchedulingSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/AdaptiveSchedulingSuite.scala
new file mode 100644
index 0000000..3fe2802
--- /dev/null
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/AdaptiveSchedulingSuite.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.rdd.{ShuffledRDDPartition, RDD, ShuffledRDD}
+import org.apache.spark._
+
+object AdaptiveSchedulingSuiteState {
+  var tasksRun = 0
+
+  def clear(): Unit = {
+    tasksRun = 0
+  }
+}
+
+/** A special ShuffledRDD where we can pass a ShuffleDependency object to use 
*/
+class CustomShuffledRDD[K, V, C](@transient dep: ShuffleDependency[K, V, C])
+  extends RDD[(K, C)](dep.rdd.context, Seq(dep)) {
+
+  override def compute(split: Partition, context: TaskContext): Iterator[(K, 
C)] = {
+    val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
+    SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, 
split.index + 1, context)
+      .read()
+      .asInstanceOf[Iterator[(K, C)]]
+  }
+
+  override def getPartitions: Array[Partition] = {
+    Array.tabulate[Partition](dep.partitioner.numPartitions)(i => new 
ShuffledRDDPartition(i))
+  }
+}
+
+class AdaptiveSchedulingSuite extends SparkFunSuite with LocalSparkContext {
+  test("simple use of submitMapStage") {
+    try {
+      sc = new SparkContext("local[1,2]", "test")
+      val rdd = sc.parallelize(1 to 3, 3).map { x =>
+        AdaptiveSchedulingSuiteState.tasksRun += 1
+        (x, x)
+      }
+      val dep = new ShuffleDependency[Int, Int, Int](rdd, new 
HashPartitioner(2))
+      val shuffled = new CustomShuffledRDD[Int, Int, Int](dep)
+      sc.submitMapStage(dep).get()
+      assert(AdaptiveSchedulingSuiteState.tasksRun == 3)
+      assert(shuffled.collect().toSet == Set((1, 1), (2, 2), (3, 3)))
+      assert(AdaptiveSchedulingSuiteState.tasksRun == 3)
+    } finally {
+      AdaptiveSchedulingSuiteState.clear()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/1a095525/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 1b9ff74..1c55f90 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -152,6 +152,14 @@ class DAGSchedulerSuite
     override def jobFailed(exception: Exception) = { failure = exception }
   }
 
+  /** A simple helper class for creating custom JobListeners */
+  class SimpleListener extends JobListener {
+    val results = new HashMap[Int, Any]
+    var failure: Exception = null
+    override def taskSucceeded(index: Int, result: Any): Unit = 
results.put(index, result)
+    override def jobFailed(exception: Exception): Unit = { failure = exception 
}
+  }
+
   before {
     sc = new SparkContext("local", "DAGSchedulerSuite")
     sparkListener.submittedStageInfos.clear()
@@ -229,7 +237,7 @@ class DAGSchedulerSuite
     }
   }
 
-  /** Sends the rdd to the scheduler for scheduling and returns the job id. */
+  /** Submits a job to the scheduler and returns the job id. */
   private def submit(
       rdd: RDD[_],
       partitions: Array[Int],
@@ -240,6 +248,15 @@ class DAGSchedulerSuite
     jobId
   }
 
+  /** Submits a map stage to the scheduler and returns the job id. */
+  private def submitMapStage(
+      shuffleDep: ShuffleDependency[_, _, _],
+      listener: JobListener = jobListener): Int = {
+    val jobId = scheduler.nextJobId.getAndIncrement()
+    runEvent(MapStageSubmitted(jobId, shuffleDep, CallSite("", ""), listener))
+    jobId
+  }
+
   /** Sends TaskSetFailed to the scheduler. */
   private def failed(taskSet: TaskSet, message: String) {
     runEvent(TaskSetFailed(taskSet, message, None))
@@ -1313,6 +1330,230 @@ class DAGSchedulerSuite
     assert(stackTraceString.contains("org.scalatest.FunSuite"))
   }
 
+  test("simple map stage submission") {
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
HashPartitioner(1))
+    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
+
+    // Submit a map stage by itself
+    submitMapStage(shuffleDep)
+    assert(results.size === 0)  // No results yet
+    completeShuffleMapStageSuccessfully(0, 0, 1)
+    assert(results.size === 1)
+    results.clear()
+    assertDataStructuresEmpty()
+
+    // Submit a reduce job that depends on this map stage; it should directly 
do the reduce
+    submit(reduceRdd, Array(0))
+    completeNextResultStageWithSuccess(2, 0)
+    assert(results === Map(0 -> 42))
+    results.clear()
+    assertDataStructuresEmpty()
+
+    // Check that if we submit the map stage again, no tasks run
+    submitMapStage(shuffleDep)
+    assert(results.size === 1)
+    assertDataStructuresEmpty()
+  }
+
+  test("map stage submission with reduce stage also depending on the data") {
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
HashPartitioner(1))
+    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
+
+    // Submit the map stage by itself
+    submitMapStage(shuffleDep)
+
+    // Submit a reduce job that depends on this map stage
+    submit(reduceRdd, Array(0))
+
+    // Complete tasks for the map stage
+    completeShuffleMapStageSuccessfully(0, 0, 1)
+    assert(results.size === 1)
+    results.clear()
+
+    // Complete tasks for the reduce stage
+    completeNextResultStageWithSuccess(1, 0)
+    assert(results === Map(0 -> 42))
+    results.clear()
+    assertDataStructuresEmpty()
+
+    // Check that if we submit the map stage again, no tasks run
+    submitMapStage(shuffleDep)
+    assert(results.size === 1)
+    assertDataStructuresEmpty()
+  }
+
+  test("map stage submission with fetch failure") {
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
HashPartitioner(2))
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
+
+    // Submit a map stage by itself
+    submitMapStage(shuffleDep)
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", reduceRdd.partitions.size)),
+      (Success, makeMapStatus("hostB", reduceRdd.partitions.size))))
+    assert(results.size === 1)
+    results.clear()
+    assertDataStructuresEmpty()
+
+    // Submit a reduce job that depends on this map stage, but where one 
reduce will fail a fetch
+    submit(reduceRdd, Array(0, 1))
+    complete(taskSets(1), Seq(
+      (Success, 42),
+      (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), 
null)))
+    // Ask the scheduler to try it again; TaskSet 2 will rerun the map task 
that we couldn't fetch
+    // from, then TaskSet 3 will run the reduce stage
+    scheduler.resubmitFailedStages()
+    complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 
reduceRdd.partitions.size))))
+    complete(taskSets(3), Seq((Success, 43)))
+    assert(results === Map(0 -> 42, 1 -> 43))
+    results.clear()
+    assertDataStructuresEmpty()
+
+    // Run another reduce job without a failure; this should just work
+    submit(reduceRdd, Array(0, 1))
+    complete(taskSets(4), Seq(
+      (Success, 44),
+      (Success, 45)))
+    assert(results === Map(0 -> 44, 1 -> 45))
+    results.clear()
+    assertDataStructuresEmpty()
+
+    // Resubmit the map stage; this should also just work
+    submitMapStage(shuffleDep)
+    assert(results.size === 1)
+    results.clear()
+    assertDataStructuresEmpty()
+  }
+
+  /**
+   * In this test, we have three RDDs with shuffle dependencies, and we submit 
map stage jobs
+   * that are waiting on each one, as well as a reduce job on the last one. We 
test that all of
+   * these jobs complete even if there are some fetch failures in both 
shuffles.
+   */
+  test("map stage submission with multiple shared stages and failures") {
+    val rdd1 = new MyRDD(sc, 2, Nil)
+    val dep1 = new ShuffleDependency(rdd1, new HashPartitioner(2))
+    val rdd2 = new MyRDD(sc, 2, List(dep1))
+    val dep2 = new ShuffleDependency(rdd2, new HashPartitioner(2))
+    val rdd3 = new MyRDD(sc, 2, List(dep2))
+
+    val listener1 = new SimpleListener
+    val listener2 = new SimpleListener
+    val listener3 = new SimpleListener
+
+    submitMapStage(dep1, listener1)
+    submitMapStage(dep2, listener2)
+    submit(rdd3, Array(0, 1), listener = listener3)
+
+    // Complete the first stage
+    assert(taskSets(0).stageId === 0)
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", rdd1.partitions.size)),
+      (Success, makeMapStatus("hostB", rdd1.partitions.size))))
+    assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 
0).map(_._1).toSet ===
+      HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
+    assert(listener1.results.size === 1)
+
+    // When attempting the second stage, show a fetch failure
+    assert(taskSets(1).stageId === 1)
+    complete(taskSets(1), Seq(
+      (Success, makeMapStatus("hostA", rdd2.partitions.size)),
+      (FetchFailed(makeBlockManagerId("hostA"), dep1.shuffleId, 0, 0, 
"ignored"), null)))
+    scheduler.resubmitFailedStages()
+    assert(listener2.results.size === 0)    // Second stage listener should 
not have a result yet
+
+    // Stage 0 should now be running as task set 2; make its task succeed
+    assert(taskSets(2).stageId === 0)
+    complete(taskSets(2), Seq(
+      (Success, makeMapStatus("hostC", rdd2.partitions.size))))
+    assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 
0).map(_._1).toSet ===
+      HashSet(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))
+    assert(listener2.results.size === 0)    // Second stage listener should 
still not have a result
+
+    // Stage 1 should now be running as task set 3; make its first task succeed
+    assert(taskSets(3).stageId === 1)
+    complete(taskSets(3), Seq(
+      (Success, makeMapStatus("hostB", rdd2.partitions.size)),
+      (Success, makeMapStatus("hostD", rdd2.partitions.size))))
+    assert(mapOutputTracker.getMapSizesByExecutorId(dep2.shuffleId, 
0).map(_._1).toSet ===
+      HashSet(makeBlockManagerId("hostB"), makeBlockManagerId("hostD")))
+    assert(listener2.results.size === 1)
+
+    // Finally, the reduce job should be running as task set 4; make it see a 
fetch failure,
+    // then make it run again and succeed
+    assert(taskSets(4).stageId === 2)
+    complete(taskSets(4), Seq(
+      (Success, 52),
+      (FetchFailed(makeBlockManagerId("hostD"), dep2.shuffleId, 0, 0, 
"ignored"), null)))
+    scheduler.resubmitFailedStages()
+
+    // TaskSet 5 will rerun stage 1's lost task, then TaskSet 6 will rerun 
stage 2
+    assert(taskSets(5).stageId === 1)
+    complete(taskSets(5), Seq(
+      (Success, makeMapStatus("hostE", rdd2.partitions.size))))
+    complete(taskSets(6), Seq(
+      (Success, 53)))
+    assert(listener3.results === Map(0 -> 52, 1 -> 53))
+    assertDataStructuresEmpty()
+  }
+
+  /**
+   * In this test, we run a map stage where one of the executors fails but we 
still receive a
+   * "zombie" complete message from that executor. We want to make sure the 
stage is not reported
+   * as done until all tasks have completed.
+   */
+  test("map stage submission with executor failure late map task completions") 
{
+    val shuffleMapRdd = new MyRDD(sc, 3, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
HashPartitioner(2))
+
+    submitMapStage(shuffleDep)
+
+    val oldTaskSet = taskSets(0)
+    runEvent(CompletionEvent(oldTaskSet.tasks(0), Success, 
makeMapStatus("hostA", 2),
+      null, createFakeTaskInfo(), null))
+    assert(results.size === 0)    // Map stage job should not be complete yet
+
+    // Pretend host A was lost
+    val oldEpoch = mapOutputTracker.getEpoch
+    runEvent(ExecutorLost("exec-hostA"))
+    val newEpoch = mapOutputTracker.getEpoch
+    assert(newEpoch > oldEpoch)
+
+    // Suppose we also get a completed event from task 1 on the same host; 
this should be ignored
+    runEvent(CompletionEvent(oldTaskSet.tasks(1), Success, 
makeMapStatus("hostA", 2),
+      null, createFakeTaskInfo(), null))
+    assert(results.size === 0)    // Map stage job should not be complete yet
+
+    // A completion from another task should work because it's a non-failed 
host
+    runEvent(CompletionEvent(oldTaskSet.tasks(2), Success, 
makeMapStatus("hostB", 2),
+      null, createFakeTaskInfo(), null))
+    assert(results.size === 0)    // Map stage job should not be complete yet
+
+    // Now complete tasks in the second task set
+    val newTaskSet = taskSets(1)
+    assert(newTaskSet.tasks.size === 2)     // Both tasks 0 and 1 were on 
hostA
+    runEvent(CompletionEvent(newTaskSet.tasks(0), Success, 
makeMapStatus("hostB", 2),
+      null, createFakeTaskInfo(), null))
+    assert(results.size === 0)    // Map stage job should not be complete yet
+    runEvent(CompletionEvent(newTaskSet.tasks(1), Success, 
makeMapStatus("hostB", 2),
+      null, createFakeTaskInfo(), null))
+    assert(results.size === 1)    // Map stage job should now finally be 
complete
+    assertDataStructuresEmpty()
+
+    // Also test that a reduce stage using this shuffled data can immediately 
run
+    val reduceRDD = new MyRDD(sc, 2, List(shuffleDep))
+    results.clear()
+    submit(reduceRDD, Array(0, 1))
+    complete(taskSets(2), Seq((Success, 42), (Success, 43)))
+    assert(results === Map(0 -> 42, 1 -> 43))
+    results.clear()
+    assertDataStructuresEmpty()
+  }
+
   /**
    * Assert that the supplied TaskSet has exactly the given hosts as its 
preferred locations.
    * Note that this checks only the host and not the executor ID.

