[
https://issues.apache.org/jira/browse/FLINK-26498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17503393#comment-17503393
]
Jark Wu edited comment on FLINK-26498 at 3/9/22, 8:42 AM:
----------------------------------------------------------
Thanks [~hehuiyuan], I see what you mean. The window had already fired when the
"01:00:00.000" record arrived, so {{hasFired}} is true when the cleanup timer
is triggered.
So I think the result is as expected, because the records are out of order and
you declared the watermark as "dt - INTERVAL '0' SECOND", which expects
strictly ascending timestamps (out-of-order records may be dropped, such as the
"00:59:20.100" record in your case).
If you don't want to drop out-of-order records, please declare a watermark that
tolerates them, e.g. "dt - INTERVAL '5' MINUTE" allows 5 minutes of
out-of-orderness.
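As a minimal sketch of that second option, the source table from the report
could declare a bounded out-of-orderness watermark as below. Only the WATERMARK
clause differs from the original DDL; the 5-minute bound is just the
illustrative value mentioned above, and the connector options stay elided as in
the report.
{code:sql}
CREATE TABLE tableSource(
  name STRING,
  age INT NOT NULL,
  sex STRING,
  dt TIMESTAMP(3),
  -- the watermark trails the largest dt seen so far by 5 minutes,
  -- so records up to 5 minutes out of order are not treated as late
  WATERMARK FOR dt AS dt - INTERVAL '5' MINUTE
) WITH (
  -- connector options omitted, as in the original report
);
{code}
Under this definition the "01:00:00.000" record would only advance the
watermark to 00:55:00.000, so the [00:00, 01:00) window should not yet be
considered complete, and the two "00:59:20" records would be handled as on-time
input rather than late data.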
was (Author: jark):
Thanks [~hehuiyuan], I see what you mean. The window had already fired when the
"01:00:00.000" record arrived, so {{hasFired}} is true when the cleanup timer
is triggered.
So I think the result is as expected, because the records are out of order and
you declared the watermark as "dt - INTERVAL '0' SECOND", which expects
strictly ascending timestamps (out-of-order records may be dropped).
If you don't want to drop out-of-order records, please declare a watermark that
tolerates them, e.g. "dt - INTERVAL '5' MINUTE" allows 5 minutes of
out-of-orderness.
> The window result may not have been emitted when use window emit feature and
> set allow-latency
> ------------------------------------------------------------------------------------------------
>
> Key: FLINK-26498
> URL: https://issues.apache.org/jira/browse/FLINK-26498
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Planner
> Reporter: hehuiyuan
> Priority: Major
> Labels: pull-request-available
> Attachments: image-2022-03-05-23-53-37-086.png,
> image-2022-03-05-23-53-44-196.png, image-2022-03-06-00-03-11-670.png
>
>
> The SQL of the job:
> {code:java}
> CREATE TABLE tableSource(
>   name string,
>   age int not null,
>   sex string,
>   dt TIMESTAMP(3),
>   WATERMARK FOR dt AS dt - INTERVAL '0' SECOND
> ) WITH (
> );
> CREATE TABLE tableSink(
>   windowstart timestamp(3),
>   windowend timestamp(3),
>   name string,
>   age int,
>   cou bigint
> )
> WITH (
> );
> INSERT INTO tableSink
> SELECT
>   TUMBLE_START(dt, INTERVAL '1' HOUR),
>   TUMBLE_END(dt, INTERVAL '1' HOUR),
>   name,
>   age,
>   count(sex)
> FROM tableSource
> GROUP BY TUMBLE(dt, INTERVAL '1' HOUR), name, age {code}
>
> and table config:
> {code:java}
> table.exec.emit.allow-lateness = 1 hour
> table.exec.emit.late-fire.delay = 1 min
> table.exec.emit.early-fire.delay = 1 min {code}
>
> The data:
> {code:java}
> >hehuiyuan1,22,woman,2022-03-05 00:30:22.000
> >hehuiyuan1,22,woman,2022-03-05 00:40:22.000
> // pause, wait 1 min for the window trigger (earlyTrigger) to fire
> >hehuiyuan1,22,woman,2022-03-05 00:50:22.000
> >hehuiyuan1,22,woman,2022-03-05 00:56:22.000
> // pause, wait 1 min for the window trigger (earlyTrigger) to fire
> >hehuiyuan1,22,woman,2022-03-05 01:00:00.000
> // pause, wait 1 min for the window trigger (earlyTrigger) to fire
> >hehuiyuan1,22,woman,2022-03-05 00:59:20.000 -- late data
> // pause, wait 1 min for the window trigger (earlyTrigger) to fire
> >hehuiyuan1,22,woman,2022-03-05 00:59:20.100 -- late data
> >hehuiyuan1,22,woman,2022-03-05 02:00:00.000 -- window state cleaned for [0:00:00, 1:00:00]
> >hehuiyuan1,22,woman,2022-03-05 02:10:00.000 {code}
>
> The result:
> {code:java}
> > +I(+I[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 2])
> > -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 2])
> > +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 4])
> > +I(+I[2022-03-05T01:00, 2022-03-05T02:00, hehuiyuan1, 22, 1])
> > -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 4])
> > +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 5])
>
>
> > +I(+I[2022-03-05T02:00, 2022-03-05T03:00, hehuiyuan1, 22, 2]) {code}
>
> The window result is lost when `hehuiyuan1,22,woman,2022-03-05 00:59:20.100`
> arrives: the lateTrigger does not fire, and the window [0:00:00, 1:00:00] is
> cleaned up when the record `hehuiyuan1,22,woman,2022-03-05 02:00:00.000`
> arrives and advances the watermark.
>
> The window [0:00:00, 1:00:00] contains 6 records, but the result counts only 5.
> The trigger is AfterEndOfWindowEarlyAndLate.
> So WindowOperator may need to emit the result when onEventTime is called for
> the window's cleanup timer.
>
> I think the correct result is as follows:
> {code:java}
> > +I(+I[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 2])
> > -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 2])
> > +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 4])
> > +I(+I[2022-03-05T01:00, 2022-03-05T02:00, hehuiyuan1, 22, 1])
> > -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 4])
> > +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 5])
> > -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 5])
> > +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 6])
> > +I(+I[2022-03-05T02:00, 2022-03-05T03:00, hehuiyuan1, 22, 2]) {code}
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)