Sure. Consider the following case, where I have two input streams A and B. (ts = timestamp, wm = watermark)

processing time    stream A          stream B
0                  elem=x, ts=1      wm=3
1                  wm=1              elem=y, ts=2
2                  wm=2              elem=z, ts=4

In stream B, {elem=y, ts=2} is late, as its timestamp (2) falls behind the
watermark (3). However, the Flatten output would be:

{elem=x, ts=1}, {wm=1}, {elem=y, ts=2}, {wm=2}, {elem=z, ts=4}

where {elem=y, ts=2} is no longer late, due to the slower watermark
progress in stream A.

Is the above a valid scenario?

Shen

On Mon, Mar 12, 2018 at 6:46 PM, Reuven Lax <re...@google.com> wrote:

> Logically a Flatten is just a way to create a multi-input transform
> downstream of the flatten (you can imagine a model in which Flatten was not
> explicit, we just allowed multiple main inputs). This means that yes, the
> watermark is the minimum of all inputs.
>
> I don't see how a late tuple can become early. Can you explain?
>
>
> On Mon, Mar 12, 2018 at 2:07 PM Shen Li <cs.she...@gmail.com> wrote:
>
>> Hi Reuven,
>>
>> What about watermark? Should Flatten emit the min watermark of all input
>> data streams? If that is the case, one late tuple can become early after
>> Flatten, right? Will that cause any problem?
>>
>> Shen
>>
>> On Mon, Mar 12, 2018 at 4:09 PM, Reuven Lax <re...@google.com> wrote:
>>
>>> No, I don't think it makes sense for the Flatten operator to cache
>>> element.
>>>
>>>
>>> On Mon, Mar 12, 2018 at 11:55 AM Shen Li <cs.she...@gmail.com> wrote:
>>>
>>>> If multiple inputs of Flatten proceed at different speeds, should the
>>>> Flatten operator cache tuples before emitting output watermarks? This can
>>>> prevent a late tuple from becoming early. But if the watermark gap (i.e.,
>>>> cache size) becomes too large among inputs, can the application tell
>>>> Beam/runner to emit output watermark anyway and consider slow input tuples
>>>> as late?
>>>>
>>>> Thanks,
>>>> Shen
>>>>
>>>
>>
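
P.S. To make the scenario at the top of this message concrete, here is a small Python sketch that replays the three processing-time steps and classifies each element twice: against the watermark of its own input stream, and against the Flatten output watermark (the minimum over all inputs, as Reuven describes). This is only a toy model, not Beam or runner code. The names `SCENARIO` and `classify` are made up for illustration, and I am assuming that "late" means `ts < watermark` and that watermark updates within a step are applied before that step's elements.

```python
import math

# One dict per processing-time step: {stream name: event}.
# An event is ("elem", name, ts) or ("wm", value).
SCENARIO = [
    {"A": ("elem", "x", 1), "B": ("wm", 3)},
    {"A": ("wm", 1), "B": ("elem", "y", 2)},
    {"A": ("wm", 2), "B": ("elem", "z", 4)},
]


def classify(steps):
    """Return (name, late_on_input, late_after_flatten) for each element."""
    streams = {stream for step in steps for stream in step}
    # Assumption: an input that has not reported a watermark yet sits at -inf.
    input_wm = {stream: -math.inf for stream in streams}
    results = []
    for step in steps:
        # Assumption: apply this step's watermark updates before its elements.
        for stream, event in step.items():
            if event[0] == "wm":
                input_wm[stream] = max(input_wm[stream], event[1])
        # Flatten output watermark modeled as the minimum over all inputs.
        output_wm = min(input_wm.values())
        for stream, event in step.items():
            if event[0] == "elem":
                _, name, ts = event
                results.append((name, ts < input_wm[stream], ts < output_wm))
    return results


if __name__ == "__main__":
    for name, late_in, late_out in classify(SCENARIO):
        print(f"{name}: late on input={late_in}, late after Flatten={late_out}")
```

Under these assumptions, y is the only element that changes status: it is late with respect to stream B (2 < 3) but not with respect to the Flatten output watermark, which is min(1, 3) = 1 at that step.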