Added mapPartitions method to JavaRDD.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/dbadc6b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/dbadc6b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/dbadc6b9

Branch: refs/heads/master
Commit: dbadc6b994ff54f86b726c71fa08837a6b1e7238
Parents: aae8a01
Author: eklavya <sr.ekla...@gmail.com>
Authored: Mon Jan 13 17:56:10 2014 +0530
Committer: eklavya <sr.ekla...@gmail.com>
Committed: Mon Jan 13 17:56:10 2014 +0530

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/api/java/JavaRDD.scala | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/dbadc6b9/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index 6c91eda..568ae15 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -21,8 +21,10 @@ import scala.reflect.ClassTag
 
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
-import org.apache.spark.api.java.function.{Function => JFunction}
+import org.apache.spark.api.java.function.{Function => JFunction, FlatMapFunction => JFMap}
 import org.apache.spark.storage.StorageLevel
+import java.util.{Iterator => JIterator}
+import scala.collection.JavaConversions._
 
 class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T]) extends
 JavaRDDLike[T, JavaRDD[T]] {
@@ -138,6 +140,15 @@ JavaRDDLike[T, JavaRDD[T]] {
   def setGenerator(_generator: String) = {
     rdd.generator = _generator
   }
+
+  /**
+   * Return a new RDD by applying a function to each partition of this RDD.
+   */
+  def mapPartitions[U: ClassTag](
+      f: JFMap[JIterator[T], U], preservesPartitioning: Boolean = false): JavaRDD[U] = {
+    rdd.mapPartitions[U]((x => f(asJavaIterator(x)).iterator), preservesPartitioning)
+  }
+
 }
 
 object JavaRDD {