Hi Etienne,

Could you post some snippets of how your transform is to be used in a
pipeline? I think that would make it easier to discuss on this thread and
could save a lot of churn if the discussion ends up leading to a different
API.

On Thu, Jan 26, 2017 at 8:29 AM Etienne Chauchot <echauc...@gmail.com>
wrote:

> Wonderful !
>
> Thanks Kenn !
>
> Etienne
>
>
> Le 26/01/2017 à 15:34, Kenneth Knowles a écrit :
> > Hi Etienne,
> >
> > I was drafting a proposal about @OnWindowExpiration when this email
> > arrived. I thought I would try to quickly unblock you by responding with
> a
> > TL;DR: you can achieve your goals with state & timers as they currently
> > exist. You'll set a timer for window.maxTimestamp().plus(allowedLateness)
> > precisely - when this timer fires, you are guaranteed that the input
> > watermark has exceeded this point (so all new data is droppable) while
> the
> > output timestamp is held to this point (so you can safely output into the
> > window).
> >
> > @OnWindowExpiration is (1) a convenience to save you from needing a
> handle
> > on the allowed lateness (not a problem in your case) and (2) actually
> > meaningful and potentially less expensive to implement in the absence of
> > state (this is why it needs a design discussion at all, really).
> >
> > Caveat: these APIs are new and not supported in every runner and
> windowing
> > configuration.
> >
> > Kenn
> >
> > On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot <echauc...@gmail.com>
> > wrote:
> >
> >> Hi,
> >>
> >> I have started to implement this ticket. For now it is implemented as a
> >> PTransform that simply does ParDo.of(new DoFn) and all the processing
> >> related to batching is done in the DoFn.
> >>
> >> I'm starting to deal with windows and bundles (starting to take a look
> at
> >> the State API to process trans-bundles, more questions about this to
> come).
> >> My comments/questions are inline:
> >>
> >>
> >> Le 17/01/2017 à 18:41, Ben Chambers a écrit :
> >>
> >>> We should start by understanding the goals. If elements are in
> different
> >>> windows can they be out in the same batch? If they have different
> >>> timestamps what timestamp should the batch have?
> >>>
> >> Regarding timestamps: currently design is as so: the transform does not
> >> group elements in the PCollection, so the "batch" does not exist as an
> >> element in the PCollection. There is only a user defined function
> >> (perBatchFn) that gets called when batchSize elements have been
> processed.
> >> This function takes an ArrayList as parameter. So elements keep their
> >> original timestamps
> >>
> >>
> >> Regarding windowing: I guess that if elements are not in the same
> window,
> >> they are not expected to be in the same batch.
> >> I'm just starting to work on these subjects, so I might lack a bit of
> >> information;
> >> what I am currently thinking about is that I need a way to know in the
> >> DoFn that the window has expired so that I can call the perBatchFn even
> if
> >> batchSize is not reached.  This is the @OnWindowExpiration callback that
> >> Kenneth mentioned in an email about bundles.
> >> Lets imagine that we have a collection of elements artificially
> >> timestamped every 10 seconds (for simplicity of the example) and a fixed
> >> windowing of 1 minute. Then each window contains 6 elements. If we were
> to
> >> buffer the elements by batches of 5 elements, then for each window we
> >> expect to get 2 batches (one of 5 elements, one of 1 element). For that
> to
> >> append, we need a @OnWindowExpiration on the DoFn where we call
> perBatchFn
> >>
> >> As a composite transform this will likely require a group by key which
> may
> >>> affect performance. Maybe within a dofn is better.
> >>>
> >> Yes, the processing is done with a DoFn indeed.
> >>
> >>> Then it could be some annotation or API that informs the runner. Should
> >>> batch sizes be fixed in the annotation (element count or size) or
> should
> >>> the user have some method that lets them decide when to process a batch
> >>> based on the contents?
> >>>
> >> For now, the user passes batchSize as an argument to BatchParDo.via() it
> >> is a number of elements. But batch based on content might be useful for
> the
> >> user. Give hint to the runner might be more flexible for the runner.
> Thanks.
> >>
> >>> Another thing to think about is whether this should be connected to the
> >>> ability to run parts of the bundle in parallel.
> >>>
> >> Yes!
> >>
> >>> Maybe each batch is an RPC
> >>> and you just want to start an async RPC for each batch. Then in
> addition
> >>> to
> >>> start the final RPC in finishBundle, you also need to wait for all the
> >>> RPCs
> >>> to complete.
> >>>
> >> Actually, currently each batch processing is whatever the user wants
> >> (perBatchFn user defined function). If the user decides to issue an
> async
> >> RPC in that function (call with the arrayList of input elements), IMHO
> he
> >> is responsible for waiting for the response in that method if he needs
> the
> >> response, but he can also do a send and forget, depending on his use
> case.
> >>
> >> Besides, I have also included a perElementFn user function to allow the
> >> user to do some processing on the elements before adding them to the
> batch
> >> (example use case: convert a String to a DTO object to invoke an
> external
> >> service)
> >>
> >> Etienne
> >>
> >> On Tue, Jan 17, 2017, 8:48 AM Etienne Chauchot<echauc...@gmail.com>
> >>> wrote:
> >>>
> >>> Hi JB,
> >>>
> >>> I meant jira vote but discussion on the ML works also :)
> >>>
> >>> As I understand the need (see stackoverflow links in jira ticket) the
> >>> aim is to avoid the user having to code the batching logic in his own
> >>> DoFn.processElement() and DoFn.finishBundle() regardless of the
> bundles.
> >>> For example, possible use case is to batch a call to an external
> service
> >>> (for performance).
> >>>
> >>> I was thinking about providing a PTransform that implements the
> batching
> >>> in its own DoFn and that takes user defined functions for
> customization.
> >>>
> >>> Etienne
> >>>
> >>> Le 17/01/2017 à 17:30, Jean-Baptiste Onofré a écrit :
> >>>
> >>>> Hi
> >>>>
> >>>> I guess you mean discussion on the mailing list about that, right ?
> >>>>
> >>>> AFAIR the ide⁣​a is to provide a utility class to deal with
> >>>>
> >>> pooling/batching. However not sure it's required as with @StartBundle
> etc
> >>> in DoFn and batching depends of the end user "logic".
> >>>
> >>>> Regards
> >>>> JB
> >>>>
> >>>> On Jan 17, 2017, 08:26, at 08:26, Etienne Chauchot<
> echauc...@gmail.com>
> >>>>
> >>> wrote:
> >>>
> >>>> Hi all,
> >>>>> I have started to work on this ticket
> >>>>> https://issues.apache.org/jira/browse/BEAM-135
> >>>>>
> >>>>> As there where no vote since March 18th, is the issue still
> >>>>> relevant/needed?
> >>>>>
> >>>>> Regards,
> >>>>>
> >>>>> Etienne
> >>>>>
>
>

Reply via email to