It makes sense with the PTransform, and it is less invasive, but the user has to define two functions (one perElement, the other perBatch). I like the DoFn approach with annotations, and I would do the same for the batch.

The "trigger/window" is a key part as well, as even if the batch is not complete, the perBatch Fn might be called.

Regards
JB

On 01/27/2017 03:55 PM, Etienne Chauchot wrote:
Hi Robert,

On 26/01/2017 at 18:17, Robert Bradshaw wrote:
First off, let me say that a *correctly* batching DoFn provides a lot of
value, especially because it's (too) easy to (often unknowingly)
implement it incorrectly.
I definitely agree; I made a similar comment in another email. As an
example, I recall a comment from someone on Stack Overflow who said he
would have forgotten to flush the batch in finishBundle.
My take is that a BatchingParDo should be a PTransform<PCollection<T>,
PCollection<O>> that takes a DoFn<? super Iterable<T>, ? extends
Iterable<O>> as a parameter, as well as some (optional?) batching
criteria (probably batch size and/or batch timeout).
This is how I implemented it, plus another perElement function that
produces an intermediary type, so the user can work in perBatchFn with a
type other than InputType (for example, convert elements to DTOs and call
an external service in perBatchFn using those DTOs) or do any other
per-element computation before adding elements to the batch.

Besides, I used SimpleFunctions:

SimpleFunction<InputT, IntermediaryT> perElementFn;
SimpleFunction<ArrayList<IntermediaryT>, ArrayList<OutputT>> perBatchFn;

The input ArrayList in perBatchFn is the buffer of elements.
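For illustration, roughly how a user would wire these up; BatchParDo.via and its exact signature, as well as MyDto and the external service client, are illustrative assumptions from this thread, not a finished API:

SimpleFunction<String, MyDto> perElementFn =
    new SimpleFunction<String, MyDto>() {
      @Override
      public MyDto apply(String input) {
        return MyDto.parse(input);   // per-element conversion before buffering
      }
    };

SimpleFunction<ArrayList<MyDto>, ArrayList<String>> perBatchFn =
    new SimpleFunction<ArrayList<MyDto>, ArrayList<String>>() {
      @Override
      public ArrayList<String> apply(ArrayList<MyDto> batch) {
        return externalService.callBulk(batch);   // one external call per buffered batch
      }
    };

PCollection<String> output =
    input.apply(BatchParDo.via(perElementFn, perBatchFn, /* batchSize */ 500));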
The DoFn should
map the set of inputs to a set of outputs of the same size and in the
same order as the input (or, possibly, an empty list would be
acceptable). Semantically, it should be defined as

public PCollection<O> expand(PCollection<T> input) {
  return input
      // semantically, each "batch" is a singleton list
      .apply(MapElements.via(e -> Collections.singletonList(e)))
      .apply(ParDo.of(batchDoFn))
      // each output list then has exactly one element to unwrap
      .apply(MapElements.via(es -> Iterables.getOnlyElement(es)));
}

Getting this correct wrt timestamps and windowing is tricky. However,
even something that handles the most trivial case (e.g. GlobalWindows
only) and degenerates to batch sizes of 1 for other cases would allow
people to start using this code (rather than rolling their own) and we
could then continue to refine it.
Yes, sure; right now the code handles only the global window case. This
is the very beginning, and I'm still taking the simple, naive approach
(no windowing and no cross-bundle buffering support). I plan to use the
State API to buffer across bundles and the Timer API (as Kenn pointed
out) to detect the end of the window in the DoFn.
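Very roughly, the direction I have in mind (a minimal sketch assuming the Beam State/Timer API surface; the key type, batch size, and plain re-output are placeholders, the real code would call perBatchFn on the buffer):

import com.google.common.collect.Iterables;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;

// Sketch only: buffer elements in per-key state, flush when the batch is full
// or when the event-time timer fires at the end of the window.
class BufferingFn extends DoFn<KV<String, String>, String> {
  private static final int BATCH_SIZE = 5;

  @StateId("buffer")
  private final StateSpec<BagState<String>> bufferSpec = StateSpecs.bag();

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      ProcessContext c,
      BoundedWindow window,
      @StateId("buffer") BagState<String> buffer,
      @TimerId("flush") Timer flushTimer) {
    // Make sure the buffer is flushed when the window expires,
    // even if BATCH_SIZE is never reached.
    flushTimer.set(window.maxTimestamp());
    buffer.add(c.element().getValue());
    if (Iterables.size(buffer.read()) >= BATCH_SIZE) {
      for (String s : buffer.read()) {
        c.output(s);      // real transform: call perBatchFn on the buffer instead
      }
      buffer.clear();
    }
  }

  @OnTimer("flush")
  public void onWindowEnd(OnTimerContext c, @StateId("buffer") BagState<String> buffer) {
    for (String s : buffer.read()) {
      c.output(s);        // flush the incomplete batch at window end
    }
    buffer.clear();
  }
}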

Thanks for your comments Robert.
More responses inline below.

On Thu, Jan 26, 2017 at 1:48 AM, Etienne
Chauchot<echauc...@gmail.com>  wrote:
Hi,

I have started to implement this ticket. For now it is implemented as a
PTransform that simply does ParDo.of(new DoFn) and all the processing
related to batching is done in the DoFn.

I'm starting to deal with windows and bundles (starting to take a look at
the State API to process across bundles; more questions about this to
come).
My comments/questions are inline:

On 17/01/2017 at 18:41, Ben Chambers wrote:
We should start by understanding the goals. If elements are in different
windows, can they be put in the same batch? If they have different
timestamps, what timestamp should the batch have?
Regarding timestamps: the current design is as follows: the transform does
not group elements in the PCollection, so the "batch" does not exist as an
element in the PCollection. There is only a user-defined function
(perBatchFn) that gets called when batchSize elements have been processed.
This function takes an ArrayList as a parameter, so elements keep their
original timestamps.
Correct, elements must keep their original timestamps. This is one
reason @OnWindowExpiration is insufficient. The watermark needs to be
held back to the timestamp of the earliest element in the buffer.
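Concretely, a minimal fragment of a stateful DoFn illustrating this (the watermark hold itself still has to be provided by the runner/state machinery; this only shows buffering and re-emitting with unchanged timestamps, and assumes a KV element since state requires a key):

// Buffer each element together with its original timestamp.
@ProcessElement
public void process(
    ProcessContext c,
    @StateId("buffer") BagState<TimestampedValue<String>> buffer) {
  buffer.add(TimestampedValue.of(c.element().getValue(), c.timestamp()));
}

// When flushing (batch full or window expiring), re-emit the original timestamps:
//   for (TimestampedValue<String> tv : buffer.read()) {
//     c.outputWithTimestamp(tv.getValue(), tv.getTimestamp());
//   }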

Regarding windowing: I guess that if elements are not in the same
window,
they are not expected to be in the same batch.
Batching should be possible across windows, as long as the
innerBatchDoFn does not take the Window (or window-dependent side
inputs) as parameters. Note in particular that if there is ever
non-trivial windowing, after a GBK each successive element is almost
certainly in a different window from its predecessor, which would make
emitting after each window change useless.

I'm just starting to work on these subjects, so I might lack a bit of
information. What I am currently thinking about is that I need a way to
know in the DoFn that the window has expired, so that I can call the
perBatchFn even if batchSize has not been reached. This is the
@OnWindowExpiration callback that Kenneth mentioned in an email about
bundles.
Let's imagine that we have a collection of elements artificially
timestamped every 10 seconds (for simplicity of the example) and a fixed
windowing of 1 minute. Then each window contains 6 elements. If we were
to buffer the elements in batches of 5 elements, then for each window we
expect to get 2 batches (one of 5 elements, one of 1 element). For that
to happen, we need an @OnWindowExpiration on the DoFn where we call
perBatchFn.
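In pipeline terms (the windowing code is standard Beam; the batch breakdown is just the arithmetic from the paragraph above):

// Fixed 1-minute windows over elements timestamped every 10 seconds.
PCollection<String> windowed =
    input.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));

// Each window then holds 6 elements. With a batch size of 5 we expect, per window:
//   batch 1: elements at 0s, 10s, 20s, 30s, 40s -> flushed when the size limit is hit
//   batch 2: element at 50s                     -> flushed by the end-of-window
//                                                  callback (@OnWindowExpiration)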

As a composite transform this will likely require a group-by-key, which
may affect performance. Maybe within a DoFn is better.
Yes, the processing is done with a DoFn indeed.
However, without a GBK it is unclear which key the state would be stored
with respect to. (On that note, one should be able to batch across
keys, which makes using the State API as-is difficult.)

Then it could be some annotation or API that informs the runner. Should
batch sizes be fixed in the annotation (element count or size) or
should
the user have some method that lets them decide when to process a batch
based on the contents?
For now, the user passes batchSize as an argument to BatchParDo.via(); it
is a number of elements. But batching based on content might be useful for
the user. Giving a hint to the runner might be more flexible for the
runner. Thanks.
We should allow for runners to tune this parameter. We should also
allow for time-based batch expiration.

Another thing to think about is whether this should be connected to the
ability to run parts of the bundle in parallel.
Yes!
This is, in some sense, a "sliding batch", but many of the concerns
(e.g. holding the watermark, outputting with the correct timestamps
and windows) are similar. The semantics of MultiThreadedParDo.of(doFn)
should be identical to ParDo.of(doFn).

As with batching, there's a question of whether this should be
implemented as a PTransform<Pc<T>, Pc<O>> or a DoFn<T, O>.

Maybe each batch is an RPC and you just want to start an async RPC for
each batch. Then, in addition to starting the final RPC in finishBundle,
you also need to wait for all the RPCs to complete.
Actually, currently each batch's processing is whatever the user wants
(the perBatchFn user-defined function). If the user decides to issue an
async RPC in that function (called with the ArrayList of input elements),
IMHO he is responsible for waiting for the response in that method if he
needs the response, but he can also do send-and-forget, depending on his
use case.
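For example, a possible perBatchFn along those lines; the async client, its sendAsync method, and MyDto are placeholders for whatever the user actually has, not anything from the patch:

SimpleFunction<ArrayList<MyDto>, ArrayList<String>> perBatchFn =
    new SimpleFunction<ArrayList<MyDto>, ArrayList<String>>() {
      @Override
      public ArrayList<String> apply(ArrayList<MyDto> batch) {
        // Fire one async RPC for the whole batch.
        CompletableFuture<List<String>> rpc = client.sendAsync(batch);
        // Wait here if the responses are needed downstream; a pure
        // send-and-forget perBatchFn could skip the join() entirely.
        return new ArrayList<>(rpc.join());
      }
    };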

Besides, I have also included a perElementFn user function to allow
the user
to do some processing on the elements before adding them to the batch
(example use case: convert a String to a DTO object to invoke an
external
service)
I think a perElementFn belongs as a ParDo that precedes this
PTransform, rather than as part of it.
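Sketched out, that composition could look like this; MapElements is standard Beam, while BatchParDo.via and its parameters remain assumptions from this thread:

PCollection<String> output =
    input
        .apply("ToDto", MapElements.via(perElementFn))            // per-element step
        .apply("BatchedCall", BatchParDo.via(perBatchFn, /* batchSize */ 500));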



--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
