[ 
https://issues.apache.org/jira/browse/SPARK-19872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15901997#comment-15901997
 ] 

Brian Bruggeman commented on SPARK-19872:
-----------------------------------------

I reverted `rdd.py` and `serializers.py` to the 2.0.2 branch on GitHub, and the 
code above works without an error.

rdd.py link: 
https://github.com/apache/spark/blob/branch-2.0/python/pyspark/rdd.py
serializers.py link: 
https://github.com/apache/spark/blob/branch-2.0/python/pyspark/serializers.py

rdd diff:
{code}
--- tmp2.py 2017-03-08 15:17:45.000000000 -0600
+++ saved2.py   2017-03-08 15:17:59.000000000 -0600
@@ -52,8 +52,6 @@
     get_used_memory, ExternalSorter, ExternalGroupBy
 from pyspark.traceback_utils import SCCallSiteSync

-from py4j.java_collections import ListConverter, MapConverter
-

 __all__ = ["RDD"]

@@ -137,11 +135,12 @@
         break
     if not sock:
         raise Exception("could not open socket")
-    # The RDD materialization time is unpredicable, if we set a timeout for 
socket reading
-    # operation, it will very possibly fail. See SPARK-18281.
-    sock.settimeout(None)
-    # The socket will be automatically closed when garbage-collected.
-    return serializer.load_stream(sock.makefile("rb", 65536))
+    try:
+        rf = sock.makefile("rb", 65536)
+        for item in serializer.load_stream(rf):
+            yield item
+    finally:
+        sock.close()


 def ignore_unicode_prefix(f):
@@ -264,13 +263,44 @@

     def isCheckpointed(self):
         """
-        Return whether this RDD has been checkpointed or not
+        Return whether this RDD is checkpointed and materialized, either 
reliably or locally.
         """
         return self._jrdd.rdd().isCheckpointed()

+    def localCheckpoint(self):
+        """
+        Mark this RDD for local checkpointing using Spark's existing caching 
layer.
+
+        This method is for users who wish to truncate RDD lineages while 
skipping the expensive
+        step of replicating the materialized data in a reliable distributed 
file system. This is
+        useful for RDDs with long lineages that need to be truncated 
periodically (e.g. GraphX).
+
+        Local checkpointing sacrifices fault-tolerance for performance. In 
particular, checkpointed
+        data is written to ephemeral local storage in the executors instead of 
to a reliable,
+        fault-tolerant storage. The effect is that if an executor fails during 
the computation,
+        the checkpointed data may no longer be accessible, causing an 
irrecoverable job failure.
+
+        This is NOT safe to use with dynamic allocation, which removes 
executors along
+        with their cached blocks. If you must use both features, you are 
advised to set
+        L{spark.dynamicAllocation.cachedExecutorIdleTimeout} to a high value.
+
+        The checkpoint directory set through 
L{SparkContext.setCheckpointDir()} is not used.
+        """
+        self._jrdd.rdd().localCheckpoint()
+
+    def isLocallyCheckpointed(self):
+        """
+        Return whether this RDD is marked for local checkpointing.
+
+        Exposed for testing.
+        """
+        return self._jrdd.rdd().isLocallyCheckpointed()
+
     def getCheckpointFile(self):
         """
         Gets the name of the file to which this RDD was checkpointed
+
+        Not defined if RDD is checkpointed locally.
         """
         checkpointFile = self._jrdd.rdd().getCheckpointFile()
         if checkpointFile.isDefined():
@@ -387,6 +417,9 @@
             with replacement: expected number of times each element is chosen; 
fraction must be >= 0
         :param seed: seed for the random number generator

+        .. note:: This is not guaranteed to provide exactly the fraction 
specified of the total
+            count of the given :class:`DataFrame`.
+
         >>> rdd = sc.parallelize(range(100), 4)
         >>> 6 <= rdd.sample(False, 0.1, 81).count() <= 14
         True
