[
https://issues.apache.org/jira/browse/SPARK-12261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15391728#comment-15391728
]
Shea Parkes commented on SPARK-12261:
-------------------------------------
Alright, I've been spending time off and on for a week on this. I think I
better understand what's going on, but don't yet really have it nailed down.
I'm going to try and write down what I understand is going on here to get my
thoughts in order. I've been focusing on {{branch-1.6}} since that's what we
have in production.
{{RDD.take()}} in {{rdd.py}} does not push the request down to a Scala
implementation. Instead, it defines a closure/generator
({{takeUpToNumLeft()}}) and pushes that into a {{RDD.mapPartitions()}}.
{{takeUpToNumLeft()}} does **not** exhaust the iterator that it is given; it
yields only as many items as requested and then exits.
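To make that behavior concrete, here is a minimal standalone sketch of a generator in the same spirit. It is not the code from {{rdd.py}}; the names {{make_take_up_to}} and {{take_up_to_num_left}} are made up for illustration, and no Spark is involved. It only shows that the upstream iterator is left partly unconsumed.

```python
def make_take_up_to(num_left):
    """Build a generator function that yields at most num_left items.

    Hypothetical stand-in for the closure described above, not Spark's code.
    """
    def take_up_to_num_left(iterator):
        iterator = iter(iterator)
        taken = 0
        while taken < num_left:
            try:
                item = next(iterator)
            except StopIteration:
                # Upstream ran out before we reached num_left items.
                return
            yield item
            taken += 1
        # Falling out of the loop here leaves the rest of the iterator unread.
    return take_up_to_num_left


source = iter(range(10))
taken = list(make_take_up_to(3)(source))
remaining = list(source)  # whatever the generator never pulled from upstream
```

After this runs, {{taken}} holds the first three items and {{remaining}} holds the other seven, which is the "does not exhaust the iterator" situation.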
Next is the interplay between {{worker.py}} and {{PythonRDD.scala}}. These two
files set up bi-directional streams to each other and communicate some gnarly
state back and forth. The important part is what happens in {{worker.py}} when
the provided generator (i.e. {{takeUpToNumLeft()}}) does not exhaust the stream
of data being provided by {{PythonRDD.scala}}. When this happens,
{{worker.py}} sends a {{SpecialLengths.END_OF_DATA_SECTION}}, followed by any
accumulators, and then sends a **second**
{{SpecialLengths.END_OF_DATA_SECTION}} and then kills itself.
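The sequence just described can be sketched roughly as below. This is only an illustration of the ordering (marker, accumulators, second marker), written against an in-memory buffer. The marker value, the length-prefixed accumulator framing, and the helper names ({{write_int}}, {{read_int}}, {{finish_unexhausted_task}}) are assumptions for the sketch and should be checked against the real {{worker.py}}.

```python
import io
import struct

# Assumed, illustrative value; verify against SpecialLengths in the Spark source.
END_OF_DATA_SECTION = -1


def write_int(value, stream):
    """Write a 4-byte big-endian signed integer."""
    stream.write(struct.pack("!i", value))


def read_int(stream):
    """Read a 4-byte big-endian signed integer."""
    return struct.unpack("!i", stream.read(4))[0]


def finish_unexhausted_task(outfile, accumulator_updates):
    """Write the trailer for a task whose generator stopped early."""
    write_int(END_OF_DATA_SECTION, outfile)       # first marker: data is done
    write_int(len(accumulator_updates), outfile)  # then the accumulators
    for update in accumulator_updates:
        write_int(len(update), outfile)
        outfile.write(update)
    write_int(END_OF_DATA_SECTION, outfile)       # second marker, then exit


# Play the reader's side against an in-memory buffer.
buf = io.BytesIO()
finish_unexhausted_task(buf, [b"acc"])
buf.seek(0)
first_marker = read_int(buf)
accumulator_count = read_int(buf)
payload = buf.read(read_int(buf))
second_marker = read_int(buf)
```

A reader that does not special-case the second marker would go looking for another accumulator section at this point, which is the scenario the next paragraph worries about.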
I'm much better at Python than Scala, so I'm now trying to understand what
happens when {{PythonRDD.scala}} receives that second
{{SpecialLengths.END_OF_DATA_SECTION}}. In particular, I don't see anywhere
that {{PythonRDD.scala}} would treat the second
{{SpecialLengths.END_OF_DATA_SECTION}} any differently. This means that
{{PythonRDD.scala}} would go on to try and read the accumulator information
from the stream again, but {{worker.py}} would have already exited, so it
doesn't find anything.
As I understand it, though, {{PythonRDD.scala}} actually fails when trying to
**send** data, which is where my understanding of Scala is a little rough. If
the code in {{PythonRDD.scala}} works similarly to a Python generator, then I
assume it's acting something like a coroutine and isn't stopping when it gets
the second {{SpecialLengths.END_OF_DATA_SECTION}}.
This still doesn't explain why this works 95% of the time for us (i.e. we only
get intermittent failures).
Another track to chase is that Windows is treated differently in
{{PythonWorkerFactory.scala}}, so I'm occasionally trying to figure out if
{{worker.py}} being run as a subprocess makes any difference...
So I'm going to try and spend more time understanding what triggers more
data emission from {{PythonRDD.scala}} and why it keeps emitting after a second
{{SpecialLengths.END_OF_DATA_SECTION}}.
A simple band-aid would be to alter {{takeUpToNumLeft()}} in {{rdd.py}} to
exhaust the iterator provided, but I'm not sure that's a root-cause fix. Was
it intentional to allow {{RDD.mapPartitions()}} to accept generators that did
not exhaust their streams?
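For reference, the band-aid could look something like the sketch below. It is a standalone illustration, not a patch to {{rdd.py}}; the names {{make_draining_take}} and {{take_and_drain}} are hypothetical, and it has not been tried against Spark. It yields the requested number of items and then keeps reading and discarding the rest, so the upstream iterator always ends up exhausted.

```python
def make_draining_take(num_left):
    """Build a generator function that yields at most num_left items
    and then silently consumes the rest of the input.

    Hypothetical sketch of the band-aid, not Spark's code.
    """
    def take_and_drain(iterator):
        taken = 0
        for item in iterator:
            if taken < num_left:
                taken += 1
                yield item
            # Past num_left, keep looping so the input is fully consumed.
    return take_and_drain


source = iter(range(10))
kept = list(make_draining_take(3)(source))
leftover = list(source)  # nothing should be left upstream
```

The obvious cost is that every partition touched gets read in full, which gives up the early exit that makes {{take()}} cheap in the first place.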
> pyspark crash for large dataset
> -------------------------------
>
> Key: SPARK-12261
> URL: https://issues.apache.org/jira/browse/SPARK-12261
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 1.5.2
> Environment: windows
> Reporter: zihao
>
> I tried to import a local text(over 100mb) file via textFile in pyspark, when
> i ran data.take(), it failed and gave error messages including:
> 15/12/10 17:17:43 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times;
> aborting job
> Traceback (most recent call last):
> File "E:/spark_python/test3.py", line 9, in <module>
> lines.take(5)
> File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\rdd.py", line 1299,
> in take
> res = self.context.runJob(self, takeUpToNumLeft, p)
> File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\context.py", line
> 916, in runJob
> port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd,
> partitions)
> File "C:\Anaconda2\lib\site-packages\py4j\java_gateway.py", line 813, in
> __call__
> answer, self.gateway_client, self.target_id, self.name)
> File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\sql\utils.py", line
> 36, in deco
> return f(*a, **kw)
> File "C:\Anaconda2\lib\site-packages\py4j\protocol.py", line 308, in
> get_return_value
> format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling
> z:org.apache.spark.api.python.PythonRDD.runJob.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
> in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0
> (TID 0, localhost): java.net.SocketException: Connection reset by peer:
> socket write error
> Then i ran the same code for a small text file, this time .take() worked fine.
> How can i solve this problem?