[ https://issues.apache.org/jira/browse/FLINK-26167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
godfrey he updated FLINK-26167:
-------------------------------
    Parent:     (was: FLINK-24892)
    Issue Type: Bug  (was: Sub-task)

> Explicitly set the partitioner for the sql operators whose shuffle and sort are removed
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-26167
>                 URL: https://issues.apache.org/jira/browse/FLINK-26167
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: godfrey he
>            Assignee: godfrey he
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.15.0
>
>
> After FLINK-25995 was finished, an exchange (which will be converted to
> ForwardForConsecutiveHashPartitioner) is added for the nodes that do not
> need an explicit hash shuffle (because their input is already hashed),
> e.g.
> {code:sql}
> WITH r AS (SELECT * FROM T1, T2 WHERE a1 = a2 AND c1 LIKE 'He%')
> SELECT sum(b1) FROM r group by a1
> {code}
> the plan after FLINK-25995 is finished:
> {code:java}
> Calc(select=[EXPR$0])
> +- HashAggregate(isMerge=[false], groupBy=[a1], select=[a1, SUM(b1) AS EXPR$0])
>    +- Exchange(distribution=[keep_input_as_is[hash[a1]]])
>       +- Calc(select=[a1, b1])
>          +- HashJoin(joinType=[InnerJoin], where=[(a1 = a2)], select=[a1, b1, a2], build=[left])
>             :- Exchange(distribution=[hash[a1]])
>             :  +- Calc(select=[a1, b1], where=[LIKE(c1, 'He%')])
>             :     +- TableSourceScan(table=[[default_catalog, default_database, T1, filter=[], project=[a1, b1, c1], metadata=[]]], fields=[a1, b1, c1])
>             +- Exchange(distribution=[hash[a2]])
>                +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2])
> {code}
> but data between {{Calc}} and {{HashJoin}} may be out of order once their
> parallelism is different, so an
> {{Exchange(distribution=[keep_input_as_is[hash[a1]]])}} should be added
> between them.
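
The concern described in the issue can be illustrated with a toy simulation. This is not Flink code, and none of the function names below exist in Flink; they are hypothetical helpers. The sketch assumes that when two adjacent operators run with different parallelism and no partitioner is set explicitly, records are redistributed without regard to the key (modelled here as round-robin). Under that assumption the hash distribution produced upstream is lost, whereas re-applying a hash partitioner keeps every key on a single subtask.

```python
from collections import defaultdict


def hash_partition(records, parallelism):
    """Route each (key, value) record to subtask key % parallelism.

    Integer keys and a plain modulo stand in for a real hash function
    (an assumption made to keep the example deterministic).
    """
    subtasks = [[] for _ in range(parallelism)]
    for key, value in records:
        subtasks[key % parallelism].append((key, value))
    return subtasks


def round_robin(upstream, parallelism):
    """Redistribute records in arrival order, ignoring the key."""
    subtasks = [[] for _ in range(parallelism)]
    i = 0
    for part in upstream:
        for record in part:
            subtasks[i % parallelism].append(record)
            i += 1
    return subtasks


def keys_split_across_subtasks(subtasks):
    """Return the sorted keys that appear in more than one subtask."""
    owners = defaultdict(set)
    for idx, part in enumerate(subtasks):
        for key, _ in part:
            owners[key].add(idx)
    return sorted(k for k, s in owners.items() if len(s) > 1)


# Four keys with three rows each, hash-partitioned for an upstream
# operator running with parallelism 4.
records = [(k, v) for k in range(4) for v in range(3)]
upstream_output = hash_partition(records, 4)

# The downstream operator runs with parallelism 3.
# Key-agnostic redistribution: rows of one key end up on several subtasks.
naive = round_robin(upstream_output, 3)

# Explicit hash partitioning on the same key: each key stays on one subtask.
flat = [r for part in upstream_output for r in part]
rehashed = hash_partition(flat, 3)

print(keys_split_across_subtasks(naive))
print(keys_split_across_subtasks(rehashed))
```

In this model, an operator that relies on its input being hashed by a1 sees each key on exactly one subtask only in the re-hashed case, which is the property an explicitly set partitioner is meant to guarantee.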