[
https://issues.apache.org/jira/browse/SPARK-32330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Cheng Su updated SPARK-32330:
-----------------------------
Description:
Currently, `ShuffledHashJoin.outputPartitioning` is inherited from
`HashJoin.outputPartitioning`, which preserves only the stream-side partitioning
(`HashJoin.scala`):
{code:java}
override def outputPartitioning: Partitioning = streamedPlan.outputPartitioning
{code}
This loses the build-side partitioning information and causes an extra shuffle
if another join or group-by follows this join.
Example:
{code:java}
// Low broadcast threshold and no sort-merge preference, so that a shuffled
// hash join is planned (checked by the first assert below).
withSQLConf(
    SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50",
    SQLConf.SHUFFLE_PARTITIONS.key -> "2",
    SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
  val df1 = spark.range(10).select($"id".as("k1"))
  val df2 = spark.range(30).select($"id".as("k2"))
  Seq("inner", "cross").foreach(joinType => {
    val plan = df1.join(df2, $"k1" === $"k2", joinType).groupBy($"k1").count()
      .queryExecution.executedPlan
    assert(plan.collect { case _: ShuffledHashJoinExec => true }.size === 1)
    // No extra shuffle before aggregate: only the two shuffles feeding the join
    assert(plan.collect { case _: ShuffleExchangeExec => true }.size === 2)
  })
}
{code}
Current physical plan (with an extra shuffle on `k1` before the aggregate):
{code:java}
*(4) HashAggregate(keys=[k1#220L], functions=[count(1)], output=[k1#220L, count#235L])
+- Exchange hashpartitioning(k1#220L, 2), true, [id=#117]
   +- *(3) HashAggregate(keys=[k1#220L], functions=[partial_count(1)], output=[k1#220L, count#239L])
      +- *(3) Project [k1#220L]
         +- ShuffledHashJoin [k1#220L], [k2#224L], Inner, BuildLeft
            :- Exchange hashpartitioning(k1#220L, 2), true, [id=#109]
            :  +- *(1) Project [id#218L AS k1#220L]
            :     +- *(1) Range (0, 10, step=1, splits=2)
            +- Exchange hashpartitioning(k2#224L, 2), true, [id=#111]
               +- *(2) Project [id#222L AS k2#224L]
                  +- *(2) Range (0, 30, step=1, splits=2)
{code}
Ideal physical plan (no shuffle on `k1` before the aggregate):
{code:java}
*(3) HashAggregate(keys=[k1#220L], functions=[count(1)], output=[k1#220L, count#235L])
+- *(3) HashAggregate(keys=[k1#220L], functions=[partial_count(1)], output=[k1#220L, count#239L])
   +- *(3) Project [k1#220L]
      +- ShuffledHashJoin [k1#220L], [k2#224L], Inner, BuildLeft
         :- Exchange hashpartitioning(k1#220L, 2), true, [id=#107]
         :  +- *(1) Project [id#218L AS k1#220L]
         :     +- *(1) Range (0, 10, step=1, splits=2)
         +- Exchange hashpartitioning(k2#224L, 2), true, [id=#109]
            +- *(2) Project [id#222L AS k2#224L]
               +- *(2) Range (0, 30, step=1, splits=2)
{code}
This can be fixed by overriding the `outputPartitioning` method in
`ShuffledHashJoinExec`, similar to what `SortMergeJoinExec` does.
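To illustrate why reporting both sides' partitioning removes the shuffle, here is a minimal, self-contained sketch. It is a toy model only: `ToyShuffledHashJoin`, `satisfiesClustering`, `currentOutputPartitioning`, and `proposedOutputPartitioning` are hypothetical names, and the `HashPartitioning` / `PartitioningCollection` classes below are simplified stand-ins, not Spark's real classes. It covers only the inner-like case from the example; other join types would need their own handling, which this sketch omits.

```scala
// Toy model: simplified stand-ins, NOT Spark's actual planner classes.
sealed trait Partitioning {
  // True if data partitioned this way is already clustered by `keys`,
  // i.e. no further shuffle is needed to group rows by those keys.
  def satisfiesClustering(keys: Set[String]): Boolean
}

case class HashPartitioning(exprs: Seq[String], numPartitions: Int) extends Partitioning {
  def satisfiesClustering(keys: Set[String]): Boolean =
    exprs.nonEmpty && exprs.forall(keys.contains)
}

// A collection is satisfied if any one of its members is.
case class PartitioningCollection(parts: Seq[Partitioning]) extends Partitioning {
  def satisfiesClustering(keys: Set[String]): Boolean =
    parts.exists(_.satisfiesClustering(keys))
}

sealed trait BuildSide
case object BuildLeft extends BuildSide
case object BuildRight extends BuildSide

case class ToyShuffledHashJoin(left: Partitioning, right: Partitioning, buildSide: BuildSide) {
  // The streamed side is the side that is not used to build the hash table.
  private def streamed: Partitioning = if (buildSide == BuildLeft) right else left

  // Behavior described in this issue: only the stream side is reported.
  def currentOutputPartitioning: Partitioning = streamed

  // Proposed behavior for inner-like joins: report both sides.
  def proposedOutputPartitioning: Partitioning =
    PartitioningCollection(Seq(left, right))
}

object Demo {
  // Mirrors the example plan: left is hashed on k1, right on k2, BuildLeft.
  val join = ToyShuffledHashJoin(
    HashPartitioning(Seq("k1"), 2), HashPartitioning(Seq("k2"), 2), BuildLeft)

  // The group-by on k1 needs its input clustered by k1.
  val needsShuffleBefore: Boolean =
    !join.currentOutputPartitioning.satisfiesClustering(Set("k1"))
  val needsShuffleAfter: Boolean =
    !join.proposedOutputPartitioning.satisfiesClustering(Set("k1"))
}
```

In this model, with `BuildLeft` the streamed side is partitioned on `k2`, so the group-by on `k1` is not satisfied and a shuffle would be needed. Once the build side's partitioning on `k1` is also reported, the requirement is met.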
> Preserve shuffled hash join build side partitioning
> ---------------------------------------------------
>
> Key: SPARK-32330
> URL: https://issues.apache.org/jira/browse/SPARK-32330
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.1.0
> Reporter: Cheng Su
> Priority: Trivial
>