I agree that wrapping the DoFn is probably not the way to go, because
invoking a DoFn correctly is quite tricky given all the reflective
features: e.g. how do you automatically "batch" a DoFn that uses state and
timers? What about a DoFn that uses a BoundedWindow parameter? What about a
splittable DoFn? What about future reflective features? The class for
invoking DoFns, DoFnInvokers, is absent from the SDK (and present in
runners-core) for a good reason.

I'd rather leave the intricacies of invoking DoFn's to runners, and say
that you can't wrap DoFn's, period - "adapter", "decorator" and other
design patterns just don't apply to DoFn's.

The two options for batching are:
- A transform that takes elements and produces batches, like Robert said
- A simple Beam-agnostic library that takes Java objects and produces
batches of Java objects, with an API that makes it convenient to use in a
typical batching DoFn
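
To make the second option concrete, here is a minimal sketch of what such a
library could look like. The class name Batcher and its add/flush methods
are hypothetical, made up for illustration; nothing like this exists in the
Beam SDK as far as this thread goes, and it deliberately ignores windows
and timestamps.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/**
 * Hypothetical Beam-agnostic helper (not an existing API): buffers plain
 * Java objects and hands back a batch once enough have accumulated.
 */
final class Batcher<T> {
  private final int maxBatchSize;
  private List<T> buffer;

  Batcher(int maxBatchSize) {
    if (maxBatchSize <= 0) {
      throw new IllegalArgumentException("maxBatchSize must be positive");
    }
    this.maxBatchSize = maxBatchSize;
    this.buffer = new ArrayList<>(maxBatchSize);
  }

  /** Adds one element; returns a full batch once maxBatchSize is reached. */
  Optional<List<T>> add(T element) {
    buffer.add(element);
    return buffer.size() >= maxBatchSize ? Optional.of(drain()) : Optional.empty();
  }

  /** Returns whatever is buffered, possibly fewer than maxBatchSize elements. */
  Optional<List<T>> flush() {
    return buffer.isEmpty() ? Optional.empty() : Optional.of(drain());
  }

  /** Hands the current buffer to the caller and starts a fresh one. */
  private List<T> drain() {
    List<T> batch = buffer;
    buffer = new ArrayList<>(maxBatchSize);
    return batch;
  }
}
```

In a typical batching DoFn the user would presumably call add() from
@ProcessElement and flush() from @FinishBundle, processing any batch that
comes back. Keeping batches from spanning windows would still be up to the
DoFn, e.g. by holding one Batcher per window.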

On Thu, Jan 26, 2017 at 3:31 PM Ben Chambers <bchamb...@google.com.invalid>
wrote:

> I think that wrapping the DoFn is tricky -- we backed out
> IntraBundleParallelization because it did that, and it has weird
> interactions with both the reflective DoFn and windowing. We could maybe
> make some kind of "DoFnDelegatingDoFn" that could act as a base class and
> get some of that right, but...
>
> One question I have is whether this batching should be "make batches of N
> and if you need to wait for the Nth element do so" or "make batches of at
> most N but don't wait too long if you don't get to N". In the former case,
> we'll need to do something to buffer elements between bundles -- whether
> this is using State or a GroupByKey, etc. In the latter case, the buffering
> can happen entirely within a bundle -- if you get to the end of the bundle
> and only have 5 elements, even if 5 < N, process that as a batch (rather
> than shifting it somewhere else).
>
> On Thu, Jan 26, 2017 at 3:01 PM Robert Bradshaw
> <rober...@google.com.invalid>
> wrote:
>
> > On Thu, Jan 26, 2017 at 12:48 PM, Eugene Kirpichov
> > <kirpic...@google.com.invalid> wrote:
> > > I don't think we should make batching a core feature of the Beam
> > > programming model (by adding it to DoFn as this code snippet implies).
> > I'm
> > > reasonably sure there are less invasive ways of implementing it.
> >
> > +1, either as a PTransform<Pc<T>, Pc<O>> or a DoFn<T, O> that
> > wraps/delegates to a DoFn<Iterable<T>, Iterable<O>>.
> >
> > > On Thu, Jan 26, 2017 at 12:22 PM Jean-Baptiste Onofré <j...@nanthrax.net
> >
> > > wrote:
> > >
> > >> Agree, I'm curious as well.
> > >>
> > >> I guess it would be something like:
> > >>
> > >> .apply(ParDo(new DoFn() {
> > >>
> > >>     @Override
> > >>     public long batchSize() {
> > >>       return 1000;
> > >>     }
> > >>
> > >>     @ProcessElement
> > >>     public void processElement(ProcessContext context) {
> > >>       ...
> > >>     }
> > >> }));
> > >>
> > > >> If batchSize (overridden by user) returns a positive long, then DoFn
> can
> > >> batch with this size.
> > >>
> > >> Regards
> > >> JB
> > >>
> > >> On 01/26/2017 05:38 PM, Eugene Kirpichov wrote:
> > >> > Hi Etienne,
> > >> >
> > >> > Could you post some snippets of how your transform is to be used in
> a
> > >> > pipeline? I think that would make it easier to discuss on this
> thread
> > and
> > >> > could save a lot of churn if the discussion ends up leading to a
> > >> different
> > >> > API.
> > >> >
> > >> > On Thu, Jan 26, 2017 at 8:29 AM Etienne Chauchot <
> echauc...@gmail.com
> > >
> > >> > wrote:
> > >> >
> > >> >> Wonderful !
> > >> >>
> > >> >> Thanks Kenn !
> > >> >>
> > >> >> Etienne
> > >> >>
> > >> >>
> > >> >> Le 26/01/2017 à 15:34, Kenneth Knowles a écrit :
> > >> >>> Hi Etienne,
> > >> >>>
> > >> >>> I was drafting a proposal about @OnWindowExpiration when this
> email
> > >> >>> arrived. I thought I would try to quickly unblock you by
> responding
> > >> with
> > >> >> a
> > >> >>> TL;DR: you can achieve your goals with state & timers as they
> > currently
> > >> >>> exist. You'll set a timer for
> > >> window.maxTimestamp().plus(allowedLateness)
> > >> >>> precisely - when this timer fires, you are guaranteed that the
> input
> > >> >>> watermark has exceeded this point (so all new data is droppable)
> > while
> > >> >> the
> > >> >>> output timestamp is held to this point (so you can safely output
> > into
> > >> the
> > >> >>> window).
> > >> >>>
> > >> >>> @OnWindowExpiration is (1) a convenience to save you from needing
> a
> > >> >> handle
> > >> >>> on the allowed lateness (not a problem in your case) and (2)
> > actually
> > >> >>> meaningful and potentially less expensive to implement in the
> > absence
> > >> of
> > >> >>> state (this is why it needs a design discussion at all, really).
> > >> >>>
> > >> >>> Caveat: these APIs are new and not supported in every runner and
> > >> >> windowing
> > >> >>> configuration.
> > >> >>>
> > >> >>> Kenn
> > >> >>>
> > >> >>> On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot <
> > echauc...@gmail.com
> > >> >
> > >> >>> wrote:
> > >> >>>
> > >> >>>> Hi,
> > >> >>>>
> > >> >>>> I have started to implement this ticket. For now it is
> implemented
> > as
> > >> a
> > >> >>>> PTransform that simply does ParDo.of(new DoFn) and all the
> > processing
> > >> >>>> related to batching is done in the DoFn.
> > >> >>>>
> > >> >>>> I'm starting to deal with windows and bundles (starting to take a
> > look
> > >> >> at
> > >> >>>> the State API to process trans-bundles, more questions about this
> > to
> > >> >> come).
> > >> >>>> My comments/questions are inline:
> > >> >>>>
> > >> >>>>
> > >> >>>> Le 17/01/2017 à 18:41, Ben Chambers a écrit :
> > >> >>>>
> > >> >>>>> We should start by understanding the goals. If elements are in
> > >> >> different
> > > >> >>>>> windows can they be put in the same batch? If they have
> different
> > >> >>>>> timestamps what timestamp should the batch have?
> > >> >>>>>
> > > >> >>>> Regarding timestamps: the current design is as follows: the transform
> does
> > >> not
> > >> >>>> group elements in the PCollection, so the "batch" does not exist
> > as an
> > >> >>>> element in the PCollection. There is only a user defined function
> > >> >>>> (perBatchFn) that gets called when batchSize elements have been
> > >> >> processed.
> > >> >>>> This function takes an ArrayList as parameter. So elements keep
> > their
> > >> >>>> original timestamps
> > >> >>>>
> > >> >>>>
> > >> >>>> Regarding windowing: I guess that if elements are not in the same
> > >> >> window,
> > >> >>>> they are not expected to be in the same batch.
> > >> >>>> I'm just starting to work on these subjects, so I might lack a
> bit
> > of
> > >> >>>> information;
> > >> >>>> what I am currently thinking about is that I need a way to know
> in
> > the
> > >> >>>> DoFn that the window has expired so that I can call the
> perBatchFn
> > >> even
> > >> >> if
> > >> >>>> batchSize is not reached.  This is the @OnWindowExpiration
> callback
> > >> that
> > >> >>>> Kenneth mentioned in an email about bundles.
> > > >> >>>> Let's imagine that we have a collection of elements artificially
> > >> >>>> timestamped every 10 seconds (for simplicity of the example) and
> a
> > >> fixed
> > >> >>>> windowing of 1 minute. Then each window contains 6 elements. If
> we
> > >> were
> > >> >> to
> > >> >>>> buffer the elements by batches of 5 elements, then for each
> window
> > we
> > >> >>>> expect to get 2 batches (one of 5 elements, one of 1 element).
> For
> > >> that
> > >> >> to
> > > >> >>>> happen, we need an @OnWindowExpiration on the DoFn where we call
> > >> >> perBatchFn
> > >> >>>>
> > >> >>>> As a composite transform this will likely require a group by key
> > which
> > >> >> may
> > >> >>>>> affect performance. Maybe within a dofn is better.
> > >> >>>>>
> > >> >>>> Yes, the processing is done with a DoFn indeed.
> > >> >>>>
> > >> >>>>> Then it could be some annotation or API that informs the runner.
> > >> Should
> > >> >>>>> batch sizes be fixed in the annotation (element count or size)
> or
> > >> >> should
> > >> >>>>> the user have some method that lets them decide when to process
> a
> > >> batch
> > >> >>>>> based on the contents?
> > >> >>>>>
> > >> >>>> For now, the user passes batchSize as an argument to
> > BatchParDo.via()
> > >> it
> > >> >>>> is a number of elements. But batch based on content might be
> useful
> > >> for
> > >> >> the
> > >> >>>> user. Give hint to the runner might be more flexible for the
> > runner.
> > >> >> Thanks.
> > >> >>>>
> > >> >>>>> Another thing to think about is whether this should be connected
> > to
> > >> the
> > >> >>>>> ability to run parts of the bundle in parallel.
> > >> >>>>>
> > >> >>>> Yes!
> > >> >>>>
> > >> >>>>> Maybe each batch is an RPC
> > >> >>>>> and you just want to start an async RPC for each batch. Then in
> > >> >> addition
> > >> >>>>> to
> > >> >>>>> start the final RPC in finishBundle, you also need to wait for
> all
> > >> the
> > >> >>>>> RPCs
> > >> >>>>> to complete.
> > >> >>>>>
> > >> >>>> Actually, currently each batch processing is whatever the user
> > wants
> > >> >>>> (perBatchFn user defined function). If the user decides to issue
> an
> > >> >> async
> > >> >>>> RPC in that function (call with the arrayList of input elements),
> > IMHO
> > >> >> he
> > >> >>>> is responsible for waiting for the response in that method if he
> > needs
> > >> >> the
> > >> >>>> response, but he can also do a send and forget, depending on his
> > use
> > >> >> case.
> > >> >>>>
> > >> >>>> Besides, I have also included a perElementFn user function to
> allow
> > >> the
> > >> >>>> user to do some processing on the elements before adding them to
> > the
> > >> >> batch
> > >> >>>> (example use case: convert a String to a DTO object to invoke an
> > >> >> external
> > >> >>>> service)
> > >> >>>>
> > >> >>>> Etienne
> > >> >>>>
> > >> >>>> On Tue, Jan 17, 2017, 8:48 AM Etienne Chauchot<
> echauc...@gmail.com
> > >
> > >> >>>>> wrote:
> > >> >>>>>
> > >> >>>>> Hi JB,
> > >> >>>>>
> > >> >>>>> I meant jira vote but discussion on the ML works also :)
> > >> >>>>>
> > >> >>>>> As I understand the need (see stackoverflow links in jira
> ticket)
> > the
> > >> >>>>> aim is to avoid the user having to code the batching logic in
> his
> > own
> > >> >>>>> DoFn.processElement() and DoFn.finishBundle() regardless of the
> > >> >> bundles.
> > >> >>>>> For example, possible use case is to batch a call to an external
> > >> >> service
> > >> >>>>> (for performance).
> > >> >>>>>
> > >> >>>>> I was thinking about providing a PTransform that implements the
> > >> >> batching
> > >> >>>>> in its own DoFn and that takes user defined functions for
> > >> >> customization.
> > >> >>>>>
> > >> >>>>> Etienne
> > >> >>>>>
> > >> >>>>> Le 17/01/2017 à 17:30, Jean-Baptiste Onofré a écrit :
> > >> >>>>>
> > >> >>>>>> Hi
> > >> >>>>>>
> > >> >>>>>> I guess you mean discussion on the mailing list about that,
> > right ?
> > >> >>>>>>
> > > >> >>>>>> AFAIR the idea is to provide a utility class to deal with
> > >> >>>>>>
> > >> >>>>> pooling/batching. However not sure it's required as with
> > @StartBundle
> > >> >> etc
> > > >> >>>>> in DoFn and batching depends on the end user "logic".
> > >> >>>>>
> > >> >>>>>> Regards
> > >> >>>>>> JB
> > >> >>>>>>
> > >> >>>>>> On Jan 17, 2017, 08:26, at 08:26, Etienne Chauchot<
> > >> >> echauc...@gmail.com>
> > >> >>>>>>
> > >> >>>>> wrote:
> > >> >>>>>
> > >> >>>>>> Hi all,
> > >> >>>>>>> I have started to work on this ticket
> > >> >>>>>>> https://issues.apache.org/jira/browse/BEAM-135
> > >> >>>>>>>
> > > >> >>>>>>> As there were no votes since March 18th, is the issue still
> > >> >>>>>>> relevant/needed?
> > >> >>>>>>>
> > >> >>>>>>> Regards,
> > >> >>>>>>>
> > >> >>>>>>> Etienne
> > >> >>>>>>>
> > >> >>
> > >> >>
> > >> >
> > >>
> > >> --
> > >> Jean-Baptiste Onofré
> > >> jbono...@apache.org
> > >> http://blog.nanthrax.net
> > >> Talend - http://www.talend.com
> > >>
> >
>
