Davies Liu created SPARK-10542:
----------------------------------
Summary: The PySpark 1.5 closure serializer can't serialize a namedtuple instance.
Key: SPARK-10542
URL: https://issues.apache.org/jira/browse/SPARK-10542
Project: Spark
Issue Type: Bug
Components: PySpark
Affects Versions: 1.5.0
Reporter: Davies Liu
Assignee: Davies Liu
Priority: Critical
Code to reproduce the bug:
{code}
from collections import namedtuple
PowerPlantRow = namedtuple("PowerPlantRow", ["AT", "V", "AP", "RH", "PE"])
rdd = sc.parallelize([1]).map(lambda x: PowerPlantRow(1.0, 2.0, 3.0, 4.0, 5.0))
rdd.count()
{code}
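A possible workaround until this is fixed (an untested sketch, not part of the original repro; it assumes the same active SparkContext sc as above) is to keep the generated namedtuple class out of the serialized closure: emit plain tuples on the executors and rebuild the named rows on the driver.
{code}
from collections import namedtuple

PowerPlantRow = namedtuple("PowerPlantRow", ["AT", "V", "AP", "RH", "PE"])

# Ship only a lambda that builds plain tuples, so cloudpickle never
# has to serialize the namedtuple class itself.
rdd = sc.parallelize([1]).map(lambda x: (1.0, 2.0, 3.0, 4.0, 5.0))

# Rebuild the named rows after collect(), entirely on the driver.
rows = [PowerPlantRow(*t) for t in rdd.collect()]
{code}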
Error message on Spark 1.5:
{code}
AttributeError: 'builtin_function_or_method' object has no attribute '__code__'
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-5-59448e31019f> in <module>()
2 PowerPlantRow=namedtuple("PowerPlantRow", ["AT", "V", "AP", "RH", "PE"])
3 rdd=sc.parallelize([1]).map(lambda x: PowerPlantRow(1.0, 2.0, 3.0, 4.0, 5.0))
----> 4 rdd.count()
/home/ubuntu/databricks/spark/python/pyspark/rdd.pyc in count(self)
1004 3
1005 """
-> 1006 return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
1007
1008 def stats(self):
/home/ubuntu/databricks/spark/python/pyspark/rdd.pyc in sum(self)
995 6.0
996 """
--> 997 return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
998
999 def count(self):
/home/ubuntu/databricks/spark/python/pyspark/rdd.pyc in fold(self, zeroValue, op)
869 # zeroValue provided to each partition is unique from the one provided
870 # to the final reduce call
--> 871 vals = self.mapPartitions(func).collect()
872 return reduce(op, vals, zeroValue)
873
/home/ubuntu/databricks/spark/python/pyspark/rdd.pyc in collect(self)
771 """
772 with SCCallSiteSync(self.context) as css:
--> 773 port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
774 return list(_load_from_socket(port, self._jrdd_deserializer))
775
/home/ubuntu/databricks/spark/python/pyspark/rdd.pyc in _jrdd(self)
2383 command = (self.func, profiler, self._prev_jrdd_deserializer,
2384 self._jrdd_deserializer)
-> 2385 pickled_cmd, bvars, env, includes = _prepare_for_python_RDD(self.ctx, command, self)
2386 python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
2387 bytearray(pickled_cmd),
/home/ubuntu/databricks/spark/python/pyspark/rdd.pyc in _prepare_for_python_RDD(sc, command, obj)
2303 # the serialized command will be compressed by broadcast
2304 ser = CloudPickleSerializer()
-> 2305 pickled_command = ser.dumps(command)
2306 if len(pickled_command) > (1 << 20): # 1M
2307 # The broadcast will have same life cycle as created PythonRDD
/home/ubuntu/databricks/spark/python/pyspark/serializers.pyc in dumps(self, obj)
425
426 def dumps(self, obj):
--> 427 return cloudpickle.dumps(obj, 2)
428
429
/home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in dumps(obj, protocol)
639
640 cp = CloudPickler(file,protocol)
--> 641 cp.dump(obj)
642
643 return file.getvalue()
/home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in dump(self, obj)
105 self.inject_addons()
106 try:
--> 107 return Pickler.dump(self, obj)
108 except RuntimeError as e:
109 if 'recursion' in e.args[0]:
/usr/lib/python2.7/pickle.pyc in dump(self, obj)
222 if self.proto >= 2:
223 self.write(PROTO + chr(self.proto))
--> 224 self.save(obj)
225 self.write(STOP)
226
/usr/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/usr/lib/python2.7/pickle.pyc in save_tuple(self, obj)
560 write(MARK)
561 for element in obj:
--> 562 save(element)
563
564 if id(obj) in memo:
/usr/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
... skipped 23125 bytes ...
650
651 dispatch[DictionaryType] = save_dict
/usr/lib/python2.7/pickle.pyc in _batch_setitems(self, items)
684 k, v = tmp[0]
685 save(k)
--> 686 save(v)
687 write(SETITEM)
688 # else tmp is empty, and we're done
/usr/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in save_global(self, obj, name, pack)
367 v = v.__func__
368 dd[k] = v
--> 369 self.save(dd)
370 self.write(pickle.TUPLE2)
371 self.write(pickle.REDUCE)
/usr/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/usr/lib/python2.7/pickle.pyc in save_dict(self, obj)
647
648 self.memoize(obj)
--> 649 self._batch_setitems(obj.iteritems())
650
651 dispatch[DictionaryType] = save_dict
/usr/lib/python2.7/pickle.pyc in _batch_setitems(self, items)
679 for k, v in tmp:
680 save(k)
--> 681 save(v)
682 write(SETITEMS)
683 elif n:
/usr/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in save_function(self, obj, name)
191 if islambda(obj) or obj.__code__.co_filename == '<stdin>' or themodule is None:
192 #print("save global", islambda(obj), obj.__code__.co_filename, modname, themodule)
--> 193 self.save_function_tuple(obj)
194 return
195 else:
/home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in save_function_tuple(self, func)
240 # save the rest of the func data needed by _fill_function
241 save(f_globals)
--> 242 save(defaults)
243 save(dct)
244 write(pickle.TUPLE)
/usr/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/usr/lib/python2.7/pickle.pyc in save_tuple(self, obj)
546 if n <= 3 and proto >= 2:
547 for element in obj:
--> 548 save(element)
549 # Subtle. Same as in the big comment below.
550 if id(obj) in memo:
/usr/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in save_builtin_function(self, obj)
313 if obj.__module__ is "__builtin__":
314 return self.save_global(obj)
--> 315 return self.save_function(obj)
316 dispatch[types.BuiltinFunctionType] = save_builtin_function
317
/home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in save_function(self, obj, name)
189 # we'll pickle the actual function object rather than simply saving a
190 # reference (as is done in default pickler), via save_function_tuple.
--> 191 if islambda(obj) or obj.__code__.co_filename == '<stdin>' or themodule is None:
192 #print("save global", islambda(obj), obj.__code__.co_filename, modname, themodule)
193 self.save_function_tuple(obj)
AttributeError: 'builtin_function_or_method' object has no attribute '__code__'
{code}
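Reading the frames bottom-up: save_global falls back to pickling the class __dict__ (the self.save(dd) call), one of the functions in that dict is pickled by value via save_function_tuple, its default-argument tuple contains a builtin_function_or_method, and save_builtin_function hands that builtin to save_function, which dereferences obj.__code__, an attribute builtins don't have. A minimal, Spark-free illustration of that failing assumption (a sketch added for clarity, not part of the original report):
{code}
import types

def pure_python():
    pass

# Pure-Python functions carry a code object; builtin functions do not,
# which is exactly what save_function trips over in the last frame above.
print(hasattr(pure_python, '__code__'))            # True
print(hasattr(len, '__code__'))                    # False
print(isinstance(len, types.BuiltinFunctionType))  # True

try:
    len.__code__
except AttributeError as e:
    print(e)  # 'builtin_function_or_method' object has no attribute '__code__'
{code}
A plausible direction for a fix (an assumption on my part, not necessarily the actual patch) is for save_function to fall back to a by-reference save whenever the object carries no __code__ attribute.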