Cheng Su created SPARK-32383:
--------------------------------
Summary: Preserve hash join (BHJ and SHJ) stream side ordering
Key: SPARK-32383
URL: https://issues.apache.org/jira/browse/SPARK-32383
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 3.1.0
Reporter: Cheng Su
Currently `BroadcastHashJoinExec` and `ShuffledHashJoinExec` do not preserve
children output ordering information (inherit from `SparkPlan.outputOrdering`,
which is Nil). This can add unnecessary sort in complex queries involved
multiple joins.
Example:
{code:java}
withSQLConf(
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50") {
val df1 = spark.range(100).select($"id".as("k1"))
val df2 = spark.range(100).select($"id".as("k2"))
val df3 = spark.range(3).select($"id".as("k3"))
val df4 = spark.range(100).select($"id".as("k4"))
val plan = df1.join(df2, $"k1" === $"k2")
.join(df3, $"k1" === $"k3")
.join(df4, $"k1" === $"k4")
.queryExecution
.executedPlan
}
{code}
Current physical plan (extra sort on `k1` before top sort merge join):
{code:java}
*(9) SortMergeJoin [k1#220L], [k4#232L], Inner
:- *(6) Sort [k1#220L ASC NULLS FIRST], false, 0
: +- *(6) BroadcastHashJoin [k1#220L], [k3#228L], Inner, BuildRight
: :- *(6) SortMergeJoin [k1#220L], [k2#224L], Inner
: : :- *(2) Sort [k1#220L ASC NULLS FIRST], false, 0
: : : +- Exchange hashpartitioning(k1#220L, 5), true, [id=#128]
: : : +- *(1) Project [id#218L AS k1#220L]
: : : +- *(1) Range (0, 100, step=1, splits=2)
: : +- *(4) Sort [k2#224L ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(k2#224L, 5), true, [id=#134]
: : +- *(3) Project [id#222L AS k2#224L]
: : +- *(3) Range (0, 100, step=1, splits=2)
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint,
false])), [id=#141]
: +- *(5) Project [id#226L AS k3#228L]
: +- *(5) Range (0, 3, step=1, splits=2)
+- *(8) Sort [k4#232L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(k4#232L, 5), true, [id=#148]
+- *(7) Project [id#230L AS k4#232L]
+- *(7) Range (0, 100, step=1, splits=2)
{code}
Ideal physical plan (no extra sort on `k1` before top sort merge join):
{code:java}
*(9) SortMergeJoin [k1#220L], [k4#232L], Inner
:- *(6) BroadcastHashJoin [k1#220L], [k3#228L], Inner, BuildRight
: :- *(6) SortMergeJoin [k1#220L], [k2#224L], Inner
: : :- *(2) Sort [k1#220L ASC NULLS FIRST], false, 0
: : : +- Exchange hashpartitioning(k1#220L, 5), true, [id=#127]
: : : +- *(1) Project [id#218L AS k1#220L]
: : : +- *(1) Range (0, 100, step=1, splits=2)
: : +- *(4) Sort [k2#224L ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(k2#224L, 5), true, [id=#133]
: : +- *(3) Project [id#222L AS k2#224L]
: : +- *(3) Range (0, 100, step=1, splits=2)
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint,
false])), [id=#140]
: +- *(5) Project [id#226L AS k3#228L]
: +- *(5) Range (0, 3, step=1, splits=2)
+- *(8) Sort [k4#232L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(k4#232L, 5), true, [id=#146]
+- *(7) Project [id#230L AS k4#232L]
+- *(7) Range (0, 100, step=1, splits=2){code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]