That case would be, for example, having one PCollection with a far-advanced watermark and another PCollection with a much earlier watermark, and having an element that is behind the watermark of the former PCollection go through the Flatten, at which point it is now ahead of the (output) watermark.
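The scenario above can be sketched as a toy model in plain Python. This is not Beam's API or any runner's implementation. The names `Element`, `flatten_watermark`, and `is_behind`, the integer timestamps, and the watermark values are all hypothetical. The only rule taken from the thread is that a Flatten's output watermark is the minimum of its inputs' watermarks.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Element:
    value: str
    timestamp: int  # event time, in arbitrary integer units (assumption)


def flatten_watermark(input_watermarks):
    """Toy model: the Flatten's output watermark is the minimum of its
    inputs' watermarks, since the slowest input may still emit data there."""
    return min(input_watermarks)


def is_behind(element, watermark):
    """Toy notion of 'behind the watermark': timestamp earlier than it."""
    return element.timestamp < watermark


# Hypothetical watermarks: input A is far ahead, input B lags behind.
wm_a, wm_b = 100, 10
elem_from_a = Element("x", timestamp=50)

wm_out = flatten_watermark([wm_a, wm_b])
print(wm_out)                            # 10
print(is_behind(elem_from_a, wm_a))      # True: behind input A's watermark
print(is_behind(elem_from_a, wm_out))    # False: ahead of the Flatten's watermark
```

The element's timestamp never changes; only the watermark it is compared against does, which is why it looks "late" on input A but not after the Flatten.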
That's fine, because in practice one of two things happens:

* Either the upstream contains a GroupByKey, in which case the element will be dropped there if its window has already expired.
* Or the upstream does not contain a GroupByKey, which means the element never reached such a grouping while behind the watermark, so its final window had not expired before the element arrived at the first downstream GroupByKey.

We're specifically concerned with GroupByKeys because that's the point at which we become certain which window the element is in, and whether that window has expired; before that point, we can't say with certainty which final window the element will be assigned to.

On Mon, Mar 12, 2018 at 3:47 PM Reuven Lax <re...@google.com> wrote:

> Logically a Flatten is just a way to create a multi-input transform
> downstream of the flatten (you can imagine a model in which Flatten was not
> explicit, we just allowed multiple main inputs). This means that yes, the
> watermark is the minimum of all inputs.
>
> I don't see how a late tuple can become early. Can you explain?
>
>
> On Mon, Mar 12, 2018 at 2:07 PM Shen Li <cs.she...@gmail.com> wrote:
>
>> Hi Reuven,
>>
>> What about watermark? Should Flatten emit the min watermark of all input
>> data streams? If that is the case, one late tuple can become early after
>> Flatten, right? Will that cause any problem?
>>
>> Shen
>>
>> On Mon, Mar 12, 2018 at 4:09 PM, Reuven Lax <re...@google.com> wrote:
>>
>>> No, I don't think it makes sense for the Flatten operator to cache
>>> element.
>>>
>>>
>>> On Mon, Mar 12, 2018 at 11:55 AM Shen Li <cs.she...@gmail.com> wrote:
>>>
>>>> If multiple inputs of Flatten proceed at different speeds, should the
>>>> Flatten operator cache tuples before emitting output watermarks? This can
>>>> prevent a late tuple from becoming early.
>>>> But if the watermark gap (i.e., cache size) becomes too large among inputs,
>>>> can the application tell Beam/runner to emit output watermark anyway and
>>>> consider slow input tuples as late?
>>>>
>>>> Thanks,
>>>> Shen
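The GroupByKey argument from the first reply in this thread (an upstream GroupByKey drops an element whose window has already expired, while a GroupByKey placed after the Flatten sees the lower, minimum watermark and keeps it) can be sketched as a toy model in plain Python. This is not Beam's GroupByKey or windowing API. The helpers `fixed_window`, `is_expired`, and `group_by_key` are hypothetical, and the expiry rule (watermark at or past window end plus allowed lateness, with integer timestamps) is an assumption made for illustration.

```python
from collections import defaultdict


def fixed_window(timestamp, size):
    """Assign a timestamp to the fixed window [start, start + size)."""
    start = timestamp - (timestamp % size)
    return (start, start + size)


def is_expired(window, watermark, allowed_lateness=0):
    """Toy expiry rule (assumption): the window is expired once the
    watermark reaches its end plus the allowed lateness."""
    _, end = window
    return watermark >= end + allowed_lateness


def group_by_key(elements, watermark, window_size, allowed_lateness=0):
    """Toy GroupByKey: window each (key, value, timestamp), drop elements
    whose window has already expired, and group the rest by (key, window)."""
    groups = defaultdict(list)
    dropped = []
    for key, value, ts in elements:
        window = fixed_window(ts, window_size)
        if is_expired(window, watermark, allowed_lateness):
            dropped.append((key, value, ts))
        else:
            groups[(key, window)].append(value)
    return dict(groups), dropped


elements = [("k", "x", 50)]
# Upstream GroupByKey on the fast input (watermark 100): window [50, 60) expired.
print(group_by_key(elements, watermark=100, window_size=10))
# ({}, [('k', 'x', 50)])
# GroupByKey after the Flatten (min watermark 10): the window is still open.
print(group_by_key(elements, watermark=10, window_size=10))
# ({('k', (50, 60)): ['x']}, [])
```

In this sketch the element is only ever judged late where its window is actually known and compared against a watermark, which mirrors the reply's point that the GroupByKey is where lateness becomes meaningful.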