Jacky Lau created FLINK-34665:
---------------------------------
Summary: Add streaming rule to convert Union to Rank so that it is finally
converted to StreamExecDeduplicate
Key: FLINK-34665
URL: https://issues.apache.org/jira/browse/FLINK-34665
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.20.0
Reporter: Jacky Lau
Fix For: 1.20.0
The semantics of UNION in SQL include deduplication, and in Calcite, when
converting a SQL node to a RelNode, a Distinct Aggregate is inserted above the
Union to achieve this deduplication. In Flink, the Distinct Aggregate
eventually gets converted into a StreamExecGroupAggregate operator. This
operator accesses state multiple times, and in the numerous jobs we have
observed, thread stacks are often stuck in state access. This is because the
key of the distinct aggregate consists of all the fields of the union, so the
state key is relatively large, and repeated state access and key comparisons
are time-consuming.
In fact, a potential optimization is to add a rule to convert the Union into a
Rank with processing time, which then ultimately gets converted into a
StreamExecDeduplicate. Currently, we have users rewrite their SQL to use
Row_number for deduplication, and this approach works very well. Therefore, it
is possible to add a rule at the engine level to support this optimization.
However, this rule changes the generated plan, so existing jobs could fail when
users upgrade their Flink version. I therefore suggest guarding it with a flag
whose default value keeps the current behavior.
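
To illustrate the manual Row_number rewrite mentioned above, here is a minimal
sketch that checks, on a toy dataset, that a plain UNION and a "first row per
key" query over UNION ALL return the same rows. Everything in it is an
assumption made for illustration: the tables t1 and t2, the columns a, b and
seq, and the use of Python's sqlite3 (which needs SQLite 3.25+ for window
functions) instead of Flink. In Flink SQL the ORDER BY would be on a
processing-time attribute rather than the seq column used here.

```python
import sqlite3

# Hypothetical tables t1 and t2 with columns (a, b, seq); not from the issue.
UNION_QUERY = """
SELECT a, b FROM t1
UNION
SELECT a, b FROM t2
"""

# Manual rewrite: keep only the first row for each (a, b) key.
# seq stands in for the processing-time attribute used in Flink SQL.
DEDUP_QUERY = """
SELECT a, b FROM (
  SELECT a, b,
         ROW_NUMBER() OVER (PARTITION BY a, b ORDER BY seq ASC) AS rn
  FROM (
    SELECT a, b, seq FROM t1
    UNION ALL
    SELECT a, b, seq FROM t2
  )
) WHERE rn = 1
"""


def run_both():
    """Run both queries on the same toy data and return their sorted rows."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE t1 (a INTEGER, b TEXT, seq INTEGER)")
    conn.execute("CREATE TABLE t2 (a INTEGER, b TEXT, seq INTEGER)")
    conn.executemany(
        "INSERT INTO t1 VALUES (?, ?, ?)",
        [(1, "x", 1), (1, "x", 2), (2, "y", 3)],
    )
    conn.executemany(
        "INSERT INTO t2 VALUES (?, ?, ?)",
        [(1, "x", 4), (3, "z", 5)],
    )
    union_rows = sorted(conn.execute(UNION_QUERY).fetchall())
    dedup_rows = sorted(conn.execute(DEDUP_QUERY).fetchall())
    conn.close()
    return union_rows, dedup_rows


if __name__ == "__main__":
    union_rows, dedup_rows = run_both()
    print(union_rows == dedup_rows)
```

The sketch only shows that the two queries agree on results in a batch
setting; it says nothing about the streaming state cost, which is the actual
motivation for the proposed rule.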
--
This message was sent by Atlassian Jira
(v8.20.10#820010)