Sure. Consider the following case, where I have two input streams A and B.
(ts = timestamp, wm = watermark)

processing time            stream A                      stream B

0                                   elem=x, ts=1                 wm=3
1                                   wm=1                        elem=y, ts=2
2                                   wm=2                        elem=z, ts=4

In stream B, {elem=y, ts=2} is late, as it's timestamp (2) falls behind the
watermark (3).

However, the Flatten output would be: {elem=x, ts=1}, {wm=1}, {elem=y,
ts=2} {wm=2}, {elem=z, ts=4}, where the {elem=y, ts=2} is no longer late
due to the slower watermark progress in stream A.

Is the above a valid scenario?

Shen


On Mon, Mar 12, 2018 at 6:46 PM, Reuven Lax <re...@google.com> wrote:

> Logically a Flatten is just a way to create a multi-input transform
> downstream of the flatten (you can imagine a model in which Flatten was not
> explicit, we just allowed multiple main inputs). This means that yes, the
> watermark is the minimum of all inputs.
>
> I don't see how a late tuple can become early. Can you explain?
>
>
> On Mon, Mar 12, 2018 at 2:07 PM Shen Li <cs.she...@gmail.com> wrote:
>
>> Hi Reuven,
>>
>> What about watermark? Should Flatten emit the min watermark of all input
>> data streams? If that is the case, one late tuple can become early after
>> Flatten, right? Will that cause any problem?
>>
>> Shen
>>
>> On Mon, Mar 12, 2018 at 4:09 PM, Reuven Lax <re...@google.com> wrote:
>>
>>> No, I don't think it makes sense for the Flatten operator to cache
>>> element.
>>>
>>>
>>> On Mon, Mar 12, 2018 at 11:55 AM Shen Li <cs.she...@gmail.com> wrote:
>>>
>>>> If multiple inputs of Flatten proceed at different speeds, should the
>>>> Flatten operator cache tuples before emitting output watermarks? This can
>>>> prevent a late tuple from becoming early. But if the watermark gap (i.e.,
>>>> cache size) becomes too large among inputs, can the application tell
>>>> Beam/runner to emit output watermark anyway and consider slow input tuples
>>>> as late?
>>>>
>>>> Thanks,
>>>> Shen
>>>>
>>>
>>

Reply via email to