[ 
https://issues.apache.org/jira/browse/SPARK-23560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruce Robbins updated SPARK-23560:
----------------------------------
    Description: 
Depending on the size of the input, a joinWith followed by a groupBy requires 
more shuffles than a join followed by a groupBy.

For example, here's a joinWith on two CSV files, followed by a groupBy:
{noformat}
import org.apache.spark.sql.types._
val schema = StructType(StructField("id1", LongType) :: StructField("id2", LongType) :: Nil)

val df1 = spark.read.schema(schema).csv("ds1.csv")
val df2 = spark.read.schema(schema).csv("ds2.csv")

val result1 = df1.joinWith(df2, df1.col("id1") === df2.col("id2")).groupBy("_1.id1").count

result1.explain
== Physical Plan ==
*(6) HashAggregate(keys=[_1#8.id1#19L], functions=[count(1)])
+- Exchange hashpartitioning(_1#8.id1#19L, 200)
   +- *(5) HashAggregate(keys=[_1#8.id1 AS _1#8.id1#19L], functions=[partial_count(1)])
      +- *(5) Project [_1#8]
         +- *(5) SortMergeJoin [_1#8.id1], [_2#9.id2], Inner
            :- *(2) Sort [_1#8.id1 ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(_1#8.id1, 200)
            :     +- *(1) Project [named_struct(id1, id1#0L, id2, id2#1L) AS _1#8]
            :        +- *(1) FileScan csv [id1#0L,id2#1L] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:.../ds1.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id1:bigint,id2:bigint>
            +- *(4) Sort [_2#9.id2 ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(_2#9.id2, 200)
                  +- *(3) Project [named_struct(id1, id1#4L, id2, id2#5L) AS _2#9]
                     +- *(3) FileScan csv [id1#4L,id2#5L] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:...ds2.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id1:bigint,id2:bigint>
{noformat}
Using join, there is one less shuffle:
{noformat}
val result2 = df1.join(df2, df1.col("id1") === df2.col("id2")).groupBy(df1("id1")).count

result2.explain
== Physical Plan ==
*(5) HashAggregate(keys=[id1#0L], functions=[count(1)])
+- *(5) HashAggregate(keys=[id1#0L], functions=[partial_count(1)])
   +- *(5) Project [id1#0L]
      +- *(5) SortMergeJoin [id1#0L], [id2#5L], Inner
         :- *(2) Sort [id1#0L ASC NULLS FIRST], false, 0
         :  +- Exchange hashpartitioning(id1#0L, 200)
         :     +- *(1) Project [id1#0L]
         :        +- *(1) Filter isnotnull(id1#0L)
         :           +- *(1) FileScan csv [id1#0L] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:.../ds1.csv], PartitionFilters: [], PushedFilters: [IsNotNull(id1)], ReadSchema: struct<id1:bigint>
         +- *(4) Sort [id2#5L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(id2#5L, 200)
               +- *(3) Project [id2#5L]
                  +- *(3) Filter isnotnull(id2#5L)
                     +- *(3) FileScan csv [id2#5L] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:...ds2.csv], PartitionFilters: [], PushedFilters: [IsNotNull(id2)], ReadSchema: struct<id2:bigint>
{noformat}
-The extra exchange is reflected in the run time of the query.- Actually, I recant this bit. In my particular tests, the extra exchange has negligible impact on run time; all of the run-time difference is in stage 2.
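
For anyone who would rather check the shuffle count programmatically than read the explain output, here is a minimal sketch (my own helper, not a Spark API; it assumes the Spark 2.3.x class name ShuffleExchangeExec):
{noformat}
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

// Count the shuffle exchanges in the executed physical plan
def countShuffles(ds: Dataset[_]): Int =
  ds.queryExecution.executedPlan.collect { case e: ShuffleExchangeExec => e }.size

countShuffles(result1)  // 3 exchanges for the joinWith plan above
countShuffles(result2)  // 2 exchanges for the join plan above
{noformat}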

My tests were on inputs with more than 2 million records.
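
The original ds1.csv and ds2.csv are not attached, but inputs of roughly this shape can be generated with something like the following (a hypothetical generator; it just writes a few million two-column bigint rows per file):
{noformat}
// Write ~3 million (id1, id2) rows per file as headerless CSV
spark.range(0, 3000000L).selectExpr("id AS id1", "id AS id2").write.mode("overwrite").csv("ds1.csv")
spark.range(0, 3000000L).selectExpr("id AS id1", "id AS id2").write.mode("overwrite").csv("ds2.csv")
{noformat}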

  was:
Depending on the size of the input, a joinWith followed by a groupBy requires 
more shuffles than a join followed by a groupBy.

For example, here's a joinWith on two CSV files, followed by a groupBy:
{noformat}
import org.apache.spark.sql.types._
val schema = StructType(StructField("id1", LongType) :: StructField("id2", LongType) :: Nil)

val df1 = spark.read.schema(schema).csv("ds1.csv")
val df2 = spark.read.schema(schema).csv("ds2.csv")

val result1 = df1.joinWith(df2, df1.col("id1") === df2.col("id2")).groupBy("_1.id1").count

result1.explain
== Physical Plan ==
*(6) HashAggregate(keys=[_1#8.id1#19L], functions=[count(1)])
+- Exchange hashpartitioning(_1#8.id1#19L, 200)
   +- *(5) HashAggregate(keys=[_1#8.id1 AS _1#8.id1#19L], functions=[partial_count(1)])
      +- *(5) Project [_1#8]
         +- *(5) SortMergeJoin [_1#8.id1], [_2#9.id2], Inner
            :- *(2) Sort [_1#8.id1 ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(_1#8.id1, 200)
            :     +- *(1) Project [named_struct(id1, id1#0L, id2, id2#1L) AS _1#8]
            :        +- *(1) FileScan csv [id1#0L,id2#1L] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:.../ds1.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id1:bigint,id2:bigint>
            +- *(4) Sort [_2#9.id2 ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(_2#9.id2, 200)
                  +- *(3) Project [named_struct(id1, id1#4L, id2, id2#5L) AS _2#9]
                     +- *(3) FileScan csv [id1#4L,id2#5L] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:...ds2.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id1:bigint,id2:bigint>
{noformat}
Using join, there is one less shuffle:
{noformat}
val result2 = df1.join(df2, df1.col("id1") === df2.col("id2")).groupBy(df1("id1")).count

result2.explain
== Physical Plan ==
*(5) HashAggregate(keys=[id1#0L], functions=[count(1)])
+- *(5) HashAggregate(keys=[id1#0L], functions=[partial_count(1)])
   +- *(5) Project [id1#0L]
      +- *(5) SortMergeJoin [id1#0L], [id2#5L], Inner
         :- *(2) Sort [id1#0L ASC NULLS FIRST], false, 0
         :  +- Exchange hashpartitioning(id1#0L, 200)
         :     +- *(1) Project [id1#0L]
         :        +- *(1) Filter isnotnull(id1#0L)
         :           +- *(1) FileScan csv [id1#0L] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:.../ds1.csv], PartitionFilters: [], PushedFilters: [IsNotNull(id1)], ReadSchema: struct<id1:bigint>
         +- *(4) Sort [id2#5L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(id2#5L, 200)
               +- *(3) Project [id2#5L]
                  +- *(3) Filter isnotnull(id2#5L)
                     +- *(3) FileScan csv [id2#5L] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:...ds2.csv], PartitionFilters: [], PushedFilters: [IsNotNull(id2)], ReadSchema: struct<id2:bigint>
{noformat}
The extra exchange is reflected in the run time of the query.

My tests were on inputs with more than 2 million records.


> A joinWith followed by groupBy requires extra shuffle
> -----------------------------------------------------
>
>                 Key: SPARK-23560
>                 URL: https://issues.apache.org/jira/browse/SPARK-23560
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.3.0
>         Environment: Debian 8.9, macOS High Sierra
>            Reporter: Bruce Robbins
>            Priority: Major
>
> Depending on the size of the input, a joinWith followed by a groupBy requires 
> more shuffles than a join followed by a groupBy.
> For example, here's a joinWith on two CSV files, followed by a groupBy:
> {noformat}
> import org.apache.spark.sql.types._
> val schema = StructType(StructField("id1", LongType) :: StructField("id2", LongType) :: Nil)
> val df1 = spark.read.schema(schema).csv("ds1.csv")
> val df2 = spark.read.schema(schema).csv("ds2.csv")
> val result1 = df1.joinWith(df2, df1.col("id1") === df2.col("id2")).groupBy("_1.id1").count
> result1.explain
> == Physical Plan ==
> *(6) HashAggregate(keys=[_1#8.id1#19L], functions=[count(1)])
> +- Exchange hashpartitioning(_1#8.id1#19L, 200)
>    +- *(5) HashAggregate(keys=[_1#8.id1 AS _1#8.id1#19L], functions=[partial_count(1)])
>       +- *(5) Project [_1#8]
>          +- *(5) SortMergeJoin [_1#8.id1], [_2#9.id2], Inner
>             :- *(2) Sort [_1#8.id1 ASC NULLS FIRST], false, 0
>             :  +- Exchange hashpartitioning(_1#8.id1, 200)
>             :     +- *(1) Project [named_struct(id1, id1#0L, id2, id2#1L) AS _1#8]
>             :        +- *(1) FileScan csv [id1#0L,id2#1L] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:.../ds1.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id1:bigint,id2:bigint>
>             +- *(4) Sort [_2#9.id2 ASC NULLS FIRST], false, 0
>                +- Exchange hashpartitioning(_2#9.id2, 200)
>                   +- *(3) Project [named_struct(id1, id1#4L, id2, id2#5L) AS _2#9]
>                      +- *(3) FileScan csv [id1#4L,id2#5L] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:...ds2.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id1:bigint,id2:bigint>
> {noformat}
> Using join, there is one less shuffle:
> {noformat}
> val result2 = df1.join(df2, df1.col("id1") === df2.col("id2")).groupBy(df1("id1")).count
> result2.explain
> == Physical Plan ==
> *(5) HashAggregate(keys=[id1#0L], functions=[count(1)])
> +- *(5) HashAggregate(keys=[id1#0L], functions=[partial_count(1)])
>    +- *(5) Project [id1#0L]
>       +- *(5) SortMergeJoin [id1#0L], [id2#5L], Inner
>          :- *(2) Sort [id1#0L ASC NULLS FIRST], false, 0
>          :  +- Exchange hashpartitioning(id1#0L, 200)
>          :     +- *(1) Project [id1#0L]
>          :        +- *(1) Filter isnotnull(id1#0L)
>          :           +- *(1) FileScan csv [id1#0L] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:.../ds1.csv], PartitionFilters: [], PushedFilters: [IsNotNull(id1)], ReadSchema: struct<id1:bigint>
>          +- *(4) Sort [id2#5L ASC NULLS FIRST], false, 0
>             +- Exchange hashpartitioning(id2#5L, 200)
>                +- *(3) Project [id2#5L]
>                   +- *(3) Filter isnotnull(id2#5L)
>                      +- *(3) FileScan csv [id2#5L] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:...ds2.csv], PartitionFilters: [], PushedFilters: [IsNotNull(id2)], ReadSchema: struct<id2:bigint>
> {noformat}
> -The extra exchange is reflected in the run time of the query.- Actually, I recant this bit. In my particular tests, the extra exchange has negligible impact on run time; all of the run-time difference is in stage 2.
> My tests were on inputs with more than 2 million records.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
