On Thu, Jan 16, 2020 at 11:38 AM Robert Bradshaw <rober...@google.com> wrote:
> On Thu, Jan 16, 2020 at 11:00 AM Kenneth Knowles <k...@apache.org> wrote:
> >
> > IIRC in Java it is forbidden to output an element with a timestamp
> > outside its current window.
>
> I don't think this is checked anywhere. (Not sure how you would check
> it, as there's no generic window containment function. I suppose you
> could check if it's past the end of the window, and of course skew
> limits how far you can go back.) I suppose you could try re-windowing
> and then fail if it didn't agree with what was already there.

I think you are right. This is governed by how a runner invokes utilities
from runners-core (output ultimately reaches this point without validation:
https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L258
).

> > An exception is outputs from @FinishBundle, where the output timestamp
> > is required and the window is applied. TBH it seems more of an artifact of
> > a mismatch between the pre-windowing and post-windowing worlds.
>
> Elements are always in some window, even if just the global window.

I mean that the existence of a window-unaware @FinishBundle method is an
artifact of the method existing prior to windowing as a concept. The idea
that a user can use a DoFn's local variables to buffer stuff and then output
in @FinishBundle predates the existence of windowing.

> > Most of the time, mixing processing across windows is simply wrong. But
> > there are fears that calling @FinishBundle once per window would be a
> > performance problem. On the other hand, don't most correct implementations
> > have to separate processing for each window anyhow?
>
> Processing needs to be done per window iff the result depends on the
> window or if there are side effects.
>
> > Anyhow I think the Java behavior is better, so window assignment happens
> > exactly and only at window transforms.
>
> But then one ends up with timestamps that are unrelated to the windows,
> right?
>

As far as the model goes, I think windows provide an upper bound but not a
lower bound. If we take the approach that windows are a "secondary key with
a max timestamp" then the timestamps should be related to the window in the
sense that they are <= the window's max timestamp.

Kenn

> > Kenn
> >
> > On Wed, Jan 15, 2020 at 4:59 PM Ankur Goenka <goe...@google.com> wrote:
> >>
> >> The case where a plain vanilla value or a windowed value is emitted
> >> seems as expected, as the user intent is honored without any surprises.
> >>
> >> If I understand correctly, when the timestamp is changed, applying the
> >> window function again can have unintended behavior in the following cases:
> >> * Custom windows: User code can be executed in unintended order.
> >> * User emits a windowed value in a previous transform: Timestamping the
> >> value in this case would overwrite the window assigned by the user in an
> >> earlier step even when the actual timestamp is the same. Semantically,
> >> emitting an element or a timestamped value with the same timestamp
> >> should have the same behaviour.
> >>
> >> What do you think?
> >>
> >>
> >> On Wed, Jan 15, 2020 at 4:04 PM Robert Bradshaw <rober...@google.com> wrote:
> >>>
> >>> If an element is emitted with a timestamp, the window assignment is
> >>> re-applied at that time. At least that's how it is in Python. You can
> >>> emit the full windowed value (accepted without checking...), a
> >>> timestamped value (in which case the window will be computed), or a
> >>> plain old element (in which case the window and timestamp will be
> >>> computed (really, propagated)).
> >>>
> >>> On Wed, Jan 15, 2020 at 3:51 PM Ankur Goenka <goe...@google.com> wrote:
> >>> >
> >>> > Yup, this might result in unintended behavior: the timestamp is
> >>> > changed after the window assignment, so elements in windows do not have
> >>> > timestamps in the window's time range.
> >>> >
> >>> > Shall we start validating at least one window assignment between
> >>> > timestamp assignment and GBK/triggers to avoid the unintended behaviors
> >>> > mentioned above?
> >>> >
> >>> > On Wed, Jan 15, 2020 at 1:24 PM Luke Cwik <lc...@google.com> wrote:
> >>> >>
> >>> >> Window assignment happens at the point in the pipeline the
> >>> >> WindowInto transform was applied. So in this case the window would have
> >>> >> been assigned using the original timestamp.
> >>> >>
> >>> >> Grouping is by key and window.
> >>> >>
> >>> >> On Tue, Jan 14, 2020 at 7:30 PM Ankur Goenka <goe...@google.com> wrote:
> >>> >>>
> >>> >>> Hi,
> >>> >>>
> >>> >>> I am not sure what effect the order of element timestamp change and
> >>> >>> window association has on a group by key.
> >>> >>> More specifically, what would be the behavior if we apply window
> >>> >>> -> change element timestamp -> Group By key.
> >>> >>> I think we should always apply the window function after changing the
> >>> >>> timestamp of elements, though this is neither checked nor a recommended
> >>> >>> practice in Beam.
> >>> >>>
> >>> >>> Example pipeline would look like this:
> >>> >>>
> >>> >>> def applyTimestamp(value):
> >>> >>>     # `key` is assumed to be defined elsewhere.
> >>> >>>     return window.TimestampedValue((key, value), int(time.time()))
> >>> >>>
> >>> >>> # 'Apply Timestamp' changes the timestamp after windowing and before GBK.
> >>> >>> p \
> >>> >>>   | 'Create' >> beam.Create(range(0, 10)) \
> >>> >>>   | 'Fixed Window' >> beam.WindowInto(window.FixedWindows(5)) \
> >>> >>>   | 'Apply Timestamp' >> beam.Map(applyTimestamp) \
> >>> >>>   | 'Group By Key' >> beam.GroupByKey() \
> >>> >>>   | 'Print' >> beam.Map(print)
> >>> >>>
> >>> >>> Thanks,
> >>> >>> Ankur
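
To make the two behaviors in this thread concrete, here is a minimal pure-Python sketch. It does not use Beam; every name in it (IntervalWindow, assign_fixed_window, group_by_key_and_window, violates_upper_bound) is a hypothetical stand-in, timestamps are assumed to be whole seconds, and the elements are assumed to carry timestamps 0..9. It models "window -> change timestamp -> group by key" once with the window kept as assigned and once with the window recomputed from the new timestamp, then checks the "timestamp <= window's max timestamp" upper bound.

```python
from collections import defaultdict
from typing import NamedTuple


class IntervalWindow(NamedTuple):
    """Hypothetical half-open window [start, end), in whole seconds."""
    start: int
    end: int

    def max_timestamp(self) -> int:
        # Largest whole-second timestamp that still falls inside the window.
        return self.end - 1


def assign_fixed_window(timestamp: int, size: int) -> IntervalWindow:
    # Fixed windows of `size` seconds aligned to multiples of `size`.
    start = timestamp - timestamp % size
    return IntervalWindow(start, start + size)


def group_by_key_and_window(elements, size, new_timestamp, rewindow):
    """Model: window -> change timestamp -> group by (key, window).

    rewindow=False keeps the window assigned from the original timestamp.
    rewindow=True recomputes the window from the new timestamp.
    """
    groups = defaultdict(list)
    for key, value, timestamp in elements:
        win = assign_fixed_window(timestamp, size)  # windowing step
        timestamp = new_timestamp                   # timestamp is changed
        if rewindow:
            win = assign_fixed_window(timestamp, size)
        groups[(key, win)].append((value, timestamp))
    return dict(groups)


def violates_upper_bound(groups):
    # Values whose timestamp is later than their window's max timestamp.
    return [
        value
        for (_, win), items in groups.items()
        for value, ts in items
        if ts > win.max_timestamp()
    ]


# Assumed input: (key, value, original timestamp) with timestamps 0..9.
elements = [("k", v, v) for v in range(10)]
kept = group_by_key_and_window(elements, size=5, new_timestamp=1000, rewindow=False)
redone = group_by_key_and_window(elements, size=5, new_timestamp=1000, rewindow=True)

print(sorted(kept))    # groups keyed by the original windows
print(sorted(redone))  # groups keyed by the recomputed window
print(len(violates_upper_bound(kept)), len(violates_upper_bound(redone)))
```

With the window kept, grouping still follows the original windows and every element ends up with a timestamp past its window's max timestamp. With re-windowing, the upper bound holds but the original grouping is lost.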