Ah, that's fine I think. What's not fine is for an on-time element to later turn into a late element.
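
To make the distinction concrete, here is a minimal sketch that replays the two-stream scenario quoted below. It is a plain Python simulation, not Beam API: the names `Elem`, `Wm`, `is_late`, and `flatten` are hypothetical, and lateness is simplified to "timestamp falls behind the watermark", as in the quoted example. It assumes the Flatten output watermark is the minimum of the input watermarks.

```python
# Illustrative simulation only; none of these names are Beam API.
from dataclasses import dataclass

NEG_INF = float("-inf")


@dataclass(frozen=True)
class Elem:
    value: str
    ts: int


@dataclass(frozen=True)
class Wm:
    ts: int


def is_late(elem_ts, watermark):
    # Simplified criterion from the quoted example:
    # an element is late when its timestamp falls behind the watermark.
    return elem_ts < watermark


def flatten(events):
    """Replay (input_name, event) pairs given in processing-time order.

    Returns one (value, late_on_input, late_after_flatten) tuple per element.
    """
    # Every input starts with no watermark progress at all.
    wm = {name: NEG_INF for name, _ in events}
    results = []
    for name, ev in events:
        if isinstance(ev, Wm):
            wm[name] = max(wm[name], ev.ts)
        else:
            out_wm = min(wm.values())  # assumed Flatten output watermark
            results.append(
                (ev.value, is_late(ev.ts, wm[name]), is_late(ev.ts, out_wm))
            )
    return results


events = [
    ("A", Elem("x", 1)), ("B", Wm(3)),   # processing time 0
    ("A", Wm(1)), ("B", Elem("y", 2)),   # processing time 1
    ("A", Wm(2)), ("B", Elem("z", 4)),   # processing time 2
]

for value, late_in, late_out in flatten(events):
    print(value, "late on input:", late_in, "late after Flatten:", late_out)
```

In this sketch only `y` changes status, from late on stream B to on-time after the Flatten. The reverse cannot happen here, because the minimum of the input watermarks never exceeds any single input's watermark.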

On Mon, Mar 12, 2018 at 4:05 PM Shen Li <cs.she...@gmail.com> wrote:

> Sure. Consider the following case, where I have two input streams A and B.
> (ts = timestamp, wm = watermark)
>
> processing time    stream A          stream B
>
> 0                  elem=x, ts=1      wm=3
> 1                  wm=1              elem=y, ts=2
> 2                  wm=2              elem=z, ts=4
>
> In stream B, {elem=y, ts=2} is late, as its timestamp (2) falls behind
> the watermark (3).
>
> However, the Flatten output would be: {elem=x, ts=1}, {wm=1}, {elem=y,
> ts=2}, {wm=2}, {elem=z, ts=4}, where the {elem=y, ts=2} is no longer late
> due to the slower watermark progress in stream A.
>
> Is the above a valid scenario?
>
> Shen
>
>
> On Mon, Mar 12, 2018 at 6:46 PM, Reuven Lax <re...@google.com> wrote:
>
>> Logically a Flatten is just a way to create a multi-input transform
>> downstream of the flatten (you can imagine a model in which Flatten was not
>> explicit, we just allowed multiple main inputs). This means that yes, the
>> watermark is the minimum of all inputs.
>>
>> I don't see how a late tuple can become early. Can you explain?
>>
>>
>> On Mon, Mar 12, 2018 at 2:07 PM Shen Li <cs.she...@gmail.com> wrote:
>>
>>> Hi Reuven,
>>>
>>> What about watermark? Should Flatten emit the min watermark of all input
>>> data streams? If that is the case, one late tuple can become early after
>>> Flatten, right? Will that cause any problem?
>>>
>>> Shen
>>>
>>> On Mon, Mar 12, 2018 at 4:09 PM, Reuven Lax <re...@google.com> wrote:
>>>
>>>> No, I don't think it makes sense for the Flatten operator to cache
>>>> elements.
>>>>
>>>>
>>>> On Mon, Mar 12, 2018 at 11:55 AM Shen Li <cs.she...@gmail.com> wrote:
>>>>
>>>>> If multiple inputs of Flatten proceed at different speeds, should the
>>>>> Flatten operator cache tuples before emitting output watermarks? This can
>>>>> prevent a late tuple from becoming early. But if the watermark gap (i.e.,
>>>>> cache size) becomes too large among inputs, can the application tell
>>>>> Beam/runner to emit output watermark anyway and consider slow input tuples
>>>>> as late?
>>>>>
>>>>> Thanks,
>>>>> Shen