@@ -425,8 +458,8 @@
         """
         Return a fixed-size sampled subset of this RDD.

-        Note that this method should only be used if the resulting array is 
expected
-        to be small, as all the data is loaded into the driver's memory.
+        .. note:: This method should only be used if the resulting array is 
expected
+            to be small, as all the data is loaded into the driver's memory.

         >>> rdd = sc.parallelize(range(0, 10))
         >>> len(rdd.takeSample(True, 20, 1))
@@ -537,7 +570,7 @@
         Return the intersection of this RDD and another one. The output will
         not contain any duplicate elements, even if the input RDDs did.

-        Note that this method performs a shuffle internally.
+        .. note:: This method performs a shuffle internally.

         >>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
         >>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
@@ -768,8 +801,9 @@
     def collect(self):
         """
         Return a list that contains all of the elements in this RDD.
-        Note that this method should only be used if the resulting array is 
expected
-        to be small, as all the data is loaded into the driver's memory.
+
+        .. note:: This method should only be used if the resulting array is 
expected
+            to be small, as all the data is loaded into the driver's memory.
         """
         with SCCallSiteSync(self.context) as css:
             port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
@@ -1214,12 +1248,12 @@

     def top(self, num, key=None):
         """
-        Get the top N elements from a RDD.
+        Get the top N elements from an RDD.

-        Note that this method should only be used if the resulting array is 
expected
-        to be small, as all the data is loaded into the driver's memory.
+        .. note:: This method should only be used if the resulting array is 
expected
+            to be small, as all the data is loaded into the driver's memory.

-        Note: It returns the list sorted in descending order.
+        .. note:: It returns the list sorted in descending order.

         >>> sc.parallelize([10, 4, 2, 12, 3]).top(1)
         [12]
@@ -1238,11 +1272,11 @@

     def takeOrdered(self, num, key=None):
         """
-        Get the N elements from a RDD ordered in ascending order or as
+        Get the N elements from an RDD ordered in ascending order or as
         specified by the optional key function.

-        Note that this method should only be used if the resulting array is 
expected
-        to be small, as all the data is loaded into the driver's memory.
+        .. note:: this method should only be used if the resulting array is 
expected
+            to be small, as all the data is loaded into the driver's memory.

         >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6)
         [1, 2, 3, 4, 5, 6]
@@ -1263,11 +1297,11 @@
         that partition to estimate the number of additional partitions needed
         to satisfy the limit.

-        Note that this method should only be used if the resulting array is 
expected
-        to be small, as all the data is loaded into the driver's memory.
-
         Translated from the Scala implementation in RDD#take().

+        .. note:: this method should only be used if the resulting array is 
expected
+            to be small, as all the data is loaded into the driver's memory.
+
         >>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)
         [2, 3]
         >>> sc.parallelize([2, 3, 4, 5, 6]).take(10)
@@ -1331,8 +1365,9 @@

     def isEmpty(self):
         """
-        Returns true if and only if the RDD contains no elements at all. Note 
that an RDD
-        may be empty even when it has at least 1 partition.
+        Returns true if and only if the RDD contains no elements at all.
+
+        .. note:: an RDD may be empty even when it has at least 1 partition.

         >>> sc.parallelize([]).isEmpty()
         True
@@ -1523,8 +1558,8 @@
         """
         Return the key-value pairs in this RDD to the master as a dictionary.

-        Note that this method should only be used if the resulting data is 
expected
-        to be small, as all the data is loaded into the driver's memory.
+        .. note:: this method should only be used if the resulting data is 
expected
+            to be small, as all the data is loaded into the driver's memory.

         >>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
         >>> m[1]
@@ -1761,8 +1796,7 @@
         set of aggregation functions.

         Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined
-        type" C.  Note that V and C can be different -- for example, one might
-        group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]).
+        type" C.

         Users provide three functions:

@@ -1774,6 +1808,9 @@

         In addition, users can control the partitioning of the output RDD.

+        .. note:: V and C can be different -- for example, one might group an 
RDD of type
+            (Int, Int) into an RDD of type (Int, List[Int]).
+
         >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
         >>> def add(a, b): return a + str(b)
         >>> sorted(x.combineByKey(str, add, add).collect())
@@ -1845,9 +1882,9 @@
         Group the values for each key in the RDD into a single sequence.
         Hash-partitions the resulting RDD with numPartitions partitions.

-        Note: If you are grouping in order to perform an aggregation (such as a
-        sum or average) over each key, using reduceByKey or aggregateByKey will
-        provide much better performance.
+        .. note:: If you are grouping in order to perform an aggregation (such 
as a
+            sum or average) over each key, using reduceByKey or aggregateByKey 
will
+            provide much better performance.

         >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
         >>> sorted(rdd.groupByKey().mapValues(len).collect())
@@ -2018,8 +2055,7 @@
          >>> len(rdd.repartition(10).glom().collect())
          10
         """
