Thomas and Reuven,

Thank you for the explanation.
Shen

On Mon, Mar 12, 2018 at 7:05 PM, Thomas Groh <[email protected]> wrote:
> That one would be, for example, having a PCollection with a highly
> advanced watermark and a PCollection with a much earlier watermark, and
> having an input that is behind the watermark of the former PCollection go
> through the Flatten, at which point it moves to being ahead of the
> watermark.
>
> That's fine, because one of two things happens in practice:
> * Either the upstream contains a GroupByKey, in which case the element will be
> dropped if the window is expired
> * Or, the upstream does not contain a GroupByKey, which means the element
> never appeared at such a grouping behind the watermark, so its final window
> was never expired before that element arrived at the first downstream
> GroupByKey.
>
> Specifically, we're concerned about GroupByKeys because that's the point at
> which we become certain of the window the element is within, and whether that
> window is expired; before that point, we can't say with certainty which
> final window the element will be assigned to.
>
>
> On Mon, Mar 12, 2018 at 3:47 PM Reuven Lax <[email protected]> wrote:
>
>> Logically, a Flatten is just a way to create a multi-input transform
>> downstream of the Flatten (you can imagine a model in which Flatten was not
>> explicit and we just allowed multiple main inputs). This means that yes, the
>> watermark is the minimum of all inputs.
>>
>> I don't see how a late tuple can become early. Can you explain?
>>
>>
>> On Mon, Mar 12, 2018 at 2:07 PM Shen Li <[email protected]> wrote:
>>
>>> Hi Reuven,
>>>
>>> What about the watermark? Should Flatten emit the min watermark of all input
>>> data streams? If that is the case, one late tuple can become early after
>>> Flatten, right? Will that cause any problem?
>>>
>>> Shen
>>>
>>> On Mon, Mar 12, 2018 at 4:09 PM, Reuven Lax <[email protected]> wrote:
>>>> No, I don't think it makes sense for the Flatten operator to cache
>>>> elements.
>>>>
>>>>
>>>> On Mon, Mar 12, 2018 at 11:55 AM Shen Li <[email protected]> wrote:
>>>>
>>>>> If multiple inputs of Flatten proceed at different speeds, should the
>>>>> Flatten operator cache tuples before emitting output watermarks? This can
>>>>> prevent a late tuple from becoming early. But if the watermark gap (i.e.,
>>>>> cache size) becomes too large among inputs, can the application tell
>>>>> Beam/runner to emit the output watermark anyway and consider slow input tuples
>>>>> as late?
>>>>>
>>>>> Thanks,
>>>>> Shen
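For reference, a toy model of the behavior described in this thread. It is a plain-Python sketch under simplifying assumptions (integer event-time timestamps, fixed windows, a single allowed-lateness value, and a simplified expiry boundary), not Beam's API or any runner's actual implementation; the names Element, flatten_watermark, is_behind, fixed_window_end and dropped_at_gbk are hypothetical. It shows the Flatten output watermark as the minimum of its input watermarks, an element that is behind one input's watermark but ahead of the Flatten's output watermark, and a GroupByKey-style check that drops an element only once its window has expired.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass(frozen=True)
class Element:
    value: str
    timestamp: int  # event time; integer units are an assumption of this sketch


def flatten_watermark(input_watermarks: Iterable[int]) -> int:
    """Output watermark of a Flatten: the minimum of its input watermarks."""
    return min(input_watermarks)


def is_behind(element: Element, watermark: int) -> bool:
    """True if the element's event time is earlier than the given watermark."""
    return element.timestamp < watermark


def fixed_window_end(timestamp: int, size: int) -> int:
    """Exclusive end of the fixed window [start, start + size) containing timestamp."""
    return (timestamp // size + 1) * size


def dropped_at_gbk(element: Element, watermark: int, window_size: int,
                   allowed_lateness: int = 0) -> bool:
    """Simplified rule (hypothetical): a GroupByKey drops the element if its
    window has expired, i.e. the watermark has reached end-of-window plus
    allowed lateness. Beam's exact boundary handling is not modeled."""
    return watermark >= fixed_window_end(element.timestamp, window_size) + allowed_lateness


if __name__ == "__main__":
    fast_wm, slow_wm = 100, 10          # watermarks of the two Flatten inputs
    out_wm = flatten_watermark([fast_wm, slow_wm])
    e = Element("x", 50)                # arrives on the fast input

    print(out_wm)                          # 10
    print(is_behind(e, fast_wm))           # True: behind the fast input's watermark
    print(is_behind(e, out_wm))            # False: ahead of the Flatten's output watermark
    print(dropped_at_gbk(e, fast_wm, 60))  # True: a GroupByKey upstream on the fast input would drop it
    print(dropped_at_gbk(e, out_wm, 60))   # False: the first downstream GroupByKey keeps it
```

With a fast input at watermark 100 and a slow input at watermark 10, the element stamped 50 falls in the window [0, 60). That window has already expired relative to the fast input, so a GroupByKey upstream of the Flatten would drop the element. Downstream of the Flatten, the watermark is only 10, so the window is still open when the element reaches the first GroupByKey. This matches the two cases Thomas lists.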
