[ 
https://issues.apache.org/jira/browse/BEAM-644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16931986#comment-16931986
 ] 

Chris Heath commented on BEAM-644:
----------------------------------

I recently asked a somewhat related question on StackOverflow 
[https://stackoverflow.com/questions/57960673/how-to-switch-time-domains-in-a-beam-pipeline],
 and after further thought, I believe what is really needed is for the 
watermark not to be global.  At well-defined points in the pipeline (namely 
DoFns that call outputWithTimestamp), we need to be able to switch to a 
completely separate watermark that applies from that point on.  Each watermark 
in the pipeline would then progress independently, at its own speed and with 
its own heuristics.

Our particular application ideally requires three time domains, each with its 
own watermark.  First we read data from Google Pub/Sub (timestamps = publish 
time) and parse the JSON payload. From there the pipeline branches: one branch 
switches to event time (timestamps obtained from the JSON payload) and applies 
event correlations, with late data dropped; the other branch processes parsed 
events in batches (timestamps = processing time), where no data is ever late.
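
To make the two downstream domains concrete, here is a toy model in plain 
Python. It is not Beam code and not a proposed API: the names TimeDomain and 
route, the "event_time" field, and the use of float seconds as timestamps are 
all assumptions made for illustration. It only shows that a late element in 
one domain need not be late in the other when each domain tracks its own 
watermark.

```python
import json
from dataclasses import dataclass


@dataclass
class TimeDomain:
    """A time domain with its own watermark (toy model, not a Beam API)."""
    name: str
    watermark: float = float("-inf")

    def advance(self, candidate: float) -> None:
        # Watermarks only ever move forward.
        self.watermark = max(self.watermark, candidate)

    def is_late(self, timestamp: float) -> bool:
        return timestamp < self.watermark


def route(raw: str, now: float, event_domain: TimeDomain, batch_domain: TimeDomain):
    """Parse one payload and re-timestamp it for each downstream branch."""
    payload = json.loads(raw)
    outputs = []

    # Event-time branch: timestamp comes from the payload; late data is dropped.
    event_ts = payload["event_time"]  # hypothetical field name
    if not event_domain.is_late(event_ts):
        outputs.append((event_domain.name, event_ts))

    # Processing-time branch: stamped with the current clock reading, so it is
    # never late as long as this domain's watermark does not pass the clock.
    outputs.append((batch_domain.name, now))
    batch_domain.advance(now)
    return outputs
```

With the event-time watermark at 100, a payload carrying event_time 50 is 
dropped from the event branch but still reaches the batch branch, and 
advancing one domain's watermark leaves the other untouched.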

> Primitive to shift the watermark while assigning timestamps
> -----------------------------------------------------------
>
>                 Key: BEAM-644
>                 URL: https://issues.apache.org/jira/browse/BEAM-644
>             Project: Beam
>          Issue Type: New Feature
>          Components: beam-model
>            Reporter: Kenneth Knowles
>            Priority: Major
>
> There is a general need, especially important in the presence of 
> SplittableDoFn, to be able to assign new timestamps to elements without 
> making them late or droppable.
>  - DoFn.withAllowedTimestampSkew is inadequate, because it simply allows one 
> to produce late data, but does not allow one to shift the watermark so the 
> new data is on-time.
>  - For a SplittableDoFn, one may receive an element such as the name of a log 
> file that contains elements for the day preceding the log file. The timestamp 
> on the filename must currently be the beginning of the log. If such elements 
> are constantly flowing, it may be OK, but since we don't know that an element 
> is coming, in the absence of data the watermark may advance. We need a way to 
> keep it far enough back even in the absence of data holding it back.
> One idea is a new primitive ShiftWatermark / AdjustTimestamps with the 
> following pieces:
>  - A constant duration (positive or negative) D by which to shift the 
> watermark.
>  - A function from TimestampedElement<T> to a new timestamp that is >= t + D, 
> where t is the element's current timestamp.
> So, for example, AdjustTimestamps(<-60 minutes>, f) would allow f to make 
> timestamps up to 60 minutes earlier.
> With this primitive added, outputWithTimestamp and withAllowedTimestampSkew 
> could be removed, simplifying DoFn.
> Alternatively, all of this functionality could be bolted on to DoFn.
> This ticket is not a proposal, but a record of the issue and ideas that were 
> mentioned.
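
The ShiftWatermark / AdjustTimestamps idea in the description above can be 
sketched in plain Python. This is only a model of the stated constraint, not 
an existing Beam primitive: the function names adjust_timestamp and 
shifted_watermark are hypothetical, and the sketch assumes the output 
watermark is simply the input watermark plus D. Under that assumption, an 
element that was on time before (t >= input watermark) stays on time after, 
because its new timestamp is >= t + D >= input watermark + D.

```python
from datetime import datetime, timedelta, timezone


def adjust_timestamp(element, timestamp, shift, fn):
    """Apply fn to one element and enforce new_timestamp >= timestamp + shift.

    Hypothetical model of AdjustTimestamps(D, f); shift plays the role of D.
    """
    new_timestamp = fn(element, timestamp)
    lower_bound = timestamp + shift
    if new_timestamp < lower_bound:
        raise ValueError(
            f"new timestamp {new_timestamp} is earlier than allowed bound {lower_bound}"
        )
    return new_timestamp


def shifted_watermark(input_watermark, shift):
    """Assumed rule: the output watermark trails the input watermark by shift."""
    return input_watermark + shift


# Example: AdjustTimestamps(<-60 minutes>, f) lets f move timestamps back
# by up to 60 minutes.
t = datetime(2019, 9, 17, 12, 0, tzinfo=timezone.utc)
d = timedelta(minutes=-60)
moved = adjust_timestamp("log-line", t, d, lambda e, ts: ts - timedelta(minutes=30))
```

A function that tried to move the timestamp back by 90 minutes would be 
rejected here, which is the guarantee withAllowedTimestampSkew does not give.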



