[ https://issues.apache.org/jira/browse/FLINK-35885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868658#comment-17868658 ]
xuyang commented on FLINK-35885:
--------------------------------
The bug is caused by the interaction of two events that both advance the window
buffer: the watermark and the proctime timer. Each time the window buffer
advances, it records the time of that advancement. When a later advancement
point arrives that is smaller than the recorded time, the buffer does not flush
again but reads the data directly from the state.
The bug occurs when a watermark (wt) that is larger than the proctime (pt)
arrives first. The watermark advances the window buffer, which flushes the
buffered data to the state. When the proctime timer later fires at pt, the
window buffer assumes that all data up to pt has already been flushed to the
state, because pt < wt. In reality, records that arrived after the watermark
are still in the buffer, so the data in the state is incomplete (or even empty).
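The mechanism above can be reproduced with a toy model. The sketch below is a hypothetical, self-contained Java class (`ToyWindowBuffer` is an illustrative name, not Flink's `RecordsWindowBuffer`). It assumes 30 s tumbling windows assigned purely by time, a single `advanceProgress` entry point shared by the watermark path and the proctime timer, and a shortcut that skips the flush when the new progress is smaller than the last recorded one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical toy model of the window buffer described above; not Flink code.
 * Records are buffered, flushed into "state" on advanceProgress, and a window
 * result is read from state when the window fires.
 */
public class ToyWindowBuffer {
    private final long windowSize;
    private final boolean skipFlushIfBehind; // models the "already flushed" shortcut
    private final List<Long> buffer = new ArrayList<>();
    private final Map<Long, Integer> state = new HashMap<>(); // window end -> count
    private long lastProgress = Long.MIN_VALUE;

    public ToyWindowBuffer(long windowSize, boolean skipFlushIfBehind) {
        this.windowSize = windowSize;
        this.skipFlushIfBehind = skipFlushIfBehind;
    }

    /** Buffers a record; its window is derived from its (proc)time. */
    public void add(long time) {
        buffer.add(time);
    }

    /** Shared by the watermark path and the proctime timer path. */
    public void advanceProgress(long progress) {
        if (skipFlushIfBehind && progress < lastProgress) {
            // Shortcut: assume everything up to lastProgress is already in state.
            return;
        }
        flush();
        lastProgress = Math.max(lastProgress, progress);
    }

    /** Moves every buffered record into the count of its tumbling window. */
    private void flush() {
        for (long time : buffer) {
            long windowEnd = (Math.floorDiv(time, windowSize) + 1) * windowSize;
            state.merge(windowEnd, 1, Integer::sum);
        }
        buffer.clear();
    }

    /** Proctime timer for a window: advance to its max timestamp, then emit the count. */
    public int fireWindow(long windowEnd) {
        advanceProgress(windowEnd - 1);
        Integer count = state.remove(windowEnd);
        return count == null ? 0 : count;
    }

    /** Scenario: wt = 40s arrives before the proctime timer of window [0s, 30s). */
    public static int run(boolean skipFlushIfBehind) {
        ToyWindowBuffer buf = new ToyWindowBuffer(30_000L, skipFlushIfBehind);
        buf.add(1_000L);              // record at pt = 1s
        buf.advanceProgress(40_000L); // watermark wt = 40s flushes that one record
        buf.add(2_000L);              // record at pt = 2s stays in the buffer
        buf.add(3_000L);              // record at pt = 3s stays in the buffer
        return buf.fireWindow(30_000L);
    }

    public static void main(String[] args) {
        System.out.println("with shortcut:    " + run(true));  // 1 (incomplete)
        System.out.println("without shortcut: " + run(false)); // 3
    }
}
```

With the shortcut, the window [0s, 30s) reports 1 instead of 3, because the two records that arrived after the watermark are never flushed before the window fires. If the watermark arrives before any record is added, the same run reports 0, which matches the empty-state case.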
> proctime aggregate window triggered by watermark
> ------------------------------------------------
>
> Key: FLINK-35885
> URL: https://issues.apache.org/jira/browse/FLINK-35885
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.13.6, 1.17.2
> Environment: Flink 1.13.6 (Blink planner) or Flink 1.17.2
> Reporter: Baozhu Zhao
> Priority: Major
>
> We have discovered an unexpected case: when performing a proctime window
> aggregation on a source that also defines a watermark, abnormal rows with a
> count of 0 are emitted. The SQL is as follows:
> {code:sql}
> CREATE TABLE s1 (
>   id INT,
>   event_time TIMESTAMP(3),
>   name STRING,
>   proc_time AS PROCTIME(),
>   WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
> ) WITH (
>   'connector' = 'my-source'
> );
>
> SELECT *
> FROM (
>   SELECT
>     name,
>     COUNT(id) AS total_count,
>     window_start,
>     window_end
>   FROM TABLE(
>     TUMBLE(TABLE s1, DESCRIPTOR(proc_time), INTERVAL '30' SECONDS)
>   )
>   GROUP BY window_start, window_end, name
> )
> WHERE total_count = 0;
> {code}
> For detailed test code, please refer to
> [https://github.com/xingsuo-zbz/flink/blob/zbz/117/proc-agg-window-process-watermark-bug-test/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/bug/WindowBugTest.java]
> ----
> The root cause is that
> https://github.com/apache/flink/blob/release-1.17.2/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowOperator.java#L229
> also advances the window progress when a watermark arrives, even though the
> window is defined on proctime. When the watermark jumps past the end timestamp
> of the next window, a result with a count of 0 appears.
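> {code:java}
> @Override
> public void processWatermark(Watermark mark) throws Exception {
>     if (mark.getTimestamp() > currentWatermark) {
>         // A newer watermark advances the window progress, even for proctime windows.
>         windowProcessor.advanceProgress(mark.getTimestamp());
>         super.processWatermark(mark);
>     } else {
>         // Stale watermark: re-emit the current watermark instead.
>         super.processWatermark(new Watermark(currentWatermark));
>     }
> }
> {code}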
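To make the watermark path concrete, the following hypothetical toy operator is modeled on the quoted `processWatermark` logic (monotonic check, advance progress, forward the watermark). `ToyWindowOperator` and its `advanceProgressOnWatermark` flag are illustrative assumptions, not Flink APIs: `true` models the behavior described in the issue, while `false` sketches one possible guard in which a proctime window is advanced only by its processing-time timer. It is not the upstream fix.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical toy operator modeled on the quoted processWatermark; not Flink code.
 * Progress calls are recorded instead of actually flushing or firing windows.
 */
public class ToyWindowOperator {
    private final boolean advanceProgressOnWatermark; // true = behavior described in the issue
    private long currentWatermark = Long.MIN_VALUE;
    private final List<Long> progressCalls = new ArrayList<>();
    private final List<Long> emittedWatermarks = new ArrayList<>();

    public ToyWindowOperator(boolean advanceProgressOnWatermark) {
        this.advanceProgressOnWatermark = advanceProgressOnWatermark;
    }

    public void processWatermark(long timestamp) {
        if (timestamp > currentWatermark) {
            currentWatermark = timestamp;
            if (advanceProgressOnWatermark) {
                // Issue behavior: the watermark moves a proctime window forward.
                advanceProgress(timestamp);
            }
        }
        // In both branches the non-decreasing current watermark is forwarded.
        emittedWatermarks.add(currentWatermark);
    }

    /** Proctime timer callback: the only progress source when the guard is on. */
    public void onProcessingTime(long time) {
        advanceProgress(time);
    }

    private void advanceProgress(long progress) {
        progressCalls.add(progress);
    }

    public List<Long> progressCalls() {
        return progressCalls;
    }

    public List<Long> emittedWatermarks() {
        return emittedWatermarks;
    }

    public static void main(String[] args) {
        for (boolean onWatermark : new boolean[] {true, false}) {
            ToyWindowOperator op = new ToyWindowOperator(onWatermark);
            op.processWatermark(40_000L); // watermark runs ahead of processing time
            op.processWatermark(35_000L); // stale watermark: 40s is re-emitted
            op.onProcessingTime(29_999L); // proctime timer for window [0s, 30s)
            System.out.println(onWatermark + " -> progress " + op.progressCalls()
                    + ", watermarks " + op.emittedWatermarks());
        }
    }
}
```

In both variants the watermark is still forwarded downstream; the flag only decides whether it also advances the window progress, and therefore whether the later proctime advancement at 29.999s lands behind an earlier one.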