A Bundle is an arbitrary collection of elements. A PCollection is divided
into bundles at the discretion of the runner. However, the bundles must
partition the input PCollection; each element is in exactly one bundle, and
each bundle is successfully committed exactly once in a successful pipeline.

Ben's distinction is useful. Notably, in the second sequence the bundle
has already been committed, so the elements will not (and cannot) be
reprocessed, and their outputs can be lost entirely.
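
To make the two failure orderings concrete, here is a rough sketch in plain
Java. It does not use the Beam SDK; BufferingFn and BundleFailureSketch are
hypothetical stand-ins for a DoFn that buffers in processElement() and only
emits in finishBundle(), and the "commit" and "crash" points are marked by
comments rather than modelled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a DoFn that buffers elements and flushes them in finishBundle(). */
class BufferingFn {
    private final List<String> buffer = new ArrayList<>();
    private final List<String> sink; // represents an external side effect

    BufferingFn(List<String> sink) { this.sink = sink; }

    void startBundle() { buffer.clear(); }
    void processElement(String e) { buffer.add(e); }
    void finishBundle() { sink.addAll(buffer); buffer.clear(); }
}

class BundleFailureSketch {
    /** Failure after finishBundle() but before the commit: the bundle is retried once. */
    static List<String> failAfterFinishBeforeCommit(List<String> bundle) {
        List<String> sink = new ArrayList<>();
        for (int attempt = 0; attempt < 2; attempt++) {
            BufferingFn fn = new BufferingFn(sink);
            fn.startBundle();
            for (String e : bundle) fn.processElement(e);
            fn.finishBundle();
            // attempt 0: assumed crash here, before the commit, so the bundle is retried
            // attempt 1: the commit succeeds
        }
        return sink; // every element was written twice
    }

    /** Commit happens before finishBundle(), then the worker fails: no retry is possible. */
    static List<String> failAfterCommitBeforeFinish(List<String> bundle) {
        List<String> sink = new ArrayList<>();
        BufferingFn fn = new BufferingFn(sink);
        fn.startBundle();
        for (String e : bundle) fn.processElement(e);
        // assumed: consumption is committed here, then the worker dies;
        // finishBundle() never runs and the buffered elements are gone
        return sink; // nothing was written
    }

    public static void main(String[] args) {
        List<String> bundle = Arrays.asList("a", "b");
        System.out.println(failAfterFinishBeforeCommit(bundle));
        System.out.println(failAfterCommitBeforeFinish(bundle));
    }
}
```

The first case yields duplicates, which a sink can at least deduplicate; the
second yields nothing, and nothing will ever retry it.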


For ParDo, the existing sequence was <startBundle> <processElement>*
<finishBundle>, and the new sequence is (<startBundle> <processElement>*
<finishBundle>)*
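
As an illustration only, the two sequences can be written as regular
expressions over a per-instance call trace. The one-letter encoding (S for
startBundle, P for processElement, F for finishBundle) and the class name
are my own shorthand, not anything in the SDK.

```java
import java.util.regex.Pattern;

class LifecycleSequenceSketch {
    // Hypothetical encoding: S = startBundle, P = processElement, F = finishBundle
    static final Pattern OLD_SEQUENCE = Pattern.compile("SP*F");
    static final Pattern NEW_SEQUENCE = Pattern.compile("(SP*F)*");

    /** Builds the call trace of one DoFn instance that processes bundles of the given sizes. */
    static String trace(int[] bundleSizes) {
        StringBuilder calls = new StringBuilder();
        for (int size : bundleSizes) {
            calls.append('S');
            for (int i = 0; i < size; i++) calls.append('P');
            calls.append('F');
        }
        return calls.toString();
    }

    static boolean matchesOld(String calls) { return OLD_SEQUENCE.matcher(calls).matches(); }
    static boolean matchesNew(String calls) { return NEW_SEQUENCE.matcher(calls).matches(); }

    public static void main(String[] args) {
        String reused = trace(new int[] {2, 0, 1}); // one instance, three bundles
        System.out.println(reused + " old=" + matchesOld(reused) + " new=" + matchesNew(reused));
    }
}
```

A trace with a single bundle satisfies both patterns; only the new sequence
accepts an instance that is reused across several bundles.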

The documentation for the earlier sequence is at
https://github.com/apache/incubator-beam/blob/0393a7917318baaa1e580259a74bff2c1dcbe6b8/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java#L88
finishBundle is noted as being called whenever an input bundle is completed.

There is also documentation that permits the system to run multiple copies
of a DoFn, starting at
https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java#L400:

"Sometimes two or more {@link DoFn} instances will be running on the
same bundle simultaneously, with the system taking the results of
the first instance to complete successfully"

In either case, completion includes the execution of the finishBundle()
method.

On Wed, Jun 8, 2016 at 10:13 AM, Ben Chambers <[email protected]>
wrote:

> I think there is a difference:
>
> - If failure occurs after finishBundle() but before the consumption is
> committed, then the bundle may be reprocessed, which leads to duplicated
> calls to processElement() and finishBundle().
> - If failure occurs after consumption is committed but before
> finishBundle(), then those elements which may have buffered state in the
> DoFn but not had their side-effects fully processed (since the
> finishBundle() was responsible for that) are lost.
>
>
>
> On Wed, Jun 8, 2016 at 10:09 AM Raghu Angadi <[email protected]>
> wrote:
>
> > On Wed, Jun 8, 2016 at 10:05 AM, Raghu Angadi <[email protected]> wrote:
> > >
> > > I thought finishBundle() exists simply as best-effort indication from the
> > > runner to user some chunk of records have been processed..
> >
> > also to help with DoFn's own clean up if there is any.
> >
>