-        jrdd = self._jrdd.repartition(numPartitions)
-        return RDD(jrdd, self.ctx, self._jrdd_deserializer)
+        return self.coalesce(numPartitions, shuffle=True)

     def coalesce(self, numPartitions, shuffle=False):
         """
@@ -2030,7 +2066,15 @@
         >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
         [[1, 2, 3, 4, 5]]
         """
-        jrdd = self._jrdd.coalesce(numPartitions, shuffle)
+        if shuffle:
+            # Decrease the batch size in order to distribute evenly the 
elements across output
+            # partitions. Otherwise, repartition will possibly produce highly 
skewed partitions.
+            batchSize = min(10, self.ctx._batchSize or 1024)
+            ser = BatchedSerializer(PickleSerializer(), batchSize)
+            selfCopy = self._reserialize(ser)
+            jrdd = selfCopy._jrdd.coalesce(numPartitions, shuffle)
+        else:
+            jrdd = self._jrdd.coalesce(numPartitions, shuffle)
         return RDD(jrdd, self.ctx, self._jrdd_deserializer)

     def zip(self, other):
@@ -2316,16 +2360,9 @@
         # The broadcast will have same life cycle as created PythonRDD
         broadcast = sc.broadcast(pickled_command)
         pickled_command = ser.dumps(broadcast)
-    # There is a bug in py4j.java_gateway.JavaClass with auto_convert
-    # https://github.com/bartdag/py4j/issues/161
-    # TODO: use auto_convert once py4j fix the bug
-    broadcast_vars = ListConverter().convert(
-        [x._jbroadcast for x in sc._pickled_broadcast_vars],
-        sc._gateway._gateway_client)
+    broadcast_vars = [x._jbroadcast for x in sc._pickled_broadcast_vars]
     sc._pickled_broadcast_vars.clear()
-    env = MapConverter().convert(sc.environment, sc._gateway._gateway_client)
-    includes = ListConverter().convert(sc._python_includes, 
sc._gateway._gateway_client)
-    return pickled_command, broadcast_vars, env, includes
+    return pickled_command, broadcast_vars, sc.environment, sc._python_includes


 def _wrap_function(sc, func, deserializer, serializer, profiler=None):
@@ -2433,4 +2470,4 @@


 if __name__ == "__main__":
-    _test()
\ No newline at end of file
+    _test()
{code}

serializers diff:
{code}
--- tmp.py  2017-03-08 15:13:45.000000000 -0600
+++ <redacted>/lib/python2.7/site-packages/pyspark/serializers.py   2017-03-08 
15:13:03.000000000 -0600
@@ -61,7 +61,7 @@
 if sys.version < '3':
     import cPickle as pickle
     protocol = 2
-    from itertools import izip as zip
+    from itertools import izip as zip, imap as map
 else:
     import pickle
     protocol = 3
@@ -96,7 +96,12 @@
         raise NotImplementedError

     def _load_stream_without_unbatching(self, stream):
-        return self.load_stream(stream)
+        """
+        Return an iterator of deserialized batches (lists) of objects from the 
input stream.
+        if the serializer does not operate on batches the default 
implementation returns an
+        iterator of single element lists.
+        """
+        return map(lambda x: [x], self.load_stream(stream))

     # Note: our notion of "equality" is that output generated by
     # equal serializers can be deserialized using the same serializer.
@@ -278,50 +283,57 @@
         return "AutoBatchedSerializer(%s)" % self.serializer


-class CartesianDeserializer(FramedSerializer):
+class CartesianDeserializer(Serializer):

     """
     Deserializes the JavaRDD cartesian() of two PythonRDDs.
+    Due to pyspark batching we cannot simply use the result of the Java RDD 
cartesian,
+    we additionally need to do the cartesian within each pair of batches.
     """

     def __init__(self, key_ser, val_ser):
