[ https://issues.apache.org/jira/browse/FLINK-39589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-39589:
-----------------------------------
Labels: pull-request-available rocksdb window (was: rocksdb window)
> Window TVF aggregation emits incorrect results with RocksDB state backend
> -------------------------------------------------------------------------
>
> Key: FLINK-39589
> URL: https://issues.apache.org/jira/browse/FLINK-39589
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.18.1
> Reporter: Kanchi Masalia
> Priority: Minor
> Labels: pull-request-available, rocksdb, window
>
> Window TVF aggregation (TUMBLE/HOP/CUMULATE) silently produces incorrect
> results when using the RocksDB state backend. Per window, only one key emits
> the correct aggregate; the rows for all other keys are emitted with count=0
> and carry the wrong key. The legacy GROUP BY TUMBLE() syntax works correctly
> with the same backend and data.
> h3. Reproduction
> Using the datagen connector with 5 distinct keys:
> {code:sql}
> CREATE TEMPORARY TABLE datagen_test (
>   event_userid BIGINT,
>   ts BIGINT,
>   event_time AS TO_TIMESTAMP_LTZ(ts, 3),
>   WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
> ) WITH (
>   'connector' = 'datagen',
>   'rows-per-second' = '100000',
>   'fields.event_userid.min' = '1',
>   'fields.event_userid.max' = '5',
>   'fields.ts.kind' = 'sequence',
>   'fields.ts.start' = '1771345700000',
>   'fields.ts.end' = '1771346600000'
> );
>
> -- Run both queries with the RocksDB state backend:
> SET 'state.backend.type' = 'rocksdb';
>
> -- Broken (TVF syntax + RocksDB):
> SELECT event_userid, window_start, window_end, COUNT(*) AS cnt
> FROM TABLE(
>   TUMBLE(TABLE datagen_test, DESCRIPTOR(event_time), INTERVAL '1' MINUTE))
> GROUP BY event_userid, window_start, window_end;
>
> -- Works (legacy syntax + RocksDB):
> SELECT event_userid,
>   TUMBLE_START(event_time, INTERVAL '1' MINUTE),
>   TUMBLE_END(event_time, INTERVAL '1' MINUTE),
>   COUNT(*) AS cnt
> FROM datagen_test
> GROUP BY event_userid, TUMBLE(event_time, INTERVAL '1' MINUTE);
> {code}
> The TVF query emits all rows with the same key, and most of them with cnt=0.
> The legacy query emits 5 distinct keys per window with correct counts
> (~12,000 per key in each full window).
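The expected counts follow from the datagen settings: the `sequence` generator emits each `ts` value from `1771345700000` to `1771346600000` exactly once, so a full one-minute window holds 60,000 rows, or about 12,000 per key when `event_userid` is drawn uniformly from 5 values. The minimal Java sketch below works through that arithmetic. It assumes windows aligned to whole minutes of epoch time, and the class and method names are illustrative only.

```java
import java.util.Map;
import java.util.TreeMap;

public class ExpectedWindowCounts {
    // Values copied from the datagen_test DDL in the report.
    static final long TS_START = 1771345700000L; // 'fields.ts.start'
    static final long TS_END = 1771346600000L;   // 'fields.ts.end' (inclusive)
    static final long WINDOW_MS = 60_000L;       // INTERVAL '1' MINUTE
    static final int KEYS = 5;                   // event_userid ranges over 1..5

    // Start of the tumbling window containing ts, assuming windows aligned
    // to whole minutes of epoch time (valid for non-negative ts).
    static long windowStart(long ts) {
        return ts - (ts % WINDOW_MS);
    }

    // Rows per window: the 'sequence' generator emits each ts value once,
    // so a window gets one row per millisecond of [TS_START, TS_END] it covers.
    static Map<Long, Long> rowsPerWindow() {
        Map<Long, Long> counts = new TreeMap<>();
        for (long start = windowStart(TS_START); start <= TS_END; start += WINDOW_MS) {
            long lo = Math.max(start, TS_START);
            long hi = Math.min(start + WINDOW_MS - 1, TS_END);
            counts.put(start, hi - lo + 1);
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<Long, Long> counts = rowsPerWindow();
        long total = 0;
        for (Map.Entry<Long, Long> e : counts.entrySet()) {
            long rows = e.getValue();
            total += rows;
            // Expected per-key count if keys are drawn uniformly at random.
            System.out.printf("window_start=%d rows=%d expected_per_key~%d%n",
                    e.getKey(), rows, rows / KEYS);
        }
        System.out.println("windows=" + counts.size() + " total_rows=" + total);
    }
}
```

Under that assumption, the start timestamp falls 20 seconds into a minute. The run therefore yields 16 windows: a partial first window of 40,000 rows, 14 full windows of 60,000 rows each, and a final window of 20,001 rows.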
> Switching to the hashmap backend works around the issue:
> {code:sql}
> SET 'state.backend.type' = 'hashmap';
> {code}
> h3. Root Cause
> RecordsWindowBuffer.flush() reuses the same WindowKey object when
> requiresCopy=false (the RocksDB path). TimerHeapInternalTimer captures
> these mutable key references, so when the timers fire, every timer
> resolves to the last key written, and the same state is read and cleared
> repeatedly.
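This failure mode is ordinary Java object aliasing and can be sketched without Flink. The model below is hypothetical and is not Flink code. `MutableKey`, `Timer`, `flush`, and `fire` are invented stand-ins loosely modeled on `WindowKey`, `TimerHeapInternalTimer`, `RecordsWindowBuffer.flush()`, and the timer callback, and the per-key counts are arbitrary.

Each timer keeps a reference to the key object it was given. Once the buffer has overwritten its single reused key, every timer sees the last key. Firing the timers then reads and clears that key's state once and emits 0 for the same key on every later firing, which mirrors the reported output.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReusedKeyAliasing {
    // Hypothetical stand-in for a mutable key such as WindowKey: one object
    // whose fields are overwritten in place instead of allocating a new one.
    static final class MutableKey {
        long userId;
        long windowEnd;

        MutableKey replace(long userId, long windowEnd) {
            this.userId = userId;
            this.windowEnd = windowEnd;
            return this;
        }

        MutableKey copy() {
            return new MutableKey().replace(userId, windowEnd);
        }
    }

    // Hypothetical stand-in for a heap timer: it holds a reference to the key
    // object it was registered with, not a snapshot of the key's contents.
    static final class Timer {
        final MutableKey key;
        final long timestamp;

        Timer(MutableKey key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    // Models flushing one window: a single key object is reused for every
    // buffered key, and one timer is registered per key.
    static List<Timer> flush(long[] userIds, long windowEnd, boolean requiresCopy) {
        MutableKey reuse = new MutableKey();
        List<Timer> timers = new ArrayList<>();
        for (long userId : userIds) {
            MutableKey key = reuse.replace(userId, windowEnd);
            // Without a copy, every timer ends up pointing at the same object.
            timers.add(new Timer(requiresCopy ? key.copy() : key, windowEnd));
        }
        return timers;
    }

    // Models timers firing: each one reads and then clears the state of the
    // key it references, emitting "userId:count".
    static List<String> fire(List<Timer> timers, Map<Long, Long> state) {
        List<String> emitted = new ArrayList<>();
        for (Timer t : timers) {
            long userId = t.key.userId;
            long cnt = state.getOrDefault(userId, 0L);
            state.remove(userId);
            emitted.add(userId + ":" + cnt);
        }
        return emitted;
    }

    // Per-key counts for one window (arbitrary example values: userId * 10).
    static Map<Long, Long> initialState(long[] userIds) {
        Map<Long, Long> state = new HashMap<>();
        for (long userId : userIds) {
            state.put(userId, userId * 10);
        }
        return state;
    }

    public static void main(String[] args) {
        long[] users = {1, 2, 3, 4, 5};
        // Prints [5:50, 5:0, 5:0, 5:0, 5:0]
        System.out.println(fire(flush(users, 60_000L, false), initialState(users)));
        // Prints [1:10, 2:20, 3:30, 4:40, 5:50]
        System.out.println(fire(flush(users, 60_000L, true), initialState(users)));
    }
}
```

With `requiresCopy` set, each timer gets its own copy of the key, and every key emits its own count. The sketch only illustrates the aliasing and does not describe how the upstream patch fixes it.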
--
This message was sent by Atlassian Jira
(v8.20.10#820010)