Ah, that's fine I think. What's not fine is for an on-time element to
later turn into a late element.
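
To make that concrete, here is a rough Python sketch of the example quoted below. It is only a toy model, not Beam or runner code: the function name, the event tuples, and the "late means ts < watermark" check are all assumptions made for illustration. It takes the output watermark as the minimum of the latest watermark seen on each input and reports whether each element is late against its own input and against the flattened output.

```python
import math

# Toy model only (hypothetical names, not the Beam API).
# Each event is (processing_time, stream, kind, value), where kind is
# "wm" (value = watermark) or "elem" (value = (name, timestamp)).
def flatten_with_min_watermark(events, streams):
    latest = {s: -math.inf for s in streams}  # latest watermark per input
    out_wm = -math.inf                        # output watermark, never regresses
    results = []
    for _, stream, kind, value in events:
        if kind == "wm":
            latest[stream] = max(latest[stream], value)
            out_wm = max(out_wm, min(latest.values()))
        else:
            name, ts = value
            late_on_input = ts < latest[stream]  # behind its own input's watermark
            late_on_output = ts < out_wm         # behind the flattened watermark
            results.append((name, late_on_input, late_on_output))
    return results

# The scenario from the quoted message, with stream A listed first
# within each processing-time step.
events = [
    (0, "A", "elem", ("x", 1)),
    (0, "B", "wm", 3),
    (1, "A", "wm", 1),
    (1, "B", "elem", ("y", 2)),
    (2, "A", "wm", 2),
    (2, "B", "elem", ("z", 4)),
]

results = flatten_with_min_watermark(events, ["A", "B"])
for name, late_in, late_out in results:
    print(name, "late on input:", late_in, "late after Flatten:", late_out)
```

In this model y is late on stream B but not late after the Flatten, which is the harmless direction. Because the output watermark never exceeds any single input's watermark, the sketch cannot produce the opposite case of an on-time element becoming late.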


On Mon, Mar 12, 2018 at 4:05 PM Shen Li <cs.she...@gmail.com> wrote:

> Sure. Consider the following case, where I have two input streams A and B.
> (ts = timestamp, wm = watermark)
>
> processing time    stream A        stream B
>
> 0                  elem=x, ts=1    wm=3
> 1                  wm=1            elem=y, ts=2
> 2                  wm=2            elem=z, ts=4
>
> In stream B, {elem=y, ts=2} is late, as its timestamp (2) falls behind
> the watermark (3).
>
> However, the Flatten output would be: {elem=x, ts=1}, {wm=1}, {elem=y,
> ts=2}, {wm=2}, {elem=z, ts=4}, where the {elem=y, ts=2} is no longer late
> due to the slower watermark progress in stream A.
>
> Is the above a valid scenario?
>
> Shen
>
>
> On Mon, Mar 12, 2018 at 6:46 PM, Reuven Lax <re...@google.com> wrote:
>
>> Logically a Flatten is just a way to create a multi-input transform
>> downstream of the flatten (you can imagine a model in which Flatten was not
>> explicit, we just allowed multiple main inputs). This means that yes, the
>> watermark is the minimum of all inputs.
>>
>> I don't see how a late tuple can become early. Can you explain?
>>
>>
>> On Mon, Mar 12, 2018 at 2:07 PM Shen Li <cs.she...@gmail.com> wrote:
>>
>>> Hi Reuven,
>>>
>>> What about watermark? Should Flatten emit the min watermark of all input
>>> data streams? If that is the case, one late tuple can become early after
>>> Flatten, right? Will that cause any problem?
>>>
>>> Shen
>>>
>>> On Mon, Mar 12, 2018 at 4:09 PM, Reuven Lax <re...@google.com> wrote:
>>>
>>>> No, I don't think it makes sense for the Flatten operator to cache
>>>> elements.
>>>>
>>>>
>>>> On Mon, Mar 12, 2018 at 11:55 AM Shen Li <cs.she...@gmail.com> wrote:
>>>>
>>>>> If multiple inputs of Flatten proceed at different speeds, should the
>>>>> Flatten operator cache tuples before emitting output watermarks? This can
>>>>> prevent a late tuple from becoming early. But if the watermark gap (i.e.,
>>>>> cache size) becomes too large among inputs, can the application tell
>>>>> Beam/runner to emit output watermark anyway and consider slow input tuples
>>>>> as late?
>>>>>
>>>>> Thanks,
>>>>> Shen
>>>>>
>>>>
>>>
>
