Kenneth Knowles created BEAM-644:

             Summary: Primitive to shift the watermark while assigning 
                 Key: BEAM-644
             Project: Beam
          Issue Type: New Feature
          Components: beam-model
            Reporter: Kenneth Knowles
            Assignee: Kenneth Knowles

There is a general need, especially important in the presence of 
SplittableDoFn, to be able to assign new timestamps to elements without making 
them late or droppable.

 - DoFn.withAllowedTimestampSkew is inadequate, because it simply allows one to 
produce late data, but does not allow one to shift the watermark so the new 
data is on-time.
 - For a SplittableDoFn, one may receive an element such as the name of a log 
file that contains elements for the day preceding the log file. The timestamp 
on the filename must currently be the beginning of the log. If such elements 
are constantly flowing, this may be OK, but since we don't know that an 
element is coming, the watermark may advance in the absence of data. We need 
a way to keep it far enough back even when no data is holding it back.

One idea is a new primitive ShiftWatermark / AdjustTimestamps with the 
following pieces:

 - A constant duration (positive or negative) D by which to shift the watermark.
 - A function from TimestampedElement<T> to new timestamp that always falls 
within D of the original timestamp.
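
To make the two pieces concrete, here is a minimal sketch in plain Java. 
Everything in it is hypothetical: the class ShiftWatermark, its methods, and 
the use of java.time instead of Beam's own timestamp types are assumptions for 
illustration, not existing Beam API. It also assumes one reading of "within D", 
namely that the new timestamp may differ from the original by at most |D| in 
either direction.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.BiFunction;

/** Hypothetical sketch of the idea; not part of the Beam SDK. */
final class ShiftWatermark<T> {
    // D: the constant amount by which the watermark is shifted (may be negative).
    private final Duration shift;
    // Maps (element, original timestamp) to the new timestamp.
    private final BiFunction<T, Instant, Instant> timestampFn;

    ShiftWatermark(Duration shift, BiFunction<T, Instant, Instant> timestampFn) {
        this.shift = shift;
        this.timestampFn = timestampFn;
    }

    /** The output watermark is the input watermark moved by D. */
    Instant outputWatermark(Instant inputWatermark) {
        return inputWatermark.plus(shift);
    }

    /**
     * Computes the new timestamp and rejects it if it is more than |D| away
     * from the original (an assumed interpretation of "within D").
     */
    Instant assign(T element, Instant original) {
        Instant updated = timestampFn.apply(element, original);
        Duration distance = Duration.between(original, updated).abs();
        if (distance.compareTo(shift.abs()) > 0) {
            throw new IllegalArgumentException(
                "new timestamp " + updated + " is not within " + shift
                    + " of original " + original);
        }
        return updated;
    }
}
```

For the log file case, D would be minus one day: the output watermark trails 
the input watermark by a day, so an element that was on time at timestamp t 
and is reassigned to anything no earlier than t minus one day is still on time 
downstream.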

With this primitive added, outputWithTimestamp and withAllowedTimestampSkew 
could be removed, simplifying DoFn.

Alternatively, all of this functionality could be bolted on to DoFn.

This ticket is not a proposal, but a record of the issue and ideas that were 
