[
https://issues.apache.org/jira/browse/BEAM-644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16933024#comment-16933024
]
Kenneth Knowles commented on BEAM-644:
--------------------------------------
I want to clarify that the watermark is _not_ global. You might be thinking of
Spark streaming :-)
But I understand your point. The watermark is mostly not in your control. It is
always simply bounded by all the input watermarks. I agree with your desire and
I agree it is doable. The new feature of "splittable DoFn" which replaces
UnboundedSource will let a DoFn control the watermark somewhat directly. I'm
not sure if it will be as flexible as you desire. [~lcwik] and [~chamikara] and
[~laraschmidt] and [~boyuanz] are all more knowledgeable than I am.
As an example to demonstrate it is implementable and meaningful: you could
certainly have two pipelines where one writes to Pubsub instead of calling
outputWithTimestamp. The other pipeline could read from Pubsub and use a
timestamp attribute. In this system you get an independent watermark calculated
heuristically at the junction between the two pipelines. (This is not a great
idea, just an example of an architecture that decouples the watermark.)
But by the way, taking one branch and mapping all event timestamps to the
current processing time is safe. I illustrate it here:
https://docs.google.com/presentation/d/1smGXb-0GGX_Fid1z3WWzZJWtyBjBA3Mo3t4oeRjJoZI/present?slide=id.g142c2fd96f_0_134
(click once to animate). The part of your use case that requires something
special is pulling event timestamps from JSON instead of from Pubsub attributes
(the PubsubIO has no idea how to parse the payloads, as you already know). It
is a very common and valid use case.
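To make the two claims above concrete, here is a minimal sketch in plain Java
(not the Beam API). The class and method names are hypothetical, and it assumes
a simplified model: the output watermark is the minimum of the input
watermarks, an element is late when its timestamp is behind that watermark, and
the watermark never runs ahead of processing time.

```java
import java.time.Instant;
import java.util.Arrays;
import java.util.List;

// Hypothetical model for illustration only; none of these names exist in Beam.
final class WatermarkSketch {

    /** Models the output watermark as the minimum of all input watermarks. */
    static Instant outputWatermark(List<Instant> inputWatermarks) {
        Instant min = inputWatermarks.get(0);
        for (Instant w : inputWatermarks) {
            if (w.isBefore(min)) {
                min = w;
            }
        }
        return min;
    }

    /** In this model an element is late when its timestamp is behind the watermark. */
    static boolean isLate(Instant timestamp, Instant watermark) {
        return timestamp.isBefore(watermark);
    }

    public static void main(String[] args) {
        // Fixed stand-in for "current processing time" so the run is repeatable.
        Instant now = Instant.parse("2019-09-19T12:00:00Z");

        // Two inputs; the slower one (5 minutes behind) bounds the output watermark.
        Instant watermark = outputWatermark(
                Arrays.asList(now.minusSeconds(30), now.minusSeconds(300)));

        // Event time parsed from a JSON payload, one hour in the past.
        Instant parsedFromPayload = now.minusSeconds(3600);

        System.out.println("reassigned to payload time, late? "
                + isLate(parsedFromPayload, watermark));
        System.out.println("reassigned to processing time, late? "
                + isLate(now, watermark));
    }
}
```

In this model, moving a timestamp forward to processing time cannot put it
behind the watermark, while moving it back to a time parsed from the payload
can. That second case is the one that needs the watermark itself to shift.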
> Primitive to shift the watermark while assigning timestamps
> -----------------------------------------------------------
>
> Key: BEAM-644
> URL: https://issues.apache.org/jira/browse/BEAM-644
> Project: Beam
> Issue Type: New Feature
> Components: beam-model
> Reporter: Kenneth Knowles
> Priority: Major
>
> There is a general need, especially important in the presence of
> SplittableDoFn, to be able to assign new timestamps to elements without
> making them late or droppable.
> - DoFn.withAllowedTimestampSkew is inadequate, because it simply allows one
> to produce late data, but does not allow one to shift the watermark so the
> new data is on-time.
> - For a SplittableDoFn, one may receive an element such as the name of a log
> file that contains elements for the day preceding the log file. The timestamp
> on the filename must currently be the beginning of the log. If such elements
> are constantly flowing, it may be OK, but since we don't know that an element
> is coming, in the absence of data, the watermark may advance. We need a way to
> keep it far enough back even in the absence of data holding it back.
> One idea is a new primitive ShiftWatermark / AdjustTimestamps with the
> following pieces:
> - A constant duration (positive or negative) D by which to shift the
> watermark.
> - A function from TimestampedElement<T> to a new timestamp that is >= t + D,
> where t is the element's original timestamp.
> So, for example, AdjustTimestamps(<-60 minutes>, f) would allow f to make
> timestamps up to 60 minutes earlier.
> With this primitive added, outputWithTimestamp and withAllowedTimestampSkew
> could be removed, simplifying DoFn.
> Alternatively, all of this functionality could be bolted on to DoFn.
> This ticket is not a proposal, but a record of the issue and ideas that were
> mentioned.
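The ShiftWatermark / AdjustTimestamps idea in the quoted description can be
sketched in plain Java as follows. This is only an illustration of the two
pieces (the constant shift D and the function f), not a Beam API or a proposed
implementation. The class name, method names, and the choice to throw on a
violation are all assumptions.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.BiFunction;

// Hypothetical model of the AdjustTimestamps idea; not part of Beam.
final class AdjustTimestampsSketch<T> {
    private final Duration shift;                       // D, may be negative
    private final BiFunction<T, Instant, Instant> fn;   // f: (element, t) -> new timestamp

    AdjustTimestampsSketch(Duration shift, BiFunction<T, Instant, Instant> fn) {
        this.shift = shift;
        this.fn = fn;
    }

    /** The output watermark is the input watermark shifted by the constant D. */
    Instant shiftWatermark(Instant inputWatermark) {
        return inputWatermark.plus(shift);
    }

    /**
     * Applies f and enforces the contract newTimestamp >= t + D.
     * If t >= inputWatermark, then newTimestamp >= t + D >= inputWatermark + D,
     * so the element is still on time relative to the shifted watermark.
     */
    Instant adjust(T element, Instant t) {
        Instant adjusted = fn.apply(element, t);
        Instant lowerBound = t.plus(shift);
        if (adjusted.isBefore(lowerBound)) {
            throw new IllegalArgumentException(
                    "new timestamp " + adjusted + " is earlier than t + D = " + lowerBound);
        }
        return adjusted;
    }

    public static void main(String[] args) {
        // AdjustTimestamps(<-60 minutes>, f): f may move timestamps up to 60 minutes earlier.
        AdjustTimestampsSketch<String> adjust = new AdjustTimestampsSketch<>(
                Duration.ofMinutes(-60),
                (element, t) -> t.minus(Duration.ofMinutes(45)));

        Instant t = Instant.parse("2019-09-19T12:00:00Z");
        System.out.println("adjusted timestamp: " + adjust.adjust("log-line", t));
        System.out.println("shifted watermark:  " + adjust.shiftWatermark(t));
    }
}
```

Because the watermark moves by the same D that bounds f, an element that was
on time before the adjustment stays on time after it. That is the property
withAllowedTimestampSkew does not provide.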