-        FramedSerializer.__init__(self)
         self.key_ser = key_ser
         self.val_ser = val_ser

-    def prepare_keys_values(self, stream):
-        key_stream = self.key_ser._load_stream_without_unbatching(stream)
-        val_stream = self.val_ser._load_stream_without_unbatching(stream)
-        key_is_batched = isinstance(self.key_ser, BatchedSerializer)
-        val_is_batched = isinstance(self.val_ser, BatchedSerializer)
-        for (keys, vals) in zip(key_stream, val_stream):
-            keys = keys if key_is_batched else [keys]
-            vals = vals if val_is_batched else [vals]
-            yield (keys, vals)
+    def _load_stream_without_unbatching(self, stream):
+        key_batch_stream = self.key_ser._load_stream_without_unbatching(stream)
+        val_batch_stream = self.val_ser._load_stream_without_unbatching(stream)
+        for (key_batch, val_batch) in zip(key_batch_stream, val_batch_stream):
+            # for correctness with repeated cartesian/zip this must be 
returned as one batch
+            yield product(key_batch, val_batch)

     def load_stream(self, stream):
-        for (keys, vals) in self.prepare_keys_values(stream):
-            for pair in product(keys, vals):
-                yield pair
+        return 
chain.from_iterable(self._load_stream_without_unbatching(stream))

     def __repr__(self):
         return "CartesianDeserializer(%s, %s)" % \
                (str(self.key_ser), str(self.val_ser))


-class PairDeserializer(CartesianDeserializer):
+class PairDeserializer(Serializer):

     """
     Deserializes the JavaRDD zip() of two PythonRDDs.
+    Due to pyspark batching we cannot simply use the result of the Java RDD 
zip,
+    we additionally need to do the zip within each pair of batches.
     """

+    def __init__(self, key_ser, val_ser):
+        self.key_ser = key_ser
+        self.val_ser = val_ser
+
+    def _load_stream_without_unbatching(self, stream):
+        key_batch_stream = self.key_ser._load_stream_without_unbatching(stream)
+        val_batch_stream = self.val_ser._load_stream_without_unbatching(stream)
+        for (key_batch, val_batch) in zip(key_batch_stream, val_batch_stream):
+            if len(key_batch) != len(val_batch):
+                raise ValueError("Can not deserialize PairRDD with different 
number of items"
+                                 " in batches: (%d, %d)" % (len(key_batch), 
len(val_batch)))
+            # for correctness with repeated cartesian/zip this must be 
returned as one batch
+            yield zip(key_batch, val_batch)
+
     def load_stream(self, stream):
-        for (keys, vals) in self.prepare_keys_values(stream):
-            if len(keys) != len(vals):
-                raise ValueError("Can not deserialize RDD with different 
number of items"
-                                 " in pair: (%d, %d)" % (len(keys), len(vals)))
-            for pair in zip(keys, vals):
-                yield pair
+        return 
chain.from_iterable(self._load_stream_without_unbatching(stream))

     def __repr__(self):
         return "PairDeserializer(%s, %s)" % (str(self.key_ser), 
str(self.val_ser))
@@ -378,6 +390,16 @@
     _old_namedtuple = _copy_func(collections.namedtuple)

     def namedtuple(*args, **kwargs):
+        import sys
+        if sys.version.startswith('3'):
+            defaults = {
+                'verbose': False,
+                'rename': False,
+                'module': None,
+            }
+            for key, value in defaults.items():
+                if key not in kwargs:
+                    kwargs[key] = value
         cls = _old_namedtuple(*args, **kwargs)
         return _hack_namedtuple(cls)

@@ -559,4 +581,4 @@
     import doctest
     (failure_count, test_count) = doctest.testmod()
     if failure_count:
-        exit(-1)
\ No newline at end of file
+        exit(-1)
{code}

