[
https://issues.apache.org/jira/browse/SPARK-18367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Nicholas Chammas updated SPARK-18367:
-------------------------------------
Description:
I have a moderately complex DataFrame query that opens more than 10K files,
causing my job to crash. 10K is the macOS limit on how many files a single
process can have open at once. It seems unreasonable that Spark should hold
that many files open simultaneously.
I was able to boil down what I'm seeing to the following minimal reproduction:
{code}
import pyspark
from pyspark.sql import Row

if __name__ == '__main__':
    spark = pyspark.sql.SparkSession.builder.getOrCreate()
    # Build a single-column DataFrame with 500,000 distinct keys.
    df = spark.createDataFrame([
        Row(a=n)
        for n in range(500000)
    ]).coalesce(1)  # a coalesce(1) here "fixes" the problem
    # Self-join on the key column.
    df = df.join(df, on='a')  #.coalesce(1)  # a coalesce(1) here doesn't help
    print('partitions:', df.rdd.getNumPartitions())
    df.explain()
    df.show(1)
{code}
When I run this code, Spark holds over 2K files open. I can "fix" the problem
by adding a {{coalesce(1)}} in the right place, as indicated in the comments
above. When I do, Spark holds no more than 600 files open. The number of
partitions without the coalesce is 200.
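For anyone trying to reproduce these counts, here is a minimal sketch of one way to measure them. It is not necessarily how the numbers above were obtained. It assumes {{lsof}} is on the PATH and that the PID of the Spark JVM has been found separately (for example with {{jps}}). The helper names {{count_lsof_rows}} and {{count_open_files}} are made up for illustration. Note that {{lsof -p}} also lists entries such as the working directory and memory-mapped files, so the count is only approximate.

```python
import subprocess


def count_lsof_rows(lsof_output):
    """Count entries in `lsof` output, skipping the header row and blank lines."""
    rows = [line for line in lsof_output.splitlines() if line.strip()]
    # The first non-empty row is the column header (COMMAND PID USER ...).
    return max(len(rows) - 1, 0)


def count_open_files(pid):
    """Return the number of entries `lsof -p <pid>` reports for a process.

    Assumes `lsof` is installed; the count includes non-descriptor entries
    (cwd, txt, ...), so treat it as an approximation.
    """
    proc = subprocess.Popen(
        ["lsof", "-p", str(pid)],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,
    )
    stdout, _ = proc.communicate()
    return count_lsof_rows(stdout)
```

Calling {{count_open_files(jvm_pid)}} in a loop while {{df.show(1)}} runs would show how the count changes over the life of the job.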
Here are the execution plans with and without the coalesce.
2K+ open files, without the coalesce:
{code}
== Physical Plan ==
*Project [a#0L]
+- *SortMergeJoin [a#0L], [a#3L], Inner
   :- *Sort [a#0L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(a#0L, 200)
   :     +- *Filter isnotnull(a#0L)
   :        +- Scan ExistingRDD[a#0L]
   +- *Sort [a#3L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(a#3L, 200)
         +- *Filter isnotnull(a#3L)
            +- Scan ExistingRDD[a#3L]
{code}
<600 open files, with the coalesce:
{code}
== Physical Plan ==
*Project [a#0L]
+- *SortMergeJoin [a#0L], [a#4L], Inner
   :- *Sort [a#0L ASC NULLS FIRST], false, 0
   :  +- Coalesce 1
   :     +- *Filter isnotnull(a#0L)
   :        +- Scan ExistingRDD[a#0L]
   +- *Sort [a#4L ASC NULLS FIRST], false, 0
      +- Coalesce 1
         +- *Filter isnotnull(a#4L)
            +- Scan ExistingRDD[a#4L]
{code}
So the key difference appears to be the {{Exchange hashpartitioning(a#0L,
200)}} operator.
Is the large number of open files perhaps expected given the join on a large
number of distinct keys? If so, how would one mitigate that issue? If not, is
this a bug in Spark?
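As a rough way to reason about the question above, here is a back-of-the-envelope sketch. It rests on an assumption that this ticket does not confirm: that each running map task keeps one shuffle output file open per reduce partition. The function name {{estimate_open_shuffle_files}} and the figure of 8 concurrent tasks are hypothetical. The value 200 is the partition count shown by the {{Exchange hashpartitioning}} operator, which matches the default of {{spark.sql.shuffle.partitions}}.

```python
def estimate_open_shuffle_files(concurrent_tasks, shuffle_partitions):
    """Rough upper bound on shuffle files open at once.

    ASSUMPTION (not confirmed in this ticket): each running map task keeps
    one shuffle output file open per reduce partition.
    """
    if concurrent_tasks < 0 or shuffle_partitions < 0:
        raise ValueError("counts must be non-negative")
    return concurrent_tasks * shuffle_partitions


# Hypothetical machine running 8 tasks at a time. 200 is the partition
# count from the Exchange operator in the plan; 50 and 1 are for comparison.
for partitions in (200, 50, 1):
    files = estimate_open_shuffle_files(8, partitions)
    print(partitions, "partitions ->", files, "files")
```

If the assumption holds, lowering {{spark.sql.shuffle.partitions}} before the join might reduce the number of open files. That is untested here and may not be the actual cause.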
was:
I have a complex DataFrame query that fails to run normally but succeeds if I
add a dummy {{limit()}} upstream in the query tree.
The failure presents itself like this:
{code}
ERROR DiskBlockObjectWriter: Uncaught exception while reverting partial writes to file /private/var/folders/f5/t48vxz555b51mr3g6jjhxv400000gq/T/blockmgr-1e908314-1d49-47ba-8c95-fa43ff43cee4/31/temp_shuffle_6b939273-c6ce-44a0-99b1-db668e6c89dc
java.io.FileNotFoundException: /private/var/folders/f5/t48vxz555b51mr3g6jjhxv400000gq/T/blockmgr-1e908314-1d49-47ba-8c95-fa43ff43cee4/31/temp_shuffle_6b939273-c6ce-44a0-99b1-db668e6c89dc (Too many open files in system)
{code}
My {{ulimit -n}} is already set to 10,000, and I can't set it much higher on
macOS. However, I don't think that's the issue, since if I add a dummy
{{limit()}} early in the query tree -- dummy as in it does not actually reduce
the number of rows queried -- then the same query works.
I've diffed the physical query plans to see what this {{limit()}} is actually
doing, and the diff is as follows:
{code}
diff plan-with-limit.txt plan-without-limit.txt
24,28c24
< : : : +- *GlobalLimit 1000000
< : : : +- Exchange SinglePartition
< : : : +- *LocalLimit 1000000
< : : : +- *Project [...]
< : : : +- *Scan orc [...] Format: ORC, InputPaths: file:/..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<...
---
> : : : +- *Scan orc [...] Format: ORC, InputPaths: file:/..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<...
49,53c45
< : : +- *GlobalLimit 1000000
< : : +- Exchange SinglePartition
< : : +- *LocalLimit 1000000
< : : +- *Project [...]
< : : +- *Scan orc [...] Format: ORC, InputPaths: file:/..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<...
---
> : : +- *Scan orc [] Format: ORC, InputPaths: file:/..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<...
{code}
Does this give any clues as to why this {{limit()}} is helping? Again, the
1000000 limit you can see in the plan is much higher than the cardinality of
the dataset I'm reading, so there is no theoretical impact on the output. You
can see the full query plans attached to this ticket.
Unfortunately, I don't have a minimal reproduction for this issue, but I can
work towards one with some clues.
I'm seeing this behavior on 2.0.1 and on master at commit
{{26e1c53aceee37e3687a372ff6c6f05463fd8a94}}.
> DataFrame join spawns unreasonably high number of open files
> ------------------------------------------------------------
>
> Key: SPARK-18367
> URL: https://issues.apache.org/jira/browse/SPARK-18367
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.0.1, 2.1.0
> Environment: Python 3.5, Java 8
> Reporter: Nicholas Chammas
>