[ 
https://issues.apache.org/jira/browse/BEAM-1589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16863184#comment-16863184
 ] 

Kenneth Knowles commented on BEAM-1589:
---------------------------------------

You have the right idea. The reason it got blocked is that we don't have all 
the necessary watermarks and watermark holds in place. That system timer fires 
_after_ the window has expired. In today's implementation, the output watermark 
may also have advanced by then, so any output from {{@OnWindowExpiration}} is 
not allowed and is dropped. To implement it in the natural way, we need to 
address BEAM-2535. Specifically:

 - pending timers have their own watermark, separate from that of the incoming 
elements, and it also holds the output watermark
 - timers fire according to the watermark of the incoming elements
 - timers have a "wakeup time" that is independent of their "output timestamp", 
so you can fire a timer at event time X but output with a timestamp less than X
 - the _garbage collection_ timer fires only after _both_ the element watermark 
and the pending timer watermark have advanced

So, notably, the {{@OnWindowExpiration}} timer would have an output watermark 
hold at the end of the window (or earlier? that would be an interesting 
addition to the design) but would have a firing time _after_ the end of the 
window.
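
To make the rules above concrete, here is a toy model in plain Java. It is only 
a sketch under my own assumptions: the class, its method names and the integer 
timestamps are made up for illustration and are not Beam's runner code. It 
shows an expiration timer that holds the output watermark at the end of the 
window while waking up after it, and garbage collection waiting on both 
watermarks.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the proposed watermark rules (hypothetical, not Beam code). */
final class WatermarkHoldSketch {

  /** A timer whose wake-up time is independent of its output timestamp. */
  static final class Timer {
    final long wakeupTime;      // event time at which the timer may fire
    final long outputTimestamp; // timestamp of its output; also its hold

    Timer(long wakeupTime, long outputTimestamp) {
      this.wakeupTime = wakeupTime;
      this.outputTimestamp = outputTimestamp;
    }
  }

  long elementWatermark = Long.MIN_VALUE;
  final List<Timer> pending = new ArrayList<>();

  Timer setTimer(long wakeupTime, long outputTimestamp) {
    Timer t = new Timer(wakeupTime, outputTimestamp);
    pending.add(t);
    return t;
  }

  void advanceElementWatermark(long newWatermark) {
    elementWatermark = Math.max(elementWatermark, newWatermark);
  }

  /** Pending-timer watermark: the earliest hold among unfinished timers. */
  long timerWatermark() {
    long min = Long.MAX_VALUE;
    for (Timer t : pending) {
      min = Math.min(min, t.outputTimestamp);
    }
    return min;
  }

  /** The output watermark is held back by both watermarks. */
  long outputWatermark() {
    return Math.min(elementWatermark, timerWatermark());
  }

  /** Timers fire according to the element watermark only. */
  List<Timer> eligibleTimers() {
    List<Timer> out = new ArrayList<>();
    for (Timer t : pending) {
      if (t.wakeupTime <= elementWatermark) {
        out.add(t);
      }
    }
    return out;
  }

  /** Called after the timer callback has run; releases the timer's hold. */
  void finish(Timer t) {
    pending.remove(t);
  }

  /** Output is accepted only if it is not behind the output watermark. */
  boolean canOutput(long timestamp) {
    return timestamp >= outputWatermark();
  }

  /** Garbage collection waits for both watermarks to reach gcTime. */
  boolean canGarbageCollect(long gcTime) {
    return elementWatermark >= gcTime && timerWatermark() >= gcTime;
  }

  public static void main(String[] args) {
    WatermarkHoldSketch s = new WatermarkHoldSketch();
    long windowEnd = 100;
    // Expiration timer: wakes up after the window, holds output at its end.
    Timer expiry = s.setTimer(windowEnd + 1, windowEnd);
    s.advanceElementWatermark(150);
    System.out.println(s.outputWatermark());               // 100
    System.out.println(s.canOutput(windowEnd));            // true
    System.out.println(s.canGarbageCollect(windowEnd + 1)); // false
    s.finish(expiry);
    System.out.println(s.canGarbageCollect(windowEnd + 1)); // true
  }
}
```

In this model the output from the expiration callback is still on time because 
the hold keeps the output watermark at 100 until the callback has finished, 
which is the property that is missing in today's implementation.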

Even though we cannot yet implement this ideal design, it does seem like we 
could use the annotation as a convenience for what the user has to do today: 
set a timer at the last moment in the window. I believe the problem with that 
is that it cannot be guaranteed to fire just once: since the window is not 
expired yet, more elements can come in. But I am not sure.
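
The concern about firing more than once can be sketched with another toy model 
in plain Java. Again, this is an assumption-laden illustration and not Beam 
code: the class and field names are hypothetical, and it assumes the user's 
element handler re-sets the flush timer on every element and that the window 
stays open for some allowed lateness after its end.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a flush timer set at the end of the window (hypothetical). */
final class EndOfWindowTimerSketch {
  final long windowEnd;       // last moment in the window
  final long allowedLateness; // window expires at windowEnd + allowedLateness
  long watermark = Long.MIN_VALUE;
  boolean timerSet = false;
  final List<String> buffer = new ArrayList<>();
  final List<List<String>> flushes = new ArrayList<>();

  EndOfWindowTimerSketch(long windowEnd, long allowedLateness) {
    this.windowEnd = windowEnd;
    this.allowedLateness = allowedLateness;
  }

  /**
   * Mimics the user's element handler: buffer the value and (re)set the
   * flush timer at the end of the window. Returns false if the element is
   * dropped because the window has already expired.
   */
  boolean processElement(String value) {
    if (watermark > windowEnd + allowedLateness) {
      return false;
    }
    buffer.add(value);
    timerSet = true;
    fireIfEligible();
    return true;
  }

  void advanceWatermark(long newWatermark) {
    watermark = Math.max(watermark, newWatermark);
    fireIfEligible();
  }

  /** The timer fires once the watermark reaches the end of the window. */
  private void fireIfEligible() {
    if (timerSet && watermark >= windowEnd) {
      timerSet = false;
      flushes.add(new ArrayList<>(buffer));
      buffer.clear();
    }
  }

  public static void main(String[] args) {
    EndOfWindowTimerSketch s = new EndOfWindowTimerSketch(100, 50);
    s.processElement("a");
    s.advanceWatermark(120);   // first flush: [a]
    s.processElement("late");  // window not expired, so a second flush: [late]
    System.out.println(s.flushes.size()); // 2
  }
}
```

Under these assumptions a late element that arrives after the end of the 
window but before expiry triggers a second flush, so the callback is not a 
once-per-window hook the way {{@OnWindowExpiration}} is meant to be.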


> Add OnWindowExpiration method to Stateful DoFn
> ----------------------------------------------
>
>                 Key: BEAM-1589
>                 URL: https://issues.apache.org/jira/browse/BEAM-1589
>             Project: Beam
>          Issue Type: New Feature
>          Components: runner-core, sdk-java-core
>            Reporter: Jingsong Lee
>            Priority: Major
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> See BEAM-1517
> This allows the user to do some work before the state's garbage collection.
> It seems kind of annoying, but on the other hand forgetting to set a final 
> timer to flush state is probably data loss most of the time.
> FlinkRunner does this work very simply, but other runners, such as 
> DirectRunner, need to traverse all the states to do this, and maybe it's a 
> little hard.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
