[ https://issues.apache.org/jira/browse/SPARK-32330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Cheng Su updated SPARK-32330:
-----------------------------
        Parent: SPARK-32461
    Issue Type: Sub-task  (was: Improvement)

> Preserve shuffled hash join build side partitioning
> ---------------------------------------------------
>
>                 Key: SPARK-32330
>                 URL: https://issues.apache.org/jira/browse/SPARK-32330
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>    Affects Versions: 3.1.0
>            Reporter: Cheng Su
>            Assignee: Cheng Su
>            Priority: Major
>             Fix For: 3.1.0
>
>
> Currently, `ShuffledHashJoinExec.outputPartitioning` is inherited from
> `HashJoin.outputPartitioning`, which preserves only the streamed side's partitioning:
> `HashJoin.scala`
> {code:java}
> override def outputPartitioning: Partitioning = streamedPlan.outputPartitioning
> {code}
> This discards the build side's partitioning information and causes an extra
> shuffle when another join or group-by on the build-side keys follows this join.
> Example:
>  
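> {code:java}
> // Runs inside a Spark SQL test suite (e.g. one mixing in SharedSparkSession),
> // which provides `spark`, `withSQLConf` and `testImplicits`.
> import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
> import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
> import org.apache.spark.sql.internal.SQLConf
> import testImplicits._
>
> withSQLConf(
>     // A tiny threshold (in bytes) keeps both sides from being broadcast
>     SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50",
>     SQLConf.SHUFFLE_PARTITIONS.key -> "2",
>     // Allow the planner to pick shuffled hash join instead of sort-merge join
>     SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
>   val df1 = spark.range(10).select($"id".as("k1"))
>   val df2 = spark.range(30).select($"id".as("k2"))
>   Seq("inner", "cross").foreach { joinType =>
>     val plan = df1.join(df2, $"k1" === $"k2", joinType).groupBy($"k1").count()
>       .queryExecution.executedPlan
>     assert(plan.collect { case _: ShuffledHashJoinExec => true }.size === 1)
>     // No extra shuffle before the aggregate: only the two join-input exchanges remain
>     assert(plan.collect { case _: ShuffleExchangeExec => true }.size === 2)
>   }
> }{code}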
>  
> Current physical plan (with an extra shuffle on `k1` before the aggregate):
>  
> {code:java}
> *(4) HashAggregate(keys=[k1#220L], functions=[count(1)], output=[k1#220L, count#235L])
> +- Exchange hashpartitioning(k1#220L, 2), true, [id=#117]
>    +- *(3) HashAggregate(keys=[k1#220L], functions=[partial_count(1)], output=[k1#220L, count#239L])
>       +- *(3) Project [k1#220L]
>          +- ShuffledHashJoin [k1#220L], [k2#224L], Inner, BuildLeft
>             :- Exchange hashpartitioning(k1#220L, 2), true, [id=#109]
>             :  +- *(1) Project [id#218L AS k1#220L]
>             :     +- *(1) Range (0, 10, step=1, splits=2)
>             +- Exchange hashpartitioning(k2#224L, 2), true, [id=#111]
>                +- *(2) Project [id#222L AS k2#224L]
>                   +- *(2) Range (0, 30, step=1, splits=2){code}
>  
> Ideal physical plan (no shuffle on `k1` before the aggregate):
> {code:java}
> *(3) HashAggregate(keys=[k1#220L], functions=[count(1)], output=[k1#220L, count#235L])
> +- *(3) HashAggregate(keys=[k1#220L], functions=[partial_count(1)], output=[k1#220L, count#239L])
>    +- *(3) Project [k1#220L]
>       +- ShuffledHashJoin [k1#220L], [k2#224L], Inner, BuildLeft
>          :- Exchange hashpartitioning(k1#220L, 2), true, [id=#107]
>          :  +- *(1) Project [id#218L AS k1#220L]
>          :     +- *(1) Range (0, 10, step=1, splits=2)
>          +- Exchange hashpartitioning(k2#224L, 2), true, [id=#109]
>             +- *(2) Project [id#222L AS k2#224L]
>                +- *(2) Range (0, 30, step=1, splits=2){code}
>  
> This can be fixed by overriding the `outputPartitioning` method in
> `ShuffledHashJoinExec`, as `SortMergeJoinExec` already does.
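To illustrate the idea, here is a minimal, self-contained sketch modeled on how `SortMergeJoinExec` reports its output partitioning. It does not use Spark's classes. `PartitioningSketch`, `satisfiesClusteringOn`, `streamSideOnly` and `preserveBothSides` are hypothetical names. `HashPartitioning`, `UnknownPartitioning` and `PartitioningCollection` are simplified stand-ins named after Spark's catalyst classes, and the clustering check is an assumed simplification of Spark's distribution-satisfaction logic. It is not the actual patch.

```scala
// Hypothetical, simplified model of Spark's physical partitioning (not Spark's API).
object PartitioningSketch {
  sealed trait JoinType
  case object Inner extends JoinType
  case object Cross extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object FullOuter extends JoinType
  case object LeftSemi extends JoinType
  case object LeftAnti extends JoinType

  sealed trait Partitioning {
    def numPartitions: Int
    // True if all rows sharing the same values for `keys` are guaranteed to be
    // in the same partition (a simplified clustered-distribution check).
    def satisfiesClusteringOn(keys: Set[String]): Boolean
  }

  final case class HashPartitioning(exprs: Seq[String], numPartitions: Int) extends Partitioning {
    // Hashing on a subset of the required keys still keeps equal keys together.
    def satisfiesClusteringOn(keys: Set[String]): Boolean =
      exprs.nonEmpty && exprs.forall(keys.contains)
  }

  final case class UnknownPartitioning(numPartitions: Int) extends Partitioning {
    def satisfiesClusteringOn(keys: Set[String]): Boolean = false
  }

  // The output is partitioned in every one of these ways at the same time.
  final case class PartitioningCollection(partitionings: Seq[Partitioning]) extends Partitioning {
    def numPartitions: Int = partitionings.head.numPartitions
    def satisfiesClusteringOn(keys: Set[String]): Boolean =
      partitionings.exists(_.satisfiesClusteringOn(keys))
  }

  // Behavior described in the issue: only the streamed side's partitioning is reported.
  def streamSideOnly(streamed: Partitioning): Partitioning = streamed

  // Proposed behavior, modeled on SortMergeJoinExec: keep both sides where it is safe.
  def preserveBothSides(joinType: JoinType, left: Partitioning, right: Partitioning): Partitioning =
    joinType match {
      // Every output row satisfies k1 == k2, so both children's partitionings hold.
      case Inner | Cross => PartitioningCollection(Seq(left, right))
      // Unmatched left rows get nulls for right-side columns; only left partitioning survives.
      case LeftOuter => left
      // Symmetric to LeftOuter.
      case RightOuter => right
      // Semi/anti joins emit a subset of the left rows unchanged.
      case LeftSemi | LeftAnti => left
      // Either side may be null-padded, so no key clustering can be guaranteed.
      case FullOuter => UnknownPartitioning(left.numPartitions)
    }
}
```

In the example above, the build side is left and the right side is streamed. Take `left = HashPartitioning(Seq("k1"), 2)` and `right = HashPartitioning(Seq("k2"), 2)`. Then `streamSideOnly(right).satisfiesClusteringOn(Set("k1"))` is false, which corresponds to the extra `Exchange hashpartitioning(k1#220L, 2)` in the current plan. `preserveBothSides(Inner, left, right)` satisfies clustering on `k1`, which corresponds to the ideal plan.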



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
