[ https://issues.apache.org/jira/browse/SPARK-23961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16784843#comment-16784843 ]
Bryan Cutler commented on SPARK-23961:
--------------------------------------
I could also reproduce this, with a nearly identical error, using the following:
{code}
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand, udf
from pyspark.sql.types import *
spark = SparkSession \
    .builder \
    .appName("toLocalIterator_Test") \
    .getOrCreate()
df = spark.range(1 << 16).select(rand())
it = df.toLocalIterator()
print(next(it))  # consume a single row
it = None        # drop the iterator without consuming the rest
time.sleep(5)    # give the error time to surface before stopping
spark.stop()
{code}
I think there are a couple of issues with the way this currently works. When
toLocalIterator is called in Python, the Scala side also creates a local
iterator and immediately starts a loop that consumes the entire iterator and
writes everything to Python, with no synchronization with the Python iterator.
The write only blocks once the socket receive buffer is full. Small examples
work fine because all the data fits in the read buffer, but the code above
fails: the write blocks, the Python iterator stops reading and closes the
connection, and the Scala side sees that as an error. I can work on a fix for
this.
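For illustration, here is a minimal sketch of that failure mode using plain Python sockets. This is not Spark's serving code; the 8-byte record format, the port selection, and the loop bound are made up for the example. The writer pushes everything eagerly, the reader takes one record and closes, and a later write on the Scala-stand-in side fails with a broken pipe / connection reset, which is what surfaces as the "Error while sending iterator" above.
{code}
# Sketch only: plain sockets standing in for the Scala-side write loop and the
# Python-side local iterator (not Spark's actual protocol).
import socket
import struct
import threading

def eager_writer(server_sock):
    conn, _ = server_sock.accept()
    try:
        # Write every "record" without any synchronization with the reader,
        # like the Scala-side loop described above.
        for i in range(1 << 22):
            conn.sendall(struct.pack(">q", i))
    except OSError as e:
        # Once the reader has closed the connection, a subsequent write fails,
        # e.g. with BrokenPipeError or ConnectionResetError.
        print("writer failed:", e)
    finally:
        conn.close()

server = socket.socket()
server.bind(("127.0.0.1", 0))
server.listen(1)
t = threading.Thread(target=eager_writer, args=(server,))
t.start()

reader = socket.socket()
reader.connect(server.getsockname())
stream = reader.makefile("rb")
print(struct.unpack(">q", stream.read(8))[0])  # consume a single record ...
stream.close()
reader.close()                                 # ... then drop the "iterator"
t.join()
server.close()
{code}
A fix would presumably either keep the write loop in step with the Python iterator (e.g. only send the next batch when Python asks for it) or treat an early close from Python as normal termination rather than an error.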
> pyspark toLocalIterator throws an exception
> -------------------------------------------
>
> Key: SPARK-23961
> URL: https://issues.apache.org/jira/browse/SPARK-23961
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 2.0.2, 2.1.2, 2.2.1, 2.3.0
> Reporter: Michel Lemay
> Priority: Minor
> Labels: DataFrame, pyspark
>
> Given a DataFrame, call toLocalIterator on it. If we do not consume all of
> the records, it throws:
> {quote}ERROR PythonRDD: Error while sending iterator
> java.net.SocketException: Connection reset by peer: socket write error
> at java.net.SocketOutputStream.socketWrite0(Native Method)
> at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
> at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
> at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
> at java.io.DataOutputStream.write(DataOutputStream.java:107)
> at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
> at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:497)
> at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:509)
> at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:509)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:509)
> at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:705)
> at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:705)
> at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:705)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
> at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:706)
> {quote}
>
> To reproduce, here is a simple pyspark shell script that shows the error:
> {code}
> import itertools
> df = spark.read.parquet("large parquet folder").cache()
> print(df.count())
> b = df.toLocalIterator()
> print(len(list(itertools.islice(b, 20))))
> b = None  # Make the iterator go out of scope. Throws here.
> {code}
>
> Observations:
> * Consuming all records does not throw; taking only a subset of the
> partitions creates the error.
> * In another experiment, doing the same on a regular RDD works if we
> cache/materialize it. If we do not cache the RDD, it throws similarly.
> * It works in the Scala shell.
>