100% - the contract should not change because things are in
a bundle. IIRC there are some open bugs in Beam around this
that really should be fixed.
My issue with GroupIntoBatches is different. This transform
works as follows:

  if (this is the first element in the batch - checked by
      reading a count stored in a ValueState)
    timer.offset(bufferingDuration).setRelative()

This makes it tricky to use Timer.withOutputTimestamp.
Inputs are not guaranteed to arrive in order, so simply adding a withOutputTimestamp would set the output timestamp to whatever timestamp the first element happened to have; it really should be the minimum timestamp of all elements in the buffer. If we instead started setting the timer on every element, then timer.offset(...).setRelative() would keep bumping the (processing-time) timer into the future and it would never expire.
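To make the failure mode concrete, here is a plain-Java sketch (not Beam API; all names are illustrative) of a relative processing-time timer that is re-set on every element. When elements arrive faster than the buffering duration, the expiration keeps moving forward and the timer never fires:

```java
import java.time.Duration;

// Minimal model of a relative processing-time timer, illustrating the
// problem described above. Illustrative names, not Beam's API.
class RelativeTimerModel {
    private long expirationMillis = Long.MAX_VALUE; // no timer set yet

    // Analogue of timer.offset(bufferingDuration).setRelative():
    // (re)sets the timer relative to the current processing time.
    void setRelative(long nowMillis, Duration bufferingDuration) {
        expirationMillis = nowMillis + bufferingDuration.toMillis();
    }

    boolean isExpired(long nowMillis) {
        return nowMillis >= expirationMillis;
    }
}

public class TimerBumpDemo {
    public static void main(String[] args) {
        Duration buffering = Duration.ofSeconds(10);
        RelativeTimerModel timer = new RelativeTimerModel();

        // Elements arrive every second, faster than the buffering duration.
        // Re-setting the timer on *every* element keeps pushing the
        // expiration into the future, so it never fires while traffic lasts.
        long now = 0;
        for (int i = 0; i < 100; i++) {
            timer.setRelative(now, buffering);
            now += 1_000; // one second of processing time per element
        }
        System.out.println("expired after 100 elements? " + timer.isExpired(now));

        // By contrast, setting the timer only on the first element lets it fire.
        RelativeTimerModel firstOnly = new RelativeTimerModel();
        firstOnly.setRelative(0, buffering);
        System.out.println("first-element-only timer expired? " + firstOnly.isExpired(now));
    }
}
```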
One solution would be to store the timer timestamp in a
ValueState, and use Timer.set to set the timer to an
absolute timestamp. This would allow us to always reset the
timer to the same expiration target, just modifying the
output timestamp each time. However, this will break
DirectRunner tests. The DirectRunner allows the user to
control the advancement of processing time when using
TestStream, but this facility doesn't work well if the
transform sets the processing-time timer using absolute
set() calls.
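The workaround above can be sketched in plain Java (again illustrative names, not Beam's Timer/ValueState API): remember the absolute expiration target in state on the first element, re-set the timer to that same target on every subsequent element, and only lower the output timestamp:

```java
import java.time.Duration;

// Plain-Java sketch of the absolute-target workaround described above.
public class AbsoluteTargetDemo {
    // Stand-ins for two ValueState<Long> cells.
    static Long targetState = null;   // absolute expiration target
    static Long outputTsState = null; // min element timestamp so far

    static void processElement(long nowMillis, long elementTsMillis, Duration buffering) {
        if (targetState == null) {
            // The first element in the batch fixes the expiration target once.
            targetState = nowMillis + buffering.toMillis();
        }
        // Track the minimum element timestamp for the timer's output timestamp.
        outputTsState = (outputTsState == null)
                ? elementTsMillis
                : Math.min(outputTsState, elementTsMillis);
        // Conceptually: timer.withOutputTimestamp(outputTsState).set(targetState);
        // The target never moves, so the timer still fires on time.
    }

    public static void main(String[] args) {
        Duration buffering = Duration.ofSeconds(10);
        // Out-of-order element timestamps: 5000, 2000, 8000.
        processElement(0, 5_000, buffering);
        processElement(1_000, 2_000, buffering);
        processElement(2_000, 8_000, buffering);
        System.out.println("expiration target: " + targetState);  // unchanged by later elements
        System.out.println("output timestamp: " + outputTsState); // the minimum so far
    }
}
```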
I'm not sure how to solve this using the existing Timer API.
On Wed, May 19, 2021 at 4:39 PM Robert Bradshaw <rober...@google.com> wrote:
+1. It was my understanding as well that the consensus was that timers must be delivered in timestamp order, and "within bundle" resetting/clearing of timers should be respected (as if each timer was in its own bundle).
On Wed, May 19, 2021 at 3:01 PM Kenneth Knowles <k...@apache.org> wrote:
>
> Reading over the other thread, there was consensus to implement.
>
> Reading commentary on the PR, there were good questions raised about the semantics - questions which I feel able to have an opinion about :-)
>
> The questions surrounded bundling, and timers in the same bundle clearing each other. Actually the same questions apply to timers re-setting later timers, and +Jan Lukavský has raised this already (among other people), so we kind of know the answer now, and I think +Boyuan Zhang's code was good (from my quick read). What has changed is that we have a better idea of the contract with the runner. I'm not sure if portability makes this more complex. I will share all my thoughts on this:
>
> I think one key to the Beam model is that bundles are for performance and are also intended as the unit of commitment (until FinishBundle is called, there may be unfinished work). They can affect *behavior* (what the program does - including what can be observed) ... but not *semantics* (what the output means).
>
> So, for example, bundling affects how many files are written, but you are expected to read all the files and must not depend on their number or ordering. The behavior is different, but the semantics are the same.
>
> When it comes to timers, behavior and semantics are very tightly coupled; timers are like a self loop. The firing of a timer is behavior w.r.t. the output of the stateful ParDo, but it is the semantics of the timer itself (my definitions don't actually apply so simply, so don't dig too hard :-). So to get bundling-invariant semantics, we should try for bundling-invariant behavior. When some clock moves from T to T+D between bundles, all the timers in the range [T, T+D) may fire, so they are delivered in that bundle. I believe we agreed in the prior re-setting discussion that timers should always be called back in timestamp order. Bundling-invariance then implies that earlier timers can clear later timers, just as they can re-set them. So even though a timer is delivered in a bundle, the local state of the timer wins. This matches how state works as well: no matter how things are bundled, the state you read is always whatever was written last.
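The bundling-invariance argument above can be sketched with a tiny plain-Java "runner" (illustrative only, not Beam's implementation): an earlier timer's callback clears a later timer, and the outcome is the same whether both timers are delivered in one bundle or in separate bundles:

```java
import java.util.*;

// Plain-Java sketch: a timer "A" whose callback clears timer "B".
// A cleared timer never fires, even if it was already delivered in the
// same bundle - the timer's local state wins, so behavior is
// bundling-invariant.
public class TimerClearDemo {
    // Fires timers bundle by bundle, in timestamp order within a bundle.
    static List<String> run(List<List<String>> bundles) {
        Set<String> cleared = new HashSet<>();
        List<String> fired = new ArrayList<>();
        for (List<String> bundle : bundles) {
            for (String timer : bundle) {
                if (cleared.contains(timer)) continue; // respect within-bundle clears
                fired.add(timer);
                if (timer.equals("A")) cleared.add("B"); // user callback clears B
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // Deliver both timers in one bundle, then each in its own bundle.
        List<String> oneBundle = run(List.of(List.of("A", "B")));
        List<String> twoBundles = run(List.of(List.of("A"), List.of("B")));
        System.out.println("one bundle fired:  " + oneBundle);
        System.out.println("two bundles fired: " + twoBundles);
    }
}
```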
>
> Kenn
>
> On Tue, May 11, 2021 at 1:24 PM Siyuan Chen <syc...@google.com> wrote:
>>
>>
>>
>> On Tue, May 11, 2021 at 11:08 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>
>>>
>>> On Tue, May 11, 2021 at 9:01 AM Kenneth Knowles <k...@apache.org> wrote:
>>>>
>>>>
>>>>
>>>> On Mon, May 10, 2021 at 7:40 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> I've been looking at the implementation of GroupIntoBatches (hoping to add support for grouping based on byte size), and I have a few questions about the current implementation.
>>>>>
>>>>> 1. I noticed that the transform does not preserve input timestamps. The output timestamp is not explicitly set, so it will be whatever the default output timestamp is. Confusingly, this will depend on which path is taken: if the batch is output while processing an element, then the timestamp of that element will be used; if the batch is output from the timer, then the processing-time value of the timer will be used.
>>>>
>>>>
>>>> Seems like bugs.
>>>>
>>>>>
>>>>> - Should I start setting outputTimestamp explicitly - ideally to the minimum timestamp in the current batch of elements?
>>>>
>>>>
>>>> That's a sensible default. Could pass a TimestampCombiner if that leads to watermark delays. Given the nature of the transform, the simple approach will probably be fine.
>>>
>>>
>>> This is a bit tricky to do. In order to update the output timestamp, we need to call setTimer again. However, since we are calling timer.offset().setRelative(), this will keep bumping the timer into the future and it will never fire.
>>>
>>> One solution would be for GIB to just store the current timer timestamp in state, and make sure to keep setting the same target until the timer fires. However, that precludes us from using setRelative (since that method does not allow the DoFn to see what target time is being set). I think this approach won't play nice with TestStream.advanceProcessingTimeTo.
>>>
>>> We could also add a way to update _just_ the output
time for a timer without resetting the entire timer.
>>>
>>>>
>>>>
>>>>> - Should we have the option to preserve all element timestamps? This could look something like this:
>>>>>
>>>>> PCollection<KV<K, TimestampedValue<V>>> batches =
>>>>>     input.apply(GroupIntoBatches.ofSize(N).withTimestampedValues());
>>>>
>>>>
>>>> This seems useful.
>>>>
>>>>>
>>>>> 2. flushBatch always resets the timer, even after the batch is processed. The only reason I can think of for doing so is to update the watermark hold. TimerInternals contains a deleteTimer method - is there any reason we shouldn't simply implement Timer.clear and hook it up to the existing deleteTimer?
>>>>
>>>>
>>>> Makes sense to me. Prior thread on this seemed to have lightweight consensus: https://lists.apache.org/thread.html/r91af7dff0070b80b275082ca0cff7486dc5dfdfc113f35f560600792%40%3Cdev.beam.apache.org%3E
>>
>> There was another thread about adding Timer.clear in Java: https://lists.apache.org/thread.html/r47f5f14ba9729a2800222440425543ef40bea12eef1f2739d42e75db%40%3Cdev.beam.apache.org%3E
>> and a previous attempt in https://github.com/apache/beam/pull/12836. Boyuan might have more insights into this +Boyuan Zhang
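The clear-instead-of-reset idea under discussion can be sketched in plain Java (illustrative names, not Beam's Timer API or GroupIntoBatches internals): after the batch is emitted, delete the pending timer rather than re-setting it, so no empty firing is left behind:

```java
import java.util.*;

// Plain-Java sketch of a flush path that clears the timer after emitting
// the batch, instead of re-setting it (the current GroupIntoBatches behavior).
public class FlushClearDemo {
    // Stand-in for the runner's pending-timer table: key -> expiration time.
    static Map<String, Long> pendingTimers = new HashMap<>();

    static List<Integer> flushBatch(String key, List<Integer> buffer) {
        List<Integer> out = new ArrayList<>(buffer);
        buffer.clear();
        // Instead of re-setting the timer, delete it:
        // conceptually Timer.clear() hooked up to TimerInternals.deleteTimer().
        pendingTimers.remove(key);
        return out;
    }

    public static void main(String[] args) {
        String key = "k";
        pendingTimers.put(key, 10_000L); // timer set when the batch started
        List<Integer> buffer = new ArrayList<>(List.of(1, 2, 3));

        List<Integer> emitted = flushBatch(key, buffer);
        System.out.println("emitted: " + emitted);
        System.out.println("timer still pending? " + pendingTimers.containsKey(key));
    }
}
```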
>>>>
>>>>
>>>>>
>>>>> 3. This transform was implemented before OnWindowExpiration existed. I think we should add a window-expiration callback, and stop setting the end-of-window timer.
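A plain-Java sketch of that proposal (the tiny harness below is illustrative, not Beam's @OnWindowExpiration machinery): instead of an explicit end-of-window timer per key, a window-expiration callback flushes whatever is still buffered when the runner garbage-collects the window:

```java
import java.util.*;

// Plain-Java sketch: a window-expiration callback flushes the partial batch,
// replacing the explicit end-of-window timer.
public class WindowExpirationDemo {
    public static void main(String[] args) {
        // Leftover elements in the buffer, fewer than the batch size.
        List<Integer> buffer = new ArrayList<>(List.of(7, 8));
        List<Integer> output = new ArrayList<>();

        // Conceptually what an @OnWindowExpiration method would do
        // for this key and window:
        Runnable onWindowExpiration = () -> {
            output.addAll(buffer); // flush the partial batch
            buffer.clear();
        };

        // The runner reaches end-of-window + allowed lateness for the
        // window and invokes the callback before garbage-collecting state.
        onWindowExpiration.run();
        System.out.println("flushed at expiration: " + output);
        System.out.println("buffer after expiration: " + buffer);
    }
}
```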
>>>>
>>>>
>>>> +1
>>>>
>>>> Kenn