Repository: spark
Updated Branches:
  refs/heads/master f4073020a -> 6adf67dd1


[SPARK-21985][PYSPARK] PairDeserializer is broken for double-zipped RDDs

## What changes were proposed in this pull request?
Fixes a bug introduced in #16121

In `PairDeserializer`, convert each batch of keys and values to a list (if it
does not already have `__len__`) so that we can check that the two batches are
the same size. Normally the batches are already lists, so this should not have
a performance impact, but the conversion is needed when `zip` is applied
repeatedly.
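
As a rough illustration of the failure mode and the fix, here is a minimal
pure-Python sketch that does not use Spark. The names `ensure_sized` and
`load_pairs` are hypothetical and are not part of `pyspark.serializers`; they
only mirror the size check described above, assuming one side delivers its
batches as iterators rather than lists.

```python
def ensure_sized(batch):
    """Return batch unchanged if it supports len(), else materialize it as a list."""
    return batch if hasattr(batch, '__len__') else list(batch)


def load_pairs(key_batches, val_batches):
    """Yield (key, value) pairs batch by batch, checking that batch sizes match.

    Hypothetical stand-in for the deserializer loop, not the Spark implementation.
    """
    for key_batch, val_batch in zip(key_batches, val_batches):
        # Without this conversion, len() raises TypeError on an iterator batch.
        key_batch = ensure_sized(key_batch)
        val_batch = ensure_sized(val_batch)
        if len(key_batch) != len(val_batch):
            raise ValueError("batches differ in size: (%d, %d)"
                             % (len(key_batch), len(val_batch)))
        for pair in zip(key_batch, val_batch):
            yield pair


# The key side plays the role of an inner zip: its batches are iterators,
# which have no __len__. The value side delivers ordinary lists.
inner_batches = [iter([('a', 'a'), ('b', 'b')]), iter([('c', 'c')])]
outer_batches = [['a', 'b'], ['c']]
result = list(load_pairs(inner_batches, outer_batches))
```

Batches that are already lists pass through `ensure_sized` untouched, which is
why the common, non-chained case should not pay for an extra copy.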

## How was this patch tested?

Additional unit test

Author: Andrew Ray <ray.and...@gmail.com>

Closes #19226 from aray/SPARK-21985.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6adf67dd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6adf67dd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6adf67dd

Branch: refs/heads/master
Commit: 6adf67dd14b0ece342bb91adf800df0a7101e038
Parents: f407302
Author: Andrew Ray <ray.and...@gmail.com>
Authored: Mon Sep 18 02:46:27 2017 +0900
Committer: hyukjinkwon <gurwls...@gmail.com>
Committed: Mon Sep 18 02:46:27 2017 +0900

----------------------------------------------------------------------
 python/pyspark/serializers.py |  6 +++++-
 python/pyspark/tests.py       | 12 ++++++++++++
 2 files changed, 17 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6adf67dd/python/pyspark/serializers.py
----------------------------------------------------------------------
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index d5c2a75..660b19a 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -97,7 +97,7 @@ class Serializer(object):
 
     def _load_stream_without_unbatching(self, stream):
         """
-        Return an iterator of deserialized batches (lists) of objects from the input stream.
+        Return an iterator of deserialized batches (iterable) of objects from the input stream.
         if the serializer does not operate on batches the default implementation returns an
         iterator of single element lists.
         """
@@ -343,6 +343,10 @@ class PairDeserializer(Serializer):
         key_batch_stream = self.key_ser._load_stream_without_unbatching(stream)
         val_batch_stream = self.val_ser._load_stream_without_unbatching(stream)
         for (key_batch, val_batch) in zip(key_batch_stream, val_batch_stream):
+            # For double-zipped RDDs, the batches can be iterators from other PairDeserializer,
+            # instead of lists. We need to convert them to lists if needed.
+            key_batch = key_batch if hasattr(key_batch, '__len__') else list(key_batch)
+            val_batch = val_batch if hasattr(val_batch, '__len__') else list(val_batch)
             if len(key_batch) != len(val_batch):
                 raise ValueError("Can not deserialize PairRDD with different number of items"
                                  " in batches: (%d, %d)" % (len(key_batch), len(val_batch)))

http://git-wip-us.apache.org/repos/asf/spark/blob/6adf67dd/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 000dd1e..3c108ec 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -644,6 +644,18 @@ class RDDTests(ReusedPySparkTestCase):
             set([(x, (y, y)) for x in range(10) for y in range(10)])
         )
 
+    def test_zip_chaining(self):
+        # Tests for SPARK-21985
+        rdd = self.sc.parallelize('abc', 2)
+        self.assertSetEqual(
+            set(rdd.zip(rdd).zip(rdd).collect()),
+            set([((x, x), x) for x in 'abc'])
+        )
+        self.assertSetEqual(
+            set(rdd.zip(rdd.zip(rdd)).collect()),
+            set([(x, (x, x)) for x in 'abc'])
+        )
+
     def test_deleting_input_files(self):
         # Regression test for SPARK-1025
         tempFile = tempfile.NamedTemporaryFile(delete=False)


