[ https://issues.apache.org/jira/browse/SPARK-23560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16390111#comment-16390111 ]

Bruce Robbins commented on SPARK-23560:
---------------------------------------

The main issue is that an AttributeReference instance never considers a 
GetStructField instance to be semantically equal, even if they both refer to 
the same field.

When I patched AttributeReference.semanticEquals to allow comparison with a 
GetStructField instance, the extra exchange dropped out of the plan. It's not a 
production-quality patch: the check is rudimentary, and I am not sure two such 
expressions should ever be considered equal. But it does confirm how the extra 
Exchange is getting added to the plan.
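
To make the mismatch concrete, here is a minimal sketch built directly against 
Catalyst's internal expression classes (constructor shapes as of Spark 2.3). The 
expression names are illustrative rather than taken from the actual plan, and this 
is not the patch itself:

{noformat}
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.expressions._

// The struct column produced by joinWith (e.g. _1) and the field extracted from it (_1.id1)
val structType = StructType(StructField("id1", LongType) :: StructField("id2", LongType) :: Nil)
val structRef = AttributeReference("_1", structType)()
val fieldExpr = GetStructField(structRef, 0, Some("id1"))

// The plain column reference that the aggregation's required distribution carries (id1)
val attrRef = AttributeReference("id1", LongType)()

// semanticEquals compares canonicalized expressions; a GetStructField and an
// AttributeReference never canonicalize to the same thing, so this prints false
// even though both ultimately denote the same field.
println(fieldExpr.semanticEquals(attrRef))  // false
{noformat}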

In a nutshell, the HashAggregate gets its output partitioning from the Exchanges 
below the Sorts in the two branches under the SortMergeJoin, and those Exchanges 
hold the expressions for table1.id1 and table2.id2 as GetStructField instances. 
However, the HashAggregate's distribution (held in its parent's 
requiredChildDistributions field) holds the expression for table1.id1 as an 
AttributeReference. When EnsureRequirements tries to determine whether to add an 
Exchange to the plan, the two expressions for table1.id1 don't match (using 
AttributeReference.semanticEquals), so an Exchange gets added.
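
One way to see where this happens is to walk the executed plan and compare what 
each operator requires of its children against what the children actually provide. 
This is only an inspection sketch using internal SparkPlan APIs 
(requiredChildDistribution and outputPartitioning), run against the result1 
Dataset from the reproduction below:

{noformat}
// result1 is the joinWith-then-groupBy query from the issue description
val plan = result1.queryExecution.executedPlan
plan.foreach { node =>
  println(node.nodeName)
  println("  requires of children: " + node.requiredChildDistribution.mkString(", "))
  println("  children provide    : " + node.children.map(_.outputPartitioning).mkString(", "))
}
{noformat}

The node whose requirement is phrased over the AttributeReference while its 
child's partitioning is phrased over the GetStructField is where 
EnsureRequirements inserts the extra Exchange.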

In the "join" case, the two expressions for table1.id1 are both 
AttributeReference instances, so they match.
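
By contrast, two AttributeReference instances that point at the same attribute 
(same exprId) canonicalize identically, so semanticEquals holds. Reusing the 
imports from the sketch above:

{noformat}
val a = AttributeReference("id1", LongType)()
val b = AttributeReference("id1", LongType)(exprId = a.exprId)  // another reference to the same attribute
println(a.semanticEquals(b))  // true
{noformat}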

As for a fix, I need to explore the planner/optimizer code some more to determine 
whether two expressions with different implementations should be comparable, or 
whether these two expressions should have had the same implementation in the 
first place.

> A joinWith followed by groupBy requires extra shuffle
> -----------------------------------------------------
>
>                 Key: SPARK-23560
>                 URL: https://issues.apache.org/jira/browse/SPARK-23560
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.3.0
>         Environment: debian 8.9, macos x high sierra
>            Reporter: Bruce Robbins
>            Priority: Major
>
> Depending on the size of the input, a joinWith followed by a groupBy requires 
> more shuffles than a join followed by a groupBy.
> For example, here's a joinWith on two CSV files, followed by a groupBy:
> {noformat}
> import org.apache.spark.sql.types._
> val schema = StructType(StructField("id1", LongType) :: StructField("id2", LongType) :: Nil)
> val df1 = spark.read.schema(schema).csv("ds1.csv")
> val df2 = spark.read.schema(schema).csv("ds2.csv")
> val result1 = df1.joinWith(df2, df1.col("id1") === df2.col("id2")).groupBy("_1.id1").count
> result1.explain
> result1.explain
> == Physical Plan ==
> *(6) HashAggregate(keys=[_1#8.id1#19L], functions=[count(1)])
> +- Exchange hashpartitioning(_1#8.id1#19L, 200)
>    +- *(5) HashAggregate(keys=[_1#8.id1 AS _1#8.id1#19L], functions=[partial_count(1)])
>       +- *(5) Project [_1#8]
>          +- *(5) SortMergeJoin [_1#8.id1], [_2#9.id2], Inner
>             :- *(2) Sort [_1#8.id1 ASC NULLS FIRST], false, 0
>             :  +- Exchange hashpartitioning(_1#8.id1, 200)
>             :     +- *(1) Project [named_struct(id1, id1#0L, id2, id2#1L) AS _1#8]
>             :        +- *(1) FileScan csv [id1#0L,id2#1L] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:.../ds1.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id1:bigint,id2:bigint>
>             +- *(4) Sort [_2#9.id2 ASC NULLS FIRST], false, 0
>                +- Exchange hashpartitioning(_2#9.id2, 200)
>                   +- *(3) Project [named_struct(id1, id1#4L, id2, id2#5L) AS _2#9]
>                      +- *(3) FileScan csv [id1#4L,id2#5L] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:...ds2.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id1:bigint,id2:bigint>
> {noformat}
> Using join, there is one less shuffle:
> {noformat}
> val result2 = df1.join(df2, df1.col("id1") === df2.col("id2")).groupBy(df1("id1")).count
> result2.explain
> == Physical Plan ==
> *(5) HashAggregate(keys=[id1#0L], functions=[count(1)])
> +- *(5) HashAggregate(keys=[id1#0L], functions=[partial_count(1)])
>    +- *(5) Project [id1#0L]
>       +- *(5) SortMergeJoin [id1#0L], [id2#5L], Inner
>          :- *(2) Sort [id1#0L ASC NULLS FIRST], false, 0
>          :  +- Exchange hashpartitioning(id1#0L, 200)
>          :     +- *(1) Project [id1#0L]
>          :        +- *(1) Filter isnotnull(id1#0L)
>          :           +- *(1) FileScan csv [id1#0L] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:.../ds1.csv], PartitionFilters: [], PushedFilters: [IsNotNull(id1)], ReadSchema: struct<id1:bigint>
>          +- *(4) Sort [id2#5L ASC NULLS FIRST], false, 0
>             +- Exchange hashpartitioning(id2#5L, 200)
>                +- *(3) Project [id2#5L]
>                   +- *(3) Filter isnotnull(id2#5L)
>                      +- *(3) FileScan csv [id2#5L] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:...ds2.csv], PartitionFilters: [], PushedFilters: [IsNotNull(id2)], ReadSchema: struct<id2:bigint>
> {noformat}
> The extra exchange is reflected in the run time of the query.
> My tests were on inputs with more than 2 million records.


