Eric Liang created SPARK-17472:
----------------------------------
Summary: Better error message for serialization failures of large objects in Python
Key: SPARK-17472
URL: https://issues.apache.org/jira/browse/SPARK-17472
Project: Spark
Issue Type: Improvement
Components: PySpark
Reporter: Eric Liang
Priority: Minor
{code}
def run():
    import numpy.random as nr
    b = nr.bytes(8 * 1000000000)
    sc.parallelize(range(1000), 1000).map(lambda x: len(b)).count()
run()
{code}
This gives you the following error from pickle:
{code}
error: 'i' format requires -2147483648 <= number <= 2147483647
---------------------------------------------------------------------------
error Traceback (most recent call last)
<ipython-input-14-ba73d84faba7> in <module>()
4 sc.parallelize(range(1000), 1000).map(lambda x: len(b)).count()
5
----> 6 run()
<ipython-input-14-ba73d84faba7> in run()
2 import numpy.random as nr
3 b = nr.bytes(8 * 1000000000)
----> 4 sc.parallelize(range(1000), 1000).map(lambda x: len(b)).count()
5
6 run()
/databricks/spark/python/pyspark/rdd.pyc in count(self)
1002 3
1003 """
-> 1004 return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
1005
1006 def stats(self):
/databricks/spark/python/pyspark/rdd.pyc in sum(self)
993 6.0
994 """
--> 995 return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
996
997 def count(self):
/databricks/spark/python/pyspark/rdd.pyc in fold(self, zeroValue, op)
867 # zeroValue provided to each partition is unique from the one provided
868 # to the final reduce call
--> 869 vals = self.mapPartitions(func).collect()
870 return reduce(op, vals, zeroValue)
871
/databricks/spark/python/pyspark/rdd.pyc in collect(self)
769 """
770 with SCCallSiteSync(self.context) as css:
--> 771 port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
772 return list(_load_from_socket(port, self._jrdd_deserializer))
773
/databricks/spark/python/pyspark/rdd.pyc in _jrdd(self)
2377 command = (self.func, profiler, self._prev_jrdd_deserializer,
2378 self._jrdd_deserializer)
-> 2379 pickled_cmd, bvars, env, includes = _prepare_for_python_RDD(self.ctx, command, self)
2380 python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
2381 bytearray(pickled_cmd),
/databricks/spark/python/pyspark/rdd.pyc in _prepare_for_python_RDD(sc, command, obj)
2297 # the serialized command will be compressed by broadcast
2298 ser = CloudPickleSerializer()
-> 2299 pickled_command = ser.dumps(command)
2300 if len(pickled_command) > (1 << 20): # 1M
2301 # The broadcast will have same life cycle as created PythonRDD
/databricks/spark/python/pyspark/serializers.pyc in dumps(self, obj)
426
427 def dumps(self, obj):
--> 428 return cloudpickle.dumps(obj, 2)
429
430
/databricks/spark/python/pyspark/cloudpickle.pyc in dumps(obj, protocol)
655
656 cp = CloudPickler(file,protocol)
--> 657 cp.dump(obj)
658
659 return file.getvalue()
/databricks/spark/python/pyspark/cloudpickle.pyc in dump(self, obj)
105 self.inject_addons()
106 try:
--> 107 return Pickler.dump(self, obj)
108 except RuntimeError as e:
109 if 'recursion' in e.args[0]:
/usr/lib/python2.7/pickle.pyc in dump(self, obj)
222 if self.proto >= 2:
223 self.write(PROTO + chr(self.proto))
--> 224 self.save(obj)
225 self.write(STOP)
226
/usr/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/usr/lib/python2.7/pickle.pyc in save_tuple(self, obj)
560 write(MARK)
561 for element in obj:
--> 562 save(element)
563
564 if id(obj) in memo:
/usr/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/databricks/spark/python/pyspark/cloudpickle.pyc in save_function(self, obj, name)
202 klass = getattr(themodule, name, None)
203 if klass is None or klass is not obj:
--> 204 self.save_function_tuple(obj)
205 return
206
/databricks/spark/python/pyspark/cloudpickle.pyc in save_function_tuple(self, func)
239 # create a skeleton function object and memoize it
240 save(_make_skel_func)
--> 241 save((code, closure, base_globals))
242 write(pickle.REDUCE)
243 self.memoize(func)
/usr/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/usr/lib/python2.7/pickle.pyc in save_tuple(self, obj)
546 if n <= 3 and proto >= 2:
547 for element in obj:
--> 548 save(element)
549 # Subtle. Same as in the big comment below.
550 if id(obj) in memo:
/usr/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/usr/lib/python2.7/pickle.pyc in save_list(self, obj)
598
599 self.memoize(obj)
--> 600 self._batch_appends(iter(obj))
601
602 dispatch[ListType] = save_list
/usr/lib/python2.7/pickle.pyc in _batch_appends(self, items)
631 write(MARK)
632 for x in tmp:
--> 633 save(x)
634 write(APPENDS)
635 elif n:
/usr/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/databricks/spark/python/pyspark/cloudpickle.pyc in save_function(self, obj, name)
202 klass = getattr(themodule, name, None)
203 if klass is None or klass is not obj:
--> 204 self.save_function_tuple(obj)
205 return
206
/databricks/spark/python/pyspark/cloudpickle.pyc in save_function_tuple(self, func)
239 # create a skeleton function object and memoize it
240 save(_make_skel_func)
--> 241 save((code, closure, base_globals))
242 write(pickle.REDUCE)
243 self.memoize(func)
/usr/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/usr/lib/python2.7/pickle.pyc in save_tuple(self, obj)
546 if n <= 3 and proto >= 2:
547 for element in obj:
--> 548 save(element)
549 # Subtle. Same as in the big comment below.
550 if id(obj) in memo:
/usr/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/usr/lib/python2.7/pickle.pyc in save_list(self, obj)
598
599 self.memoize(obj)
--> 600 self._batch_appends(iter(obj))
601
602 dispatch[ListType] = save_list
/usr/lib/python2.7/pickle.pyc in _batch_appends(self, items)
631 write(MARK)
632 for x in tmp:
--> 633 save(x)
634 write(APPENDS)
635 elif n:
/usr/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/databricks/spark/python/pyspark/cloudpickle.pyc in save_function(self, obj, name)
202 klass = getattr(themodule, name, None)
203 if klass is None or klass is not obj:
--> 204 self.save_function_tuple(obj)
205 return
206
/databricks/spark/python/pyspark/cloudpickle.pyc in save_function_tuple(self, func)
239 # create a skeleton function object and memoize it
240 save(_make_skel_func)
--> 241 save((code, closure, base_globals))
242 write(pickle.REDUCE)
243 self.memoize(func)
/usr/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/usr/lib/python2.7/pickle.pyc in save_tuple(self, obj)
546 if n <= 3 and proto >= 2:
547 for element in obj:
--> 548 save(element)
549 # Subtle. Same as in the big comment below.
550 if id(obj) in memo:
/usr/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/usr/lib/python2.7/pickle.pyc in save_list(self, obj)
598
599 self.memoize(obj)
--> 600 self._batch_appends(iter(obj))
601
602 dispatch[ListType] = save_list
/usr/lib/python2.7/pickle.pyc in _batch_appends(self, items)
631 write(MARK)
632 for x in tmp:
--> 633 save(x)
634 write(APPENDS)
635 elif n:
/usr/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/databricks/spark/python/pyspark/cloudpickle.pyc in save_function(self, obj, name)
202 klass = getattr(themodule, name, None)
203 if klass is None or klass is not obj:
--> 204 self.save_function_tuple(obj)
205 return
206
/databricks/spark/python/pyspark/cloudpickle.pyc in save_function_tuple(self, func)
239 # create a skeleton function object and memoize it
240 save(_make_skel_func)
--> 241 save((code, closure, base_globals))
242 write(pickle.REDUCE)
243 self.memoize(func)
/usr/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/usr/lib/python2.7/pickle.pyc in save_tuple(self, obj)
546 if n <= 3 and proto >= 2:
547 for element in obj:
--> 548 save(element)
549 # Subtle. Same as in the big comment below.
550 if id(obj) in memo:
/usr/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/usr/lib/python2.7/pickle.pyc in save_list(self, obj)
598
599 self.memoize(obj)
--> 600 self._batch_appends(iter(obj))
601
602 dispatch[ListType] = save_list
/usr/lib/python2.7/pickle.pyc in _batch_appends(self, items)
634 write(APPENDS)
635 elif n:
--> 636 save(tmp[0])
637 write(APPEND)
638 # else tmp is empty, and we're done
/usr/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/databricks/spark/python/pyspark/cloudpickle.pyc in save_function(self, obj, name)
196 if islambda(obj) or obj.__code__.co_filename == '<stdin>' or themodule is None:
197 #print("save global", islambda(obj), obj.__code__.co_filename, modname, themodule)
--> 198 self.save_function_tuple(obj)
199 return
200 else:
/databricks/spark/python/pyspark/cloudpickle.pyc in save_function_tuple(self, func)
239 # create a skeleton function object and memoize it
240 save(_make_skel_func)
--> 241 save((code, closure, base_globals))
242 write(pickle.REDUCE)
243 self.memoize(func)
/usr/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/usr/lib/python2.7/pickle.pyc in save_tuple(self, obj)
546 if n <= 3 and proto >= 2:
547 for element in obj:
--> 548 save(element)
549 # Subtle. Same as in the big comment below.
550 if id(obj) in memo:
/usr/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/usr/lib/python2.7/pickle.pyc in save_list(self, obj)
598
599 self.memoize(obj)
--> 600 self._batch_appends(iter(obj))
601
602 dispatch[ListType] = save_list
/usr/lib/python2.7/pickle.pyc in _batch_appends(self, items)
634 write(APPENDS)
635 elif n:
--> 636 save(tmp[0])
637 write(APPEND)
638 # else tmp is empty, and we're done
/usr/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/usr/lib/python2.7/pickle.pyc in save_string(self, obj, pack)
484 self.write(SHORT_BINSTRING + chr(n) + obj)
485 else:
--> 486 self.write(BINSTRING + pack("<i", n) + obj)
487 else:
488 self.write(STRING + repr(obj) + '\n')
error: 'i' format requires -2147483648 <= number <= 2147483647
{code}
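The root cause is visible in the last frame above: pickle protocol 2's BINSTRING opcode writes the string length with pack("<i", n), a 4-byte signed field, so any single pickled payload larger than 2**31 - 1 bytes overflows it. An illustrative repro of just that overflow, without Spark:
{code}
import struct

# save_string writes BINSTRING + pack("<i", n) + obj, so the length field
# is a signed 32-bit int and caps out at 2**31 - 1 bytes.
n = 8 * 1000000000  # size of the closure captured above
struct.pack("<i", n)  # error: 'i' format requires -2147483648 <= number <= 2147483647
{code}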
=======================================================
{code}
def run():
    import numpy.random as nr
    b = sc.broadcast(nr.bytes(8 * 1000000000))
    sc.parallelize(range(1000), 1000).map(lambda x: len(b.value)).count()
run()
{code}
This gives you the following error:
{code}
---------------------------------------------------------------------------
SystemError Traceback (most recent call last)
<ipython-input-14-53cbdb8ed528> in <module>()
4 sc.parallelize(range(1000), 1000).map(lambda x: len(b.value)).count()
5
----> 6 run()
<ipython-input-14-53cbdb8ed528> in run()
1 def run():
2 import numpy.random as nr
----> 3 b = sc.broadcast(nr.bytes(8 * 1000000000))
4 sc.parallelize(range(1000), 1000).map(lambda x: len(b.value)).count()
5
/databricks/spark/python/pyspark/context.py in broadcast(self, value)
741 be sent to each cluster only once.
742 """
--> 743 return Broadcast(self, value, self._pickled_broadcast_vars)
744
745 def accumulator(self, value, accum_param=None):
/databricks/spark/python/pyspark/broadcast.py in __init__(self, sc, value, pickle_registry, path)
68 if sc is not None:
69 f = NamedTemporaryFile(delete=False, dir=sc._temp_dir)
---> 70 self._path = self.dump(value, f)
71 self._jbroadcast = sc._jvm.PythonRDD.readBroadcastFromFile(sc._jsc, self._path)
72 self._pickle_registry = pickle_registry
/databricks/spark/python/pyspark/broadcast.py in dump(self, value, f)
76
77 def dump(self, value, f):
---> 78 pickle.dump(value, f, 2)
79 f.close()
80 return f.name
SystemError: error return without exception set
{code}
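Here the same 2GB limit is hit, but inside the C pickler (the traceback ends at pickle.dump with no deeper Python frames), so it surfaces as a bare SystemError instead of a struct error. A minimal sketch of a standalone repro, assuming Python 2.7's cPickle:
{code}
# Minimal sketch, assuming Python 2.7, where broadcast.py uses cPickle.
# Dumping a string over 2**31 - 1 bytes fails inside the C extension
# without a proper exception being set.
import cPickle
from tempfile import NamedTemporaryFile

f = NamedTemporaryFile(delete=False)
cPickle.dump("x" * (2 ** 31 + 1), f, 2)  # SystemError: error return without exception set
{code}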
In both cases, we should raise a clearer error stating that the task or broadcast variable could not be serialized because it was too large.
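A minimal sketch of the kind of guard that could produce that message (hypothetical helper, not an actual patch):
{code}
import pickle
import struct

# Hypothetical wrapper (names are illustrative): translate the low-level
# failures above into an error that says what was being serialized and why
# it likely failed.
def dumps_with_better_error(serializer, obj, what="task"):
    try:
        return serializer.dumps(obj)
    except (pickle.PicklingError, struct.error, SystemError) as e:
        raise pickle.PicklingError(
            "Could not serialize %s: %s. This usually means the %s captures "
            "an object that is too large to pickle (over 2G); consider using "
            "a broadcast variable or constructing the object on the executors."
            % (what, e, what))
{code}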