On Wed, Jun 8, 2016 at 10:29 AM Raghu Angadi <[email protected]> wrote:
> On Wed, Jun 8, 2016 at 10:13 AM, Ben Chambers <[email protected]>
> wrote:
>
> > - If failure occurs after finishBundle() but before the consumption is
> > committed, then the bundle may be reprocessed, which leads to duplicated
> > calls to processElement() and finishBundle().
> >
> > - If failure occurs after consumption is committed but before
> > finishBundle(), then those elements which may have buffered state in the
> > DoFn but not had their side-effects fully processed (since the
> > finishBundle() was responsible for that) are lost.
>
> I am trying to understand this better. Does this mean during
> recovery/replay after a failure, the particular instance of DoFn that
> existed before the worker failure would not be discarded, but might still
> receive elements? If a DoFn is caching some internal state, it should
> always assume the worker its on might abruptly fail anytime and the state
> would be lost, right?

To clarify -- this case is actually not allowed by the Beam model. The
guarantee is that either a bundle is successfully completed (startBundle,
processElement*, finishBundle, commit) or it is not. If it isn't, then the
bundle is reprocessed. So, if a `DoFn` instance builds up any state while
processing a bundle and a failure happens at any point prior to the commit,
the bundle will be retried. Even though the actual state in the first `DoFn`
was lost, the second attempt will build up the same state.
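
For anyone who wants to see the retry behavior concretely, here is a rough
toy model in plain Python. It does not use the Beam SDK. `BufferingFn`,
`run_bundle`, `run_with_retry`, `WorkerFailure`, and the two failure-point
names are all made up for illustration, and the runner loop is only an
assumption about how a runner might behave under the guarantee described
above.

```python
class WorkerFailure(Exception):
    """Hypothetical stand-in for a worker crashing mid-bundle."""


class BufferingFn:
    """Hypothetical DoFn-like object (not the Beam API).

    It buffers elements in process_element and only performs its side
    effect (appending to `sink`) in finish_bundle.
    """

    def __init__(self, sink):
        self.sink = sink
        self.buffer = []

    def start_bundle(self):
        self.buffer = []

    def process_element(self, element):
        self.buffer.append(element)

    def finish_bundle(self):
        self.sink.append(list(self.buffer))  # the external side effect
        self.buffer = []


def run_bundle(make_fn, elements, fail_at=None):
    """One attempt: start_bundle, process_element*, finish_bundle, commit.

    `fail_at` (an assumption of this sketch) injects a crash either
    before finish_bundle or after finish_bundle but before the commit.
    """
    fn = make_fn()  # a fresh instance: state from a failed attempt is gone
    fn.start_bundle()
    for element in elements:
        fn.process_element(element)
    if fail_at == "before_finish":
        raise WorkerFailure()
    fn.finish_bundle()
    if fail_at == "before_commit":
        raise WorkerFailure()
    return True  # reaching here stands for the commit


def run_with_retry(make_fn, elements, failures):
    """Retry the whole bundle until one attempt commits.

    `failures` lists the injected failure point for each successive
    attempt; once it is exhausted, the next attempt runs cleanly.
    Returns the number of attempts made.
    """
    attempts = 0
    for fail_at in list(failures) + [None]:
        attempts += 1
        try:
            run_bundle(make_fn, elements, fail_at)
            return attempts
        except WorkerFailure:
            continue  # nothing was committed, so the bundle is reprocessed


if __name__ == "__main__":
    sink = []
    n = run_with_retry(lambda: BufferingFn(sink), [1, 2, 3], ["before_commit"])
    print(n, sink)
```

In this model, a crash before finish_bundle loses the buffered state, but
the retry rebuilds it from the same elements, so the side effect happens
once. A crash after finish_bundle but before the commit makes the side
effect happen twice, which is the duplication described in the first
bullet quoted above.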
