[ https://issues.apache.org/jira/browse/SPARK-791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Josh Rosen updated SPARK-791:
-----------------------------
    Affects Version/s: 1.0.0
        Fix Version/s: 1.0.3
                       0.9.3
                       1.1.0

> [pyspark] operator.getattr not serialized
> -----------------------------------------
>
>                 Key: SPARK-791
>                 URL: https://issues.apache.org/jira/browse/SPARK-791
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 0.7.2, 0.9.0, 1.0.0
>            Reporter: Jim Blomo
>            Assignee: Davies Liu
>            Priority: Minor
>             Fix For: 1.1.0, 0.9.3, 1.0.3
>
>
> Using operator.itemgetter as a function in map seems to confuse the serialization process in pyspark. I'm using itemgetter to return tuples, which fails with a TypeError (details below). Using an equivalent lambda function returns the correct result.
> Use a test file:
> {code:sh}
> echo 1,1 > test.txt
> {code}
> Then try mapping it to a tuple:
> {code:python}
> import csv
> sc.textFile("test.txt").mapPartitions(csv.reader).map(lambda l: (l[0],l[1])).first()
> Out[7]: ('1', '1')
> {code}
> But this does not work when using operator.itemgetter:
> {code:python}
> import operator
> sc.textFile("test.txt").mapPartitions(csv.reader).map(operator.itemgetter(0,1)).first()
> # TypeError: list indices must be integers, not tuple
> {code}
> This is running with git master, commit 6d60fe571a405eb9306a2be1817901316a46f892
> IPython 0.13.2
> java version "1.7.0_25"
> Scala code runner version 2.9.1
> Ubuntu 12.04
> Full debug output:
> {code:python}
> In [9]: sc.textFile("test.txt").mapPartitions(csv.reader).map(operator.itemgetter(0,1)).first()
> 13/07/04 16:19:49 INFO storage.MemoryStore: ensureFreeSpace(33632) called with curMem=201792, maxMem=339585269
> 13/07/04 16:19:49 INFO storage.MemoryStore: Block broadcast_6 stored as values to memory (estimated size 32.8 KB, free 323.6 MB)
> 13/07/04 16:19:49 INFO mapred.FileInputFormat: Total input paths to process : 1
> 13/07/04 16:19:49 INFO spark.SparkContext: Starting job: takePartition at NativeMethodAccessorImpl.java:-2
> 13/07/04 16:19:49 INFO scheduler.DAGScheduler: Got job 4 (takePartition at NativeMethodAccessorImpl.java:-2) with 1 output partitions (allowLocal=true)
> 13/07/04 16:19:49 INFO scheduler.DAGScheduler: Final stage: Stage 4 (PythonRDD at NativeConstructorAccessorImpl.java:-2)
> 13/07/04 16:19:49 INFO scheduler.DAGScheduler: Parents of final stage: List()
> 13/07/04 16:19:49 INFO scheduler.DAGScheduler: Missing parents: List()
> 13/07/04 16:19:49 INFO scheduler.DAGScheduler: Computing the requested partition locally
> 13/07/04 16:19:49 INFO scheduler.DAGScheduler: Failed to run takePartition at NativeMethodAccessorImpl.java:-2
> ---------------------------------------------------------------------------
> Py4JJavaError                             Traceback (most recent call last)
> <ipython-input-9-1fdb3e7a8ac7> in <module>()
> ----> 1 sc.textFile("test.txt").mapPartitions(csv.reader).map(operator.itemgetter(0,1)).first()
> /home/jim/src/spark/python/pyspark/rdd.pyc in first(self)
>     389         2
>     390         """
> --> 391         return self.take(1)[0]
>     392
>     393     def saveAsTextFile(self, path):
> /home/jim/src/spark/python/pyspark/rdd.pyc in take(self, num)
>     372         items = []
>     373         for partition in range(self._jrdd.splits().size()):
> --> 374             iterator = self.ctx._takePartition(self._jrdd.rdd(), partition)
>     375             # Each item in the iterator is a string, Python object, batch of
>     376             # Python objects. Regardless, it is sufficient to take `num`
> /home/jim/src/spark/python/lib/py4j0.7.egg/py4j/java_gateway.pyc in __call__(self, *args)
>     498         answer = self.gateway_client.send_command(command)
>     499         return_value = get_return_value(answer, self.gateway_client,
> --> 500                 self.target_id, self.name)
>     501
>     502         for temp_arg in temp_args:
> /home/jim/src/spark/python/lib/py4j0.7.egg/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
>     298                 raise Py4JJavaError(
>     299                         'An error occurred while calling {0}{1}{2}.\n'.
> --> 300                         format(target_id, '.', name), value)
>     301             else:
>     302                 raise Py4JError(
> Py4JJavaError: An error occurred while calling z:spark.api.python.PythonRDD.takePartition.
> : spark.api.python.PythonException: Traceback (most recent call last):
>   File "/home/jim/src/spark/python/pyspark/worker.py", line 53, in main
>     for obj in func(split_index, iterator):
>   File "/home/jim/src/spark/python/pyspark/serializers.py", line 24, in batched
>     for item in iterator:
> TypeError: list indices must be integers, not tuple
> 	at spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:117)
> 	at spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:139)
> 	at spark.api.python.PythonRDD.compute(PythonRDD.scala:82)
> 	at spark.RDD.computeOrReadCheckpoint(RDD.scala:232)
> 	at spark.RDD.iterator(RDD.scala:221)
> 	at spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:423)
> 	at spark.scheduler.DAGScheduler$$anon$2.run(DAGScheduler.scala:410)
> {code}

--
This message was sent by Atlassian JIRA
(v6.2#6252)
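The report hinges on `operator.itemgetter(0, 1)` and `lambda l: (l[0], l[1])` being equivalent, so the lambda is a drop-in workaround until the serialization fix lands. A minimal local sketch (plain Python, no Spark cluster needed; the `StringIO` input stands in for the `test.txt` file from the report) checking that equivalence on the same `1,1` input:

{code:python}
import csv
import operator
from io import StringIO

# Parse the same "1,1" input used in the report, via the same csv.reader.
row = next(csv.reader(StringIO("1,1\n")))   # ['1', '1']

getter = operator.itemgetter(0, 1)          # returns a tuple of the two fields
as_lambda = lambda l: (l[0], l[1])          # the workaround from the report

assert getter(row) == ('1', '1')
assert getter(row) == as_lambda(row)        # identical results on a driver-side list
{code}

Both callables behave identically when run in the driver process; the TypeError only appears once the itemgetter has made a round trip through pyspark's serializer to a worker, which is what points at serialization rather than itemgetter itself.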