[ https://issues.apache.org/jira/browse/SPARK-18649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16207217#comment-16207217 ]
Frank Rosner commented on SPARK-18649: -------------------------------------- Looks like in SPARK-21551 they increased the hard coded limit to 15 seconds. > sc.textFile(my_file).collect() raises socket.timeout on large files > ------------------------------------------------------------------- > > Key: SPARK-18649 > URL: https://issues.apache.org/jira/browse/SPARK-18649 > Project: Spark > Issue Type: Bug > Components: PySpark > Environment: PySpark version 1.6.2 > Reporter: Erik Cederstrand > > I'm trying to load a file into the driver with this code: > contents = sc.textFile('hdfs://path/to/big_file.csv').collect() > Loading into the driver instead of creating a distributed RDD is intentional > in this case. The file is ca. 6GB, and I have adjusted driver memory > accordingly to fit the local data. After some time, my spark/submitted job > crashes with the stack trace below. > I have traced this to pyspark/rdd.py where the _load_from_socket() method > creates a socket with a hard-coded timeout of 3 seconds (this code is also > present in HEAD although I'm on PySpark 1.6.2). Raising this hard-coded value > to e.g. 600 lets me read the entire file. > Is there any reason that this value does not use e.g. the > 'spark.network.timeout' setting instead? > Traceback (most recent call last): > File "my_textfile_test.py", line 119, in <module> > contents = sc.textFile('hdfs://path/to/file.csv').collect() > File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/rdd.py", > line 772, in collect > File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/rdd.py", > line 142, in _load_from_socket > File > "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/serializers.py", > line 517, in load_stream > File > "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/serializers.py", > line 511, in loads > File "/usr/lib/python2.7/socket.py", line 380, in read > data = self._sock.recv(left) > socket.timeout: timed out > 16/11/30 13:33:14 WARN Utils: Suppressing exception in finally: Broken pipe > java.net.SocketException: Broken pipe > at java.net.SocketOutputStream.socketWrite0(Native Method) > at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109) > at java.net.SocketOutputStream.write(SocketOutputStream.java:153) > at > java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) > at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) > at java.io.DataOutputStream.flush(DataOutputStream.java:123) > at java.io.FilterOutputStream.close(FilterOutputStream.java:158) > at > org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$2.apply$mcV$sp(PythonRDD.scala:650) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1248) > at > org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:649) > Suppressed: java.net.SocketException: Broken pipe > at java.net.SocketOutputStream.socketWrite0(Native Method) > at > java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109) > at > java.net.SocketOutputStream.write(SocketOutputStream.java:153) > at > java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) > at > java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) > at java.io.FilterOutputStream.close(FilterOutputStream.java:158) > at java.io.FilterOutputStream.close(FilterOutputStream.java:159) > ... 3 more > 16/11/30 13:33:14 ERROR PythonRDD: Error while sending iterator > java.net.SocketException: Connection reset > at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113) > at java.net.SocketOutputStream.write(SocketOutputStream.java:153) > at > java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) > at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126) > at java.io.DataOutputStream.write(DataOutputStream.java:107) > at java.io.FilterOutputStream.write(FilterOutputStream.java:97) > at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:622) > at > org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:442) > at > org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452) > at > org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at > org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452) > at > org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:648) > at > org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:648) > at > org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:648) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1239) > at > org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:649) > Suppressed: java.net.SocketException: Broken pipe > at java.net.SocketOutputStream.socketWrite0(Native Method) > at > java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109) > at > java.net.SocketOutputStream.write(SocketOutputStream.java:153) > at > java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) > at > java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) > at java.io.DataOutputStream.flush(DataOutputStream.java:123) > at java.io.FilterOutputStream.close(FilterOutputStream.java:158) > at > org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$2.apply$mcV$sp(PythonRDD.scala:650) > at > org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1248) > ... 1 more > Suppressed: java.net.SocketException: Broken pipe > at java.net.SocketOutputStream.socketWrite0(Native > Method) > at > java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109) > at > java.net.SocketOutputStream.write(SocketOutputStream.java:153) > at > java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) > at > java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) > at > java.io.FilterOutputStream.close(FilterOutputStream.java:158) > at > java.io.FilterOutputStream.close(FilterOutputStream.java:159) > ... 3 more -- This message was sent by Atlassian JIRA (v6.4.14#64029) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org