[ 
https://issues.apache.org/jira/browse/FLINK-26167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he updated FLINK-26167:
-------------------------------
    Description: 
After FLINK-25995 is finished, we have added an exchange (which will be 
converted to ForwardForConsecutiveHashPartitioner) for the nodes which do not 
need an explicit hash shuffle (because their input is already hash-distributed).

e.g.
{code:sql}
WITH r AS (SELECT * FROM T1, T2 WHERE a1 = a2 AND c1 LIKE 'He%')
SELECT sum(b1) FROM r GROUP BY a1
{code}

The plan after FLINK-25995 is finished:
{code:java}
Calc(select=[EXPR$0])
+- HashAggregate(isMerge=[false], groupBy=[a1], select=[a1, SUM(b1) AS EXPR$0])
   +- Exchange(distribution=[keep_input_as_is[hash[a1]]])
      +- Calc(select=[a1, b1])
         +- HashJoin(joinType=[InnerJoin], where=[(a1 = a2)], select=[a1, b1, a2], build=[left])
            :- Exchange(distribution=[hash[a1]])
            :  +- Calc(select=[a1, b1], where=[LIKE(c1, 'He%')])
            :     +- TableSourceScan(table=[[default_catalog, default_database, T1, filter=[], project=[a1, b1, c1], metadata=[]]], fields=[a1, b1, c1])
            +- Exchange(distribution=[hash[a2]])
               +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2])
{code}

However, the data between {{Calc}} and {{HashJoin}} may be out of order when 
their parallelism differs, so an 
{{Exchange(distribution=[keep_input_as_is[hash[a1]]])}} should also be added 
between them.
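
To illustrate why an edge without an explicit partitioner is risky when the 
parallelism differs, here is a minimal, self-contained sketch. It is a toy 
model, not Flink code: {{hashChannel}}, {{route}} and {{keysStayTogether}} are 
hypothetical helpers, the modulo hash only stands in for a real hash 
partitioner, and the round-robin branch stands in for any key-agnostic 
redistribution between subtasks.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

final class KeepInputAsIsSketch {

    // Simplified stand-in for a hash partitioner (assumption: plain modulo,
    // not Flink's real hashing): the target subtask depends only on the key.
    static int hashChannel(int key, int parallelism) {
        return Math.floorMod(Integer.hashCode(key), parallelism);
    }

    // Routes records from a hash-partitioned upstream operator to a downstream
    // operator and returns, per key, the set of downstream subtasks that saw it.
    // hashAware = true  : the edge re-applies the hash on the key.
    // hashAware = false : each upstream subtask spreads its records round-robin,
    //                     ignoring the key (a key-agnostic redistribution).
    static Map<Integer, Set<Integer>> route(
            int[] keys, int upstreamParallelism, int downstreamParallelism, boolean hashAware) {
        int[] roundRobin = new int[upstreamParallelism];
        Map<Integer, Set<Integer>> targets = new TreeMap<>();
        for (int key : keys) {
            int upstream = hashChannel(key, upstreamParallelism);
            int downstream = hashAware
                    ? hashChannel(key, downstreamParallelism)
                    : roundRobin[upstream]++ % downstreamParallelism;
            targets.computeIfAbsent(key, k -> new TreeSet<>()).add(downstream);
        }
        return targets;
    }

    // True if every key was delivered to exactly one downstream subtask,
    // which is what a keyed operator such as HashAggregate relies on.
    static boolean keysStayTogether(Map<Integer, Set<Integer>> targets) {
        return targets.values().stream().allMatch(subtasks -> subtasks.size() == 1);
    }

    public static void main(String[] args) {
        // Two records per key; upstream parallelism 4, downstream parallelism 3.
        int[] keys = {0, 0, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7};
        System.out.println("key-agnostic: " + route(keys, 4, 3, false));
        System.out.println("hash-aware:   " + route(keys, 4, 3, true));
    }
}
```

In this model the key-agnostic edge spreads records of the same key over 
several downstream subtasks, while the hash-aware edge keeps each key on one 
subtask. That is the property an explicitly set partitioner is meant to 
preserve when the two operators cannot simply be connected one-to-one.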


> Explicitly set the partitioner for the sql operators whose shuffle and sort 
> are removed
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-26167
>                 URL: https://issues.apache.org/jira/browse/FLINK-26167
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table SQL / Planner
>            Reporter: godfrey he
>            Assignee: godfrey he
>            Priority: Major
>             Fix For: 1.15.0
>
>



--
This message was sent by Atlassian Jira
(v8.20.1#820001)