[ 
https://issues.apache.org/jira/browse/BEAM-5307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16661260#comment-16661260
 ] 

Kenneth Knowles commented on BEAM-5307:
---------------------------------------

Hi [~sploegsma], thanks for the report. The trouble is that {{@FinishBundle}} 
is not partitioned by key or window: it runs once per bundle, and a bundle can 
span many keys and windows, so there is no single state cell to inject. It is 
only a fairly naive way to flush outgoing buffers. What you want in the 
stateful case is to set a timer for the window expiration and flush there.
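
To make the difference concrete, here is a toy model in plain Java. It is not Beam API: the class {{WindowExpiryFlushSketch}} and its methods are hypothetical names invented for illustration, the "watermark" is just a number passed in by the caller, and window expiration is taken to be the window end (no allowed lateness). It only sketches the idea that state lives per key and window, and that each cell is flushed once, when its window expires.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model, not Beam API: one state cell per (key, window), flushed at window expiration. */
class WindowExpiryFlushSketch {
    // windowEnd -> (key -> accumulated value); stands in for per-key, per-window state.
    private final TreeMap<Long, Map<String, Integer>> state = new TreeMap<>();

    // Stands in for the external database mentioned in the issue description.
    final List<String> repository = new ArrayList<>();

    /** Updates the state cell for this key and window; nothing is written externally yet. */
    void processElement(String key, long windowEnd, int value) {
        state.computeIfAbsent(windowEnd, w -> new TreeMap<>())
             .merge(key, value, Integer::sum);
    }

    /** Plays the role of a timer set for window expiration: flushes every expired cell once. */
    void advanceWatermark(long watermark) {
        // headMap returns a live view of all windows ending at or before the watermark.
        Map<Long, Map<String, Integer>> expired = state.headMap(watermark, true);
        for (Map.Entry<Long, Map<String, Integer>> window : expired.entrySet()) {
            for (Map.Entry<String, Integer> cell : window.getValue().entrySet()) {
                repository.add(cell.getKey() + "@" + window.getKey() + "=" + cell.getValue());
            }
        }
        // Clearing the view removes the expired windows from the backing map.
        expired.clear();
    }
}
```

In this model a bundle is simply any run of {{processElement}} calls. Such a run can touch several keys and windows at once, which is why a per-bundle hook has no one cell it could be handed, while the expiration callback always acts on exactly one key and window at a time.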

We have some work in progress to make this a convenient {{@OnWindowExpiration}} 
method - you can follow BEAM-1589 if you like.

> Allow injection of state in DoFn.FinishBundle
> ---------------------------------------------
>
>                 Key: BEAM-5307
>                 URL: https://issues.apache.org/jira/browse/BEAM-5307
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core
>            Reporter: Sander Ploegsma
>            Assignee: Kenneth Knowles
>            Priority: Major
>             Fix For: Not applicable
>
>
> Example use case: a stateful {{DoFn}} that requires persisting its state to 
> an external database. Instead of writing to the external database for each 
> element, it is much more efficient to flush the state every once in a while, 
> or when cleaning up.
> Currently, this is not possible because the {{@StateId}} injection is only 
> available in processing functions and timers. 
> [This|https://stackoverflow.com/questions/51789776/calculating-deltas-in-apache-beam-using-stateful-processing]
>  might be a workaround, but I'm not even sure if it works.
> Instead, by allowing the use of {{@StateId}} inside a {{@FinishBundle}} 
> function, we can make sure the internal state is persisted in all scenarios:
> {code:java}
> @FinishBundle
> public void flush(FinishBundleContext context,
>         @StateId("myState") ValueState<StateObj> state) {
>     repository.save(state.read());
> }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
