Allow SparkContext.submitJob to submit a job for only a subset of the partitions.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/bf515688
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/bf515688
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/bf515688

Branch: refs/heads/master
Commit: bf515688e78b75d94163efccdc5cd401dba5059b
Parents: 37d8f37
Author: Reynold Xin <reyno...@gmail.com>
Authored: Wed Sep 18 04:16:18 2013 -0700
Committer: Reynold Xin <reyno...@gmail.com>
Committed: Wed Sep 18 04:16:18 2013 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/SparkContext.scala          | 3 ++-
 core/src/main/scala/org/apache/spark/rdd/RDD.scala               | 2 +-
 .../src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 4 ++--
 3 files changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bf515688/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ceb898d..e96fa52 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -816,6 +816,7 @@ class SparkContext(
   def submitJob[T, U, R](
       rdd: RDD[T],
       processPartition: Iterator[T] => U,
+      partitions: Seq[Int],
       partitionResultHandler: (Int, U) => Unit,
       resultFunc: () => R): Future[R] =
   {
@@ -823,7 +824,7 @@ class SparkContext(
     val waiter = dagScheduler.submitJob(
       rdd,
       (context: TaskContext, iter: Iterator[T]) => processPartition(iter),
-      0 until rdd.partitions.size,
+      partitions,
       callSite,
       allowLocal = false,
       partitionResultHandler,


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bf515688/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 7cba393..72a5a20 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -568,7 +568,7 @@ abstract class RDD[T: ClassManifest](
   def collectAsync(): Future[Seq[T]] = {
     val results = new ArrayBuffer[T]
     sc.submitJob[T, Array[T], Seq[T]](
-      this, _.toArray, (index, data) => results ++= data, () => results)
+      this, _.toArray, Range(0, partitions.size), (index, data) => results ++= data, () => results)
   }

   /**


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bf515688/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index efe258a..2654ee7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -42,11 +42,11 @@ import org.apache.spark.util.{MetadataCleaner, TimeStampedHashMap}
  * locations to run each task on, based on the current cache status, and passes these to the
  * low-level TaskScheduler. Furthermore, it handles failures due to shuffle output files being
  * lost, in which case old stages may need to be resubmitted. Failures *within* a stage that are
- * not caused by shuffie file loss are handled by the TaskScheduler, which will retry each task
+ * not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task
  * a small number of times before cancelling the whole stage.
  *
  * THREADING: This class runs all its logic in a single thread executing the run() method, to which
- * events are submitted using a synchonized queue (eventQueue). The public API methods, such as
+ * events are submitted using a synchronized queue (eventQueue). The public API methods, such as
  * runJob, taskEnded and executorLost, post events asynchronously to this queue. All other methods
  * should be private.
  */
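----------------------------------------------------------------------

Usage note (not part of the commit): a minimal sketch of how the new
`partitions` argument could be used to run a job over only a subset of an
RDD's partitions. The helper name `firstPartitionAsync` is hypothetical;
the submitJob signature and type parameters follow the diff above.

  import scala.collection.mutable.ArrayBuffer

  // Hypothetical helper: asynchronously collect only partition 0 of `rdd`,
  // instead of all partitions as collectAsync() does.
  def firstPartitionAsync[T: ClassManifest](
      sc: org.apache.spark.SparkContext,
      rdd: org.apache.spark.rdd.RDD[T]) = {
    val results = new ArrayBuffer[T]
    sc.submitJob[T, Array[T], Seq[T]](
      rdd,
      _.toArray,                          // processPartition: materialize each partition
      Seq(0),                             // partitions: run only partition 0
      (index, data) => results ++= data,  // partitionResultHandler: gather per-partition output
      () => results)                      // resultFunc: value the returned Future resolves to
  }

Passing `Range(0, rdd.partitions.size)` instead of `Seq(0)` recovers the old
all-partitions behavior, which is exactly what collectAsync() now does.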