Nate Crosswhite created SPARK-7613:
--------------------------------------
Summary: Serialization fails in pyspark for lambdas referencing class data members
Key: SPARK-7613
URL: https://issues.apache.org/jira/browse/SPARK-7613
Project: Spark
Issue Type: Bug
Components: PySpark
Affects Versions: 1.3.1, 1.2.0
Environment: Python 2.7.6, Java 8
Reporter: Nate Crosswhite
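The following code snippet works in pyspark 1.1.0 but fails in 1.2 and later with the
error shown below. It appears the failure is caused by cloudpickle attempting
to serialize the second lambda function twice. The likely trigger is a reference
cycle: the lambda in LambdaFail reads self.exp, so its closure holds self, while
self.f_function holds the lambda. The traceback shows cloudpickle saving a
function's closure (cloudpickle.py line 236) before memoizing the function
(line 238), so the cycle apparently causes a nested save that memoizes the lambda
first, and the outer memoize call then fails pickle's
"assert id(obj) not in self.memo".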
## Begin PySpark code
class LambdaFine:
    """Hold an exponent and a power function that closes over the plain value.

    ``f_function`` captures the constructor argument ``exp`` directly, so its
    closure does not reference the instance. In the report, this variant
    serializes and runs fine through ``rdd.map``.
    """

    def __init__(self, exp):
        self.exp = exp
        # Closes over the local ``exp``, not ``self``: later changes to
        # ``self.exp`` are not seen, and the function holds no reference back
        # to the instance.
        self.f_function = lambda x: x ** exp
class LambdaFail:
    """Hold an exponent and a power function that reads it through ``self``.

    ``f_function`` looks up ``self.exp`` on every call, so its closure holds
    the instance, while the instance in turn stores ``f_function``. That
    instance <-> function reference cycle appears to be what the report's
    traceback trips over when PySpark pickles the function.
    """

    def __init__(self, exp):
        self.exp = exp
        # Closes over ``self`` (not ``exp``): reassigning ``self.exp`` later
        # changes the result, and the function keeps the instance reachable.
        self.f_function = lambda x: x ** self.exp
# `sc` is presumably the SparkContext predefined by the pyspark shell.
rdd = sc.parallelize(range(0, 10))
# Single-argument print(...) with %-formatting emits the same text under
# Python 2.7 (the reported environment) and Python 3. Unlike the Python 2
# print statement, the label is not written before a failing collect() raises.
print('LambdaFine: %s' % rdd.map(LambdaFine(2).f_function).collect())  # works
print('LambdaFail: %s' % rdd.map(LambdaFail(2).f_function).collect())  # fails in spark 1.2+
### End PySpark code
### Output:
LambdaFine: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
LambdaFail:
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<home>/spark-1.4.0-SNAPSHOT-bin-4abf285f/python/pyspark/rdd.py", line 745, in collect
    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "<home>/spark-1.4.0-SNAPSHOT-bin-4abf285f/python/pyspark/rdd.py", line 2345, in _jrdd
    pickled_cmd, bvars, env, includes = _prepare_for_python_RDD(self.ctx, command, self)
  File "<home>/spark-1.4.0-SNAPSHOT-bin-4abf285f/python/pyspark/rdd.py", line 2265, in _prepare_for_python_RDD
    pickled_command = ser.dumps((command, sys.version_info[:2]))
  File "<home>/spark-1.4.0-SNAPSHOT-bin-4abf285f/python/pyspark/serializers.py", line 427, in dumps
    return cloudpickle.dumps(obj, 2)
  File "<home>/spark-1.4.0-SNAPSHOT-bin-4abf285f/python/pyspark/cloudpickle.py", line 622, in dumps
    cp.dump(obj)
  File "<home>/spark-1.4.0-SNAPSHOT-bin-4abf285f/python/pyspark/cloudpickle.py", line 107, in dump
    return Pickler.dump(self, obj)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 548, in save_tuple
    save(element)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 562, in save_tuple
    save(element)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "<home>/spark-1.4.0-SNAPSHOT-bin-4abf285f/python/pyspark/cloudpickle.py", line 199, in save_function
    self.save_function_tuple(obj)
  File "<home>/spark-1.4.0-SNAPSHOT-bin-4abf285f/python/pyspark/cloudpickle.py", line 236, in save_function_tuple
    save((code, closure, base_globals))
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 548, in save_tuple
    save(element)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 600, in save_list
    self._batch_appends(iter(obj))
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 636, in _batch_appends
    save(tmp[0])
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "<home>/spark-1.4.0-SNAPSHOT-bin-4abf285f/python/pyspark/cloudpickle.py", line 193, in save_function
    self.save_function_tuple(obj)
  File "<home>/spark-1.4.0-SNAPSHOT-bin-4abf285f/python/pyspark/cloudpickle.py", line 238, in save_function_tuple
    self.memoize(func)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 244, in memoize
    assert id(obj) not in self.memo
AssertionError
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]