[ 
https://issues.apache.org/jira/browse/SPARK-24066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

caoxuewen updated SPARK-24066:
------------------------------
    Description: 
Currently, when two adjacent window functions have the same partitioning and
overlapping orderings (the ORDER BY of one is a prefix of the other's), two
sorts are performed after the shuffle, which is unnecessary. This PR adds a new
optimization rule that eliminates the unnecessary sort by exchanging adjacent
Window expressions.

For example:

val df = Seq(
      ("a", "p1", 10.0, 20.0, 30.0),
      ("a", "p2", 20.0, 10.0, 40.0)).toDF("key", "value", "value1", "value2", 
"value3").select(
        $"key",
        sum("value1").over(Window.partitionBy("key").orderBy("value")),
        max("value2").over(Window.partitionBy("key").orderBy("value", 
"value1")),
        avg("value3").over(Window.partitionBy("key").orderBy("value", "value1", 
"value2"))
      ).queryExecution.executedPlan

 

Before this PR:

*(5) Project [key#16, sum(value1) OVER (PARTITION BY key ORDER BY value ASC 
NULLS FIRST unspecifiedframe$())#29, max(value2) OVER (PARTITION BY key ORDER 
BY value ASC NULLS FIRST, value1 ASC NULLS FIRST unspecifiedframe$())#30, 
avg(value3) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC 
NULLS FIRST, value2 ASC NULLS FIRST unspecifiedframe$())#31]
+- Window [max(value2#19) windowspecdefinition(key#16, value#17 ASC NULLS 
FIRST, value1#18 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, 
unboundedpreceding$(), currentrow$())) AS max(value2) OVER (PARTITION BY key 
ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST 
unspecifiedframe$())#30], [key#16], [value#17 ASC NULLS FIRST, value1#18 ASC 
NULLS FIRST]
   +- *(4) Project [key#16, value1#18, value#17, value2#19, sum(value1) OVER 
(PARTITION BY key ORDER BY value ASC NULLS FIRST unspecifiedframe$())#29, 
avg(value3) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC 
NULLS FIRST, value2 ASC NULLS FIRST unspecifiedframe$())#31]
      +- Window [avg(value3#20) windowspecdefinition(key#16, value#17 ASC NULLS 
FIRST, value1#18 ASC NULLS FIRST, value2#19 ASC NULLS FIRST, 
specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS 
avg(value3) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC 
NULLS FIRST, value2 ASC NULLS FIRST unspecifiedframe$())#31], [key#16], 
[value#17 ASC NULLS FIRST, value1#18 ASC NULLS FIRST, value2#19 ASC NULLS FIRST]
         +- *(3) Sort [key#16 ASC NULLS FIRST, value#17 ASC NULLS FIRST, 
value1#18 ASC NULLS FIRST, value2#19 ASC NULLS FIRST], false, 0
            +- Window [sum(value1#18) windowspecdefinition(key#16, value#17 ASC 
NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), 
currentrow$())) AS sum(value1) OVER (PARTITION BY key ORDER BY value ASC NULLS 
FIRST unspecifiedframe$())#29], [key#16], [value#17 ASC NULLS FIRST]
               +- *(2) Sort [key#16 ASC NULLS FIRST, value#17 ASC NULLS FIRST], 
false, 0
                  +- Exchange hashpartitioning(key#16, 5)
                     +- *(1) Project [_1#5 AS key#16, _3#7 AS value1#18, _2#6 
AS value#17, _4#8 AS value2#19, _5#9 AS value3#20]
                        +- LocalTableScan [_1#5, _2#6, _3#7, _4#8, _5#9]

 

After this PR:

*(5) Project [key#16, sum(value1) OVER (PARTITION BY key ORDER BY value ASC 
NULLS FIRST unspecifiedframe$())#29, max(value2) OVER (PARTITION BY key ORDER 
BY value ASC NULLS FIRST, value1 ASC NULLS FIRST unspecifiedframe$())#30, 
avg(value3) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC 
NULLS FIRST, value2 ASC NULLS FIRST unspecifiedframe$())#31]
+- Window [sum(value1#18) windowspecdefinition(key#16, value#17 ASC NULLS 
FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) 
AS sum(value1) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST 
unspecifiedframe$())#29], [key#16], [value#17 ASC NULLS FIRST]
   +- *(4) Project [key#16, value1#18, value#17, avg(value3) OVER (PARTITION BY 
key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST, value2 ASC NULLS 
FIRST unspecifiedframe$())#31, max(value2) OVER (PARTITION BY key ORDER BY 
value ASC NULLS FIRST, value1 ASC NULLS FIRST unspecifiedframe$())#30]
      +- Window [max(value2#19) windowspecdefinition(key#16, value#17 ASC NULLS 
FIRST, value1#18 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, 
unboundedpreceding$(), currentrow$())) AS max(value2) OVER (PARTITION BY key 
ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST 
unspecifiedframe$())#30], [key#16], [value#17 ASC NULLS FIRST, value1#18 ASC 
NULLS FIRST]
         +- *(3) Project [key#16, value1#18, value#17, value2#19, avg(value3) 
OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST, 
value2 ASC NULLS FIRST unspecifiedframe$())#31]
            +- Window [avg(value3#20) windowspecdefinition(key#16, value#17 ASC 
NULLS FIRST, value1#18 ASC NULLS FIRST, value2#19 ASC NULLS FIRST, 
specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS 
avg(value3) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC 
NULLS FIRST, value2 ASC NULLS FIRST unspecifiedframe$())#31], [key#16], 
[value#17 ASC NULLS FIRST, value1#18 ASC NULLS FIRST, value2#19 ASC NULLS FIRST]
               +- *(2) Sort [key#16 ASC NULLS FIRST, value#17 ASC NULLS FIRST, 
value1#18 ASC NULLS FIRST, value2#19 ASC NULLS FIRST], false, 0
                  +- Exchange hashpartitioning(key#16, 5)
                     +- *(1) Project [_1#5 AS key#16, _3#7 AS value1#18, _2#6 
AS value#17, _4#8 AS value2#19, _5#9 AS value3#20]
                        +- LocalTableScan [_1#5, _2#6, _3#7, _4#8, _5#9]

 

 

  was:
Currently, when the ORDER BY fields of window functions have a subset
relationship, Spark SQL may nondeterministically generate different physical
plans. For example:

case class DistinctAgg(a: Int, b: Float, c: Double, d: Int, e: String)
val df = spark.sparkContext.parallelize(
      DistinctAgg(8, 2, 3, 4, "a") ::
      DistinctAgg(9, 3, 4, 5, "b") ::
      DistinctAgg(3, 4, 5, 6, "c") ::
      DistinctAgg(3, 4, 5, 7, "c") ::
      DistinctAgg(3, 4, 5, 8, "c") ::
      DistinctAgg(3, 6, 6, 9, "d") ::
      DistinctAgg(30, 40, 50, 60, "e") ::
      DistinctAgg(41, 51, 61, 71, null) ::
      DistinctAgg(42, 52, 62, 72, null) ::
      DistinctAgg(43, 53, 63, 73, "k") ::Nil).toDF()
df.createOrReplaceTempView("distinctAgg")

select a, b, c, 
sum(b) over(partition by a order by b) as sumIb, 
sum(d) over(partition by a order by b, c) as sumId, d 
from distinctAgg 

The generated physical plan varies randomly between runs.
One variant contains only one sort:
== Physical Plan ==
*(3) Project [a#181, b#182, c#183, sumId#210L, sumIb#209L, d#184]
+- Window [sum(cast(b#182 as bigint)) windowspecdefinition(a#181, b#182 ASC 
NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), 
currentrow$())) AS sumIb#209L], [a#181], [b#182 ASC NULLS FIRST]
   +- Window [sum(cast(d#184 as bigint)) windowspecdefinition(a#181, b#182 ASC 
NULLS FIRST, c#183 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, 
unboundedpreceding$(), currentrow$())) AS sumId#210L], [a#181], [b#182 ASC 
NULLS FIRST, c#183 ASC NULLS FIRST]
      +- *(2) Sort [a#181 ASC NULLS FIRST, b#182 ASC NULLS FIRST, c#183 ASC 
NULLS FIRST], false, 0
         +- Exchange hashpartitioning(a#181, 5)
            +- *(1) Project [a#181, b#182, c#183, d#184]
               +- *(1) SerializeFromObject [assertnotnull(input[0, 
org.apache.spark.sql.test.SQLTestData$DistinctAgg, true]).a AS a#181, 
assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$DistinctAgg, 
true]).b AS b#182, assertnotnull(input[0, 
org.apache.spark.sql.test.SQLTestData$DistinctAgg, true]).c AS c#183, 
assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$DistinctAgg, 
true]).d AS d#184, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, 
StringType, fromString, assertnotnull(input[0, 
org.apache.spark.sql.test.SQLTestData$DistinctAgg, true]).e, true, false) AS 
e#185]
                  +- Scan ExternalRDDScan[obj#180]

Another variant contains two sorts:
== Physical Plan ==
*(4) Project [a#181, b#182, c#183, sumId#210L, sumIb#209L, d#184]
+- Window [sum(cast(d#184 as bigint)) windowspecdefinition(a#181, b#182 ASC 
NULLS FIRST, c#183 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, 
unboundedpreceding$(), currentrow$())) AS sumId#210L], [a#181], [b#182 ASC 
NULLS FIRST, c#183 ASC NULLS FIRST]
   +- *(3) Sort [a#181 ASC NULLS FIRST, b#182 ASC NULLS FIRST, c#183 ASC NULLS 
FIRST], false, 0
      +- Window [sum(cast(b#182 as bigint)) windowspecdefinition(a#181, b#182 
ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), 
currentrow$())) AS sumIb#209L], [a#181], [b#182 ASC NULLS FIRST]
         +- *(2) Sort [a#181 ASC NULLS FIRST, b#182 ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(a#181, 5)
               +- *(1) Project [a#181, b#182, c#183, d#184]
                  +- *(1) SerializeFromObject [assertnotnull(input[0, 
org.apache.spark.sql.test.SQLTestData$DistinctAgg, true]).a AS a#181, 
assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$DistinctAgg, 
true]).b AS b#182, assertnotnull(input[0, 
org.apache.spark.sql.test.SQLTestData$DistinctAgg, true]).c AS c#183, 
assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$DistinctAgg, 
true]).d AS d#184, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, 
StringType, fromString, assertnotnull(input[0, 
org.apache.spark.sql.test.SQLTestData$DistinctAgg, true]).e, true, false) AS 
e#185]
                     +- Scan ExternalRDDScan[obj#180]

This PR adds an exchange rule to ensure that no redundant SortExec is generated
in the physical plan.


> Add new optimization rule to eliminate unnecessary sort by exchanged adjacent 
> Window expressions
> ------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-24066
>                 URL: https://issues.apache.org/jira/browse/SPARK-24066
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.4.0
>            Reporter: caoxuewen
>            Priority: Major
>
> Currently, when two adjacent window functions have the same partitioning and
> overlapping orderings (the ORDER BY of one is a prefix of the other's), two
> sorts are performed after the shuffle, which is unnecessary. This PR adds a
> new optimization rule that eliminates the unnecessary sort by exchanging
> adjacent Window expressions.
> For example:
> val df = Seq(
>       ("a", "p1", 10.0, 20.0, 30.0),
>       ("a", "p2", 20.0, 10.0, 40.0)).toDF("key", "value", "value1", "value2", 
> "value3").select(
>         $"key",
>         sum("value1").over(Window.partitionBy("key").orderBy("value")),
>         max("value2").over(Window.partitionBy("key").orderBy("value", 
> "value1")),
>         avg("value3").over(Window.partitionBy("key").orderBy("value", 
> "value1", "value2"))
>       ).queryExecution.executedPlan
>  
> Before this PR:
> *(5) Project [key#16, sum(value1) OVER (PARTITION BY key ORDER BY value ASC 
> NULLS FIRST unspecifiedframe$())#29, max(value2) OVER (PARTITION BY key ORDER 
> BY value ASC NULLS FIRST, value1 ASC NULLS FIRST unspecifiedframe$())#30, 
> avg(value3) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC 
> NULLS FIRST, value2 ASC NULLS FIRST unspecifiedframe$())#31]
> +- Window [max(value2#19) windowspecdefinition(key#16, value#17 ASC NULLS 
> FIRST, value1#18 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, 
> unboundedpreceding$(), currentrow$())) AS max(value2) OVER (PARTITION BY key 
> ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST 
> unspecifiedframe$())#30], [key#16], [value#17 ASC NULLS FIRST, value1#18 ASC 
> NULLS FIRST]
>    +- *(4) Project [key#16, value1#18, value#17, value2#19, sum(value1) OVER 
> (PARTITION BY key ORDER BY value ASC NULLS FIRST unspecifiedframe$())#29, 
> avg(value3) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC 
> NULLS FIRST, value2 ASC NULLS FIRST unspecifiedframe$())#31]
>       +- Window [avg(value3#20) windowspecdefinition(key#16, value#17 ASC 
> NULLS FIRST, value1#18 ASC NULLS FIRST, value2#19 ASC NULLS FIRST, 
> specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS 
> avg(value3) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC 
> NULLS FIRST, value2 ASC NULLS FIRST unspecifiedframe$())#31], [key#16], 
> [value#17 ASC NULLS FIRST, value1#18 ASC NULLS FIRST, value2#19 ASC NULLS 
> FIRST]
>          +- *(3) Sort [key#16 ASC NULLS FIRST, value#17 ASC NULLS FIRST, 
> value1#18 ASC NULLS FIRST, value2#19 ASC NULLS FIRST], false, 0
>             +- Window [sum(value1#18) windowspecdefinition(key#16, value#17 
> ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), 
> currentrow$())) AS sum(value1) OVER (PARTITION BY key ORDER BY value ASC 
> NULLS FIRST unspecifiedframe$())#29], [key#16], [value#17 ASC NULLS FIRST]
>                +- *(2) Sort [key#16 ASC NULLS FIRST, value#17 ASC NULLS 
> FIRST], false, 0
>                   +- Exchange hashpartitioning(key#16, 5)
>                      +- *(1) Project [_1#5 AS key#16, _3#7 AS value1#18, _2#6 
> AS value#17, _4#8 AS value2#19, _5#9 AS value3#20]
>                         +- LocalTableScan [_1#5, _2#6, _3#7, _4#8, _5#9]
>  
> After this PR:
> *(5) Project [key#16, sum(value1) OVER (PARTITION BY key ORDER BY value ASC 
> NULLS FIRST unspecifiedframe$())#29, max(value2) OVER (PARTITION BY key ORDER 
> BY value ASC NULLS FIRST, value1 ASC NULLS FIRST unspecifiedframe$())#30, 
> avg(value3) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC 
> NULLS FIRST, value2 ASC NULLS FIRST unspecifiedframe$())#31]
> +- Window [sum(value1#18) windowspecdefinition(key#16, value#17 ASC NULLS 
> FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), 
> currentrow$())) AS sum(value1) OVER (PARTITION BY key ORDER BY value ASC 
> NULLS FIRST unspecifiedframe$())#29], [key#16], [value#17 ASC NULLS FIRST]
>    +- *(4) Project [key#16, value1#18, value#17, avg(value3) OVER (PARTITION 
> BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST, value2 ASC 
> NULLS FIRST unspecifiedframe$())#31, max(value2) OVER (PARTITION BY key ORDER 
> BY value ASC NULLS FIRST, value1 ASC NULLS FIRST unspecifiedframe$())#30]
>       +- Window [max(value2#19) windowspecdefinition(key#16, value#17 ASC 
> NULLS FIRST, value1#18 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, 
> unboundedpreceding$(), currentrow$())) AS max(value2) OVER (PARTITION BY key 
> ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST 
> unspecifiedframe$())#30], [key#16], [value#17 ASC NULLS FIRST, value1#18 ASC 
> NULLS FIRST]
>          +- *(3) Project [key#16, value1#18, value#17, value2#19, avg(value3) 
> OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS 
> FIRST, value2 ASC NULLS FIRST unspecifiedframe$())#31]
>             +- Window [avg(value3#20) windowspecdefinition(key#16, value#17 
> ASC NULLS FIRST, value1#18 ASC NULLS FIRST, value2#19 ASC NULLS FIRST, 
> specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS 
> avg(value3) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC 
> NULLS FIRST, value2 ASC NULLS FIRST unspecifiedframe$())#31], [key#16], 
> [value#17 ASC NULLS FIRST, value1#18 ASC NULLS FIRST, value2#19 ASC NULLS 
> FIRST]
>                +- *(2) Sort [key#16 ASC NULLS FIRST, value#17 ASC NULLS 
> FIRST, value1#18 ASC NULLS FIRST, value2#19 ASC NULLS FIRST], false, 0
>                   +- Exchange hashpartitioning(key#16, 5)
>                      +- *(1) Project [_1#5 AS key#16, _3#7 AS value1#18, _2#6 
> AS value#17, _4#8 AS value2#19, _5#9 AS value3#20]
>                         +- LocalTableScan [_1#5, _2#6, _3#7, _4#8, _5#9]
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to