[
https://issues.apache.org/jira/browse/FLINK-12575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
godfrey he updated FLINK-12575:
-------------------------------
Description:
{{Exchange}} and {{Sort}} are the heaviest operators. They are created in
{{FlinkExpandConversionRule}} when a planner rule requires an operator's inputs
to satisfy a distribution trait or a collation trait. However, many operators
can already provide a distribution or a collation: {{BatchExecHashAggregate}}
and {{BatchExecHashJoin}} provide a distribution on their shuffle keys, and
{{BatchExecSortMergeJoin}} provides both a distribution and a collation on its
join keys. If the provided traits satisfy the required traits, the
{{Exchange}} or the {{Sort}} is redundant.
For example:
{code:sql}
schema:
  x: a int, b bigint, c varchar
  y: d int, e bigint, f varchar
  t1: a1 int, b1 bigint, c1 varchar
  t2: d1 int, e1 bigint, f1 varchar

sql:
  select * from x join y on a = d and b = e join t1 on d = a1 and e = b1
  left outer join t2 on a1 = d1 and b1 = e1

the physical plan after redundant Exchange and Sort are removed:
SortMergeJoin(joinType=[...], where=[AND(=(a1, d1), =(b1, e1))], leftSorted=[true], ...)
:- SortMergeJoin(joinType=[...], where=[AND(=(d, a1), =(e, b1))], leftSorted=[true], ...)
:  :- SortMergeJoin(joinType=[...], where=[AND(=(a, d), =(b, e))], ...)
:  :  :- Exchange(distribution=[hash[a, b]])
:  :  :  +- TableSourceScan(table=[[x]], ...)
:  :  +- Exchange(distribution=[hash[d, e]])
:  :     +- TableSourceScan(table=[[y]], ...)
:  +- Exchange(distribution=[hash[a1, b1]])
:     +- TableSourceScan(table=[[t1]], ...)
+- Exchange(distribution=[hash[d1, e1]])
   +- TableSourceScan(table=[[t2]], ...)
{code}
In the physical plan above, the {{Exchange}}s between the {{SortMergeJoin}}s
are redundant because their shuffle keys are the same, and the {{Sort}}s on the
left-hand side of the top two {{SortMergeJoin}}s are redundant because their
input is already sorted.
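The check behind the join example can be sketched as follows. This is only an illustration in Python, not the planner's code: the names {{inner_join_provided_keys}} and {{needs_exchange}} are hypothetical, and the sketch assumes that the output of an inner equi-join counts as hash-distributed on either side's join keys.

```python
from typing import Set, Tuple

Keys = Tuple[str, ...]


def inner_join_provided_keys(left_keys: Keys, right_keys: Keys) -> Set[Keys]:
    """Key tuples an inner equi-join's output is hash-distributed on.

    Assumption: left_keys == right_keys holds row by row after an inner
    equi-join, so the output counts as partitioned on either tuple.
    """
    return {left_keys, right_keys}


def needs_exchange(required: Keys, provided: Set[Keys]) -> bool:
    """An Exchange is only needed when no provided key tuple matches."""
    return required not in provided


# x JOIN y ON a = d AND b = e
first_join = inner_join_provided_keys(("a", "b"), ("d", "e"))
# ... JOIN t1 ON d = a1 AND e = b1: the left input must be hashed on (d, e)
second_join = inner_join_provided_keys(("d", "e"), ("a1", "b1"))

print(needs_exchange(("d", "e"), first_join))     # False
print(needs_exchange(("a1", "b1"), second_join))  # False
# A plain table scan provides no distribution, so its Exchange stays.
print(needs_exchange(("a", "b"), set()))          # True
```

The same comparison applies to the sort keys of a sort-merge join, which is why the left-hand {{Sort}}s can be dropped as well.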
Another situation is that the shuffle and collation can be removed between
multiple {{Over}}s, e.g.
{code:sql}
schema:
  MyTable: a int, b int, c varchar

sql:
  SELECT
    COUNT(*) OVER (PARTITION BY c ORDER BY a),
    SUM(a) OVER (PARTITION BY b ORDER BY a),
    RANK() OVER (PARTITION BY c ORDER BY a, c),
    SUM(a) OVER (PARTITION BY b ORDER BY a),
    COUNT(*) OVER (PARTITION BY c ORDER BY c)
  FROM MyTable

the physical plan after redundant Exchange and Sort are removed:
Calc(select=[...])
+- OverAggregate(partitionBy=[c], orderBy=[c ASC], window#0=[COUNT(*) ...])
   +- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[COUNT(*) ...], window#1=[RANK(*) ...], ...)
      +- Sort(orderBy=[c ASC, a ASC])
         +- Exchange(distribution=[hash[c]])
            +- OverAggregate(partitionBy=[b], orderBy=[a ASC], window#0=[COUNT(a), $SUM0(a) ...], ...)
               +- Sort(orderBy=[b ASC, a ASC])
                  +- Exchange(distribution=[hash[b]])
                     +- TableSourceScan(table=[[MyTable]], ...)
{code}
The {{Exchange}}s and {{Sort}} between the top two {{OverAggregate}}s are
redundant because their shuffle keys and sort keys are the same.
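One way to read the {{Over}} example is as a prefix check on sort keys. The sketch below is a rough Python illustration, not the planner's implementation. The helper names are hypothetical, and it assumes that a window needs its input sorted on the partition keys followed by the order keys, and that an existing sort satisfies any leading prefix of its key list.

```python
from typing import List, Sequence


def required_sort_keys(partition_by: Sequence[str], order_by: Sequence[str]) -> List[str]:
    """Partition keys followed by order keys, with repeated columns dropped."""
    keys: List[str] = []
    for key in list(partition_by) + list(order_by):
        if key not in keys:
            keys.append(key)
    return keys


def collation_satisfies(provided: Sequence[str], required: Sequence[str]) -> bool:
    """True when the required keys are a leading prefix of the provided keys."""
    return list(provided[:len(required)]) == list(required)


# Sort(orderBy=[c ASC, a ASC]) sits below the top two OverAggregates.
provided = ["c", "a"]

# PARTITION BY c ORDER BY a  and  PARTITION BY c ORDER BY a, c
print(collation_satisfies(provided, required_sort_keys(["c"], ["a"])))       # True
print(collation_satisfies(provided, required_sort_keys(["c"], ["a", "c"])))  # True
# PARTITION BY c ORDER BY c only needs [c], a prefix of [c, a]
print(collation_satisfies(provided, required_sort_keys(["c"], ["c"])))       # True
# PARTITION BY b ORDER BY a needs [b, a], so it keeps its own Sort
print(collation_satisfies(provided, required_sort_keys(["b"], ["a"])))       # False
```

Sort direction is ignored here for brevity; every key in the example plan is ascending.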
> Introduce planner rules to remove redundant shuffle and collation
> -----------------------------------------------------------------
>
> Key: FLINK-12575
> URL: https://issues.apache.org/jira/browse/FLINK-12575
> Project: Flink
> Issue Type: New Feature
> Components: Table SQL / Planner
> Reporter: godfrey he
> Assignee: godfrey he
> Priority: Major