Allow SparkContext.submitJob to submit a job for only a subset of the 
partitions.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/bf515688
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/bf515688
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/bf515688

Branch: refs/heads/master
Commit: bf515688e78b75d94163efccdc5cd401dba5059b
Parents: 37d8f37
Author: Reynold Xin <reyno...@gmail.com>
Authored: Wed Sep 18 04:16:18 2013 -0700
Committer: Reynold Xin <reyno...@gmail.com>
Committed: Wed Sep 18 04:16:18 2013 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/SparkContext.scala          | 3 ++-
 core/src/main/scala/org/apache/spark/rdd/RDD.scala               | 2 +-
 .../src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 4 ++--
 3 files changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bf515688/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ceb898d..e96fa52 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -816,6 +816,7 @@ class SparkContext(
   def submitJob[T, U, R](
       rdd: RDD[T],
       processPartition: Iterator[T] => U,
+      partitions: Seq[Int],
       partitionResultHandler: (Int, U) => Unit,
       resultFunc: () => R): Future[R] =
   {
@@ -823,7 +824,7 @@ class SparkContext(
     val waiter = dagScheduler.submitJob(
       rdd,
       (context: TaskContext, iter: Iterator[T]) => processPartition(iter),
-      0 until rdd.partitions.size,
+      partitions,
       callSite,
       allowLocal = false,
       partitionResultHandler,
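
For context, a minimal caller-side sketch of the new signature (the RDD and
handler here are hypothetical, not part of this commit): run a job over
partitions 0 and 1 only, summing per-partition element counts on the driver,
mirroring the handler pattern collectAsync uses below.

  val rdd = sc.parallelize(1 to 100, 10)  // hypothetical example RDD with 10 partitions
  var total = 0
  val future = sc.submitJob[Int, Int, Int](
    rdd,
    _.size,                            // processPartition: count the elements in one partition
    Seq(0, 1),                         // new parameter: touch only partitions 0 and 1
    (index, count) => total += count,  // partitionResultHandler, runs on the driver
    () => total)                       // resultFunc: the overall result for the Future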

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bf515688/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 7cba393..72a5a20 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -568,7 +568,7 @@ abstract class RDD[T: ClassManifest](
   def collectAsync(): Future[Seq[T]] = {
     val results = new ArrayBuffer[T]
     sc.submitJob[T, Array[T], Seq[T]](
-      this, _.toArray, (index, data) => results ++= data, () => results)
+      this, _.toArray, Range(0, partitions.size), (index, data) => results ++= data, () => results)
   }
 
   /**
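
The same pattern generalizes to partial collections; a hypothetical sketch
(collectPartitionsAsync is not in this commit) built on the new parameter:

  /** Hypothetical: asynchronously collect only the given partitions. */
  def collectPartitionsAsync(parts: Seq[Int]): Future[Seq[T]] = {
    val results = new ArrayBuffer[T]
    sc.submitJob[T, Array[T], Seq[T]](
      this, _.toArray, parts, (index, data) => results ++= data, () => results)
  }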

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bf515688/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index efe258a..2654ee7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -42,11 +42,11 @@ import org.apache.spark.util.{MetadataCleaner, TimeStampedHashMap}
  * locations to run each task on, based on the current cache status, and passes these to the
  * low-level TaskScheduler. Furthermore, it handles failures due to shuffle output files being
  * lost, in which case old stages may need to be resubmitted. Failures *within* a stage that are
- * not caused by shuffie file loss are handled by the TaskScheduler, which will retry each task
+ * not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task
  * a small number of times before cancelling the whole stage.
  *
  * THREADING: This class runs all its logic in a single thread executing the run() method, to which
- * events are submitted using a synchonized queue (eventQueue). The public API methods, such as
+ * events are submitted using a synchronized queue (eventQueue). The public API methods, such as
  * runJob, taskEnded and executorLost, post events asynchronously to this queue. All other methods
  * should be private.
  */
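
The threading model this comment describes reduces to one consumer thread
draining an event queue; an illustrative sketch of that pattern (simplified,
not the actual DAGScheduler code, and the event type is hypothetical):

  import java.util.concurrent.LinkedBlockingQueue

  sealed trait SchedulerEvent
  case class JobSubmitted(jobId: Int) extends SchedulerEvent  // hypothetical event

  val eventQueue = new LinkedBlockingQueue[SchedulerEvent]

  // Public API methods only post events; they never touch scheduler state directly.
  def post(event: SchedulerEvent) { eventQueue.put(event) }

  // All scheduling logic runs in the single thread executing run().
  def run() {
    while (true) {
      eventQueue.take() match {
        case JobSubmitted(jobId) => println("handling job " + jobId)
      }
    }
  }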
