Kousuke Saruta created SPARK-32820:
--------------------------------------
Summary: Remove redundant shuffle exchanges inserted by EnsureRequirements
Key: SPARK-32820
URL: https://issues.apache.org/jira/browse/SPARK-32820
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 3.1.0
Reporter: Kousuke Saruta
Assignee: Kousuke Saruta
Redundant repartition operations are removed by the CollapseRepartition rule, but
EnsureRequirements can insert another exchange with HashPartitioning or
RangePartitioning immediately after the repartition, leaving adjacent
ShuffleExchanges in the physical plan. The two examples below show this.
{code:java}
val ordered = spark.range(1, 100).repartitionByRange(10, $"id".desc).orderBy($"id")
ordered.explain(true)
...
== Physical Plan ==
*(2) Sort [id#0L ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(id#0L ASC NULLS FIRST, 200), true, [id=#15]
   +- Exchange rangepartitioning(id#0L DESC NULLS LAST, 10), false, [id=#14]
      +- *(1) Range (1, 100, step=1, splits=12){code}
{code:java}
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 0)
val left = Seq(1,2,3).toDF.repartition(10, $"value")
val right = Seq(1,2,3).toDF
val joined = left.join(right, left("value") + 1 === right("value"))
joined.explain(true)
...
== Physical Plan ==
*(3) SortMergeJoin [(value#7 + 1)], [value#12], Inner
:- *(1) Sort [(value#7 + 1) ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning((value#7 + 1), 200), true, [id=#67]
:     +- Exchange hashpartitioning(value#7, 10), false, [id=#63]
:        +- LocalTableScan [value#7]
+- *(2) Sort [value#12 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(value#12, 200), true, [id=#68]
      +- LocalTableScan [value#12]{code}
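In both plans the inner Exchange is redundant, because the outer Exchange immediately reshuffles its output. The kind of rewrite this issue asks for might look roughly like the sketch below. It works on a toy plan model, not on Spark's real classes: `Plan`, `Scan`, `Sort`, `Exchange`, and `CollapseAdjacentExchanges` are hypothetical names introduced only for illustration, and the actual fix may be implemented differently.

```scala
// Toy model of a physical plan. These types are hypothetical stand-ins,
// not Spark's SparkPlan / ShuffleExchangeExec classes.
sealed trait Plan
final case class Scan(name: String) extends Plan
final case class Sort(child: Plan) extends Plan
final case class Exchange(partitioning: String, numPartitions: Int, child: Plan) extends Plan

object CollapseAdjacentExchanges {
  // Bottom-up rewrite: when an Exchange sits directly on top of another
  // Exchange, keep only the outer one, because its partitioning replaces
  // whatever the inner one produced.
  def apply(plan: Plan): Plan = plan match {
    case Exchange(p, n, child) =>
      apply(child) match {
        case Exchange(_, _, grandChild) => Exchange(p, n, grandChild)
        case other                      => Exchange(p, n, other)
      }
    case Sort(child) => Sort(apply(child))
    case leaf: Scan  => leaf
  }

  // Counts the Exchange nodes that remain in a plan.
  def countExchanges(plan: Plan): Int = plan match {
    case Exchange(_, _, child) => 1 + countExchanges(child)
    case Sort(child)           => countExchanges(child)
    case _: Scan               => 0
  }
}

object Demo {
  // Mirrors the shape of the first plan above: Sort over two adjacent Exchanges.
  val before: Plan =
    Sort(Exchange("range(id ASC)", 200, Exchange("range(id DESC)", 10, Scan("Range"))))
  val after: Plan = CollapseAdjacentExchanges(before)
}
```

In this sketch `Demo.after` keeps the outer exchange with 200 partitions and drops the inner one with 10. The sketch only handles exchanges that are direct parent and child. It deliberately leaves an inner exchange in place when another operator sits between the two, since that operator may rely on the inner partitioning.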