I agree that wrapping the DoFn is probably not the way to go, because wrapping it correctly may be quite tricky due to all the reflective features: e.g. how do you automatically "batch" a DoFn that uses state and timers? What about a DoFn that uses a BoundedWindow parameter? What about a splittable DoFn? What about future reflective features? The class for invoking DoFn's, DoFnInvokers, is absent from the SDK (and present in runners-core) for a good reason.
I'd rather leave the intricacies of invoking DoFn's to runners, and say that you can't wrap DoFn's, period: "adapter", "decorator" and other design patterns just don't apply to DoFn's.

The two options for batching are:
- A transform that takes elements and produces batches, like Robert said
- A simple Beam-agnostic library that takes Java objects and produces batches of Java objects, with an API that makes it convenient to use in a typical batching DoFn

On Thu, Jan 26, 2017 at 3:31 PM Ben Chambers <bchamb...@google.com.invalid> wrote:
> I think that wrapping the DoFn is tricky -- we backed out IntraBundleParallelization because it did that, and it has weird interactions with both the reflective DoFn and windowing. We could maybe make some kind of "DoFnDelegatingDoFn" that could act as a base class and get some of that right, but...
>
> One question I have is whether this batching should be "make batches of N, and if you need to wait for the Nth element, do so" or "make batches of at most N, but don't wait too long if you don't get to N". In the former case, we'll need to do something to buffer elements between bundles -- whether this is using State or a GroupByKey, etc. In the latter case, the buffering can happen entirely within a bundle -- if you get to the end of the bundle and only have 5 elements, even if 5 < N, process that as a batch (rather than shifting it somewhere else).
>
> On Thu, Jan 26, 2017 at 3:01 PM Robert Bradshaw <rober...@google.com.invalid> wrote:
> > On Thu, Jan 26, 2017 at 12:48 PM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
> > > I don't think we should make batching a core feature of the Beam programming model (by adding it to DoFn as this code snippet implies). I'm reasonably sure there are less invasive ways of implementing it.
> >
> > +1, either as a PTransform<Pc<T>, Pc<O>> or a DoFn<T, O> that wraps/delegates to a DoFn<Iterable<T>, Iterable<O>>.
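[Editor's note: to make the "elements in, batches out" semantics concrete, here is a minimal, Beam-agnostic sketch; the class `Batches` is a hypothetical illustration, not an existing Beam or SDK utility.]

```java
import java.util.ArrayList;
import java.util.List;

// Beam-agnostic illustration of "elements in, batches out": group a list of
// elements into batches of at most `batchSize`, emitting a final partial
// batch rather than waiting for more input.
class Batches {
  static <T> List<List<T>> of(List<T> elements, int batchSize) {
    List<List<T>> batches = new ArrayList<>();
    for (int i = 0; i < elements.size(); i += batchSize) {
      // Copy the sub-range so each batch is independent of the input list.
      batches.add(new ArrayList<>(elements.subList(i, Math.min(i + batchSize, elements.size()))));
    }
    return batches;
  }
}
```

[A PTransform implementation would apply the same grouping per window; the final short batch corresponds to flushing at the end of a bundle instead of waiting for more elements, i.e. Ben's "at most N" interpretation.]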
> > > On Thu, Jan 26, 2017 at 12:22 PM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> > >> Agree, I'm curious as well.
> > >>
> > >> I guess it would be something like:
> > >>
> > >> .apply(ParDo.of(new DoFn() {
> > >>
> > >>   @Override
> > >>   public long batchSize() {
> > >>     return 1000;
> > >>   }
> > >>
> > >>   @ProcessElement
> > >>   public void processElement(ProcessContext context) {
> > >>     ...
> > >>   }
> > >> }));
> > >>
> > >> If batchSize (overridden by the user) returns a positive long, then the DoFn can batch with this size.
> > >>
> > >> Regards
> > >> JB
> > >>
> > >> On 01/26/2017 05:38 PM, Eugene Kirpichov wrote:
> > >> > Hi Etienne,
> > >> >
> > >> > Could you post some snippets of how your transform is to be used in a pipeline? I think that would make it easier to discuss on this thread, and could save a lot of churn if the discussion ends up leading to a different API.
> > >> >
> > >> > On Thu, Jan 26, 2017 at 8:29 AM Etienne Chauchot <echauc...@gmail.com> wrote:
> > >> >> Wonderful!
> > >> >>
> > >> >> Thanks Kenn!
> > >> >>
> > >> >> Etienne
> > >> >>
> > >> >> On 26/01/2017 at 15:34, Kenneth Knowles wrote:
> > >> >>> Hi Etienne,
> > >> >>>
> > >> >>> I was drafting a proposal about @OnWindowExpiration when this email arrived. I thought I would try to quickly unblock you by responding with a TL;DR: you can achieve your goals with state & timers as they currently exist. You'll set a timer for window.maxTimestamp().plus(allowedLateness) precisely - when this timer fires, you are guaranteed that the input watermark has exceeded this point (so all new data is droppable), while the output timestamp is held to this point (so you can safely output into the window).
> > >> >>> @OnWindowExpiration is (1) a convenience to save you from needing a handle on the allowed lateness (not a problem in your case) and (2) actually meaningful and potentially less expensive to implement in the absence of state (this is why it needs a design discussion at all, really).
> > >> >>>
> > >> >>> Caveat: these APIs are new and not supported in every runner and windowing configuration.
> > >> >>>
> > >> >>> Kenn
> > >> >>>
> > >> >>> On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot <echauc...@gmail.com> wrote:
> > >> >>>> Hi,
> > >> >>>>
> > >> >>>> I have started to implement this ticket. For now it is implemented as a PTransform that simply does ParDo.of(new DoFn) and all the processing related to batching is done in the DoFn.
> > >> >>>>
> > >> >>>> I'm starting to deal with windows and bundles (starting to take a look at the State API to process across bundles; more questions about this to come). My comments/questions are inline:
> > >> >>>>
> > >> >>>> On 17/01/2017 at 18:41, Ben Chambers wrote:
> > >> >>>>> We should start by understanding the goals. If elements are in different windows, can they be in the same batch? If they have different timestamps, what timestamp should the batch have?
> > >> >>>>
> > >> >>>> Regarding timestamps: the current design is as follows: the transform does not group elements in the PCollection, so the "batch" does not exist as an element in the PCollection. There is only a user-defined function (perBatchFn) that gets called when batchSize elements have been processed. This function takes an ArrayList as parameter.
> > >> >>>> So elements keep their original timestamps.
> > >> >>>>
> > >> >>>> Regarding windowing: I guess that if elements are not in the same window, they are not expected to be in the same batch.
> > >> >>>>
> > >> >>>> I'm just starting to work on these subjects, so I might lack a bit of information; what I am currently thinking about is that I need a way to know in the DoFn that the window has expired, so that I can call the perBatchFn even if batchSize is not reached. This is the @OnWindowExpiration callback that Kenneth mentioned in an email about bundles.
> > >> >>>>
> > >> >>>> Let's imagine that we have a collection of elements artificially timestamped every 10 seconds (for simplicity of the example) and a fixed windowing of 1 minute. Then each window contains 6 elements. If we were to buffer the elements in batches of 5 elements, then for each window we expect to get 2 batches (one of 5 elements, one of 1 element). For that to happen, we need an @OnWindowExpiration on the DoFn where we call perBatchFn.
> > >> >>>>
> > >> >>>>> As a composite transform this will likely require a group by key, which may affect performance. Maybe within a DoFn is better.
> > >> >>>>
> > >> >>>> Yes, the processing is done with a DoFn indeed.
> > >> >>>>
> > >> >>>>> Then it could be some annotation or API that informs the runner. Should batch sizes be fixed in the annotation (element count or size), or should the user have some method that lets them decide when to process a batch based on the contents?
> > >> >>>>
> > >> >>>> For now, the user passes batchSize as an argument to BatchParDo.via(); it is a number of elements.
> > >> >>>> But batching based on content might be useful for the user, and giving a hint to the runner might be more flexible for the runner. Thanks.
> > >> >>>>
> > >> >>>>> Another thing to think about is whether this should be connected to the ability to run parts of the bundle in parallel.
> > >> >>>>
> > >> >>>> Yes!
> > >> >>>>
> > >> >>>>> Maybe each batch is an RPC and you just want to start an async RPC for each batch. Then in addition to starting the final RPC in finishBundle, you also need to wait for all the RPCs to complete.
> > >> >>>>
> > >> >>>> Actually, currently each batch's processing is whatever the user wants (the perBatchFn user-defined function). If the user decides to issue an async RPC in that function (called with the ArrayList of input elements), IMHO he is responsible for waiting for the response in that method if he needs the response, but he can also do a send-and-forget, depending on his use case.
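[Editor's note: Etienne's arithmetic above (timestamps every 10 seconds, fixed 1-minute windows, batches of 5) can be checked with a small Beam-free simulation; `WindowBatchingDemo` and its method names are made up for illustration and do not model watermarks or state.]

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: assign millisecond timestamps to 1-minute fixed windows
// and split each window's elements into batches of at most `batchSize`.
// The final short batch per window is what an @OnWindowExpiration callback
// would flush when the window expires before batchSize is reached.
class WindowBatchingDemo {
  static final long WINDOW_MS = 60_000;

  static Map<Long, List<List<Long>>> batchesPerWindow(List<Long> timestamps, int batchSize) {
    // Group timestamps by window index (fixed windows of WINDOW_MS).
    Map<Long, List<Long>> windows = new TreeMap<>();
    for (long ts : timestamps) {
      windows.computeIfAbsent(ts / WINDOW_MS, w -> new ArrayList<>()).add(ts);
    }
    // Within each window, cut the elements into batches of at most batchSize.
    Map<Long, List<List<Long>>> result = new TreeMap<>();
    for (Map.Entry<Long, List<Long>> e : windows.entrySet()) {
      List<List<Long>> batches = new ArrayList<>();
      List<Long> elems = e.getValue();
      for (int i = 0; i < elems.size(); i += batchSize) {
        batches.add(new ArrayList<>(elems.subList(i, Math.min(i + batchSize, elems.size()))));
      }
      result.put(e.getKey(), batches);
    }
    return result;
  }
}
```

[With elements every 10 s over two minutes, each window holds 6 elements and a batchSize of 5 yields batches of 5 and 1 per window, matching the two batches Etienne expects.]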
> > >> >>>> Besides, I have also included a perElementFn user function to allow the user to do some processing on the elements before adding them to the batch (example use case: convert a String to a DTO object to invoke an external service).
> > >> >>>>
> > >> >>>> Etienne
> > >> >>>>
> > >> >>>> On Tue, Jan 17, 2017, 8:48 AM Etienne Chauchot <echauc...@gmail.com> wrote:
> > >> >>>>> Hi JB,
> > >> >>>>>
> > >> >>>>> I meant a JIRA vote, but a discussion on the ML works also :)
> > >> >>>>>
> > >> >>>>> As I understand the need (see the Stack Overflow links in the JIRA ticket), the aim is to avoid the user having to code the batching logic in his own DoFn.processElement() and DoFn.finishBundle(), regardless of the bundles. For example, a possible use case is to batch calls to an external service (for performance).
> > >> >>>>>
> > >> >>>>> I was thinking about providing a PTransform that implements the batching in its own DoFn and that takes user-defined functions for customization.
> > >> >>>>>
> > >> >>>>> Etienne
> > >> >>>>>
> > >> >>>>> On 17/01/2017 at 17:30, Jean-Baptiste Onofré wrote:
> > >> >>>>>> Hi
> > >> >>>>>>
> > >> >>>>>> I guess you mean a discussion on the mailing list about that, right?
> > >> >>>>>>
> > >> >>>>>> AFAIR the idea is to provide a utility class to deal with pooling/batching. However, I'm not sure it's required, as with @StartBundle etc. in DoFn, batching depends on the end user's "logic".
> > >> >>>>>> Regards
> > >> >>>>>> JB
> > >> >>>>>>
> > >> >>>>>> On Jan 17, 2017, 08:26, Etienne Chauchot <echauc...@gmail.com> wrote:
> > >> >>>>>>> Hi all,
> > >> >>>>>>>
> > >> >>>>>>> I have started to work on this ticket: https://issues.apache.org/jira/browse/BEAM-135
> > >> >>>>>>>
> > >> >>>>>>> As there was no vote since March 18th, is the issue still relevant/needed?
> > >> >>>>>>>
> > >> >>>>>>> Regards,
> > >> >>>>>>>
> > >> >>>>>>> Etienne
> > >>
> > >> --
> > >> Jean-Baptiste Onofré
> > >> jbono...@apache.org
> > >> http://blog.nanthrax.net
> > >> Talend - http://www.talend.com
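[Editor's note: pulling the thread together, the API Etienne describes (a batchSize, a perElementFn applied to each element before buffering, and a perBatchFn called with each batch) can be sketched Beam-agnostically as below. `ElementBatcher` is a hypothetical name; the real transform would additionally have to handle windows, bundles, and state as discussed above.]

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical sketch of the described API: perElementFn converts each input
// (e.g. String -> DTO) before buffering; perBatchFn is called with each full
// batch (e.g. an RPC to an external service). flush() emits the final partial
// batch, as @FinishBundle or @OnWindowExpiration would.
class ElementBatcher<InputT, OutputT> {
  private final int batchSize;
  private final Function<InputT, OutputT> perElementFn;
  private final Consumer<List<OutputT>> perBatchFn;
  private List<OutputT> buffer = new ArrayList<>();

  ElementBatcher(int batchSize,
                 Function<InputT, OutputT> perElementFn,
                 Consumer<List<OutputT>> perBatchFn) {
    this.batchSize = batchSize;
    this.perElementFn = perElementFn;
    this.perBatchFn = perBatchFn;
  }

  // Convert the element, buffer it, and fire the batch callback when full.
  void add(InputT element) {
    buffer.add(perElementFn.apply(element));
    if (buffer.size() >= batchSize) {
      flush();
    }
  }

  // Emit any partial batch; to be called at bundle or window boundaries.
  void flush() {
    if (!buffer.isEmpty()) {
      perBatchFn.accept(buffer);
      buffer = new ArrayList<>();
    }
  }
}
```

[A batching DoFn could call add() from @ProcessElement and flush() from @FinishBundle or an @OnWindowExpiration-style callback, which is exactly the partial-batch case in Etienne's windowing example.]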