Thomas and Reuven,

Thank you for the explanation.

Shen

On Mon, Mar 12, 2018 at 7:05 PM, Thomas Groh <[email protected]> wrote:

> That one would be, for example, having a PCollection with a highly
> advanced watermark and a PCollection with a much earlier watermark, and
> have an input that is behind the watermark of the former PCollection go
> through the flatten - at which point it moves to being ahead of the
> watermark.
>
> That's fine, because one of two things happens in practice:
> * Either the upstream contains a GroupByKey, in which the element will be
> dropped if the window is expired
> * Or, the upstream does not contain a GroupByKey, which means the element
> never appeared at such a grouping behind the watermark, its final window
> was never expired before that element arrived at the first downstream
> GroupByKey.
>
> Specifically we're concerned about GroupByKeys because that's the point at
> which we become certain of the window the element is within, and if that
> window is expired; before that point, we can't claim with certainty on the
> final window the element will be assigned to.
>
>
> On Mon, Mar 12, 2018 at 3:47 PM Reuven Lax <[email protected]> wrote:
>
>> Logically a Flatten is just a way to create a multi-input transform
>> downstream of the flatten (you can imagine a model in which Flatten was not
>> explicit, we just allowed multiple main inputs). This means that yes, the
>> watermark is the minimum of all inputs.
>>
>> I don't see how a late tuple can become early. Can you explain?
>>
>>
>> On Mon, Mar 12, 2018 at 2:07 PM Shen Li <[email protected]> wrote:
>>
>>> Hi Reuven,
>>>
>>> What about watermark? Should Flatten emit the min watermark of all input
>>> data streams? If that is the case, one late tuple can become early after
>>> Flatten, right? Will that cause any problem?
>>>
>>> Shen
>>>
>>> On Mon, Mar 12, 2018 at 4:09 PM, Reuven Lax <[email protected]> wrote:
>>>
>>>> No, I don't think it makes sense for the Flatten operator to cache
>>>> element.
>>>>
>>>>
>>>> On Mon, Mar 12, 2018 at 11:55 AM Shen Li <[email protected]> wrote:
>>>>
>>>>> If multiple inputs of Flatten proceed at different speeds, should the
>>>>> Flatten operator cache tuples before emitting output watermarks? This can
>>>>> prevent a late tuple from becoming early. But if the watermark gap (i.e.,
>>>>> cache size) becomes too large among inputs, can the application tell
>>>>> Beam/runner to emit output watermark anyway and consider slow input tuples
>>>>> as late?
>>>>>
>>>>> Thanks,
>>>>> Shen
>>>>>
>>>>
>>>

Reply via email to