Repository: spark
Updated Branches:
  refs/heads/branch-1.0 8285993a9 -> b1a7e99fe
[SPARK-2079] Support batching when serializing SchemaRDD to Python

Added batching with default batch size 10 in SchemaRDD.javaToPython

Author: Kan Zhang <[email protected]>

Closes #1023 from kanzhang/SPARK-2079 and squashes the following commits:

2d1915e [Kan Zhang] [SPARK-2079] Add batching in SchemaRDD.javaToPython
19b0c09 [Kan Zhang] [SPARK-2079] Removing unnecessary wrapping in SchemaRDD.javaToPython

(cherry picked from commit 2550533a28382664f8fd294b2caa494d12bfc7c1)
Signed-off-by: Reynold Xin <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b1a7e99f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b1a7e99f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b1a7e99f

Branch: refs/heads/branch-1.0
Commit: b1a7e99fe1fa89afb0e83c46a388b009037ec37d
Parents: 8285993
Author: Kan Zhang <[email protected]>
Authored: Sat Jun 14 13:17:22 2014 -0700
Committer: Reynold Xin <[email protected]>
Committed: Sat Jun 14 13:17:29 2014 -0700

----------------------------------------------------------------------
 python/pyspark/sql.py                                   | 4 +++-
 .../src/main/scala/org/apache/spark/sql/SchemaRDD.scala | 9 ++-------
 2 files changed, 5 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/b1a7e99f/python/pyspark/sql.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index 960d0a8..e344610 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -16,6 +16,7 @@
 #
 
 from pyspark.rdd import RDD
+from pyspark.serializers import BatchedSerializer, PickleSerializer
 
 from py4j.protocol import Py4JError
 
@@ -346,7 +347,8 @@ class SchemaRDD(RDD):
         # TODO: This is inefficient, we should construct the Python Row object
         # in Java land in the javaToPython function. May require a custom
         # pickle serializer in Pyrolite
-        return RDD(jrdd, self._sc, self._sc.serializer).map(lambda d: Row(d))
+        return RDD(jrdd, self._sc, BatchedSerializer(
+                        PickleSerializer())).map(lambda d: Row(d))
 
     # We override the default cache/persist/checkpoint behavior as we want to cache the underlying
     # SchemaRDD object in the JVM, not the PythonRDD checkpointed by the super class

http://git-wip-us.apache.org/repos/asf/spark/blob/b1a7e99f/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index 821ac85..89eaba2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -347,16 +347,11 @@ class SchemaRDD(
       val pickle = new Pickler
       iter.map { row =>
         val map: JMap[String, Any] = new java.util.HashMap
-        // TODO: We place the map in an ArrayList so that the object is pickled to a List[Dict].
-        // Ideally we should be able to pickle an object directly into a Python collection so we
-        // don't have to create an ArrayList every time.
-        val arr: java.util.ArrayList[Any] = new java.util.ArrayList
         row.zip(fieldNames).foreach { case (obj, name) =>
           map.put(name, obj)
         }
-        arr.add(map)
-        pickle.dumps(arr)
-      }
+        map
+      }.grouped(10).map(batched => pickle.dumps(batched.toArray))
     }
   }
