[ https://issues.apache.org/jira/browse/FLINK-26167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
godfrey he updated FLINK-26167:
-------------------------------
    Description: 
After FLINK-25995 is finished, we have added an exchange (which will be converted to a {{ForwardForConsecutiveHashPartitioner}}) for nodes that do not need an explicit hash shuffle because their input is already hash-partitioned, e.g.:
{code:sql}
WITH r AS (SELECT * FROM T1, T2 WHERE a1 = a2 AND c1 LIKE 'He%')
SELECT sum(b1) FROM r group by a1
{code}
The plan after FLINK-25995 is finished:
{code:java}
Calc(select=[EXPR$0])
+- HashAggregate(isMerge=[false], groupBy=[a1], select=[a1, SUM(b1) AS EXPR$0])
   +- Exchange(distribution=[keep_input_as_is[hash[a1]]])
      +- Calc(select=[a1, b1])
         +- HashJoin(joinType=[InnerJoin], where=[(a1 = a2)], select=[a1, b1, a2], build=[left])
            :- Exchange(distribution=[hash[a1]])
            :  +- Calc(select=[a1, b1], where=[LIKE(c1, 'He%')])
            :     +- TableSourceScan(table=[[default_catalog, default_database, T1, filter=[], project=[a1, b1, c1], metadata=[]]], fields=[a1, b1, c1])
            +- Exchange(distribution=[hash[a2]])
               +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2])
{code}
However, the data between {{Calc}} and {{HashJoin}} may be out of order (no longer hash-partitioned on {{a1}}) once their parallelism differs, so an {{Exchange(distribution=[keep_input_as_is[hash[a1]]])}} should also be added between them.

> Explicitly set the partitioner for the sql operators whose shuffle and sort
> are removed
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-26167
>                 URL: https://issues.apache.org/jira/browse/FLINK-26167
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table SQL / Planner
>            Reporter: godfrey he
>            Assignee: godfrey he
>            Priority: Major
>             Fix For: 1.15.0

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
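To make the failure mode in this issue concrete, here is a small, self-contained Java toy model (not Flink code). It assumes the upstream input is already hash-partitioned on the key by a plain {{Math.floorMod(hashCode, parallelism)}} rule (Flink actually routes keys via murmur-hashed key groups), uses round-robin as a stand-in for a key-unaware connection, and models the idea behind {{ForwardForConsecutiveHashPartitioner}} as "forward when the parallelism is unchanged, re-hash otherwise"; that condition is a simplification, not the real implementation. {{ConsecutiveHashSketch}}, {{forwardOrHash}} and {{isHashPartitioned}} are hypothetical names, not Flink APIs.

```java
import java.util.HashMap;
import java.util.Map;

public class ConsecutiveHashSketch {

    // Simplified hash routing. Assumption: real Flink uses murmur-hashed
    // key groups, not a plain modulo of hashCode().
    static int hashChannel(int key, int parallelism) {
        return Math.floorMod(Integer.hashCode(key), parallelism);
    }

    // Hypothetical forward-or-hash rule: keep the 1:1 forward mapping when the
    // parallelism is unchanged, otherwise re-route the record by its key.
    static int forwardOrHash(int key, int upstreamSubtask, int upstreamParallelism, int downstreamParallelism) {
        if (upstreamParallelism == downstreamParallelism) {
            return upstreamSubtask;
        }
        return hashChannel(key, downstreamParallelism);
    }

    // Round-robin routing, standing in for a key-unaware connection.
    static int roundRobin(int recordIndex, int downstreamParallelism) {
        return recordIndex % downstreamParallelism;
    }

    // Returns true if every key lands in exactly one downstream subtask, which is
    // what a HashAggregate grouping on that key relies on.
    static boolean isHashPartitioned(int[] keys, int upstreamParallelism, int downstreamParallelism,
                                     boolean forwardOrHashRouting) {
        Map<Integer, Integer> owner = new HashMap<>();
        for (int i = 0; i < keys.length; i++) {
            int key = keys[i];
            // The upstream input is already hash-partitioned on the key.
            int upstreamSubtask = hashChannel(key, upstreamParallelism);
            int target = forwardOrHashRouting
                    ? forwardOrHash(key, upstreamSubtask, upstreamParallelism, downstreamParallelism)
                    : roundRobin(i, downstreamParallelism);
            Integer previous = owner.putIfAbsent(key, target);
            if (previous != null && previous != target) {
                return false; // the same key was split across subtasks
            }
        }
        return true;
    }

    public static void main(String[] args) {
        int[] keys = {1, 2, 3, 1, 2, 3, 4, 5, 1, 4};
        System.out.println(isHashPartitioned(keys, 4, 4, true));  // same parallelism, forward
        System.out.println(isHashPartitioned(keys, 4, 3, true));  // parallelism differs, re-hash
        System.out.println(isHashPartitioned(keys, 4, 3, false)); // parallelism differs, round-robin
    }
}
```

Running {{main}} prints true, true, false: with equal parallelism the forward mapping keeps each key in a single subtask, with different parallelism only re-hashing keeps keys together, and the round-robin connection sends key 1 to two different subtasks.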