Nate Crosswhite created SPARK-7613:
--------------------------------------

             Summary: Serialization fails in pyspark for lambdas referencing class data members
                 Key: SPARK-7613
                 URL: https://issues.apache.org/jira/browse/SPARK-7613
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 1.3.1, 1.2.0
         Environment: Python 2.7.6, Java 8
            Reporter: Nate Crosswhite


The following code snippet works in PySpark 1.1.0 but fails in 1.2 and later with
the error shown below. The failure appears to be caused by cloudpickle attempting
to serialize the second lambda function twice.
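
The cycle the pickler has to walk is visible without Spark at all: the only
free variable of the failing lambda is self, and self holds the same lambda
back through f_function, so the pickler reaches the function a second time
while it is still in the middle of saving it. A minimal illustration (a sketch
added for clarity, not part of the original report; the obj/f names are
arbitrary):

### Begin illustration
class LambdaFail():
    def __init__(self, exp):
        self.exp = exp
        self.f_function = (lambda x: x**self.exp)

obj = LambdaFail(2)
f = obj.f_function
assert f.__closure__[0].cell_contents is obj  # the closure cell holds the instance
assert obj.f_function is f                    # and the instance holds the lambda back
### End illustration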

### Begin PySpark code
class LambdaFine():
    def __init__(self, exp):
        self.exp = exp
        self.f_function = (lambda x: x**exp)

class LambdaFail():
    def __init__(self, exp):
        self.exp = exp
        self.f_function = (lambda x: x**self.exp)

rdd = sc.parallelize(range(0,10))
print 'LambdaFine:', rdd.map(LambdaFine(2).f_function).collect()  # works
print 'LambdaFail:', rdd.map(LambdaFail(2).f_function).collect()  # fails in Spark 1.2+
### End PySpark code

### Output:

LambdaFine: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
LambdaFail:
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<home>/spark-1.4.0-SNAPSHOT-bin-4abf285f/python/pyspark/rdd.py", line 
745, in collect
    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "<home>/spark-1.4.0-SNAPSHOT-bin-4abf285f/python/pyspark/rdd.py", line 
2345, in _jrdd
    pickled_cmd, bvars, env, includes = _prepare_for_python_RDD(self.ctx, 
command, self)
  File "<home>/spark-1.4.0-SNAPSHOT-bin-4abf285f/python/pyspark/rdd.py", line 
2265, in _prepare_for_python_RDD
    pickled_command = ser.dumps((command, sys.version_info[:2]))
  File 
"<home>/spark-1.4.0-SNAPSHOT-bin-4abf285f/python/pyspark/serializers.py", line 
427, in dumps
    return cloudpickle.dumps(obj, 2)
  File 
"<home>/spark-1.4.0-SNAPSHOT-bin-4abf285f/python/pyspark/cloudpickle.py", line 
622, in dumps
    cp.dump(obj)
  File 
"<home>/spark-1.4.0-SNAPSHOT-bin-4abf285f/python/pyspark/cloudpickle.py", line 
107, in dump
    return Pickler.dump(self, obj)
  File 
"/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py",
 line 224, in dump
    self.save(obj)
  File 
"/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py",
 line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File 
"/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py",
 line 548, in save_tuple
    save(element)
  File 
"/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py",
 line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File 
"/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py",
 line 562, in save_tuple
    save(element)
  File 
"/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py",
 line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File 
"<home>/spark-1.4.0-SNAPSHOT-bin-4abf285f/python/pyspark/cloudpickle.py", line 
199, in save_function
    self.save_function_tuple(obj)
  File 
"<home>/spark-1.4.0-SNAPSHOT-bin-4abf285f/python/pyspark/cloudpickle.py", line 
236, in save_function_tuple
    save((code, closure, base_globals))
  File 
"/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py",
 line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File 
"/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py",
 line 548, in save_tuple
    save(element)
  File 
"/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py",
 line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File 
"/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py",
 line 600, in save_list
    self._batch_appends(iter(obj))
  File 
"/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py",
 line 636, in _batch_appends
    save(tmp[0])
  File 
"/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py",
 line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File 
"<home>/spark-1.4.0-SNAPSHOT-bin-4abf285f/python/pyspark/cloudpickle.py", line 
193, in save_function
    self.save_function_tuple(obj)
  File 
"<home>/spark-1.4.0-SNAPSHOT-bin-4abf285f/python/pyspark/cloudpickle.py", line 
238, in save_function_tuple
    self.memoize(func)
  File 
"/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py",
 line 244, in memoize
    assert id(obj) not in self.memo
AssertionError
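
Until this is fixed, a possible workaround (a sketch following the working
LambdaFine pattern above, not an official fix) is to bind the attribute to a
plain local before building the lambda, so the lambda no longer closes over
self. This reuses the sc/rdd from the snippet above:

### Begin workaround sketch
class LambdaWorkaround():
    def __init__(self, exp):
        self.exp = exp
        local_exp = exp  # close over a local value instead of self
        self.f_function = (lambda x: x**local_exp)

print 'LambdaWorkaround:', rdd.map(LambdaWorkaround(2).f_function).collect()  # same structure as LambdaFine, expected to work
### End workaround sketch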




