- Created AsyncRDDActions.
- Make FutureJob a Scala Future instead of Java Future.
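For readers of this commit, a minimal usage sketch of the new API (not part of the diff below): it assumes a Scala 2.10 build and a local SparkContext, and the application name "AsyncExample" is illustrative. The asynchronous actions become available on any RDD once the implicit conversions in the SparkContext companion object are imported.

import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._  // brings rddToAsyncRDDActions into scope

object AsyncExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "AsyncExample")
    val rdd = sc.parallelize(1 to 1000, 10)

    // collectAsync() submits the job and returns immediately with a FutureJob,
    // which is now a scala.concurrent.Future, so the standard callback API applies.
    val job = rdd.collectAsync()
    job.onComplete {
      case Success(values) => println("Collected " + values.size + " elements")
      case Failure(e) => println("Job failed: " + e)
    }

    // A running job can also be killed through the same handle:
    // job.cancel()
  }
}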
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/802bfb87
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/802bfb87
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/802bfb87

Branch: refs/heads/master
Commit: 802bfb870d3d0ab3f17a9a9dc3148371d686cb13
Parents: e8e917f
Author: Reynold Xin <r...@apache.org>
Authored: Thu Oct 3 01:22:28 2013 -0700
Committer: Reynold Xin <r...@apache.org>
Committed: Thu Oct 3 01:22:28 2013 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/FutureJob.scala | 95 ++++++++++++++++----
 .../scala/org/apache/spark/SparkContext.scala   |  5 +-
 .../scala/org/apache/spark/TaskContext.scala    | 13 ++-
 .../org/apache/spark/executor/Executor.scala    |  2 +-
 .../org/apache/spark/rdd/AsyncRDDActions.scala  | 58 ++++++++++++
 .../main/scala/org/apache/spark/rdd/RDD.scala   | 10 ---
 6 files changed, 151 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/802bfb87/core/src/main/scala/org/apache/spark/FutureJob.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/FutureJob.scala b/core/src/main/scala/org/apache/spark/FutureJob.scala
index ec3e0c3..e3adf89 100644
--- a/core/src/main/scala/org/apache/spark/FutureJob.scala
+++ b/core/src/main/scala/org/apache/spark/FutureJob.scala
@@ -17,34 +17,99 @@
 package org.apache.spark
 
-import java.util.concurrent.{ExecutionException, TimeUnit, Future}
+import java.util.concurrent.TimeoutException
+
+import scala.concurrent.{CanAwait, ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+import scala.util.Try
 
 import org.apache.spark.scheduler.{JobFailed, JobSucceeded, JobWaiter}
 
 class FutureJob[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: () => T)
   extends Future[T] {
 
-  override def isDone: Boolean = jobWaiter.jobFinished
-
-  override def cancel(mayInterruptIfRunning: Boolean): Boolean = {
+  /**
+   * Cancels the execution of this job.
+   */
+  def cancel() {
     jobWaiter.kill()
-    true
   }
 
-  override def isCancelled: Boolean = {
-    throw new UnsupportedOperationException
+  /**
+   * Blocks until this job completes.
+   * @param atMost maximum wait time, which may be negative (no waiting is done), Duration.Inf
+   *               for unbounded waiting, or a finite positive duration
+   * @return this FutureJob
+   */
+  override def ready(atMost: Duration)(implicit permit: CanAwait): FutureJob.this.type = {
+    if (atMost.isFinite()) {
+      awaitResult()
+    } else {
+      val finishTime = System.currentTimeMillis() + atMost.toMillis
+      while (!isCompleted) {
+        val time = System.currentTimeMillis()
+        if (time >= finishTime) {
+          throw new TimeoutException
+        } else {
+          jobWaiter.wait(finishTime - time)
+        }
+      }
+    }
+    this
   }
 
-  override def get(): T = {
-    jobWaiter.awaitResult() match {
-      case JobSucceeded =>
-        resultFunc()
-      case JobFailed(e: Exception, _) =>
-        throw new ExecutionException(e)
+  /**
+   * Await and return the result (of type T) of this job.
+   * @param atMost maximum wait time, which may be negative (no waiting is done), Duration.Inf
+   *               for unbounded waiting, or a finite positive duration
+   * @throws Exception exception during job execution
+   * @return the result value if the job is completed within the specific maximum wait time
+   */
+  @throws(classOf[Exception])
+  override def result(atMost: Duration)(implicit permit: CanAwait): T = {
+    ready(atMost)(permit)
+    awaitResult() match {
+      case scala.util.Success(res) => res
+      case scala.util.Failure(e) => throw e
+    }
+  }
+
+  /**
+   * When this job is completed, either through an exception, or a value, apply the provided
+   * function.
+   */
+  def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext) {
+    executor.execute(new Runnable {
+      override def run() {
+        func(awaitResult())
+      }
+    })
+  }
+
+  /**
+   * Returns whether the job has already been completed with a value or an exception.
+   */
+  def isCompleted: Boolean = jobWaiter.jobFinished
+
+  /**
+   * The value of this Future.
+   *
+   * If the future is not completed the returned value will be None. If the future is completed
+   * the value will be Some(Success(t)) if it contains a valid result, or Some(Failure(error)) if
+   * it contains an exception.
+   */
+  def value: Option[Try[T]] = {
+    if (jobWaiter.jobFinished) {
+      Some(awaitResult())
+    } else {
+      None
     }
   }
 
-  override def get(timeout: Long, unit: TimeUnit): T = {
-    throw new UnsupportedOperationException
+  private def awaitResult(): Try[T] = {
+    jobWaiter.awaitResult() match {
+      case JobSucceeded => scala.util.Success(resultFunc())
+      case JobFailed(e: Exception, _) => scala.util.Failure(e)
+    }
   }
 }


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/802bfb87/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 23d9ca3..3012453 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -20,7 +20,6 @@ package org.apache.spark
 import java.io._
 import java.net.URI
 import java.util.Properties
-import java.util.concurrent.Future
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.Map
@@ -823,7 +822,7 @@
       processPartition: Iterator[T] => U,
       partitions: Seq[Int],
       partitionResultHandler: (Int, U) => Unit,
-      resultFunc: () => R): Future[R] =
+      resultFunc: () => R): FutureJob[R] =
   {
     val callSite = Utils.formatSparkCallSite
     val waiter = dagScheduler.submitJob(
@@ -933,6 +932,8 @@ object SparkContext {
   implicit def rddToPairRDDFunctions[K: ClassManifest, V: ClassManifest](rdd: RDD[(K, V)]) =
     new PairRDDFunctions(rdd)
 
+  implicit def rddToAsyncRDDActions[T: ClassManifest](rdd: RDD[T]) = new AsyncRDDActions(rdd)
+
   implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable: ClassManifest](
       rdd: RDD[(K, V)]) =
     new SequenceFileRDDFunctions(rdd)


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/802bfb87/core/src/main/scala/org/apache/spark/TaskContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index 0b1542c..86370d5 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -17,9 +17,10 @@
 package org.apache.spark
 
-import executor.TaskMetrics
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.spark.executor.TaskMetrics
+
 class TaskContext(
   val stageId: Int,
   val splitId: Int,
@@ -29,10 +30,14 @@ class TaskContext(
   val taskMetrics: TaskMetrics = TaskMetrics.empty()
 ) extends Serializable {
 
-  @transient val onCompleteCallbacks = new ArrayBuffer[() => Unit]
+  // List of callback functions to execute when the task completes.
+  @transient private val onCompleteCallbacks = new ArrayBuffer[() => Unit]
 
-  // Add a callback function to be executed on task completion. An example use
-  // is for HadoopRDD to register a callback to close the input stream.
+  /**
+   * Add a callback function to be executed on task completion. An example use
+   * is for HadoopRDD to register a callback to close the input stream.
+   * @param f Callback function.
+   */
   def addOnCompleteCallback(f: () => Unit) {
     onCompleteCallbacks += f
   }


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/802bfb87/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 6c5606f..1907594 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -198,7 +198,7 @@ private[spark] class Executor(
       }
 
       attemptedTask = Some(task)
-      logDebug("Its epoch is " + task.epoch)
+      logDebug("Task " + taskId +"'s epoch is " + task.epoch)
       env.mapOutputTracker.updateEpoch(task.epoch)
 
       // Run the actual task and measure its runtime.


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/802bfb87/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
new file mode 100644
index 0000000..7614b98
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.FutureJob
+
+/**
+ * A set of asynchronous RDD actions available through an implicit conversion.
+ * Import `org.apache.spark.SparkContext._` at the top of your program to use these functions.
+ */
+class AsyncRDDActions[T: ClassManifest](self: RDD[T]) {
+
+  private val allPartitions = Range(0, self.partitions.size)
+
+  /**
+   * Return a future for retrieving all elements of this RDD.
+   */
+  def collectAsync(): FutureJob[Seq[T]] = {
+    val results = new ArrayBuffer[T]
+    self.context.submitJob[T, Array[T], Seq[T]](self, _.toArray, allPartitions,
+      (index, data) => results ++= data, () => results)
+  }
+
+  /**
+   * Applies a function f to all elements of this RDD.
+   */
+  def foreachAsync(f: T => Unit): FutureJob[Unit] = {
+    val cleanF = self.context.clean(f)
+    self.context.submitJob[T, Unit, Unit](self, _.foreach(cleanF), allPartitions,
+      (index, data) => Unit, () => Unit)
+  }
+
+  /**
+   * Applies a function f to each partition of this RDD.
+   */
+  def foreachPartitionAsync(f: Iterator[T] => Unit): FutureJob[Unit] = {
+    val cleanF = self.context.clean(f)
+    self.context.submitJob[T, Unit, Unit](self, cleanF, allPartitions, (index, data) => Unit,
+      () => Unit)
+  }
+}


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/802bfb87/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 5e417cb..ce6485f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -18,7 +18,6 @@ package org.apache.spark.rdd
 import java.util.Random
-import java.util.concurrent.Future
 
 import scala.collection.Map
 import scala.collection.JavaConversions.mapAsScalaMap
@@ -563,15 +562,6 @@ abstract class RDD[T: ClassManifest](
   }
 
   /**
-   * Return a future for retrieving the results of a collect in an asynchronous fashion.
-   */
-  def collectAsync(): Future[Seq[T]] = {
-    val results = new ArrayBuffer[T]
-    sc.submitJob[T, Array[T], Seq[T]](
-      this, _.toArray, Range(0, partitions.size), (index, data) => results ++= data, () => results)
-  }
-
-  /**
    * Return an array that contains all of the elements in this RDD.
    */
   def toArray(): Array[T] = collect()
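Because FutureJob implements the scala.concurrent.Future interface (ready/result above), the standard Await helpers can also block on an asynchronous action. A small sketch under the same assumptions as the example above (an existing SparkContext named sc, the SparkContext._ implicits in scope); the 30-second timeout is illustrative only:

import scala.concurrent.Await
import scala.concurrent.duration._

// Kick off a foreach job without blocking the calling thread.
val job = sc.parallelize(1 to 100).foreachAsync(x => println(x))

// ... other driver-side work can happen here ...

// Block for at most 30 seconds; result() rethrows any exception raised
// during job execution.
Await.result(job, 30.seconds)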