[ 
https://issues.apache.org/jira/browse/FLINK-26167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he updated FLINK-26167:
-------------------------------
    Description: 
After FLINK-25995 is finished, we have added an exchange (which will be
converted to ForwardForConsecutiveHashPartitioner) for the nodes which do not
need explicitly


SELECT * FROM (
    SELECT a, b, RANK() OVER(PARTITION BY a ORDER BY b) rk FROM (
        SELECT a, SUM(b) AS b FROM T GROUP BY a
    )
) WHERE rk <= 10

Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], orderBy=[b ASC], global=[true], select=[a, b, w0$o0])
+- Exchange(distribution=[forward])
   +- Sort(orderBy=[a ASC, b ASC])
      +- Exchange(distribution=[keep_input_as_is[hash[a]]])
         +- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_SUM(sum$0) AS b])
            +- Exchange(distribution=[hash[a]])
               +- TableSourceScan(table=[[default_catalog, default_database, T, project=[a, b], metadata=[], aggregates=[grouping=[a], aggFunctions=[LongSumAggFunction(b)]]]], fields=[a, sum$0])
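
To illustrate why the exchanges above the HashAggregate do not need to reshuffle, here is a toy model in Python. It is only a sketch of the idea and not Flink's planner or runtime code: the function names (hash_partition, aggregate_sum, rank_top_n, run_pipeline) and the list-of-dicts row format are assumptions made up for this example. Once rows are hash-partitioned by a, every row with the same a lives in one partition, so the downstream rank partitioned by a can read partition i directly (a forward connection).

```python
from collections import defaultdict


def hash_partition(rows, key, parallelism):
    """Model of Exchange(distribution=[hash[key]]): route rows by key hash."""
    partitions = [[] for _ in range(parallelism)]
    for row in rows:
        partitions[hash(row[key]) % parallelism].append(row)
    return partitions


def aggregate_sum(partition):
    """Model of SELECT a, SUM(b) AS b ... GROUP BY a inside one partition."""
    totals = defaultdict(int)
    for row in partition:
        totals[row["a"]] += row["b"]
    return [{"a": a, "b": b} for a, b in totals.items()]


def rank_top_n(partition, n):
    """Model of RANK() OVER (PARTITION BY a ORDER BY b) with rk <= n."""
    groups = defaultdict(list)
    for row in partition:
        groups[row["a"]].append(row)
    result = []
    for group in groups.values():
        for row in sorted(group, key=lambda r: r["b"]):
            # RANK semantics: 1 + number of rows with a strictly smaller b.
            rk = 1 + sum(1 for other in group if other["b"] < row["b"])
            if rk <= n:
                result.append({**row, "rk": rk})
    return result


def run_pipeline(rows, parallelism, n=10):
    """Shuffle once by a, then chain aggregate and rank with no reshuffle."""
    partitions = hash_partition(rows, "a", parallelism)
    aggregated = [aggregate_sum(p) for p in partitions]
    # Forward connection: partition i of the aggregate feeds rank task i.
    return [rank_top_n(p, n) for p in aggregated]
```

In this model the single hash shuffle is enough for correctness; the real question tracked by this issue is which partitioner the planner assigns to the remaining exchanges, which the sketch does not attempt to reproduce.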


> Explicitly set the partitioner for the sql operators whose shuffle and sort 
> are removed
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-26167
>                 URL: https://issues.apache.org/jira/browse/FLINK-26167
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table SQL / Planner
>            Reporter: godfrey he
>            Assignee: godfrey he
>            Priority: Major
>             Fix For: 1.15.0
>



