Hi all,
We had a discussion with Kenn yesterday about point 1 bellow, I would
like to note it here on the ML:
Using new method timer.set() instead of timer.setForNowPlus() makes the
timer fire at the right time.
Another thing, regarding point 2: if I inject the window in the @Ontimer
method and print it, I see that at the moment the timer fires (at last
timestamp of the window), the window is the GlobalWindow. I guess that
is because the fixed window has just ended. Maybe the empty bagState
that I get here is due to the end of window (passing to the
GlobalWindow). I mean, as the states are scoped per window, and the
window is different, then another bagState instance gets injected. Hence
the empty bagState
WDYT?
I will open a PR even if this work is not finished yet, that way, we
will have a convenient environment for discussing this code.
Etienne
Le 03/03/2017 à 11:48, Etienne Chauchot a écrit :
Hi all,
@Kenn: I have enhanced my streaming test in
https://github.com/echauchot/beam/tree/BEAM-135-BATCHING-PARDO in
particular to check that BatchingParDo doesn't mess up windows. It
seems that it actually does :)
The input collection contains 10 elements timestamped at 1s interval
and it is divided into fixed windows of 5s duration (so 2 windows).
startTime is epoch. The timer is used to detect the end of the window
and output the content of the batch (buffer) then.
I added some logs and I noticed two strange things (that might be
linked):
1-The timer is set twice, and it is set correctly
INFOS: ***** SET TIMER ***** Delay of 4999 ms added to timestamp
1970-01-01T00:00:00.000Z set for window
[1970-01-01T00:00:00.000Z..1970-01-01T00:00:05.000Z)
INFOS: ***** SET TIMER ***** Delay of 4999 ms added to timestamp
1970-01-01T00:00:05.000Z set for window
[1970-01-01T00:00:05.000Z..1970-01-01T00:00:10.000Z)
It correctly fires twice but not at the right timeStamp:
INFOS: ***** END OF WINDOW ***** for timer timestamp
1970-01-01T00:00:04.999Z
=>Correct
INFOS: ***** END OF WINDOW ***** for timer timestamp
1970-01-01T00:00:04.999Z
=> Incorrect (should fire at timestamp 1970-01-01T00:00:09.999Z)
Do I need to call timer.cancel() after the timer has fired ? But
timer.cancel() is not supported by DirectRunner yet.
2- in @OnTimer method the injected batch bagState parameter is empty
whereas it was added some elements since last batch.clear() while
processing the same window
INFOS: ***** BATCH ***** clear
INFOS: ***** BATCH ***** Add element for window
[1970-01-01T00:00:00.000Z..1970-01-01T00:00:05.000Z)
INFOS: ***** BATCH ***** Add element for window
[1970-01-01T00:00:00.000Z..1970-01-01T00:00:05.000Z)
..
INFOS: ***** END OF WINDOW ***** for timer timestamp
1970-01-01T00:00:04.999Z
INFOS: ***** IN ONTIMER ***** batch size 0
Am I doing something wrong with timers or is there something not
totally finished with them (as you noticed they are quite new)?
WDYT?
Thanks
Etienne
Le 09/02/2017 à 09:55, Etienne Chauchot a écrit :
Hi,
@JB: good to know for the roadmap! thanks
@Kenn: just to be clear: the timer fires fine. What I noticed is that
it seems to be SET more than once because timer.setForNowPlus in
called the @ProcessElement method. I am not 100% sure of it, what I
noticed is that it started to work fine when I ensured to call
timer.setForNowPlus only once. I don't say it's a bug, this is just
not what I understood when I read the javadoc, I understood that it
would be set only once per window, see javadoc bellow:
An implementation of Timer is implicitly scoped - it may be scoped to
a key and window, or a key, window, and trigger, etc.
A timer exists in one of two states: set or unset. A timer can be set
only for a single time per scope.
I use the DirectRunner.
For the key part: ok, makes sense.
Thanks for your comments
I'm leaving on vacation tonight for a little more than two weeks,
I'll resume work then, maybe start a PR when it's ready.
Etienne
Le 08/02/2017 à 19:48, Kenneth Knowles a écrit :
Hi Etienne,
If the timer is firing n times for n elements, that's a bug in the
runner /
shared runner code. It should be deduped. Which runner? Can you file
a JIRA
against me to investigate? I'm still in the process of fleshing out
more
and more RunnableOnService (aka ValidatesRunner) tests so I will
surely add
one (existing tests already OOMed without deduping, so it wasn't at
the top
of my priority list)
If the end user doesn't have a natural key, I would just add one and
remove
it within your transform. Not sure how easy this will be - you might
need
user intervention. Of course, you still do need to shard or you'll be
processing the whole PCollection serially.
Kenn
On Wed, Feb 8, 2017 at 9:45 AM, Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:
Hi
AFAIR the timer per function is in the "roadmap" (remembering
discussion
we had with Kenn).
I will take a deeper look next week on your branch.
Regards
JB
On Feb 8, 2017, 13:28, at 13:28, Etienne Chauchot
<echauc...@gmail.com>
wrote:
Hi Kenn,
I have started using state and timer APIs, they seem awesome!
Please take a look at
https://github.com/echauchot/beam/tree/BEAM-135-BATCHING-PARDO
It contains a PTransform that does the batching trans-bundles and
respecting the windows (even if tests are not finished yet, see
@Ignore
and TODOs)
I have some questions:
- I use the timer to detect the end of the window like you suggested.
But the timer can only be set in @ProcessElement and @Ontimer.
Javadoc
says that timers are implicitly scoped to a key/window and that a
timer
can be set only for a single time per scope. I noticed that if I call
timer.setForNowPlus in the @ProcessElement method, it seems that the
timer is set n times for n elements. So I just created a state with
boolean to prevent setting the timer more than once per key/window.
=> Would it be good maybe to have a end user way of indicating
that the
timer will be set only once per key/window. Something analogous to
@Setup, to avoid the user having to use a state boolean?
- I understand that state and timers need to be per-key, but if
the end
user does not need a key (lets say he just needs a
PCollection<String>).
Then, do we tell him to use a PCollection<KV> anyway like I wrote in
the
javadoc of BatchingParDo?
WDYT?
Thanks,
Etienne
Le 26/01/2017 à 17:28, Etienne Chauchot a écrit :
Wonderful !
Thanks Kenn !
Etienne
Le 26/01/2017 à 15:34, Kenneth Knowles a écrit :
Hi Etienne,
I was drafting a proposal about @OnWindowExpiration when this email
arrived. I thought I would try to quickly unblock you by responding
with a
TL;DR: you can achieve your goals with state & timers as they
currently
exist. You'll set a timer for
window.maxTimestamp().plus(allowedLateness)
precisely - when this timer fires, you are guaranteed that the
input
watermark has exceeded this point (so all new data is droppable)
while the
output timestamp is held to this point (so you can safely output
into
the
window).
@OnWindowExpiration is (1) a convenience to save you from needing a
handle
on the allowed lateness (not a problem in your case) and (2)
actually
meaningful and potentially less expensive to implement in the
absence of
state (this is why it needs a design discussion at all, really).
Caveat: these APIs are new and not supported in every runner and
windowing
configuration.
Kenn
On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot
<echauc...@gmail.com>
wrote:
Hi,
I have started to implement this ticket. For now it is implemented
as a
PTransform that simply does ParDo.of(new DoFn) and all the
processing
related to batching is done in the DoFn.
I'm starting to deal with windows and bundles (starting to take a
look at
the State API to process trans-bundles, more questions about this
to
come).
My comments/questions are inline:
Le 17/01/2017 à 18:41, Ben Chambers a écrit :
We should start by understanding the goals. If elements are in
different
windows can they be out in the same batch? If they have different
timestamps what timestamp should the batch have?
Regarding timestamps: currently design is as so: the transform
does
not
group elements in the PCollection, so the "batch" does not
exist as
an
element in the PCollection. There is only a user defined function
(perBatchFn) that gets called when batchSize elements have been
processed.
This function takes an ArrayList as parameter. So elements keep
their
original timestamps
Regarding windowing: I guess that if elements are not in the same
window,
they are not expected to be in the same batch.
I'm just starting to work on these subjects, so I might lack a bit
of
information;
what I am currently thinking about is that I need a way to know in
the
DoFn that the window has expired so that I can call the perBatchFn
even if
batchSize is not reached. This is the @OnWindowExpiration
callback
that
Kenneth mentioned in an email about bundles.
Lets imagine that we have a collection of elements artificially
timestamped every 10 seconds (for simplicity of the example) and a
fixed
windowing of 1 minute. Then each window contains 6 elements. If we
were to
buffer the elements by batches of 5 elements, then for each window
we
expect to get 2 batches (one of 5 elements, one of 1 element). For
that to
append, we need a @OnWindowExpiration on the DoFn where we call
perBatchFn
As a composite transform this will likely require a group by key
which may
affect performance. Maybe within a dofn is better.
Yes, the processing is done with a DoFn indeed.
Then it could be some annotation or API that informs the runner.
Should
batch sizes be fixed in the annotation (element count or size) or
should
the user have some method that lets them decide when to process a
batch
based on the contents?
For now, the user passes batchSize as an argument to
BatchParDo.via() it
is a number of elements. But batch based on content might be
useful
for the
user. Give hint to the runner might be more flexible for the
runner.
Thanks.
Another thing to think about is whether this should be connected
to
the
ability to run parts of the bundle in parallel.
Yes!
Maybe each batch is an RPC
and you just want to start an async RPC for each batch. Then in
addition
to
start the final RPC in finishBundle, you also need to wait for
all
the
RPCs
to complete.
Actually, currently each batch processing is whatever the user
wants
(perBatchFn user defined function). If the user decides to
issue an
async
RPC in that function (call with the arrayList of input elements),
IMHO he
is responsible for waiting for the response in that method if he
needs the
response, but he can also do a send and forget, depending on his
use
case.
Besides, I have also included a perElementFn user function to
allow
the
user to do some processing on the elements before adding them to
the
batch
(example use case: convert a String to a DTO object to invoke an
external
service)
Etienne
On Tue, Jan 17, 2017, 8:48 AM Etienne
Chauchot<echauc...@gmail.com>
wrote:
Hi JB,
I meant jira vote but discussion on the ML works also :)
As I understand the need (see stackoverflow links in jira ticket)
the
aim is to avoid the user having to code the batching logic in his
own
DoFn.processElement() and DoFn.finishBundle() regardless of the
bundles.
For example, possible use case is to batch a call to an external
service
(for performance).
I was thinking about providing a PTransform that implements the
batching
in its own DoFn and that takes user defined functions for
customization.
Etienne
Le 17/01/2017 à 17:30, Jean-Baptiste Onofré a écrit :
Hi
I guess you mean discussion on the mailing list about that,
right
?
AFAIR the idea is to provide a utility class to deal with
pooling/batching. However not sure it's required as with
@StartBundle etc
in DoFn and batching depends of the end user "logic".
Regards
JB
On Jan 17, 2017, 08:26, at 08:26, Etienne
Chauchot<echauc...@gmail.com>
wrote:
Hi all,
I have started to work on this ticket
https://issues.apache.org/jira/browse/BEAM-135
As there where no vote since March 18th, is the issue still
relevant/needed?
Regards,
Etienne