colin fang created SPARK-28148:
----------------------------------
Summary: repartition after join is not optimized away
Key: SPARK-28148
URL: https://issues.apache.org/jira/browse/SPARK-28148
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 2.4.3
Reporter: colin fang
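Partitioning and sorting are usually retained after a join. In the example below, the second join reuses the hash partitioning and sort order produced by the first join, so no extra Exchange or Sort is planned for its left input.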
{code}
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # reuses the shell's session if one exists
spark.conf.set('spark.sql.shuffle.partitions', '42')
df1 = spark.range(5000000, numPartitions=5)
df2 = spark.range(10000000, numPartitions=5)
df3 = spark.range(20000000, numPartitions=5)
# The second join reuses the partitioning and sort order of the first join's output.
df1.join(df2, on='id').join(df3, on='id').explain()
# == Physical Plan ==
# *(8) Project [id#367L]
# +- *(8) SortMergeJoin [id#367L], [id#374L], Inner
#    :- *(5) Project [id#367L]
#    :  +- *(5) SortMergeJoin [id#367L], [id#369L], Inner
#    :     :- *(2) Sort [id#367L ASC NULLS FIRST], false, 0
#    :     :  +- Exchange hashpartitioning(id#367L, 42)
#    :     :     +- *(1) Range (0, 5000000, step=1, splits=5)
#    :     +- *(4) Sort [id#369L ASC NULLS FIRST], false, 0
#    :        +- Exchange hashpartitioning(id#369L, 42)
#    :           +- *(3) Range (0, 10000000, step=1, splits=5)
#    +- *(7) Sort [id#374L ASC NULLS FIRST], false, 0
#       +- Exchange hashpartitioning(id#374L, 42)
#          +- *(6) Range (0, 20000000, step=1, splits=5)
{code}
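The first plan is consistent with how Spark's EnsureRequirements physical rule treats join inputs: roughly, it adds an Exchange only when a child's output partitioning does not already satisfy the join's required distribution, and a Sort only when the child's output ordering does not satisfy the required ordering. The sketch below is a deliberately simplified, hypothetical model of that decision. The classes and functions (HashPartitioning, PlanProps, needs_exchange, needs_sort, smj_input_decisions) are illustrative and are not Spark APIs, and real Spark applies more rules (for example, making both join sides agree on the number of partitions).

{code}
from dataclasses import dataclass
from typing import Dict, Optional, Tuple

# Hypothetical, simplified model of the physical properties Spark tracks
# per plan node (outputPartitioning / outputOrdering). Not a Spark API.


@dataclass(frozen=True)
class HashPartitioning:
    keys: Tuple[str, ...]   # columns the rows are hashed on
    num_partitions: int


@dataclass(frozen=True)
class PlanProps:
    partitioning: Optional[HashPartitioning]  # None = no known partitioning
    ordering: Tuple[str, ...]                 # ascending sort keys within each partition


def needs_exchange(child: PlanProps, keys: Tuple[str, ...], n: int) -> bool:
    # Shuffle unless the child is already hash-partitioned on exactly
    # these keys into the expected number of partitions.
    p = child.partitioning
    return p is None or p.keys != keys or p.num_partitions != n


def needs_sort(child: PlanProps, keys: Tuple[str, ...]) -> bool:
    # Sort unless the required keys are a prefix of the child's ordering.
    return child.ordering[:len(keys)] != keys


def smj_input_decisions(left: PlanProps, right: PlanProps,
                        key: str, n: int) -> Dict[str, Tuple[bool, bool]]:
    """(add Exchange?, add Sort?) for each input of a toy sort-merge join on `key`."""
    keys = (key,)
    decisions = {}
    for side, props in (("left", left), ("right", right)):
        exchange = needs_exchange(props, keys, n)
        # A shuffle discards any existing ordering, so it always forces a Sort.
        decisions[side] = (exchange, exchange or needs_sort(props, keys))
    return decisions


# Outer join of the first example, with spark.sql.shuffle.partitions = 42.
first_join_output = PlanProps(HashPartitioning(("id",), 42), ("id",))
df3_range = PlanProps(None, ())
print(smj_input_decisions(first_join_output, df3_range, "id", 42))
# {'left': (False, False), 'right': (True, True)}
{code}

As in the plan above, the left input of the outer join (the first join's output, hash-partitioned and sorted on id) needs neither operator, while df3's Range input needs both.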
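However, after a left join the partitioning persists but the sort order does not: repartition('id') adds no Exchange, yet sortWithinPartitions('id') still adds a Sort on top of the SortMergeJoin, whose output should already be sorted by id.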
{code}
df1.join(df2, on='id',
         how='left').repartition('id').sortWithinPartitions('id').explain()
# == Physical Plan ==
# *(5) Sort [id#367L ASC NULLS FIRST], false, 0
# +- *(5) Project [id#367L]
#    +- SortMergeJoin [id#367L], [id#369L], LeftOuter
#       :- *(2) Sort [id#367L ASC NULLS FIRST], false, 0
#       :  +- Exchange hashpartitioning(id#367L, 42)
#       :     +- *(1) Range (0, 5000000, step=1, splits=5)
#       +- *(4) Sort [id#369L ASC NULLS FIRST], false, 0
#          +- Exchange hashpartitioning(id#369L, 42)
#             +- *(3) Range (0, 10000000, step=1, splits=5)
{code}
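Also, the partitioning does not persist through an inner join: repartition('id') adds an Exchange hashpartitioning(id#367L, 42) above the join (followed by a Sort), even though the join output is already hash-partitioned on id into 42 partitions, which the second join in the first example relies on.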
{code}
df1.join(df2, on='id').repartition('id').sortWithinPartitions('id').explain()
# == Physical Plan ==
# *(6) Sort [id#367L ASC NULLS FIRST], false, 0
# +- Exchange hashpartitioning(id#367L, 42)
#    +- *(5) Project [id#367L]
#       +- *(5) SortMergeJoin [id#367L], [id#369L], Inner
#          :- *(2) Sort [id#367L ASC NULLS FIRST], false, 0
#          :  +- Exchange hashpartitioning(id#367L, 42)
#          :     +- *(1) Range (0, 5000000, step=1, splits=5)
#          +- *(4) Sort [id#369L ASC NULLS FIRST], false, 0
#             +- Exchange hashpartitioning(id#369L, 42)
#                +- *(3) Range (0, 10000000, step=1, splits=5)
{code}
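To compare the last two plans mechanically, the sketch below pulls out the operator names between the plan root and the first join node of an explain() printout. The helper name operators_above_join and the plan constants are hypothetical: the plans are copied from this report and truncated at the join, the parser assumes the default explain() tree layout (tree prefixes such as "+- " and ":- ", optional whole-stage-codegen markers such as "*(5) "), and it treats any operator whose name ends in Join as the join.

{code}
import re
from typing import List

# Optional tree prefix ("+- ", ":- ", ":  ", indentation), then an optional
# whole-stage-codegen marker such as "*(5) ", then the operator name.
_OPERATOR = re.compile(r"^[\s:+\-]*(?:\*\(\d+\)\s+)?(\w+)")


def operators_above_join(plan: str) -> List[str]:
    """Operator names from the plan root down to (excluding) the first join."""
    ops = []
    for line in plan.splitlines():
        match = _OPERATOR.match(line)
        if match is None:
            continue  # e.g. the "== Physical Plan ==" header or blank lines
        name = match.group(1)
        if name.endswith("Join"):
            break
        ops.append(name)
    return ops


# Plans copied from this report, truncated at the join node.
LEFT_JOIN_PLAN = """\
== Physical Plan ==
*(5) Sort [id#367L ASC NULLS FIRST], false, 0
+- *(5) Project [id#367L]
   +- SortMergeJoin [id#367L], [id#369L], LeftOuter
"""

INNER_JOIN_PLAN = """\
== Physical Plan ==
*(6) Sort [id#367L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(id#367L, 42)
   +- *(5) Project [id#367L]
      +- *(5) SortMergeJoin [id#367L], [id#369L], Inner
"""

print(operators_above_join(LEFT_JOIN_PLAN))   # ['Sort', 'Project']
print(operators_above_join(INNER_JOIN_PLAN))  # ['Sort', 'Exchange', 'Project']
{code}

Both cases keep an extra Sort above the join, but only the inner join gets an extra Exchange. Because PySpark's DataFrame.explain() prints from Python, the text of a live plan could likely be captured with contextlib.redirect_stdout and passed in the same way.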
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)