On Wed, Jun 8, 2016 at 10:05 AM, Raghu Angadi <[email protected]>
wrote:

> Such data loss can still occur if the worker dies after finishBundle()
> returns, but before the consumption is committed.


If the runner is correctly implemented, there will not be data loss in this
case -- the runner should retry the bundle (or all the elements that were in
this bundle as part of one or more new bundles), as it has not committed the
work.


> I thought finishBundle()
> exists simply as a best-effort indication from the runner to the user that
> some chunk of records have been processed... not part of the processing
> guarantees. Also the term "bundle" itself is fairly loosely defined (maybe
> intentionally).
>

No, finishBundle() MUST be called by a runner before it can commit any work.
This is akin to flushing a stream before closing it -- the DoFn may have some
elements cached or pending, and if you don't call finishBundle() you will not
have fully processed or produced all the elements.
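
To make the flushing analogy concrete, here is a rough sketch (not from the
thread; the class name and types are invented) of a DoFn that caches elements
and only produces them in finishBundle(), using the override-style DoFn API
referenced later in this thread:

  import java.util.ArrayList;
  import java.util.List;
  import org.apache.beam.sdk.transforms.DoFn;

  // Sketch: elements are only cached in processElement() and not emitted until
  // finishBundle(), which is why the runner must call finishBundle() before it
  // commits the bundle's inputs.
  class BufferingDoFn extends DoFn<String, String> {
    private transient List<String> buffer;

    @Override
    public void startBundle(Context c) {
      buffer = new ArrayList<>();
    }

    @Override
    public void processElement(ProcessContext c) {
      // Nothing is output yet; the element exists only in this DoFn instance.
      buffer.add(c.element());
    }

    @Override
    public void finishBundle(Context c) {
      // Without this call the cached elements would never be produced.
      for (String s : buffer) {
        c.output(s);
      }
      buffer.clear();
    }
  }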

Dan



>
> On Wed, Jun 8, 2016 at 8:47 AM, Thomas Groh <[email protected]>
> wrote:
>
> > finishBundle() **must** be called before any input consumption is committed
> > (i.e. marking inputs as completed, which includes committing any elements
> > they produced). Doing otherwise can cause data loss, as the state of the
> > DoFn is lost if a worker dies, but the input elements will never be
> > reprocessed to recreate the DoFn state. If this occurs, any buffered
> > outputs are lost.
> >
> > On Wed, Jun 8, 2016 at 8:21 AM, Bobby Evans <[email protected]> wrote:
> >
> > > The local Java runner does arbitrary batching of 10 elements.
> > >
> > > I'm not sure if Flink exposes this or not, but couldn't you use the
> > > checkpoint triggers to also start/finish a bundle?
> > >  - Bobby
> > >
> > >     On Wednesday, June 8, 2016 10:17 AM, Aljoscha Krettek <
> > > [email protected]> wrote:
> > >
> > >
> > >  Ahh, what we could do is artificially induce bundles using either count or
> > > processing time or both. Just so that finishBundle() is called once in a
> > > while.
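
A sketch of that idea follows. This is not actual Flink runner code: the
class, thresholds, and field names are invented, and the calls into the
wrapped DoFn are elided as comments.

  // Hypothetical harness-side helper that cuts artificial bundles by element
  // count and/or processing time, so finishBundle() fires periodically.
  class ArtificialBundler {
    private static final int MAX_BUNDLE_SIZE = 1000;      // invented threshold
    private static final long MAX_BUNDLE_MILLIS = 1000L;  // invented threshold

    private int elementsInBundle = 0;
    private long bundleStartMillis = -1L;

    // Invoked by the (hypothetical) runner wrapper for every incoming element.
    void onElement() {
      if (bundleStartMillis < 0) {
        // ... call doFn.startBundle(...) here ...
        bundleStartMillis = System.currentTimeMillis();
      }
      // ... call doFn.processElement(...) here ...
      elementsInBundle++;
      long elapsed = System.currentTimeMillis() - bundleStartMillis;
      if (elementsInBundle >= MAX_BUNDLE_SIZE || elapsed >= MAX_BUNDLE_MILLIS) {
        // ... call doFn.finishBundle(...) here, and only then acknowledge/commit
        // the inputs that made up this bundle ...
        elementsInBundle = 0;
        bundleStartMillis = -1L;
      }
    }
  }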
> > >
> > > On Wed, 8 Jun 2016 at 17:12 Aljoscha Krettek <[email protected]>
> > wrote:
> > >
> > > > Pretty sure, yes. The Iterable in a MapPartitionFunction should give you
> > > > all the values in a given partition.
> > > >
> > > > I checked again for streaming execution. We're doing the opposite, right
> > > > now: every element is a bundle in itself, startBundle()/finishBundle() are
> > > > called for every element, which seems a bit wasteful. The only other option
> > > > is to see all elements as one bundle, because Flink does not bundle/micro-
> > > > batch elements in streaming execution.
> > > >
> > > > On Wed, 8 Jun 2016 at 16:38 Bobby Evans <[email protected]> wrote:
> > > >
> > > >> Are you sure about that for Flink?  I thought the iterable finished when
> > > >> you processed a maximum number of elements or the input queue was empty so
> > > >> that it could return control back to Akka for better sharing of the
> > > >> thread pool.
> > > >>
> > > >>
> > > >>
> > > >> https://github.com/apache/incubator-beam/blob/af8f5935ca1866012ceb102b9472c8b1ef102d73/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java#L99
> > > >> Also in the javadocs for DoFn.Context it explicitly states that you can
> > > >> emit from the finishBundle method.
> > > >>
> > > >>
> > > >>
> > > >> https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L104-L110
> > > >> I thought I had seen some example of this being used for batching output
> > > >> to something downstream, like HDFS or Kafka, but I'm not sure on that. If
> > > >> you can emit from finishBundle and a new instance of the DoFn will be
> > > >> created around each bundle, then I can see some people trying to do
> > > >> aggregations inside a DoFn and then emitting them at the end of the bundle,
> > > >> knowing that if a batch fails or is rolled back the system will handle it.
> > > >> If that is not allowed we should really update the javadocs around it to
> > > >> explain the pitfalls of doing this.
> > > >>  - Bobby
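
For what it's worth, a rough sketch of the per-bundle aggregation pattern
described above (the class and element types are invented for the example);
it depends entirely on the finishBundle()-before-commit guarantee discussed
earlier in the thread:

  import java.util.HashMap;
  import java.util.Map;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.values.KV;

  // Sketch: accumulate partial counts within a bundle and emit them only in
  // finishBundle(). This is only safe because a failed bundle is retried as a
  // whole, so no counts are lost and none are committed twice.
  class PerBundleCountFn extends DoFn<String, KV<String, Long>> {
    private transient Map<String, Long> counts;

    @Override
    public void startBundle(Context c) {
      counts = new HashMap<>();
    }

    @Override
    public void processElement(ProcessContext c) {
      Long current = counts.get(c.element());
      counts.put(c.element(), current == null ? 1L : current + 1L);
    }

    @Override
    public void finishBundle(Context c) {
      for (Map.Entry<String, Long> entry : counts.entrySet()) {
        c.output(KV.of(entry.getKey(), entry.getValue()));
      }
    }
  }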
> > > >>
> > > >>    On Wednesday, June 8, 2016 4:24 AM, Aljoscha Krettek <
> > > >> [email protected]> wrote:
> > > >>
> > > >>
> > > >>  Hi,
> > > >> a quick related question: In the Flink runner we basically see everything
> > > >> as one big bundle, i.e. we call startBundle() once at the beginning and
> > > >> then keep processing indefinitely, never calling finishBundle(). Is this
> > > >> also correct behavior?
> > > >>
> > > >> Best,
> > > >> Aljoscha
> > > >>
> > > >> On Tue, 7 Jun 2016 at 20:44 Thomas Groh <[email protected]>
> > > wrote:
> > > >>
> > > >> > Hey everyone;
> > > >> >
> > > >> > I'm starting to work on BEAM-38
> > > >> > (https://issues.apache.org/jira/browse/BEAM-38), which enables an
> > > >> > optimization for runners with many small bundles. BEAM-38 allows runners
> > > >> > to reuse DoFn instances so long as that DoFn has not terminated
> > > >> > abnormally. This replaces the previous requirement that a DoFn be used
> > > >> > for only a single bundle if either of startBundle or finishBundle has
> > > >> > been overridden.
> > > >> >
> > > >> > DoFn deserialization-per-bundle can be a significant performance
> > > >> > bottleneck when there are many small bundles, as is common in streaming
> > > >> > executions. It has also surfaced as the cause of much of the current
> > > >> > slowness in the new InProcessRunner.
> > > >> >
> > > >> > Existing Runners do not require any changes; they may choose to take
> > > >> > advantage of the new optimization opportunity. However, user DoFns may
> > > >> > need to be revised to properly set up and tear down state in startBundle
> > > >> > and finishBundle, respectively, if they depended on only being used for a
> > > >> > single bundle.
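
As a rough sketch of the kind of revision meant here (class name and logic
invented for the example): a DoFn that keeps per-bundle state must now reset
it explicitly in startBundle rather than relying on a freshly deserialized
instance per bundle.

  import java.util.HashSet;
  import java.util.Set;
  import org.apache.beam.sdk.transforms.DoFn;

  // Sketch: dedupes elements within a bundle. Under the old contract a fresh
  // instance (with an empty set) could be assumed for each bundle; with DoFn
  // reuse, the per-bundle state has to be reset explicitly in startBundle().
  class PerBundleDedupeFn extends DoFn<String, String> {
    private transient Set<String> seenInBundle;

    @Override
    public void startBundle(Context c) {
      seenInBundle = new HashSet<>();
    }

    @Override
    public void processElement(ProcessContext c) {
      if (seenInBundle.add(c.element())) {
        c.output(c.element());
      }
    }
  }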
> > > >> >
> > > >> > The first two updates are already in pull requests:
> > > >> >
> > > >> > PR #419 (https://github.com/apache/incubator-beam/pull/419) updates the
> > > >> > Javadoc to the new spec.
> > > >> > PR #418 (https://github.com/apache/incubator-beam/pull/418) updates the
> > > >> > DirectRunner to reuse DoFns according to the new policy.
> > > >> >
> > > >> > Yours,
> > > >> >
> > > >> > Thomas
> > > >> >
> > > >>
> > > >>
> > > >>
> > > >
> > > >
> > >
> > >
> > >
> > >
> >
>
