Hi Etienne,

Could you post some snippets of how your transform is to be used in a pipeline? I think that would make it easier to discuss on this thread and could save a lot of churn if the discussion ends up leading to a different API.
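For context, here is a rough illustration of the kind of snippet being asked for, assembled from the draft API Etienne describes further down in the thread (BatchParDo.via taking a batchSize, a perElementFn, and a perBatchFn). These names come from the work-in-progress branch and are not a finalized Beam API; the element type, EnrichRequest, and externalService are made up for illustration only.

    // Illustrative only: BatchParDo, perElementFn and perBatchFn are draft names
    // from this thread, not a released Beam API. EnrichRequest and
    // externalService are hypothetical user code.
    PCollection<String> customerIds = ...;

    customerIds.apply("BatchedEnrichment",
        BatchParDo.via(
            5,                                      // batchSize (element count)
            (String id) -> new EnrichRequest(id),   // perElementFn: element -> DTO
            (ArrayList<EnrichRequest> batch) ->     // perBatchFn: called per batch
                externalService.callBulk(batch)));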
On Thu, Jan 26, 2017 at 8:29 AM Etienne Chauchot <echauc...@gmail.com> wrote:

> Wonderful!
>
> Thanks Kenn!
>
> Etienne
>
> On 26/01/2017 at 15:34, Kenneth Knowles wrote:
> > Hi Etienne,
> >
> > I was drafting a proposal about @OnWindowExpiration when this email
> > arrived. I thought I would try to quickly unblock you by responding with a
> > TL;DR: you can achieve your goals with state & timers as they currently
> > exist. You'll set a timer for window.maxTimestamp().plus(allowedLateness)
> > precisely - when this timer fires, you are guaranteed that the input
> > watermark has exceeded this point (so all new data is droppable) while the
> > output timestamp is held to this point (so you can safely output into the
> > window).
> >
> > @OnWindowExpiration is (1) a convenience to save you from needing a handle
> > on the allowed lateness (not a problem in your case) and (2) actually
> > meaningful and potentially less expensive to implement in the absence of
> > state (this is why it needs a design discussion at all, really).
> >
> > Caveat: these APIs are new and not supported in every runner and windowing
> > configuration.
> >
> > Kenn
> >
> > On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot <echauc...@gmail.com> wrote:
> >
> >> Hi,
> >>
> >> I have started to implement this ticket. For now it is implemented as a
> >> PTransform that simply does ParDo.of(new DoFn) and all the processing
> >> related to batching is done in the DoFn.
> >>
> >> I'm starting to deal with windows and bundles (and starting to take a look
> >> at the State API to handle state across bundles; more questions about this
> >> to come). My comments/questions are inline:
> >>
> >> On 17/01/2017 at 18:41, Ben Chambers wrote:
> >>
> >>> We should start by understanding the goals. If elements are in different
> >>> windows, can they be output in the same batch? If they have different
> >>> timestamps, what timestamp should the batch have?
> >>>
> >> Regarding timestamps: the current design is as follows: the transform does
> >> not group elements in the PCollection, so the "batch" does not exist as an
> >> element in the PCollection. There is only a user-defined function
> >> (perBatchFn) that gets called when batchSize elements have been processed.
> >> This function takes an ArrayList as parameter, so elements keep their
> >> original timestamps.
> >>
> >> Regarding windowing: I guess that if elements are not in the same window,
> >> they are not expected to be in the same batch.
> >> I'm just starting to work on these subjects, so I might lack a bit of
> >> information; what I am currently thinking about is that I need a way to
> >> know in the DoFn that the window has expired, so that I can call the
> >> perBatchFn even if batchSize is not reached. This is the
> >> @OnWindowExpiration callback that Kenneth mentioned in an email about
> >> bundles.
> >> Let's imagine that we have a collection of elements artificially
> >> timestamped every 10 seconds (for simplicity of the example) and a fixed
> >> windowing of 1 minute. Then each window contains 6 elements. If we were to
> >> buffer the elements in batches of 5 elements, then for each window we
> >> expect to get 2 batches (one of 5 elements, one of 1 element). For that to
> >> happen, we need an @OnWindowExpiration on the DoFn where we call perBatchFn.
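A minimal sketch of the state-and-timers approach Kenn describes above, applied to the batching need Etienne outlines: buffer elements in a BagState, emit a batch when batchSize is reached, and set an event-time timer at window.maxTimestamp().plus(allowedLateness) to flush the final, possibly smaller batch. The class and state names are illustrative, the input must be keyed (state is per key and window), and the packages/signatures follow recent Beam SDKs rather than the 2017-era ones, which were still evolving.

    // Illustrative only: flushes a batch when batchSize is reached or when the
    // end-of-window timer fires. State & timers API roughly as in recent Beam SDKs.
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.beam.sdk.coders.VarIntCoder;
    import org.apache.beam.sdk.state.*;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.apache.beam.sdk.values.KV;
    import org.joda.time.Duration;

    class BatchingFn<K, V> extends DoFn<KV<K, V>, List<V>> {
      private final int batchSize;
      private final Duration allowedLateness;

      BatchingFn(int batchSize, Duration allowedLateness) {
        this.batchSize = batchSize;
        this.allowedLateness = allowedLateness;
      }

      @StateId("buffer")
      private final StateSpec<BagState<V>> bufferSpec = StateSpecs.bag();

      @StateId("count")
      private final StateSpec<ValueState<Integer>> countSpec =
          StateSpecs.value(VarIntCoder.of());

      @TimerId("endOfWindow")
      private final TimerSpec endOfWindowSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

      @ProcessElement
      public void process(ProcessContext c, BoundedWindow window,
          @StateId("buffer") BagState<V> buffer,
          @StateId("count") ValueState<Integer> count,
          @TimerId("endOfWindow") Timer endOfWindow) {
        // When this timer fires, the input watermark has passed
        // window.maxTimestamp() + allowedLateness, so no more input can arrive
        // for this window (Kenn's TL;DR above).
        endOfWindow.set(window.maxTimestamp().plus(allowedLateness));

        buffer.add(c.element().getValue());
        Integer stored = count.read();
        int n = (stored == null ? 0 : stored) + 1;
        if (n >= batchSize) {
          c.output(copy(buffer));   // in the draft transform, perBatchFn would be called here
          buffer.clear();
          count.write(0);
        } else {
          count.write(n);
        }
      }

      @OnTimer("endOfWindow")
      public void onEndOfWindow(OnTimerContext c,
          @StateId("buffer") BagState<V> buffer) {
        List<V> remaining = copy(buffer);
        if (!remaining.isEmpty()) {
          c.output(remaining);      // the final batch of 1 in Etienne's 6-element example
        }
        buffer.clear();
      }

      private List<V> copy(BagState<V> buffer) {
        List<V> batch = new ArrayList<>();
        for (V v : buffer.read()) {
          batch.add(v);
        }
        return batch;
      }
    }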
> >>> As a composite transform this will likely require a group by key, which
> >>> may affect performance. Maybe within a dofn is better.
> >>>
> >> Yes, the processing is done with a DoFn indeed.
> >>
> >>> Then it could be some annotation or API that informs the runner. Should
> >>> batch sizes be fixed in the annotation (element count or size), or should
> >>> the user have some method that lets them decide when to process a batch
> >>> based on the contents?
> >>>
> >> For now, the user passes batchSize as an argument to BatchParDo.via(); it
> >> is a number of elements. But batching based on content might be useful for
> >> the user, and giving a hint to the runner might be more flexible for the
> >> runner. Thanks.
> >>
> >>> Another thing to think about is whether this should be connected to the
> >>> ability to run parts of the bundle in parallel.
> >>>
> >> Yes!
> >>
> >>> Maybe each batch is an RPC and you just want to start an async RPC for
> >>> each batch. Then in addition to starting the final RPC in finishBundle,
> >>> you also need to wait for all the RPCs to complete.
> >>>
> >> Actually, currently each batch processing is whatever the user wants
> >> (perBatchFn user-defined function). If the user decides to issue an async
> >> RPC in that function (called with the ArrayList of input elements), IMHO he
> >> is responsible for waiting for the response in that method if he needs the
> >> response, but he can also do a send-and-forget, depending on his use case.
> >>
> >> Besides, I have also included a perElementFn user function to allow the
> >> user to do some processing on the elements before adding them to the batch
> >> (example use case: convert a String to a DTO object to invoke an external
> >> service).
> >>
> >> Etienne
> >>
> >>> On Tue, Jan 17, 2017, 8:48 AM Etienne Chauchot <echauc...@gmail.com> wrote:
> >>>
> >>> Hi JB,
> >>>
> >>> I meant JIRA vote, but discussion on the ML works also :)
> >>>
> >>> As I understand the need (see the stackoverflow links in the jira ticket),
> >>> the aim is to avoid the user having to code the batching logic in his own
> >>> DoFn.processElement() and DoFn.finishBundle() regardless of the bundles.
> >>> For example, a possible use case is to batch a call to an external service
> >>> (for performance).
> >>>
> >>> I was thinking about providing a PTransform that implements the batching
> >>> in its own DoFn and that takes user-defined functions for customization.
> >>>
> >>> Etienne
> >>>
> >>> On 17/01/2017 at 17:30, Jean-Baptiste Onofré wrote:
> >>>
> >>>> Hi
> >>>>
> >>>> I guess you mean discussion on the mailing list about that, right?
> >>>>
> >>>> AFAIR the idea is to provide a utility class to deal with
> >>>> pooling/batching. However, I'm not sure it's required, as we have
> >>>> @StartBundle etc. in DoFn, and batching depends on the end user's "logic".
> >>>>
> >>>> Regards
> >>>> JB
> >>>>
> >>>> On Jan 17, 2017, at 08:26, Etienne Chauchot <echauc...@gmail.com> wrote:
> >>>>
> >>>>> Hi all,
> >>>>>
> >>>>> I have started to work on this ticket
> >>>>> https://issues.apache.org/jira/browse/BEAM-135
> >>>>>
> >>>>> As there has been no vote since March 18th, is the issue still
> >>>>> relevant/needed?
> >>>>>
> >>>>> Regards,
> >>>>>
> >>>>> Etienne
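For concreteness, this is roughly the hand-written batching boilerplate that Etienne describes the proposed transform as sparing users from: buffering in the DoFn, calling an external service per full batch, and flushing the leftover batch in finishBundle. ExternalService is a stand-in for the user's own client, not a Beam class, and the annotations follow recent Beam SDKs.

    // Illustrative only: the manual batching a user would otherwise write.
    // ExternalService is a hypothetical user-provided client.
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.beam.sdk.transforms.DoFn;

    class ManualBatchingFn extends DoFn<String, Void> {
      private static final int BATCH_SIZE = 5;
      private transient List<String> batch;

      @StartBundle
      public void startBundle() {
        batch = new ArrayList<>();
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
        batch.add(c.element());
        if (batch.size() >= BATCH_SIZE) {
          ExternalService.callBulk(batch);   // hypothetical bulk call
          batch.clear();
        }
      }

      @FinishBundle
      public void finishBundle() {
        // Flush the final, possibly smaller batch. If the calls above were
        // issued asynchronously, this is also where you would wait for them to
        // complete before letting the bundle finish (Ben's point above).
        if (!batch.isEmpty()) {
          ExternalService.callBulk(batch);
          batch.clear();
        }
      }
    }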