[
https://issues.apache.org/jira/browse/FLINK-26498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17505943#comment-17505943
]
Yao Zhang commented on FLINK-26498:
-----------------------------------
Hi [~jark] ,
Thank you very much for your reply. I agree that this implementation might
increase complexity. The problem is probably the lack of interaction between
the event-time trigger and the processing-time trigger. I also agree that their
logic should not be intermingled, but neither trigger knows which one fired
last or whether an update still needs to be flushed. For
table.exec.emit.late-fire.delay = 1 min
a periodic ProcessingTimeTrigger is registered, but the window cleanup timer
is an EventTimeTrigger that fires at the end of the window +
allowedLateness. There is no guarantee that elements entering the window
between the last firing of the ProcessingTimeTrigger and the window cleanup
timer are ever emitted. Those elements appear to be lost.
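To make the gap concrete, here is a minimal standalone sketch that replays the
data from the issue. It is not Flink code: the class LateFireGapSketch, its
methods, and the processing-time offsets are all hypothetical. It assumes a
single processing-time timer is registered one minute after the first
unflushed element, and that cleanup happens at window end - 1 ms +
allowedLateness in event time.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of one window [00:00, 01:00); not Flink's WindowOperator.
class LateFireGapSketch {
    static final long MINUTE = 60_000L;
    static final long HOUR = 60 * MINUTE;

    // Event time of day in milliseconds.
    static long ts(int h, int m, int s, int ms) {
        return ((h * 60L + m) * 60L + s) * 1000L + ms;
    }

    // Each row is {processing time ms (assumed), event time ms (from the issue)}.
    static long[][] issueData() {
        return new long[][] {
            {0, ts(0, 30, 22, 0)},
            {1_000, ts(0, 40, 22, 0)},
            {120_000, ts(0, 50, 22, 0)},
            {121_000, ts(0, 56, 22, 0)},
            {240_000, ts(1, 0, 0, 0)},
            {360_000, ts(0, 59, 20, 0)},   // late data
            {480_000, ts(0, 59, 20, 100)}, // late data
            {481_000, ts(2, 0, 0, 0)},     // advances the watermark past cleanup
            {482_000, ts(2, 10, 0, 0)},
        };
    }

    // Returns every count emitted for the window [00:00, 01:00).
    static List<Long> run(long[][] elements, boolean flushOnCleanup) {
        long windowEnd = HOUR;
        long cleanupTime = windowEnd - 1 + HOUR; // assumed: end - 1 ms + allowed lateness
        long fireDelay = MINUTE;                 // early-fire and late-fire delay

        long count = 0;
        boolean dirty = false;   // state changed since the last emit
        long pendingTimer = -1;  // processing-time timer, -1 means none
        long watermark = Long.MIN_VALUE;
        boolean cleaned = false;
        List<Long> emitted = new ArrayList<>();

        for (long[] e : elements) {
            long procTime = e[0];
            long eventTime = e[1];

            // Fire the processing-time timer if it became due before this element.
            if (!cleaned && pendingTimer >= 0 && pendingTimer <= procTime) {
                if (dirty) {
                    emitted.add(count);
                    dirty = false;
                }
                pendingTimer = -1;
            }

            // Add the element to the window and register a timer if none is pending.
            if (!cleaned && eventTime < windowEnd) {
                count++;
                dirty = true;
                if (pendingTimer < 0) {
                    pendingTimer = procTime + fireDelay;
                }
            }

            // Advance the watermark; the event-time cleanup timer may fire now.
            watermark = Math.max(watermark, eventTime);
            if (!cleaned && watermark >= cleanupTime) {
                if (flushOnCleanup && dirty) {
                    emitted.add(count); // the behavior proposed in the issue
                }
                cleaned = true; // state is dropped, pending timer never fires
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(run(issueData(), false)); // [2, 4, 5]
        System.out.println(run(issueData(), true));  // [2, 4, 5, 6]
    }
}
```

Without the flush, the sixth element is counted in state but the cleanup clears
the window before the pending processing-time timer fires, so the count 6 is
never emitted. With the flush on cleanup, it is.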
Is there a better way to solve this?
> The window result may not have been emitted when use window emit feature and
> set allow-latency
> ------------------------------------------------------------------------------------------------
>
> Key: FLINK-26498
> URL: https://issues.apache.org/jira/browse/FLINK-26498
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Planner
> Reporter: hehuiyuan
> Priority: Major
> Labels: pull-request-available
> Attachments: image-2022-03-05-23-53-37-086.png,
> image-2022-03-05-23-53-44-196.png, image-2022-03-06-00-03-11-670.png
>
>
> The SQL of the job:
> {code:java}
> CREATE TABLE tableSource(
> name string,
> age int not null,
> sex string,
> dt TIMESTAMP(3),
> WATERMARK FOR dt AS dt
> ) WITH (
> );
> CREATE TABLE tableSink(
> windowstart timestamp(3),
> windowend timestamp(3),
> name string,
> age int,
> cou bigint
> )
> WITH (
> );
> INSERT INTO tablesink
> SELECT
> TUMBLE_START(dt, INTERVAL '1' HOUR),
> TUMBLE_END(dt, INTERVAL '1' HOUR),
> name,
> age,
> count(sex)
> FROM tableSource
> GROUP BY TUMBLE(dt, INTERVAL '1' HOUR), name,age {code}
>
> and table config:
> {code:java}
> table.exec.emit.allow-lateness = 1 hour
> table.exec.emit.late-fire.delay = 1 min
> table.exec.emit.early-fire.delay = 1min{code}
>
> The data:
> {code:java}
> >hehuiyuan1,22,woman,2022-03-05 00:30:22.000
> >hehuiyuan1,22,woman,2022-03-05 00:40:22.000
> //pause ,wait for the window trigger for earlyTrigger 1 min
> >hehuiyuan1,22,woman,2022-03-05 00:50:22.000
> >hehuiyuan1,22,woman,2022-03-05 00:56:22.000
> //pause ,wait for the window trigger for earlyTrigger 1 min
> >hehuiyuan1,22,woman,2022-03-05 01:00:00.000
> //pause ,wait for the window trigger for earlyTrigger 1 min
> >hehuiyuan1,22,woman,2022-03-05 00:59:20.000 --latency data
> //pause ,wait for the window trigger for earlyTrigger 1 min
> >hehuiyuan1,22,woman,2022-03-05 00:59:20.100 --latency data
> >hehuiyuan1,22,woman,2022-03-05 02:00:00.000 -- window state clean for [0:00:00 1:00:00]
> >hehuiyuan1,22,woman,2022-03-05 02:10:00.000 {code}
>
> The result:
> {code:java}
> > +I(+I[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 2])
> > -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 2])
> > +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 4])
> > +I(+I[2022-03-05T01:00, 2022-03-05T02:00, hehuiyuan1, 22, 1])
> > -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 4])
> > +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 5])
>
>
> > +I(+I[2022-03-05T02:00, 2022-03-05T03:00, hehuiyuan1, 22, 2]) {code}
>
> The window result is lost: when `hehuiyuan1,22,woman,2022-03-05 00:59:20.100`
> arrived, the lateTrigger had not yet fired, and the window [0:00:00, 1:00:00]
> was cleaned up when the data `hehuiyuan1,22,woman,2022-03-05 02:00:00.000`
> arrived and advanced the watermark.
>
> The window [0:00:00, 1:00:00] has 6 records, but we got a count of 5.
> The trigger is AfterEndOfWindowEarlyAndLate.
> So WindowOperator may need to emit the result when the window cleanupTimer
> calls onEventTime.
>
> I think the correct result is as follows:
> {code:java}
> > +I(+I[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 2])
> > -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 2])
> > +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 4])
> > +I(+I[2022-03-05T01:00, 2022-03-05T02:00, hehuiyuan1, 22, 1])
> > -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 4])
> > +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 5])
> > -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 5])
> > +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 6])
> > +I(+I[2022-03-05T02:00, 2022-03-05T03:00, hehuiyuan1, 22, 2]) {code}
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)