[
https://issues.apache.org/jira/browse/FLINK-35885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17872236#comment-17872236
]
lincoln lee commented on FLINK-35885:
-------------------------------------
Fixed in 1.20: 01c0a24c9152721a6fe976797e197cfa72cea97d
1.19: 08649fd6981655bcd131c76ef36f6dd074566dd0
> proctime aggregate window triggered by watermark
> ------------------------------------------------
>
> Key: FLINK-35885
> URL: https://issues.apache.org/jira/browse/FLINK-35885
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.13.6, 1.17.2
> Environment: flink 1.13.6 with blink or flink 1.17.2
> Reporter: Baozhu Zhao
> Assignee: xuyang
> Priority: Major
> Labels: pull-request-available
> Fix For: 2.0.0, 1.19.2, 1.20.1
>
>
> We have discovered an unexpected case: when performing a proctime window
> aggregation on a table that also defines a watermark, result rows with a
> count of 0 are emitted.
> The SQL is as follows:
> {code:sql}
> CREATE TABLE s1 (
>     id INT,
>     event_time TIMESTAMP(3),
>     name STRING,
>     proc_time AS PROCTIME(),
>     WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
> )
> WITH
>     ('connector' = 'my-source')
> ;
> SELECT
>     *
> FROM
>     (
>         SELECT
>             name,
>             COUNT(id) AS total_count,
>             window_start,
>             window_end
>         FROM
>             TABLE (
>                 TUMBLE (
>                     TABLE s1,
>                     DESCRIPTOR (proc_time),
>                     INTERVAL '30' SECONDS
>                 )
>             )
>         GROUP BY
>             window_start,
>             window_end,
>             name
>     )
> WHERE
>     total_count = 0;
> {code}
> For detailed test code, please refer to
> [https://github.com/xingsuo-zbz/flink/blob/zbz/117/proc-agg-window-process-watermark-bug-test/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/bug/WindowBugTest.java]
> ----
> The root cause is that SlicingWindowOperator#processWatermark
> https://github.com/apache/flink/blob/release-1.17.2/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowOperator.java#L229
> advances window progress whenever a watermark arrives, including for
> proctime windows. When the watermark suddenly exceeds the next window end
> timestamp, a result with a count of 0 appears.
> {code:java}
> public void processWatermark(Watermark mark) throws Exception {
>     if (mark.getTimestamp() > currentWatermark) {
>         windowProcessor.advanceProgress(mark.getTimestamp());
>         super.processWatermark(mark);
>     } else {
>         super.processWatermark(new Watermark(currentWatermark));
>     }
> }
> {code}
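
The behaviour described above can be reproduced in a small, self-contained toy model. This is a sketch, not Flink code: `ToyWindowProcessor`, `processWatermarkUnguarded`, and `processWatermarkGuarded` are hypothetical names, the time values are arbitrary, and the guard shown (ignore watermarks when the window is not event-time based) is only one plausible way to avoid the empty result. It is not taken from the fix commits.

```java
import java.util.ArrayList;
import java.util.List;

public class ProctimeWatermarkSketch {

    /** Toy stand-in for a tumbling window processor (hypothetical, not Flink's class). */
    static final class ToyWindowProcessor {
        final boolean eventTime;      // false means the window is defined on proctime
        final long windowSizeMs;
        long nextWindowEnd;
        long bufferedCount = 0;
        final List<Long> emittedCounts = new ArrayList<>();

        ToyWindowProcessor(boolean eventTime, long windowSizeMs) {
            this.eventTime = eventTime;
            this.windowSizeMs = windowSizeMs;
            this.nextWindowEnd = windowSizeMs;
        }

        void addRecord() {
            bufferedCount++;
        }

        /** Fires every window whose end is at or before the given progress. */
        void advanceProgress(long progress) {
            while (progress >= nextWindowEnd) {
                emittedCounts.add(bufferedCount);
                bufferedCount = 0;
                nextWindowEnd += windowSizeMs;
            }
        }
    }

    /** Mirrors the quoted snippet: every watermark advances window progress. */
    static void processWatermarkUnguarded(ToyWindowProcessor p, long watermark) {
        p.advanceProgress(watermark);
    }

    /** Assumed guard: watermarks only drive event-time windows. */
    static void processWatermarkGuarded(ToyWindowProcessor p, long watermark) {
        if (p.eventTime) {
            p.advanceProgress(watermark);
        }
    }

    /** One record, then a watermark that jumps past two window ends, then a proctime timer. */
    static List<Long> run(boolean guarded) {
        ToyWindowProcessor p = new ToyWindowProcessor(false, 30_000L);
        p.addRecord();
        if (guarded) {
            processWatermarkGuarded(p, 65_000L);
        } else {
            processWatermarkUnguarded(p, 65_000L);
        }
        p.advanceProgress(30_000L); // processing-time timer for the first window
        return p.emittedCounts;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // prints [1, 0]: the second window fires empty
        System.out.println(run(true));  // prints [1]
    }
}
```

In the unguarded run the watermark alone closes the second window before any record reaches it, which is the count-0 row the query above selects.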