lincoln lee created FLINK-39966:
-----------------------------------
Summary: FlinkRelMdModifiedMonotonicity wrongly reports a
non-time-attribute Top-1 Rank as insert-only
Key: FLINK-39966
URL: https://issues.apache.org/jira/browse/FLINK-39966
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 2.1.3, 2.2.1, 2.3.0
Reporter: lincoln lee
Assignee: lincoln lee
Fix For: 2.4.0
FLINK-34702 removed the dedicated StreamPhysicalDeduplicate handler from
FlinkRelMdModifiedMonotonicity and re-routed deduplication monotonicity
derivation through StreamPhysicalRank. The new dispatch guard is:
case physicalRank: StreamPhysicalRank if RankUtil.isDeduplication(rel) =>
RankUtil.isDeduplication only checks "Top-1 ROW_NUMBER without rank-number
output". It is weaker than the condition the old StreamPhysicalDeduplicate node
type implicitly guaranteed — that node only existed when
RankUtil.canConvertToDeduplicate held, which additionally requires sorting on a
single time attribute (sortOnTimeAttributeOnly).
As a result, a Top-1 Rank whose ORDER BY is not a single time attribute (a
regular column, or multiple columns) is mistakenly handled as an append-only
FirstRow deduplication and reported as all-CONSTANT (insert-only) modified
monotonicity. In reality such a Rank retracts and re-emits the kept row
whenever a new winner arrives, so it produces updates.
Downstream operators that consume this metadata then make wrong decisions —
e.g. a MIN/MAX aggregation picks the non-retract variant instead of
MIN_RETRACT/MAX_RETRACT, yielding incorrect query results.
Reproduce:
SELECT b, MIN(c) AS min_c
FROM (
SELECT a, b, c,
ROW_NUMBER() OVER (PARTITION BY a ORDER BY b) AS rn -- ORDER BY a
non-time column
FROM MyTable
) WHERE rn = 1
GROUP BY b
The inner query is a Top-1 Rank that updates. The downstream MIN(c) is planned
with the non-retract MIN (because the Rank output is treated as insert-only)
instead of MIN_RETRACT, so retractions from the Rank are not handled and the
aggregate result is wrong.
Root cause: the dispatch guard dropped the sortOnTimeAttributeOnly invariant
previously carried by the StreamPhysicalDeduplicate node type.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)