On Wed, Jun 8, 2016 at 10:05 AM, Raghu Angadi wrote:
> Such data loss can still occur if the worker dies after finishBundle()
> returns, but before the consumption is committed.

If the runner is correctly implemented, there will not be data loss in this
case -- the runner should retry the bundle (or all the elements that were in
this bundle as part of one or more new bundles) as it has not committed the
work.

> I thought finishBundle() exists simply as a best-effort indication from the
> runner to the user that some chunk of records has been processed, not as
> part of the processing guarantees. Also, the term "bundle" itself is fairly
> loosely defined (maybe intentionally).

No, finishBundle() MUST be called by a runner before it can commit any work.
This is akin to flushing a stream before closing it -- the DoFn may have some
elements cached or pending, and if you don't call finishBundle() you will not
have fully processed or produced all the elements.

Dan

> On Wed, Jun 8, 2016 at 8:47 AM, Thomas Groh wrote:
>
> > finishBundle() **must** be called before any input consumption is
> > committed (i.e. marking inputs as completed, which includes committing any
> > elements they produced). Doing otherwise can cause data loss, as the state
> > of the DoFn is lost if a worker dies, but the input elements will never be
> > reprocessed to recreate the DoFn state. If this occurs, any buffered
> > outputs are lost.
> >
> > On Wed, Jun 8, 2016 at 8:21 AM, Bobby Evans wrote:
> >
> > > The local Java runner does arbitrary batching of 10 elements.
> > >
> > > I'm not sure if Flink exposes this or not, but couldn't you use the
> > > checkpoint triggers to also start/finish a bundle?
> > > - Bobby
> > >
> > > On Wednesday, June 8, 2016 10:17 AM, Aljoscha Krettek wrote:
> > >
> > > Ahh, what we could do is artificially induce bundles using either count
> > > or processing time or both, just so that finishBundle() is called once
> > > in a while.
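The ordering Thomas and Dan describe (finishBundle() before any commit), combined with Aljoscha's idea of artificially inducing bundles by element count, can be sketched as a toy runner. This is a minimal, self-contained Java sketch, not Beam code. `CountBundlingRunner`, `BundleFn` and `BufferingFn` are hypothetical names. "Commit" is modeled as appending to a list. Processing-time bundling and retries are not shown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class CountBundlingRunner {

  /** Hypothetical, simplified stand-in for a DoFn's bundle lifecycle (not the Beam API). */
  interface BundleFn {
    void startBundle();
    void processElement(int element, List<Integer> output);
    void finishBundle(List<Integer> output);
  }

  /** Buffers every result and emits nothing until finishBundle() flushes the buffer. */
  static final class BufferingFn implements BundleFn {
    private List<Integer> buffer;

    @Override public void startBundle() { buffer = new ArrayList<>(); }

    @Override public void processElement(int element, List<Integer> output) {
      buffer.add(element * 2); // held back; nothing reaches `output` yet
    }

    @Override public void finishBundle(List<Integer> output) {
      output.addAll(buffer); // flush, like flushing a stream before closing it
      buffer = null;
    }
  }

  /**
   * Cuts the input into artificial bundles of at most maxBundleSize elements and
   * commits a bundle's output only after finishBundle() has returned.
   */
  static List<Integer> run(List<Integer> input, int maxBundleSize, Supplier<BundleFn> factory) {
    if (maxBundleSize <= 0) {
      throw new IllegalArgumentException("maxBundleSize must be positive");
    }
    List<Integer> committed = new ArrayList<>();
    BundleFn fn = factory.get(); // one instance; BufferingFn resets its state in startBundle()
    for (int start = 0; start < input.size(); start += maxBundleSize) {
      List<Integer> bundle = input.subList(start, Math.min(start + maxBundleSize, input.size()));
      List<Integer> pending = new ArrayList<>(); // uncommitted output of this bundle
      fn.startBundle();
      for (int element : bundle) {
        fn.processElement(element, pending);
      }
      fn.finishBundle(pending);  // must run before the commit below
      committed.addAll(pending); // "commit": inputs done, outputs visible downstream
    }
    return committed;
  }

  public static void main(String[] args) {
    List<Integer> input = new ArrayList<>();
    for (int i = 1; i <= 25; i++) {
      input.add(i);
    }
    // Bundles of 10, 10 and 5 elements.
    System.out.println(run(input, 10, BufferingFn::new).size()); // prints 25
  }
}
```

Setting `maxBundleSize` to 1 reproduces the per-element startBundle()/finishBundle() behavior described for Flink streaming. In the opposite case, a bundle that never ends never reaches finishBundle(), so nothing that BufferingFn holds back would ever be committed.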
> > >
> > > On Wed, 8 Jun 2016 at 17:12 Aljoscha Krettek wrote:
> > >
> > > > Pretty sure, yes. The Iterable in a MapPartitionFunction should give
> > > > you all the values in a given partition.
> > > >
> > > > I checked again for streaming execution. We're doing the opposite
> > > > right now: every element is a bundle in itself, and
> > > > startBundle()/finishBundle() are called for every element, which seems
> > > > a bit wasteful. The only other option is to treat all elements as one
> > > > bundle, because Flink does not bundle/micro-batch elements in
> > > > streaming execution.
> > > >
> > > > On Wed, 8 Jun 2016 at 16:38 Bobby Evans wrote:
> > > >
> > > >> Are you sure about that for Flink? I thought the iterable finished
> > > >> when you processed a maximum number of elements or the input queue
> > > >> was empty, so that it could return control to Akka for better
> > > >> sharing of the thread pool.
> > > >>
> > > >> https://github.com/apache/incubator-beam/blob/af8f5935ca1866012ceb102b9472c8b1ef102d73/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java#L99
> > > >>
> > > >> Also, the Javadoc for DoFn.Context explicitly states that you can
> > > >> emit from the finishBundle method.
> > > >>
> > > >> https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L104-L110
> > > >>
> > > >> I thought I had seen some example of this being used for batching
> > > >> output to something downstream, like HDFS or Kafka, but I'm not sure
> > > >> on that.
> > > >> If you can emit from finishBundle and a new instance of the DoFn will
> > > >> be created around each bundle, then I can see some people trying to
> > > >> do aggregations inside a DoFn and then emitting them at the end of
> > > >> the bundle, knowing that if a batch fails or is rolled back the
> > > >> system will handle it. If that is not allowed, we should really
> > > >> update the Javadoc around it to explain the pitfalls of doing this.
> > > >> - Bobby
> > > >>
> > > >> On Wednesday, June 8, 2016 4:24 AM, Aljoscha Krettek wrote:
> > > >>
> > > >> Hi,
> > > >> a quick related question: In the Flink runner we basically see
> > > >> everything as one big bundle, i.e. we call startBundle() once at the
> > > >> beginning and then keep processing indefinitely, never calling
> > > >> finishBundle(). Is this also correct behavior?
> > > >>
> > > >> Best,
> > > >> Aljoscha
> > > >>
> > > >> On Tue, 7 Jun 2016 at 20:44 Thomas Groh wrote:
> > > >>
> > > >> > Hey everyone;
> > > >> >
> > > >> > I'm starting to work on BEAM-38
> > > >> > (https://issues.apache.org/jira/browse/BEAM-38), which enables an
> > > >> > optimization for runners with many small bundles. BEAM-38 allows
> > > >> > runners to reuse DoFn instances so long as that DoFn has not
> > > >> > terminated abnormally. This replaces the previous requirement that
> > > >> > a DoFn be used for only a single bundle if either startBundle or
> > > >> > finishBundle has been overridden.
> > > >> >
> > > >> > DoFn deserialization-per-bundle can be a significant performance
> > > >> > bottleneck when there are many small bundles, as is common in
> > > >> > streaming executions. It has also surfaced as the cause of much of
> > > >> > the current slowness in the new InProcessRunner.
> > > >> >
> > > >> > Existing runners do not require any changes; they may choose to
> > > >> > take advantage of the new optimization opportunity. However, user
> > > >> > DoFns may need to be revised to properly set up and tear down state
> > > >> > in startBundle and finishBundle, respectively, if they depended on
> > > >> > only being used for a single bundle.
> > > >> >
> > > >> > The first two updates are already in pull requests:
> > > >> >
> > > >> > PR #419 (https://github.com/apache/incubator-beam/pull/419) updates
> > > >> > the Javadoc to the new spec
> > > >> > PR #418 (https://github.com/apache/incubator-beam/pull/418) updates
> > > >> > the DirectRunner to reuse DoFns according to the new policy.
> > > >> >
> > > >> > Yours,
> > > >> >
> > > >> > Thomas
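The BEAM-38 policy can be sketched in code. Under it, a runner reuses a DoFn instance across bundles unless that instance terminated abnormally, and it retries any bundle it has not committed. This is again a hedged, self-contained Java sketch under stated assumptions. `ReusingRunner`, `BundleFn` and `BatchingFn` are hypothetical. Calling the factory stands in for deserializing a DoFn. The "flaky" element is made-up fault injection that fails exactly once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class ReusingRunner {

  /** Hypothetical, simplified bundle lifecycle (not the Beam DoFn API). */
  interface BundleFn {
    void startBundle();
    void processElement(String element, List<String> output);
    void finishBundle(List<String> output);
  }

  /** Batches a bundle's elements into one record; per-bundle state is reset in startBundle(). */
  static final class BatchingFn implements BundleFn {
    private final AtomicInteger failuresLeft; // shared, hypothetical fault injection
    private List<String> batch;

    BatchingFn(AtomicInteger failuresLeft) { this.failuresLeft = failuresLeft; }

    @Override public void startBundle() { batch = new ArrayList<>(); }

    @Override public void processElement(String element, List<String> output) {
      if (element.equals("flaky") && failuresLeft.getAndDecrement() > 0) {
        throw new IllegalStateException("simulated worker failure");
      }
      batch.add(element);
    }

    @Override public void finishBundle(List<String> output) {
      output.add(String.join(",", batch)); // one batched record per bundle
    }
  }

  /** Reuses one instance across bundles; discards it and retries the bundle after a failure. */
  static List<String> run(List<List<String>> bundles, Supplier<BundleFn> factory, int maxAttempts) {
    List<String> committed = new ArrayList<>();
    BundleFn fn = null;
    for (List<String> bundle : bundles) {
      for (int attempt = 1; ; attempt++) {
        if (fn == null) {
          fn = factory.get(); // stands in for deserializing a fresh DoFn
        }
        List<String> pending = new ArrayList<>(); // dropped if this attempt fails
        try {
          fn.startBundle();
          for (String e : bundle) {
            fn.processElement(e, pending);
          }
          fn.finishBundle(pending);
          committed.addAll(pending); // commit only after finishBundle() returned
          break;                     // keep fn for the next bundle
        } catch (RuntimeException ex) {
          fn = null; // terminated abnormally: never reuse this instance
          if (attempt >= maxAttempts) {
            throw ex;
          }
        }
      }
    }
    return committed;
  }

  public static void main(String[] args) {
    List<List<String>> bundles = Arrays.asList(
        Arrays.asList("a", "b"), Arrays.asList("c", "flaky"), Arrays.asList("d"));
    AtomicInteger failuresLeft = new AtomicInteger(1);
    AtomicInteger created = new AtomicInteger();
    List<String> out = run(bundles, () -> {
      created.incrementAndGet();
      return new BatchingFn(failuresLeft);
    }, 3);
    System.out.println(out + " instances=" + created.get()); // prints [a,b, c,flaky, d] instances=2
  }
}
```

With three bundles and one failure, the sketch creates two instances. Creating a fresh instance per bundle would need four: three bundles plus one retry. BatchingFn stays correct under reuse only because `batch` is reset in startBundle(). Initializing it in the constructor instead would leak one bundle's elements into the next, and that is the kind of DoFn the announcement says may need revising.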
