Michael Styles created SPARK-20636:
--------------------------------------

             Summary: Eliminate unnecessary shuffle with adjacent Window expressions
                 Key: SPARK-20636
                 URL: https://issues.apache.org/jira/browse/SPARK-20636
             Project: Spark
          Issue Type: Improvement
          Components: Optimizer
    Affects Versions: 2.1.1
            Reporter: Michael Styles


Consider the following example:

{noformat}
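from pyspark.sql import Window
from pyspark.sql import functions as F

# supply: a DataFrame over a parquet source with columns sno, pno, qty
w1 = Window.partitionBy("sno")
w2 = Window.partitionBy("sno", "pno")

# explain(True) prints the logical plans as well as the physical plan
supply \
    .select('sno', 'pno', 'qty', F.sum('qty').over(w2).alias('sum_qty_2')) \
    .select('sno', 'pno', 'qty', F.col('sum_qty_2'),
            F.sum('qty').over(w1).alias('sum_qty_1')) \
    .explain(True)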

== Optimized Logical Plan ==
Window [sum(qty#982L) windowspecdefinition(sno#980, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS sum_qty_1#1112L], [sno#980]
+- Window [sum(qty#982L) windowspecdefinition(sno#980, pno#981, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS sum_qty_2#1105L], [sno#980, pno#981]
   +- Relation[sno#980,pno#981,qty#982L] parquet

== Physical Plan ==
Window [sum(qty#982L) windowspecdefinition(sno#980, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS sum_qty_1#1112L], [sno#980]
+- *Sort [sno#980 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(sno#980, 200)
      +- Window [sum(qty#982L) windowspecdefinition(sno#980, pno#981, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS sum_qty_2#1105L], [sno#980, pno#981]
         +- *Sort [sno#980 ASC NULLS FIRST, pno#981 ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(sno#980, pno#981, 200)
               +- *FileScan parquet [sno#980,pno#981,qty#982L] ...
{noformat}
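Why the second Exchange appears: a Window requires its input to be clustered on its partitioning keys, and Spark adds a shuffle when the child's output partitioning does not satisfy that requirement. Roughly, hash partitioning on a set of keys satisfies a clustered distribution when every hash key is one of the clustering keys, so partitioning on (sno) can serve a Window on (sno, pno), but partitioning on (sno, pno) cannot serve a Window on (sno). The following is a simplified pure-Python model of that check, not Spark code. The function names are hypothetical, and it assumes the scan provides no usable partitioning and that each Exchange hash-partitions on exactly the Window's keys.

{code:python}
from typing import Optional, Sequence, Tuple

Keys = Tuple[str, ...]


def satisfies(child_partitioning: Optional[Keys], required: Keys) -> bool:
    """Model of hash partitioning satisfying a clustered distribution:
    every hash key must be one of the required clustering keys."""
    if child_partitioning is None:  # e.g. a file scan with unknown partitioning
        return False
    return set(child_partitioning) <= set(required)


def count_exchanges(window_specs: Sequence[Keys]) -> int:
    """Count shuffles for Windows applied bottom-up, in the given order."""
    current: Optional[Keys] = None
    exchanges = 0
    for keys in window_specs:
        if not satisfies(current, keys):
            exchanges += 1
            current = keys  # the Exchange hash-partitions on this Window's keys
        # otherwise the existing partitioning is reused and only a Sort is added
    return exchanges


# Order from the plan above: (sno, pno) is evaluated first, then (sno).
print(count_exchanges([("sno", "pno"), ("sno",)]))  # 2
# Reversed order: (sno) first, then (sno, pno).
print(count_exchanges([("sno",), ("sno", "pno")]))  # 1
{code}

The model ignores sort order, which Spark satisfies separately with a local Sort above each Window's input.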

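A more efficient query plan can be obtained by swapping the two Window operators so that the one partitioned by (sno) is evaluated first. Its Exchange hash-partitions the data on sno, and that partitioning already satisfies the clustered distribution required by the Window partitioned by (sno, pno), because every hash key is one of that Window's partitioning keys. Only a local Sort is then needed, so one shuffle is eliminated. The swap is valid here because sum_qty_1 does not depend on sum_qty_2: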

{noformat}
== Optimized Logical Plan ==
Window [sum(qty#982L) windowspecdefinition(sno#980, pno#981, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS sum_qty_2#1087L], [sno#980, pno#981]
+- Window [sum(qty#982L) windowspecdefinition(sno#980, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS sum_qty_1#1085L], [sno#980]
   +- Relation[sno#980,pno#981,qty#982L] parquet

== Physical Plan ==
Window [sum(qty#982L) windowspecdefinition(sno#980, pno#981, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS sum_qty_2#1087L], [sno#980, pno#981]
+- *Sort [sno#980 ASC NULLS FIRST, pno#981 ASC NULLS FIRST], false, 0
   +- Window [sum(qty#982L) windowspecdefinition(sno#980, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS sum_qty_1#1085L], [sno#980]
      +- *Sort [sno#980 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(sno#980, 200)
            +- *FileScan parquet [sno#980,pno#981,qty#982L] ...
{noformat}
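The reordering could be expressed as an optimizer rule over adjacent Window operators. The sketch below is a hypothetical, simplified model, not Spark's implementation: it moves a parent Window beneath its child when (a) the parent does not reference the column the child produces, so the two are independent, and (b) the parent's partition keys are a strict subset of the child's, so the coarser shuffle runs first and can be reused. The names Win, can_transpose and transpose_windows are illustrative only.

{code:python}
from dataclasses import dataclass
from typing import FrozenSet, List, Tuple


@dataclass(frozen=True)
class Win:
    """Simplified Window operator: partition keys, output alias, input columns read."""
    keys: Tuple[str, ...]
    output: str
    references: FrozenSet[str]


def can_transpose(parent: Win, child: Win) -> bool:
    # The parent must not read the child's output column.
    independent = child.output not in parent.references
    # The parent's keys must be a strict subset of the child's keys.
    coarser_parent = set(parent.keys) < set(child.keys)
    return independent and coarser_parent


def transpose_windows(chain: List[Win]) -> List[Win]:
    """Reorder a bottom-up chain of adjacent Windows until no pair can be swapped."""
    result = list(chain)
    changed = True
    while changed:
        changed = False
        for i in range(len(result) - 1):
            child, parent = result[i], result[i + 1]
            if can_transpose(parent, child):
                result[i], result[i + 1] = parent, child
                changed = True
    return result


# Bottom-up chain from the original query: the (sno, pno) Window sits below (sno).
w2 = Win(("sno", "pno"), "sum_qty_2", frozenset({"qty"}))
w1 = Win(("sno",), "sum_qty_1", frozenset({"qty"}))
print([w.output for w in transpose_windows([w2, w1])])  # ['sum_qty_1', 'sum_qty_2']
{code}

Each swap removes exactly one out-of-order pair and never re-creates one, so the loop terminates. A real rule would also need to compare expressions semantically rather than by column name and to handle Windows with an empty partition spec; both are left out of this sketch.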




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
