Hello,

`itertools.groupby` is evaluated lazily and the `g`s in your code are
generators not lists. This might cause your problem. Casting everything to
lists might help here, e.g.:

  grp2 = [(k, list(g)) for k,g in groupby(grp1, lambda e: e[1])]

HTH

Eike



2016-08-05 7:31 GMT+02:00 林家銘 <robin890...@gmail.com>:

> Hi
> I wrote a map function to aggregate data in a partition, and this function
> using  itertools.groupby for more than twice, then there comes the pickle
> error .
>
> Here is what I do
>
> ===Driver Code===
> pair_count = df.mapPartitions(lambda iterable: pair_func_cnt(iterable))
> pair_count.collection()
>
> ===Map Function ===
> def pair_func_cnt(iterable):
>     from itertools import groupby
>
>     ls = [[1,2,3],[1,2,5],[1,3,5],[2,4,6]]
>     grp1 = [(k,g) for k,g in groupby(ls, lambda e: e[0])]
>     grp2 = [(k,g) for k,g in groupby(grp1, lambda e: e[1])]
>     return iter(grp2)
>
> ===Error Message===
>
> Caused by: org.apache.spark.api.python.PythonException: Traceback (most 
> recent call last):
>   File 
> "/opt/zeppelin-0.6.0-bin-netinst/interpreter/spark/pyspark/pyspark.zip/pyspark/worker.py",
>  line 111, in main
>     process()
>   File 
> "/opt/zeppelin-0.6.0-bin-netinst/interpreter/spark/pyspark/pyspark.zip/pyspark/worker.py",
>  line 106, in process
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File 
> "/opt/zeppelin-0.6.0-bin-netinst/interpreter/spark/pyspark/pyspark.zip/pyspark/serializers.py",
>  line 267, in dump_stream
>     bytes = self.serializer.dumps(vs)
>   File 
> "/opt/zeppelin-0.6.0-bin-netinst/interpreter/spark/pyspark/pyspark.zip/pyspark/serializers.py",
>  line 415, in dumps
>     return pickle.dumps(obj, protocol)PicklingError: Can't pickle <type 
> 'itertools._grouper'>: attribute lookup itertools._grouper failed
>     at 
> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
>     at 
> org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
>     at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
>     at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>     at org.apache.spark.scheduler.Task.run(Task.scala:89)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     ... 1 more
>
>

Reply via email to