On Wed, Feb 8, 2017 at 10:48 AM, Kenneth Knowles <k...@google.com.invalid> wrote:
> Hi Etienne,
>
> If the timer is firing n times for n elements, that's a bug in the runner / shared runner code. It should be deduped. Which runner? Can you file a JIRA against me to investigate? I'm still in the process of fleshing out more and more RunnableOnService (aka ValidatesRunner) tests, so I will surely add one (existing tests already OOMed without deduping, so it wasn't at the top of my priority list).
>
> If the end user doesn't have a natural key, I would just add one and remove it within your transform. Not sure how easy this will be; you might need user intervention. Of course, you still need to shard, or you'll be processing the whole PCollection serially.
>

We should also provide the algorithm without using timers/state, as adding (then sharding by) the key is likely to be expensive.

> On Wed, Feb 8, 2017 at 9:45 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>
> > Hi
> >
> > AFAIR the timer per function is in the "roadmap" (remembering a discussion we had with Kenn).
> >
> > I will take a deeper look at your branch next week.
> >
> > Regards
> > JB
> >
> > On Feb 8, 2017, 13:28, at 13:28, Etienne Chauchot <echauc...@gmail.com> wrote:
> > >Hi Kenn,
> > >
> > >I have started using the state and timer APIs; they seem awesome!
> > >
> > >Please take a look at https://github.com/echauchot/beam/tree/BEAM-135-BATCHING-PARDO
> > >
> > >It contains a PTransform that does the batching across bundles while respecting the windows (even if the tests are not finished yet; see @Ignore and TODOs).
> > >
> > >I have some questions:
> > >
> > >- I use the timer to detect the end of the window, like you suggested. But the timer can only be set in @ProcessElement and @OnTimer. The Javadoc says that timers are implicitly scoped to a key/window and that a timer can be set only a single time per scope.
> > >  I noticed that if I call timer.setForNowPlus in the @ProcessElement method, it seems that the timer is set n times for n elements. So I just created a boolean state to prevent setting the timer more than once per key/window.
> > >  => Would it be good to have an end-user way of indicating that the timer will be set only once per key/window? Something analogous to @Setup, to avoid the user having to use a boolean state?
> > >
> > >- I understand that state and timers need to be per-key, but what if the end user does not need a key (let's say he just needs a PCollection<String>)? Then, do we tell him to use a PCollection<KV> anyway, like I wrote in the javadoc of BatchingParDo?
> > >
> > >WDYT?
> > >
> > >Thanks,
> > >
> > >Etienne
> > >
> > >
> > >On 26/01/2017 at 17:28, Etienne Chauchot wrote:
> > >> Wonderful!
> > >>
> > >> Thanks Kenn!
> > >>
> > >> Etienne
> > >>
> > >>
> > >> On 26/01/2017 at 15:34, Kenneth Knowles wrote:
> > >>> Hi Etienne,
> > >>>
> > >>> I was drafting a proposal about @OnWindowExpiration when this email arrived. I thought I would try to quickly unblock you by responding with a TL;DR: you can achieve your goals with state & timers as they currently exist. You'll set a timer for window.maxTimestamp().plus(allowedLateness) precisely. When this timer fires, you are guaranteed that the input watermark has exceeded this point (so all new data is droppable) while the output timestamp is held to this point (so you can safely output into the window).
> > >>>
> > >>> @OnWindowExpiration is (1) a convenience to save you from needing a handle on the allowed lateness (not a problem in your case) and (2) actually meaningful and potentially less expensive to implement in the absence of state (this is why it needs a design discussion at all, really).
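Kenn's TL;DR comes down to one event-time timer per key and window, firing at window.maxTimestamp().plus(allowedLateness). The sketch below is a plain-Java model, not Beam code: the class and method names are hypothetical, and it assumes fixed windows, where maxTimestamp() is the window end minus one millisecond. It illustrates both the firing-time arithmetic and the deduplication Kenn describes. If timers are keyed by (key, window), setting the same timer once per element only overwrites the pending entry, so a single firing remains and no boolean state guard should be needed.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical plain-Java model (not the Beam API) of timers scoped to a key and window:
 * setting the same timer again replaces the pending firing time instead of adding a second one.
 */
class ScopedTimerModel {
  /** Assumes a fixed window [start, start + size), whose maxTimestamp() is the end minus 1 ms. */
  static long maxTimestamp(long windowStartMillis, long windowSizeMillis) {
    return windowStartMillis + windowSizeMillis - 1;
  }

  /** Firing time suggested in the thread: window.maxTimestamp().plus(allowedLateness). */
  static long expiryTime(long windowStartMillis, long windowSizeMillis, long allowedLatenessMillis) {
    return maxTimestamp(windowStartMillis, windowSizeMillis) + allowedLatenessMillis;
  }

  // One pending firing time per (key, window start); the composite string key is illustrative only.
  private final Map<String, Long> pending = new HashMap<>();

  /** Sets (or re-sets) the timer for this key and window; repeated calls overwrite. */
  void set(String key, long windowStartMillis, long firingTimeMillis) {
    pending.put(key + "@" + windowStartMillis, firingTimeMillis);
  }

  int pendingCount() {
    return pending.size();
  }

  Long firingTime(String key, long windowStartMillis) {
    return pending.get(key + "@" + windowStartMillis);
  }
}
```

For example, setting the timer six times for six elements of the same one-minute window starting at 0, with 5 seconds of allowed lateness, leaves one pending timer at 64,999 ms.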
> > >>>
> > >>> Caveat: these APIs are new and not supported in every runner and windowing configuration.
> > >>>
> > >>> Kenn
> > >>>
> > >>> On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot <echauc...@gmail.com> wrote:
> > >>>
> > >>>> Hi,
> > >>>>
> > >>>> I have started to implement this ticket. For now it is implemented as a PTransform that simply does ParDo.of(new DoFn), and all the processing related to batching is done in the DoFn.
> > >>>>
> > >>>> I'm starting to deal with windows and bundles (starting to take a look at the State API to process across bundles; more questions about this to come).
> > >>>> My comments/questions are inline:
> > >>>>
> > >>>>
> > >>>> On 17/01/2017 at 18:41, Ben Chambers wrote:
> > >>>>
> > >>>>> We should start by understanding the goals. If elements are in different windows, can they be put in the same batch? If they have different timestamps, what timestamp should the batch have?
> > >>>>>
> > >>>> Regarding timestamps: the current design is as follows. The transform does not group elements in the PCollection, so the "batch" does not exist as an element in the PCollection. There is only a user-defined function (perBatchFn) that gets called when batchSize elements have been processed. This function takes an ArrayList as a parameter. So elements keep their original timestamps.
> > >>>>
> > >>>>
> > >>>> Regarding windowing: I guess that if elements are not in the same window, they are not expected to be in the same batch.
> > >>>> I'm just starting to work on these subjects, so I might lack a bit of information; what I am currently thinking about is that I need a way to know in the DoFn that the window has expired, so that I can call the perBatchFn even if batchSize is not reached. This is the @OnWindowExpiration callback that Kenneth mentioned in an email about bundles.
> > >>>> Let's imagine that we have a collection of elements artificially timestamped every 10 seconds (for simplicity of the example) and a fixed windowing of 1 minute. Then each window contains 6 elements. If we were to buffer the elements in batches of 5 elements, then for each window we expect to get 2 batches (one of 5 elements, one of 1 element). For that to happen, we need an @OnWindowExpiration on the DoFn where we call perBatchFn.
> > >>>>
> > >>>>> As a composite transform this will likely require a group by key, which may affect performance. Maybe within a DoFn is better.
> > >>>>>
> > >>>> Yes, the processing is done with a DoFn indeed.
> > >>>>
> > >>>>> Then it could be some annotation or API that informs the runner. Should batch sizes be fixed in the annotation (element count or size), or should the user have some method that lets them decide when to process a batch based on the contents?
> > >>>>>
> > >>>> For now, the user passes batchSize as an argument to BatchParDo.via(); it is a number of elements. But batching based on content might be useful for the user. Giving a hint to the runner might give the runner more flexibility.
> > >>>> Thanks.
> > >>>>
> > >>>>> Another thing to think about is whether this should be connected to the ability to run parts of the bundle in parallel.
> > >>>>>
> > >>>> Yes!
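Etienne's worked example (elements timestamped every 10 seconds, one-minute fixed windows, batches of 5) can be checked with a small plain-Java model. This is a hypothetical sketch, not Beam code: WindowedBatchModel and its parameters are illustrative names. It emits full batches as soon as they fill (where perBatchFn would be called) and models window expiry simply as the end of the input, at which point each window's leftover elements are flushed even though batchSize was not reached.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical plain-Java model of per-window batching with a flush at window expiry; not Beam code. */
class WindowedBatchModel {
  /** Returns batches in emission order: full batches as they fill, then each window's remainder. */
  static List<List<Long>> batch(List<Long> timestampsMillis, long windowSizeMillis, int batchSize) {
    // One buffer per fixed window, keyed by window start (TreeMap keeps windows in time order).
    Map<Long, List<Long>> buffers = new TreeMap<>();
    List<List<Long>> emitted = new ArrayList<>();
    for (long ts : timestampsMillis) {
      long windowStart = ts - Math.floorMod(ts, windowSizeMillis);
      List<Long> buffer = buffers.computeIfAbsent(windowStart, w -> new ArrayList<>());
      buffer.add(ts);
      if (buffer.size() == batchSize) {
        emitted.add(new ArrayList<>(buffer)); // perBatchFn would be called here
        buffer.clear();
      }
    }
    // Window expiry (modelled here as the end of input): flush partial batches in window order.
    for (List<Long> remainder : buffers.values()) {
      if (!remainder.isEmpty()) {
        emitted.add(remainder);
      }
    }
    return emitted;
  }
}
```

With twelve elements at 0 s, 10 s, ..., 110 s, a 60-second window and a batch size of 5, each window yields one batch of 5 and one batch of 1, matching the two batches per window expected above.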
> > >>>>
> > >>>>> Maybe each batch is an RPC and you just want to start an async RPC for each batch. Then, in addition to starting the final RPC in finishBundle, you also need to wait for all the RPCs to complete.
> > >>>>>
> > >>>> Actually, currently each batch's processing is whatever the user wants (the perBatchFn user-defined function). If the user decides to issue an async RPC in that function (called with the ArrayList of input elements), IMHO he is responsible for waiting for the response in that method if he needs it, but he can also do a send-and-forget, depending on his use case.
> > >>>>
> > >>>> Besides, I have also included a perElementFn user function to allow the user to do some processing on the elements before adding them to the batch (example use case: convert a String to a DTO object to invoke an external service).
> > >>>>
> > >>>> Etienne
> > >>>>
> > >>>> On Tue, Jan 17, 2017, 8:48 AM Etienne Chauchot <echauc...@gmail.com> wrote:
> > >>>>>
> > >>>>> Hi JB,
> > >>>>>
> > >>>>> I meant a JIRA vote, but discussion on the ML works too :)
> > >>>>>
> > >>>>> As I understand the need (see the Stack Overflow links in the JIRA ticket), the aim is to avoid the user having to code the batching logic in his own DoFn.processElement() and DoFn.finishBundle(), regardless of the bundles. For example, a possible use case is to batch calls to an external service (for performance).
> > >>>>>
> > >>>>> I was thinking about providing a PTransform that implements the batching in its own DoFn and that takes user-defined functions for customization.
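Ben's scenario (one asynchronous RPC per batch, with finishBundle waiting for all of them) and Etienne's perElementFn/perBatchFn split can be combined in a small sketch. It assumes a plain-Java stand-in for the DoFn lifecycle: add() plays the role of @ProcessElement, finish() plays the role of @FinishBundle, and perBatchFn is assumed to return a CompletableFuture. BundleBatcher is a hypothetical name, not part of Beam. Because the batching is bundle-scoped it needs no state or timers, which fits the state-free variant suggested at the top of the thread, but for the same reason it cannot batch across bundles.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/**
 * Hypothetical model of bundle-scoped batching: add() stands in for @ProcessElement and
 * finish() for @FinishBundle. Not Beam code.
 */
class BundleBatcher<InputT, ElementT> {
  private final int batchSize;
  private final Function<InputT, ElementT> perElementFn;
  private final Function<List<ElementT>, CompletableFuture<Void>> perBatchFn;
  private final List<ElementT> buffer = new ArrayList<>();
  private final List<CompletableFuture<Void>> inFlight = new ArrayList<>();

  BundleBatcher(int batchSize, Function<InputT, ElementT> perElementFn,
      Function<List<ElementT>, CompletableFuture<Void>> perBatchFn) {
    this.batchSize = batchSize;
    this.perElementFn = perElementFn;
    this.perBatchFn = perBatchFn;
  }

  /** Converts the element (e.g. String to DTO) and starts a batch call once the buffer is full. */
  void add(InputT input) {
    buffer.add(perElementFn.apply(input));
    if (buffer.size() >= batchSize) {
      flush();
    }
  }

  /** Starts the final partial batch, then blocks until every asynchronous batch call completes. */
  void finish() {
    if (!buffer.isEmpty()) {
      flush();
    }
    CompletableFuture.allOf(inFlight.toArray(new CompletableFuture<?>[0])).join();
    inFlight.clear();
  }

  private void flush() {
    // Hand a copy to perBatchFn so the buffer can be reused while the call is in flight.
    inFlight.add(perBatchFn.apply(new ArrayList<>(buffer)));
    buffer.clear();
  }
}
```

A send-and-forget user could return an already completed future from perBatchFn, so finish() would not wait on the remote service; whether that waiting belongs to the transform or to the user is the open point in the exchange above.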
> > >>>>>
> > >>>>> Etienne
> > >>>>>
> > >>>>> On 17/01/2017 at 17:30, Jean-Baptiste Onofré wrote:
> > >>>>>
> > >>>>>> Hi
> > >>>>>>
> > >>>>>> I guess you mean discussion on the mailing list about that, right?
> > >>>>>>
> > >>>>>> AFAIR the idea is to provide a utility class to deal with pooling/batching. However, I'm not sure it's required, given @StartBundle etc. in DoFn, and since batching depends on the end user's "logic".
> > >>>>>>
> > >>>>>> Regards
> > >>>>>> JB
> > >>>>>>
> > >>>>>> On Jan 17, 2017, 08:26, at 08:26, Etienne Chauchot <echauc...@gmail.com> wrote:
> > >>>>>>> Hi all,
> > >>>>>>> I have started to work on this ticket: https://issues.apache.org/jira/browse/BEAM-135
> > >>>>>>>
> > >>>>>>> As there have been no votes since March 18th, is the issue still relevant/needed?
> > >>>>>>>
> > >>>>>>> Regards,
> > >>>>>>>
> > >>>>>>> Etienne
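For Etienne's keyless PCollection<String> case, Kenn's advice was to add a key inside the transform and remove it again, while still sharding so the whole collection is not processed serially. The following plain-Java sketch illustrates that "add a key, then strip it" idea. The hash-based shard key, the numShards parameter and the ShardKeys class are all assumptions for illustration; the thread does not specify how the key should be chosen, and this is not Beam code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical sketch (plain Java, not Beam) of "add a key, then remove it": keyless elements
 * get a synthetic shard key so that per-key state can be spread over numShards keys, and the
 * key is dropped again before results are handed back to the user.
 */
class ShardKeys {
  /** Synthetic key derived from the element's hash; using hashCode() here is an assumption. */
  static int shardFor(Object element, int numShards) {
    return Math.floorMod(element.hashCode(), numShards);
  }

  /** Groups elements by synthetic shard key, preserving arrival order within each shard. */
  static <T> Map<Integer, List<T>> assign(List<T> elements, int numShards) {
    Map<Integer, List<T>> shards = new TreeMap<>();
    for (T element : elements) {
      shards.computeIfAbsent(shardFor(element, numShards), k -> new ArrayList<>()).add(element);
    }
    return shards;
  }

  /** Drops the synthetic keys again, returning only the user's values. */
  static <T> List<T> stripKeys(Map<Integer, List<T>> shards) {
    List<T> values = new ArrayList<>();
    for (List<T> shard : shards.values()) {
      values.addAll(shard);
    }
    return values;
  }
}
```

The number of shards trades parallelism against per-key batch fill rate: more shards means more parallel work but smaller, slower-filling batches per key. As noted at the top of the thread, adding and sharding by such a key may itself be expensive, which is why a state-free variant was also suggested.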