[ https://issues.apache.org/jira/browse/SPARK-27548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16829861#comment-16829861 ]

Bryan Cutler edited comment on SPARK-27548 at 4/30/19 12:46 AM:
----------------------------------------------------------------

This is not that easy to fix by itself. Since there is no call from Py4J after 
the initial socket is set up, when a task fails the thread serving the iterator 
to Python simply terminates and the serializer stops. This ends up giving partial 
results that look fine, because the error is never seen by the Python driver.

Because the serving thread runs asynchronously, the exception would have to be 
caught on that thread and then transferred to Python after the thread is joined. 
On its own this would require a lot of changes, but it is pretty easy to do on 
top of the changes from SPARK-23961, so I will add the fix there.
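To illustrate the catch-then-re-raise-after-join pattern described above, here is a minimal hand-rolled sketch in plain Python. This is not Spark's actual serving code; the names {{_serve}}, {{local_iterator}}, and the queue/sentinel plumbing are all hypothetical, chosen only to show the shape of the fix.

{code:python}
import queue
import threading

_SENTINEL = object()  # marks end-of-stream on the queue


def _serve(source, out_queue, error_box):
    # Runs on the serving thread: forward items to the consumer, but
    # capture any exception instead of letting the thread die silently.
    try:
        for item in source:
            out_queue.put(item)
    except Exception as exc:
        error_box.append(exc)
    finally:
        out_queue.put(_SENTINEL)  # always unblock the consumer


def local_iterator(source):
    # Consumer side: drain the queue, join the serving thread, then
    # re-raise any error it captured so the caller actually sees it.
    out_queue, error_box = queue.Queue(), []
    t = threading.Thread(target=_serve, args=(source, out_queue, error_box))
    t.start()
    while True:
        item = out_queue.get()
        if item is _SENTINEL:
            break
        yield item
    t.join()
    if error_box:
        raise error_box[0]
{code}

With this shape, a failure partway through the stream surfaces as an exception in the consumer rather than as a silently truncated result, which is exactly the behavior the test in the issue description expects.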



> PySpark toLocalIterator does not raise errors from worker
> ---------------------------------------------------------
>
>                 Key: SPARK-27548
>                 URL: https://issues.apache.org/jira/browse/SPARK-27548
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.4.1
>            Reporter: Bryan Cutler
>            Priority: Major
>
> When using a PySpark RDD local iterator, an error that occurs on the worker 
> is not picked up by Py4J and raised in the Python driver process. So unless 
> the logs are inspected, there is no way for the application to know the 
> worker had an error. This is a test that should pass if the error is raised 
> in the driver:
> {code}
> def test_to_local_iterator_error(self):
>     def fail(_):
>         raise RuntimeError("local iterator error")
>     rdd = self.sc.parallelize(range(10)).map(fail)
>     with self.assertRaisesRegexp(Exception, "local iterator error"):
>         for _ in rdd.toLocalIterator():
>             pass{code}
> but it does not raise an exception:
> {noformat}
> Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>   File "/home/bryan/git/spark/python/lib/pyspark.zip/pyspark/worker.py", line 428, in main
>     process()
>   File "/home/bryan/git/spark/python/lib/pyspark.zip/pyspark/worker.py", line 423, in process
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/home/bryan/git/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 505, in dump_stream
>     vs = list(itertools.islice(iterator, batch))
>   File "/home/bryan/git/spark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
>     return f(*args, **kwargs)
>   File "/home/bryan/git/spark/python/pyspark/tests/test_rdd.py", line 742, in fail
>     raise RuntimeError("local iterator error")
> RuntimeError: local iterator error
>     at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:453)
> ...
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> FAIL
> ======================================================================
> FAIL: test_to_local_iterator_error (pyspark.tests.test_rdd.RDDTests)
> ----------------------------------------------------------------------
> Traceback (most recent call last):
>   File "/home/bryan/git/spark/python/pyspark/tests/test_rdd.py", line 748, in test_to_local_iterator_error
>     pass
> AssertionError: Exception not raised{noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
