[ 
https://issues.apache.org/jira/browse/SPARK-45230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wan Kun updated SPARK-45230:
----------------------------
    Description: 
If an aggregate operator comes after a sort-merge join (SMJ) and the grouping 
expressions of the aggregate operator contain all the join keys of the streamed 
side, we can sort the streamed side of the SMJ on the grouping expressions as 
well, so that the aggregate can be converted to a SortAggregate, which will be 
faster than a HashAggregate.

For example, with table t1(a, b, c) and t2(x, y, z):
{code}
SELECT a, b, sum(c)
FROM t1
JOIN t2
ON t1.b = t2.y
GROUP BY a, b
{code}

The optimized plan:
{code}
Scan(t1)                Scan(t2)
    |                      |
    |                      |
Exchange 1           Exchange 2
     \                     /
       \                 /
         \             /
        SMJ (t1.b = t2.y)
                |
                |
            Aggregate
{code}
Before this PR, Spark's EnsureRequirements rule adds Sort(t1.b) to the left 
side of the SMJ:
{code}
Scan(t1)                Scan(t2)
    |                      |
    |                      |
Exchange 1           Exchange 2
     \                     /
   Sort(t1.b)        Sort(t2.y) 
         \             /
       SMJ (t1.b = t2.y)
                |
                |
         HashAggregate
{code}

If we instead add Sort(t1.b, t1.a) to the left side of the SMJ, the following 
aggregate can be converted to a SortAggregate, which will be faster:

 {code}
Scan(t1)                Scan(t2)
    |                      |
    |                      |
Exchange 1           Exchange 2
     \                    /
 Sort(t1.b, t1.a)   Sort(t2.y) 
         \             /
      SMJ (t1.b = t2.y)
                |
                |
          SortAggregate
{code}
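
To illustrate why the extra sort key matters, here is a minimal sketch in plain Python. It is a toy model, not Spark's implementation: the helpers sort_merge_join and sort_aggregate and the sample rows are hypothetical and made up for illustration. It assumes the join emits rows in the order of the streamed (left) side, and that a sort-based aggregate only merges adjacent rows with equal grouping keys.

```python
from itertools import groupby


def sort_merge_join(left, right, left_key, right_key):
    """Toy inner sort-merge join; output follows the order of the left (streamed) side.

    Both inputs must already be sorted by their join key.
    """
    out = []
    j = 0
    n = len(right)
    for lrow in left:
        k = left_key(lrow)
        # Skip buffered-side rows whose key is smaller than the current key.
        while j < n and right_key(right[j]) < k:
            j += 1
        # Emit one joined row per matching buffered-side row.
        i = j
        while i < n and right_key(right[i]) == k:
            out.append(lrow + right[i])
            i += 1
    return out


def sort_aggregate(rows, group_key, value):
    """Streaming sum; only correct if rows with equal group keys are adjacent."""
    return [(g, sum(value(r) for r in grp)) for g, grp in groupby(rows, key=group_key)]


# Hypothetical sample data for t1(a, b, c) and t2(x, y, z).
t1 = [(2, 1, 10), (1, 1, 20), (2, 1, 30), (1, 2, 40), (1, 1, 50)]
t2 = [(7, 1, 0), (8, 2, 0), (9, 1, 0)]

join_left = lambda r: r[1]          # t1.b
join_right = lambda r: r[1]         # t2.y
group_key = lambda r: (r[0], r[1])  # GROUP BY a, b
value = lambda r: r[2]              # sum(c)

t2_sorted = sorted(t2, key=join_right)             # Sort(t2.y)
left_b = sorted(t1, key=join_left)                 # Sort(t1.b)
left_ba = sorted(t1, key=lambda r: (r[1], r[0]))   # Sort(t1.b, t1.a)

joined_b = sort_merge_join(left_b, t2_sorted, join_left, join_right)
joined_ba = sort_merge_join(left_ba, t2_sorted, join_left, join_right)

# Reference result computed with a hash table, as a hash aggregate would.
hash_result = {}
for row in joined_ba:
    hash_result[group_key(row)] = hash_result.get(group_key(row), 0) + value(row)

result_ba = sort_aggregate(joined_ba, group_key, value)
result_b = sort_aggregate(joined_b, group_key, value)

print(result_ba)  # one entry per (a, b) group
print(result_b)   # groups are split because equal (a, b) keys are not adjacent
```

With Sort(t1.b) alone, rows with the same (a, b) are interleaved after the join, so a streaming aggregate would emit partial groups. With Sort(t1.b, t1.a) the join output is already ordered on the grouping keys, and the streaming aggregate matches the hash-based result.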

Benchmark result 
{code}
Java HotSpot(TM) 64-Bit Server VM 1.8.0_281-b09 on Mac OS X 10.16
Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
Aggregate after SMJ:                      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Hash aggregate after SMJ                          48823          48886          89          0.4        2328.1       1.0X
Sort aggregate after SMJ                          26529          26750         311          0.8        1265.0       1.8X
{code}
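
As a quick sanity check on the numbers above, the following sketch recomputes the derived columns from the reported best times and per-row costs. It assumes that Relative is the ratio of best times and that Rate(M/s) is the inverse of Per Row(ns); the variable names are made up for illustration.

```python
# Values copied from the benchmark output above.
best_ms = {"hash": 48823, "sort": 26529}
per_row_ns = {"hash": 2328.1, "sort": 1265.0}

# Speedup of the sort aggregate relative to the hash aggregate baseline.
relative = best_ms["hash"] / best_ms["sort"]

# Throughput in million rows per second: 1e9 ns/s / per_row_ns / 1e6.
rate_m_per_s = {name: 1000.0 / ns for name, ns in per_row_ns.items()}

# Fraction of the baseline's best time that the sort aggregate saves.
time_saved = 1.0 - best_ms["sort"] / best_ms["hash"]

print(f"relative speedup: {relative:.1f}X")
print(f"rate (M rows/s): hash={rate_m_per_s['hash']:.1f}, sort={rate_m_per_s['sort']:.1f}")
print(f"best time saved: {time_saved:.0%}")
```

Under those assumptions the table is internally consistent, and the 1.8X figure corresponds to a bit under half of the best-case wall time being saved on this single benchmark run.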

> Plan sorter for Aggregate after SMJ
> -----------------------------------
>
>                 Key: SPARK-45230
>                 URL: https://issues.apache.org/jira/browse/SPARK-45230
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 4.0.0
>            Reporter: Wan Kun
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
