On Fri, Jan 27, 2017 at 6:55 AM, Etienne Chauchot <[email protected]> wrote:
> Hi Robert,
>
> Le 26/01/2017 à 18:17, Robert Bradshaw a écrit :
>>
>> First off, let me say that a *correctly* batching DoFn is a lot of
>> value, especially because it's (too) easy to (often unknowingly)
>> implement it incorrectly.
>
> I definitely agree; I made a similar comment in another email. As an example,
> I recall a comment from someone on Stack Overflow who said that he would have
> forgotten to flush the batch in finishBundle.
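To make the finishBundle pitfall concrete, here is a minimal, dependency-free Java model of a bundle-scoped batcher. It is not Beam's DoFn API: the class `BundleBatcher` and its methods are hypothetical stand-ins for the start/process/finish-bundle lifecycle, and outputs are collected in a list instead of going through an output receiver.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical, dependency-free model of a bundle-scoped batching DoFn.
// It is NOT Beam's API; it only mirrors the start/process/finish-bundle lifecycle.
class BundleBatcher<T, O> {
  private final int maxBatchSize;
  private final Function<List<T>, List<O>> perBatchFn;
  private final List<O> emitted = new ArrayList<>(); // stands in for the output receiver
  private List<T> buffer = new ArrayList<>();

  BundleBatcher(int maxBatchSize, Function<List<T>, List<O>> perBatchFn) {
    if (maxBatchSize < 1) {
      throw new IllegalArgumentException("maxBatchSize must be at least 1");
    }
    this.maxBatchSize = maxBatchSize;
    this.perBatchFn = perBatchFn;
  }

  // Each bundle starts with an empty buffer.
  void startBundle() {
    buffer = new ArrayList<>();
  }

  // Buffer the element and flush once the batch is full.
  void processElement(T element) {
    buffer.add(element);
    if (buffer.size() >= maxBatchSize) {
      flush();
    }
  }

  // Flush the partial batch; skipping this silently drops the buffered elements.
  void finishBundle() {
    flush();
  }

  private void flush() {
    if (!buffer.isEmpty()) {
      emitted.addAll(perBatchFn.apply(buffer));
      buffer = new ArrayList<>();
    }
  }

  List<O> emitted() {
    return emitted;
  }
}
```

With a batch size of 2 and five elements, only four outputs exist after processing; the fifth appears only once finishBundle() runs, which is exactly the output that is lost if the flush is forgotten.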
>>
>> My take is that a BatchingParDo should be a PTransform<PCollection<T>,
>> PCollection<O>> that takes a DoFn<? super Iterable<T>, ? extends
>> Iterable<O>> as a parameter, as well as some (optional?) batching
>> criteria (probably batch size and/or batch timeout).
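One way to express the optional batching criteria is a small value object that answers "flush now?" from the buffer size and the age of its oldest element. A minimal sketch, assuming a hypothetical `BatchingCriteria` class: time is passed in explicitly rather than read from a clock so the decision stays deterministic. In a real pipeline the timeout would likely need to be driven by a timer, since nothing runs while no elements arrive.

```java
// Hypothetical batching criteria: a batch is flushed when it reaches maxSize
// elements, or when its oldest element has waited at least maxWaitMillis.
final class BatchingCriteria {
  private final int maxSize;
  private final long maxWaitMillis;

  BatchingCriteria(int maxSize, long maxWaitMillis) {
    if (maxSize < 1 || maxWaitMillis < 0) {
      throw new IllegalArgumentException("maxSize must be >= 1 and maxWaitMillis >= 0");
    }
    this.maxSize = maxSize;
    this.maxWaitMillis = maxWaitMillis;
  }

  // Time is passed in explicitly so the decision is deterministic and testable.
  boolean shouldFlush(int bufferedCount, long oldestAddedMillis, long nowMillis) {
    if (bufferedCount == 0) {
      return false; // nothing to flush
    }
    return bufferedCount >= maxSize || nowMillis - oldestAddedMillis >= maxWaitMillis;
  }
}
```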
>
> This is how I implemented it, plus an additional perElement function that
> produces an intermediary type. This lets the user work with a type other than
> InputType in perBatchFn (e.g. convert elements to DTOs and call an external
> service in perBatchFn using those DTOs), or do any other per-element
> computation before adding elements to the batch.
I think we should omit perElement from this transform, as that step
can be done immediately prior to this one without any loss of
generality or utility. One can always wrap this composition in a new
PTransform if desired.
> Besides, I used SimpleFunctions:
>
> SimpleFunction<InputT, IntermediaryT> perElementFn;
> SimpleFunction<ArrayList<IntermediaryT>, ArrayList<OutputT>> perBatchFn;
>
> The input ArrayList in perBatchFn is the buffer of elements.
We should be as general as possible, e.g. SimpleFunction<? super
Iterable<IntermediaryT>, ? extends Iterable<OutputT>>. Again, letting
this be a DoFn rather than a SimpleFunction would allow for things such
as setup, teardown, side inputs, etc., but it forces complicated
delegation, so this is probably a fine start.
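The benefit of the wildcard signature can be shown with plain java.util.function.Function standing in for SimpleFunction. In this minimal sketch the helper `BatchFns.applyBatch` is hypothetical; it accepts any batch function whose input type is a supertype of Iterable<I> and whose result is any Iterable<O>.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical helper: runs a batch function typed with the wildcard signature
// Function<? super Iterable<I>, ? extends Iterable<O>> over a buffered batch.
final class BatchFns {
  static <I, O> List<O> applyBatch(
      Function<? super Iterable<I>, ? extends Iterable<O>> perBatchFn, List<I> buffer) {
    List<O> out = new ArrayList<>();
    // Any Iterable<O> is accepted as the result; copy it into a List for the caller.
    for (O o : perBatchFn.apply(buffer)) {
      out.add(o);
    }
    return out;
  }
}
```

For example, a batch function written once over Iterable<? extends Number> and returning a List<Double> can be reused for a buffer of Integers, which an ArrayList-to-ArrayList signature would reject.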
>> The DoFn should
>> map the set of inputs to a set of outputs of the same size and in the
>> same order as the input (or, possibly, an empty list would be
>> acceptable). Semantically, it should be defined as
>>
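>> public PCollection<O> expand(PCollection<T> input) {
>>   return input
>>       .apply(e -> Collections.singletonList(e))   // wrap each element in a batch of one
>>       .apply(ParDo.of(batchDoFn))                  // run the batch DoFn on that batch
>>       .apply(es -> Iterables.getOnlyElement(es));  // unwrap the single output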
>> }
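The semantic definition above can be turned into a property check: running the batch function on a whole batch must give the same list as running it on each element as a singleton batch and concatenating the single outputs in input order. A minimal sketch, assuming plain java.util types instead of Beam's; the name `BatchContract.holdsFor` is hypothetical, and it deliberately ignores the "empty list may be acceptable" allowance.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical checker for the batching contract: applying the batch function
// to a whole batch must equal applying it to each element as a singleton batch
// and concatenating the (single-element) results, in input order.
final class BatchContract {
  static <T, O> boolean holdsFor(Function<List<T>, List<O>> perBatchFn, List<T> batch) {
    List<O> batched = perBatchFn.apply(batch);
    List<O> oneByOne = new ArrayList<>();
    for (T element : batch) {
      List<O> single = perBatchFn.apply(Collections.singletonList(element));
      if (single.size() != 1) {
        return false; // a batch of one must yield exactly one output
      }
      oneByOne.add(single.get(0));
    }
    return batched.equals(oneByOne);
  }
}
```

A function that maps each string to its length passes; one that reverses its output list fails, because the order no longer matches the input.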
>>
>> Getting this correct with respect to timestamps and windowing is tricky.
>> However, even something that handles only the most trivial case (e.g.
>> GlobalWindows only) and degenerates to batch sizes of 1 for other cases
>> would allow people to start using this code (rather than rolling their
>> own), and we could then continue to refine it.
>
> Yes, sure. Right now the code handles only the global window case. This is
> the very beginning; I'm still using the simple, naive approach (no window
> support and no cross-bundle buffering).
+1. We should assert on construction that the windowing is global.
Even in the global window case, we'll want to avoid mangling element
timestamps.
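Avoiding mangled timestamps amounts to giving each output the timestamp of the input at the same position, rather than one shared timestamp (for example the last element's, or the flush time) for the whole batch. A minimal, dependency-free sketch: `Stamped` and `TimestampPreservingBatch` are hypothetical stand-ins, not Beam's TimestampedValue or output methods, and the sketch relies on the one-output-per-input contract discussed above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for an element paired with its event-time timestamp.
final class Stamped<V> {
  private final V value;
  private final long timestampMillis;

  Stamped(V value, long timestampMillis) {
    this.value = value;
    this.timestampMillis = timestampMillis;
  }

  V value() { return value; }
  long timestampMillis() { return timestampMillis; }
}

final class TimestampPreservingBatch {
  // Runs the batch function on the bare values, then re-attaches each input's
  // original timestamp to the output at the same position.
  static <T, O> List<Stamped<O>> apply(
      Function<List<T>, List<O>> perBatchFn, List<Stamped<T>> batch) {
    List<T> values = new ArrayList<>(batch.size());
    for (Stamped<T> element : batch) {
      values.add(element.value());
    }
    List<O> outputs = perBatchFn.apply(values);
    if (outputs.size() != batch.size()) {
      throw new IllegalStateException("perBatchFn must return one output per input, in order");
    }
    List<Stamped<O>> result = new ArrayList<>(outputs.size());
    for (int i = 0; i < outputs.size(); i++) {
      result.add(new Stamped<>(outputs.get(i), batch.get(i).timestampMillis()));
    }
    return result;
  }
}
```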
> I plan to use the state API to buffer across bundles and the timer API (as
> Kenn pointed out) to detect the end of the window in the DoFn.
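The state-plus-timer plan can be modeled without Beam: a per-key buffer that is not cleared at bundle boundaries, flushed when it is full or when an end-of-window "timer" fires for that key. In Beam's Java SDK this role would presumably be played by something like a BagState declared with @StateId and an event-time timer declared with @TimerId; the minimal sketch below avoids those APIs, and `KeyedBatchBuffer` with its methods is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of stateful batching: a per-key buffer that survives
// across bundles (standing in for per-key state) and is flushed either when it
// reaches maxBatchSize or when an end-of-window "timer" fires for that key.
final class KeyedBatchBuffer<K, T, O> {
  private final int maxBatchSize;
  private final Function<List<T>, List<O>> perBatchFn;
  private final Map<K, List<T>> state = new HashMap<>();
  private final List<O> emitted = new ArrayList<>();

  KeyedBatchBuffer(int maxBatchSize, Function<List<T>, List<O>> perBatchFn) {
    if (maxBatchSize < 1) {
      throw new IllegalArgumentException("maxBatchSize must be at least 1");
    }
    this.maxBatchSize = maxBatchSize;
    this.perBatchFn = perBatchFn;
  }

  // Add to this key's buffer; flush only when it is full (no per-bundle flush).
  void onElement(K key, T element) {
    List<T> buffer = state.computeIfAbsent(key, k -> new ArrayList<>());
    buffer.add(element);
    if (buffer.size() >= maxBatchSize) {
      flush(key);
    }
  }

  // Models the end-of-window timer firing for this key: flush the partial batch.
  void onWindowEndTimer(K key) {
    flush(key);
  }

  private void flush(K key) {
    List<T> buffer = state.remove(key);
    if (buffer != null && !buffer.isEmpty()) {
      emitted.addAll(perBatchFn.apply(buffer));
    }
  }

  int bufferedCount(K key) {
    List<T> buffer = state.get(key);
    return buffer == null ? 0 : buffer.size();
  }

  List<O> emitted() {
    return emitted;
  }
}
```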
Makes sense. It'd be nice if we could figure out a way to do this
across keys (and windows, when the batch computation isn't sensitive
to this of course).
> Thanks for your comments Robert.
Glad to help. Thanks for taking this on.
- Robert