Repository: spark Updated Branches: refs/heads/master 44d3a6a75 -> fbf2678c1
SPARK-2636: Expose job ID in JobWaiter API This PR adds an asynchronous action (foreachAsync) to the Java API. Users can call it to get a FutureAction; when that FutureAction is a SimpleFutureAction, its jobId method returns the ID of the underlying job (read from the JobWaiter, whose jobId is now a public field). Author: lirui <rui...@intel.com> Closes #2176 from lirui-intel/SPARK-2636 and squashes the following commits: ccaafb7 [lirui] SPARK-2636: fix java doc 5536d55 [lirui] SPARK-2636: mark the async API as experimental e2e01d5 [lirui] SPARK-2636: add mima exclude 0ca320d [lirui] SPARK-2636: fix method name & javadoc 3fa39f7 [lirui] SPARK-2636: refine the patch af4f5d9 [lirui] SPARK-2636: remove unused imports 843276c [lirui] SPARK-2636: only keep foreachAsync in the java API fbf5744 [lirui] SPARK-2636: add more async actions for java api 1b25abc [lirui] SPARK-2636: expose some fields in JobWaiter d09f732 [lirui] SPARK-2636: fix build eb1ee79 [lirui] SPARK-2636: change some parameters in SimpleFutureAction to member field 6e2b87b [lirui] SPARK-2636: add java API for async actions Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fbf2678c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fbf2678c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fbf2678c Branch: refs/heads/master Commit: fbf2678c16acc0071ebd1cbdd165702635be5f0c Parents: 44d3a6a Author: lirui <rui...@intel.com> Authored: Mon Sep 1 23:28:19 2014 -0700 Committer: Reynold Xin <r...@apache.org> Committed: Mon Sep 1 23:28:19 2014 -0700 ---------------------------------------------------------------------- .../main/scala/org/apache/spark/FutureAction.scala | 3 +++ .../org/apache/spark/api/java/JavaRDDLike.scala | 15 ++++++++++++++- .../scala/org/apache/spark/rdd/AsyncRDDActions.scala | 3 ++- .../scala/org/apache/spark/scheduler/JobWaiter.scala | 2 +- project/MimaExcludes.scala | 3 +++ 5 files changed, 23 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/spark/blob/fbf2678c/core/src/main/scala/org/apache/spark/FutureAction.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala index 1e4dec8..75ea535 100644 --- a/core/src/main/scala/org/apache/spark/FutureAction.scala +++ b/core/src/main/scala/org/apache/spark/FutureAction.scala @@ -149,6 +149,9 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: case JobFailed(e: Exception) => scala.util.Failure(e) } } + + /** Get the corresponding job id for this action. */ + def jobId = jobWaiter.jobId } http://git-wip-us.apache.org/repos/asf/spark/blob/fbf2678c/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index f917cfd..545bc0e 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -26,7 +26,7 @@ import scala.reflect.ClassTag import com.google.common.base.Optional import org.apache.hadoop.io.compress.CompressionCodec -import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.{FutureAction, Partition, SparkContext, TaskContext} import org.apache.spark.annotation.Experimental import org.apache.spark.api.java.JavaPairRDD._ import org.apache.spark.api.java.JavaSparkContext.fakeClassTag @@ -574,4 +574,17 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def name(): String = rdd.name + /** + * :: Experimental :: + * The asynchronous version of the foreach action. 
+ * + * @param f the function to apply to all the elements of the RDD + * @return a FutureAction for the action + */ + @Experimental + def foreachAsync(f: VoidFunction[T]): FutureAction[Unit] = { + import org.apache.spark.SparkContext._ + rdd.foreachAsync(x => f.call(x)) + } + } http://git-wip-us.apache.org/repos/asf/spark/blob/fbf2678c/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala index aed951a..b62f3fb 100644 --- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala @@ -112,7 +112,8 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi * Applies a function f to all elements of this RDD. */ def foreachAsync(f: T => Unit): FutureAction[Unit] = { - self.context.submitJob[T, Unit, Unit](self, _.foreach(f), Range(0, self.partitions.size), + val cleanF = self.context.clean(f) + self.context.submitJob[T, Unit, Unit](self, _.foreach(cleanF), Range(0, self.partitions.size), (index, data) => Unit, Unit) } http://git-wip-us.apache.org/repos/asf/spark/blob/fbf2678c/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala index e9bfee2..29879b3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala @@ -23,7 +23,7 @@ package org.apache.spark.scheduler */ private[spark] class JobWaiter[T]( dagScheduler: DAGScheduler, - jobId: Int, + val jobId: Int, totalTasks: Int, resultHandler: (Int, T) => Unit) extends JobListener { 
http://git-wip-us.apache.org/repos/asf/spark/blob/fbf2678c/project/MimaExcludes.scala ---------------------------------------------------------------------- diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index fe8ffe6..a2f1b35 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -41,6 +41,9 @@ object MimaExcludes { Seq( // Adding new method to JavaRDLike trait - we should probably mark this as a developer API. ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.api.java.JavaRDDLike.partitions"), + // Should probably mark this as Experimental + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.api.java.JavaRDDLike.foreachAsync"), // We made a mistake earlier (ed06500d3) in the Java API to use default parameter values // for countApproxDistinct* functions, which does not work in Java. We later removed // them, and use the following to tell Mima to not care about them. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org