[ https://issues.apache.org/jira/browse/FLINK-35885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Baozhu Zhao updated FLINK-35885:
--------------------------------
Description:
We have found an unexpected case: a processing-time (proctime) window aggregation over a table that also defines an event-time watermark emits abnormal rows with a count of 0.
The SQL is as follows:
{code:sql}
CREATE TABLE s1 (
  id INT,
  event_time TIMESTAMP(3),
  name STRING,
  proc_time AS PROCTIME(),
  WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
)
WITH ('connector' = 'my-source');

SELECT *
FROM (
  SELECT
    name,
    COUNT(id) AS total_count,
    window_start,
    window_end
  FROM TABLE(
    TUMBLE(
      TABLE s1,
      DESCRIPTOR(proc_time),
      INTERVAL '30' SECONDS
    )
  )
  GROUP BY window_start, window_end, name
)
WHERE total_count = 0;
{code}
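For reference, here is a minimal sketch (our own illustration, not Flink code) of how TUMBLE(..., INTERVAL '30' SECONDS) maps a proc_time value to window_start and window_end. It assumes a zero window offset, UTC, and non-negative epoch-millisecond timestamps; the class name and the sample timestamp are arbitrary.

```java
import java.time.Instant;

/** Sketch: bounds of a 30-second tumbling window (zero offset, UTC assumed). */
public class TumbleWindowSketch {
    static final long WINDOW_SIZE_MS = 30_000L; // INTERVAL '30' SECONDS

    /** Inclusive window_start for a non-negative epoch-millisecond timestamp. */
    static long windowStart(long tsMillis) {
        return tsMillis - (tsMillis % WINDOW_SIZE_MS);
    }

    /** Exclusive window_end: one window size after window_start. */
    static long windowEnd(long tsMillis) {
        return windowStart(tsMillis) + WINDOW_SIZE_MS;
    }

    public static void main(String[] args) {
        // Arbitrary sample processing time.
        long procTime = Instant.parse("2024-07-22T10:00:47Z").toEpochMilli();
        System.out.println(Instant.ofEpochMilli(windowStart(procTime))); // 2024-07-22T10:00:30Z
        System.out.println(Instant.ofEpochMilli(windowEnd(procTime)));   // 2024-07-22T10:01:00Z
    }
}
```

With the DDL above the watermark trails event_time by 10 seconds, so it passes such a window_end whenever event_time runs more than 10 seconds ahead of that processing-time window_end.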
For detailed test code, please refer to
[https://github.com/xingsuo-zbz/flink/blob/zbz/117/proc-agg-window-process-watermark-bug-test/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/bug/WindowBugTest.java]
----
The root cause is that SlicingWindowOperator#processWatermark()
(https://github.com/apache/flink/blob/release-1.17.2/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowOperator.java#L229)
advances the window progress whenever a newer watermark arrives, even though these windows are defined on processing time. When the watermark suddenly jumps past the end timestamp of the next window, a result with a count of 0 appears.
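{code:java}
public void processWatermark(Watermark mark) throws Exception {
    if (mark.getTimestamp() > currentWatermark) {
        // A newer watermark advances the window processor's progress; there is no
        // check here for whether the windows are event-time or processing-time.
        windowProcessor.advanceProgress(mark.getTimestamp());
        super.processWatermark(mark);
    } else {
        // The watermark did not advance: re-emit the current watermark.
        super.processWatermark(new Watermark(currentWatermark));
    }
}
{code}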
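To make the triggering path concrete, here is a deliberately simplified toy model, not Flink's SlicingWindowOperator: a proctime tumbling-window counter whose progress can also be advanced by watermarks, in the same way processWatermark() calls advanceProgress() above. The class, its methods and the advanceOnWatermark switch are hypothetical. It only illustrates a window being fired by a watermark before processing time reaches window_end; it does not model Flink's window buffer or timers, and so does not reproduce the count-0 row itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model (hypothetical, not Flink code): proctime windows whose progress a watermark may advance. */
public class ProctimeWindowWatermarkSketch {
    static final long SIZE_MS = 30_000L; // INTERVAL '30' SECONDS

    private final boolean advanceOnWatermark; // hypothetical switch, not a Flink option
    private final TreeMap<Long, Long> countByWindowEnd = new TreeMap<>();
    private final List<String> emitted = new ArrayList<>();
    private long progress = Long.MIN_VALUE;

    ProctimeWindowWatermarkSketch(boolean advanceOnWatermark) {
        this.advanceOnWatermark = advanceOnWatermark;
    }

    /** Counts a record in the tumbling window of its processing time. */
    void processElement(long procTimeMs) {
        long windowEnd = procTimeMs - (procTimeMs % SIZE_MS) + SIZE_MS;
        countByWindowEnd.merge(windowEnd, 1L, Long::sum);
    }

    /** Processing-time progress: the trigger a proctime window is expected to use. */
    void onProcessingTime(long procTimeMs) {
        advanceProgress(procTimeMs);
    }

    /** Event-time watermark: advances the same progress when the switch is on. */
    void processWatermark(long watermarkMs) {
        if (advanceOnWatermark) {
            advanceProgress(watermarkMs);
        }
    }

    /** Fires every window whose exclusive end is at or before the new progress. */
    private void advanceProgress(long newProgress) {
        if (newProgress <= progress) {
            return;
        }
        progress = newProgress;
        while (!countByWindowEnd.isEmpty() && countByWindowEnd.firstKey() <= progress) {
            Map.Entry<Long, Long> fired = countByWindowEnd.pollFirstEntry();
            emitted.add("window_end=" + fired.getKey() + " count=" + fired.getValue());
        }
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        for (boolean advanceOnWatermark : new boolean[] {true, false}) {
            ProctimeWindowWatermarkSketch op = new ProctimeWindowWatermarkSketch(advanceOnWatermark);
            op.processElement(5_000L);    // proctime 5 s -> window [0 s, 30 s)
            op.processWatermark(40_000L); // event-time watermark already past 30 s
            System.out.println("advanceOnWatermark=" + advanceOnWatermark
                    + ", fired before proctime reaches 30 s: " + op.emitted());
            op.onProcessingTime(30_000L); // processing time reaches window_end
            System.out.println("  after proctime 30 s: " + op.emitted());
        }
    }
}
```

With the switch on, the window fires as soon as the 40 s watermark arrives; with it off, the watermark is ignored and the window fires only when processing time reaches 30 s.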
was:
We have found an unexpected case: a processing-time (proctime) window aggregation over a table that also defines an event-time watermark emits abnormal rows with a count of 0.
The SQL is as follows:
{code:sql}
CREATE TABLE s1 (
  id INT,
  event_time TIMESTAMP(3),
  name STRING,
  proc_time AS PROCTIME(),
  WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
)
WITH ('connector' = 'my-source');

SELECT *
FROM (
  SELECT
    name,
    COUNT(id) AS total_count,
    window_start,
    window_end
  FROM TABLE(
    TUMBLE(
      TABLE s1,
      DESCRIPTOR(proc_time),
      INTERVAL '30' SECONDS
    )
  )
  GROUP BY window_start, window_end, name
)
WHERE total_count = 0;
{code}
For detailed test code, please refer to
https://github.com/xingsuo-zbz/flink/blob/zbz/117/proc-agg-window-process-watermark-bug-test/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/bug/WindowBugTest.java
----
The root cause is that
org.apache.flink.table.runtime.operators.window.slicing.SlicingWindowOperator.java#processWatermark()
advances the window progress whenever a newer watermark arrives, even though these windows are defined on processing time. When the watermark suddenly jumps past the end timestamp of the next window, a result with a count of 0 appears.
{code:java}
public void processWatermark(Watermark mark) throws Exception {
    if (mark.getTimestamp() > currentWatermark) {
        windowProcessor.advanceProgress(mark.getTimestamp());
        super.processWatermark(mark);
    } else {
        super.processWatermark(new Watermark(currentWatermark));
    }
}
{code}
> proctime aggregate window triggered by watermark
> ------------------------------------------------
>
> Key: FLINK-35885
> URL: https://issues.apache.org/jira/browse/FLINK-35885
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.13.6, 1.17.2
> Environment: flink 1.13.6 with blink or flink 1.17.2
> Reporter: Baozhu Zhao
> Priority: Major
--
This message was sent by Atlassian Jira
(v8.20.10#820010)