[ 
https://issues.apache.org/jira/browse/SPARK-18367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15656199#comment-15656199
 ] 

Nicholas Chammas edited comment on SPARK-18367 at 11/11/16 5:30 AM:
--------------------------------------------------------------------

It looks like there are 8 partitions before the exchange.

Here's a tweaked repro script to show the number of partitions before and after 
the join:

{code}
import pyspark
from pyspark.sql import Row


if __name__ == '__main__':
    spark = pyspark.sql.SparkSession.builder.getOrCreate()

    df = spark.createDataFrame([
        Row(a=n)
        for n in range(500000)
    ])  #.coalesce(1)  # a coalesce(1) here "fixes" the problem
    df.explain()
    print('partitions:', df.rdd.getNumPartitions())

    df = df.join(df, on='a')  #.coalesce(1)  # a coalesce(1) here doesn't help
    df.explain()
    print('partitions:', df.rdd.getNumPartitions())

    df.show(1)
{code}

Output:

{code}
== Physical Plan ==
Scan ExistingRDD[a#0L]
partitions: 8
== Physical Plan ==
*Project [a#0L]
+- *SortMergeJoin [a#0L], [a#3L], Inner
   :- *Sort [a#0L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(a#0L, 200)
   :     +- *Filter isnotnull(a#0L)
   :        +- Scan ExistingRDD[a#0L]
   +- *Sort [a#3L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(a#3L, 200)
         +- *Filter isnotnull(a#3L)
            +- Scan ExistingRDD[a#3L]
partitions: 200
16/11/11 00:27:32 WARN TaskSetManager: Stage 0 contains a task of very large size (510 KB). The maximum recommended task size is 100 KB.
16/11/11 00:27:33 WARN TaskSetManager: Stage 1 contains a task of very large size (510 KB). The maximum recommended task size is 100 KB.
+---+
|  a|
+---+
| 26|
+---+
only showing top 1 row
{code}
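
As a rough sanity check on those two partition counts, here is a back-of-the-envelope sketch in plain Python. It rests on assumptions that nothing in this thread confirms: that each running map task keeps one file open per shuffle partition (as far as I know, this is how Spark's bypass-merge shuffle path behaves when the partition count is at or below {{spark.shuffle.sort.bypassMergeThreshold}}, default 200), and that up to 8 tasks run at once, matching the 8 input partitions. The function name {{estimate_open_shuffle_files}} is made up for illustration.

```python
def estimate_open_shuffle_files(concurrent_map_tasks, shuffle_partitions):
    """Rough upper bound on shuffle files open at the same time.

    ASSUMPTION (not confirmed in this thread): every running map task
    holds one open file per shuffle partition while it writes its output.
    """
    if concurrent_map_tasks < 0 or shuffle_partitions < 0:
        raise ValueError('counts must be non-negative')
    return concurrent_map_tasks * shuffle_partitions


if __name__ == '__main__':
    # 200 comes from "Exchange hashpartitioning(a#0L, 200)" in the plan above.
    # 8 concurrent tasks is an assumption based on the 8 input partitions.
    print('estimated open shuffle files:', estimate_open_shuffle_files(8, 200))
```

Under those assumptions the shuffle alone would account for 1,600 open files, which is the same order of magnitude as the 2K+ reported in the issue. This is an estimate, not a measurement.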





> DataFrame join spawns unreasonably high number of open files
> ------------------------------------------------------------
>
>                 Key: SPARK-18367
>                 URL: https://issues.apache.org/jira/browse/SPARK-18367
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.1, 2.1.0
>         Environment: Python 3.5, Java 8
>            Reporter: Nicholas Chammas
>         Attachments: spark-lsof.txt
>
>
> I have a moderately complex DataFrame query that spawns north of 10K open 
> files, causing my job to crash. 10K is the macOS limit on how many files a 
> single process can have open at once. It seems unreasonable that Spark should 
> hold that many files open at once.
> I was able to boil down what I'm seeing to the following minimal reproduction:
> {code}
> import pyspark
> from pyspark.sql import Row
> if __name__ == '__main__':
>     spark = pyspark.sql.SparkSession.builder.getOrCreate()
>     df = spark.createDataFrame([
>         Row(a=n)
>         for n in range(500000)
>     ])  #.coalesce(1)  # a coalesce(1) here "fixes" the problem
>     df = df.join(df, on='a')  #.coalesce(1)  # a coalesce(1) here doesn't help
>     print('partitions:', df.rdd.getNumPartitions())
>     df.explain()
>     df.show(1)
> {code}
> When I run this code, Spark spawns over 2K open files. I can "fix" the 
> problem by adding a {{coalesce(1)}} in the right place, as indicated in the 
> comments above. When I do, Spark spawns no more than 600 open files. The 
> number of partitions without the coalesce is 200.
> Here are the execution plans with and without the coalesce.
> 2K+ open files, without the coalesce:
> {code}
> == Physical Plan ==
> *Project [a#0L]
> +- *SortMergeJoin [a#0L], [a#3L], Inner
>    :- *Sort [a#0L ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(a#0L, 200)
>    :     +- *Filter isnotnull(a#0L)
>    :        +- Scan ExistingRDD[a#0L]
>    +- *Sort [a#3L ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(a#3L, 200)
>          +- *Filter isnotnull(a#3L)
>             +- Scan ExistingRDD[a#3L]
> {code}
> <600 open files, with the coalesce:
> {code}
> == Physical Plan ==
> *Project [a#0L]
> +- *SortMergeJoin [a#0L], [a#4L], Inner
>    :- *Sort [a#0L ASC NULLS FIRST], false, 0
>    :  +- Coalesce 1
>    :     +- *Filter isnotnull(a#0L)
>    :        +- Scan ExistingRDD[a#0L]
>    +- *Sort [a#4L ASC NULLS FIRST], false, 0
>       +- Coalesce 1
>          +- *Filter isnotnull(a#4L)
>             +- Scan ExistingRDD[a#4L]
> {code}
> So the key difference appears to be the {{Exchange hashpartitioning(a#0L, 200)}} operator.
> Is the large number of open files perhaps expected given the join on a large 
> number of distinct keys? If so, how would one mitigate that issue? If not, is 
> this a bug in Spark?
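
One knob that may be worth trying here is {{spark.sql.shuffle.partitions}}, whose default of 200 is where the 200 in {{Exchange hashpartitioning(a#0L, 200)}} comes from. The sketch below is only a suggestion, not a confirmed fix for this issue: it reruns the reproduction with a smaller shuffle partition count. The helper names {{shuffle_conf}} and {{run_repro}} are assumptions chosen for illustration.

```python
def shuffle_conf(num_partitions):
    """Build the Spark SQL setting that controls the shuffle partition count."""
    if num_partitions < 1:
        raise ValueError('num_partitions must be at least 1')
    return {'spark.sql.shuffle.partitions': str(num_partitions)}


def run_repro(num_partitions):
    """Run the reproduction from the issue with a custom shuffle partition count."""
    # Imported here so shuffle_conf() can be used without pyspark installed.
    from pyspark.sql import Row, SparkSession

    builder = SparkSession.builder
    for key, value in shuffle_conf(num_partitions).items():
        builder = builder.config(key, value)
    spark = builder.getOrCreate()

    df = spark.createDataFrame([Row(a=n) for n in range(500000)])
    df = df.join(df, on='a')
    df.explain()
    print('partitions:', df.rdd.getNumPartitions())
    df.show(1)
```

Calling {{run_repro(8)}}, where 8 is an arbitrary example value, should show the chosen count in the {{Exchange hashpartitioning(...)}} lines of the plan. A lower count means fewer but larger shuffle partitions, so the right value depends on the size of the data.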


