[ https://issues.apache.org/jira/browse/SPARK-18367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15656228#comment-15656228 ]
Nicholas Chammas commented on SPARK-18367:
------------------------------------------

Tomorrow I'll try running this on a Linux VM. Maybe this is specific to macOS.

> DataFrame join spawns unreasonably high number of open files
> ------------------------------------------------------------
>
>                 Key: SPARK-18367
>                 URL: https://issues.apache.org/jira/browse/SPARK-18367
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.1, 2.1.0
>         Environment: Python 3.5, Java 8
>            Reporter: Nicholas Chammas
>         Attachments: spark-lsof.txt
>
>
> I have a moderately complex DataFrame query that spawns north of 10K open
> files, causing my job to crash. 10K is the macOS limit on how many files a
> single process can have open at once. It seems unreasonable that Spark should
> hold that many files open at once.
> I was able to boil down what I'm seeing to the following minimal reproduction:
> {code}
> import pyspark
> from pyspark.sql import Row
> if __name__ == '__main__':
>     spark = pyspark.sql.SparkSession.builder.getOrCreate()
>     df = spark.createDataFrame([
>         Row(a=n)
>         for n in range(500000)
>     ])  #.coalesce(1)  # a coalesce(1) here "fixes" the problem
>     df = df.join(df, on='a')  #.coalesce(1)  # a coalesce(1) here doesn't help
>     print('partitions:', df.rdd.getNumPartitions())
>     df.explain()
>     df.show(1)
> {code}
> When I run this code, Spark spawns over 2K open files. I can "fix" the
> problem by adding a {{coalesce(1)}} in the right place, as indicated in the
> comments above. When I do, Spark spawns no more than 600 open files. The
> number of partitions without the coalesce is 200.
> Here are the execution plans with and without the coalesce.
> 2K+ open files, without the coalesce:
> {code}
> == Physical Plan ==
> *Project [a#0L]
> +- *SortMergeJoin [a#0L], [a#3L], Inner
>    :- *Sort [a#0L ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(a#0L, 200)
>    :     +- *Filter isnotnull(a#0L)
>    :        +- Scan ExistingRDD[a#0L]
>    +- *Sort [a#3L ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(a#3L, 200)
>          +- *Filter isnotnull(a#3L)
>             +- Scan ExistingRDD[a#3L]
> {code}
> <600 open files, with the coalesce:
> {code}
> == Physical Plan ==
> *Project [a#0L]
> +- *SortMergeJoin [a#0L], [a#4L], Inner
>    :- *Sort [a#0L ASC NULLS FIRST], false, 0
>    :  +- Coalesce 1
>    :     +- *Filter isnotnull(a#0L)
>    :        +- Scan ExistingRDD[a#0L]
>    +- *Sort [a#4L ASC NULLS FIRST], false, 0
>       +- Coalesce 1
>          +- *Filter isnotnull(a#4L)
>             +- Scan ExistingRDD[a#4L]
> {code}
> So the key difference appears to be the {{Exchange hashpartitioning(a#0L,
> 200)}} operator.
> Is the large number of open files perhaps expected given the join on a large
> number of distinct keys? If so, how would one mitigate that issue? If not, is
> this a bug in Spark?

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
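
For comparing the macOS numbers against a Linux VM, here is a minimal sketch of one way to read the per-process open-file limit and count a process's open descriptors. It is not how the counts quoted above were obtained (the attachment name suggests lsof). The helper names `open_file_limits` and `count_open_fds` are hypothetical, and the sketch assumes a Linux-style `/proc` filesystem, falling back to `/dev/fd` for the current process only.

```python
import os
import resource


def open_file_limits():
    """Return the (soft, hard) open-file limits for the current process."""
    return resource.getrlimit(resource.RLIMIT_NOFILE)


def count_open_fds(pid=None):
    """Count the open file descriptors of `pid` (default: this process).

    Assumption: /proc/<pid>/fd exists, as on Linux. macOS has no /proc,
    so there only the current process can be inspected, via /dev/fd.
    The result may include the descriptor used to list the directory.
    """
    own_pid = os.getpid()
    if pid is None:
        pid = own_pid
    proc_dir = "/proc/{}/fd".format(pid)
    if os.path.isdir(proc_dir):
        return len(os.listdir(proc_dir))
    if pid == own_pid and os.path.isdir("/dev/fd"):
        return len(os.listdir("/dev/fd"))
    raise OSError("no fd directory available for pid {}".format(pid))


if __name__ == "__main__":
    soft, hard = open_file_limits()
    print("open-file limit: soft={} hard={}".format(soft, hard))
    print("descriptors open in this process:", count_open_fds())
```

The files in question are held by the Spark JVM rather than the Python driver, so on Linux one would pass the JVM's pid (found with a tool such as `jps`) to `count_open_fds` while the join is running, and compare the result against the soft limit.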