Hi Robert,
On 26/01/2017 at 18:17, Robert Bradshaw wrote:
First off, let me say that a *correctly* batching DoFn has a lot of
value, especially because it's (too) easy to (often unknowingly)
implement it incorrectly.
I definitely agree; I made a similar comment in another email. As an
example, I recall a comment from someone on Stack Overflow who said that
he would have forgotten to flush the batch in finishBundle.
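For illustration, here is a minimal sketch of that failure mode, written
against the current Beam Java DoFn API (global window only; the
callExternalService helper is hypothetical and stands in for whatever
per-batch work the user needs). The flush in finishBundle is exactly the
part that is easy to forget, and forgetting it silently drops the last,
partially filled batch:

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;

// Naive hand-rolled batching: buffers per bundle, global window only.
class NaiveBatchingDoFn extends DoFn<String, String> {
  private static final int BATCH_SIZE = 100;
  private transient List<String> buffer;

  @StartBundle
  public void startBundle() {
    buffer = new ArrayList<>();
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    buffer.add(c.element());
    if (buffer.size() >= BATCH_SIZE) {
      for (String result : callExternalService(buffer)) {
        c.output(result);
      }
      buffer.clear();
    }
  }

  // The easy-to-forget part: without this, the last partial batch is lost.
  @FinishBundle
  public void finishBundle(FinishBundleContext c) {
    if (!buffer.isEmpty()) {
      for (String result : callExternalService(buffer)) {
        // Trivial case only: global window, end-of-window timestamp.
        c.output(result, GlobalWindow.INSTANCE.maxTimestamp(), GlobalWindow.INSTANCE);
      }
      buffer.clear();
    }
  }

  private List<String> callExternalService(List<String> batch) {
    return new ArrayList<>(batch); // hypothetical stand-in for the real batched call
  }
}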
My take is that a BatchingParDo should be a PTransform<PCollection<T>,
PCollection<O>> that takes a DoFn<? super Iterable<T>, ? extends
Iterable<O>> as a parameter, as well as some (optional?) batching
criteria (probably batch size and/or batch timeout).
This is how I implemented it, plus another per-element function that
produces an intermediary type. It lets the user work with a type other
than InputT in perBatchFn (for example, to convert elements to DTOs and
call an external service in perBatchFn using those DTOs) or do any other
per-element computation before elements are added to the batch.
Besides, I used SimpleFunctions:
SimpleFunction<InputT, IntermediaryT> perElementFn;
SimpleFunction<ArrayList<IntermediaryT>, ArrayList<OutputT>> perBatchFn;
The input ArrayList in perBatchFn is the buffer of elements.
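To make the shape concrete, a hypothetical usage could look roughly like
this (BatchingParDo.via, ClientDto and externalService are illustrative
names, not settled API):

// batchSize is an element count; perElementFn runs before buffering,
// perBatchFn runs on the buffered ArrayList.
PCollection<String> responses =
    lines.apply(
        BatchingParDo.via(
            500,
            // perElementFn: convert each raw element to a DTO before it is buffered
            new SimpleFunction<String, ClientDto>() {
              @Override
              public ClientDto apply(String line) {
                return ClientDto.parse(line); // hypothetical DTO type
              }
            },
            // perBatchFn: called with the ArrayList buffer once batchSize is reached
            new SimpleFunction<ArrayList<ClientDto>, ArrayList<String>>() {
              @Override
              public ArrayList<String> apply(ArrayList<ClientDto> batch) {
                return externalService.call(batch); // hypothetical batched call
              }
            }));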
The DoFn should
map the set of inputs to a set of outputs of the same size and in the
same order as the input (or, possibly, an empty list would be
acceptable). Semantically, it should be defined as
public PCollection<O> expand(PCollection<T> input) {
  return input
      .apply(e -> Collections.singletonList(e))   // wrap each element as a one-element batch
      .apply(parDo(batchDoFn))                    // the DoFn<Iterable<T>, Iterable<O>>
      .apply(es -> Iterables.getOnlyElement(es)); // unwrap the single result
}
Getting this correct wrt timestamps and windowing is tricky. However,
even something that handles the most trivial case (e.g. GlobalWindows
only) and degenerates to batch sizes of 1 for other cases would allow
people to start using this code (rather than rolling their own) and we
could then continue to refine it.
Yes, sure; right now the code handles only the global-window case. This
is the very beginning, and I'm still at the simple, naive stage (no
windowing and no cross-bundle buffering support). I plan to use the State
API to buffer across bundles and the Timer API (as Kenn pointed out) to
detect the end of the window in the DoFn.
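As a rough sketch of that plan (not the current code), and assuming keyed
input since state and timers are per key and per window, it could look
something like the following; note that timestamp and watermark-hold
handling is glossed over here, which is exactly the tricky part discussed
further down:

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;

// Buffers elements in per-key, per-window state so batches survive bundle
// boundaries, and uses an event-time timer to flush the last partial batch
// when the window expires.
class StatefulBatchingDoFn extends DoFn<KV<String, String>, String> {
  private static final int BATCH_SIZE = 5;

  @StateId("buffer")
  private final StateSpec<BagState<String>> bufferSpec = StateSpecs.bag();

  @TimerId("endOfWindow")
  private final TimerSpec endOfWindowSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void processElement(
      ProcessContext c,
      BoundedWindow window,
      @StateId("buffer") BagState<String> buffer,
      @TimerId("endOfWindow") Timer endOfWindow) {
    // Arm the flush for when the window ends.
    endOfWindow.set(window.maxTimestamp());

    buffer.add(c.element().getValue());
    // Re-reading the whole bag on every element is simplistic; a separate
    // counter state would avoid it, but it keeps the sketch short.
    List<String> batch = new ArrayList<>();
    buffer.read().forEach(batch::add);
    if (batch.size() >= BATCH_SIZE) {
      for (String out : processBatch(batch)) {
        c.output(out);
      }
      buffer.clear();
    }
  }

  @OnTimer("endOfWindow")
  public void onEndOfWindow(OnTimerContext c, @StateId("buffer") BagState<String> buffer) {
    List<String> batch = new ArrayList<>();
    buffer.read().forEach(batch::add);
    if (!batch.isEmpty()) {
      for (String out : processBatch(batch)) {
        c.output(out);
      }
      buffer.clear();
    }
  }

  private List<String> processBatch(List<String> batch) {
    return batch; // stands in for the user's perBatchFn
  }
}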
Thanks for your comments Robert.
More responses inline below.
On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot <echauc...@gmail.com> wrote:
Hi,
I have started to implement this ticket. For now it is implemented as a
PTransform that simply does ParDo.of(new DoFn) and all the processing
related to batching is done in the DoFn.
I'm starting to deal with windows and bundles (and taking a look at the
State API to buffer across bundles; more questions about this to come).
My comments/questions are inline:
On 17/01/2017 at 18:41, Ben Chambers wrote:
We should start by understanding the goals. If elements are in different
windows, can they be put in the same batch? If they have different
timestamps, what timestamp should the batch have?
Regarding timestamps: the current design is as follows: the transform does
not group elements in the PCollection, so the "batch" does not exist as an
element of the PCollection. There is only a user-defined function
(perBatchFn) that gets called once batchSize elements have been processed.
This function takes an ArrayList as a parameter, so elements keep their
original timestamps.
Correct, elements must keep their original timestamps. This is one
reason @OnWindowExpiration is insufficient. The watermark needs to be
held back to the timestamp of the earliest element in the buffer.
Regarding windowing: I guess that if elements are not in the same window,
they are not expected to be in the same batch.
Batching should be possible across windows, as long as the
innerBatchDoFn does not take the Window (or window-dependent side
inputs) as parameters. Note in particular, if there is ever
non-trivial windowing, after a GBK each successive element is almost
certainly in a different window from its predecessor, which would make
emitting after each window change useless.
I'm just starting to work on these subjects, so I might be missing a bit
of information; what I am currently thinking is that I need a way to know,
in the DoFn, that the window has expired, so that I can call perBatchFn
even if batchSize has not been reached. This is the @OnWindowExpiration
callback that Kenneth mentioned in an email about bundles.
Let's imagine that we have a collection of elements artificially
timestamped every 10 seconds (for simplicity of the example) and a fixed
windowing of 1 minute. Then each window contains 6 elements. If we were to
buffer the elements in batches of 5, then for each window we expect to get
2 batches (one of 5 elements, one of 1 element). For that to happen, we
need an @OnWindowExpiration on the DoFn in which we call perBatchFn.
As a composite transform this will likely require a GroupByKey, which may
affect performance. Maybe doing it within a DoFn is better.
Yes, the processing is done with a DoFn indeed.
However, without a GBK it is unclear which key the state would be stored
with respect to. (On that note, one should be able to batch across keys,
which makes using the State API as-is difficult.)
Then it could be some annotation or API that informs the runner. Should
batch sizes be fixed in the annotation (element count or size) or should
the user have some method that lets them decide when to process a batch
based on the contents?
For now, the user passes batchSize as an argument to BatchParDo.via();
it is a number of elements. But batching based on content might be useful
for the user. Giving a hint to the runner might also be more flexible.
Thanks.
We should allow for runners to tune this parameter. We should also
allow for time-based batch expiration.
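Just to illustrate how both criteria could surface in the API
(withBatchSize and withBatchTimeout are purely hypothetical names, not
settled API):

// Hypothetical builder: flush after N buffered elements, or once the oldest
// buffered element has been waiting longer than the timeout.
BatchingParDo.of(perBatchFn)
    .withBatchSize(500)
    .withBatchTimeout(Duration.standardSeconds(30));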
Another thing to think about is whether this should be connected to the
ability to run parts of the bundle in parallel.
Yes!
This is, in some sense, a "sliding batch" but many of the concerns
(e.g. holding the watermark, outputting with the correct timestamps
and windows) are similar. The semantics of MultiThreadedParDo.of(doFn)
should be identical to ParDo.of(doFn).
As with batching, there's a question of whether this should be
implemented as a PTransform<Pc<T>, Pc<O>> or a DoFn<T, O>.
Maybe each batch is an RPC and you just want to start an async RPC for
each batch. Then, in addition to starting the final RPC in finishBundle,
you also need to wait for all the RPCs to complete.
Actually, what is currently done for each batch is whatever the user wants
(the user-defined perBatchFn). If the user decides to issue an async RPC in
that function (which is called with the ArrayList of buffered input
elements), IMHO he is responsible for waiting for the response in that
method if he needs it, but he can also fire and forget, depending on his
use case.
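For instance, a perBatchFn that issues the RPC asynchronously but still
waits for the result could look roughly like this (asyncClient and
ClientDto are hypothetical):

// One async RPC per batch, blocking until the response arrives because the
// downstream outputs depend on it. A fire-and-forget variant would simply
// return an empty list without joining.
SimpleFunction<ArrayList<ClientDto>, ArrayList<String>> perBatchFn =
    new SimpleFunction<ArrayList<ClientDto>, ArrayList<String>>() {
      @Override
      public ArrayList<String> apply(ArrayList<ClientDto> batch) {
        CompletableFuture<ArrayList<String>> response = asyncClient.send(batch); // hypothetical client
        return response.join();
      }
    };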
Besides, I have also included a perElementFn user function so that the
user can do some processing on the elements before they are added to the
batch (example use case: convert a String into a DTO object in order to
invoke an external service).
I think a perElementFn belongs as a ParDo that precedes this
PTransform, rather than as part of it.
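If the per-element conversion were pulled out that way, the pipeline would
read roughly like this (same hypothetical names as above, with
BatchingParDo.via now taking only the batch size and perBatchFn):

PCollection<String> responses =
    lines
        // per-element conversion as its own step, ahead of the batching transform
        .apply("ToDto", MapElements.via(
            new SimpleFunction<String, ClientDto>() {
              @Override
              public ClientDto apply(String line) {
                return ClientDto.parse(line); // hypothetical DTO type
              }
            }))
        // the batching transform then only needs the per-batch function
        .apply("BatchedCall", BatchingParDo.via(500, perBatchFn));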