Hi,

Indeed, I did not want to be invasive in DoFn either. So I chose to implement it as a PTransform.

Please be aware that it is just the very beginning, it is still in the simple naive approach (no window and no buffering trans-bundle support), I plan to use state API to buffer trans-bundle and timer API (as Kenn pointed) to detect the end of the window in the DoFn.

Here is the branch, it is too early to show it but it could provide a base for discussions as Eugene said.

https://github.com/echauchot/beam/commits/BEAM-135-BATCHING-PARDO

take a look at BatchingParDo and BatchingParDoTest.

You will find a user snipet as pseudo code in the javadoc of BatchingParDo.

There is also client code in the test but it is not close to a use case, It just allows to test the inner DoFn.

Thanks guys.

Etienne


Le 27/01/2017 à 00:00, Robert Bradshaw a écrit :
On Thu, Jan 26, 2017 at 12:48 PM, Eugene Kirpichov
<kirpic...@google.com.invalid> wrote:
I don't think we should make batching a core feature of the Beam
programming model (by adding it to DoFn as this code snippet implies). I'm
reasonably sure there are less invasive ways of implementing it.
+1, either as a PTransform<Pc<T>, Pc<O>> or a DoFn<T, O> that
wraps/delegates to a DoFn<Iterable<T>, Iterable<O>>.

On Thu, Jan 26, 2017 at 12:22 PM Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

Agree, I'm curious as well.

I guess it would be something like:

.apply(ParDo(new DoFn() {

     @Override
     public long batchSize() {
       return 1000;
     }

     @ProcessElement
     public void processElement(ProcessContext context) {
       ...
     }
}));

If batchSize (overrided by user) returns a positive long, then DoFn can
batch with this size.

Regards
JB

On 01/26/2017 05:38 PM, Eugene Kirpichov wrote:
Hi Etienne,

Could you post some snippets of how your transform is to be used in a
pipeline? I think that would make it easier to discuss on this thread and
could save a lot of churn if the discussion ends up leading to a
different
API.

On Thu, Jan 26, 2017 at 8:29 AM Etienne Chauchot <echauc...@gmail.com>
wrote:

Wonderful !

Thanks Kenn !

Etienne


Le 26/01/2017 à 15:34, Kenneth Knowles a écrit :
Hi Etienne,

I was drafting a proposal about @OnWindowExpiration when this email
arrived. I thought I would try to quickly unblock you by responding
with
a
TL;DR: you can achieve your goals with state & timers as they currently
exist. You'll set a timer for
window.maxTimestamp().plus(allowedLateness)
precisely - when this timer fires, you are guaranteed that the input
watermark has exceeded this point (so all new data is droppable) while
the
output timestamp is held to this point (so you can safely output into
the
window).

@OnWindowExpiration is (1) a convenience to save you from needing a
handle
on the allowed lateness (not a problem in your case) and (2) actually
meaningful and potentially less expensive to implement in the absence
of
state (this is why it needs a design discussion at all, really).

Caveat: these APIs are new and not supported in every runner and
windowing
configuration.

Kenn

On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot <echauc...@gmail.com
wrote:

Hi,

I have started to implement this ticket. For now it is implemented as
a
PTransform that simply does ParDo.of(new DoFn) and all the processing
related to batching is done in the DoFn.

I'm starting to deal with windows and bundles (starting to take a look
at
the State API to process trans-bundles, more questions about this to
come).
My comments/questions are inline:


Le 17/01/2017 à 18:41, Ben Chambers a écrit :

We should start by understanding the goals. If elements are in
different
windows can they be out in the same batch? If they have different
timestamps what timestamp should the batch have?

Regarding timestamps: currently design is as so: the transform does
not
group elements in the PCollection, so the "batch" does not exist as an
element in the PCollection. There is only a user defined function
(perBatchFn) that gets called when batchSize elements have been
processed.
This function takes an ArrayList as parameter. So elements keep their
original timestamps


Regarding windowing: I guess that if elements are not in the same
window,
they are not expected to be in the same batch.
I'm just starting to work on these subjects, so I might lack a bit of
information;
what I am currently thinking about is that I need a way to know in the
DoFn that the window has expired so that I can call the perBatchFn
even
if
batchSize is not reached.  This is the @OnWindowExpiration callback
that
Kenneth mentioned in an email about bundles.
Lets imagine that we have a collection of elements artificially
timestamped every 10 seconds (for simplicity of the example) and a
fixed
windowing of 1 minute. Then each window contains 6 elements. If we
were
to
buffer the elements by batches of 5 elements, then for each window we
expect to get 2 batches (one of 5 elements, one of 1 element). For
that
to
append, we need a @OnWindowExpiration on the DoFn where we call
perBatchFn
As a composite transform this will likely require a group by key which
may
affect performance. Maybe within a dofn is better.

Yes, the processing is done with a DoFn indeed.

Then it could be some annotation or API that informs the runner.
Should
batch sizes be fixed in the annotation (element count or size) or
should
the user have some method that lets them decide when to process a
batch
based on the contents?

For now, the user passes batchSize as an argument to BatchParDo.via()
it
is a number of elements. But batch based on content might be useful
for
the
user. Give hint to the runner might be more flexible for the runner.
Thanks.
Another thing to think about is whether this should be connected to
the
ability to run parts of the bundle in parallel.

Yes!

Maybe each batch is an RPC
and you just want to start an async RPC for each batch. Then in
addition
to
start the final RPC in finishBundle, you also need to wait for all
the
RPCs
to complete.

Actually, currently each batch processing is whatever the user wants
(perBatchFn user defined function). If the user decides to issue an
async
RPC in that function (call with the arrayList of input elements), IMHO
he
is responsible for waiting for the response in that method if he needs
the
response, but he can also do a send and forget, depending on his use
case.
Besides, I have also included a perElementFn user function to allow
the
user to do some processing on the elements before adding them to the
batch
(example use case: convert a String to a DTO object to invoke an
external
service)

Etienne

On Tue, Jan 17, 2017, 8:48 AM Etienne Chauchot<echauc...@gmail.com>
wrote:

Hi JB,

I meant jira vote but discussion on the ML works also :)

As I understand the need (see stackoverflow links in jira ticket) the
aim is to avoid the user having to code the batching logic in his own
DoFn.processElement() and DoFn.finishBundle() regardless of the
bundles.
For example, possible use case is to batch a call to an external
service
(for performance).

I was thinking about providing a PTransform that implements the
batching
in its own DoFn and that takes user defined functions for
customization.
Etienne

Le 17/01/2017 à 17:30, Jean-Baptiste Onofré a écrit :

Hi

I guess you mean discussion on the mailing list about that, right ?

AFAIR the ide⁣a is to provide a utility class to deal with

pooling/batching. However not sure it's required as with @StartBundle
etc
in DoFn and batching depends of the end user "logic".

Regards
JB

On Jan 17, 2017, 08:26, at 08:26, Etienne Chauchot<
echauc...@gmail.com>
wrote:

Hi all,
I have started to work on this ticket
https://issues.apache.org/jira/browse/BEAM-135

As there where no vote since March 18th, is the issue still
relevant/needed?

Regards,

Etienne


--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


Reply via email to