[
https://issues.apache.org/jira/browse/SPARK-19872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15901997#comment-15901997
]
Brian Bruggeman commented on SPARK-19872:
-----------------------------------------
I reverted `rdd.py` and `serializers.py` to the 2.0.2 branch on GitHub, and the
code above now runs without an error.
rdd.py link:
https://github.com/apache/spark/blob/branch-2.0/python/pyspark/rdd.py
serializers.py link:
https://github.com/apache/spark/blob/branch-2.0/python/pyspark/serializers.py
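For reference, a sketch of the kind of check involved (assuming a PySpark shell with an active `sc` and the `test.txt` from the description below):
{code}
# Hypothetical sanity check (not part of the original report): with the reverted
# rdd.py/serializers.py this should print the file's lines instead of raising
# UnicodeDecodeError.
lines = sc.textFile('test.txt').repartition(10).collect()
print(lines)
{code}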
rdd diff:
{code}
--- tmp2.py 2017-03-08 15:17:45.000000000 -0600
+++ saved2.py 2017-03-08 15:17:59.000000000 -0600
@@ -52,8 +52,6 @@
get_used_memory, ExternalSorter, ExternalGroupBy
from pyspark.traceback_utils import SCCallSiteSync
-from py4j.java_collections import ListConverter, MapConverter
-
__all__ = ["RDD"]
@@ -137,11 +135,12 @@
break
if not sock:
raise Exception("could not open socket")
- # The RDD materialization time is unpredicable, if we set a timeout for socket reading
- # operation, it will very possibly fail. See SPARK-18281.
- sock.settimeout(None)
- # The socket will be automatically closed when garbage-collected.
- return serializer.load_stream(sock.makefile("rb", 65536))
+ try:
+ rf = sock.makefile("rb", 65536)
+ for item in serializer.load_stream(rf):
+ yield item
+ finally:
+ sock.close()
def ignore_unicode_prefix(f):
@@ -264,13 +263,44 @@
def isCheckpointed(self):
"""
- Return whether this RDD has been checkpointed or not
+ Return whether this RDD is checkpointed and materialized, either reliably or locally.
"""
return self._jrdd.rdd().isCheckpointed()
+ def localCheckpoint(self):
+ """
+ Mark this RDD for local checkpointing using Spark's existing caching layer.
+
+ This method is for users who wish to truncate RDD lineages while skipping the expensive
+ step of replicating the materialized data in a reliable distributed file system. This is
+ useful for RDDs with long lineages that need to be truncated periodically (e.g. GraphX).
+
+ Local checkpointing sacrifices fault-tolerance for performance. In particular, checkpointed
+ data is written to ephemeral local storage in the executors instead of to a reliable,
+ fault-tolerant storage. The effect is that if an executor fails during the computation,
+ the checkpointed data may no longer be accessible, causing an irrecoverable job failure.
+
+ This is NOT safe to use with dynamic allocation, which removes executors along
+ with their cached blocks. If you must use both features, you are advised to set
+ L{spark.dynamicAllocation.cachedExecutorIdleTimeout} to a high value.
+
+ The checkpoint directory set through L{SparkContext.setCheckpointDir()} is not used.
+ """
+ self._jrdd.rdd().localCheckpoint()
+
+ def isLocallyCheckpointed(self):
+ """
+ Return whether this RDD is marked for local checkpointing.
+
+ Exposed for testing.
+ """
+ return self._jrdd.rdd().isLocallyCheckpointed()
+
def getCheckpointFile(self):
"""
Gets the name of the file to which this RDD was checkpointed
+
+ Not defined if RDD is checkpointed locally.
"""
checkpointFile = self._jrdd.rdd().getCheckpointFile()
if checkpointFile.isDefined():
@@ -387,6 +417,9 @@
with replacement: expected number of times each element is chosen; fraction must be >= 0
:param seed: seed for the random number generator
+ .. note:: This is not guaranteed to provide exactly the fraction specified of the total
+ count of the given :class:`DataFrame`.
+
>>> rdd = sc.parallelize(range(100), 4)
>>> 6 <= rdd.sample(False, 0.1, 81).count() <= 14
True
@@ -425,8 +458,8 @@
"""
Return a fixed-size sampled subset of this RDD.
- Note that this method should only be used if the resulting array is expected
- to be small, as all the data is loaded into the driver's memory.
+ .. note:: This method should only be used if the resulting array is expected
+ to be small, as all the data is loaded into the driver's memory.
>>> rdd = sc.parallelize(range(0, 10))
>>> len(rdd.takeSample(True, 20, 1))
@@ -537,7 +570,7 @@
Return the intersection of this RDD and another one. The output will
not contain any duplicate elements, even if the input RDDs did.
- Note that this method performs a shuffle internally.
+ .. note:: This method performs a shuffle internally.
>>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
>>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
@@ -768,8 +801,9 @@
def collect(self):
"""
Return a list that contains all of the elements in this RDD.
- Note that this method should only be used if the resulting array is expected
- to be small, as all the data is loaded into the driver's memory.
+
+ .. note:: This method should only be used if the resulting array is expected
+ to be small, as all the data is loaded into the driver's memory.
"""
with SCCallSiteSync(self.context) as css:
port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
@@ -1214,12 +1248,12 @@
def top(self, num, key=None):
"""
- Get the top N elements from a RDD.
+ Get the top N elements from an RDD.
- Note that this method should only be used if the resulting array is expected
- to be small, as all the data is loaded into the driver's memory.
+ .. note:: This method should only be used if the resulting array is expected
+ to be small, as all the data is loaded into the driver's memory.
- Note: It returns the list sorted in descending order.
+ .. note:: It returns the list sorted in descending order.
>>> sc.parallelize([10, 4, 2, 12, 3]).top(1)
[12]
@@ -1238,11 +1272,11 @@
def takeOrdered(self, num, key=None):
"""
- Get the N elements from a RDD ordered in ascending order or as
+ Get the N elements from an RDD ordered in ascending order or as
specified by the optional key function.
- Note that this method should only be used if the resulting array is expected
- to be small, as all the data is loaded into the driver's memory.
+ .. note:: this method should only be used if the resulting array is expected
+ to be small, as all the data is loaded into the driver's memory.
>>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6)
[1, 2, 3, 4, 5, 6]
@@ -1263,11 +1297,11 @@
that partition to estimate the number of additional partitions needed
to satisfy the limit.
- Note that this method should only be used if the resulting array is expected
- to be small, as all the data is loaded into the driver's memory.
-
Translated from the Scala implementation in RDD#take().
+ .. note:: this method should only be used if the resulting array is expected
+ to be small, as all the data is loaded into the driver's memory.
+
>>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)
[2, 3]
>>> sc.parallelize([2, 3, 4, 5, 6]).take(10)
@@ -1331,8 +1365,9 @@
def isEmpty(self):
"""
- Returns true if and only if the RDD contains no elements at all. Note that an RDD
- may be empty even when it has at least 1 partition.
+ Returns true if and only if the RDD contains no elements at all.
+
+ .. note:: an RDD may be empty even when it has at least 1 partition.
>>> sc.parallelize([]).isEmpty()
True
@@ -1523,8 +1558,8 @@
"""
Return the key-value pairs in this RDD to the master as a dictionary.
- Note that this method should only be used if the resulting data is expected
- to be small, as all the data is loaded into the driver's memory.
+ .. note:: this method should only be used if the resulting data is expected
+ to be small, as all the data is loaded into the driver's memory.
>>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
>>> m[1]
@@ -1761,8 +1796,7 @@
set of aggregation functions.
Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined
- type" C. Note that V and C can be different -- for example, one might
- group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]).
+ type" C.
Users provide three functions:
@@ -1774,6 +1808,9 @@
In addition, users can control the partitioning of the output RDD.
+ .. note:: V and C can be different -- for example, one might group an RDD of type
+ (Int, Int) into an RDD of type (Int, List[Int]).
+
>>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> def add(a, b): return a + str(b)
>>> sorted(x.combineByKey(str, add, add).collect())
@@ -1845,9 +1882,9 @@
Group the values for each key in the RDD into a single sequence.
Hash-partitions the resulting RDD with numPartitions partitions.
- Note: If you are grouping in order to perform an aggregation (such as a
- sum or average) over each key, using reduceByKey or aggregateByKey will
- provide much better performance.
+ .. note:: If you are grouping in order to perform an aggregation (such as a
+ sum or average) over each key, using reduceByKey or aggregateByKey will
+ provide much better performance.
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.groupByKey().mapValues(len).collect())
@@ -2018,8 +2055,7 @@
>>> len(rdd.repartition(10).glom().collect())
10
"""
- jrdd = self._jrdd.repartition(numPartitions)
- return RDD(jrdd, self.ctx, self._jrdd_deserializer)
+ return self.coalesce(numPartitions, shuffle=True)
def coalesce(self, numPartitions, shuffle=False):
"""
@@ -2030,7 +2066,15 @@
>>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
[[1, 2, 3, 4, 5]]
"""
- jrdd = self._jrdd.coalesce(numPartitions, shuffle)
+ if shuffle:
+ # Decrease the batch size in order to distribute evenly the elements across output
+ # partitions. Otherwise, repartition will possibly produce highly skewed partitions.
+ batchSize = min(10, self.ctx._batchSize or 1024)
+ ser = BatchedSerializer(PickleSerializer(), batchSize)
+ selfCopy = self._reserialize(ser)
+ jrdd = selfCopy._jrdd.coalesce(numPartitions, shuffle)
+ else:
+ jrdd = self._jrdd.coalesce(numPartitions, shuffle)
return RDD(jrdd, self.ctx, self._jrdd_deserializer)
def zip(self, other):
@@ -2316,16 +2360,9 @@
# The broadcast will have same life cycle as created PythonRDD
broadcast = sc.broadcast(pickled_command)
pickled_command = ser.dumps(broadcast)
- # There is a bug in py4j.java_gateway.JavaClass with auto_convert
- # https://github.com/bartdag/py4j/issues/161
- # TODO: use auto_convert once py4j fix the bug
- broadcast_vars = ListConverter().convert(
- [x._jbroadcast for x in sc._pickled_broadcast_vars],
- sc._gateway._gateway_client)
+ broadcast_vars = [x._jbroadcast for x in sc._pickled_broadcast_vars]
sc._pickled_broadcast_vars.clear()
- env = MapConverter().convert(sc.environment, sc._gateway._gateway_client)
- includes = ListConverter().convert(sc._python_includes, sc._gateway._gateway_client)
- return pickled_command, broadcast_vars, env, includes
+ return pickled_command, broadcast_vars, sc.environment, sc._python_includes
def _wrap_function(sc, func, deserializer, serializer, profiler=None):
@@ -2433,4 +2470,4 @@
if __name__ == "__main__":
- _test()
\ No newline at end of file
+ _test()
{code}
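For what it's worth, in the 2.1 side of the diff above (the `+` lines), `coalesce(shuffle=True)` re-serializes the data with `BatchedSerializer(PickleSerializer())`, but the returned RDD still uses the original `self._jrdd_deserializer` (a `UTF8Deserializer` for `textFile`). That looks consistent with the pickle header bytes (`\x80\x02...`) in the `use_unicode=False` output below. A toy, standalone illustration of that kind of mismatch (plain Python, not Spark code):
{code}
# Toy illustration (not Spark code): decoding pickled, batched data as UTF-8
# fails the same way as the traceback below.
import pickle

data = pickle.dumps([u'a', u'b', u'c'], protocol=2)
print(repr(data[:2]))      # '\x80\x02' -- the same prefix seen in the repartition output
try:
    data.decode("utf-8")   # roughly what UTF8Deserializer.loads does when use_unicode=True
except UnicodeDecodeError as exc:
    print(exc)             # 'utf8' codec can't decode byte 0x80 in position 0 ...
{code}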
serializers diff:
{code}
--- tmp.py 2017-03-08 15:13:45.000000000 -0600
+++ <redacted>/lib/python2.7/site-packages/pyspark/serializers.py 2017-03-08 15:13:03.000000000 -0600
@@ -61,7 +61,7 @@
if sys.version < '3':
import cPickle as pickle
protocol = 2
- from itertools import izip as zip
+ from itertools import izip as zip, imap as map
else:
import pickle
protocol = 3
@@ -96,7 +96,12 @@
raise NotImplementedError
def _load_stream_without_unbatching(self, stream):
- return self.load_stream(stream)
+ """
+ Return an iterator of deserialized batches (lists) of objects from the input stream.
+ if the serializer does not operate on batches the default implementation returns an
+ iterator of single element lists.
+ """
+ return map(lambda x: [x], self.load_stream(stream))
# Note: our notion of "equality" is that output generated by
# equal serializers can be deserialized using the same serializer.
@@ -278,50 +283,57 @@
return "AutoBatchedSerializer(%s)" % self.serializer
-class CartesianDeserializer(FramedSerializer):
+class CartesianDeserializer(Serializer):
"""
Deserializes the JavaRDD cartesian() of two PythonRDDs.
+ Due to pyspark batching we cannot simply use the result of the Java RDD cartesian,
+ we additionally need to do the cartesian within each pair of batches.
"""
def __init__(self, key_ser, val_ser):
- FramedSerializer.__init__(self)
self.key_ser = key_ser
self.val_ser = val_ser
- def prepare_keys_values(self, stream):
- key_stream = self.key_ser._load_stream_without_unbatching(stream)
- val_stream = self.val_ser._load_stream_without_unbatching(stream)
- key_is_batched = isinstance(self.key_ser, BatchedSerializer)
- val_is_batched = isinstance(self.val_ser, BatchedSerializer)
- for (keys, vals) in zip(key_stream, val_stream):
- keys = keys if key_is_batched else [keys]
- vals = vals if val_is_batched else [vals]
- yield (keys, vals)
+ def _load_stream_without_unbatching(self, stream):
+ key_batch_stream = self.key_ser._load_stream_without_unbatching(stream)
+ val_batch_stream = self.val_ser._load_stream_without_unbatching(stream)
+ for (key_batch, val_batch) in zip(key_batch_stream, val_batch_stream):
+ # for correctness with repeated cartesian/zip this must be returned as one batch
+ yield product(key_batch, val_batch)
def load_stream(self, stream):
- for (keys, vals) in self.prepare_keys_values(stream):
- for pair in product(keys, vals):
- yield pair
+ return chain.from_iterable(self._load_stream_without_unbatching(stream))
def __repr__(self):
return "CartesianDeserializer(%s, %s)" % \
(str(self.key_ser), str(self.val_ser))
-class PairDeserializer(CartesianDeserializer):
+class PairDeserializer(Serializer):
"""
Deserializes the JavaRDD zip() of two PythonRDDs.
+ Due to pyspark batching we cannot simply use the result of the Java RDD zip,
+ we additionally need to do the zip within each pair of batches.
"""
+ def __init__(self, key_ser, val_ser):
+ self.key_ser = key_ser
+ self.val_ser = val_ser
+
+ def _load_stream_without_unbatching(self, stream):
+ key_batch_stream = self.key_ser._load_stream_without_unbatching(stream)
+ val_batch_stream = self.val_ser._load_stream_without_unbatching(stream)
+ for (key_batch, val_batch) in zip(key_batch_stream, val_batch_stream):
+ if len(key_batch) != len(val_batch):
+ raise ValueError("Can not deserialize PairRDD with different
number of items"
+ " in batches: (%d, %d)" % (len(key_batch),
len(val_batch)))
+ # for correctness with repeated cartesian/zip this must be
returned as one batch
+ yield zip(key_batch, val_batch)
+
def load_stream(self, stream):
- for (keys, vals) in self.prepare_keys_values(stream):
- if len(keys) != len(vals):
- raise ValueError("Can not deserialize RDD with different
number of items"
- " in pair: (%d, %d)" % (len(keys), len(vals)))
- for pair in zip(keys, vals):
- yield pair
+ return chain.from_iterable(self._load_stream_without_unbatching(stream))
def __repr__(self):
return "PairDeserializer(%s, %s)" % (str(self.key_ser),
str(self.val_ser))
@@ -378,6 +390,16 @@
_old_namedtuple = _copy_func(collections.namedtuple)
def namedtuple(*args, **kwargs):
+ import sys
+ if sys.version.startswith('3'):
+ defaults = {
+ 'verbose': False,
+ 'rename': False,
+ 'module': None,
+ }
+ for key, value in defaults.items():
+ if key not in kwargs:
+ kwargs[key] = value
cls = _old_namedtuple(*args, **kwargs)
return _hack_namedtuple(cls)
@@ -559,4 +581,4 @@
import doctest
(failure_count, test_count) = doctest.testmod()
if failure_count:
- exit(-1)
\ No newline at end of file
+ exit(-1)
{code}
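As an aside, a minimal sketch (plain Python with toy stand-ins, not the real pyspark classes) of what the new `_load_stream_without_unbatching` default and the batch-wise `PairDeserializer.load_stream` above amount to:
{code}
# Toy stand-ins (hypothetical names; not the pyspark API) showing the idea:
# unbatched serializers expose each object as a single-element batch, so zipping
# batch streams and then zipping within each batch pair stays correct.
from itertools import chain

def toy_load_stream(items):
    # pretend this deserializes individual objects from a stream
    return iter(items)

def toy_load_stream_without_unbatching(items):
    # default from the diff: one single-element list per object
    return map(lambda x: [x], toy_load_stream(items))

key_batches = toy_load_stream_without_unbatching(['a', 'b'])
val_batches = toy_load_stream_without_unbatching([1, 2])
# PairDeserializer-style: zip within each pair of batches, then flatten
pairs = chain.from_iterable(zip(k, v) for k, v in zip(key_batches, val_batches))
print(list(pairs))  # [('a', 1), ('b', 2)]
{code}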
> UnicodeDecodeError in Pyspark on sc.textFile read with repartition
> ------------------------------------------------------------------
>
> Key: SPARK-19872
> URL: https://issues.apache.org/jira/browse/SPARK-19872
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 2.1.0
> Environment: Mac and EC2
> Reporter: Brian Bruggeman
>
> I'm receiving the following traceback:
> {code}
> >>> sc.textFile('test.txt').repartition(10).collect()
> Traceback (most recent call last):
> File "<stdin>", line 1, in <module>
> File "/usr/local/Cellar/apache-spark/2.1.0/libexec/python/pyspark/rdd.py",
> line 810, in collect
> return list(_load_from_socket(port, self._jrdd_deserializer))
> File "/usr/local/Cellar/apache-spark/2.1.0/libexec/python/pyspark/rdd.py",
> line 140, in _load_from_socket
> for item in serializer.load_stream(rf):
> File
> "/usr/local/Cellar/apache-spark/2.1.0/libexec/python/pyspark/serializers.py",
> line 539, in load_stream
> yield self.loads(stream)
> File
> "/usr/local/Cellar/apache-spark/2.1.0/libexec/python/pyspark/serializers.py",
> line 534, in loads
> return s.decode("utf-8") if self.use_unicode else s
> File "/Users/brianbruggeman/.envs/dg/lib/python2.7/encodings/utf_8.py",
> line 16, in decode
> return codecs.utf_8_decode(input, errors, True)
> UnicodeDecodeError: 'utf8' codec can't decode byte 0x80 in position 0:
> invalid start byte
> {code}
> I created a text file (test.txt) with standard Linux newlines:
> {code}
> a
> b
> c
> d
> e
> f
> g
> h
> i
> j
> k
> l
> {code}
> I then ran pyspark:
> {code}
> $ pyspark
> Python 2.7.13 (default, Dec 18 2016, 07:03:39)
> [GCC 4.2.1 Compatible Apple LLVM 8.0.0 (clang-800.0.42.1)] on darwin
> Type "help", "copyright", "credits" or "license" for more information.
> Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
> Setting default log level to "WARN".
> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
> 17/03/08 13:59:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> 17/03/08 13:59:32 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
> Welcome to
> ____ __
> / __/__ ___ _____/ /__
> _\ \/ _ \/ _ `/ __/ '_/
> /__ / .__/\_,_/_/ /_/\_\ version 2.1.0
> /_/
> Using Python version 2.7.13 (default, Dec 18 2016 07:03:39)
> SparkSession available as 'spark'.
> >>> sc.textFile('test.txt').collect()
> [u'a', u'b', u'c', u'd', u'e', u'f', u'g', u'h', u'i', u'j', u'k', u'l']
> >>> sc.textFile('test.txt', use_unicode=False).collect()
> ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l']
> >>> sc.textFile('test.txt', use_unicode=False).repartition(10).collect()
> ['\x80\x02]q\x01(U\x01aU\x01bU\x01cU\x01dU\x01eU\x01fU\x01ge.', '\x80\x02]q\x01(U\x01hU\x01iU\x01jU\x01kU\x01le.']
> >>> sc.textFile('test.txt').repartition(10).collect()
> Traceback (most recent call last):
> File "<stdin>", line 1, in <module>
> File "/usr/local/Cellar/apache-spark/2.1.0/libexec/python/pyspark/rdd.py",
> line 810, in collect
> return list(_load_from_socket(port, self._jrdd_deserializer))
> File "/usr/local/Cellar/apache-spark/2.1.0/libexec/python/pyspark/rdd.py",
> line 140, in _load_from_socket
> for item in serializer.load_stream(rf):
> File
> "/usr/local/Cellar/apache-spark/2.1.0/libexec/python/pyspark/serializers.py",
> line 539, in load_stream
> yield self.loads(stream)
> File
> "/usr/local/Cellar/apache-spark/2.1.0/libexec/python/pyspark/serializers.py",
> line 534, in loads
> return s.decode("utf-8") if self.use_unicode else s
> File "/Users/brianbruggeman/.envs/dg/lib/python2.7/encodings/utf_8.py",
> line 16, in decode
> return codecs.utf_8_decode(input, errors, True)
> UnicodeDecodeError: 'utf8' codec can't decode byte 0x80 in position 0:
> invalid start byte
> {code}
> This really looks like a bug in the `serializers.py` code.