[ 
https://issues.apache.org/jira/browse/FLINK-12575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he updated FLINK-12575:
-------------------------------
    Description: 
{{Exchange}} and {{Sort}} is the most heavy operator, they are created in 
{{FlinkExpandConversionRule}} when some operators require its inputs to satisfy 
distribution trait or collation trait in planner rules. However, many operators 
could provide distribution or collation, e.g. {{BatchExecHashAggregate}} or 
{{BatchExecHashJoin}} could provide distribution on its shuffle keys, 
{{BatchExecSortMergeJoin}} could provide distribution and collation on its join 
keys. If the provided traits could satisfy the required traits, the 
{{Exchange}} or the {{Sort}} is redundant.
e.g. 
{code:sql}
schema:
x: a int, b bigint, c varchar
y: d int, e bigint, f varchar
t1: a1 int, b1 bigint, c1 varchar
t2: d1 int, e1 bigint, f1 varchar

sql:
select * from x join y on a = d and b = e join t1 on d = a1 and e = b1 left 
outer join t2 on a1 = d1 and b1 = e1

the physical plan after redundant Exchange and Sort are removed:
SortMergeJoin(joinType=[...], where=[AND(=(a1, d1), =(b1, e1))], 
leftSorted=[true], ...)
:- SortMergeJoin(joinType=[...], where=[AND(=(d, a1), =(e, b1))], 
leftSorted=[true], ...)
:  :- SortMergeJoin(joinType=[...], where=[AND(=(a, d), =(b, e))], ...)
:  :  :- Exchange(distribution=[hash[a, b]])
:  :  :  +- TableSourceScan(table=[[x]], ...)
:  :  +- Exchange(distribution=[hash[d, e]])
:  :     +- TableSourceScan(table=[[y]], ...)
:  +- Exchange(distribution=[hash[a1, b1]])
:     +- TableSourceScan(table=[[t1]], ...)
+- Exchange(distribution=[hash[d1, e1]])
   +- TableSourceScan(table=[[t2]], ...)
{code}

In above physical plan, the {{Exchange}} s between {{SortMergeJoin}} s are 
redundant due to their shuffle keys are same, the {{Sort}}s in the top tow 
{{SortMergeJoin}} s' left hand side are redundant due to its input is sorted.

another situation is the shuffle and collation could be removed between 
multiple {{Over}} s. e.g.
{code:sql}
schema:
MyTable: a int, b int, c varchar

sql:
SELECT
    COUNT(*) OVER (PARTITION BY c ORDER BY a),
    SUM(a) OVER (PARTITION BY b ORDER BY a),
    RANK() OVER (PARTITION BY c ORDER BY a, c),
    SUM(a) OVER (PARTITION BY b ORDER BY a),
    COUNT(*) OVER (PARTITION BY c ORDER BY c)
 FROM MyTable

the physical plan after redundant Exchange and Sort are removed:
Calc(select=[...])
+- OverAggregate(partitionBy=[c], orderBy=[c ASC], window#0=[COUNT(*)  ...])
   +- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[COUNT(*) ...], 
window#1=[RANK(*) ...], ...)
      +- Sort(orderBy=[c ASC, a ASC])
         +- Exchange(distribution=[hash[c]])
            +- OverAggregate(partitionBy=[b], orderBy=[a ASC], 
window#0=[COUNT(a), $SUM0(a) ...], ...)
               +- Sort(orderBy=[b ASC, a ASC])
                  +- Exchange(distribution=[hash[b]])
                     +- TableSourceScan(table=[[MyTable]], ...)
{code}
the {{Exchange}}s and {{Sort}} between the top two {{OverAggregate}}s are 
redundant due to their shuffle keys and sort keys are same.

  was:
{{Exchange}} and {{Sort}} is the most heavy operator, they are created in 
{{FlinkExpandConversionRule}} when some operators require its inputs to satisfy 
distribution trait or collation trait in planner rules. However, many operators 
could provide distribution or collation, e.g. {{BatchExecHashAggregate}} or 
{{BatchExecHashJoin}} could provide distribution on its shuffle keys, 
{{BatchExecSortMergeJoin}} could provide distribution and collation on its join 
keys. If the provided traits could satisfy the required traits, the 
{{Exchange}} or the {{Sort}} is redundant.
e.g. 
{code:sql}
schema:
x: a int, b bigint, c varchar
y: d int, e bigint, f varchar
t1: a1 int, b1 bigint, c1 varchar
t2: d1 int, e1 bigint, f1 varchar

sql:
select * from x join y on a = d and b = e join t1 on d = a1 and e = b1 left 
outer join t2 on a1 = d1 and b1 = e1

the physical plan after redundant Exchange and Sort are removed:
SortMergeJoin(joinType=[...], where=[AND(=(a1, d1), =(b1, e1))], 
leftSorted=[true], ...)
:- SortMergeJoin(joinType=[...], where=[AND(=(d, a1), =(e, b1))], 
leftSorted=[true], ...)
:  :- SortMergeJoin(joinType=[...], where=[AND(=(a, d), =(b, e))], ...)
:  :  :- Exchange(distribution=[hash[a, b]])
:  :  :  +- TableSourceScan(table=[[x]], ...)
:  :  +- Exchange(distribution=[hash[d, e]])
:  :     +- TableSourceScan(table=[[y]], ...)
:  +- Exchange(distribution=[hash[a1, b1]])
:     +- TableSourceScan(table=[[t1]], ...)
+- Exchange(distribution=[hash[d1, e1]])
   +- TableSourceScan(table=[[t2]], ...)
{code}

In above physical plan, the {{Exchange}}s between {{SortMergeJoin}}s are 
redundant due to their shuffle keys are same, the {{Sort}}s in the top tow 
{{SortMergeJoin}}s' left hand side are redundant due to its input is sorted.

another situation is the shuffle and collation could be removed between 
multiple {{Over}}s. e.g.
{code:sql}
schema:
MyTable: a int, b int, c varchar

sql:
SELECT
    COUNT(*) OVER (PARTITION BY c ORDER BY a),
    SUM(a) OVER (PARTITION BY b ORDER BY a),
    RANK() OVER (PARTITION BY c ORDER BY a, c),
    SUM(a) OVER (PARTITION BY b ORDER BY a),
    COUNT(*) OVER (PARTITION BY c ORDER BY c)
 FROM MyTable

the physical plan after redundant Exchange and Sort are removed:
Calc(select=[...])
+- OverAggregate(partitionBy=[c], orderBy=[c ASC], window#0=[COUNT(*)  ...])
   +- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[COUNT(*) ...], 
window#1=[RANK(*) ...], ...)
      +- Sort(orderBy=[c ASC, a ASC])
         +- Exchange(distribution=[hash[c]])
            +- OverAggregate(partitionBy=[b], orderBy=[a ASC], 
window#0=[COUNT(a), $SUM0(a) ...], ...)
               +- Sort(orderBy=[b ASC, a ASC])
                  +- Exchange(distribution=[hash[b]])
                     +- TableSourceScan(table=[[MyTable]], ...)
{code}
the {{Exchange}}s and {{Sort}} between the top two {{OverAggregate}}s are 
redundant due to their shuffle keys and sort keys are same.


> Introduce planner rules to remove redundant shuffle and collation
> -----------------------------------------------------------------
>
>                 Key: FLINK-12575
>                 URL: https://issues.apache.org/jira/browse/FLINK-12575
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table SQL / Planner
>            Reporter: godfrey he
>            Assignee: godfrey he
>            Priority: Major
>
> {{Exchange}} and {{Sort}} is the most heavy operator, they are created in 
> {{FlinkExpandConversionRule}} when some operators require its inputs to 
> satisfy distribution trait or collation trait in planner rules. However, many 
> operators could provide distribution or collation, e.g. 
> {{BatchExecHashAggregate}} or {{BatchExecHashJoin}} could provide 
> distribution on its shuffle keys, {{BatchExecSortMergeJoin}} could provide 
> distribution and collation on its join keys. If the provided traits could 
> satisfy the required traits, the {{Exchange}} or the {{Sort}} is redundant.
> e.g. 
> {code:sql}
> schema:
> x: a int, b bigint, c varchar
> y: d int, e bigint, f varchar
> t1: a1 int, b1 bigint, c1 varchar
> t2: d1 int, e1 bigint, f1 varchar
> sql:
> select * from x join y on a = d and b = e join t1 on d = a1 and e = b1 left 
> outer join t2 on a1 = d1 and b1 = e1
> the physical plan after redundant Exchange and Sort are removed:
> SortMergeJoin(joinType=[...], where=[AND(=(a1, d1), =(b1, e1))], 
> leftSorted=[true], ...)
> :- SortMergeJoin(joinType=[...], where=[AND(=(d, a1), =(e, b1))], 
> leftSorted=[true], ...)
> :  :- SortMergeJoin(joinType=[...], where=[AND(=(a, d), =(b, e))], ...)
> :  :  :- Exchange(distribution=[hash[a, b]])
> :  :  :  +- TableSourceScan(table=[[x]], ...)
> :  :  +- Exchange(distribution=[hash[d, e]])
> :  :     +- TableSourceScan(table=[[y]], ...)
> :  +- Exchange(distribution=[hash[a1, b1]])
> :     +- TableSourceScan(table=[[t1]], ...)
> +- Exchange(distribution=[hash[d1, e1]])
>    +- TableSourceScan(table=[[t2]], ...)
> {code}
> In above physical plan, the {{Exchange}} s between {{SortMergeJoin}} s are 
> redundant due to their shuffle keys are same, the {{Sort}}s in the top tow 
> {{SortMergeJoin}} s' left hand side are redundant due to its input is sorted.
> another situation is the shuffle and collation could be removed between 
> multiple {{Over}} s. e.g.
> {code:sql}
> schema:
> MyTable: a int, b int, c varchar
> sql:
> SELECT
>     COUNT(*) OVER (PARTITION BY c ORDER BY a),
>     SUM(a) OVER (PARTITION BY b ORDER BY a),
>     RANK() OVER (PARTITION BY c ORDER BY a, c),
>     SUM(a) OVER (PARTITION BY b ORDER BY a),
>     COUNT(*) OVER (PARTITION BY c ORDER BY c)
>  FROM MyTable
> the physical plan after redundant Exchange and Sort are removed:
> Calc(select=[...])
> +- OverAggregate(partitionBy=[c], orderBy=[c ASC], window#0=[COUNT(*)  ...])
>    +- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[COUNT(*) 
> ...], window#1=[RANK(*) ...], ...)
>       +- Sort(orderBy=[c ASC, a ASC])
>          +- Exchange(distribution=[hash[c]])
>             +- OverAggregate(partitionBy=[b], orderBy=[a ASC], 
> window#0=[COUNT(a), $SUM0(a) ...], ...)
>                +- Sort(orderBy=[b ASC, a ASC])
>                   +- Exchange(distribution=[hash[b]])
>                      +- TableSourceScan(table=[[MyTable]], ...)
> {code}
> the {{Exchange}}s and {{Sort}} between the top two {{OverAggregate}}s are 
> redundant due to their shuffle keys and sort keys are same.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to