Restore takePartition to PythonRDD, context.py. This is to avoid removing functions in minor releases.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/38bf7860 Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/38bf7860 Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/38bf7860 Branch: refs/heads/branch-0.8 Commit: 38bf7860da3bf57288457510d15701bfaa1f7517 Parents: 691dfef Author: Shivaram Venkataraman <shiva...@eecs.berkeley.edu> Authored: Wed Jan 22 23:28:54 2014 -0800 Committer: Shivaram Venkataraman <shiva...@eecs.berkeley.edu> Committed: Wed Jan 22 23:28:54 2014 -0800 ---------------------------------------------------------------------- core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 4 ++++ python/pyspark/context.py | 3 +++ 2 files changed, 7 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/38bf7860/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index f7b38b4..12b4d94 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -283,6 +283,10 @@ private[spark] object PythonRDD { file.close() } + def takePartition[T](rdd: RDD[T], partition: Int): Iterator[T] = { + implicit val cm : ClassManifest[T] = rdd.elementClassManifest + rdd.context.runJob(rdd, ((x: Iterator[T]) => x.toArray), Seq(partition), true).head.iterator + } } private object Pickle { http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/38bf7860/python/pyspark/context.py ---------------------------------------------------------------------- diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 3d47589..a7ca8bc 100644 --- 
a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -43,6 +43,7 @@ class SparkContext(object): _gateway = None _jvm = None _writeIteratorToPickleFile = None + _takePartition = None _next_accum_id = 0 _active_spark_context = None _lock = Lock() @@ -126,6 +127,8 @@ class SparkContext(object): SparkContext._jvm = SparkContext._gateway.jvm SparkContext._writeIteratorToPickleFile = \ SparkContext._jvm.PythonRDD.writeIteratorToPickleFile + SparkContext._takePartition = \ + SparkContext._jvm.PythonRDD.takePartition if instance: if SparkContext._active_spark_context and SparkContext._active_spark_context != instance: