Thanks for the clarification, Thomas. I think we have to improve bundle execution in the Flink Runner. Bundle handling is also not uniform across batch and streaming execution, or across different operators.
On Wed, Jun 8, 2016 at 7:43 PM, Raghu Angadi <[email protected]> wrote:

> On Wed, Jun 8, 2016 at 10:39 AM, Ben Chambers <[email protected]>
> wrote:
>
>> To clarify -- this case is actually not allowed by the beam model. The
>> guarantee is that either a bundle is successfully completed (startBundle,
>> processElement*, finishBundle, commit) or not. If it isn't, then the bundle
>> is reprocessed. So, if a `DoFn` instance builds up any state while
>> processing a bundle and a failure happens at any point prior to the commit,
>> it will be retried. Even though the actual state in the first `DoFn` was
>> lost, the second attempt will build up the same state.
>
> Makes sense. I missed the fact that finishBundle(Context) could emit more
> records, which affects the pipeline state.
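
To make the guarantee quoted above concrete, here is a minimal sketch of the all-or-nothing bundle lifecycle in plain Java. It does not use the Beam API or the Flink Runner: `BundleRetrySketch`, `SummingFn`, `runBundle` and the `failOnAttempt` parameter are hypothetical names made up for illustration, and the "commit" is just a copy into a list. It only shows why state lost with a failed attempt is harmless as long as nothing from that attempt was committed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BundleRetrySketch {

    /** Hypothetical stand-in for a DoFn: sums elements and emits the sum in finishBundle. */
    static class SummingFn {
        private int sum; // in-memory, per-bundle state; lost if the attempt fails

        void startBundle() { sum = 0; }

        void processElement(int element) { sum += element; }

        // Like finishBundle(Context), this can emit additional records.
        void finishBundle(List<Integer> output) { output.add(sum); }
    }

    /**
     * Runs one bundle until it commits. failOnAttempt simulates a failure
     * before commit on that attempt number (0 means never fail).
     */
    static List<Integer> runBundle(List<Integer> bundle, int failOnAttempt) {
        List<Integer> committed = new ArrayList<>();
        for (int attempt = 1; ; attempt++) {
            SummingFn fn = new SummingFn();            // fresh instance, no carried-over state
            List<Integer> pending = new ArrayList<>(); // output is held back until commit
            try {
                fn.startBundle();
                for (int i = 0; i < bundle.size(); i++) {
                    if (attempt == failOnAttempt && i == bundle.size() - 1) {
                        throw new IllegalStateException("simulated failure before commit");
                    }
                    fn.processElement(bundle.get(i));
                }
                fn.finishBundle(pending);
                committed.addAll(pending);             // commit: output becomes visible
                return committed;
            } catch (IllegalStateException e) {
                // Nothing was committed; drop pending output and reprocess the whole bundle.
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(runBundle(Arrays.asList(1, 2, 3), 0)); // no failure
        System.out.println(runBundle(Arrays.asList(1, 2, 3), 1)); // first attempt fails
    }
}
```

Both calls in `main` end up committing the same single record, because the retried attempt rebuilds the same state from the same input. A runner that made output visible before the commit step would break this property, which is the part I think we need to check in the Flink Runner.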
