Davies Liu created SPARK-6886:
---------------------------------

             Summary: Big closure in PySpark will fail during shuffle
                 Key: SPARK-6886
                 URL: https://issues.apache.org/jira/browse/SPARK-6886
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 1.3.0, 1.2.1, 1.4.0
            Reporter: Davies Liu
            Assignee: Davies Liu
            Priority: Blocker


Reported by  beifei.zhou <beifei.zhou at ximalaya.com>: 

I am using spark to process bid datasets. However, there is always problem when 
executing reduceByKey on a large dataset, whereas with a smaller dataset.  May 
I asked you how could I solve this issue?

The error is always like this:
{code}
15/04/09 11:27:46 ERROR Executor: Exception in task 3.0 in stage 1.0 (TID 5)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/nali/Softwares/spark/python/pyspark/worker.py", line 90, in main
    command = pickleSer.loads(command.value)
  File "/Users/nali/Softwares/spark/python/pyspark/broadcast.py", line 106, in 
value
    self._value = self.load(self._path)
  File "/Users/nali/Softwares/spark/python/pyspark/broadcast.py", line 87, in 
load
    with open(path, 'rb', 1 << 20) as f:
IOError: [Errno 2] No such file or directory: 
'/private/var/folders/_x/n59vb1b54pl96lvldz2lr_v40000gn/T/spark-37d8ecbc-9ac9-4aa2-be23-12823f4cd1ed/pyspark-1e3d5904-a5b6-4222-a146-91bfdb4a33a7/tmp8XMhgG'
{code}

Here I attach my code:
{code}
import codecs
from pyspark import SparkContext, SparkConf
from operator import add 
import operator
from pyspark.storagelevel import StorageLevel

def combine_dict(a,b):
    a.update(b)
    return a
conf = SparkConf()
sc = SparkContext(appName = "tag")
al_tag_dict = sc.textFile('albumtag.txt').map(lambda x: 
x.split(',')).map(lambda x: {x[0]: x[1:]}).reduce(lambda a, b: 
combine_dict(a,b))

result = sc.textFile('uidAlbumscore.txt')\
        .map(lambda x: x.split(','))\
        .filter(lambda x: x[1] in al_tag_dict.keys())\
        .map(lambda x: (x[0], al_tag_dict[x[1]], float(x[2])))\
        .map(lambda x: map(lambda a: ((x[0], a), x[2]), x[1]))\
        .flatMap(lambda x: x)\ 
        .map(lambda x: (str(x[0][0]), x[1]))\
        .reduceByKey(add)\
#        .map(lambda x: x[0][0]+','+x[0][1]+','+str(x[1])+'\n')\
#        .reduce(add)
#codecs.open('tag_score.txt','w','utf-8').write(result)
print result.first()
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to