[
https://issues.apache.org/jira/browse/BEAM-644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16812300#comment-16812300
]
Deniz Saner commented on BEAM-644:
----------------------------------
Can somebody elaborate on the status of this issue? The method
`withAllowedTimestampSkew` has been marked deprecated for quite some time now,
but from the issue links, I cannot really infer whether a suitable replacement
has been implemented yet.
At my company, we still find ourselves using `withAllowedTimestampSkew`
because we batch our data: one message observed at timestamp x may contain
hundreds of messages with timestamp x - 10.
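To make the batching case concrete, here is a minimal stand-alone sketch of
the rule that the allowed skew relaxes: an output timestamp may be earlier
than the input element's timestamp by at most the allowed skew. This is not
Beam code. The class `SkewCheck`, its methods, and the use of plain `long`
timestamps in unspecified units are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone model of the skew rule; not part of Beam.
final class SkewCheck {

    // True if outputTimestamp is no more than allowedSkew earlier than
    // inputTimestamp. Later timestamps are always accepted.
    static boolean isAllowed(long inputTimestamp, long outputTimestamp, long allowedSkew) {
        return outputTimestamp >= inputTimestamp - allowedSkew;
    }

    // Unpacks a batch observed at batchTimestamp into its inner timestamps,
    // rejecting any inner timestamp that falls outside the allowed skew.
    static List<Long> unpack(long batchTimestamp, long[] innerTimestamps, long allowedSkew) {
        List<Long> out = new ArrayList<>();
        for (long ts : innerTimestamps) {
            if (!isAllowed(batchTimestamp, ts, allowedSkew)) {
                throw new IllegalArgumentException(
                    "timestamp " + ts + " is more than " + allowedSkew
                        + " before batch timestamp " + batchTimestamp);
            }
            out.add(ts);
        }
        return out;
    }
}
```

With a batch at x = 100 and inner messages at x - 10 = 90, a skew of 10
accepts every inner message, while a skew of 0 rejects them. Note that the
model only decides whether the earlier timestamp is accepted; it says nothing
about whether the resulting elements are on time relative to the watermark.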
> Primitive to shift the watermark while assigning timestamps
> -----------------------------------------------------------
>
> Key: BEAM-644
> URL: https://issues.apache.org/jira/browse/BEAM-644
> Project: Beam
> Issue Type: New Feature
> Components: beam-model
> Reporter: Kenneth Knowles
> Priority: Major
>
> There is a general need, especially important in the presence of
> SplittableDoFn, to be able to assign new timestamps to elements without
> making them late or droppable.
> - DoFn.withAllowedTimestampSkew is inadequate, because it simply allows one
> to produce late data, but does not allow one to shift the watermark so the
> new data is on-time.
> - For a SplittableDoFn, one may receive an element such as the name of a log
> file that contains elements for the day preceding the log file. The timestamp
> on the filename must currently be the beginning of the log. If such elements
> are constantly flowing, it may be OK, but since we don't know that an element
> is coming, in the absence of data, the watermark may advance. We need a way to
> keep it far enough back even in the absence of data holding it back.
> One idea is a new primitive ShiftWatermark / AdjustTimestamps with the
> following pieces:
> - A constant duration (positive or negative) D by which to shift the
> watermark.
> - A function from TimestampedElement<T> to a new timestamp that is >= t + D,
> where t is the element's original timestamp.
> So, for example, AdjustTimestamps(<-60 minutes>, f) would allow f to make
> timestamps up to 60 minutes earlier.
> With this primitive added, outputWithTimestamp and withAllowedTimestampSkew
> could be removed, simplifying DoFn.
> Alternatively, all of this functionality could be bolted on to DoFn.
> This ticket is not a proposal, but a record of the issue and ideas that were
> mentioned.
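The ShiftWatermark / AdjustTimestamps idea quoted above can be sketched as a
small stand-alone model. This is only an illustration of the stated contract
(a constant shift D, and a function whose result must be >= t + D), not an
existing Beam primitive. The class `AdjustTimestampsSketch`, its method names,
and the use of `java.time` types are all assumptions.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.BiFunction;

// Hypothetical model of the proposed primitive; not part of Beam.
final class AdjustTimestampsSketch<T> {
    private final Duration shift;                      // D, may be negative
    private final BiFunction<T, Instant, Instant> fn;  // f: (element, t) -> new timestamp

    AdjustTimestampsSketch(Duration shift, BiFunction<T, Instant, Instant> fn) {
        this.shift = shift;
        this.fn = fn;
    }

    // Applies f and enforces the contract: new timestamp >= t + D.
    Instant adjust(T element, Instant timestamp) {
        Instant adjusted = fn.apply(element, timestamp);
        Instant lowerBound = timestamp.plus(shift);
        if (adjusted.isBefore(lowerBound)) {
            throw new IllegalArgumentException(
                "adjusted timestamp " + adjusted + " is before lower bound " + lowerBound);
        }
        return adjusted;
    }

    // In this model the output watermark is the input watermark shifted by D.
    Instant shiftWatermark(Instant inputWatermark) {
        return inputWatermark.plus(shift);
    }
}
```

Under this model, an element that is on time on input (t at or after the
input watermark) gets a new timestamp of at least t + D, which is at or after
the shifted watermark, so it stays on time. For example, with D = -60 minutes
and an f that moves timestamps 30 minutes earlier, an element at 12:00 is
re-stamped to 11:30 while the watermark at 12:00 is reported as 11:00.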