[
https://issues.apache.org/jira/browse/BEAM-644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16937240#comment-16937240
]
Luke Cwik edited comment on BEAM-644 at 9/24/19 10:54 PM:
----------------------------------------------------------
SplittableDoFn will allow one to control the watermark, but you will always be
limited by how far the watermark of the upstream transforms has advanced. This
will allow you to have scenarios like Read(PubsubA) --> Read(PubsubB) where
PubsubA outputs topic names and PubsubB reads messages from the topics produced
by PubsubA. The watermark of PubsubA will be the upper bound for how far
PubsubB's watermark can advance. This is still somewhat up for discussion, as
there could be scenarios where you want the downstream transform to be able to
advance the watermark further than the upstream transform's watermark, but this
leads to usability questions around what should be dropped and where (is this
now considered late data?).
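
To make the upper-bound rule concrete, here is a minimal sketch in plain
Python. It is only a model of the behavior described above, not Beam's API:
the function name effective_watermark and its parameters are hypothetical.

```python
from datetime import datetime


def effective_watermark(own_estimate: datetime,
                        upstream_watermark: datetime) -> datetime:
    """Model of the rule above (hypothetical helper, not a Beam call).

    The downstream transform (PubsubB) may report its own watermark
    estimate, but the value that takes effect never passes the
    upstream transform's (PubsubA's) watermark.
    """
    return min(own_estimate, upstream_watermark)


# PubsubB believes it is caught up to 12:00, but PubsubA is only at 11:00,
# so PubsubB's watermark is held at 11:00.
pubsub_a = datetime(2019, 9, 24, 11, 0)
pubsub_b_estimate = datetime(2019, 9, 24, 12, 0)
held = effective_watermark(pubsub_b_estimate, pubsub_a)
```

Letting the downstream transform advance past the upstream watermark would
amount to dropping the min() here, which is where the open question about
late data comes from.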
> Primitive to shift the watermark while assigning timestamps
> -----------------------------------------------------------
>
> Key: BEAM-644
> URL: https://issues.apache.org/jira/browse/BEAM-644
> Project: Beam
> Issue Type: New Feature
> Components: beam-model
> Reporter: Kenneth Knowles
> Priority: Major
>
> There is a general need, especially important in the presence of
> SplittableDoFn, to be able to assign new timestamps to elements without
> making them late or droppable.
> - DoFn.withAllowedTimestampSkew is inadequate, because it simply allows one
> to produce late data, but does not allow one to shift the watermark so the
> new data is on-time.
> - For a SplittableDoFn, one may receive an element such as the name of a log
> file that contains elements for the day preceding the log file. The timestamp
> on the filename must currently be the beginning of the log. If such elements
> are constantly flowing, it may be OK, but since we don't know that such an
> element is coming, in the absence of data, the watermark may advance. We need a
> way to keep it far enough back even in the absence of data holding it back.
> One idea is a new primitive ShiftWatermark / AdjustTimestamps with the
> following pieces:
> - A constant duration (positive or negative) D by which to shift the
> watermark.
> - A function from TimestampedElement<T> to a new timestamp that is >= t + D,
> where t is the element's original timestamp.
> So, for example, AdjustTimestamps(<-60 minutes>, f) would allow f to make
> timestamps up to 60 minutes earlier.
> With this primitive added, outputWithTimestamp and withAllowedTimestampSkew
> could be removed, simplifying DoFn.
> Alternatively, all of this functionality could be bolted on to DoFn.
> This ticket is not a proposal, but a record of the issue and ideas that were
> mentioned.
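
The ShiftWatermark / AdjustTimestamps idea in the description can be sketched
roughly as follows. This is plain Python over an in-memory list, not a Beam
transform; adjust_timestamps, shifted_watermark, and their parameters are
hypothetical names chosen for illustration, and t is assumed to be each
element's original timestamp.

```python
from datetime import datetime, timedelta
from typing import Callable, Iterable, Iterator, Tuple, TypeVar

T = TypeVar("T")


def adjust_timestamps(
    elements: Iterable[Tuple[T, datetime]],
    shift: timedelta,
    fn: Callable[[T, datetime], datetime],
) -> Iterator[Tuple[T, datetime]]:
    """Reassign timestamps with fn, enforcing new_t >= t + shift.

    shift plays the role of the constant duration D; it may be negative.
    """
    for value, t in elements:
        new_t = fn(value, t)
        if new_t < t + shift:
            raise ValueError(
                f"timestamp {new_t} is earlier than the allowed bound {t + shift}"
            )
        yield value, new_t


def shifted_watermark(input_watermark: datetime, shift: timedelta) -> datetime:
    """Output watermark of the sketch: the input watermark moved by D."""
    return input_watermark + shift


# AdjustTimestamps(<-60 minutes>, f): f may move timestamps up to 60 minutes
# earlier, and the output watermark is held back by the same 60 minutes.
d = timedelta(minutes=-60)
elements = [("log-line", datetime(2019, 9, 24, 12, 0))]
out = list(adjust_timestamps(elements, d, lambda v, t: t - timedelta(minutes=30)))
wm = shifted_watermark(datetime(2019, 9, 24, 12, 0), d)
```

Under these assumptions an on-time input element (t at or after the input
watermark W) stays on time, since its new timestamp is at least t + D, which
is at least W + D, the shifted output watermark.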