Michael Styles created SPARK-20636:
--------------------------------------
Summary: Eliminate unnecessary shuffle with adjacent Window expressions
Key: SPARK-20636
URL: https://issues.apache.org/jira/browse/SPARK-20636
Project: Spark
Issue Type: Improvement
Components: Optimizer
Affects Versions: 2.1.1
Reporter: Michael Styles
Consider the following example:
{noformat}
from pyspark.sql import Window, functions as F

# `supply` is a DataFrame with columns sno, pno, qty.
w1 = Window.partitionBy("sno")
w2 = Window.partitionBy("sno", "pno")

supply \
    .select('sno', 'pno', 'qty', F.sum('qty').over(w2).alias('sum_qty_2')) \
    .select('sno', 'pno', 'qty', F.col('sum_qty_2'),
            F.sum('qty').over(w1).alias('sum_qty_1')) \
    .explain()
== Optimized Logical Plan ==
Window [sum(qty#982L) windowspecdefinition(sno#980, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS sum_qty_1#1112L], [sno#980]
+- Window [sum(qty#982L) windowspecdefinition(sno#980, pno#981, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS sum_qty_2#1105L], [sno#980, pno#981]
   +- Relation[sno#980,pno#981,qty#982L] parquet

== Physical Plan ==
Window [sum(qty#982L) windowspecdefinition(sno#980, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS sum_qty_1#1112L], [sno#980]
+- *Sort [sno#980 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(sno#980, 200)
      +- Window [sum(qty#982L) windowspecdefinition(sno#980, pno#981, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS sum_qty_2#1105L], [sno#980, pno#981]
         +- *Sort [sno#980 ASC NULLS FIRST, pno#981 ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(sno#980, pno#981, 200)
               +- *FileScan parquet [sno#980,pno#981,qty#982L] ...
{noformat}
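A more efficient query plan can be achieved by flipping the Window expressions
so that the window partitioned by sno is evaluated first. Rows hash-partitioned
on sno are already co-located for every (sno, pno) pair, so the second Window
needs only a local sort and one Exchange is eliminated, as follows: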
{noformat}
== Optimized Logical Plan ==
Window [sum(qty#982L) windowspecdefinition(sno#980, pno#981, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS sum_qty_2#1087L], [sno#980, pno#981]
+- Window [sum(qty#982L) windowspecdefinition(sno#980, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS sum_qty_1#1085L], [sno#980]
   +- Relation[sno#980,pno#981,qty#982L] parquet

== Physical Plan ==
Window [sum(qty#982L) windowspecdefinition(sno#980, pno#981, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS sum_qty_2#1087L], [sno#980, pno#981]
+- *Sort [sno#980 ASC NULLS FIRST, pno#981 ASC NULLS FIRST], false, 0
   +- Window [sum(qty#982L) windowspecdefinition(sno#980, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS sum_qty_1#1085L], [sno#980]
      +- *Sort [sno#980 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(sno#980, 200)
            +- *FileScan parquet [sno#980,pno#981,qty#982L] ...
{noformat}
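
The reason a single shuffle is enough can be sketched in plain Python, without Spark. This is only an illustration of the partitioning argument, not the optimizer's implementation: the sample rows, the partition count, and the helper names `shuffle_by_sno` and `both_window_sums` are all hypothetical.

```python
from collections import defaultdict

# Hypothetical sample rows (sno, pno, qty); not taken from the issue.
rows = [
    ("S1", "P1", 300), ("S1", "P2", 200), ("S1", "P1", 100),
    ("S2", "P1", 300), ("S2", "P2", 400), ("S3", "P2", 200),
]


def shuffle_by_sno(rows, num_partitions):
    """Simulate one Exchange hashpartitioning(sno, num_partitions)."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[hash(row[0]) % num_partitions].append(row)
    return partitions


def both_window_sums(partition):
    """Compute sum(qty) over (sno) and over (sno, pno) from local rows only."""
    by_sno = defaultdict(int)
    by_sno_pno = defaultdict(int)
    for sno, pno, qty in partition:
        by_sno[sno] += qty
        by_sno_pno[(sno, pno)] += qty
    # Output columns: sno, pno, qty, sum_qty_2, sum_qty_1
    return [(sno, pno, qty, by_sno_pno[(sno, pno)], by_sno[sno])
            for sno, pno, qty in partition]


# One shuffle on sno, then both aggregates are computed per partition.
partitions = shuffle_by_sno(rows, num_partitions=4)
result = sorted(r for part in partitions.values()
                for r in both_window_sums(part))
```

Because every row with a given sno lands in the same partition, every (sno, pno) group is also complete within that partition, so neither aggregate needs rows from elsewhere. With the DataFrame API, the same effect can presumably be obtained by hand today by applying `.over(w1)` in the first select and `.over(w2)` in the second.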