Sounds like you could solve that using a second event-time timer that
would actually be used only to hold the output timestamp (watermark
hold). Something like
eventTimer.withOutputTimestamp(currentBufferMinimalStamp).offset(Duration.ZERO).setRelative()
When the timer fires, you would only reset the minimum.
It is sort of ugly, though. It would be cool to have a way to get the
current timestamp a timer is set to (if any).
Jan
On 5/20/21 3:12 AM, Reuven Lax wrote:
100% - the contract should not change because things are in a bundle.
IIRC there are some open bugs in Beam around this that really should
be fixed.
My issue with GroupIntoBatches is different. This transform works as
follows:
  if (this is the first element in the batch, checked by reading a
      count stored in a ValueState)
    timer.offset(bufferingDuration).setRelative()
This makes it tricky to use Timer.withOutputTimestamp. Inputs are
not guaranteed to be in order, so simply adding a withOutputTimestamp
would set the timestamp to be whatever the first element happened to
be; it really should be the minimum timestamp of all elements in the
buffer. If we started setting the timer on every element, then
timer.offset.setRelative would keep bumping the (processing-time)
timer into the future and it would never expire.
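
To make the two failure modes above concrete, here is a minimal
plain-Java sketch. It does not use Beam: RelativeTimerModel,
setRelative and run are hypothetical names that only model the
arithmetic of timer.withOutputTimestamp(...).offset(...).setRelative(),
with processing time and event time as plain longs.

```java
/** Plain-Java model (no Beam dependency) of a relative processing-time timer. */
class RelativeTimerModel {
    long fireAt = Long.MAX_VALUE;          // processing time at which the timer would fire
    long outputTimestamp = Long.MAX_VALUE; // event-time output timestamp carried by the timer

    // Models timer.withOutputTimestamp(outputTs).offset(offset).setRelative().
    void setRelative(long now, long offset, long outputTs) {
        fireAt = now + offset;
        outputTimestamp = outputTs;
    }

    /**
     * Feeds elements of the form {arrivalProcessingTime, eventTimestamp}
     * and returns the timer state after the last element.
     */
    static RelativeTimerModel run(long[][] elements, long bufferingDuration,
                                  boolean resetOnEveryElement) {
        RelativeTimerModel timer = new RelativeTimerModel();
        long min = Long.MAX_VALUE;
        boolean first = true;
        for (long[] e : elements) {
            min = Math.min(min, e[1]);
            if (first || resetOnEveryElement) {
                timer.setRelative(e[0], bufferingDuration, min);
            }
            first = false;
        }
        return timer;
    }

    public static void main(String[] args) {
        // Out-of-order input: the second element has the smallest timestamp.
        long[][] elements = {{0, 100}, {4, 50}, {8, 75}};
        RelativeTimerModel firstOnly = run(elements, 10, false);
        RelativeTimerModel everyElement = run(elements, 10, true);
        System.out.println("first only:    fireAt=" + firstOnly.fireAt
            + " outputTimestamp=" + firstOnly.outputTimestamp);
        System.out.println("every element: fireAt=" + everyElement.fireAt
            + " outputTimestamp=" + everyElement.outputTimestamp);
    }
}
```

In this model, setting the timer only on the first element keeps the
expiration fixed but pins the output timestamp to the first element,
while setting it on every element tracks the minimum but moves the
expiration forward with each arrival.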
One solution would be to store the timer timestamp in a ValueState,
and use Timer.set to set the timer to an absolute timestamp. This
would allow us to always reset the timer to the same expiration
target, just modifying the output timestamp each time. However, this
will break DirectRunner tests. The DirectRunner allows the user to
control the advancement of processing time when using TestStream, but
this facility doesn't work well if the transform sets the
processing-time timer using absolute set() calls.
I'm not sure how to solve this using the existing Timer API.
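
For reference, a rough plain-Java sketch of the "store the target in
state" idea described above. It is only a model, not Beam code:
AbsoluteTargetBatcher and its fields are hypothetical stand-ins for the
ValueState cells and for a timer.withOutputTimestamp(min).set(target)
call, and the now parameter stands in for wherever the DoFn would read
the current processing time, which is the part that conflicts with
TestStream-controlled time.

```java
/** Plain-Java model (no Beam dependency) of keeping the timer target in state. */
class AbsoluteTargetBatcher {
    Long storedTarget = null;            // stands in for a ValueState holding the timer target
    long minTimestamp = Long.MAX_VALUE;  // stands in for state holding the buffer's minimum timestamp
    long timerFireAt = Long.MAX_VALUE;   // absolute processing time the timer is set to
    long timerOutputTimestamp = Long.MAX_VALUE; // output timestamp carried by the timer

    void processElement(long now, long eventTimestamp, long bufferingDuration) {
        if (storedTarget == null) {
            // Only the first element of a batch chooses the expiration target.
            storedTarget = now + bufferingDuration;
        }
        minTimestamp = Math.min(minTimestamp, eventTimestamp);
        // Models timer.withOutputTimestamp(minTimestamp).set(storedTarget):
        // same target every time, only the output timestamp changes.
        timerFireAt = storedTarget;
        timerOutputTimestamp = minTimestamp;
    }

    // Models the timer callback: the batch is flushed and the state is cleared.
    void onTimer() {
        storedTarget = null;
        minTimestamp = Long.MAX_VALUE;
    }

    public static void main(String[] args) {
        AbsoluteTargetBatcher batcher = new AbsoluteTargetBatcher();
        batcher.processElement(0, 100, 10);
        batcher.processElement(4, 50, 10);
        batcher.processElement(8, 75, 10);
        System.out.println("fireAt=" + batcher.timerFireAt
            + " outputTimestamp=" + batcher.timerOutputTimestamp);
    }
}
```

Here the expiration stays at the target chosen by the first element
while the output timestamp follows the minimum of the buffer.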
On Wed, May 19, 2021 at 4:39 PM Robert Bradshaw <rober...@google.com> wrote:
+1. It was my understanding as well that consensus was that timers
must be delivered in timestamp order, and "within bundle"
resetting/clearing of timers should be respected (as if each timer was
in its own bundle).
On Wed, May 19, 2021 at 3:01 PM Kenneth Knowles <k...@apache.org> wrote:
>
> Reading over the other thread, there was consensus to implement.
>
> Reading commentary on the PR, there were good questions raised
> about the semantics. Questions which I feel able to have an
> opinion about :-)
>
> The questions surrounded bundling and timers in the same
> bundle clearing each other. Actually the same questions apply to
> timers re-setting later timers, and +Jan Lukavský has raised this
> already (among other people), so we kind of know the answer now,
> and I think +Boyuan Zhang's code was good (from my quick read). What
> has changed is that we have a better idea of the contract with the
> runner. I'm not sure if portability makes this more complex. I
> will share all my thoughts on this:
>
> I think one key to the Beam model is that bundles are for
> performance and also intended as the unit of commitment (until
> FinishBundle is called, there may be unfinished work). They can
> affect *behavior* (what the program does - including what can be
> observed) ... but not *semantics* (what the output means).
>
> So, for example, bundling affects how many files are written, but
> you are expected to read all the files, and you must not depend on
> their number or ordering. The behavior is different, but the
> semantics are the same.
>
> When it comes to timers, behavior and semantics are very tightly
> coupled; timers are like a self loop. The firing of a timer is a
> behavior w.r.t. the output of the stateful ParDo but it is
> semantics of the timer itself (my definitions don't actually apply
> so simply, so don't dig too hard :-). So to get bundling-invariant
> semantics, we should try for bundling-invariant behavior. When
> some clock moves from T to T+D between bundles, then all the timers
> in the range [T, T+D) may fire, so they are delivered in the
> bundle. I believe we agreed in the prior re-setting issue
> that timers should always be called back in timestamp order.
> Bundling-invariance then implies that earlier timers can clear
> later timers just like they can re-set them. So even though a
> timer is delivered in a bundle, the local state of the timer wins.
> This matches how state works as well; no matter how things are
> bundled, the state you read is always whatever was written last.
>
> Kenn
>
> On Tue, May 11, 2021 at 1:24 PM Siyuan Chen <syc...@google.com> wrote:
>>
>>
>>
>> On Tue, May 11, 2021 at 11:08 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>
>>>
>>> On Tue, May 11, 2021 at 9:01 AM Kenneth Knowles <k...@apache.org> wrote:
>>>>
>>>>
>>>>
>>>> On Mon, May 10, 2021 at 7:40 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> I've been looking at the implementation of GroupIntoBatches
>>>>> (hoping to add support to group based on byte size), and I have a
>>>>> few questions about the current implementation.
>>>>>
>>>>> 1. I noticed that the transform does not preserve input
>>>>> timestamps. The output timestamp is not explicitly set, so it will
>>>>> be whatever the default output timestamp is. Confusingly, this
>>>>> depends on which path is taken. If the batch is output while
>>>>> processing an element, then the timestamp of that element will be
>>>>> used. If the batch is output from the timer, then the
>>>>> processing-time value of the timer will be used.
>>>>
>>>>
>>>> Seems like bugs.
>>>>
>>>>>
>>>>> - Should I start setting outputTimestamp explicitly -
>>>>> ideally to the minimum timestamp in the current batch of elements?
>>>>
>>>>
>>>> That's a sensible default. Could pass a TimestampCombiner if
>>>> that leads to watermark delays. Given the nature of the transform,
>>>> the simple approach will probably be fine.
>>>
>>>
>>> This is a bit tricky to do. In order to update the output
>>> timestamp, we need to call setTimer again. However, since we are
>>> calling timer.offset().setRelative(), this will keep bumping the
>>> timer into the future and it will never fire.
>>>
>>> One solution would be for GIB to just store the current timer
>>> ts in state, and make sure that we keep setting the same target until
>>> the timer fires. However, that precludes us from using setRelative
>>> (since that method does not allow the DoFn to see what target time
>>> is being set). I think that this approach won't play nice with
>>> TestStream.advanceProcessingTimeTo.
>>>
>>> We could also add a way to update _just_ the output time for a
>>> timer without resetting the entire timer.
>>>
>>>>
>>>>
>>>>> - Should we have the option to preserve all element
>>>>> timestamps? This could look something like this:
>>>>>
>>>>> PCollection<KV<K, TimestampedValue<V>>> batches =
>>>>>     input.apply(GroupIntoBatches.ofSize(N).withTimestampedValues());
>>>>
>>>>
>>>> This seems useful.
>>>>
>>>>>
>>>>> 2. flushBatch always resets the timer, even after the batch
>>>>> is processed. The only reason I can think of for doing so is to
>>>>> update the watermark hold. TimerInternals contains a deleteTimer
>>>>> method - is there any reason we shouldn't simply implement
>>>>> Timer.clear and hook it up to the existing deleteTimer?
>>>>
>>>>
>>>> Makes sense to me. Prior thread on this seemed to have
>>>> lightweight consensus:
>>>> https://lists.apache.org/thread.html/r91af7dff0070b80b275082ca0cff7486dc5dfdfc113f35f560600792%40%3Cdev.beam.apache.org%3E
>>
>> There was another thread about adding Timer.clear in Java:
>> https://lists.apache.org/thread.html/r47f5f14ba9729a2800222440425543ef40bea12eef1f2739d42e75db%40%3Cdev.beam.apache.org%3E
>> and a previous attempt in
>> https://github.com/apache/beam/pull/12836. Boyuan might have
>> more insights into this +Boyuan Zhang
>>>>
>>>>
>>>>>
>>>>> 3. This transform was implemented before OnWindowExpiration
>>>>> was implemented. I think we should add a window-expiration
>>>>> callback, and stop setting the end-of-window timer.
>>>>
>>>>
>>>> +1
>>>>
>>>> Kenn