[ https://issues.apache.org/jira/browse/BEAM-3186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Pawel Bartoszek updated BEAM-3186:
----------------------------------
    Description: 
*The context:*
I want to count how many events of a given type (A, B, etc.) I receive every
minute, using 1-minute fixed windows and an AfterWatermark trigger with an
allowed lateness of 1 minute.
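For illustration, a minimal plain-Java sketch of the per-minute counting (it does not use the Beam SDK; the class MinuteCounts, its methods and the sample timestamps are hypothetical): each event is assigned to the 1-minute fixed window that contains its event time, and counts are kept per type and window.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model, not Beam code: assigns each event to the
// 1-minute fixed window containing its event time and counts events per type.
final class MinuteCounts {
    static final long WINDOW_MS = 60_000L; // 1-minute fixed windows

    // Start (epoch millis) of the fixed window that contains the event time.
    static long windowStart(long eventTimeMs) {
        return eventTimeMs - Math.floorMod(eventTimeMs, WINDOW_MS);
    }

    // Counts events per "type@windowStart" key; a TreeMap keeps the output ordered.
    static Map<String, Long> count(String[] types, long[] eventTimesMs) {
        Map<String, Long> counts = new TreeMap<>();
        for (int i = 0; i < types.length; i++) {
            counts.merge(types[i] + "@" + windowStart(eventTimesMs[i]), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] types = {"A", "A", "B", "A"};
        long[] eventTimesMs = {0L, 59_999L, 30_000L, 60_000L};
        System.out.println(count(types, eventTimesMs)); // {A@0=2, A@60000=1, B@0=1}
    }
}
```

In the real job the equivalent result is emitted by the AfterWatermark trigger once the watermark passes the end of each window.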

*No data loss case*
In the case below, if at least one A element with an event time belonging to
the window 14:00-14:01 is read from the Kinesis stream after the job is
restored from the savepoint, no data loss is observed for this key and this
window. (Please refer to restore_with_trigger.png)

*Data loss case*
However, if no new A element is read from the Kinesis stream, then data loss
is observable.
(Please refer to restore_no_trigger.png)

*Workaround*
As a workaround, we could configure early firings every X seconds, which
limits the data lost on restore to at most X seconds' worth per key.
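A minimal plain-Java model of why the loss is bounded by X, assuming the early firing runs X after the first element of each pane (similar in spirit to Beam's AfterProcessingTime.pastFirstElementInPane().plusDelayOf(...)); the class EarlyFiringModel and the sample arrival times are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not Beam code: an early firing is scheduled delayMs after
// the first element of each pane, and each firing emits and clears the pane.
final class EarlyFiringModel {
    // Arrival times (processing time, ms, sorted ascending) of elements that are
    // still buffered, i.e. not yet emitted by any early firing, at stopAtMs.
    static List<Long> unemittedAt(long[] arrivalsMs, long delayMs, long stopAtMs) {
        List<Long> buffered = new ArrayList<>();
        long firingAtMs = Long.MAX_VALUE; // no early firing pending
        for (long t : arrivalsMs) {
            if (t > stopAtMs) break;
            if (firingAtMs <= t) {        // the pending firing happened first
                buffered.clear();
                firingAtMs = Long.MAX_VALUE;
            }
            buffered.add(t);
            if (firingAtMs == Long.MAX_VALUE) {
                firingAtMs = t + delayMs; // first element of a new pane
            }
        }
        if (firingAtMs <= stopAtMs) {
            buffered.clear();             // a firing also happened before the stop
        }
        return buffered;
    }

    public static void main(String[] args) {
        long[] arrivalsMs = {0L, 2_000L, 4_000L, 6_000L, 11_000L, 12_000L};
        // With X = 5 s and a stop at 13 s, only elements newer than 8 s can be unemitted.
        System.out.println(unemittedAt(arrivalsMs, 5_000L, 13_000L)); // [11000, 12000]
    }
}
```

Every element still buffered when the job stops arrived less than X before the stop, since otherwise its pane's early firing would already have emitted it. The model ignores the final AfterWatermark firing and allowed lateness.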

*My guess where the issue might be*

I believe this is a bug in the Beam-Flink integration layer. From my
investigation, I don't think the cause is KinesisReader failing to advance the
watermark. To verify this, after restoring from the savepoint I sent some
records for a different key (B) in the same window (14:00-14:01, as shown in
the pictures), and the trigger for the restored window and key A still did not
fire.

My guess is that after the job is restored, Beam doesn't register a Flink
event-time timer for the restored window unless a new element with that key
arrives for the restored window.
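To make the hypothesis concrete, a toy plain-Java model (hypothetical class RestoreModel, not Beam or Flink source) in which buffered pane state survives the restore but end-of-window timers do not; windowEnd = 60000 ms stands in for the end of the 14:00-14:01 window.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the suspected bug, NOT Beam or Flink source code:
// pane contents survive a savepoint restore, but end-of-window timers do not,
// so a (key, window) only gets its timer back when a new element for it arrives.
final class RestoreModel {
    final Map<String, Long> paneCounts = new HashMap<>(); // "key@windowEnd" -> buffered count
    final Map<String, Long> timers = new HashMap<>();     // "key@windowEnd" -> event-time timer (ms)
    final Map<String, Long> emitted = new TreeMap<>();    // results sent downstream

    // Buffers the element and (re-)registers the end-of-window timer for its key and window.
    void onElement(String key, long windowEndMs) {
        String id = key + "@" + windowEndMs;
        paneCounts.merge(id, 1L, Long::sum);
        timers.put(id, windowEndMs);
    }

    // Restore under the hypothesis: buffered state is copied, timers are not.
    RestoreModel restoreWithoutTimers() {
        RestoreModel restored = new RestoreModel();
        restored.paneCounts.putAll(paneCounts);
        return restored;
    }

    // Fires (emits and clears) every pane whose timer is at or before the watermark.
    void advanceWatermark(long watermarkMs) {
        timers.entrySet().removeIf(e -> {
            if (e.getValue() > watermarkMs) return false;
            emitted.put(e.getKey(), paneCounts.remove(e.getKey()));
            return true;
        });
    }

    public static void main(String[] args) {
        long windowEnd = 60_000L; // stands in for 14:01, the end of the 14:00-14:01 window
        RestoreModel beforeSavepoint = new RestoreModel();
        beforeSavepoint.onElement("A", windowEnd);
        beforeSavepoint.onElement("A", windowEnd);

        RestoreModel restored = beforeSavepoint.restoreWithoutTimers();
        restored.onElement("B", windowEnd);   // a different key, same window
        restored.advanceWatermark(windowEnd);
        System.out.println(restored.emitted);    // {B@60000=1}
        System.out.println(restored.paneCounts); // {A@60000=2}, never emitted
    }
}
```

In this model, a B element in the same window fires only B's timer, while A's buffered count stays stranded. Calling onElement("A", windowEnd) on the restored model instead re-registers A's timer, and the next watermark advance emits all three A elements, matching the restore_with_trigger.png case.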

Please refer to [this
gist|https://gist.github.com/pbartoszek/91d2509531f3040492e54146db48cb99] for a
test job that demonstrates this behaviour.


  was:
!restore_no_trigger.png|thumbnail!
!restore_with_trigger.png|thumbnail!


> In-flight data loss when restoring from savepoint
> -------------------------------------------------
>
>                 Key: BEAM-3186
>                 URL: https://issues.apache.org/jira/browse/BEAM-3186
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 2.0.0, 2.1.0
>            Reporter: Pawel Bartoszek
>            Assignee: Aljoscha Krettek
>         Attachments: restore_no_trigger.png, restore_with_trigger.png



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
