[ https://issues.apache.org/jira/browse/FLINK-25995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhu Zhu updated FLINK-25995:
----------------------------
    Description: 
If there are multiple consecutive groupBy (i.e. keyBy) operations on the same
keys, the SQL planner changes all of them except the first one to use a forward
partitioner, so that these operators can be chained to avoid unnecessary
shuffles.
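
To illustrate the planner behavior described above, here is a minimal Java
sketch. It is a toy model, not the Flink planner: {{PartitionerKind}} and
{{assignPartitioners}} are hypothetical names, and each grouping operator is
reduced to a string naming its grouping keys.

```java
import java.util.ArrayList;
import java.util.List;

public class ConsecutiveGroupBySketch {

    // Hypothetical partitioner kinds for a toy model; not Flink's actual classes.
    enum PartitionerKind { HASH, FORWARD }

    /**
     * For each grouping operator in a pipeline (given by its grouping keys),
     * returns the partitioner used on its input edge. An operator grouped on
     * the same keys as its predecessor gets FORWARD, since its input is already
     * partitioned by those keys; otherwise a HASH shuffle is required.
     */
    static List<PartitionerKind> assignPartitioners(List<String> groupingKeys) {
        List<PartitionerKind> result = new ArrayList<>();
        String previousKeys = null;
        for (String keys : groupingKeys) {
            if (keys.equals(previousKeys)) {
                result.add(PartitionerKind.FORWARD);
            } else {
                result.add(PartitionerKind.HASH);
            }
            previousKeys = keys;
        }
        return result;
    }

    public static void main(String[] args) {
        // groupBy(a) -> groupBy(a) -> groupBy(b)
        System.out.println(assignPartitioners(List.of("a", "a", "b")));
    }
}
```

Running it prints [HASH, FORWARD, HASH]: only the repeated groupBy on the same
keys is switched to a forward partitioner.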

However, sometimes the local keyBy operators are not chained (e.g. in the case
of multiple inputs), and these forward partitioners then turn into forward job
edges. Such forward edges still carry the implicit local keyBy assumption, so
they cannot be changed into rescale/rebalance edges; doing so could lead to
incorrect results. This prevents the adaptive batch scheduler from determining
the parallelism of job vertices downstream of other forward edges (see
FLINK-25046).
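
The correctness problem can be shown with a small, self-contained Java
simulation. This is a hypothetical model, not Flink code: records are first
hash-partitioned by key across the upstream subtasks. A forward edge keeps each
key in a single downstream subtask, while a rescale edge (simplified here as
each upstream subtask round-robining over its own contiguous slice of
downstream subtasks) can split the records of one key across several
downstream subtasks, which breaks a downstream local groupBy.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class LocalKeyByEdgeSketch {

    /** Upstream keyBy: all records with a given key land in one upstream subtask. */
    static int hashPartition(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    /**
     * Routes records through an edge and returns, per key, the set of
     * downstream subtasks that received it. rescale == false models a forward
     * edge (downstream subtask = upstream subtask, equal parallelism required);
     * rescale == true models a simplified rescale edge where each upstream
     * subtask round-robins over its own contiguous slice of downstream subtasks.
     */
    static Map<String, Set<Integer>> route(List<String> records, int upstreamParallelism,
                                           int downstreamParallelism, boolean rescale) {
        if (!rescale && downstreamParallelism != upstreamParallelism) {
            throw new IllegalArgumentException("forward requires equal parallelism");
        }
        if (downstreamParallelism < upstreamParallelism
                || downstreamParallelism % upstreamParallelism != 0) {
            throw new IllegalArgumentException("model assumes an exact multiple");
        }
        int fanOut = downstreamParallelism / upstreamParallelism;
        int[] nextChannel = new int[upstreamParallelism]; // round-robin cursor per upstream subtask
        Map<String, Set<Integer>> placement = new HashMap<>();
        for (String key : records) {
            int upstream = hashPartition(key, upstreamParallelism);
            int downstream;
            if (!rescale) {
                downstream = upstream;
            } else {
                downstream = upstream * fanOut + nextChannel[upstream];
                nextChannel[upstream] = (nextChannel[upstream] + 1) % fanOut;
            }
            placement.computeIfAbsent(key, k -> new HashSet<>()).add(downstream);
        }
        return placement;
    }

    public static void main(String[] args) {
        List<String> records = List.of("a", "a", "b", "b", "c", "c");
        // Forward edge: each key stays in exactly one downstream subtask.
        System.out.println(route(records, 2, 2, false));
        // Rescale edge to parallelism 4: records of the same key are split up.
        System.out.println(route(records, 2, 4, true));
    }
}
```

With this input every key ends up in two downstream subtasks after the
rescale; for example "a" (hash 97, upstream subtask 1) is sent to subtasks 2
and 3.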

When the SQL planner optimizes the case of multiple consecutive groupBy
operations on the same keys, it should use
{{ForwardForLocalKeyByPartitioner}}, so that the runtime framework can decide
whether the partitioner can be changed to rescale or not.
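
A hypothetical Java sketch of how an explicit marker lets the runtime tell the
two kinds of forward edges apart. {{EdgePartitioner}}, {{canChangeToRescale}}
and {{adapt}} are illustrative names, and the decision rule (plain forward
edges may become rescale edges, local keyBy forward edges keep their forward
semantics) is an assumption based on the description above, not the actual
Flink runtime logic.

```java
public class ExplicitForwardSketch {

    // Hypothetical edge partitioner kinds. FORWARD_FOR_LOCAL_KEY_BY stands in for
    // the proposed ForwardForLocalKeyByPartitioner: it behaves like FORWARD but
    // records that correctness depends on the upstream key partitioning.
    enum EdgePartitioner { FORWARD, FORWARD_FOR_LOCAL_KEY_BY, RESCALE, HASH }

    /**
     * Assumed runtime-side rule: may this edge be turned into a rescale edge so
     * that the downstream vertex's parallelism can be decided independently?
     */
    static boolean canChangeToRescale(EdgePartitioner p) {
        // Only a plain forward edge carries no key-partitioning assumption.
        return p == EdgePartitioner.FORWARD;
    }

    /** Returns the partitioner the (hypothetical) scheduler would use for the edge. */
    static EdgePartitioner adapt(EdgePartitioner p) {
        return canChangeToRescale(p) ? EdgePartitioner.RESCALE : p;
    }

    public static void main(String[] args) {
        for (EdgePartitioner p : EdgePartitioner.values()) {
            System.out.println(p + " -> " + adapt(p));
        }
    }
}
```

In this model a plain FORWARD edge is adapted to RESCALE, while
FORWARD_FOR_LOCAL_KEY_BY is left unchanged because its local keyBy assumption
is now visible to the runtime.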

  was:
If there are multiple consecutive groupBy (i.e. keyBy) operations on the same
keys, the SQL planner changes all of them except the first one to use a forward
partitioner, so that these operators can be chained to avoid unnecessary
shuffles.

However, sometimes the local keyBy operators are not chained (e.g. in the case
of multiple inputs), and these forward partitioners then turn into forward job
edges. Such forward edges still carry the implicit local keyBy assumption, so
they cannot be changed into rescale/rebalance edges; doing so could lead to
incorrect results. This prevents the adaptive batch scheduler from determining
the parallelism of job vertices downstream of other forward edges (see
FLINK-25046).

To solve this, I propose introducing a new {{ForwardForLocalKeyByPartitioner}}.
When the SQL planner optimizes the case of multiple consecutive groupBy
operations on the same keys, it should use the proposed partitioner, so that
the runtime framework can decide whether the partitioner can be changed to
rescale or not.


> Make implicit assumption of SQL local keyBy/groupBy explicit
> ------------------------------------------------------------
>
>                 Key: FLINK-25995
>                 URL: https://issues.apache.org/jira/browse/FLINK-25995
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table SQL / Planner
>    Affects Versions: 1.15.0
>            Reporter: Zhu Zhu
>            Priority: Major
>             Fix For: 1.15.0
>



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
