Zhu Zhu created FLINK-25995:
-------------------------------
Summary: Make implicit assumption of SQL local keyBy/groupBy explicit
Key: FLINK-25995
URL: https://issues.apache.org/jira/browse/FLINK-25995
Project: Flink
Issue Type: Sub-task
Components: Table SQL / Planner
Affects Versions: 1.15.0
Reporter: Zhu Zhu
Fix For: 1.15.0
If there are multiple consecutive identical groupBy (i.e. keyBy) operations, the SQL
planner changes all of them except the first to use a forward partitioner, so that
these operators can be chained to avoid unnecessary shuffles.
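Why the later keyBy can safely become a forward edge can be seen in a small simulation. This is a minimal sketch under simplifying assumptions, not Flink code: a record is just its key, hash partitioning is modeled as `Math.floorMod(key.hashCode(), parallelism)` (Flink's real hash partitioner goes through key groups, which this sketch ignores), and the class and method names are hypothetical. With equal upstream and downstream parallelism, a forward edge keeps every key on exactly the subtask a second identical keyBy would have chosen.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of "keyBy followed by a forward edge".
class ConsecutiveKeyByDemo {

    // Subtask index a key is sent to by the (simplified) hash partitioner.
    static int hashChannel(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Forward edge: upstream subtask i sends everything to downstream subtask i.
    static int forwardChannel(int upstreamSubtask) {
        return upstreamSubtask;
    }

    // For each key, collect the downstream subtasks that received it after
    // keyBy (hash, parallelism p) followed by a forward edge (parallelism p).
    static Map<String, Set<Integer>> keyByThenForward(List<String> keys, int parallelism) {
        Map<String, Set<Integer>> placement = new HashMap<>();
        for (String key : keys) {
            int upstream = hashChannel(key, parallelism);
            int downstream = forwardChannel(upstream);
            placement.computeIfAbsent(key, k -> new HashSet<>()).add(downstream);
        }
        return placement;
    }

    public static void main(String[] args) {
        Map<String, Set<Integer>> placement =
                keyByThenForward(List.of("a", "b", "c", "a", "b", "a"), 4);
        for (Map.Entry<String, Set<Integer>> e : placement.entrySet()) {
            // Each key is seen by exactly one downstream subtask.
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```

The forward edge is only equivalent to a second keyBy because both sides share the same parallelism and the same partitioning function; that is the implicit assumption the issue refers to.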
However, the local keyBy operators are sometimes not chained (e.g. with multiple
inputs), and such forward partitioners then turn into forward job edges. These
forward edges still carry the local keyBy assumption, so they cannot be changed
into rescale/rebalance edges; doing so could lead to incorrect results. This
prevents the adaptive batch scheduler from determining the parallelism of job
vertices downstream of other forward edges (see FLINK-25046).
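The incorrect-result risk can be illustrated by replacing that forward edge with a key-agnostic round-robin edge and giving the downstream vertex a different parallelism. This is a minimal sketch under stated assumptions, not Flink code: hash partitioning is modeled as `Math.floorMod(key.hashCode(), parallelism)`, the edge is modeled as rebalance-style round-robin over all downstream subtasks (a rescale edge round-robins over a subset instead, which breaks key locality in the same way), and the names are hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model: keyBy, then a key-agnostic round-robin edge.
class BrokenLocalKeyByDemo {

    // Subtask index a key is sent to by the (simplified) hash partitioner.
    static int hashChannel(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Each upstream subtask sends its records round-robin over all downstream
    // subtasks, ignoring the key (rebalance-style).
    static Map<String, Set<Integer>> keyByThenRoundRobin(
            List<String> keys, int upParallelism, int downParallelism) {
        int[] nextChannel = new int[upParallelism]; // round-robin cursor per upstream subtask
        Map<String, Set<Integer>> placement = new HashMap<>();
        for (String key : keys) {
            int upstream = hashChannel(key, upParallelism);
            int downstream = nextChannel[upstream];
            nextChannel[upstream] = (downstream + 1) % downParallelism;
            placement.computeIfAbsent(key, k -> new HashSet<>()).add(downstream);
        }
        return placement;
    }

    public static void main(String[] args) {
        // Three records with the same key; upstream parallelism 2, downstream 3.
        Map<String, Set<Integer>> placement =
                keyByThenRoundRobin(List.of("a", "a", "a"), 2, 3);
        // "a" is spread over downstream subtasks 0, 1 and 2, so no single
        // subtask sees the whole group and a local aggregation would be wrong.
        System.out.println("a -> " + placement.get("a"));
    }
}
```

Because all three records share one upstream subtask, the round-robin cursor sends them to three different downstream subtasks, so any operator relying on "all records of a key arrive at one subtask" would produce partial results.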
To solve this, I propose introducing a new {{ForwardForRescalePartitioner}}.
When the SQL planner optimizes the case of multiple consecutive identical groupBy
operations, it should use the proposed partitioner, so that the runtime framework
can further decide whether the partitioner can be changed to rescale or not.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)