> UnicodeDecodeError in Pyspark on sc.textFile read with repartition
> ------------------------------------------------------------------
>
>                 Key: SPARK-19872
>                 URL: https://issues.apache.org/jira/browse/SPARK-19872
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.1.0
>         Environment: Mac and EC2
>            Reporter: Brian Bruggeman
>
> I'm receiving the following traceback:
> {code}
> >>> sc.textFile('test.txt').repartition(10).collect()
> Traceback (most recent call last):
>   File "<stdin>", line 1, in <module>
>   File "/usr/local/Cellar/apache-spark/2.1.0/libexec/python/pyspark/rdd.py", 
> line 810, in collect
>     return list(_load_from_socket(port, self._jrdd_deserializer))
>   File "/usr/local/Cellar/apache-spark/2.1.0/libexec/python/pyspark/rdd.py", 
> line 140, in _load_from_socket
>     for item in serializer.load_stream(rf):
>   File 
> "/usr/local/Cellar/apache-spark/2.1.0/libexec/python/pyspark/serializers.py", 
> line 539, in load_stream
>     yield self.loads(stream)
>   File 
> "/usr/local/Cellar/apache-spark/2.1.0/libexec/python/pyspark/serializers.py", 
> line 534, in loads
>     return s.decode("utf-8") if self.use_unicode else s
>   File "/Users/brianbruggeman/.envs/dg/lib/python2.7/encodings/utf_8.py", 
> line 16, in decode
>     return codecs.utf_8_decode(input, errors, True)
> UnicodeDecodeError: 'utf8' codec can't decode byte 0x80 in position 0: 
> invalid start byte
> {code}
> I created a text file (test.txt) with standard Linux newlines:
> {code}
> a
> b
> c
> d
> e
> f
> g
> h
> i
> j
> k
> l
> {code}
> I then ran pyspark:
> {code}
> $ pyspark
> Python 2.7.13 (default, Dec 18 2016, 07:03:39)
> [GCC 4.2.1 Compatible Apple LLVM 8.0.0 (clang-800.0.42.1)] on darwin
> Type "help", "copyright", "credits" or "license" for more information.
> Using Spark's default log4j profile: 
> org/apache/spark/log4j-defaults.properties
> Setting default log level to "WARN".
> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use 
> setLogLevel(newLevel).
> 17/03/08 13:59:27 WARN NativeCodeLoader: Unable to load native-hadoop library 
> for your platform... using builtin-java classes where applicable
> 17/03/08 13:59:32 WARN ObjectStore: Failed to get database global_temp, 
> returning NoSuchObjectException
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /__ / .__/\_,_/_/ /_/\_\   version 2.1.0
>       /_/
> Using Python version 2.7.13 (default, Dec 18 2016 07:03:39)
> SparkSession available as 'spark'.
> >>> sc.textFile('test.txt').collect()
> [u'a', u'b', u'c', u'd', u'e', u'f', u'g', u'h', u'i', u'j', u'k', u'l']
> >>> sc.textFile('test.txt', use_unicode=False).collect()
> ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l']
> >>> sc.textFile('test.txt', use_unicode=False).repartition(10).collect()
> ['\x80\x02]q\x01(U\x01aU\x01bU\x01cU\x01dU\x01eU\x01fU\x01ge.', 
> '\x80\x02]q\x01(U\x01hU\x01iU\x01jU\x01kU\x01le.']
> >>> sc.textFile('test.txt').repartition(10).collect()
> Traceback (most recent call last):
>   File "<stdin>", line 1, in <module>
>   File "/usr/local/Cellar/apache-spark/2.1.0/libexec/python/pyspark/rdd.py", 
> line 810, in collect
>     return list(_load_from_socket(port, self._jrdd_deserializer))
>   File "/usr/local/Cellar/apache-spark/2.1.0/libexec/python/pyspark/rdd.py", 
> line 140, in _load_from_socket
>     for item in serializer.load_stream(rf):
>   File 
> "/usr/local/Cellar/apache-spark/2.1.0/libexec/python/pyspark/serializers.py", 
> line 539, in load_stream
>     yield self.loads(stream)
>   File 
> "/usr/local/Cellar/apache-spark/2.1.0/libexec/python/pyspark/serializers.py", 
> line 534, in loads
>     return s.decode("utf-8") if self.use_unicode else s
>   File "/Users/brianbruggeman/.envs/dg/lib/python2.7/encodings/utf_8.py", 
> line 16, in decode
>     return codecs.utf_8_decode(input, errors, True)
> UnicodeDecodeError: 'utf8' codec can't decode byte 0x80 in position 0: 
> invalid start byte
> {code}
> This really looks like a bug in the `serializers.py` code.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to