On 4/5/22 21:19, Kenneth Knowles wrote:
Coming to this a bit late, but to remind that we switched to a whole-window definition of lateness/droppability. Elements in front of / behind a watermark or other elements should hardly matter for lateness. Anything output within a window should be fine as long as the window itself is treated consistently, no?

I don't think this is 100% the case. What is important is preserving causality and "happens-before" semantics. Yes, if we constraint ourselves on windowed GBK-like transforms then there are only two instants that matter - window.maxTimestamp and window GC time.

Generic event streams can have intrinsic happens-before relationships baked inside the data. An example is @RequiresTimeSortedInput where *all* events in a stream are treated as if there was a causal relationship if T1 < T2, therefore *every* watermark move creates an instant in event-time where droppable elements *must* be dropped, because otherwise it would violate the contract. At least until we have full retractions (which is a way of moving back in time).


I think the idea is to allow runners to have as much flexibility as possible by making things as unobservable as possible. For example a timer firing only allows you to bound the input elements' watermark on one side.

Regarding the pure 1:1 mapping transform, we deliberately can observe if a vanilla DoFn has a startbundle/finishbundle method, by design. So if the runner knows that the user code cannot possibly observe bundle boundaries, the runner can have even more flexibility.
+1

Kenn

On Wed, Mar 30, 2022 at 8:43 AM Robert Bradshaw <rober...@google.com> wrote:

    On Tue, Mar 29, 2022 at 1:11 AM Jan Lukavský <je...@seznam.cz> wrote:
    >
    > > There's another interesting API, which is being discussed for the
    > > internal variant of Dataflow, which is that rather than
    allowing one
    > > to fabricate timestamps (or windows) ex nihilo one would
    instead need
    > > ot ask for a "timestamped" or "windowed" element in the Process
    > > method, from which one could construct a new timestamped/windowed
    > > element (with a new value, but the same timestamp/window/paneinfo)
    > > that could then be safely emitted. I'm curious how
    constraining this
    > > would be.
    >
    > I'm not sure I follow. Do you suggest that - for the case of
    in-memory
    > batching - one would store a TimestampedValue in the buffer and when
    > flushing the buffer one would say "I'm emitting this value, that was
    > created based on this input element"?

    Correct. The "that was created based on this input element" is
    implicit--it's just that the only way to get timestamps/windows (in an
    ordinary DoFn) is to get them from an input element. (One would also
    separately have "timestamping" and "windowing" operations that do
    assignment to timestamp/window.)

    > That seems to work fine, though I
    > suppose this is probably not the main motivation for such API. :)

    Actually, this is the motivation for the API. In order to correctly
    buffer, one needs to keep the timestamp/window/paneinfo information
    associated with each element around, so this encourages (or even
    forces) one to do so in the correct way.

    > On 3/28/22 20:54, Robert Bradshaw wrote:
    > > On Mon, Mar 28, 2022 at 11:45 AM Jan Lukavský
    <je...@seznam.cz> wrote:
    > >> On 3/28/22 20:17, Reuven Lax wrote:
    > >>
    > >> On Mon, Mar 28, 2022 at 11:08 AM Robert Bradshaw
    <rober...@google.com> wrote:
    > >>> On Mon, Mar 28, 2022 at 11:04 AM Reuven Lax
    <re...@google.com> wrote:
    > >>>> On Mon, Mar 28, 2022 at 10:59 AM Evan Galpin
    <evan.gal...@gmail.com> wrote:
    > >>>>> I don't believe that the issue is Flink specific but
    rather that Flink is one example of many potential examples. 
    Enforcing that watermark updates can only happen at bundle
    boundaries would ensure that any data buffered while processing a
    single bundle in a DoFn could be output ON_TIME, especially
    without any need for a TimerSpec to explicitly hold the watermark
    for that purpose.  This is in reference to data buffered within a
    single bundle, and not cross-bundle buffering such as in the case
    of GroupIntoBatches.
    > >>>>
    > >>>> Any in-flight data (i.e. data being processed that is not
    yet committed back to the runner) must hold up the output
    watermark. Since in the Beam model all records in a bundle are
    somewhat atomic (e.g. if the bundle succeeds, none of of them
    should be replayed in a proper exactly-once runner), I think this
    implicitly means that any elements in an in-flight bundle must
    hold up the watermark. This doesn't mean that the watermark can't
    advance while the bundle is in flight -just that it can't advance
    past any of the timestamps outstanding in the bundle.
    > >>> Yes. The difficulty is that we don't have much visibility into
    > >>> "timestamps outstanding in the bundle" so we have to take
    > >>> min(timestamps of input elements in the bundle) which is not
    that
    > >>> different from only having watermark updates at bundle
    boundaries.
    > >>
    > >> Exactly.
    > >>
    > >> Agree, this works exactly the same. The requirement is not to
    not update the watermark, but not to update it past any on-time
    element in the bundle. Not updating the watermark at all is one
    solution, computing min(timestamps in bundle) works the same.
    Unfortunately, Flink does not construct bundles in advance, it is
    more an ad-hoc concept. Therefore the only way to hold the
    watermark is not to update it, because the timestamps of elements
    that will be part of the bundle are not known.
    > >>
    > >> Two more questions:
    > >>
    > >>   a) it seems that we are missing some @ValidatesRunner tests
    for this, right?
    > >>
    > >>   b) should we relax the restriction of not allowing
    outputWithTimestamp() output element before the current element? I
    think it should be "before lowest element in the current bundle"
    or "before output watermark, if not already late, or not droppable
    if late (uh, this gets a little complicated :))". Not allowing
    outputting element with timestamp lower than the current element
    seems to be just a "safety-first" solution to the problem
    discussed here and is too restrictive. It could be worked-around
    using getAllowedTimestampSkew(), but that can cause errors.
    > > Yes, outputWithTimestamp should likely be restricted to
    min(elements
    > > seen so far).
    > >
    > > There's another interesting API, which is being discussed for the
    > > internal variant of Dataflow, which is that rather than
    allowing one
    > > to fabricate timestamps (or windows) ex nihilo one would
    instead need
    > > ot ask for a "timestamped" or "windowed" element in the Process
    > > method, from which one could construct a new timestamped/windowed
    > > element (with a new value, but the same timestamp/window/paneinfo)
    > > that could then be safely emitted. I'm curious how
    constraining this
    > > would be.
    > >
    > >>>>> Take for example a PCollection with 1 second Fixed
    windowing.  The PCollection holds payload bodies for an external
    API to which requests will be made.  A hypothetical runner creates
    a bundle with Element A and Element B where Element A belongs to
    the window [0:00:01, 0:00:02) and Element B belongs to the window
    [0:00:02, 0:00:03).  Assume that the DoFn is going to buffer all
    elements in a bundle so as to generate fewer round-trip requests
    to the external API, and then output the corresponding responses.
    The following is a high-level order of events that could result in
    data being labelled as LATE:
    > >>>>>
    > >>>>> 1. Watermark is 0:00:00
    > >>>>> 2. DoFn receives the bundle containing both Element A and
    Element B
    > >>>>> 3. Element A is processed by the DoFn, buffering in-memory
    and returning/completing
    > >>>>> 4. Watermark is (maybe) updated after having processed the
    element; let's assume in this example it is in fact updated to 0:00:02
    > >>>>> 5. Element B (from the same bundle) is processed by the DoFn
    > >>>>> 6. It's the end of the bundle, so now the in-memory
    buffered entities are used to make a request to external API
    > >>>>> 7. The API responses are gathered and intended to be
    output to the same window from which the corresponding element
    with request data originated (Element A and Element B carried this
    data)
    > >>>>> 8. The response data associated with the request payload
    found in Element A is output with the timestamp of Element A i.e.
    something in the range of Element A's window [0:00:01, 0:00:02)
    > >>>>> 9. The data in the prior step is considered LATE, strictly
    as a result of updating the watermark to 0:00:02 in Step 4 above
    > >>>>>
    > >>>>> If Step 4 was moved to be the last step in the process
    (i.e. at the bundle boundary) this issue would be avoided.  I
    would also argue that updating the watermark only after receiving
    a response for an input Element is a more accurate depiction of
    having completed processing for the element.  All that said, I
    could buy the argument that the above description might represent
    an anti-pattern of sorts where response data should actually be
    output with a timestamp corresponding to its receipt rather than
    the timestamp of its corresponding input element carrying the
    request body.
    > >>>>>
    > >>>>> - Evan
    > >>>>>
    > >>>>> On Mon, Mar 28, 2022 at 11:07 AM Reuven Lax
    <re...@google.com> wrote:
    > >>>>>> I agree with you that changing on-time elements to late
    elements is incorrect, however I don't quite understand why doing
    things on bundle boundaries helps. Is this specific to Flink?
    > >>>>>>
    > >>>>>>
    > >>>>>>
    > >>>>>> On Mon, Mar 28, 2022 at 1:07 AM Jan Lukavský
    <je...@seznam.cz> wrote:
    > >>>>>>> Hi Robert,
    > >>>>>>>
    > >>>>>>> I had the same impression that holding the watermark
    between bundles is
    > >>>>>>> actually not part of the computational model. Now the
    question is -
    > >>>>>>> should it be?
    > >>>>>>>
    > >>>>>>> As you said, buffering and emitting in-memory buffered
    data means
    > >>>>>>> possibly outputting data that arrived as ON_TIME, but is
    outputted as
    > >>>>>>> LATE (or droppable, which is even worse). My
    understanding is that this
    > >>>>>>> is why there is the (deprecated)
    getAllowedTimestampSkew() method of
    > >>>>>>> DoFn, but that only bypasses the check, does not solve
    the issue (which
    > >>>>>>> is why it is deprecated, I suppose). I strongly believe
    that outputting
    > >>>>>>> elements that switch from ON_TIME to LATE is a
    correctness bug, because
    > >>>>>>> it has the potential to violate causality (which is strongly
    > >>>>>>> counter-intuitive in our universe :)). For some
    pipelines it can
    > >>>>>>> definitely cause incorrect outputs.
    > >>>>>>>
    > >>>>>>> If we could ensure the output watermark gets updated
    only between
    > >>>>>>> @FinishBundle and @StartBundle call then this problem
    would go away. I
    > >>>>>>> looked into the code of FlinkRunner and it seems to me
    that we could
    > >>>>>>> quite easily ensure this by not outputting watermark
    when a bundle is
    > >>>>>>> open and output it once it finishes. I didn't dig into
    that too deep, so
    > >>>>>>> I don't know if there would be any caveats, the question
    is apparently,
    > >>>>>>> if we could make these guarantees for other runners as
    well and if we
    > >>>>>>> could sensibly create a @ValidatesRunner test.
    > >>>>>>>
    > >>>>>>> WDYT?
    > >>>>>>>
    > >>>>>>>    Jan
    > >>>>>>>
    > >>>>>>> On 3/25/22 23:06, Robert Bradshaw wrote:
    > >>>>>>>> I do not think there is a hard and fast rule about
    updating watermarks
    > >>>>>>>> only at bundle boundaries. This seems perfectly legal
    for a pure 1:1
    > >>>>>>>> mapping DoFn. The issue is that DoFns are allowed to
    buffer data and
    > >>>>>>>> emit them in a later process (or finishBundle). If the
    watermark has
    > >>>>>>>> moved on, that may result in late data. We don't really
    have a way for
    > >>>>>>>> a DoFn to declare *it's* output watermark (i.e. "I
    promise not to emit
    > >>>>>>>> any data before this timestamp.")
    > >>>>>>>>
    > >>>>>>>> On Thu, Mar 24, 2022 at 8:10 AM Evan Galpin
    <egal...@apache.org> wrote:
    > >>>>>>>>> Thanks for starting this thread Jan, I'm keen to hear
    thoughts and outcomes!  I thought I would mention that answers to
    the questions posed here will help to unblock a 2.38.0 release
    blocker[1].
    > >>>>>>>>>
    > >>>>>>>>> [1] https://issues.apache.org/jira/browse/BEAM-14064
    > >>>>>>>>>
    > >>>>>>>>> On Thu, Mar 24, 2022 at 5:28 AM Jan Lukavský
    <je...@seznam.cz> wrote:
    > >>>>>>>>>> Hi,
    > >>>>>>>>>>
    > >>>>>>>>>> this is follow-up thread started from [1]. In the
    thread there is mentioned multiple times that (in stateless
    ParDo), the output watermark is allowed to advance only on bundle
    boundaries [2]. Essentially that would mean that anything in
    between calls to @StartBundle and @FinishBundle would be processed
    in single instant in (output) event-time. This makes perfect sense.
    > >>>>>>>>>>
    > >>>>>>>>>> The issue is that it seems that not all runners
    actually implement this behavior. FlinkRunner for instance does
    not have a "natural" concept of bundles and those are created in a
    more ad-hoc way to adhere with the DoFn life-cycle (see [3]).
    Watermark updates and elements are completely interleaved without
    any synchronization with bundle "open" or "close". If watermark
    updates are allowed to happen only on boundaries of bundles, then
    this seems to break this contract.
    > >>>>>>>>>>
    > >>>>>>>>>> The question therefore is - should we consider
    FlinkRunner as non-compliant with this aspect of the Apache Beam
    model or is this an "optional" part that runners are free to
    implement at will? In the case of the former, do we miss some
    @ValidatesRunner tests for this?
    > >>>>>>>>>>
    > >>>>>>>>>>    Jan
    > >>>>>>>>>>
    > >>>>>>>>>> [1]
    https://lists.apache.org/thread/mtwtno2o88lx3zl12jlz7o5w1lcgm2db
    > >>>>>>>>>>
    > >>>>>>>>>> [2]
    https://lists.apache.org/thread/7foy455spg43xo77zhrs62gc1m383t50
    > >>>>>>>>>>
    > >>>>>>>>>> [3]
    
https://github.com/apache/beam/blob/14862ccbdf2879574b6ce49149bdd7c9bf197322/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L786
    > >>>>>>>>>>
    > >>>>>>>>>>

Reply via email to