[
https://issues.apache.org/jira/browse/FLINK-26167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
godfrey he updated FLINK-26167:
-------------------------------
Description:
After FLINK-25995 is finished, we have added an exchange (which will be converted
to ForwardForConsecutiveHashPartitioner) for the nodes which do not explicitly
need one. For example:
SELECT * FROM (
  SELECT a, b, RANK() OVER(PARTITION BY a ORDER BY b) rk FROM (
    SELECT a, SUM(b) AS b FROM T GROUP BY a
  )
) WHERE rk <= 10
Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], orderBy=[b ASC], global=[true], select=[a, b, w0$o0])
+- Exchange(distribution=[forward])
   +- Sort(orderBy=[a ASC, b ASC])
      +- Exchange(distribution=[keep_input_as_is[hash[a]]])
         +- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_SUM(sum$0) AS b])
            +- Exchange(distribution=[hash[a]])
               +- TableSourceScan(table=[[default_catalog, default_database, T, project=[a, b], metadata=[], aggregates=[grouping=[a], aggFunctions=[LongSumAggFunction(b)]]]], fields=[a, sum$0])
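
The keep_input_as_is[hash[a]] exchange in the plan above can be read as a
placeholder that is resolved later. The following is only a toy sketch of that
idea, not Flink code: the class, enum and method names are hypothetical, and it
assumes that such an exchange becomes a forward edge when the two operators are
chained and falls back to a hash shuffle on the same keys when they are not.

```java
// Toy model, NOT Flink code: every name in this class is hypothetical.
final class KeepInputAsIsSketch {

    /** How an edge between two operators ends up shipping records. */
    enum Edge { FORWARD, HASH }

    /**
     * Resolves a "keep_input_as_is[hash[keys]]" exchange once it is known
     * whether the upstream and downstream operators end up in one chain.
     * Assumption: chained operators keep the upstream distribution as is,
     * unchained operators get an explicit hash shuffle on the same keys.
     */
    static Edge resolve(boolean chainedWithUpstream) {
        // Chained: records stay in the same task, so the hash distribution
        // produced upstream is preserved without any shuffle.
        // Not chained: hash again on the same keys so the downstream
        // operator still receives hash-partitioned input.
        return chainedWithUpstream ? Edge.FORWARD : Edge.HASH;
    }

    public static void main(String[] args) {
        System.out.println("chained     -> " + resolve(true));
        System.out.println("not chained -> " + resolve(false));
    }
}
```

In the plan above, this would correspond to the Sort either running in the same
chain as the HashAggregate or receiving its input through a hash[a] shuffle.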
> Explicitly set the partitioner for the sql operators whose shuffle and sort
> are removed
> ---------------------------------------------------------------------------------------
>
> Key: FLINK-26167
> URL: https://issues.apache.org/jira/browse/FLINK-26167
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / Planner
> Reporter: godfrey he
> Assignee: godfrey he
> Priority: Major
> Fix For: 1.15.0
>
>