[ https://issues.apache.org/jira/browse/SPARK-12261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15144734#comment-15144734 ]

Niall McCarroll commented on SPARK-12261:
-----------------------------------------

As a workaround, you might try the following change to python/pyspark/worker.py.

Add the following two lines to the end of the process function defined inside
the main function:

    for obj in iterator:
        pass

... so the process function now looks like this (in Spark 1.5.2 at least):

    def process():
        iterator = deserializer.load_stream(infile)
        serializer.dump_stream(func(split_index, iterator), outfile)
        for obj in iterator:
            pass

After making the change, you will need to rebuild pyspark.zip in the python/lib
folder so that it includes the modified worker.py.
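
One way to rebuild the archive is a short Python script along these lines; this
is a minimal sketch, and the spark_home path is an assumption you will need to
adjust for your own installation:

    # Rebuild python/lib/pyspark.zip from the (edited) python/pyspark sources.
    # The spark_home path below is an assumed example, not a fixed location.
    import os
    import zipfile

    spark_home = r"D:\spark\spark-1.5.2-bin-hadoop2.6"  # assumed install dir
    python_dir = os.path.join(spark_home, "python")
    zip_path = os.path.join(python_dir, "lib", "pyspark.zip")

    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for root, dirs, files in os.walk(os.path.join(python_dir, "pyspark")):
            for name in files:
                full = os.path.join(root, name)
                # Store entries relative to python/ so the archive keeps the
                # pyspark/... layout that the original zip uses.
                zf.write(full, os.path.relpath(full, python_dir))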

The issue may be that the worker process completes before the executor has
finished writing all the data to it.  The thread writing the data down the
socket throws an exception, and if this happens before the executor marks the
task as complete it will cause trouble.  The idea is to get the worker to pull
all the data from the executor even if it is not needed to lazily compute the
output.  This is of course very inefficient, so it is a workaround rather than
a proper solution.
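
To see the suspected failure mode in isolation, here is a small sketch using
plain sockets (no Spark involved): a writer keeps sending data after the reader
has closed its end, and the send eventually fails with a connection reset, much
like the "socket write error" in the report below.

    # Standalone sketch of a reader closing its socket before the writer is
    # done, which makes the writer's send fail with a connection-reset error.
    import socket
    import threading

    def writer(conn):
        try:
            while True:
                conn.sendall(b"x" * 65536)  # keep pushing, like the executor
        except socket.error as e:
            print("writer failed:", e)
        finally:
            conn.close()

    server = socket.socket()
    server.bind(("127.0.0.1", 0))
    server.listen(1)
    client = socket.create_connection(server.getsockname())
    conn, _ = server.accept()

    t = threading.Thread(target=writer, args=(conn,))
    t.start()

    client.recv(1024)  # read a little, like a worker that finishes early...
    client.close()     # ...then exit without draining the rest
    t.join()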



> pyspark crash for large dataset
> -------------------------------
>
>                 Key: SPARK-12261
>                 URL: https://issues.apache.org/jira/browse/SPARK-12261
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 1.5.2
>         Environment: windows
>            Reporter: zihao
>
> I tried to import a local text file (over 100 MB) via textFile in PySpark;
> when I ran data.take(), it failed and gave error messages including:
> 15/12/10 17:17:43 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times;
> aborting job
> Traceback (most recent call last):
>   File "E:/spark_python/test3.py", line 9, in <module>
>     lines.take(5)
>   File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\rdd.py", line 1299, in take
>     res = self.context.runJob(self, takeUpToNumLeft, p)
>   File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\context.py", line 916, in runJob
>     port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
>   File "C:\Anaconda2\lib\site-packages\py4j\java_gateway.py", line 813, in __call__
>     answer, self.gateway_client, self.target_id, self.name)
>   File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\sql\utils.py", line 36, in deco
>     return f(*a, **kw)
>   File "C:\Anaconda2\lib\site-packages\py4j\protocol.py", line 308, in get_return_value
>     format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.net.SocketException: Connection reset by peer: socket write error
> Then I ran the same code on a small text file, and this time .take() worked fine.
> How can I solve this problem?
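
For reference, the reporter's setup reduces to a script along these lines; the
input path is a placeholder, and a file of roughly 100 MB or more is needed to
hit the failure:

    # Minimal reproduction sketch based on the description above.
    from pyspark import SparkContext

    sc = SparkContext("local", "take-repro")
    lines = sc.textFile("E:/spark_python/big_input.txt")  # placeholder path
    print(lines.take(5))  # fails on large inputs with the socket write error
    sc.stop()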


