[ https://issues.apache.org/jira/browse/FLINK-12575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-12575:
-----------------------------------
    Labels: pull-request-available  (was: )

> Introduce planner rules to remove redundant shuffle and collation
> -----------------------------------------------------------------
>
>                 Key: FLINK-12575
>                 URL: https://issues.apache.org/jira/browse/FLINK-12575
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table SQL / Planner
>            Reporter: godfrey he
>            Assignee: godfrey he
>            Priority: Major
>              Labels: pull-request-available
>
> {{Exchange}} and {{Sort}} are the heaviest operators. They are created in
> {{FlinkExpandConversionRule}} when an operator in the planner rules requires
> its inputs to satisfy a distribution trait or a collation trait. However, many
> operators can already provide a distribution or collation: e.g.
> {{BatchExecHashAggregate}} and {{BatchExecHashJoin}} can provide distribution
> on their shuffle keys, and {{BatchExecSortMergeJoin}} can provide both
> distribution and collation on its join keys. If the provided traits satisfy
> the required traits, the {{Exchange}} or {{Sort}} is redundant.
> For example:
> {code:sql}
> schema:
> x: a int, b bigint, c varchar
> y: d int, e bigint, f varchar
> t1: a1 int, b1 bigint, c1 varchar
> t2: d1 int, e1 bigint, f1 varchar
> sql:
> select * from x
>   join y on a = d and b = e
>   join t1 on d = a1 and e = b1
>   left outer join t2 on a1 = d1 and b1 = e1
> the physical plan after redundant Exchange and Sort are removed:
> SortMergeJoin(joinType=[...], where=[AND(=(a1, d1), =(b1, e1))], leftSorted=[true], ...)
> :- SortMergeJoin(joinType=[...], where=[AND(=(d, a1), =(e, b1))], leftSorted=[true], ...)
> :  :- SortMergeJoin(joinType=[...], where=[AND(=(a, d), =(b, e))], ...)
> :  :  :- Exchange(distribution=[hash[a, b]])
> :  :  :  +- TableSourceScan(table=[[x]], ...)
> :  :  +- Exchange(distribution=[hash[d, e]])
> :  :     +- TableSourceScan(table=[[y]], ...)
> :  +- Exchange(distribution=[hash[a1, b1]])
> :     +- TableSourceScan(table=[[t1]], ...)
> +- Exchange(distribution=[hash[d1, e1]])
>    +- TableSourceScan(table=[[t2]], ...)
> {code}
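> In the above physical plan, the Exchanges that would otherwise sit between the
> SortMergeJoins are redundant because each upper join requires the same shuffle
> keys that its left input is already distributed on, and the Sorts on the
> left-hand side of the top two SortMergeJoins are redundant because their inputs
> are already sorted.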
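A minimal sketch of the trait check described above, applied to the middle SortMergeJoin. This is a hypothetical toy model, not Flink's planner API: {{Traits}}, {{distribution_satisfied}}, {{collation_satisfied}} and {{enforcers_needed}} are illustrative names. It assumes that a hash distribution is satisfied only by the identical key list, that data sorted on some keys is also sorted on any prefix of them, and that a SortMergeJoin's output is distributed and sorted on the join keys of both sides. The "Sort" step stands in for the sort of a SortMergeJoin input, which the {{leftSorted=[true]}} flag in the plan presumably marks as skipped.

```python
from dataclasses import dataclass
from typing import FrozenSet, List, Tuple

Keys = Tuple[str, ...]


@dataclass(frozen=True)
class Traits:
    # Hypothetical model of an operator's output properties. Each set holds
    # alternative key lists that are equivalent because of join equalities.
    hash_keys: FrozenSet[Keys] = frozenset()  # e.g. {("a", "b"), ("d", "e")}
    sort_keys: FrozenSet[Keys] = frozenset()  # ascending sort keys, outermost first


def distribution_satisfied(provided: Traits, required: Keys) -> bool:
    # Simplification: hash[k1, k2] is only satisfied by exactly hash[k1, k2].
    return required in provided.hash_keys


def collation_satisfied(provided: Traits, required: Keys) -> bool:
    # Data sorted on (k1, k2, ...) is also sorted on any prefix of those keys.
    return any(keys[: len(required)] == required for keys in provided.sort_keys)


def enforcers_needed(provided: Traits, required_hash: Keys, required_sort: Keys) -> List[str]:
    # Add an Exchange and/or a Sort only when the input does not already
    # satisfy the required traits; an empty list means both are redundant.
    needed = []
    if not distribution_satisfied(provided, required_hash):
        needed.append("Exchange(hash[" + ", ".join(required_hash) + "])")
    if not collation_satisfied(provided, required_sort):
        needed.append("Sort(" + ", ".join(required_sort) + ")")
    return needed


# Output of the bottom SortMergeJoin (x join y on a = d and b = e), assumed to be
# hash-distributed and sorted on its join keys under either side's column names.
bottom_join = Traits(
    hash_keys=frozenset({("a", "b"), ("d", "e")}),
    sort_keys=frozenset({("a", "b"), ("d", "e")}),
)
scan_t1 = Traits()  # a plain TableSourceScan provides no distribution or collation

# The middle SortMergeJoin (on d = a1 and e = b1) needs (d, e) on its left input
# and (a1, b1) on its right input.
print(enforcers_needed(bottom_join, ("d", "e"), ("d", "e")))  # -> []
print(enforcers_needed(scan_t1, ("a1", "b1"), ("a1", "b1")))
# -> ['Exchange(hash[a1, b1])', 'Sort(a1, b1)']
```

Recording the join keys of both sides (a, b and d, e) in the bottom join's traits is what lets the middle join reuse that shuffle, since its own condition names d and e.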
> Note: after an Exchange is removed, there may be a sub-tree such as
> localHashAggregate -> globalHashAggregate. The localHashAggregate is then
> redundant (its purpose is to reduce the data sent through the shuffle) and
> should be removed. The same applies to localRank -> globalRank and
> localSortAggregate -> globalSortAggregate.
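The local/global clean-up in the note above could look roughly like the following bottom-up rewrite over a toy plan tree. The {{Node}} class, the {{LOCAL_TO_GLOBAL}} table, the {{merge}} flag and the function names are hypothetical, chosen for illustration only: when a global operator reads directly from its local counterpart (no Exchange in between), this sketch drops the local node and marks the global node as no longer consuming partial results.

```python
from dataclasses import dataclass, replace
from typing import Optional

# Hypothetical (local operator -> global operator) pairs named in the note above.
LOCAL_TO_GLOBAL = {
    "LocalHashAggregate": "GlobalHashAggregate",
    "LocalSortAggregate": "GlobalSortAggregate",
    "LocalRank": "GlobalRank",
}


@dataclass(frozen=True)
class Node:
    op: str
    input: Optional["Node"] = None
    merge: bool = False  # True while the operator consumes partial output of a local phase


def remove_redundant_local(node: Optional[Node]) -> Optional[Node]:
    """Rewrite bottom-up, dropping a local operator that feeds its global
    counterpart directly, i.e. with no Exchange left between them."""
    if node is None:
        return None
    child = remove_redundant_local(node.input)
    if child is not None and LOCAL_TO_GLOBAL.get(child.op) == node.op:
        # The global operator now reads raw rows instead of partial results.
        return replace(node, input=child.input, merge=False)
    return replace(node, input=child)


def explain(node: Optional[Node]) -> str:
    # Render a single-input chain from the root down, e.g. "A <- B <- C".
    parts = []
    while node is not None:
        parts.append(node.op + ("(merge)" if node.merge else ""))
        node = node.input
    return " <- ".join(parts)


scan = Node("TableSourceScan")
# The Exchange that used to sit between the two aggregates has been removed.
plan = Node("GlobalHashAggregate", Node("LocalHashAggregate", scan), merge=True)
print(explain(remove_redundant_local(plan)))  # -> GlobalHashAggregate <- TableSourceScan
```

A pair that is still separated by an Exchange is left untouched, because the global operator's direct input is then the Exchange rather than the local operator.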
> Another case is that the shuffle and collation can be removed between multiple
> OVER aggregates, e.g.:
> {code:sql}
> schema:
> MyTable: a int, b int, c varchar
> sql:
> SELECT
>     COUNT(*) OVER (PARTITION BY c ORDER BY a),
>     SUM(a) OVER (PARTITION BY b ORDER BY a),
>     RANK() OVER (PARTITION BY c ORDER BY a, c),
>     SUM(a) OVER (PARTITION BY b ORDER BY a),
>     COUNT(*) OVER (PARTITION BY c ORDER BY c)
>  FROM MyTable
> the physical plan after redundant Exchange and Sort are removed:
> Calc(select=[...])
> +- OverAggregate(partitionBy=[c], orderBy=[c ASC], window#0=[COUNT(*)  ...])
>    +- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[COUNT(*) ...], window#1=[RANK(*) ...], ...)
>       +- Sort(orderBy=[c ASC, a ASC])
>          +- Exchange(distribution=[hash[c]])
>             +- OverAggregate(partitionBy=[b], orderBy=[a ASC], window#0=[COUNT(a), $SUM0(a) ...], ...)
>                +- Sort(orderBy=[b ASC, a ASC])
>                   +- Exchange(distribution=[hash[b]])
>                      +- TableSourceScan(table=[[MyTable]], ...)
> {code}
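> The {{Exchange}} and {{Sort}} between the top two OverAggregates are redundant
> because both require hash distribution on {{c}}, and the sort order the upper
> one needs ({{c ASC}}) is a prefix of the order already provided
> ({{c ASC, a ASC}}).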
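The collation reasoning for the OVER case can be worked through with a small sketch. The helper names are hypothetical, and the model assumes that an OverAggregate passes its input's distribution and sort order through unchanged. The required sort order of an OVER group is taken to be its partition keys followed by its order keys, with repeated keys dropped, so {{PARTITION BY c ORDER BY c}} needs only {{c ASC}}.

```python
from typing import List, Sequence, Tuple


def required_collation(partition_by: Sequence[str], order_by: Sequence[str]) -> Tuple[str, ...]:
    # Partition keys first, then order keys; a key that already appears earlier
    # adds no further ordering, so it is dropped.
    keys: List[str] = []
    for key in list(partition_by) + list(order_by):
        if key not in keys:
            keys.append(key)
    return tuple(keys)


def satisfied(provided_hash: Tuple[str, ...], provided_sort: Tuple[str, ...],
              partition_by: Sequence[str], order_by: Sequence[str]) -> bool:
    # The Exchange is redundant if the input is already hashed on the partition keys;
    # the Sort is redundant if the required order is a prefix of the provided one.
    required = required_collation(partition_by, order_by)
    return provided_hash == tuple(partition_by) and provided_sort[: len(required)] == required


# Traits produced by Exchange(hash[c]) + Sort(orderBy=[c ASC, a ASC]) in the plan
# above, assumed to be preserved by the middle OverAggregate.
hash_c, sort_c_a = ("c",), ("c", "a")

print(required_collation(["c"], ["a", "c"]))      # ('c', 'a'): RANK() OVER (PARTITION BY c ORDER BY a, c)
print(satisfied(hash_c, sort_c_a, ["c"], ["a"]))  # True: the middle OverAggregate
print(satisfied(hash_c, sort_c_a, ["c"], ["c"]))  # True: the top OverAggregate needs only (c)
print(satisfied(hash_c, sort_c_a, ["b"], ["a"]))  # False: PARTITION BY b needs its own Exchange
```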



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
