lincoln lee created FLINK-39966:
-----------------------------------

             Summary: FlinkRelMdModifiedMonotonicity wrongly reports a 
non-time-attribute Top-1 Rank as insert-only
                 Key: FLINK-39966
                 URL: https://issues.apache.org/jira/browse/FLINK-39966
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 2.1.3, 2.2.1, 2.3.0
            Reporter: lincoln lee
            Assignee: lincoln lee
             Fix For: 2.4.0


FLINK-34702 removed the dedicated StreamPhysicalDeduplicate handler from 
FlinkRelMdModifiedMonotonicity and re-routed deduplication monotonicity 
derivation through StreamPhysicalRank. The new dispatch guard is:

case physicalRank: StreamPhysicalRank if RankUtil.isDeduplication(rel) =>

RankUtil.isDeduplication only checks "Top-1 ROW_NUMBER without rank-number 
output". It is weaker than the condition the old StreamPhysicalDeduplicate node 
type implicitly guaranteed — that node only existed when 
RankUtil.canConvertToDeduplicate held, which additionally requires sorting on a 
single time attribute (sortOnTimeAttributeOnly).
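
The gap between the two conditions can be illustrated with a small standalone model. This is only a sketch, not planner code: RankSpec and both guard functions are hypothetical stand-ins for the properties that RankUtil.isDeduplication and RankUtil.canConvertToDeduplicate are described as checking above.

```python
from dataclasses import dataclass
from typing import FrozenSet, Tuple


@dataclass(frozen=True)
class RankSpec:
    """Hypothetical stand-in for what the planner inspects on a Rank node."""
    is_row_number: bool
    rank_start: int
    rank_end: int
    outputs_rank_number: bool
    sort_keys: Tuple[str, ...]
    time_attributes: FrozenSet[str]


def is_top1_row_number(rank: RankSpec) -> bool:
    # "Top-1 ROW_NUMBER without rank-number output"
    return (rank.is_row_number
            and rank.rank_start == 1
            and rank.rank_end == 1
            and not rank.outputs_rank_number)


def sorts_on_single_time_attribute(rank: RankSpec) -> bool:
    # Models the sortOnTimeAttributeOnly requirement.
    return len(rank.sort_keys) == 1 and rank.sort_keys[0] in rank.time_attributes


def weak_guard(rank: RankSpec) -> bool:
    # Models the current dispatch guard (isDeduplication only).
    return is_top1_row_number(rank)


def strong_guard(rank: RankSpec) -> bool:
    # Models the invariant the old node type carried (canConvertToDeduplicate).
    return is_top1_row_number(rank) and sorts_on_single_time_attribute(rank)


TIME_ATTRS = frozenset({"proctime"})
by_proctime = RankSpec(True, 1, 1, False, ("proctime",), TIME_ATTRS)
by_regular_column = RankSpec(True, 1, 1, False, ("b",), TIME_ATTRS)
by_two_columns = RankSpec(True, 1, 1, False, ("proctime", "b"), TIME_ATTRS)

for name, spec in [("proctime", by_proctime),
                   ("regular column", by_regular_column),
                   ("two columns", by_two_columns)]:
    print(name, weak_guard(spec), strong_guard(spec))
```

In this model only the single-time-attribute case passes both guards. The regular-column and multi-column cases pass the weak guard alone, and those are exactly the cases that are misclassified.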

As a result, a Top-1 Rank whose ORDER BY is not a single time attribute (a 
regular column, or multiple columns) is mistakenly handled as an append-only 
FirstRow deduplication and reported as all-CONSTANT (insert-only) modified 
monotonicity. In reality such a Rank retracts and re-emits the kept row 
whenever a new winner arrives, so it produces updates.
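
A minimal simulation of why such a Rank is not insert-only. It is a simplified model, not the runtime operator: top1_changelog is a hypothetical helper, the change labels borrow Flink's +I / -U / +U short names, and the time-ordered case assumes rows arrive in ascending ts order.

```python
def top1_changelog(rows, partition_key, order_key):
    """Emit (op, row) changes for 'keep the row with the smallest order_key
    per partition', processing rows in arrival order."""
    best = {}
    out = []
    for row in rows:
        key = row[partition_key]
        current = best.get(key)
        if current is None:
            best[key] = row
            out.append(("+I", row))
        elif row[order_key] < current[order_key]:
            # A new winner arrived: retract the kept row, emit the new one.
            best[key] = row
            out.append(("-U", current))
            out.append(("+U", row))
    return out


rows = [
    {"a": 1, "b": 5, "c": 10, "ts": 1},
    {"a": 1, "b": 3, "c": 20, "ts": 2},
    {"a": 1, "b": 4, "c": 30, "ts": 3},
]

ops_by_b = [op for op, _ in top1_changelog(rows, "a", "b")]
ops_by_ts = [op for op, _ in top1_changelog(rows, "a", "ts")]

print(ops_by_b)   # ['+I', '-U', '+U']
print(ops_by_ts)  # ['+I']
```

Ordered by the arrival-ordered ts, the first row is never displaced, so only an insert is emitted. Ordered by the regular column b, the second row displaces the first and the output contains an update pair.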

Downstream operators that consume this metadata then make wrong decisions — 
e.g. a MIN/MAX aggregation picks the non-retract variant instead of 
MIN_RETRACT/MAX_RETRACT, yielding incorrect query results.

 

Reproduce:

SELECT b, MIN(c) AS min_c
FROM (
  SELECT a, b, c,
         ROW_NUMBER() OVER (PARTITION BY a ORDER BY b) AS rn  -- ORDER BY a non-time column
  FROM MyTable
) WHERE rn = 1
GROUP BY b

The inner query is a Top-1 Rank that produces updates. Because the Rank output 
is treated as insert-only, the downstream MIN(c) is planned with the 
non-retract MIN instead of MIN_RETRACT, so retractions from the Rank are not 
handled and the aggregate result is wrong.
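
The effect on the aggregate can be sketched with a hand-written changelog. This is an illustration under assumptions: the input rows are made up, both functions are hypothetical models rather than Flink's MIN / MIN_RETRACT implementations, and the non-retract variant is modelled as simply ignoring retraction messages.

```python
from collections import Counter, defaultdict

# Assumed Rank output for PARTITION BY a ORDER BY b, Top-1:
# partition a=1 first keeps (b=5, c=10), then a row with b=3 wins.
CHANGELOG = [
    ("+I", {"a": 1, "b": 5, "c": 10}),
    ("+I", {"a": 2, "b": 5, "c": 30}),
    ("-U", {"a": 1, "b": 5, "c": 10}),
    ("+U", {"a": 1, "b": 3, "c": 7}),
]


def min_ignoring_retractions(changes):
    """Model of a non-retract MIN(c) GROUP BY b: retractions are not handled."""
    result = {}
    for op, row in changes:
        if op in ("+I", "+U"):
            b, c = row["b"], row["c"]
            result[b] = min(result.get(b, c), c)
    return result


def min_with_retractions(changes):
    """Model of a retract-aware MIN(c) GROUP BY b: keeps a multiset per group."""
    values = defaultdict(Counter)
    for op, row in changes:
        b, c = row["b"], row["c"]
        if op in ("+I", "+U"):
            values[b][c] += 1
        else:
            values[b][c] -= 1
            if values[b][c] == 0:
                del values[b][c]
    return {b: min(counts) for b, counts in values.items() if counts}


wrong = min_ignoring_retractions(CHANGELOG)
right = min_with_retractions(CHANGELOG)
print(wrong)  # {5: 10, 3: 7}
print(right)  # {5: 30, 3: 7}
```

For group b = 5 the model that ignores retractions still reports 10, although the row carrying c = 10 has been retracted. The retract-aware model reports 30.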

Root cause: the dispatch guard dropped the sortOnTimeAttributeOnly invariant 
previously carried by the StreamPhysicalDeduplicate node type.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
