[
https://issues.apache.org/jira/browse/FLINK-26498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17503321#comment-17503321
]
Jark Wu commented on FLINK-26498:
---------------------------------
I don't think we should emit the window result when the cleanup timer fires;
this would break the semantics of window result emission. According to the
configuration, the window should only emit when
1. the early-fire periodic timer triggers,
2. the late-fire periodic timer triggers, or
3. the watermark reaches the end of the window.
So cleanup shouldn't fire the window; otherwise the semantics are confusing.
I think this is definitely a bug, and we need to check why the window is not
fired once the watermark has passed the end of the window.
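
To make the intended separation concrete, here is a minimal sketch of the rule
described above: the three listed conditions emit a result, while the cleanup
timer only clears state. This is a toy model, not Flink's actual trigger or
WindowOperator code; the class, enum, and method names are all hypothetical.

```java
import java.util.EnumSet;
import java.util.Set;

/** Toy model of the emit rules listed above; NOT Flink's real trigger code. */
final class EmitRules {

    /** Hypothetical names for the events a window can observe. */
    public enum Event {
        EARLY_FIRE_TIMER,            // periodic timer before the end of the window
        LATE_FIRE_TIMER,             // periodic timer after the end of the window
        WATERMARK_REACHED_WINDOW_END,
        CLEANUP_TIMER                // end of window plus allowed lateness
    }

    /** The only events that should emit a window result. */
    private static final Set<Event> FIRING_EVENTS = EnumSet.of(
            Event.EARLY_FIRE_TIMER,
            Event.LATE_FIRE_TIMER,
            Event.WATERMARK_REACHED_WINDOW_END);

    /** Returns true if the event should emit the current window result. */
    static boolean shouldEmit(Event event) {
        return FIRING_EVENTS.contains(event);
    }

    /** Returns true if the event should only drop the window state. */
    static boolean shouldClearState(Event event) {
        return event == Event.CLEANUP_TIMER;
    }

    public static void main(String[] args) {
        for (Event event : Event.values()) {
            System.out.println(event
                    + " emit=" + shouldEmit(event)
                    + " clear=" + shouldClearState(event));
        }
    }
}
```

Under this model, a late record whose update is never emitted points to a
missed late-fire timer, not to a missing emit on cleanup.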
> The window result may not have been emitted when use window emit feature and
> set allow-latency
> ------------------------------------------------------------------------------------------------
>
> Key: FLINK-26498
> URL: https://issues.apache.org/jira/browse/FLINK-26498
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Planner
> Reporter: hehuiyuan
> Priority: Major
> Labels: pull-request-available
> Attachments: image-2022-03-05-23-53-37-086.png,
> image-2022-03-05-23-53-44-196.png, image-2022-03-06-00-03-11-670.png
>
>
> The SQL of the job:
> {code:java}
> CREATE TABLE tableSource(
> name string,
> age int not null,
> sex string,
> dt TIMESTAMP(3),
> WATERMARK FOR dt AS dt - INTERVAL '0' SECOND
> ) WITH (
> );
> CREATE TABLE tableSink(
> windowstart timestamp(3),
> windowend timestamp(3),
> name string,
> age int,
> cou bigint
> )
> WITH (
> );
> INSERT INTO tableSink
> SELECT
> TUMBLE_START(dt, INTERVAL '1' HOUR),
> TUMBLE_END(dt, INTERVAL '1' HOUR),
> name,
> age,
> count(sex)
> FROM tableSource
> GROUP BY TUMBLE(dt, INTERVAL '1' HOUR), name,age {code}
>
> and table config:
> {code:java}
> table.exec.emit.allow-lateness = 1 hour
> table.exec.emit.late-fire.delay = 1 min
> table.exec.emit.early-fire.delay = 1 min {code}
>
> The data:
> {code:java}
> >hehuiyuan1,22,woman,2022-03-05 00:30:22.000
> >hehuiyuan1,22,woman,2022-03-05 00:40:22.000
> // pause, wait for the 1 min earlyTrigger to fire the window
> >hehuiyuan1,22,woman,2022-03-05 00:50:22.000
> >hehuiyuan1,22,woman,2022-03-05 00:56:22.000
> // pause, wait for the 1 min earlyTrigger to fire the window
> >hehuiyuan1,22,woman,2022-03-05 01:00:00.000
> // pause, wait for the 1 min earlyTrigger to fire the window
> >hehuiyuan1,22,woman,2022-03-05 00:59:20.000 -- late data
> // pause, wait for the 1 min earlyTrigger to fire the window
> >hehuiyuan1,22,woman,2022-03-05 00:59:20.100 -- late data
> >hehuiyuan1,22,woman,2022-03-05 02:00:00.000 -- window state cleaned for [0:00:00, 1:00:00]
> >hehuiyuan1,22,woman,2022-03-05 02:10:00.000 {code}
>
> The result:
> {code:java}
> > +I(+I[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 2])
> > -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 2])
> > +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 4])
> > +I(+I[2022-03-05T01:00, 2022-03-05T02:00, hehuiyuan1, 22, 1])
> > -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 4])
> > +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 5])
>
>
> > +I(+I[2022-03-05T02:00, 2022-03-05T03:00, hehuiyuan1, 22, 2]) {code}
>
> The window result is lost: when `hehuiyuan1,22,woman,2022-03-05 00:59:20.100`
> arrives, the lateTrigger is not triggered, and the window [0:00:00, 1:00:00] is
> cleaned up when the record `hehuiyuan1,22,woman,2022-03-05 02:00:00.000`
> arrives and advances the watermark.
>
> The window [0:00:00, 1:00:00] contains 6 records, but the emitted count is 5.
> The trigger is AfterEndOfWindowEarlyAndLate.
> So WindowOperator may need to emit the result when the window's cleanupTimer
> calls onEventTime.
>
> I think the correct result is as follows:
> {code:java}
> > +I(+I[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 2])
> > -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 2])
> > +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 4])
> > +I(+I[2022-03-05T01:00, 2022-03-05T02:00, hehuiyuan1, 22, 1])
> > -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 4])
> > +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 5])
> > -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 5])
> > +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 6])
> > +I(+I[2022-03-05T02:00, 2022-03-05T03:00, hehuiyuan1, 22, 2]) {code}
>