[ https://issues.apache.org/jira/browse/SPARK-32330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Cheng Su updated SPARK-32330:
-----------------------------
    Description: 
Currently `ShuffledHashJoinExec.outputPartitioning` is inherited from 
`HashJoin.outputPartitioning`, which preserves only the stream-side partitioning:

`HashJoin.scala`
{code:java}
override def outputPartitioning: Partitioning = streamedPlan.outputPartitioning
{code}
This discards the build-side partitioning information and causes an extra 
shuffle when another join or group-by on the build-side keys follows this join.

Example:

 
{code:java}
// Force a shuffled hash join: tiny broadcast threshold, sort-merge join not preferred.
// Assumes a Spark SQL test suite that provides withSQLConf, spark, and $"..." columns.
withSQLConf(
    SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50",
    SQLConf.SHUFFLE_PARTITIONS.key -> "2",
    SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
  val df1 = spark.range(10).select($"id".as("k1"))
  val df2 = spark.range(30).select($"id".as("k2"))
  Seq("inner", "cross").foreach { joinType =>
    val plan = df1.join(df2, $"k1" === $"k2", joinType).groupBy($"k1").count()
      .queryExecution.executedPlan
    assert(plan.collect { case _: ShuffledHashJoinExec => true }.size === 1)
    // Only the two join-input shuffles; no extra shuffle before the aggregate
    assert(plan.collect { case _: ShuffleExchangeExec => true }.size === 2)
  }
}
{code}
 

Current physical plan (with an extra shuffle on `k1` before the aggregate):

 
{code:java}
*(4) HashAggregate(keys=[k1#220L], functions=[count(1)], output=[k1#220L, count#235L])
+- Exchange hashpartitioning(k1#220L, 2), true, [id=#117]
   +- *(3) HashAggregate(keys=[k1#220L], functions=[partial_count(1)], output=[k1#220L, count#239L])
      +- *(3) Project [k1#220L]
         +- ShuffledHashJoin [k1#220L], [k2#224L], Inner, BuildLeft
            :- Exchange hashpartitioning(k1#220L, 2), true, [id=#109]
            :  +- *(1) Project [id#218L AS k1#220L]
            :     +- *(1) Range (0, 10, step=1, splits=2)
            +- Exchange hashpartitioning(k2#224L, 2), true, [id=#111]
               +- *(2) Project [id#222L AS k2#224L]
                  +- *(2) Range (0, 30, step=1, splits=2)
{code}
 

Ideal physical plan (no shuffle on `k1` before the aggregate):
{code:java}
*(3) HashAggregate(keys=[k1#220L], functions=[count(1)], output=[k1#220L, count#235L])
+- *(3) HashAggregate(keys=[k1#220L], functions=[partial_count(1)], output=[k1#220L, count#239L])
   +- *(3) Project [k1#220L]
      +- ShuffledHashJoin [k1#220L], [k2#224L], Inner, BuildLeft
         :- Exchange hashpartitioning(k1#220L, 2), true, [id=#107]
         :  +- *(1) Project [id#218L AS k1#220L]
         :     +- *(1) Range (0, 10, step=1, splits=2)
         +- Exchange hashpartitioning(k2#224L, 2), true, [id=#109]
            +- *(2) Project [id#222L AS k2#224L]
               +- *(2) Range (0, 30, step=1, splits=2)
{code}
 

This can be fixed by overriding the `outputPartitioning` method in 
`ShuffledHashJoinExec`, as `SortMergeJoinExec` already does.
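
The proposed override can be illustrated with a small, self-contained model. The sketch below is not Spark code: `HashPartitioning`, `PartitioningCollection`, and the join types are simplified stand-ins that reuse Spark's names, `satisfies` is reduced to an exact key match, and `streamOnly` / `preserveBoth` are hypothetical helper names. It assumes the override would follow the same pattern as `SortMergeJoinExec`: report both children's partitioning for inner-like joins and only the preserved side for outer joins.

```scala
// Simplified stand-ins for Spark's partitioning classes (illustration only, not Spark's API).
object OutputPartitioningSketch {
  sealed trait Partitioning {
    // Assumption: "satisfies" means the data is hash-partitioned on exactly these keys.
    def satisfies(required: Seq[String]): Boolean
  }

  final case class HashPartitioning(keys: Seq[String], numPartitions: Int)
      extends Partitioning {
    def satisfies(required: Seq[String]): Boolean = keys == required
  }

  // The output is partitioned according to every member at once,
  // so satisfying any one member is enough.
  final case class PartitioningCollection(partitionings: Seq[Partitioning])
      extends Partitioning {
    def satisfies(required: Seq[String]): Boolean =
      partitionings.exists(_.satisfies(required))
  }

  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType

  // Behavior described in the issue: only the stream side is reported.
  def streamOnly(stream: Partitioning): Partitioning = stream

  // Sketch of the proposed behavior: keep both sides for inner joins,
  // and only the side whose rows are all preserved for outer joins.
  def preserveBoth(
      joinType: JoinType,
      left: Partitioning,
      right: Partitioning): Partitioning = joinType match {
    case Inner      => PartitioningCollection(Seq(left, right))
    case LeftOuter  => left
    case RightOuter => right
  }
}
```

For the example above (`BuildLeft`, so the right side is streamed), `streamOnly(HashPartitioning(Seq("k2"), 2))` does not satisfy a requirement on `k1`, which corresponds to the extra `Exchange` in the current plan. `preserveBoth(Inner, HashPartitioning(Seq("k1"), 2), HashPartitioning(Seq("k2"), 2))` does satisfy it, so no additional shuffle would be needed before the aggregate.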

> Preserve shuffled hash join build side partitioning
> ---------------------------------------------------
>
>                 Key: SPARK-32330
>                 URL: https://issues.apache.org/jira/browse/SPARK-32330
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.1.0
>            Reporter: Cheng Su
>            Priority: Trivial



