A Bundle is an arbitrary collection of elements. A PCollection is divided into bundles at the discretion of the runner. However, the bundles must partition the input PCollection; each element is in exactly one bundle, and each bundle is successfully committed exactly once in a successful pipeline.
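The partitioning requirement can be illustrated with a minimal sketch. It is written in plain Java rather than against the Beam SDK. `BundleSplitter`, `splitIntoBundles`, and `isPartition` are hypothetical names, and the fixed maximum bundle size is an assumption that stands in for whatever policy a real runner chooses at its own discretion.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch (not Beam API): a toy "runner" that splits an in-memory
// collection into bundles. A fixed maximum bundle size is assumed purely for
// illustration; real runners pick bundle boundaries however they like.
class BundleSplitter {
  static <T> List<List<T>> splitIntoBundles(List<T> elements, int maxBundleSize) {
    if (maxBundleSize <= 0) {
      throw new IllegalArgumentException("maxBundleSize must be positive");
    }
    List<List<T>> bundles = new ArrayList<>();
    for (int start = 0; start < elements.size(); start += maxBundleSize) {
      int end = Math.min(start + maxBundleSize, elements.size());
      // Copy the sublist so each bundle is independent of the input list.
      bundles.add(new ArrayList<>(elements.subList(start, end)));
    }
    return bundles;
  }

  // Checks the partition property: concatenating the bundles in order
  // reproduces the input exactly, so every input position lands in
  // exactly one bundle (nothing dropped, nothing duplicated).
  static <T> boolean isPartition(List<T> elements, List<List<T>> bundles) {
    List<T> flattened = new ArrayList<>();
    for (List<T> bundle : bundles) {
      flattened.addAll(bundle);
    }
    return flattened.equals(elements);
  }
}
```

For example, splitting the five elements 1..5 with a maximum size of 2 yields the bundles [1, 2], [3, 4] and [5], which together partition the input.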
Ben's distinction is useful. Notably, in the second case, because the bundle has already been committed, the elements will not (and cannot) be reprocessed, so their outputs can be lost entirely.

For ParDo, the existing sequence was

  <startBundle> <processElement>* <finishBundle>

and the new sequence is

  (<startBundle> <processElement>* <finishBundle>)*

The documentation for the earlier sequence is at
https://github.com/apache/incubator-beam/blob/0393a7917318baaa1e580259a74bff2c1dcbe6b8/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java#L88
It notes that finishBundle is called whenever an input bundle is completed.

There is also documentation that permits the system to run multiple copies of a DoFn, starting at
https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java#L400
In either case, completion includes the execution of the finishBundle() method:

   * Sometimes two or more {@link DoFn} instances will be running on the
   * same bundle simultaneously, with the system taking the results of
   * the first instance to complete successfully

On Wed, Jun 8, 2016 at 10:13 AM, Ben Chambers wrote:

> I think there is a difference:
>
> - If failure occurs after finishBundle() but before the consumption is
> committed, then the bundle may be reprocessed, which leads to duplicated
> calls to processElement() and finishBundle().
> - If failure occurs after consumption is committed but before
> finishBundle(), then those elements which may have buffered state in the
> DoFn but not had their side-effects fully processed (since the
> finishBundle() was responsible for that) are lost.
>
> On Wed, Jun 8, 2016 at 10:09 AM Raghu Angadi wrote:
>
> > On Wed, Jun 8, 2016 at 10:05 AM, Raghu Angadi wrote:
> >
> > >
> > > I thought finishBundle() exists simply as best-effort indication from
> > > the runner to user some chunk of records have been processed..
> >
> > also to help with DoFn's own clean up if there is any.
> >
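The bundle lifecycle and the two failure cases Ben describes can be illustrated with a toy simulation. This is a hypothetical sketch in plain Java, not the Beam API. `BufferingFn`, `ToyRunner`, and the `Failure` enum are invented for illustration, and a single in-memory object stands in for a worker whose buffered state would really vanish on a crash.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a DoFn (not the Beam API) that buffers elements in
// processElement() and emits them as an external side effect in finishBundle().
class BufferingFn {
  private final List<String> sink;                      // external side-effect target
  private final List<String> buffer = new ArrayList<>(); // per-bundle buffered state

  BufferingFn(List<String> sink) { this.sink = sink; }

  void startBundle() { buffer.clear(); }
  void processElement(String element) { buffer.add(element); }
  void finishBundle() { sink.addAll(buffer); buffer.clear(); }
}

// Where (if anywhere) a simulated failure is injected on the first attempt.
enum Failure { NONE, AFTER_FINISH_BEFORE_COMMIT, AFTER_COMMIT_BEFORE_FINISH }

class ToyRunner {
  // Runs one bundle as <startBundle> <processElement>* <finishBundle>,
  // retrying until the bundle is committed. Returns the number of attempts.
  static int runBundle(BufferingFn fn, List<String> bundle, Failure failure) {
    int attempts = 0;
    while (true) {
      attempts++;
      boolean firstAttempt = attempts == 1;
      fn.startBundle();
      for (String element : bundle) {
        fn.processElement(element);
      }
      if (firstAttempt && failure == Failure.AFTER_COMMIT_BEFORE_FINISH) {
        // Consumption was committed, then the worker "died" before finishBundle():
        // the buffered elements are never emitted and the bundle is not retried.
        return attempts;
      }
      fn.finishBundle();
      if (firstAttempt && failure == Failure.AFTER_FINISH_BEFORE_COMMIT) {
        // Side effects already happened, but the commit was lost: retry the bundle.
        continue;
      }
      return attempts; // committed successfully
    }
  }
}
```

Running a bundle [a, b] with AFTER_FINISH_BEFORE_COMMIT takes two attempts and leaves [a, b, a, b] in the sink (duplicated side effects), while AFTER_COMMIT_BEFORE_FINISH takes one attempt and leaves the sink empty (lost output). Calling runBundle repeatedly on the same BufferingFn corresponds to the (<startBundle> <processElement>* <finishBundle>)* sequence.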
