The unit of commit is the bundle. Consider a DoFn that does batching (e.g. to interact with some external service less frequently). Items may be buffered during process(), but these buffered items must be processed and the results emitted in finishBundle(). If inputs were committed as consumed before finishBundle() is called (and its outputs committed), a worker failure would lose this buffered state, and the inputs would not be replayed to rebuild it.
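A minimal sketch of the batching case, in plain Java rather than the Beam SDK. BatchingFn, BundleCommitSketch, runBundle and the failAt parameter are all hypothetical names made up for illustration; the only point is that outputs are held back until finishBundle() returns, so a bundle that fails part-way commits nothing and can be retried in full.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a batching DoFn; this is NOT the Beam API. */
class BatchingFn {
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();

    BatchingFn(int batchSize) {
        this.batchSize = batchSize;
    }

    void startBundle() {
        buffer.clear();
    }

    /** Buffers the element and emits a batch once batchSize elements are held. */
    void process(String element, List<String> out) {
        buffer.add(element);
        if (buffer.size() >= batchSize) {
            flush(out);
        }
    }

    /** Emits whatever is still buffered; without this call the tail is lost. */
    void finishBundle(List<String> out) {
        if (!buffer.isEmpty()) {
            flush(out);
        }
    }

    private void flush(List<String> out) {
        out.add(String.join("+", buffer)); // stands in for one call to the external service
        buffer.clear();
    }
}

class BundleCommitSketch {
    /**
     * Processes one bundle. Outputs go to a pending list and are moved to
     * 'committed' only after finishBundle() has returned. A negative failAt
     * means no failure; otherwise a failure is simulated before that index.
     */
    static boolean runBundle(BatchingFn fn, List<String> bundle, int failAt, List<String> committed) {
        List<String> pending = new ArrayList<>();
        try {
            fn.startBundle();
            for (int i = 0; i < bundle.size(); i++) {
                if (i == failAt) {
                    throw new IllegalStateException("simulated worker failure");
                }
                fn.process(bundle.get(i), pending);
            }
            fn.finishBundle(pending);
        } catch (IllegalStateException e) {
            return false; // pending outputs of the partial bundle are discarded
        }
        committed.addAll(pending); // inputs and outputs are committed together
        return true;
    }

    /** Fails the first attempt part-way through, then retries with a fresh instance. */
    static List<String> demo() {
        List<String> committed = new ArrayList<>();
        List<String> bundle = Arrays.asList("a", "b", "c");
        if (!runBundle(new BatchingFn(2), bundle, 2, committed)) {
            runBundle(new BatchingFn(2), bundle, -1, committed);
        }
        return committed;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [a+b, c]
    }
}
```

In the failed attempt the batch "a+b" had already been emitted to the pending list, but it is thrown away with the rest of the bundle, so the retry does not produce it twice.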
Put another way, the elements are partitioned into bundles, and the exactly-once guarantee states that the output is the union of exactly one processing of each bundle. (Bundles may be retried and/or partially processed; such outputs are discarded.)

On Wed, Jun 8, 2016 at 10:05 AM, Raghu Angadi <[email protected]> wrote:

> Such data loss can still occur if the worker dies after finishBundle()
> returns, but before the consumption is committed. I thought finishBundle()
> exists simply as a best-effort indication from the runner to the user that
> some chunk of records has been processed, not as part of the processing
> guarantees. Also, the term "bundle" itself is fairly loosely defined (maybe
> intentionally).
>
> On Wed, Jun 8, 2016 at 8:47 AM, Thomas Groh <[email protected]>
> wrote:
>
>> finishBundle() **must** be called before any input consumption is committed
>> (i.e. marking inputs as completed, which includes committing any elements
>> they produced). Doing otherwise can cause data loss, as the state of the
>> DoFn is lost if a worker dies, but the input elements will never be
>> reprocessed to recreate the DoFn state. If this occurs, any buffered
>> outputs are lost.
>>
>> On Wed, Jun 8, 2016 at 8:21 AM, Bobby Evans <[email protected]>
>> wrote:
>>
>> > The local Java runner does arbitrary batching of 10 elements.
>> >
>> > I'm not sure if Flink exposes this or not, but couldn't you use the
>> > checkpoint triggers to also start/finish a bundle?
>> > - Bobby
>> >
>> > On Wednesday, June 8, 2016 10:17 AM, Aljoscha Krettek
>> > <[email protected]> wrote:
>> >
>> > Ahh, what we could do is artificially induce bundles using either count
>> > or processing time or both. Just so that finishBundle() is called once
>> > in a while.
>> >
>> > On Wed, 8 Jun 2016 at 17:12 Aljoscha Krettek <[email protected]>
>> > wrote:
>> >
>> > > Pretty sure, yes. The Iterable in a MapPartitionFunction should give
>> > > you all the values in a given partition.
>> > >
>> > > I checked again for streaming execution. We're doing the opposite
>> > > right now: every element is a bundle in itself, and
>> > > startBundle()/finishBundle() are called for every element, which
>> > > seems a bit wasteful. The only other option is to see all elements as
>> > > one bundle, because Flink does not bundle/micro-batch elements in
>> > > streaming execution.
>> > >
>> > > On Wed, 8 Jun 2016 at 16:38 Bobby Evans <[email protected]>
>> > > wrote:
>> > >
>> > >> Are you sure about that for Flink? I thought the iterable finished
>> > >> when you processed a maximum number of elements or the input queue
>> > >> was empty, so that it could return control back to Akka for better
>> > >> sharing of the thread pool.
>> > >>
>> > >> https://github.com/apache/incubator-beam/blob/af8f5935ca1866012ceb102b9472c8b1ef102d73/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java#L99
>> > >>
>> > >> Also, the javadocs for DoFn.Context explicitly state that you can
>> > >> emit from the finishBundle method.
>> > >>
>> > >> https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L104-L110
>> > >>
>> > >> I thought I had seen some example of this being used for batching
>> > >> output to something downstream, like HDFS or Kafka, but I'm not sure
>> > >> on that. If you can emit from finishBundle and a new instance of the
>> > >> DoFn will be created around each bundle, then I can see some people
>> > >> trying to do aggregations inside a DoFn and then emitting them at
>> > >> the end of the bundle, knowing that if a batch fails or is rolled
>> > >> back the system will handle it. If that is not allowed we should
>> > >> really update the javadocs around it to explain the pitfalls of
>> > >> doing this.
>> > >> - Bobby
>> > >>
>> > >> On Wednesday, June 8, 2016 4:24 AM, Aljoscha Krettek
>> > >> <[email protected]> wrote:
>> > >>
>> > >> Hi,
>> > >> a quick related question: In the Flink runner we basically see
>> > >> everything as one big bundle, i.e. we call startBundle() once at the
>> > >> beginning and then keep processing indefinitely, never calling
>> > >> finishBundle(). Is this also correct behavior?
>> > >>
>> > >> Best,
>> > >> Aljoscha
>> > >>
>> > >> On Tue, 7 Jun 2016 at 20:44 Thomas Groh <[email protected]>
>> > >> wrote:
>> > >>
>> > >> > Hey everyone;
>> > >> >
>> > >> > I'm starting to work on BEAM-38
>> > >> > (https://issues.apache.org/jira/browse/BEAM-38), which enables an
>> > >> > optimization for runners with many small bundles. BEAM-38 allows
>> > >> > runners to reuse DoFn instances so long as that DoFn has not
>> > >> > terminated abnormally. This replaces the previous requirement that
>> > >> > a DoFn be used for only a single bundle if either startBundle or
>> > >> > finishBundle has been overridden.
>> > >> >
>> > >> > DoFn deserialization-per-bundle can be a significant performance
>> > >> > bottleneck when there are many small bundles, as is common in
>> > >> > streaming executions. It has also surfaced as the cause of much of
>> > >> > the current slowness in the new InProcessRunner.
>> > >> >
>> > >> > Existing Runners do not require any changes; they may choose to
>> > >> > take advantage of the new optimization opportunity. However, user
>> > >> > DoFns may need to be revised to properly set up and tear down
>> > >> > state in startBundle and finishBundle, respectively, if they
>> > >> > depended on only being used for a single bundle.
>> > >> >
>> > >> > The first two updates are already in pull requests:
>> > >> >
>> > >> > PR #419 (https://github.com/apache/incubator-beam/pull/419)
>> > >> > updates the Javadoc to the new spec
>> > >> > PR #418 (https://github.com/apache/incubator-beam/pull/418)
>> > >> > updates the DirectRunner to reuse DoFns according to the new
>> > >> > policy.
>> > >> >
>> > >> > Yours,
>> > >> >
>> > >> > Thomas
