[ https://issues.apache.org/jira/browse/FLINK-26167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
godfrey he updated FLINK-26167: ------------------------------- Description: After FLINK-25995 is finished, we have add an exchange (which will be converted to ForwardForConsecutiveHashPartitioner) for the nodes which do not need explictly SELECT * FROM ( SELECT a, b, RANK() OVER(PARTITION BY a ORDER BY b) rk FROM ( SELECT a, SUM(b) AS b FROM T GROUP BY a ) ) WHERE rk <= 10 Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], orderBy=[b ASC], global=[true], select=[a, b, w0$o0]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[a ASC, b ASC]) +- Exchange(distribution=[keep_input_as_is[hash[a]]]) +- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_SUM(sum$0) AS b]) +- Exchange(distribution=[hash[a]]) +- TableSourceScan(table=[[default_catalog, default_database, T, project=[a, b], metadata=[], aggregates=[grouping=[a], aggFunctions=[LongSumAggFunction(b)]]]], fields=[a, sum$0]) > Explicitly set the partitioner for the sql operators whose shuffle and sort > are removed > --------------------------------------------------------------------------------------- > > Key: FLINK-26167 > URL: https://issues.apache.org/jira/browse/FLINK-26167 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner > Reporter: godfrey he > Assignee: godfrey he > Priority: Major > Fix For: 1.15.0 > > > After FLINK-25995 is finished, we have add an exchange (which will be > converted to ForwardForConsecutiveHashPartitioner) for the nodes which do not > need explictly > SELECT * FROM ( > SELECT a, b, RANK() OVER(PARTITION BY a ORDER BY b) rk FROM ( > SELECT a, SUM(b) AS b FROM T GROUP BY a > ) > ) WHERE rk <= 10 > Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], > orderBy=[b ASC], global=[true], select=[a, b, w0$o0]) > +- Exchange(distribution=[forward]) > +- Sort(orderBy=[a ASC, b ASC]) > +- Exchange(distribution=[keep_input_as_is[hash[a]]]) > +- HashAggregate(isMerge=[true], groupBy=[a], select=[a, > Final_SUM(sum$0) AS b]) > +- Exchange(distribution=[hash[a]]) > +- TableSourceScan(table=[[default_catalog, default_database, > T, project=[a, b], metadata=[], aggregates=[grouping=[a], > aggFunctions=[LongSumAggFunction(b)]]]], fields=[a, sum$0]) -- This message was sent by Atlassian Jira (v8.20.1#820001)