On Wed, Jun 8, 2016 at 10:29 AM Raghu Angadi <[email protected]>
wrote:

> On Wed, Jun 8, 2016 at 10:13 AM, Ben Chambers <[email protected]>
> wrote:
>
> > - If failure occurs after finishBundle() but before the consumption is
> > committed, then the bundle may be reprocessed, which leads to duplicated
> > calls to processElement() and finishBundle().
> >
>
> > - If failure occurs after consumption is committed but before
> > finishBundle(), then those elements which may have buffered state in the
> > DoFn but not had their side-effects fully processed (since the
> > finishBundle() was responsible for that) are lost.
> >

> I am trying to understand this better. Does this mean during
> recovery/replay after a failure, the particular instance of DoFn that
> existed before the worker failure would not be discarded, but might still
> receive elements?  If a DoFn is caching some internal state, it should
> always assume the worker it's on might abruptly fail anytime and the state
> would be lost, right?
>

To clarify: this case is actually not allowed by the Beam model. The
guarantee is that a bundle either completes successfully (startBundle,
processElement*, finishBundle, commit) or it does not. If it does not, the
bundle is reprocessed. So, if a `DoFn` instance builds up any state while
processing a bundle and a failure happens at any point prior to the commit,
the bundle will be retried. Even though the actual state in the first `DoFn`
was lost, the second attempt will build up the same state.
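
A rough way to picture the guarantee is the toy simulation below. It is
plain Python, not the Beam SDK: `BufferingFn`, `run_bundle`, and
`process_with_retry` are made-up names for illustration, and the "commit"
is modeled simply as returning the output without raising.

```python
class BufferingFn:
    """Toy stand-in for a DoFn that buffers elements until finish_bundle."""

    def __init__(self):
        self.buffer = []

    def start_bundle(self):
        self.buffer = []

    def process_element(self, element):
        self.buffer.append(element)

    def finish_bundle(self):
        # Flush whatever was buffered during this bundle.
        return list(self.buffer)


def run_bundle(elements, fail_before_commit):
    """One attempt: start_bundle, process_element*, finish_bundle, commit."""
    fn = BufferingFn()  # fresh instance; state from a failed attempt is gone
    fn.start_bundle()
    for element in elements:
        fn.process_element(element)
    output = fn.finish_bundle()
    if fail_before_commit:
        # Simulated worker failure after finish_bundle but before commit.
        raise RuntimeError("worker failed before commit")
    return output  # reaching here models a successful commit


def process_with_retry(elements, failures):
    """Reprocess the whole bundle until one attempt commits."""
    committed = None
    attempts = 0
    while committed is None:
        attempts += 1
        try:
            committed = run_bundle(elements, fail_before_commit=attempts <= failures)
        except RuntimeError:
            pass  # nothing was committed, so the bundle is retried from the start
    return committed, attempts


result, attempts = process_with_retry(["a", "b", "c"], failures=1)
print(result, attempts)
```

In this sketch the first attempt runs finish_bundle and then fails, so
finish_bundle is called twice overall. The second attempt rebuilds the
same buffer from the same elements, and only its output is committed.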
