I sense this discussion might be (remotely) related to [1] (and
especially [2]). The common ground here is that we need a sound
definition of a window. I think people currently have different
definitions, which leads to this sort of misunderstanding. The
definition should be created in terms of the stateful dofn (not GBK,
which is probably the case today), because that is the lowest-level
transform and all the others are built upon it. Looking at it through
this lens, it seems that a window actually scopes the state of a
stateful dofn. The scope can be:
(a) one-sided (only a maximum timestamp is defined)
(b) two-sided (both a minimum and a maximum timestamp are defined)
We currently have approach (a), which makes it possible to move a
timestamp *arbitrarily far into the past*, while moving a timestamp into
the future is limited by the window's maxTimestamp. If we extend this to
(b), then the windowFn starts to create something like a universe
(actually a multiverse, because it can return multiple windows). It
should be invalid for an element to escape its universe; that would be
counterintuitive. If we disallow emission of data elements that are
_late even when created_ (i.e. are emitted with a timestamp less than
the output watermark) and we disallow setting timers with a timestamp
higher than window.maxTimestamp (which we currently do), then we have
disallowed any element to escape its window (universe, range of
validity). It would also require the output watermark of a stateful dofn
to be keyed and set to at least window.minTimestamp when the window is
opened. This would remove a sort of asymmetry (why know maxTimestamp and
not minTimestamp?). Also note that (a) is equal to (b) if and only if we
disallow shifting time to the past.
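
To make the difference between (a) and (b) concrete, here is a minimal
sketch in plain Python. It is not Beam API: `Scope`, `min_timestamp`,
`max_timestamp` and `can_output` are hypothetical names used only for
illustration, and the rules it encodes are just the ones proposed above.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Scope:
    """Hypothetical model of the scope a window puts on stateful dofn state."""
    max_timestamp: int
    min_timestamp: Optional[int] = None  # None models the one-sided case (a)

    def can_output(self, timestamp: int, output_watermark: int) -> bool:
        # Output past the end of the window is never allowed.
        if timestamp > self.max_timestamp:
            return False
        # Proposed rule: an element must not be late at the moment it is created.
        if timestamp < output_watermark:
            return False
        # Two-sided case (b): the element must not escape below the window start.
        if self.min_timestamp is not None and timestamp < self.min_timestamp:
            return False
        return True


one_sided = Scope(max_timestamp=9)
two_sided = Scope(max_timestamp=9, min_timestamp=5)

# With a watermark far in the past, only (b) rejects a shift far into the past.
print(one_sided.can_output(-1000, output_watermark=-10**9))  # True
print(two_sided.can_output(-1000, output_watermark=-10**9))  # False
```

If the output watermark is held at or above `min_timestamp` once the
window is opened, the lateness check alone already enforces the lower
bound, which is the sense in which (a) and (b) coincide.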
Jan
[1]
https://lists.apache.org/thread.html/c37dfb6c545fba7d794a13c507dccebb654bbd8b317dab748a6775dc%40%3Cdev.beam.apache.org%3E
[2]
https://lists.apache.org/thread.html/r7f38860557d6571869e8e0989275f6ed610cf8c99b2f56fc6418a1d1%40%3Cdev.beam.apache.org%3E
On 1/21/20 10:08 PM, Ankur Goenka wrote:
On Thu, Jan 16, 2020 at 9:52 PM Kenneth Knowles <k...@apache.org
<mailto:k...@apache.org>> wrote:
On Thu, Jan 16, 2020 at 11:38 AM Robert Bradshaw
<rober...@google.com <mailto:rober...@google.com>> wrote:
On Thu, Jan 16, 2020 at 11:00 AM Kenneth Knowles
<k...@apache.org <mailto:k...@apache.org>> wrote:
>
> IIRC in Java it is forbidden to output an element with a
timestamp outside its current window.
I don't think this is checked anywhere. (Not sure how you would check
it, as there's no generic window containment function. I suppose you
could check if it's past the end of the window, and of course skew
limits how far you can go back. I suppose you could try re-windowing
and then fail if it didn't agree with what was already there.)
I think you are right. This is governed by how a runner invokes
utilities from runners-core (output ultimately reaches this point
without validation:
https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L258)
> An exception is outputs from @FinishBundle, where the output
timestamp is required and the window is applied. TBH it seems
more of an artifact of a mismatch between the pre-windowing
and post-windowing worlds.
Elements are always in some window, even if just the global
window.
I mean that the existence of a window-unaware @FinishBundle method
is an artifact of the method existing prior to windowing as a
concept. The idea that a user can use a DoFn's local variables to
buffer stuff and then output in @FinishBundle predates the
existence of windowing.
> Most of the time, mixing processing across windows is simply
wrong. But there are fears that calling @FinishBundle once per
window would be a performance problem. On the other hand,
don't most correct implementations have to separate processing
for each window anyhow?
Processing needs to be done per window iff the result depends
on the
window or if there are side effects.
> Anyhow I think the Java behavior is better, so window
assignment happens exactly and only at window transforms.
But then one ends up with timestamps that are unrelated to the
windows, right?
As far as the model goes, I think windows provide an upper bound
but not a lower bound. If we take the approach that windows are a
"secondary key with a max timestamp" then the timestamps should be
related to the window in the sense that they are <= the window's
max timestamp.
A window only makes sense when a trigger or timer is fired. And the
timestamp of the elements in the window should be within the window's
time range when a trigger is set. For consistency, I think element
timestamp should remain within the corresponding time range at every
stage of the graph.
IIUC based on the discussion, users can violate this requirement
easily in the pipeline code which might give inconsistent behavior
across runners.
I think we should stick to consistent behavior across languages and
runners. We have multiple options here, such as:
1. Don't promise any correlation between element timestamp and
window. The window will just behave like a secondary key for the element.
2. Make it explicit that the last window function can be applied to the
elements out of order at any time.
3. Don't let users change the timestamp without applying a windowing
function after the timestamp change and before a trigger. However,
this can only be validated at runtime in Python.
4. Revalidate the window after changing the timestamp. Also provide
additional methods to explicitly change the timestamp and window in
one shot.
5. etc.
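
As a rough illustration of option 4, the sketch below re-runs a
fixed-window assignment after a timestamp change and fails if the result
disagrees with the window the element already carries. It is plain
Python, not Beam API; `fixed_window`, `Element`, `with_timestamp` and
`with_timestamp_and_window` are hypothetical names.

```python
from dataclasses import dataclass, replace
from typing import Tuple

Window = Tuple[int, int]  # half-open interval [start, end)


def fixed_window(timestamp: int, size: int) -> Window:
    """Return the fixed window of the given size that contains the timestamp."""
    start = timestamp - timestamp % size
    return (start, start + size)


@dataclass(frozen=True)
class Element:
    value: object
    timestamp: int
    window: Window


def with_timestamp(elem: Element, new_timestamp: int, size: int) -> Element:
    """Change the timestamp, then revalidate the existing window."""
    if fixed_window(new_timestamp, size) != elem.window:
        raise ValueError(
            f"timestamp {new_timestamp} falls outside window {elem.window}")
    return replace(elem, timestamp=new_timestamp)


def with_timestamp_and_window(elem: Element, new_timestamp: int,
                              size: int) -> Element:
    """Change the timestamp and the window together, in one shot."""
    return replace(elem, timestamp=new_timestamp,
                   window=fixed_window(new_timestamp, size))
```

With `e = Element("a", 3, fixed_window(3, 5))`, calling
`with_timestamp(e, 4, 5)` succeeds, `with_timestamp(e, 7, 5)` raises,
and `with_timestamp_and_window(e, 7, 5)` moves the element to the next
window explicitly.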
Kenn
> Kenn
>
> On Wed, Jan 15, 2020 at 4:59 PM Ankur Goenka
<goe...@google.com <mailto:goe...@google.com>> wrote:
>>
>> The case where a plain vanilla value or a windowed value is emitted seems to behave as expected, as the user's intent is honored without any surprises.
>>
>> If I understand correctly, when the timestamp is changed, applying the window function again can have unintended behavior in the following cases:
>> * Custom windows: User code can be executed in an unintended order.
>> * User emits a windowed value in a previous transform: Timestamping the value in this case would overwrite the window the user assigned in an earlier step, even when the actual timestamp is the same. Semantically, emitting an element or a timestamped value with the same timestamp should have the same behaviour.
>>
>> What do you think?
>>
>>
>> On Wed, Jan 15, 2020 at 4:04 PM Robert Bradshaw
<rober...@google.com <mailto:rober...@google.com>> wrote:
>>>
>>> If an element is emitted with a timestamp, the window
assignment is
>>> re-applied at that time. At least that's how it is in
Python. You can
>>> emit the full windowed value (accepted without checking...), a
>>> timestamped value (in which case the window will be
computed), or a
>>> plain old element (in which case the window and timestamp
will be
>>> computed (really, propagated)).
>>>
>>> On Wed, Jan 15, 2020 at 3:51 PM Ankur Goenka
<goe...@google.com <mailto:goe...@google.com>> wrote:
>>> >
>>> > Yup. This might result in unintended behavior: because the timestamp is changed after the window assignment, elements in windows do not have timestamps within the window's time range.
>>> >
>>> > Shall we start validating that there is at least one window assignment between timestamp assignment and GBK/triggers, to avoid the unintended behaviors mentioned above?
>>> >
>>> > On Wed, Jan 15, 2020 at 1:24 PM Luke Cwik
<lc...@google.com <mailto:lc...@google.com>> wrote:
>>> >>
>>> >> Window assignment happens at the point in the pipeline
the WindowInto transform was applied. So in this case the
window would have been assigned using the original timestamp.
>>> >>
>>> >> Grouping is by key and window.
>>> >>
>>> >> On Tue, Jan 14, 2020 at 7:30 PM Ankur Goenka
<goe...@google.com <mailto:goe...@google.com>> wrote:
>>> >>>
>>> >>> Hi,
>>> >>>
>>> >>> I am not sure what effect the order of element timestamp change and window association has on a group by key.
>>> >>> More specifically, what would be the behavior if we apply window -> change element timestamp -> Group By Key?
>>> >>> I think we should always apply the window function after changing the timestamp of elements, though this is neither checked nor a recommended practice in Beam.
>>> >>>
>>> >>> Example pipeline would look like this:
>>> >>>
>>> >>> import time
>>> >>> import apache_beam as beam
>>> >>> from apache_beam import window
>>> >>>
>>> >>> def applyTimestamp(value):
>>> >>>     # 'key' and the pipeline 'p' are assumed to be defined elsewhere
>>> >>>     return window.TimestampedValue((key, value), int(time.time()))
>>> >>>
>>> >>> (p
>>> >>>  | 'Create' >> beam.Create(range(0, 10))
>>> >>>  | 'Fixed Window' >> beam.WindowInto(window.FixedWindows(5))
>>> >>>  # Timestamp is changed after windowing and before GBK
>>> >>>  | 'Apply Timestamp' >> beam.Map(applyTimestamp)
>>> >>>  | 'Group By Key' >> beam.GroupByKey()
>>> >>>  | 'Print' >> beam.Map(print))
>>> >>>
>>> >>> Thanks,
>>> >>> Ankur
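
The behavior described in the replies above (the window is assigned at
WindowInto, and grouping is by key and window) can be mimicked without
Beam. The following is a plain-Python sketch, not a Beam pipeline:
`assign_fixed_window` and `group_by_key_and_window` are hypothetical
helpers, the fixed `new_timestamp` stands in for `time.time()`, and
giving each element its own value as its initial timestamp is an
assumption made only so that more than one window appears.

```python
from collections import defaultdict


def assign_fixed_window(timestamp, size):
    """Return the [start, end) fixed window containing the timestamp."""
    start = timestamp - timestamp % size
    return (start, start + size)


def group_by_key_and_window(elements):
    """Group (key, value, timestamp, window) tuples by (key, window)."""
    groups = defaultdict(list)
    for key, value, timestamp, win in elements:
        groups[(key, win)].append((value, timestamp))
    return dict(groups)


# Create: values 0..9; each value doubles as its initial timestamp (assumption).
elements = [("k", v, v) for v in range(10)]
# Fixed Window: the window is assigned from the original timestamp.
windowed = [(k, v, ts, assign_fixed_window(ts, 5)) for k, v, ts in elements]
# Apply Timestamp: the timestamp changes, the window does not.
new_timestamp = 1_000
retimestamped = [(k, v, new_timestamp, w) for k, v, _, w in windowed]
# Group By Key: grouping still follows the originally assigned windows.
grouped = group_by_key_and_window(retimestamped)

for (key, win), values in sorted(grouped.items()):
    print(key, win, values)
```

Every grouped element ends up carrying a timestamp that lies outside the
window it is grouped under, which is the inconsistency discussed in this
thread.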