100% - the contract should not change because things are in a
bundle. IIRC there are some open bugs in Beam around this that
really should be fixed.
My issue with GroupIntoBatches is different. This transform works
as follows:
    if (this is the first element in batch - checked by reading a
        count stored in a ValueState)
      timer.offset(bufferingDuration).setRelative()
This makes it tricky to use setTimer.withOutputTimestamp. Inputs
are not guaranteed to be in order, so simply adding a
withOutputTimestamp would set the output timestamp to whatever
timestamp the first element happened to have; it really should be
the minimum timestamp of all elements in the buffer. If we started setting
the timer on every element, then timer.offset.setRelative would
keep bumping the (processing-time) timer into the future and it
would never expire.
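
To make the problem concrete, here is a minimal sketch in plain Java (not Beam code) of a single processing-time timer. The class and method names are hypothetical, and the model assumes arrival times are given in ascending order. It counts how often the timer fires when only the first element of a batch arms it, versus when every element re-arms it relative to its own arrival time.

```java
/** Toy model of one processing-time timer; an illustration only, not the Beam Timer API. */
final class RelativeTimerSketch {
  /**
   * Counts timer firings while elements arrive at the given processing times.
   * If resetOnEveryElement is true, each element re-arms the timer at
   * (its arrival time + bufferingDuration), mimicking a relative set on every element.
   */
  static int countFirings(long[] arrivals, long bufferingDuration, boolean resetOnEveryElement) {
    Long target = null; // null means no timer is currently set
    int firings = 0;
    for (long now : arrivals) {
      // Deliver the timer first if its target has already been reached.
      if (target != null && target <= now) {
        firings++;
        target = null;
      }
      // Arm the timer: either only when none is set, or on every element.
      if (target == null || resetOnEveryElement) {
        target = now + bufferingDuration;
      }
    }
    return firings;
  }
}
```

With elements arriving every 4 time units and a buffering duration of 10, the set-once variant fires during the stream, while the reset-on-every-element variant keeps moving its target ahead of the clock and does not fire as long as elements keep arriving.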
One solution would be to store the timer timestamp in a
ValueState, and use Timer.set to set the timer to an absolute
timestamp. This would allow us to always reset the timer to the
same expiration target, just modifying the output timestamp each
time. However, this will break DirectRunner tests. The
DirectRunner allows the user to control the advancement of
processing time when using TestStream, but this facility doesn't
work well if the transform sets the processing-time timer using
absolute set() calls.
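
Roughly, the bookkeeping for that approach could look like the following plain-Java sketch. It is not Beam code: the two fields stand in for ValueState cells, the returned pair stands in for the arguments of an absolute timer set with an output timestamp, and all names are hypothetical. It does nothing about the TestStream problem described above.

```java
/** Toy stand-in for the per-key state of a batching DoFn; an illustration only. */
final class AbsoluteTimerSketch {
  private final long bufferingDuration;
  private Long timerTarget;  // plays the role of a ValueState holding the absolute target
  private Long minTimestamp; // minimum event timestamp buffered so far

  AbsoluteTimerSketch(long bufferingDuration) {
    this.bufferingDuration = bufferingDuration;
  }

  /**
   * Handles one element and returns {target, outputTimestamp}, i.e. the values
   * that would be passed to an absolute timer set for this key.
   */
  long[] onElement(long processingTimeNow, long eventTimestamp) {
    if (timerTarget == null) {
      // The target is chosen once per batch and then reused unchanged.
      timerTarget = processingTimeNow + bufferingDuration;
    }
    minTimestamp =
        (minTimestamp == null) ? eventTimestamp : Math.min(minTimestamp, eventTimestamp);
    return new long[] {timerTarget, minTimestamp};
  }

  /** Resets the bookkeeping when the timer fires or the batch is flushed. */
  void onFlush() {
    timerTarget = null;
    minTimestamp = null;
  }
}
```

Every element re-sets the timer, but always to the same stored target, so only the output timestamp moves (down to the minimum seen so far).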
I'm not sure how to solve this using the existing Timer API.
On Wed, May 19, 2021 at 4:39 PM Robert Bradshaw
<rober...@google.com> wrote:
+1. It was my understanding as well that consensus was that
timers must be delivered in timestamp order, and "within bundle"
resetting/clearing of timers should be respected (as if each
timer was in its own bundle).
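
As an illustration of that contract, here is a small plain-Java sketch (not runner or Beam code; all names are hypothetical). Timers eligible at the new clock value are delivered earliest first, and the timer state is re-read before each delivery, so a callback that clears or re-sets a later timer is respected even though both were eligible in the same "bundle". It assumes distinct timestamps and treats the upper bound as inclusive for simplicity.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

/** Toy timer store that delivers timers one at a time in timestamp order. */
final class TimerDeliverySketch {
  private final Map<String, Long> timers = new HashMap<>(); // timer id -> firing timestamp

  void set(String id, long timestamp) {
    timers.put(id, timestamp);
  }

  void clear(String id) {
    timers.remove(id);
  }

  /**
   * Fires every timer with timestamp <= upTo, earliest first. The callback may
   * set or clear other timers; those changes are visible before the next firing.
   */
  List<String> advanceTo(long upTo, BiConsumer<String, TimerDeliverySketch> callback) {
    List<String> fired = new ArrayList<>();
    while (true) {
      // Re-scan the current timer state to find the earliest eligible timer.
      String next = null;
      for (Map.Entry<String, Long> e : timers.entrySet()) {
        if (e.getValue() <= upTo && (next == null || e.getValue() < timers.get(next))) {
          next = e.getKey();
        }
      }
      if (next == null) {
        return fired;
      }
      timers.remove(next);
      fired.add(next);
      callback.accept(next, this);
    }
  }
}
```

For example, with timers a, b and c set at 1, 2 and 3, and a callback in which a clears b, advancing the clock to 5 delivers a and c only.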
On Wed, May 19, 2021 at 3:01 PM Kenneth Knowles
<k...@apache.org> wrote:
>
> Reading over the other thread, there was consensus to
implement.
>
> Reading commentary on the PR, there were good questions
raised about the semantics. Questions which I feel able to
have an opinion about :-)
>
> The questions surrounded bundling and timers in the same
bundle clearing each other. Actually the same questions
apply to timers re-setting later timers and +Jan Lukavský has
raised this already (among other people) so we kind of know
the answer now, and I think +Boyuan Zhang's code was good (from
my quick read). What has changed is that we have a better
idea of the contract with the runner. I'm not sure if
portability makes this more complex. I will share all my
thoughts on this:
>
> I think one key to the Beam model is that bundles are for
performance and also intended as the unit of commitment
(until FinishBundle is called, there may be unfinished work).
They can affect *behavior* (what the program does - including
what can be observed) ... but not *semantics* (what the
output means).
>
> So, for example, bundling affects how many files are
written, but you are expected to read all the files and must
not depend on their number or ordering. The behavior is
different, but the semantics are the same.
>
> When it comes to timers, behavior and semantics are very
tightly coupled; timers are like a self loop. The firing of a
timer is a behavior w.r.t. the output of the stateful ParDo
but it is semantics of the timer itself (my definitions don't
actually apply so simply so don't dig too hard :-). So to get
bundling-invariant semantics, we should try for
bundling-invariant behavior. When some clock moves from T to
T+D between bundles then all the timers in the range [T, T+D)
may fire so they are delivered in the bundle. I believe we
have in the prior re-setting issue agreed that timers should
always be called back in timestamp order. Bundling-invariance
then implies that earlier timers can clear later timers just
like they can re-set them. So even though a timer is
delivered in a bundle, the local state of the timer wins.
This matches how state works as well; no matter how things
are bundled, the state you read is always whatever was
written last.
>
> Kenn
>
> On Tue, May 11, 2021 at 1:24 PM Siyuan Chen
<syc...@google.com> wrote:
>>
>>
>>
>> On Tue, May 11, 2021 at 11:08 AM Reuven Lax
<re...@google.com> wrote:
>>>
>>>
>>>
>>> On Tue, May 11, 2021 at 9:01 AM Kenneth Knowles
<k...@apache.org> wrote:
>>>>
>>>>
>>>>
>>>> On Mon, May 10, 2021 at 7:40 PM Reuven Lax
<re...@google.com> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> I've been looking at the implementation of
GroupIntoBatches (hoping to add support to group based on
byte size), and I have a few questions about the current
implementation.
>>>>>
>>>>> 1. I noticed that the transform does not preserve input
timestamps. The output timestamp is not explicitly set, so it
will be whatever the default output timestamp is. Confusingly,
this will depend on which path is taken. If the batch is
output while processing an element, then the timestamp of that
element will be used. If the batch is output from the timer,
then the processing-time value of the timer will be used.
>>>>
>>>>
>>>> Seems like bugs.
>>>>
>>>>>
>>>>> - Should I start setting outputTimestamp
explicitly - ideally to the minimum timestamp in the current
batch of elements?
>>>>
>>>>
>>>> That's a sensible default. Could pass a
TimestampCombiner if that leads to watermark delays. Given
the nature of the transform, the simple approach will
probably be fine.
>>>
>>>
>>> This is a bit tricky to do. In order to update the output
timestamp, we need to call setTimer again. However, since we
are calling timer.offset().setRelative(), this will keep
bumping the timer into the future and it will never fire.
>>>
>>> One solution would be for GIB to just store the current
timer ts in state, and make sure that we keep setting the same
target until the timer fires. However, that precludes us from
using setRelative (since that method does not allow the DoFn
to see what target time is being set). I think that this
approach won't play nicely with TestStream.advanceProcessingTimeTo.
>>>
>>> We could also add a way to update _just_ the output time
for a timer without resetting the entire timer.
>>>
>>>>
>>>>
>>>>> - Should we have the option to preserve all
element timestamps? This could look something like this:
>>>>>
>>>>> PCollection<KV<K, TimestampedValue<V>>> batches =
>>>>>     input.apply(GroupIntoBatches.ofSize(N).withTimestampedValues());
>>>>
>>>>
>>>> This seems useful.
>>>>
>>>>>
>>>>> 2. flushBatch always resets the timer, even after the
batch is processed. The only reason I can think of for doing
so is to update the watermark hold. TimerInternals contains a
deleteTimer method - is there any reason we shouldn't simply
implement Timer.clear and hook it up to the existing deleteTimer?
>>>>
>>>>
>>>> Makes sense to me. Prior thread on this seemed to have
lightweight consensus:
https://lists.apache.org/thread.html/r91af7dff0070b80b275082ca0cff7486dc5dfdfc113f35f560600792%40%3Cdev.beam.apache.org%3E
>>
>> There was another thread about adding Timer.clear in Java:
https://lists.apache.org/thread.html/r47f5f14ba9729a2800222440425543ef40bea12eef1f2739d42e75db%40%3Cdev.beam.apache.org%3E
>> and a previous attempt in
https://github.com/apache/beam/pull/12836. Boyuan might
have more insights into this +Boyuan Zhang
>>>>
>>>>
>>>>>
>>>>> 3. This transform was implemented before
OnWindowExpiration was implemented. I think we should add a
window-expiration callback, and stop setting the end-of-window
timer.
>>>>
>>>>
>>>> +1
>>>>
>>>> Kenn