[ 
https://issues.apache.org/jira/browse/FLINK-26167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he updated FLINK-26167:
-------------------------------
        Parent:     (was: FLINK-24892)
    Issue Type: Bug  (was: Sub-task)

> Explicitly set the partitioner for the sql operators whose shuffle and sort 
> are removed
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-26167
>                 URL: https://issues.apache.org/jira/browse/FLINK-26167
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: godfrey he
>            Assignee: godfrey he
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.15.0
>
>
> After FLINK-25995 is finished, we have added an exchange (which will be 
> converted to ForwardForConsecutiveHashPartitioner) for the nodes that do not 
> need an explicit hash shuffle (because their input is already hashed),
> e.g.
> {code:sql}
> WITH r AS (SELECT * FROM T1, T2 WHERE a1 = a2 AND c1 LIKE 'He%')
> SELECT sum(b1) FROM r GROUP BY a1
> {code}
> The plan after FLINK-25995 is finished:
> {code:java}
> Calc(select=[EXPR$0])
> +- HashAggregate(isMerge=[false], groupBy=[a1], select=[a1, SUM(b1) AS EXPR$0])
>    +- Exchange(distribution=[keep_input_as_is[hash[a1]]])
>       +- Calc(select=[a1, b1])
>          +- HashJoin(joinType=[InnerJoin], where=[(a1 = a2)], select=[a1, b1, a2], build=[left])
>             :- Exchange(distribution=[hash[a1]])
>             :  +- Calc(select=[a1, b1], where=[LIKE(c1, 'He%')])
>             :     +- TableSourceScan(table=[[default_catalog, default_database, T1, filter=[], project=[a1, b1, c1], metadata=[]]], fields=[a1, b1, c1])
>             +- Exchange(distribution=[hash[a2]])
>                +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2])
> {code}
> However, data between {{Calc}} and {{HashJoin}} may be out of order if their 
> parallelism differs, so an 
> {{Exchange(distribution=[keep_input_as_is[hash[a1]]])}} should also be added 
> between them.
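
The concern about differing parallelism can be illustrated with a small simulation. This is a sketch, not Flink code: the class and method names are hypothetical, and round-robin redistribution is an assumed stand-in for what may happen when a forward edge cannot be kept because the two operators run at different parallelism. It shows that a one-to-one forward edge keeps every key on a single downstream subtask, while redistribution spreads the same key over several subtasks, which breaks the assumption that the input is already hashed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical simulation; none of these names come from Flink.
class KeepInputAsIsSketch {

    /** Hash-partitions keys over `parallelism` subtasks, like Exchange(distribution=[hash[a1]]). */
    static List<List<Integer>> hashPartition(List<Integer> keys, int parallelism) {
        List<List<Integer>> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new ArrayList<>());
        }
        for (int key : keys) {
            subtasks.get(Math.floorMod(Integer.hashCode(key), parallelism)).add(key);
        }
        return subtasks;
    }

    /** Forward edge: upstream subtask i feeds downstream subtask i (equal parallelism). */
    static List<List<Integer>> forward(List<List<Integer>> upstream) {
        List<List<Integer>> downstream = new ArrayList<>();
        for (List<Integer> records : upstream) {
            downstream.add(new ArrayList<>(records));
        }
        return downstream;
    }

    /** Assumed fallback: each upstream subtask spreads its records round-robin. */
    static List<List<Integer>> roundRobin(List<List<Integer>> upstream, int downstreamParallelism) {
        List<List<Integer>> downstream = new ArrayList<>();
        for (int i = 0; i < downstreamParallelism; i++) {
            downstream.add(new ArrayList<>());
        }
        for (List<Integer> records : upstream) {
            int next = 0;
            for (int key : records) {
                downstream.get(next).add(key);
                next = (next + 1) % downstreamParallelism;
            }
        }
        return downstream;
    }

    /** Largest number of distinct subtasks that receive records of the same key. */
    static int maxSubtasksPerKey(List<List<Integer>> subtasks) {
        Map<Integer, Set<Integer>> subtasksByKey = new HashMap<>();
        for (int i = 0; i < subtasks.size(); i++) {
            for (int key : subtasks.get(i)) {
                subtasksByKey.computeIfAbsent(key, k -> new HashSet<>()).add(i);
            }
        }
        int max = 0;
        for (Set<Integer> seenOn : subtasksByKey.values()) {
            max = Math.max(max, seenOn.size());
        }
        return max;
    }

    public static void main(String[] args) {
        List<Integer> keys = new ArrayList<>();
        for (int rep = 0; rep < 4; rep++) {
            for (int k = 0; k < 8; k++) {
                keys.add(k);
            }
        }
        List<List<Integer>> joinOutput = hashPartition(keys, 2);
        System.out.println("forward, same parallelism: " + maxSubtasksPerKey(forward(joinOutput)));
        System.out.println("redistributed to 3 subtasks: " + maxSubtasksPerKey(roundRobin(joinOutput, 3)));
    }
}
```

A value of 1 means every key stays on one subtask, so a downstream hash aggregate can still rely on the upstream hash distribution; any larger value means it cannot.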



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
