[ https://issues.apache.org/jira/browse/FLINK-26167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
godfrey he updated FLINK-26167:
-------------------------------
Parent: (was: FLINK-24892)
Issue Type: Bug (was: Sub-task)
> Explicitly set the partitioner for the sql operators whose shuffle and sort are removed
> ---------------------------------------------------------------------------------------
>
> Key: FLINK-26167
> URL: https://issues.apache.org/jira/browse/FLINK-26167
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Reporter: godfrey he
> Assignee: godfrey he
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.15.0
>
>
> After FLINK-25995 is finished, we have added an exchange (which will be
> converted to ForwardForConsecutiveHashPartitioner) for the nodes that do not
> need an explicit hash shuffle (because their input has already been hashed).
> For example:
> {code:sql}
> WITH r AS (SELECT * FROM T1, T2 WHERE a1 = a2 AND c1 LIKE 'He%')
> SELECT sum(b1) FROM r GROUP BY a1
> {code}
> The plan after FLINK-25995 is finished:
> {code:java}
> Calc(select=[EXPR$0])
> +- HashAggregate(isMerge=[false], groupBy=[a1], select=[a1, SUM(b1) AS EXPR$0])
>    +- Exchange(distribution=[keep_input_as_is[hash[a1]]])
>       +- Calc(select=[a1, b1])
>          +- HashJoin(joinType=[InnerJoin], where=[(a1 = a2)], select=[a1, b1, a2], build=[left])
>             :- Exchange(distribution=[hash[a1]])
>             :  +- Calc(select=[a1, b1], where=[LIKE(c1, 'He%')])
>             :     +- TableSourceScan(table=[[default_catalog, default_database, T1, filter=[], project=[a1, b1, c1], metadata=[]]], fields=[a1, b1, c1])
>             +- Exchange(distribution=[hash[a2]])
>                +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2])
> {code}
> However, the data between {{Calc}} and {{HashJoin}} may be out of order if
> their parallelisms differ, so an
> {{Exchange(distribution=[keep_input_as_is[hash[a1]]])}} should also be added
> between them.
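A rough, non-Flink illustration of why the extra exchange matters: the sketch below hash-partitions keyed records at one parallelism and then routes them to an operator with a different parallelism. It routes them either round-robin (a hypothetical stand-in for any edge that does not re-hash) or by re-hashing at the new parallelism. The second option is assumed here to approximate what a keep_input_as_is[hash[a1]] exchange falls back to when the parallelisms differ. The sketch counts how many downstream subtasks each key reaches. Any count above 1 means a group such as sum(b1) ... GROUP BY a1 would be split across subtasks. The class and method names are hypothetical and are not Flink API.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical simulation, not Flink API: checks whether key co-location
// established by a hash shuffle survives a change of parallelism.
public class KeepInputAsIsDemo {

    // Stand-in for hash[a1]: pick one of `parallelism` subtasks by key hash.
    static int hashChannel(int key, int parallelism) {
        return Math.floorMod(Integer.hashCode(key), parallelism);
    }

    /**
     * Routes records from a hash-partitioned upstream (upParallelism subtasks)
     * to a downstream operator (downParallelism subtasks) and returns the
     * largest number of downstream subtasks that received the same key.
     * A result of 1 means every group is complete on a single subtask.
     *
     * @param rehash true: re-hash at the downstream parallelism (assumed
     *               keep_input_as_is fallback); false: round-robin per
     *               upstream subtask (hypothetical non-hash edge)
     */
    static int maxSubtasksPerKey(int[] keys, int upParallelism, int downParallelism, boolean rehash) {
        Map<Integer, Set<Integer>> seenOn = new HashMap<>();
        int[] nextChannel = new int[upParallelism]; // round-robin cursor per upstream subtask
        for (int key : keys) {
            int up = hashChannel(key, upParallelism); // upstream is hash-partitioned
            int down;
            if (rehash) {
                down = hashChannel(key, downParallelism);
            } else {
                down = nextChannel[up];
                nextChannel[up] = (nextChannel[up] + 1) % downParallelism;
            }
            seenOn.computeIfAbsent(key, k -> new HashSet<>()).add(down);
        }
        int max = 0;
        for (Set<Integer> subtasks : seenOn.values()) {
            max = Math.max(max, subtasks.size());
        }
        return max;
    }

    public static void main(String[] args) {
        int[] keys = {1, 2, 3, 1, 2, 3, 1, 2, 3, 4, 4, 4};
        // Upstream parallelism 2, downstream parallelism 3.
        System.out.println("round-robin: " + maxSubtasksPerKey(keys, 2, 3, false));
        System.out.println("re-hash:     " + maxSubtasksPerKey(keys, 2, 3, true));
    }
}
```

With the sample keys, the round-robin route spreads every key over all 3 downstream subtasks (prints `round-robin: 3`). Re-hashing keeps each key on exactly one subtask (prints `re-hash:     1`).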
--
This message was sent by Atlassian Jira
(v8.20.1#820001)