[ 
https://issues.apache.org/jira/browse/FLINK-26004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-26004:
----------------------------
    Description: 
If there are multiple consecutive identical hash shuffles (i.e. keyBy), the SQL 
planner changes all of them except the first to use the forward partitioner, so 
that the operators can be chained and unnecessary shuffles are avoided.

However, sometimes the consecutive hash operators are not chained (e.g. with 
multiple inputs), and these forward partitioners turn into forward job edges. 
These forward edges still rely on the consecutive-hash assumption, so they 
cannot be changed into rescale/rebalance edges without risking incorrect 
results. This prevents the adaptive batch scheduler from determining the 
parallelism of other forward-edge downstream job vertices (see FLINK-25046).
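
To illustrate why such a forward edge cannot simply become a rebalance edge, 
here is a minimal toy model in plain Java. It is not Flink code: the class and 
method names are hypothetical, records are reduced to their keys, and 
rebalance is simplified to a per-subtask round-robin. It only shows that 
forward keeps the key co-location established by the first hash shuffle, while 
rebalance does not.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model, not Flink code: every name here is hypothetical. */
public class ConsecutiveHashDemo {

    static List<List<String>> emptySubtasks(int parallelism) {
        List<List<String>> out = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            out.add(new ArrayList<>());
        }
        return out;
    }

    /** Hash shuffle: a given key always lands in the same subtask. */
    static List<List<String>> hashShuffle(List<String> keys, int parallelism) {
        List<List<String>> out = emptySubtasks(parallelism);
        for (String key : keys) {
            out.get(Math.floorMod(key.hashCode(), parallelism)).add(key);
        }
        return out;
    }

    /** Forward edge: upstream subtask i feeds only downstream subtask i. */
    static List<List<String>> forward(List<List<String>> upstream) {
        List<List<String>> out = emptySubtasks(upstream.size());
        for (int i = 0; i < upstream.size(); i++) {
            out.get(i).addAll(upstream.get(i));
        }
        return out;
    }

    /** Rebalance edge (simplified): each upstream subtask round-robins its records. */
    static List<List<String>> rebalance(List<List<String>> upstream, int parallelism) {
        List<List<String>> out = emptySubtasks(parallelism);
        for (List<String> subtask : upstream) {
            int next = 0;
            for (String key : subtask) {
                out.get(next).add(key);
                next = (next + 1) % parallelism;
            }
        }
        return out;
    }

    /** True if no key appears in more than one subtask. */
    static boolean keysColocated(List<List<String>> subtasks) {
        Set<String> seen = new HashSet<>();
        for (List<String> subtask : subtasks) {
            for (String key : new HashSet<>(subtask)) {
                if (!seen.add(key)) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<List<String>> hashed = hashShuffle(List.of("a", "a", "a", "b"), 2);
        System.out.println("forward keeps keys co-located:   " + keysColocated(forward(hashed)));
        System.out.println("rebalance keeps keys co-located: " + keysColocated(rebalance(hashed, 2)));
    }
}
```

A second keyed aggregation that assumes its input is already hash-partitioned 
would see the same key in several subtasks after the rebalance, which is the 
kind of incorrect result described above.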

To solve it, I propose to introduce a new 
{{ForwardForConsecutiveHashPartitioner}}. When the SQL planner optimizes the 
case of multiple consecutive identical groupBy operations, it should use the 
proposed partitioner, so that the runtime framework can then decide whether 
the partitioner can be changed to hash or not.
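
The idea of a partitioner that the runtime may later turn back into a hash 
partitioner could be sketched as follows. This is only an illustration under 
assumptions: the types below are hypothetical and are not the Flink API, and 
the decision criterion used here (keep forward if the operators end up 
chained, otherwise fall back to hash) is an assumed example of what "the 
runtime framework decides" might mean.

```java
/** Hypothetical sketch, not the Flink implementation. */
public class ForwardOrHashSketch {

    /** Picks the downstream subtask index for one record. */
    interface ChannelSelector {
        int select(String key, int upstreamIndex, int numChannels);
    }

    /** Forward: stay on the same subtask index as the upstream subtask. */
    static final ChannelSelector FORWARD = (key, upstreamIndex, numChannels) -> upstreamIndex;

    /** Hash: route by key, independent of where the record came from. */
    static final ChannelSelector HASH =
            (key, upstreamIndex, numChannels) -> Math.floorMod(key.hashCode(), numChannels);

    /** Placeholder that remembers the hash partitioner the planner replaced. */
    static final class ForwardForConsecutiveHash {
        private final ChannelSelector hashFallback;

        ForwardForConsecutiveHash(ChannelSelector hashFallback) {
            this.hashFallback = hashFallback;
        }

        /**
         * Assumed criterion: if the operators are chained, forward is safe and
         * cheapest; otherwise restore the hash partitioner so the edge no longer
         * depends on the consecutive-hash assumption.
         */
        ChannelSelector resolve(boolean chainedWithUpstream) {
            return chainedWithUpstream ? FORWARD : hashFallback;
        }
    }

    public static void main(String[] args) {
        ForwardForConsecutiveHash placeholder = new ForwardForConsecutiveHash(HASH);
        System.out.println("chained:     subtask " + placeholder.resolve(true).select("a", 3, 4));
        System.out.println("not chained: subtask " + placeholder.resolve(false).select("a", 3, 4));
    }
}
```

Because the placeholder keeps the original hash partitioner, falling back to 
it preserves correctness while freeing the edge from the forward constraint.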

  was:
If there are multiple consecutive the same groupBy(i.e. keyBy), SQL planner 
will change them except the first one to use forward partitioner, so that these 
operators can be chained to reduce unnecessary shuffles.

However, sometimes the consecutive hash operators are not chained (e.g. 
multiple inputs), and this kind of forward partitioners will turn into forward 
job edges. These forward edges still have the consecutive hash assumption, so 
that they cannot be changed into rescale/rebalance edges, otherwise it can lead 
to incorrect results. This prevents the adaptive batch scheduler from 
determining parallelism for other forward edge downstream job vertices (see 
FLINK-25046).

To solve it, I propose to introduce a new 
{{ForwardForConsecutiveHashPartitioner}}. When SQL planner optimizes the 
case of multiple consecutive the same groupBy, it should use the proposed 
partitioner, so that the runtime framework can further decide whether the 
partitioner can be changed to hash or not.


> Introduce ForwardForConsecutiveHashPartitioner
> ----------------------------------------------
>
>                 Key: FLINK-26004
>                 URL: https://issues.apache.org/jira/browse/FLINK-26004
>             Project: Flink
>          Issue Type: Sub-task
>            Reporter: Lijie Wang
>            Priority: Major
>              Labels: pull-request-available



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
