On Thu, Jan 16, 2020 at 11:38 AM Robert Bradshaw <rober...@google.com>
wrote:

> On Thu, Jan 16, 2020 at 11:00 AM Kenneth Knowles <k...@apache.org> wrote:
> >
> > IIRC in Java it is forbidden to output an element with a timestamp
> outside its current window.
>
> I don't think this is checked anywhere. (Not sure how you would check
> it, as there's no generic window containment function.) I suppose you
> could check if it's past the end of the window (and of course skew
> limits how far you can go back). Alternatively, you could try re-windowing
> and then fail if it didn't agree with what was already there.
>

I think you are right. This is governed by how a runner invokes utilities
from runners-core (output ultimately reaches this point without validation:
https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L258
)
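The upper-bound check Robert suggests (fail if an output's timestamp is past the end of its window, since there is no generic containment function) could look roughly like the sketch below. IntervalWindow and validate_output are hypothetical stand-ins, not Beam classes, and integer timestamps are assumed; this is not what SimpleDoFnRunner does today.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class IntervalWindow:
    """Hypothetical half-open window [start, end) over integer timestamps."""
    start: int
    end: int

    def max_timestamp(self) -> int:
        # Largest timestamp that still belongs to this window.
        return self.end - 1


def validate_output(timestamp: int, window: IntervalWindow) -> None:
    """Reject an output stamped past the end of its window.

    Only the upper bound is checked. How far back a timestamp may move
    is governed by allowed skew, which this sketch does not model.
    """
    if timestamp > window.max_timestamp():
        raise ValueError(
            f"timestamp {timestamp} is past max timestamp "
            f"{window.max_timestamp()} of window [{window.start}, {window.end})"
        )


w = IntervalWindow(0, 5)
validate_output(4, w)    # inside the window: accepted
validate_output(-10, w)  # before the window start: also accepted (upper bound only)
try:
    validate_output(5, w)
except ValueError as err:
    print(err)  # timestamp 5 is past max timestamp 4 of window [0, 5)
```

Robert's alternative (re-run the WindowFn on the new timestamp and compare with the existing window) would catch lower-bound violations too, but only for WindowFns that can be re-evaluated that way.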


> > An exception is output from @FinishBundle, where the output timestamp
> is required and the window is applied. TBH it seems more of an artifact of
> a mismatch between the pre-windowing and post-windowing worlds.
>
> Elements are always in some window, even if just the global window.
>

I mean that the existence of a window-unaware @FinishBundle method is an
artifact of the method existing prior to windowing as a concept. The idea
that a user can use a DoFn's local variables to buffer stuff and then
output in @FinishBundle predates the existence of windowing.
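The point that correct @FinishBundle implementations usually have to separate their buffered work per window anyway can be illustrated with a toy buffer keyed by window. PerWindowBatcher and its methods are hypothetical, not the Beam DoFn API; a real DoFn would also have to attach a timestamp and window to each output it emits from @FinishBundle.

```python
from collections import defaultdict


class PerWindowBatcher:
    """Hypothetical buffering helper (not a Beam DoFn).

    Elements are buffered per window so that the finish-bundle step
    emits one batch per window instead of mixing windows together.
    """

    def __init__(self):
        self._buffers = defaultdict(list)

    def process(self, element, window):
        # Key the buffer by window, mirroring how grouping is by key and window.
        self._buffers[window].append(element)

    def finish_bundle(self):
        # Emit each window's batch tagged with its window, then reset the buffer.
        batches = [(window, list(items)) for window, items in self._buffers.items()]
        self._buffers.clear()
        return batches


b = PerWindowBatcher()
for ts, value in [(1, "a"), (7, "b"), (3, "c")]:
    # Toy fixed windows of size 5, identified by their start.
    b.process(value, window=(ts // 5) * 5)
print(b.finish_bundle())  # [(0, ['a', 'c']), (5, ['b'])]
```

The per-window dictionary is the extra bookkeeping a window-unaware @FinishBundle pushes onto users; calling @FinishBundle once per window would move it into the runner instead.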

> Most of the time, mixing processing across windows is simply wrong. But
> there are fears that calling @FinishBundle once per window would be a
> performance problem. On the other hand, don't most correct implementations
> have to separate processing for each window anyhow?
>
> Processing needs to be done per window iff the result depends on the
> window or if there are side effects.
>
> > Anyhow, I think the Java behavior is better, so window assignment happens
> exactly and only at window transforms.
>
> But then one ends up with timestamps that are unrelated to the windows,
> right?
>

As far as the model goes, I think windows provide an upper bound but not a
lower bound. If we take the approach that windows are a "secondary key with
a max timestamp" then the timestamps should be related to the window in the
sense that they are <= the window's max timestamp.
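To make the two behaviors discussed in this thread concrete, here is a toy, pure-Python model of the WindowInto -> re-timestamp -> GroupByKey pipeline from Ankur's original message. It is not Beam code: fixed windows are identified by their start, timestamps are integers, and the constant new timestamp (t=100) is a hypothetical choice. reassign=False keeps the window chosen at WindowInto (as Luke describes); reassign=True recomputes it from the new timestamp (as Robert describes for Python when a TimestampedValue is emitted).

```python
from collections import defaultdict

WINDOW_SIZE = 5  # toy fixed windows of 5 time units, identified by their start


def assign_window(timestamp):
    # Toy FixedWindows: the start of the window containing `timestamp`.
    return timestamp - timestamp % WINDOW_SIZE


def window_then_restamp(elements, new_ts, reassign):
    """Model WindowInto -> change timestamp -> GroupByKey."""
    grouped = defaultdict(list)
    for key, value, ts in elements:
        window = assign_window(ts)  # WindowInto uses the original timestamp
        ts = new_ts(value)          # timestamp changed after windowing
        if reassign:
            window = assign_window(ts)
        grouped[(key, window)].append((value, ts))  # grouping is by key and window
    return dict(grouped)


def upper_bound_violations(grouped):
    # Under the "secondary key with a max timestamp" view, every timestamp
    # should be <= its window's max timestamp (start + size - 1 here).
    return [
        (window, ts)
        for (_, window), values in grouped.items()
        for _, ts in values
        if ts > window + WINDOW_SIZE - 1
    ]


def restamp(value):
    # Hypothetical: every element is re-stamped to t=100.
    return 100


elements = [("k", v, v) for v in range(10)]  # (key, value, original timestamp)
kept = window_then_restamp(elements, restamp, reassign=False)
moved = window_then_restamp(elements, restamp, reassign=True)
print(sorted(kept))   # [('k', 0), ('k', 5)]
print(sorted(moved))  # [('k', 100)]
print(len(upper_bound_violations(kept)), len(upper_bound_violations(moved)))  # 10 0
```

With the window kept, every timestamp ends up past its window's max timestamp, which is exactly what the upper-bound view rules out; with reassignment, the grouping changes instead.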

Kenn



> > Kenn
> >
> > On Wed, Jan 15, 2020 at 4:59 PM Ankur Goenka <goe...@google.com> wrote:
> >>
> >> The case where a plain vanilla value or a windowed value is emitted
> seems fine, as the user's intent is honored without any surprises.
> >>
> >> If I understand correctly, when the timestamp is changed, re-applying
> the window function can have unintended behavior in the following
> cases:
> >> * Custom windows: user code can be executed in an unintended order.
> >> * User emits a windowed value in a previous transform: timestamping the
> value in this case would overwrite the window the user assigned in the
> earlier step, even when the actual timestamp is the same. Semantically,
> emitting an element or a timestamped value with the same timestamp should
> have the same behaviour.
> >>
> >> What do you think?
> >>
> >>
> >> On Wed, Jan 15, 2020 at 4:04 PM Robert Bradshaw <rober...@google.com>
> wrote:
> >>>
> >>> If an element is emitted with a timestamp, the window assignment is
> >>> re-applied at that time. At least that's how it is in Python. You can
> >>> emit the full windowed value (accepted without checking...), a
> >>> timestamped value (in which case the window will be computed), or a
> >>> plain old element (in which case the window and timestamp will be
> >>> computed (really, propagated)).
> >>>
> >>> On Wed, Jan 15, 2020 at 3:51 PM Ankur Goenka <goe...@google.com>
> wrote:
> >>> >
> >>> > Yup, this might result in unintended behavior: the timestamp is
> changed after window assignment, so elements in a window may not have
> timestamps within the window's time range.
> >>> >
> >>> > Shall we start validating that there is at least one window assignment
> between timestamp assignment and GBK/triggers, to avoid the unintended
> behaviors mentioned above?
> >>> >
> >>> > On Wed, Jan 15, 2020 at 1:24 PM Luke Cwik <lc...@google.com> wrote:
> >>> >>
> >>> >> Window assignment happens at the point in the pipeline the
> WindowInto transform was applied. So in this case the window would have
> been assigned using the original timestamp.
> >>> >>
> >>> >> Grouping is by key and window.
> >>> >>
> >>> >> On Tue, Jan 14, 2020 at 7:30 PM Ankur Goenka <goe...@google.com>
> wrote:
> >>> >>>
> >>> >>> Hi,
> >>> >>>
> >>> >>> I am not sure what effect the order of element timestamp change and
> window assignment has on a GroupByKey.
> >>> >>> More specifically, what would the behavior be if we apply window
> -> change element timestamp -> GroupByKey?
> >>> >>> I think we should always apply the window function after changing the
> timestamp of elements, though this is neither checked nor a recommended
> practice in Beam.
> >>> >>>
> >>> >>> An example pipeline would look like this:
> >>> >>>
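> >>> >>>     import time
> >>> >>>
> >>> >>>     import apache_beam as beam
> >>> >>>     from apache_beam.transforms import window
> >>> >>>
> >>> >>>     def applyTimestamp(value):
> >>> >>>         # Re-stamp each element with the current wall-clock time.
> >>> >>>         # 'key' is a placeholder; any key works for this example.
> >>> >>>         return window.TimestampedValue(('key', value), int(time.time()))
> >>> >>>
> >>> >>>     with beam.Pipeline() as p:
> >>> >>>         (p
> >>> >>>          | 'Create' >> beam.Create(range(0, 10))
> >>> >>>          | 'Fixed Window' >> beam.WindowInto(window.FixedWindows(5))
> >>> >>>          # Timestamp is changed after windowing and before GBK.
> >>> >>>          | 'Apply Timestamp' >> beam.Map(applyTimestamp)
> >>> >>>          | 'Group By Key' >> beam.GroupByKey()
> >>> >>>          | 'Print' >> beam.Map(print))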
> >>> >>>
> >>> >>> Thanks,
> >>> >>> Ankur
>
