[ https://issues.apache.org/jira/browse/BEAM-5307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16661260#comment-16661260 ]

    Kenneth Knowles commented on BEAM-5307:
    ---------------------------------------

Hi [~sploegsma], thanks for the report. The trouble is that {{@FinishBundle}} is not partitioned by key or window, but is a way to flush outgoing buffers in a fairly naive way. What you want in the stateful case is to set a timer for the window expiration and flush there. We have some work in progress to make this a convenient {{@OnWindowExpiration}} method - you can follow BEAM-1589 if you like.

> Allow injection of state in DoFn.FinishBundle
> ---------------------------------------------
>
>                 Key: BEAM-5307
>                 URL: https://issues.apache.org/jira/browse/BEAM-5307
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core
>            Reporter: Sander Ploegsma
>            Assignee: Kenneth Knowles
>            Priority: Major
>             Fix For: Not applicable
>
>
> Example use case: a stateful {{DoFn}} that requires persisting its state to
> an external database. Instead of writing to the external database for each
> element, it is much more efficient to flush the state every once in a while,
> or when cleaning up.
> Currently, this is not possible because the {{@StateId}} injection is only
> available in processing functions and timers.
> [This|https://stackoverflow.com/questions/51789776/calculating-deltas-in-apache-beam-using-stateful-processing]
> might be a workaround, but I'm not even sure if it works.
> Instead, by allowing the use of {{@StateId}} inside a {{@FinishBundle}}
> function, we can make sure the internal state is persisted in all scenarios:
> {code:java}
> @FinishBundle
> public void flush(FinishBundleContext context, @StateId("myState") ValueState<StateObj> state) {
>   repository.save(state.read());
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
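
A minimal sketch of the approach suggested in the comment, assuming the Beam Java SDK state and timer API ({{@StateId}}, {{@TimerId}}, {{@OnTimer}}). Instead of flushing in {{@FinishBundle}}, the DoFn keeps a running sum per key and window in state and sets an event-time timer at {{window.maxTimestamp()}}. The external write therefore happens once per key and window, when the watermark passes the end of the window. {{FlushOnWindowExpiryFn}}, {{SumRepository}} and the state/timer IDs are hypothetical names chosen for illustration. Like the snippet in the issue, the repository only receives the state value, so a real sink would probably also need the key (for example, kept in state alongside the sum).

```java
import java.io.Serializable;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;

/**
 * Sketch: accumulates a running sum per key and window in state and persists it
 * once, when the event-time watermark reaches the end of the window.
 */
public class FlushOnWindowExpiryFn extends DoFn<KV<String, Long>, Void> {

  /** Hypothetical sink standing in for the external database from the issue. */
  public interface SumRepository extends Serializable {
    void save(Long sum);
  }

  private final SumRepository repository;

  // State is scoped to the current key and window by the runner.
  @StateId("sum")
  private final StateSpec<ValueState<Long>> sumSpec = StateSpecs.value(VarLongCoder.of());

  // Event-time timer used to flush when the window expires.
  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  public FlushOnWindowExpiryFn(SumRepository repository) {
    this.repository = repository;
  }

  @ProcessElement
  public void processElement(
      @Element KV<String, Long> element,
      BoundedWindow window,
      @StateId("sum") ValueState<Long> sum,
      @TimerId("flush") Timer flush) {
    Long current = sum.read(); // null until the first write for this key and window
    sum.write((current == null ? 0L : current) + element.getValue());
    // Fire once the watermark passes the last timestamp of this window.
    flush.set(window.maxTimestamp());
  }

  @OnTimer("flush")
  public void onFlush(@StateId("sum") ValueState<Long> sum) {
    Long total = sum.read();
    if (total != null) {
      repository.save(total); // one external write per key and window
    }
    sum.clear();
  }
}
```

In the global window {{maxTimestamp()}} is effectively the end of time, so this timer only fires when a bounded input has been fully processed; for unbounded input in the global window some other flush trigger (for example a processing-time timer) would be needed. Once BEAM-1589 is done, the timer plumbing should presumably be replaceable by an {{@OnWindowExpiration}} method.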