[
https://issues.apache.org/jira/browse/FLINK-26167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
godfrey he updated FLINK-26167:
-------------------------------
Description:
After FLINK-25995 is finished, we have added an exchange (which will be
converted to ForwardForConsecutiveHashPartitioner) for the nodes which do not
need an explicit hash shuffle (because their input is already hash-distributed)
e.g.
{code:sql}
WITH r AS (SELECT * FROM T1, T2 WHERE a1 = a2 AND c1 LIKE 'He%') SELECT sum(b1)
FROM r group by a1
{code}
the plan after FLINK-25995 is finished:
{code:java}
Calc(select=[EXPR$0])
+- HashAggregate(isMerge=[false], groupBy=[a1], select=[a1, SUM(b1) AS EXPR$0])
   +- Exchange(distribution=[keep_input_as_is[hash[a1]]])
      +- Calc(select=[a1, b1])
         +- HashJoin(joinType=[InnerJoin], where=[(a1 = a2)], select=[a1, b1, a2], build=[left])
            :- Exchange(distribution=[hash[a1]])
            :  +- Calc(select=[a1, b1], where=[LIKE(c1, 'He%')])
            :     +- TableSourceScan(table=[[default_catalog, default_database, T1, filter=[], project=[a1, b1, c1], metadata=[]]], fields=[a1, b1, c1])
            +- Exchange(distribution=[hash[a2]])
               +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2])
{code}
However, data between {{Calc}} and {{HashJoin}} may be out of order when their
parallelism differs, so an
{{Exchange(distribution=[keep_input_as_is[hash[a1]]])}} should be added between
them.
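
To illustrate why the extra exchange matters, here is a minimal, self-contained sketch in plain Python (not Flink code). It assumes that a parallelism mismatch on an edge without an explicit partitioner spreads records round-robin, and that {{keep_input_as_is[hash[...]]}} behaves as "forward when parallelism matches, otherwise re-hash". All function names below are hypothetical and exist only for this illustration.

```python
from collections import defaultdict


def hash_partition(records, parallelism):
    """Route each (key, value) record to subtask key % parallelism."""
    subtasks = defaultdict(list)
    for key, value in records:
        subtasks[key % parallelism].append((key, value))
    return subtasks


def rebalance(upstream, parallelism):
    """Spread upstream output round-robin over the downstream subtasks.

    Assumption: this models an edge with no explicit partitioner whose
    two sides run with different parallelism.
    """
    downstream = defaultdict(list)
    position = 0
    for index in sorted(upstream):
        for record in upstream[index]:
            downstream[position % parallelism].append(record)
            position += 1
    return downstream


def rehash(upstream, parallelism):
    """Re-apply the hash partitioning for the downstream parallelism."""
    records = [rec for index in sorted(upstream) for rec in upstream[index]]
    return hash_partition(records, parallelism)


def keep_input_as_is(upstream, upstream_parallelism, downstream_parallelism):
    """Hypothetical model: forward if parallelism matches, else re-hash."""
    if upstream_parallelism == downstream_parallelism:
        return upstream
    return rehash(upstream, downstream_parallelism)


def keys_split_across_subtasks(subtasks):
    """Return the keys whose records ended up in more than one subtask."""
    owners = defaultdict(set)
    for index, records in subtasks.items():
        for key, _ in records:
            owners[key].add(index)
    return sorted(key for key, seen in owners.items() if len(seen) > 1)


# Four join keys with three rows each, hash-partitioned for a HashJoin
# running with parallelism 2.
records = [(key, value) for key in range(4) for value in range(3)]
join_output = hash_partition(records, 2)

# Downstream node with parallelism 3 and no explicit partitioner:
# rows for the same key are scattered over several subtasks.
broken = keys_split_across_subtasks(rebalance(join_output, 3))

# With the explicit exchange, every key stays within a single subtask.
fixed = keys_split_across_subtasks(keep_input_as_is(join_output, 2, 3))

print("keys split without exchange:", broken)
print("keys split with exchange:", fixed)
```

In this model, every key is split across subtasks once the parallelism differs and no partitioner is set, which would break a downstream operator that relies on hash distribution by a1. With the explicit exchange, no key is split.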
was:
After FLINK-25995 is finished, we have added an exchange (which will be
converted to ForwardForConsecutiveHashPartitioner) for the nodes which do not
need explicitly
SELECT * FROM (
  SELECT a, b, RANK() OVER(PARTITION BY a ORDER BY b) rk FROM (
    SELECT a, SUM(b) AS b FROM T GROUP BY a
  )
) WHERE rk <= 10
Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], orderBy=[b ASC], global=[true], select=[a, b, w0$o0])
+- Exchange(distribution=[forward])
   +- Sort(orderBy=[a ASC, b ASC])
      +- Exchange(distribution=[keep_input_as_is[hash[a]]])
         +- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_SUM(sum$0) AS b])
            +- Exchange(distribution=[hash[a]])
               +- TableSourceScan(table=[[default_catalog, default_database, T, project=[a, b], metadata=[], aggregates=[grouping=[a], aggFunctions=[LongSumAggFunction(b)]]]], fields=[a, sum$0])
> Explicitly set the partitioner for the sql operators whose shuffle and sort
> are removed
> ---------------------------------------------------------------------------------------
>
> Key: FLINK-26167
> URL: https://issues.apache.org/jira/browse/FLINK-26167
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / Planner
> Reporter: godfrey he
> Assignee: godfrey he
> Priority: Major
> Fix For: 1.15.0
>
>