Ben Chambers commented on BEAM-644:

I'm not sure if this will clarify, but there are two parts:
1. A function which for a given input timestamp (or input element) computes an 
output timestamp.
2. A duration which declares the maximum distance the output timestamp could be 
behind the input timestamp.
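
The two parts could be sketched as a small Java class. This is a minimal, hypothetical illustration only. It uses java.time instead of Beam's own time types, and the names TimestampShift, outputTimestamp, outputWatermark, and maxBackwardShift are invented here. They are not part of any Beam API. The function receives both the element and its input timestamp, matching "input timestamp (or input element)".

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.BiFunction;

// Hypothetical sketch of the two parts; not a Beam API. T is the element type.
final class TimestampShift<T> {
  // Part 1: computes an output timestamp from an element and its input timestamp.
  private final BiFunction<T, Instant, Instant> adjust;
  // Part 2: the maximum distance the output timestamp may be behind the input.
  private final Duration maxBackwardShift;

  TimestampShift(BiFunction<T, Instant, Instant> adjust, Duration maxBackwardShift) {
    if (maxBackwardShift.isNegative()) {
      throw new IllegalArgumentException("maxBackwardShift must not be negative");
    }
    this.adjust = adjust;
    this.maxBackwardShift = maxBackwardShift;
  }

  // Applies part 1 to one element and rejects outputs that violate the bound in part 2.
  Instant outputTimestamp(T element, Instant inputTimestamp) {
    Instant output = adjust.apply(element, inputTimestamp);
    if (output.isBefore(inputTimestamp.minus(maxBackwardShift))) {
      throw new IllegalStateException("output timestamp " + output
          + " is more than " + maxBackwardShift + " behind " + inputTimestamp);
    }
    return output;
  }

  // Uses only part 2: relates the output watermark to the input watermark.
  Instant outputWatermark(Instant inputWatermark) {
    return inputWatermark.minus(maxBackwardShift);
  }
}
```

The per-element check and the watermark rule both read the same declared duration. Because of this, the bound that each element must respect is the same bound the watermark logic relies on.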

Consider a case where elements are read from a stream with timestamps based on 
when they were published.
But each element also has a timestamp that indicates when it actually happened.
And we know that there is at most a 60 minute delay between when it happened 
and when it was published.

We want to adjust the timestamps of elements to be when they actually happened. 
But this may shift timestamps up to 60 minutes into the past. Naively, this 
will make data late, since the watermark logic doesn't know about the shift. 
But with this proposal, we'd write something that couples the per-element shift of 
up to 60 minutes into the past with a declaration that we'll shift at most 60 
minutes into the past. The first applies to each element, while the second 
relates the output watermark to the input watermark.
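
To make the publish-time example concrete, the following self-contained Java sketch checks one element against the output watermark, once without and once with the declared 60-minute shift. The timestamps are invented for illustration. "Late" is simplified here to "timestamp strictly before the output watermark", ignoring windows and allowed lateness. The class and method names are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical worked example; simplified lateness model, not Beam's actual rules.
final class PublishDelayExample {
  // Declared bound: an event is published at most 60 minutes after it happened.
  static final Duration MAX_PUBLISH_DELAY = Duration.ofMinutes(60);

  // Simplified check: an element is late if its timestamp is behind the watermark.
  static boolean isLate(Instant elementTimestamp, Instant outputWatermark) {
    return elementTimestamp.isBefore(outputWatermark);
  }

  // Naive: timestamps move to event time, but the watermark passes through unchanged.
  static Instant naiveOutputWatermark(Instant inputWatermark) {
    return inputWatermark;
  }

  // With the proposal: the output watermark is held back by the declared bound.
  static Instant shiftedOutputWatermark(Instant inputWatermark) {
    return inputWatermark.minus(MAX_PUBLISH_DELAY);
  }

  public static void main(String[] args) {
    Instant published = Instant.parse("2016-01-01T12:00:00Z");
    Instant happened = Instant.parse("2016-01-01T11:15:00Z"); // 45 minutes earlier
    // Assume the publish-time input watermark has just reached the publish time.
    Instant inputWatermark = published;

    System.out.println("naive late: "
        + isLate(happened, naiveOutputWatermark(inputWatermark)));   // true
    System.out.println("shifted late: "
        + isLate(happened, shiftedOutputWatermark(inputWatermark))); // false
  }
}
```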

Similar reasoning could apply the other direction as well (shifting the output 
watermark forward) if we knew that we'd always adjust timestamps forward by at 
least a certain amount.
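
A short, simplified argument covers both directions at once. It assumes the input watermark promises that no element still to arrive has a timestamp below it, and it ignores anything buffered inside the transform. If every output timestamp is at least t + D for one fixed D, the output watermark can safely be the input watermark plus D. A negative D is the backward case above; a positive D is the forward case.

```latex
% Assumption: W_in is the input watermark, so every element not yet seen has t >= W_in.
% Contract: every output timestamp satisfies f(e) >= t + D for one constant D.
\[
  f(e) \;\ge\; t + D \;\ge\; W_{\mathrm{in}} + D
  \qquad\Longrightarrow\qquad
  W_{\mathrm{out}} := W_{\mathrm{in}} + D \text{ is a valid output watermark.}
\]
% D = -60 minutes gives the backward case; D > 0 gives the forward case.
```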

> Primitive to shift the watermark while assigning timestamps
> -----------------------------------------------------------
>                 Key: BEAM-644
>                 URL: https://issues.apache.org/jira/browse/BEAM-644
>             Project: Beam
>          Issue Type: New Feature
>          Components: beam-model
>            Reporter: Kenneth Knowles
>            Assignee: Kenneth Knowles
> There is a general need, especially important in the presence of 
> SplittableDoFn, to be able to assign new timestamps to elements without 
> making them late or droppable.
>  - DoFn.withAllowedTimestampSkew is inadequate, because it simply allows one 
> to produce late data, but does not allow one to shift the watermark so the 
> new data is on-time.
>  - For a SplittableDoFn, one may receive an element such as the name of a log 
> file that contains elements for the day preceding the log file. The timestamp 
> on the filename must currently be the beginning of the log. If such elements 
> are constantly flowing, it may be OK, but since we don't know that such an element 
> is coming, in the absence of data the watermark may advance. We need a way to 
> keep it far enough back even when no data is holding it back.
> One idea is a new primitive ShiftWatermark / AdjustTimestamps with the 
> following pieces:
>  - A constant duration (positive or negative) D by which to shift the 
> watermark.
>  - A function from TimestampedElement<T> to a new timestamp that is >= t + D, 
> where t is the element's original timestamp
> So, for example, AdjustTimestamps(<-60 minutes>, f) would allow f to make 
> timestamps up to 60 minutes earlier.
> With this primitive added, outputWithTimestamp and withAllowedTimestampSkew 
> could be removed, simplifying DoFn.
> Alternatively, all of this functionality could be bolted on to DoFn.
> This ticket is not a proposal, but a record of the issue and ideas that were 
> mentioned.
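
The AdjustTimestamps(D, f) idea from the ticket could be sketched as follows. It enforces f(e) >= t + D per element and derives the output watermark from D alone. This is a hypothetical illustration, since the ticket itself is explicitly not a proposal. TimestampedElement below is a minimal stand-in for the ticket's TimestampedElement<T>, not a Beam class, and java.time replaces Beam's time types.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Function;

// Hypothetical stand-in for the ticket's TimestampedElement<T>.
final class TimestampedElement<T> {
  final T value;
  final Instant timestamp;

  TimestampedElement(T value, Instant timestamp) {
    this.value = value;
    this.timestamp = timestamp;
  }
}

// Hypothetical shape of AdjustTimestamps(D, f); not an actual Beam API.
final class AdjustTimestamps<T> {
  private final Duration shift; // D: negative shifts back, positive shifts forward
  private final Function<TimestampedElement<T>, Instant> fn; // f

  AdjustTimestamps(Duration shift, Function<TimestampedElement<T>, Instant> fn) {
    this.shift = shift;
    this.fn = fn;
  }

  // Computes the new timestamp and enforces the contract f(e) >= t + D.
  TimestampedElement<T> apply(TimestampedElement<T> element) {
    Instant newTimestamp = fn.apply(element);
    Instant bound = element.timestamp.plus(shift);
    if (newTimestamp.isBefore(bound)) {
      throw new IllegalStateException(newTimestamp + " is earlier than t + D = " + bound);
    }
    return new TimestampedElement<>(element.value, newTimestamp);
  }

  // Depends only on D, so it holds the watermark back even when no elements arrive.
  Instant outputWatermark(Instant inputWatermark) {
    return inputWatermark.plus(shift);
  }
}
```

For the log-file case, one might stamp each record with its file's timestamp. One would then apply new AdjustTimestamps<>(Duration.ofDays(-1), e -> recordTime(e.value)), where recordTime is a hypothetical parser for the record's own time. The output watermark then stays one day behind the input watermark, including while no file is arriving.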

This message was sent by Atlassian JIRA
