[ https://issues.apache.org/jira/browse/SPARK-32330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Cheng Su updated SPARK-32330:
-----------------------------
    Parent: SPARK-32461
    Issue Type: Sub-task  (was: Improvement)

> Preserve shuffled hash join build side partitioning
> ---------------------------------------------------
>
>                 Key: SPARK-32330
>                 URL: https://issues.apache.org/jira/browse/SPARK-32330
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>    Affects Versions: 3.1.0
>            Reporter: Cheng Su
>            Assignee: Cheng Su
>            Priority: Major
>             Fix For: 3.1.0
>
> Currently `ShuffledHashJoin.outputPartitioning` inherits from
> `HashJoin.outputPartitioning`, which preserves only the stream side
> partitioning (`HashJoin.scala`):
>
> {code:java}
> override def outputPartitioning: Partitioning = streamedPlan.outputPartitioning
> {code}
>
> This loses the build side partitioning information and causes an extra
> shuffle if there is another join or group-by after this join.
>
> Example:
>
> {code:java}
> withSQLConf(
>     SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50",
>     SQLConf.SHUFFLE_PARTITIONS.key -> "2",
>     SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
>   val df1 = spark.range(10).select($"id".as("k1"))
>   val df2 = spark.range(30).select($"id".as("k2"))
>   Seq("inner", "cross").foreach(joinType => {
>     val plan = df1.join(df2, $"k1" === $"k2", joinType).groupBy($"k1").count()
>       .queryExecution.executedPlan
>     assert(plan.collect { case _: ShuffledHashJoinExec => true }.size === 1)
>     // No extra shuffle before aggregate
>     assert(plan.collect { case _: ShuffleExchangeExec => true }.size === 2)
>   })
> }
> {code}
>
> Current physical plan (with an extra shuffle on `k1` before the aggregate):
>
> {code:java}
> *(4) HashAggregate(keys=[k1#220L], functions=[count(1)], output=[k1#220L, count#235L])
> +- Exchange hashpartitioning(k1#220L, 2), true, [id=#117]
>    +- *(3) HashAggregate(keys=[k1#220L], functions=[partial_count(1)], output=[k1#220L, count#239L])
>       +- *(3) Project [k1#220L]
>          +- ShuffledHashJoin [k1#220L], [k2#224L], Inner, BuildLeft
>             :- Exchange hashpartitioning(k1#220L, 2), true, [id=#109]
>             :  +- *(1) Project [id#218L AS k1#220L]
>             :     +- *(1) Range (0, 10, step=1, splits=2)
>             +- Exchange hashpartitioning(k2#224L, 2), true, [id=#111]
>                +- *(2) Project [id#222L AS k2#224L]
>                   +- *(2) Range (0, 30, step=1, splits=2)
> {code}
>
> Ideal physical plan (no shuffle on `k1` before the aggregate):
>
> {code:java}
> *(3) HashAggregate(keys=[k1#220L], functions=[count(1)], output=[k1#220L, count#235L])
> +- *(3) HashAggregate(keys=[k1#220L], functions=[partial_count(1)], output=[k1#220L, count#239L])
>    +- *(3) Project [k1#220L]
>       +- ShuffledHashJoin [k1#220L], [k2#224L], Inner, BuildLeft
>          :- Exchange hashpartitioning(k1#220L, 2), true, [id=#107]
>          :  +- *(1) Project [id#218L AS k1#220L]
>          :     +- *(1) Range (0, 10, step=1, splits=2)
>          +- Exchange hashpartitioning(k2#224L, 2), true, [id=#109]
>             +- *(2) Project [id#222L AS k2#224L]
>                +- *(2) Range (0, 30, step=1, splits=2)
> {code}
>
> This can be fixed by overriding the `outputPartitioning` method in
> `ShuffledHashJoinExec`, similarly to `SortMergeJoinExec`.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
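
[Editorial sketch] The override suggested in the description could look roughly like the following. This is a hypothetical draft modeled on the existing `SortMergeJoinExec.outputPartitioning` logic; the change actually committed for SPARK-32330 may differ in detail.

{code:java}
// Hypothetical sketch for ShuffledHashJoinExec, modeled on
// SortMergeJoinExec.outputPartitioning; not the committed patch.
override def outputPartitioning: Partitioning = joinType match {
  // Inner joins preserve both sides' partitionings, since rows are
  // matched on equal keys: expose both, so a downstream join or
  // aggregate on either key avoids an extra shuffle.
  case _: InnerLike =>
    PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning))
  // Outer joins preserve only the non-null-producing side.
  case LeftOuter => left.outputPartitioning
  case RightOuter => right.outputPartitioning
  // A full outer join pads both sides with nulls, so neither
  // partitioning survives.
  case FullOuter => UnknownPartitioning(left.outputPartitioning.numPartitions)
  // Semi/anti/existence joins only emit (filtered) left-side rows.
  case LeftExistence(_) => left.outputPartitioning
  case x =>
    throw new IllegalArgumentException(s"ShuffledHashJoin should not take $x as the JoinType")
}
{code}

With such an override in place, the test in the description should observe only the two `ShuffleExchangeExec` nodes feeding the join, with no third exchange before the aggregate.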