2017-11-16 12:18 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
> On Wed, Nov 15, 2017 at 9:16 PM, Romain Manni-Bucau <rmannibu...@gmail.com>
> wrote:
>
>> @Reuven: it looks like a good workaround
>> @Ken: thanks a lot for the link!
>>
>> @all:
>>
>> 1. do you think it is doable without using windowing (to have
>> something more reliable in terms of runners, since it would depend on
>> fewer primitives)?
>>
>
> This depends on triggering, not on windowing. Triggering is a pretty core
> component of the model - no unbounded inputs can be processed at all
> without triggering. "Checkpointing" is a harder thing to pin down, as it
> means different things to different runners (e.g. "checkpointing" in Flink
> means something very different than it does in Dataflow, and different
> again in Spark).

My bad, sorry for the semantic abuse. The point still stands, though:
several IOs (in fact all of them) are trigger-independent, so I wonder
whether we want to couple them to triggering for a "core feature"
(chunking is needed in almost all IOs).

>
>
>
>> 2. what about allowing the user to define when to checkpoint?
>>
>
> As I mentioned, "checkpoint" is sometimes an ill-defined operation,
> especially across different runners. Instead I think it's better to have
> an annotation that defines the semantics you want (e.g. stable replay), and
> let the runner decide how to implement it (possibly by checkpointing).
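>
> To make that concrete, here is a minimal sketch of the annotation-driven
> approach (assuming the proposed @StableReplay annotation; it was only a
> draft at the time of this thread, so the name and placement may differ):
>
> pCollection.apply(ParDo.of(new DoFn<String, Void>() {
>   @ProcessElement
>   @StableReplay  // proposed annotation, not yet part of Beam
>   public void processElement(ProcessContext c) {
>     // On retry the runner must replay the exact same element, so this
>     // write can be safely retried against the backend.
>     writeToBackend(c.element());  // hypothetical helper, not a Beam API
>   }
> }));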

The issue with that solution is that it doesn't scale: it requires an
implementation per semantic instead of relying on a single primitive
for all cases. Given Beam's runner design and portability goal, we
should probably avoid that, no?

>
>> 3. can we get this kind of "composite" pattern in the Beam core?
>>
>
> I don't see why not. Though we first need to get @StableReplay implemented.
>
>
>>
>>
>>
>> Romain Manni-Bucau
>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>>
>>
>> 2017-11-15 14:12 GMT+01:00 Kenneth Knowles <k...@google.com.invalid>:
>> > In case the connection is not clear to folks on this thread, I pinged the
>> > thread on @StableReplay / @RequiresStableInput / etc. and opened a draft
>> > PR at https://github.com/apache/beam/pull/4135.
>> >
>> > On Wed, Nov 15, 2017 at 3:24 AM, Reuven Lax <re...@google.com.invalid>
>> > wrote:
>> >
>> >> so I think the following will do exactly that and can be easily factored
>> >> into a reusable transform (modulo Java type boilerplate):
>> >>
>> >> pCollection.apply(WithKeys.of((Element e) ->
>> >>                       ThreadLocalRandom.current().nextInt(N)))
>> >>            .apply(Window.into(new GlobalWindows())
>> >>                       .triggering(AfterWatermark.pastEndOfWindow()
>> >>                           .withEarlyFirings(AfterPane.elementCountAtLeast(100)))
>> >>                       // an accumulation mode is required once a trigger is set:
>> >>                       .discardingFiredPanes())
>> >>            .apply(GroupByKey.create())
>> >>            .apply(ParDo.of(new DoFn<KV<Integer, Iterable<Element>>, Void>() {
>> >>                @ProcessElement
>> >>                @StableReplay
>> >>                public void processElement(ProcessContext c) {
>> >>                  // Insert c.element().getValue() into the backend.
>> >>                }
>> >>            }));
>> >>
>> >> On Wed, Nov 15, 2017 at 7:00 PM, Romain Manni-Bucau <rmannibu...@gmail.com>
>> >> wrote:
>> >>
>> >> > 2017-11-15 11:42 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
>> >> > > Can we describe this at a higher level?
>> >> > >
>> >> > > I think what you want is the following. Please correct if I'm
>> >> > > misunderstanding.
>> >> > >
>> >> > > Batches of 100 elements (is this a hard requirement, or do they
>> >> > > have to be "approximately" 100 elements?)
>> >> >
>> >> > Approximately is fine as long as it is documented (what is not fine
>> >> > is, for instance, 1000000 instead of 10)
>> >> >
>> >> > >
>> >> > > Once you see a batch, you're guaranteed to see the same batch on
>> >> > > retries.
>> >> >
>> >> > +1
>> >> >
>> >> > >
>> >> > > You then want to idempotently insert this batch into some backend.
>> >> > > Things may fail, workers may crash, but in that case you want to
>> >> > > get the exact same batch back so you can insert it again.
>> >> >
>> >> > +1
>> >> >
>> >> > >
>> >> > > Do you care about ordering? On failure do you have to see the same
>> >> > > batches in the same order as before, or is it sufficient to see
>> >> > > the same batches?
>> >> >
>> >> > Beam doesn't guarantee ordering everywhere, so I guess it is not
>> >> > important - at least for my cases that is true.
>> >> >
>> >> > >
>> >> > > Reuven
>> >> > >
>> >> > > On Wed, Nov 15, 2017 at 5:58 PM, Romain Manni-Bucau <rmannibu...@gmail.com>
>> >> > > wrote:
>> >> > >
>> >> > >> The overall goal is to ensure that at most every 100 elements, a
>> >> > >> "backend" (e.g. a datastore) flush/commit/push is done, aligned
>> >> > >> with Beam checkpoints. You can see it as bringing the "general"
>> >> > >> commit-interval notion to Beam and kind of getting rid of the
>> >> > >> bundle notion, which is almost impossible to use today.
>> >> > >>
>> >> > >> Romain Manni-Bucau
>> >> > >> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>> >> > >>
>> >> > >>
>> >> > >> 2017-11-15 10:27 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
>> >> > >> > It's in the dev list archives, not sure if there's a doc yet.
>> >> > >> >
>> >> > >> > I'm not quite sure I understand what you mean by a "flush". Can
>> >> > >> > you describe the problem you're trying to solve?
>> >> > >> >
>> >> > >> > Reuven
>> >> > >> >
>> >> > >> > On Wed, Nov 15, 2017 at 5:25 PM, Romain Manni-Bucau <rmannibu...@gmail.com>
>> >> > >> > wrote:
>> >> > >> >
>> >> > >> >> Hmm, I didn't find the doc - if you have the link handy it
>> >> > >> >> would be appreciated - but "before" does not sound sufficient;
>> >> > >> >> it should be "after" in case there was a "flush", no?
>> >> > >> >>
>> >> > >> >> Romain Manni-Bucau
>> >> > >> >> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>> >> > >> >>
>> >> > >> >>
>> >> > >> >> 2017-11-15 10:10 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
>> >> > >> >> > If you set @StableReplay before a ParDo, it forces a
>> >> > >> >> > checkpoint before that ParDo.
>> >> > >> >> >
>> >> > >> >> > On Wed, Nov 15, 2017 at 5:07 PM, Romain Manni-Bucau <rmannibu...@gmail.com>
>> >> > >> >> > wrote:
>> >> > >> >> >
>> >> > >> >> >> It sounds like a good start. I'm not sure how a group by
>> >> > >> >> >> key (and not by size) can help control the checkpointing
>> >> > >> >> >> interval. I wonder if we shouldn't have a CheckpointPolicy
>> >> > >> >> >> { boolean shouldCheckpoint() } used in the processing event
>> >> > >> >> >> loop. The default could be up to the runner, but if set on
>> >> > >> >> >> the transform (or DoFn) it would be used to control when
>> >> > >> >> >> the checkpoint is done. Thinking out loud, it sounds close
>> >> > >> >> >> to the JBatch checkpoint algorithm
>> >> > >> >> >> (https://docs.oracle.com/javaee/7/api/javax/batch/api/chunk/CheckpointAlgorithm.html)
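>> >> > >> >> >>
>> >> > >> >> >> A rough sketch of what that could look like (purely
>> >> > >> >> >> hypothetical - the interface and names are made up for
>> >> > >> >> >> illustration, nothing like this exists in Beam today):
>> >> > >> >> >>
>> >> > >> >> >> import java.io.Serializable;
>> >> > >> >> >>
>> >> > >> >> >> // Hypothetical: the runner's processing loop would consult
>> >> > >> >> >> // this after each element to decide when to checkpoint.
>> >> > >> >> >> public interface CheckpointPolicy extends Serializable {
>> >> > >> >> >>   boolean shouldCheckpoint();
>> >> > >> >> >> }
>> >> > >> >> >>
>> >> > >> >> >> // Example policy: checkpoint every N elements, in the
>> >> > >> >> >> // spirit of JBatch's item-count checkpointing.
>> >> > >> >> >> public class CountingCheckpointPolicy implements CheckpointPolicy {
>> >> > >> >> >>   private final int interval;
>> >> > >> >> >>   private int seen;
>> >> > >> >> >>
>> >> > >> >> >>   public CountingCheckpointPolicy(int interval) {
>> >> > >> >> >>     this.interval = interval;
>> >> > >> >> >>   }
>> >> > >> >> >>
>> >> > >> >> >>   @Override
>> >> > >> >> >>   public boolean shouldCheckpoint() {
>> >> > >> >> >>     return ++seen % interval == 0;
>> >> > >> >> >>   }
>> >> > >> >> >> }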
>> >> > >> >> >>
>> >> > >> >> >> Romain Manni-Bucau
>> >> > >> >> >> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>> >> > >> >> >>
>> >> > >> >> >>
>> >> > >> >> >> 2017-11-15 9:55 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
>> >> > >> >> >> > Yes, @StableReplay, that's the annotation. Thanks.
>> >> > >> >> >> >
>> >> > >> >> >> >
>> >> > >> >> >> > On 11/15/2017 09:52 AM, Reuven Lax wrote:
>> >> > >> >> >> >>
>> >> > >> >> >> >> Romain,
>> >> > >> >> >> >>
>> >> > >> >> >> >> I think the @StableReplay semantic that Kenn proposed a
>> >> > >> >> >> >> month or so ago is what is needed here.
>> >> > >> >> >> >>
>> >> > >> >> >> >> Essentially it will ensure that the GroupByKey iterable
>> >> > >> >> >> >> is stable and checkpointed. So on replay, the GroupByKey
>> >> > >> >> >> >> is guaranteed to receive the exact same iterable as it
>> >> > >> >> >> >> did before. The annotation can be put on a ParDo as
>> >> > >> >> >> >> well, in which case it ensures stability (and
>> >> > >> >> >> >> checkpointing) of the individual ParDo elements.
>> >> > >> >> >> >>
>> >> > >> >> >> >> Reuven
>> >> > >> >> >> >>
>> >> > >> >> >> >> On Wed, Nov 15, 2017 at 4:49 PM, Romain Manni-Bucau
>> >> > >> >> >> >> <rmannibu...@gmail.com>
>> >> > >> >> >> >> wrote:
>> >> > >> >> >> >>
>> >> > >> >> >> >>> 2017-11-15 9:46 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
>> >> > >> >> >> >>>>
>> >> > >> >> >> >>>> Hi Romain,
>> >> > >> >> >> >>>>
>> >> > >> >> >> >>>> You are right: currently, the chunking is related to
>> >> > >> >> >> >>>> bundles. Today, the bundle size is the runner's
>> >> > >> >> >> >>>> responsibility.
>> >> > >> >> >> >>>>
>> >> > >> >> >> >>>> I think it's fine because only the runner knows an
>> >> > >> >> >> >>>> efficient bundle size. I'm afraid giving "control" of
>> >> > >> >> >> >>>> the bundle size to the end user (via the pipeline) can
>> >> > >> >> >> >>>> result in huge performance issues depending on the
>> >> > >> >> >> >>>> runner.
>> >> > >> >> >> >>>>
>> >> > >> >> >> >>>> It doesn't mean that we can't use an uber layer: it's
>> >> > >> >> >> >>>> what we do in ParDoWithBatch or in the DoFn of an IO
>> >> > >> >> >> >>>> sink where we have a batch size.
>> >> > >> >> >> >>>>
>> >> > >> >> >> >>>> Anyway, the core problem is about the checkpoint: why
>> >> > >> >> >> >>>> is a checkpoint not "respected" by an IO or runner?
>> >> > >> >> >> >>>
>> >> > >> >> >> >>>
>> >> > >> >> >> >>>
>> >> > >> >> >> >>> Take the example of a runner deciding the bundle size
>> >> > >> >> >> >>> is 4 and the IO deciding the commit interval (batch
>> >> > >> >> >> >>> semantics) is 2: what happens if the 3rd record fails?
>> >> > >> >> >> >>> You have already pushed 2 records to the store, which
>> >> > >> >> >> >>> can be reprocessed by a restart of the bundle, so you
>> >> > >> >> >> >>> can get duplicates.
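>> >> > >> >> >> >>>
>> >> > >> >> >> >>> To make the failure mode concrete (record names are
>> >> > >> >> >> >>> illustrative only):
>> >> > >> >> >> >>>
>> >> > >> >> >> >>> // bundle = [r1, r2, r3, r4], IO commit interval = 2
>> >> > >> >> >> >>> // process r1, r2 -> IO commits {r1, r2} to the store
>> >> > >> >> >> >>> // process r3     -> fails
>> >> > >> >> >> >>> // runner retries the whole bundle [r1, r2, r3, r4]
>> >> > >> >> >> >>> // process r1, r2 -> IO commits {r1, r2} again -> duplicates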
>> >> > >> >> >> >>>
>> >> > >> >> >> >>> Rephrased: as a framework, I think we need a reliable
>> >> > >> >> >> >>> batch/chunk solution. I understand bundles are mapped
>> >> > >> >> >> >>> to the runner and not really controllable, but can we
>> >> > >> >> >> >>> get something more reliable for the user? Maybe we need
>> >> > >> >> >> >>> a @BeforeBatch or something like that (see the sketch
>> >> > >> >> >> >>> below).
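>> >> > >> >> >> >>>
>> >> > >> >> >> >>> Purely hypothetical sketch - these annotations do not
>> >> > >> >> >> >>> exist, they are made up just to make the idea concrete:
>> >> > >> >> >> >>>
>> >> > >> >> >> >>> new DoFn<String, Void>() {
>> >> > >> >> >> >>>   @BeforeBatch  // hypothetical: called before each checkpointed batch
>> >> > >> >> >> >>>   public void beforeBatch() { /* e.g. open a transaction */ }
>> >> > >> >> >> >>>
>> >> > >> >> >> >>>   @ProcessElement
>> >> > >> >> >> >>>   public void processElement(ProcessContext c) {
>> >> > >> >> >> >>>     /* add c.element() to the pending batch */
>> >> > >> >> >> >>>   }
>> >> > >> >> >> >>>
>> >> > >> >> >> >>>   @AfterBatch  // hypothetical: called at the checkpoint boundary
>> >> > >> >> >> >>>   public void afterBatch() { /* commit the transaction */ }
>> >> > >> >> >> >>> };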
>> >> > >> >> >> >>>
>> >> > >> >> >> >>>>
>> >> > >> >> >> >>>> Regards
>> >> > >> >> >> >>>> JB
>> >> > >> >> >> >>>>
>> >> > >> >> >> >>>>
>> >> > >> >> >> >>>> On 11/15/2017 09:38 AM, Romain Manni-Bucau wrote:
>> >> > >> >> >> >>>>>
>> >> > >> >> >> >>>>>
>> >> > >> >> >> >>>>> Hi guys,
>> >> > >> >> >> >>>>>
>> >> > >> >> >> >>>>> The subject is a bit provocative, but the topic is
>> >> > >> >> >> >>>>> real and comes up again and again with Beam usage:
>> >> > >> >> >> >>>>> how can a DoFn handle some "chunking"?
>> >> > >> >> >> >>>>>
>> >> > >> >> >> >>>>> The need is to be able to commit every N records,
>> >> > >> >> >> >>>>> with N not too big.
>> >> > >> >> >> >>>>>
>> >> > >> >> >> >>>>> The natural API for that in Beam is the bundle one,
>> >> > >> >> >> >>>>> but bundles are not reliable since they can be very
>> >> > >> >> >> >>>>> small (Flink) - we can say that is "ok" even if it
>> >> > >> >> >> >>>>> has some perf impact - or too big (Spark does full
>> >> > >> >> >> >>>>> size / #workers).
>> >> > >> >> >> >>>>>
>> >> > >> >> >> >>>>> The workaround is what we see in the ES I/O: a
>> >> > >> >> >> >>>>> maxSize which does an eager flush. The issue is that
>> >> > >> >> >> >>>>> then the checkpoint is not respected and you can
>> >> > >> >> >> >>>>> process the same records multiple times.
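>> >> > >> >> >> >>>>>
>> >> > >> >> >> >>>>> A simplified sketch of that eager-flush workaround
>> >> > >> >> >> >>>>> (illustrative only, not the actual ES I/O code):
>> >> > >> >> >> >>>>>
>> >> > >> >> >> >>>>> class BufferingWriteFn extends DoFn<String, Void> {
>> >> > >> >> >> >>>>>   private static final int MAX_SIZE = 100;
>> >> > >> >> >> >>>>>   private transient List<String> buffer;
>> >> > >> >> >> >>>>>
>> >> > >> >> >> >>>>>   @StartBundle
>> >> > >> >> >> >>>>>   public void startBundle() { buffer = new ArrayList<>(); }
>> >> > >> >> >> >>>>>
>> >> > >> >> >> >>>>>   @ProcessElement
>> >> > >> >> >> >>>>>   public void processElement(ProcessContext c) {
>> >> > >> >> >> >>>>>     buffer.add(c.element());
>> >> > >> >> >> >>>>>     // eager flush: NOT aligned with any runner checkpoint
>> >> > >> >> >> >>>>>     if (buffer.size() >= MAX_SIZE) { flush(); }
>> >> > >> >> >> >>>>>   }
>> >> > >> >> >> >>>>>
>> >> > >> >> >> >>>>>   @FinishBundle
>> >> > >> >> >> >>>>>   public void finishBundle() { flush(); }
>> >> > >> >> >> >>>>>
>> >> > >> >> >> >>>>>   private void flush() {
>> >> > >> >> >> >>>>>     // push the buffer to the backend; if the bundle fails
>> >> > >> >> >> >>>>>     // after an eager flush and is retried, these records
>> >> > >> >> >> >>>>>     // are written again -> duplicates
>> >> > >> >> >> >>>>>     buffer.clear();
>> >> > >> >> >> >>>>>   }
>> >> > >> >> >> >>>>> }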
>> >> > >> >> >> >>>>>
>> >> > >> >> >> >>>>> Any plan to make this API reliable and controllable
>> >> > >> >> >> >>>>> from a Beam point of view (at least in terms of a
>> >> > >> >> >> >>>>> maximum)?
>> >> > >> >> >> >>>>>
>> >> > >> >> >> >>>>> Thanks,
>> >> > >> >> >> >>>>> Romain Manni-Bucau
>> >> > >> >> >> >>>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>> >> > >> >> >> >>>>>
>> >> > >> >> >> >>>>
>> >> > >> >> >> >>>> --
>> >> > >> >> >> >>>> Jean-Baptiste Onofré
>> >> > >> >> >> >>>> jbono...@apache.org
>> >> > >> >> >> >>>> http://blog.nanthrax.net
>> >> > >> >> >> >>>> Talend - http://www.talend.com
>> >> > >> >> >> >>>
>> >> > >> >> >> >>>
>> >> > >> >> >> >>
>> >> > >> >> >> >
>> >> > >> >> >> > --
>> >> > >> >> >> > Jean-Baptiste Onofré
>> >> > >> >> >> > jbono...@apache.org
>> >> > >> >> >> > http://blog.nanthrax.net
>> >> > >> >> >> > Talend - http://www.talend.com
>> >> > >> >> >>
>> >> > >> >>
>> >> > >>
>> >> >
>> >>
>>
