I think that wrapping the DoFn is tricky -- we backed out
IntraBundleParallelization because it did that, and it has weird
interactions with both the reflective DoFn and windowing. We could maybe
make some kind of "DoFnDelegatingDoFn" that could act as a base class and
get some of that right, but...

One question I have is whether this batching should be "make batches of N
and if you need to wait for the Nth element do so" or "make batches of at
most N but don't wait too long if you don't get to N". In the former case,
we'll need to do something to buffer elements between bundles -- whether
this is using State or a GroupByKey, etc. In the latter case, the buffering
can happen entirely within a bundle -- if you get to the end of the bundle
and only have 5 elements, even if 5 < N, process that as a batch (rather
than shifting it somewhere else).
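
As a rough sketch of the latter option, in plain Java with no Beam dependency:
BundleBatcher, add() and finishBundle() are hypothetical names used only for
illustration, not an existing API, and perBatchFn stands in for the
user-supplied per-batch function. The idea is that add() would be called from
@ProcessElement and finishBundle() from @FinishBundle, so nothing is buffered
across bundles.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper: batches of at most maxSize, never held across bundles. */
final class BundleBatcher<T> {
  private final int maxSize;
  private final Consumer<List<T>> perBatchFn;
  private List<T> buffer = new ArrayList<>();

  BundleBatcher(int maxSize, Consumer<List<T>> perBatchFn) {
    if (maxSize <= 0) {
      throw new IllegalArgumentException("maxSize must be positive");
    }
    this.maxSize = maxSize;
    this.perBatchFn = perBatchFn;
  }

  /** Buffers one element and emits a batch as soon as maxSize is reached. */
  void add(T element) {
    buffer.add(element);
    if (buffer.size() >= maxSize) {
      flush();
    }
  }

  /** End of bundle: emit whatever is left, even if it is fewer than maxSize. */
  void finishBundle() {
    if (!buffer.isEmpty()) {
      flush();
    }
  }

  /** Hands the current buffer to perBatchFn and starts a fresh one. */
  private void flush() {
    List<T> batch = buffer;
    buffer = new ArrayList<>();
    perBatchFn.accept(batch);
  }
}
```

With maxSize = 5 and a bundle of 12 elements this yields batches of 5, 5 and 2.
The "wait for the Nth element" variant cannot be written this way, since the
leftover 2 elements would have to survive past the end of the bundle.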

On Thu, Jan 26, 2017 at 3:01 PM Robert Bradshaw <rober...@google.com.invalid>
wrote:

> On Thu, Jan 26, 2017 at 12:48 PM, Eugene Kirpichov
> <kirpic...@google.com.invalid> wrote:
> > I don't think we should make batching a core feature of the Beam
> > programming model (by adding it to DoFn as this code snippet implies). I'm
> > reasonably sure there are less invasive ways of implementing it.
>
> +1, either as a PTransform<Pc<T>, Pc<O>> or a DoFn<T, O> that
> wraps/delegates to a DoFn<Iterable<T>, Iterable<O>>.
>
> > On Thu, Jan 26, 2017 at 12:22 PM Jean-Baptiste Onofré <j...@nanthrax.net>
> > wrote:
> >
> >> Agree, I'm curious as well.
> >>
> >> I guess it would be something like:
> >>
> >> .apply(ParDo.of(new DoFn() {
> >>
> >>     // Proposed hook; batchSize() is not an existing DoFn method.
> >>     @Override
> >>     public long batchSize() {
> >>       return 1000;
> >>     }
> >>
> >>     @ProcessElement
> >>     public void processElement(ProcessContext context) {
> >>       ...
> >>     }
> >> }));
> >>
> >> If batchSize (overridden by the user) returns a positive long, then the
> >> DoFn can batch with this size.
> >>
> >> Regards
> >> JB
> >>
> >> On 01/26/2017 05:38 PM, Eugene Kirpichov wrote:
> >> > Hi Etienne,
> >> >
> >> > Could you post some snippets of how your transform is to be used in a
> >> > pipeline? I think that would make it easier to discuss on this thread
> >> > and could save a lot of churn if the discussion ends up leading to a
> >> > different API.
> >> >
> >> > On Thu, Jan 26, 2017 at 8:29 AM Etienne Chauchot <echauc...@gmail.com>
> >> > wrote:
> >> >
> >> >> Wonderful !
> >> >>
> >> >> Thanks Kenn !
> >> >>
> >> >> Etienne
> >> >>
> >> >>
> >> >> On 26/01/2017 at 15:34, Kenneth Knowles wrote:
> >> >>> Hi Etienne,
> >> >>>
> >> >>> I was drafting a proposal about @OnWindowExpiration when this email
> >> >>> arrived. I thought I would try to quickly unblock you by responding
> >> >>> with a TL;DR: you can achieve your goals with state & timers as they
> >> >>> currently exist. You'll set a timer for
> >> >>> window.maxTimestamp().plus(allowedLateness) precisely - when this timer
> >> >>> fires, you are guaranteed that the input watermark has exceeded this
> >> >>> point (so all new data is droppable) while the output timestamp is held
> >> >>> to this point (so you can safely output into the window).
> >> >>>
> >> >>> @OnWindowExpiration is (1) a convenience to save you from needing a
> >> >>> handle on the allowed lateness (not a problem in your case) and (2)
> >> >>> actually meaningful and potentially less expensive to implement in the
> >> >>> absence of state (this is why it needs a design discussion at all,
> >> >>> really).
> >> >>>
> >> >>> Caveat: these APIs are new and not supported in every runner and
> >> >>> windowing configuration.
> >> >>>
> >> >>> Kenn
> >> >>>
> >> >>> On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot <echauc...@gmail.com>
> >> >>> wrote:
> >> >>>
> >> >>>> Hi,
> >> >>>>
> >> >>>> I have started to implement this ticket. For now it is implemented as
> >> >>>> a PTransform that simply does ParDo.of(new DoFn) and all the
> >> >>>> processing related to batching is done in the DoFn.
> >> >>>>
> >> >>>> I'm starting to deal with windows and bundles (starting to take a look
> >> >>>> at the State API to process across bundles, more questions about this
> >> >>>> to come).
> >> >>>> My comments/questions are inline:
> >> >>>>
> >> >>>>
> >> >>>> On 17/01/2017 at 18:41, Ben Chambers wrote:
> >> >>>>
> >> >>>>> We should start by understanding the goals. If elements are in
> >> >>>>> different windows can they be put in the same batch? If they have
> >> >>>>> different timestamps what timestamp should the batch have?
> >> >>>>>
> >> >>>> Regarding timestamps: the current design is as follows: the transform
> >> >>>> does not group elements in the PCollection, so the "batch" does not
> >> >>>> exist as an element in the PCollection. There is only a user-defined
> >> >>>> function (perBatchFn) that gets called when batchSize elements have
> >> >>>> been processed. This function takes an ArrayList as parameter. So
> >> >>>> elements keep their original timestamps.
> >> >>>>
> >> >>>>
> >> >>>> Regarding windowing: I guess that if elements are not in the same
> >> >>>> window, they are not expected to be in the same batch.
> >> >>>> I'm just starting to work on these subjects, so I might lack a bit of
> >> >>>> information;
> >> >>>> what I am currently thinking about is that I need a way to know in the
> >> >>>> DoFn that the window has expired so that I can call the perBatchFn
> >> >>>> even if batchSize is not reached. This is the @OnWindowExpiration
> >> >>>> callback that Kenneth mentioned in an email about bundles.
> >> >>>> Let's imagine that we have a collection of elements artificially
> >> >>>> timestamped every 10 seconds (for simplicity of the example) and a
> >> >>>> fixed windowing of 1 minute. Then each window contains 6 elements. If
> >> >>>> we were to buffer the elements by batches of 5 elements, then for each
> >> >>>> window we expect to get 2 batches (one of 5 elements, one of 1
> >> >>>> element). For that to happen, we need an @OnWindowExpiration on the
> >> >>>> DoFn where we call perBatchFn.
> >> >>>>
> >> >>>>> As a composite transform this will likely require a group by key
> >> >>>>> which may affect performance. Maybe within a dofn is better.
> >> >>>>>
> >> >>>> Yes, the processing is done with a DoFn indeed.
> >> >>>>
> >> >>>>> Then it could be some annotation or API that informs the runner.
> >> >>>>> Should batch sizes be fixed in the annotation (element count or size)
> >> >>>>> or should the user have some method that lets them decide when to
> >> >>>>> process a batch based on the contents?
> >> >>>>>
> >> >>>> For now, the user passes batchSize as an argument to
> >> >>>> BatchParDo.via(); it is a number of elements. But batching based on
> >> >>>> content might be useful for the user. Giving a hint to the runner
> >> >>>> might be more flexible for the runner. Thanks.
> >> >>>>
> >> >>>>> Another thing to think about is whether this should be connected to
> >> >>>>> the ability to run parts of the bundle in parallel.
> >> >>>>>
> >> >>>> Yes!
> >> >>>>
> >> >>>>> Maybe each batch is an RPC and you just want to start an async RPC
> >> >>>>> for each batch. Then in addition to starting the final RPC in
> >> >>>>> finishBundle, you also need to wait for all the RPCs to complete.
> >> >>>>>
> >> >>>> Actually, currently each batch processing is whatever the user wants
> >> >>>> (perBatchFn user-defined function). If the user decides to issue an
> >> >>>> async RPC in that function (called with the ArrayList of input
> >> >>>> elements), IMHO he is responsible for waiting for the response in that
> >> >>>> method if he needs the response, but he can also do a send and forget,
> >> >>>> depending on his use case.
> >> >>>>
> >> >>>> Besides, I have also included a perElementFn user function to allow
> >> >>>> the user to do some processing on the elements before adding them to
> >> >>>> the batch (example use case: convert a String to a DTO object to
> >> >>>> invoke an external service)
> >> >>>>
> >> >>>> Etienne
> >> >>>>
> >> >>>>> On Tue, Jan 17, 2017, 8:48 AM Etienne Chauchot <echauc...@gmail.com>
> >> >>>>> wrote:
> >> >>>>>
> >> >>>>> Hi JB,
> >> >>>>>
> >> >>>>> I meant jira vote but discussion on the ML works also :)
> >> >>>>>
> >> >>>>> As I understand the need (see stackoverflow links in jira ticket) the
> >> >>>>> aim is to avoid the user having to code the batching logic in his own
> >> >>>>> DoFn.processElement() and DoFn.finishBundle() regardless of the
> >> >>>>> bundles. For example, possible use case is to batch a call to an
> >> >>>>> external service (for performance).
> >> >>>>>
> >> >>>>> I was thinking about providing a PTransform that implements the
> >> >>>>> batching in its own DoFn and that takes user-defined functions for
> >> >>>>> customization.
> >> >>>>>
> >> >>>>> Etienne
> >> >>>>>
> >> >>>>> On 17/01/2017 at 17:30, Jean-Baptiste Onofré wrote:
> >> >>>>>
> >> >>>>>> Hi
> >> >>>>>>
> >> >>>>>> I guess you mean discussion on the mailing list about that, right?
> >> >>>>>>
> >> >>>>>> AFAIR the idea is to provide a utility class to deal with
> >> >>>>>> pooling/batching. However, I'm not sure it's required, as it can be
> >> >>>>>> done with @StartBundle etc. in DoFn, and batching depends on the end
> >> >>>>>> user's "logic".
> >> >>>>>
> >> >>>>>> Regards
> >> >>>>>> JB
> >> >>>>>>
> >> >>>>>> On Jan 17, 2017, at 08:26, Etienne Chauchot <echauc...@gmail.com>
> >> >>>>>> wrote:
> >> >>>>>
> >> >>>>>>> Hi all,
> >> >>>>>>> I have started to work on this ticket
> >> >>>>>>> https://issues.apache.org/jira/browse/BEAM-135
> >> >>>>>>>
> >> >>>>>>> As there have been no votes since March 18th, is the issue still
> >> >>>>>>> relevant/needed?
> >> >>>>>>>
> >> >>>>>>> Regards,
> >> >>>>>>>
> >> >>>>>>> Etienne
> >> >>>>>>>
> >> >>
> >> >>
> >> >
> >>
> >> --
> >> Jean-Baptiste Onofré
> >> jbono...@apache.org
> >> http://blog.nanthrax.net
> >> Talend - http://www.talend.com
> >>
>
