- Created AsyncRDDActions.
- Made FutureJob a Scala Future instead of a Java Future.
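
A brief usage sketch of the new asynchronous API (illustrative driver code only; the
SparkContext setup, the sample data, and the 30-second timeout are assumptions for the
example, not part of this commit):

    import scala.concurrent.Await
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._
    import scala.util.{Failure, Success}

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._  // pulls in the new rddToAsyncRDDActions implicit

    val sc = new SparkContext("local", "collect-async-sketch")
    val rdd = sc.parallelize(1 to 1000, 4)

    // collectAsync() returns a FutureJob, which is now a scala.concurrent.Future.
    val job = rdd.collectAsync()

    // Non-blocking: register a callback to run once the job completes.
    job.onComplete {
      case Success(values) => println("collected " + values.size + " elements")
      case Failure(e)      => println("job failed: " + e)
    }

    // Blocking: wait at most 30 seconds for the result (goes through FutureJob.ready/result).
    val values = Await.result(job, 30.seconds)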


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/802bfb87
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/802bfb87
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/802bfb87

Branch: refs/heads/master
Commit: 802bfb870d3d0ab3f17a9a9dc3148371d686cb13
Parents: e8e917f
Author: Reynold Xin <r...@apache.org>
Authored: Thu Oct 3 01:22:28 2013 -0700
Committer: Reynold Xin <r...@apache.org>
Committed: Thu Oct 3 01:22:28 2013 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/FutureJob.scala | 95 ++++++++++++++++----
 .../scala/org/apache/spark/SparkContext.scala   |  5 +-
 .../scala/org/apache/spark/TaskContext.scala    | 13 ++-
 .../org/apache/spark/executor/Executor.scala    |  2 +-
 .../org/apache/spark/rdd/AsyncRDDActions.scala  | 58 ++++++++++++
 .../main/scala/org/apache/spark/rdd/RDD.scala   | 10 ---
 6 files changed, 151 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/802bfb87/core/src/main/scala/org/apache/spark/FutureJob.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/FutureJob.scala b/core/src/main/scala/org/apache/spark/FutureJob.scala
index ec3e0c3..e3adf89 100644
--- a/core/src/main/scala/org/apache/spark/FutureJob.scala
+++ b/core/src/main/scala/org/apache/spark/FutureJob.scala
@@ -17,34 +17,99 @@
 
 package org.apache.spark
 
-import java.util.concurrent.{ExecutionException, TimeUnit, Future}
+import java.util.concurrent.TimeoutException
+
+import scala.concurrent.{CanAwait, ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+import scala.util.Try
 
 import org.apache.spark.scheduler.{JobFailed, JobSucceeded, JobWaiter}
 
 class FutureJob[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: () => T)
   extends Future[T] {
 
-  override def isDone: Boolean = jobWaiter.jobFinished
-
-  override def cancel(mayInterruptIfRunning: Boolean): Boolean = {
+  /**
+   * Cancels the execution of this job.
+   */
+  def cancel() {
     jobWaiter.kill()
-    true
   }
 
-  override def isCancelled: Boolean = {
-    throw new UnsupportedOperationException
+  /**
+   * Blocks until this job completes.
+   * @param atMost maximum wait time, which may be negative (no waiting is done), Duration.Inf
+   *               for unbounded waiting, or a finite positive duration
+   * @return this FutureJob
+   */
+  override def ready(atMost: Duration)(implicit permit: CanAwait): FutureJob.this.type = {
+    if (!atMost.isFinite()) {
+      awaitResult()
+    } else {
+      val finishTime = System.currentTimeMillis() + atMost.toMillis
+      jobWaiter.synchronized {
+        while (!isCompleted) {
+          val time = System.currentTimeMillis()
+          if (time >= finishTime) {
+            throw new TimeoutException
+          } else {
+            jobWaiter.wait(finishTime - time)
+          }
+        }
+      }
+    }
+    this
   }
 
-  override def get(): T = {
-    jobWaiter.awaitResult() match {
-      case JobSucceeded =>
-        resultFunc()
-      case JobFailed(e: Exception, _) =>
-        throw new ExecutionException(e)
+  /**
+   * Await and return the result (of type T) of this job.
+   * @param atMost maximum wait time, which may be negative (no waiting is done), Duration.Inf
+   *               for unbounded waiting, or a finite positive duration
+   * @throws Exception exception during job execution
+   * @return the result value if the job is completed within the specified maximum wait time
+   */
+  @throws(classOf[Exception])
+  override def result(atMost: Duration)(implicit permit: CanAwait): T = {
+    ready(atMost)(permit)
+    awaitResult() match {
+      case scala.util.Success(res) => res
+      case scala.util.Failure(e) => throw e
+    }
+  }
+
+  /**
+   * When this job is completed, either through an exception or a value, apply the provided
+   * function.
+   */
+  def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext) {
+    executor.execute(new Runnable {
+      override def run() {
+        func(awaitResult())
+      }
+    })
+  }
+
+  /**
+   * Returns whether the job has already been completed with a value or an exception.
+   */
+  def isCompleted: Boolean = jobWaiter.jobFinished
+
+  /**
+   * The value of this Future.
+   *
+   * If the future is not completed, the returned value will be None. If the future is completed,
+   * the value will be Some(Success(t)) if it contains a valid result, or Some(Failure(error)) if
+   * it contains an exception.
+   */
+  def value: Option[Try[T]] = {
+    if (jobWaiter.jobFinished) {
+      Some(awaitResult())
+    } else {
+      None
     }
   }
 
-  override def get(timeout: Long, unit: TimeUnit): T = {
-    throw new UnsupportedOperationException
+  private def awaitResult(): Try[T] = {
+    jobWaiter.awaitResult() match {
+      case JobSucceeded => scala.util.Success(resultFunc())
+      case JobFailed(e: Exception, _) => scala.util.Failure(e)
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/802bfb87/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 23d9ca3..3012453 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -20,7 +20,6 @@ package org.apache.spark
 import java.io._
 import java.net.URI
 import java.util.Properties
-import java.util.concurrent.Future
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.Map
@@ -823,7 +822,7 @@ class SparkContext(
       processPartition: Iterator[T] => U,
       partitions: Seq[Int],
       partitionResultHandler: (Int, U) => Unit,
-      resultFunc: () => R): Future[R] =
+      resultFunc: () => R): FutureJob[R] =
   {
     val callSite = Utils.formatSparkCallSite
     val waiter = dagScheduler.submitJob(
@@ -933,6 +932,8 @@ object SparkContext {
   implicit def rddToPairRDDFunctions[K: ClassManifest, V: ClassManifest](rdd: RDD[(K, V)]) =
     new PairRDDFunctions(rdd)
 
+  implicit def rddToAsyncRDDActions[T: ClassManifest](rdd: RDD[T]) = new AsyncRDDActions(rdd)
+
   implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable: ClassManifest](
       rdd: RDD[(K, V)]) =
     new SequenceFileRDDFunctions(rdd)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/802bfb87/core/src/main/scala/org/apache/spark/TaskContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index 0b1542c..86370d5 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -17,9 +17,10 @@
 
 package org.apache.spark
 
-import executor.TaskMetrics
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.spark.executor.TaskMetrics
+
 class TaskContext(
   val stageId: Int,
   val splitId: Int,
@@ -29,10 +30,14 @@ class TaskContext(
   val taskMetrics: TaskMetrics = TaskMetrics.empty()
 ) extends Serializable {
 
-  @transient val onCompleteCallbacks = new ArrayBuffer[() => Unit]
+  // List of callback functions to execute when the task completes.
+  @transient private val onCompleteCallbacks = new ArrayBuffer[() => Unit]
 
-  // Add a callback function to be executed on task completion. An example use
-  // is for HadoopRDD to register a callback to close the input stream.
+  /**
+   * Add a callback function to be executed on task completion. An example use
+   * is for HadoopRDD to register a callback to close the input stream.
+   * @param f Callback function.
+   */
   def addOnCompleteCallback(f: () => Unit) {
     onCompleteCallbacks += f
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/802bfb87/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 6c5606f..1907594 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -198,7 +198,7 @@ private[spark] class Executor(
         }
 
         attemptedTask = Some(task)
-        logDebug("Its epoch is " + task.epoch)
+        logDebug("Task " + taskId +"'s epoch is " + task.epoch)
         env.mapOutputTracker.updateEpoch(task.epoch)
 
         // Run the actual task and measure its runtime.

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/802bfb87/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
new file mode 100644
index 0000000..7614b98
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.FutureJob
+
+/**
+ * A set of asynchronous RDD actions available through an implicit conversion.
+ * Import `org.apache.spark.SparkContext._` at the top of your program to use these functions.
+ */
+class AsyncRDDActions[T: ClassManifest](self: RDD[T]) {
+
+  private val allPartitions = Range(0, self.partitions.size)
+
+  /**
+   * Return a future for retrieving all elements of this RDD.
+   */
+  def collectAsync(): FutureJob[Seq[T]] = {
+    val results = new ArrayBuffer[T]
+    self.context.submitJob[T, Array[T], Seq[T]](self, _.toArray, allPartitions,
+      (index, data) => results ++= data, () => results)
+  }
+
+  /**
+   * Applies a function f to all elements of this RDD.
+   */
+  def foreachAsync(f: T => Unit): FutureJob[Unit] = {
+    val cleanF = self.context.clean(f)
+    self.context.submitJob[T, Unit, Unit](self, _.foreach(cleanF), allPartitions,
+      (index, data) => Unit, () => Unit)
+  }
+
+  /**
+   * Applies a function f to each partition of this RDD.
+   */
+  def foreachPartitionAsync(f: Iterator[T] => Unit): FutureJob[Unit] = {
+    val cleanF = self.context.clean(f)
+    self.context.submitJob[T, Unit, Unit](self, cleanF, allPartitions, (index, data) => Unit,
+      () => Unit)
+  }
+}
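
A rough sketch of how these actions are meant to be reached through the implicit conversion
(hypothetical driver code; the master URL, sample data, and cancellation point are made up for
illustration):

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._  // brings rddToAsyncRDDActions into scope

    val sc = new SparkContext("local[2]", "foreach-async-sketch")
    val lines = sc.parallelize(Seq("a", "b", "c"), 3)

    // Fire-and-forget action: the call returns a FutureJob handle immediately.
    val job = lines.foreachAsync(line => println(line))

    // The handle can later cancel the underlying Spark job, e.g. on shutdown:
    // job.cancel()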

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/802bfb87/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 5e417cb..ce6485f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.rdd
 
 import java.util.Random
-import java.util.concurrent.Future
 
 import scala.collection.Map
 import scala.collection.JavaConversions.mapAsScalaMap
@@ -563,15 +562,6 @@ abstract class RDD[T: ClassManifest](
   }
 
   /**
-   * Return a future for retrieving the results of a collect in an asynchronous fashion.
-   */
-  def collectAsync(): Future[Seq[T]] = {
-    val results = new ArrayBuffer[T]
-    sc.submitJob[T, Array[T], Seq[T]](
-      this, _.toArray, Range(0, partitions.size), (index, data) => results ++= data, () => results)
-  }
-
-  /**
    * Return an array that contains all of the elements in this RDD.
    */
   def toArray(): Array[T] = collect()
