Hi,
On 27/01/2017 at 19:44, Robert Bradshaw wrote:
On Fri, Jan 27, 2017 at 6:55 AM, Etienne Chauchot <echauc...@gmail.com> wrote:
Hi Robert,
On 26/01/2017 at 18:17, Robert Bradshaw wrote:
First off, let me say that a *correctly* batching DoFn has a lot of
value, especially because it's (too) easy to (often unknowingly)
implement it incorrectly.
I definitely agree; I made a similar comment in another email. As an example,
I recall a comment from someone on Stack Overflow who said that he would have
forgotten to flush the batch in finishBundle.
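To make the pitfall concrete, here is a minimal plain-Java sketch of the buffering logic, with no Beam dependency. BatchBuffer, add, finish and emitted are hypothetical names chosen for illustration; add stands in for processElement and finish for finishBundle. It is not the code under discussion in this thread, only an illustration of why the final flush matters.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Hypothetical helper (not a Beam class): buffers elements and hands full batches to perBatchFn. */
final class BatchBuffer<T, O> {
  private final int maxBatchSize;
  private final Function<? super List<T>, ? extends Iterable<O>> perBatchFn;
  private final List<T> buffer = new ArrayList<>();
  private final List<O> emitted = new ArrayList<>();

  BatchBuffer(int maxBatchSize, Function<? super List<T>, ? extends Iterable<O>> perBatchFn) {
    if (maxBatchSize < 1) {
      throw new IllegalArgumentException("maxBatchSize must be >= 1");
    }
    this.maxBatchSize = maxBatchSize;
    this.perBatchFn = perBatchFn;
  }

  /** Plays the role of processElement: buffer, and flush once the batch is full. */
  void add(T element) {
    buffer.add(element);
    if (buffer.size() >= maxBatchSize) {
      flush();
    }
  }

  /** Plays the role of finishBundle: without this call the trailing partial batch is lost. */
  void finish() {
    if (!buffer.isEmpty()) {
      flush();
    }
  }

  private void flush() {
    // Copy the buffer so perBatchFn never sees a list that is cleared underneath it.
    List<T> batch = new ArrayList<>(buffer);
    for (O out : perBatchFn.apply(batch)) {
      emitted.add(out);
    }
    buffer.clear();
  }

  /** Outputs produced so far, in emission order. */
  List<O> emitted() {
    return emitted;
  }
}
```

With a batch size of 2 and three inputs, the third element stays in the buffer until finish() is called, which is exactly the element a hand-rolled implementation tends to drop.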
My take is that a BatchingParDo should be a PTransform<PCollection<T>,
PCollection<O>> that takes a DoFn<? super Iterable<T>, ? extends
Iterable<O>> as a parameter, as well as some (optional?) batching
criteria (probably batch size and/or batch timeout).
This is how I implemented it, plus another perElement function that produces
an intermediary type, so that the user can use a type other than InputType in
perBatchFn (e.g. convert elements to DTOs and call an external service in
perBatchFn using those DTOs) or do any other per-element computation before
adding elements to the batch.
I think we should omit the perElement as part of this transform as
that can be done immediately prior to this one without any loss of
generality or utility. One can always wrap this composition in a new
PTransform if desired.
You're right, it is simpler to let the user do it as a pipeline step,
I'll remove the perElementFn.
Besides, I used SimpleFunctions:
SimpleFunction<InputT, IntermediaryT> perElementFn;
SimpleFunction<ArrayList<IntermediaryT>, ArrayList<OutputT>> perBatchFn;
The input ArrayList in perBatchFn is the buffer of elements.
We should be as general as possible, e.g. SimpleFunction<? super
Iterable<IntermediaryT>, ? extends Iterable<OutputT>>.
Yes sure, I've updated it.
Again, letting
this be a DoFn rather than SimpleFunction allows for things such as
setup, teardown, side inputs, etc. but forces complicated delegation
so this is probably a fine start.
Yes, actually I hesitated; I opted for the simpler one as a start :)
I guess that, as the list of possible use cases grows, we might change to DoFn
to leverage its possibilities.
The DoFn should
map the set of inputs to a set of outputs of the same size and in the
same order as the input (or, possibly, an empty list would be
acceptable). Semantically, it should be defined as
public PCollection<O> expand(PCollection<T> input) {
  return input
      .apply(e -> Collections.singletonList(e))
      .apply(parDo(batchDoFn))
      .apply(es -> Iterables.getOnlyElement(es));
}
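The contract stated above (same size, same order, equivalent to running every element as its own singleton batch) can be checked on sample data. The following is a plain-Java sketch, not Beam code; BatchSemantics, applyOneByOne and isBatchingSafe are hypothetical names, and the sketch assumes the per-batch function returns exactly one output per input (the "empty list" variant mentioned above is not covered).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

/** Hypothetical checker for the "batch of N behaves like N batches of 1" contract. */
final class BatchSemantics {
  private BatchSemantics() {}

  /** Reference semantics: run each element as a singleton batch and concatenate the results. */
  static <T, O> List<O> applyOneByOne(Function<List<T>, List<O>> perBatchFn, List<T> inputs) {
    List<O> outputs = new ArrayList<>();
    for (T element : inputs) {
      List<O> single = perBatchFn.apply(Collections.singletonList(element));
      if (single.size() != 1) {
        throw new IllegalStateException("expected exactly one output per input");
      }
      outputs.add(single.get(0));
    }
    return outputs;
  }

  /** True if one batched call yields the same outputs, in the same order, as the reference. */
  static <T, O> boolean isBatchingSafe(Function<List<T>, List<O>> perBatchFn, List<T> inputs) {
    return perBatchFn.apply(inputs).equals(applyOneByOne(perBatchFn, inputs));
  }
}
```

A function that upper-cases each string passes this check, while one that also reverses the batch fails it, because the output order no longer matches the input order.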
Getting this correct wrt timestamps and windowing is tricky. However,
even something that handles the most trivial case (e.g. GlobalWindows
only) and degenerates to batch sizes of 1 for other cases would allow
people to start using this code (rather than rolling their own) and we
could then continue to refine it.
Yes sure, right now the code handles only the global window case. This is
the very beginning; I'm still using the simple naive approach (no windowing and
no cross-bundle buffering support),
+1. We should assert on construction that the windowing is global.
Even in the global window case, we'll want to avoid mangling element
timestamps.
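One way to avoid mangling timestamps, sketched here in plain Java under the assumption that the per-batch function returns one output per input in the same order: strip the timestamps before the batch call and reattach them by position afterwards. Timestamped and TimestampPreservingBatch are hypothetical illustration classes, not Beam types, and timestamps are modelled as epoch milliseconds.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Hypothetical value-plus-timestamp pair (timestamp modelled as epoch millis). */
final class Timestamped<V> {
  final V value;
  final long timestampMillis;

  Timestamped(V value, long timestampMillis) {
    this.value = value;
    this.timestampMillis = timestampMillis;
  }
}

/** Hypothetical helper: runs a batch function on values and keeps each element's timestamp. */
final class TimestampPreservingBatch {
  private TimestampPreservingBatch() {}

  static <T, O> List<Timestamped<O>> applyBatch(
      Function<List<T>, List<O>> perBatchFn, List<Timestamped<T>> batch) {
    // The batch function only sees the bare values.
    List<T> values = new ArrayList<>();
    for (Timestamped<T> element : batch) {
      values.add(element.value);
    }
    List<O> outputs = perBatchFn.apply(values);
    // Positional re-association is only valid if size and order are preserved.
    if (outputs.size() != batch.size()) {
      throw new IllegalStateException("per-batch function must return one output per input");
    }
    List<Timestamped<O>> result = new ArrayList<>();
    for (int i = 0; i < outputs.size(); i++) {
      result.add(new Timestamped<>(outputs.get(i), batch.get(i).timestampMillis));
    }
    return result;
  }
}
```

This relies on the same-size, same-order contract discussed earlier in the thread; if the batch function could drop or reorder elements, positional matching would assign wrong timestamps.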
I plan to use the state API to buffer across bundles and the timer API (as
Kenn pointed out) to detect the end of the window in the DoFn.
Makes sense. It'd be nice if we could figure out a way to do this
across keys (and windows, when the batch computation isn't sensitive
to this of course).
Thanks for your comments Robert.
Glad to help. Thanks for taking this on.
- Robert
Thanks for your comments
Etienne