[
https://issues.apache.org/jira/browse/BEAM-1589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16863184#comment-16863184
]
Kenneth Knowles commented on BEAM-1589:
---------------------------------------
You have the right idea. The reason it got blocked is that we don't have all
the necessary watermarks and watermark holds in place. That system timer fires
_after_ the window has expired. In today's implementation, the output watermark
may also have advanced, so any output from {{@OnWindowExpiration}} is not
allowed and is dropped. To implement it the natural way, we need to
address BEAM-2535. Specifically:
- the pending timers have a separate watermark from the incoming elements, and
it also holds the output watermark
- timers fire according to the watermark of the incoming elements
- timers have a "wakeup time" that is independent of their "output timestamp",
so you can fire a timer at event time X but emit output with a timestamp less than X
- the _garbage collection_ timer fires after _both_ the element watermark and
the pending timer watermark are advanced
So, notably, the {{@OnWindowExpiration}} timer would have an output watermark
hold at the end of the window (or earlier? that would be an interesting
addition to the design) but would have a firing time _after_ the end of the
window.
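The rules above can be illustrated with a small, self-contained toy model. This is a sketch under stated assumptions, not Beam's implementation: it tracks one key and one window with `long` timestamps, treats any output stamped behind the output watermark as dropped, and uses hypothetical names (`WatermarkModel`, `PendingTimer`, `timersHoldOutput`). The window [0, 100) and the expiry at 109 (an allowed lateness of 10) are assumed values. With `timersHoldOutput` off, it approximates today's behavior, where the output watermark simply follows the input; with it on, a pending expiration-style timer holds the output watermark at the end of the window while firing after it.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Toy model, NOT Beam code: one key, one window, timestamps as plain longs.
// It mimics the proposed BEAM-2535 rules; every name here is hypothetical.
class WatermarkModel {

  // A timer whose firing ("wakeup") time is independent of its output timestamp.
  static final class PendingTimer {
    final String name;
    final long fireAt;
    final long outputTimestamp;

    PendingTimer(String name, long fireAt, long outputTimestamp) {
      this.name = name;
      this.fireAt = fireAt;
      this.outputTimestamp = outputTimestamp;
    }
  }

  // true: proposed design, pending timers hold the output watermark.
  // false: simplified "today", the output watermark just follows the input.
  private final boolean timersHoldOutput;
  private long inputWatermark = Long.MIN_VALUE;
  private final List<PendingTimer> pending = new ArrayList<>();
  // One entry per fired timer: "<name>:emitted" or "<name>:dropped".
  final List<String> log = new ArrayList<>();

  WatermarkModel(boolean timersHoldOutput) {
    this.timersHoldOutput = timersHoldOutput;
  }

  void setTimer(String name, long fireAt, long outputTimestamp) {
    pending.add(new PendingTimer(name, fireAt, outputTimestamp));
  }

  // Watermark of the pending timers: earliest output timestamp still pending.
  long timerWatermark() {
    long min = Long.MAX_VALUE;
    for (PendingTimer t : pending) {
      min = Math.min(min, t.outputTimestamp);
    }
    return min;
  }

  long outputWatermark() {
    return timersHoldOutput ? Math.min(inputWatermark, timerWatermark()) : inputWatermark;
  }

  // Timers fire according to the input (element) watermark, once it passes fireAt.
  // Output stamped behind the output watermark is treated as late and dropped.
  void advanceInputWatermark(long watermark) {
    inputWatermark = Math.max(inputWatermark, watermark);
    Iterator<PendingTimer> it = pending.iterator();
    while (it.hasNext()) {
      PendingTimer t = it.next();
      if (inputWatermark > t.fireAt) {
        boolean late = t.outputTimestamp < outputWatermark();
        log.add(t.name + (late ? ":dropped" : ":emitted"));
        it.remove(); // releases this timer's hold on the output watermark
      }
    }
  }

  // State may be garbage collected only after BOTH watermarks pass gcTime.
  boolean canGarbageCollect(long gcTime) {
    return inputWatermark > gcTime && timerWatermark() > gcTime;
  }

  public static void main(String[] args) {
    long windowMax = 99; // window [0, 100)
    long expiry = 109;   // assumed allowed lateness of 10
    for (boolean hold : new boolean[] {false, true}) {
      WatermarkModel m = new WatermarkModel(hold);
      // Expiration-style timer: output stamped at the end of the window, fires after expiry.
      m.setTimer("expiry", expiry, windowMax);
      m.advanceInputWatermark(105);
      long outputAt105 = m.outputWatermark();
      m.advanceInputWatermark(120);
      System.out.println("hold=" + hold + " outputWatermark@105=" + outputAt105
          + " log=" + m.log + " gc=" + m.canGarbageCollect(expiry));
    }
  }
}
```

Running `main` should show the expiration output dropped without the hold and emitted with it. In both modes, the model allows garbage collection only once no pending timer still holds an output timestamp at or before the GC time.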
Even though we cannot yet implement this ideal design, it does seem like we could
use the annotation as a convenience for what the user has to do today: set a
timer at the last moment in the window. I believe the problem with that is that
it cannot be guaranteed to fire just once: since the window has not expired
yet, more elements can come in. But I'm not sure.
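A toy model can make the suspected problem with today's workaround concrete. It is a hypothetical sketch, not Beam code: in the Beam Java SDK the workaround is typically an event-time timer set to `window.maxTimestamp()` from the element-processing method, whereas here `EndOfWindowFlush`, its fields, and the allowed lateness of 10 are assumptions, and a timer that is already behind the watermark is simplified to fire immediately. Under those assumptions, an element that arrives after the end-of-window timer has fired, but before the window expires, triggers a second "final" flush.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, NOT Beam code, of today's workaround: buffer elements in state and
// (re)set an event-time timer at the window's last timestamp on every element.
// The allowed lateness and all names here are hypothetical assumptions.
class EndOfWindowFlush {
  private final long windowMax;       // last timestamp in the window
  private final long allowedLateness; // window expires once watermark passes windowMax + this
  private long watermark = Long.MIN_VALUE;
  private boolean timerSet = false;
  private final List<String> buffer = new ArrayList<>();
  final List<List<String>> flushes = new ArrayList<>();

  EndOfWindowFlush(long windowMax, long allowedLateness) {
    this.windowMax = windowMax;
    this.allowedLateness = allowedLateness;
  }

  boolean expired() {
    return watermark > windowMax + allowedLateness;
  }

  // Buffers the element and sets the end-of-window timer; data arriving after
  // expiry is dropped, roughly as a runner drops data for expired windows.
  void processElement(String value) {
    if (expired()) {
      return;
    }
    buffer.add(value);
    timerSet = true;
    fireTimerIfDue(); // simplification: a timer already behind the watermark fires at once
  }

  void advanceWatermark(long newWatermark) {
    watermark = Math.max(watermark, newWatermark);
    fireTimerIfDue();
  }

  // The intended "final" flush: emit the buffered values and clear the state.
  private void fireTimerIfDue() {
    if (timerSet && watermark > windowMax) {
      timerSet = false;
      flushes.add(new ArrayList<>(buffer));
      buffer.clear();
    }
  }

  public static void main(String[] args) {
    EndOfWindowFlush fn = new EndOfWindowFlush(99, 10);
    fn.processElement("a");
    fn.processElement("b");
    fn.advanceWatermark(100);      // timer fires: first flush
    fn.processElement("late");     // within allowed lateness: flushes again
    fn.advanceWatermark(120);      // window expires
    fn.processElement("too-late"); // dropped
    System.out.println(fn.flushes); // [[a, b], [late]]
  }
}
```

In this model, with zero allowed lateness the window expires as soon as the end-of-window timer is due, so it flushes only once; the double flush depends on elements still being accepted after the timer fires.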
> Add OnWindowExpiration method to Stateful DoFn
> ----------------------------------------------
>
> Key: BEAM-1589
> URL: https://issues.apache.org/jira/browse/BEAM-1589
> Project: Beam
> Issue Type: New Feature
> Components: runner-core, sdk-java-core
> Reporter: Jingsong Lee
> Priority: Major
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
> See BEAM-1517
> This allows the user to do some work before the state's garbage collection.
> It may seem somewhat annoying, but on the other hand, forgetting to set a
> final timer to flush state probably means data loss most of the time.
> In FlinkRunner this is very simple to do, but other runners, such as
> DirectRunner, need to traverse all of the state to do it, which may be
> harder.