Thomas, thanks for that. I had suspected that might be the case, but wanted 
some confirmation.

It seems to me that, theoretically, it makes sense to think of “partitions” in 
the watermark space: each section of the pipeline conforms to the “output WM 
<= input WM” invariant, and the breaks between sections happen precisely where 
timestamps themselves are being materialised or adjusted.

This makes sense because at those boundaries, the object that is being 
timestamped changes. In the log files example, the first section of the 
pipeline timestamps the file itself. Once that file is transformed into a list 
of input files, the timestamps (and hence the watermark) of those input files 
are in a different domain, so that interface is the natural place to break the 
watermark/timestamp invariants.
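
To make the “partitions” idea concrete, here is a rough sketch in plain Java. 
It is not Beam code: `WatermarkDomains`, `Stage` and `reassignsTimestamps` are 
names I made up for illustration. It walks a linear pipeline and starts a new 
watermark domain at every stage that materialises or adjusts timestamps.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model only; none of these names exist in Beam. */
final class WatermarkDomains {

    /** A pipeline stage, flagged if it materialises or adjusts timestamps. */
    static final class Stage {
        final String name;
        final boolean reassignsTimestamps;

        Stage(String name, boolean reassignsTimestamps) {
            this.name = name;
            this.reassignsTimestamps = reassignsTimestamps;
        }
    }

    /**
     * Returns, for each stage in order, the index of the watermark domain
     * that its output belongs to. Within one domain the assumption is that
     * "output WM <= input WM" holds; a timestamp-reassigning stage opens a
     * new domain.
     */
    static List<Integer> domainsOf(List<Stage> pipeline) {
        List<Integer> domains = new ArrayList<>();
        int domain = 0;
        for (Stage stage : pipeline) {
            if (stage.reassignsTimestamps) {
                domain++; // the invariant is allowed to break at this boundary
            }
            domains.add(domain);
        }
        return domains;
    }
}
```

For the log files example, a stage that lists files would sit in domain 0, and 
the stage that assigns timestamps to what it reads from them, plus everything 
downstream of it, would sit in domain 1.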

Thanks,
-- 
Matt

On 6 April 2017 at 19:50:40, Thomas Groh ([email protected]) wrote:

Hey Matt;  

Generally this is an unsolved problem. We track a related issue in  
https://issues.apache.org/jira/browse/BEAM-644, which permits the watermark  
to be shifted backwards in time. That would let a source that does not  
support timestamps emit elements timestamped with "when I read the  
element" and emit a watermark that is roughly real-time, and have the  
downstream timestamp assignment hold the watermark as far back as it  
thinks is reasonable.  

However, I don't believe it's required that the output watermark is always  
no later than the input watermark. The input Watermark to a transform  
represents a promise from the system that "I will never provide this  
transform with an input that is timestamped before the watermark"; the  
output watermark of a transform represents a promise from the transform to  
the system that "I will never produce output that is timestamped before  
this timestamp" (modulo late data on both ends, of course). The restriction  
that watermarks advance monotonically applies on both ends, but a  
PTransform that knows more about the timestamps than an upstream source  
could appropriately advance a watermark past that of its input.  
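
As a rough illustration of the two promises, here is a toy model in plain  
Java. It is not Beam code and not how any runner implements watermarks;  
`ToyWatermark` and its methods are made up for this sketch. Each watermark  
only moves forward, but nothing ties the output watermark to the input one.  

```java
/** Toy model of a watermark as a monotonically advancing promise. Not Beam API. */
final class ToyWatermark {
    // Start at the minimum time, like a source that cannot provide timestamps.
    private long millis = Long.MIN_VALUE;

    /** Advances the watermark; requests to move it backwards are ignored. */
    void advanceTo(long candidateMillis) {
        millis = Math.max(millis, candidateMillis);
    }

    /** An element is late if its timestamp is before the watermark. */
    boolean isLate(long elementTimestampMillis) {
        return elementTimestampMillis < millis;
    }

    long get() {
        return millis;
    }
}

final class ToyWatermarkDemo {
    public static void main(String[] args) {
        ToyWatermark input = new ToyWatermark();  // upstream source never advances it
        ToyWatermark output = new ToyWatermark(); // owned by the timestamp-assigning transform

        output.advanceTo(1_000L); // the transform knows nothing earlier is coming
        output.advanceTo(500L);   // ignored: watermarks advance monotonically

        System.out.println(output.get() > input.get()); // prints "true"
        System.out.println(output.isLate(700L));        // prints "true"
    }
}
```

Here the output watermark sits past the input watermark, and an element  
stamped 700 arriving afterwards would count as late data.  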

On Wed, Apr 5, 2017 at 11:00 AM, Matthew Jadczak <[email protected]> wrote:  

> Hi,  
>  
> This is a question which goes back to the theoretical model. Normally, as  
> defined in the Beam lateness semantics [1], the source is in charge of  
> emitting appropriate timestamps and setting its own watermark. This is in  
> general configurable by users by providing their own timestamp and  
> watermark transformation functions, as implemented in e.g. KafkaIO and  
> PubsubIO. Since the Read transform from that source is a root transform, we  
> do not have an input watermark to worry about in that case, and the output  
> watermark is under the control of the Source.  
>  
> However, what is the actual and desired behaviour when we wish to  
> materialise timestamp / watermark information in the middle of a pipeline?  
> For example, we have a source which does not support timestamps and sets  
> them all (along with the watermark) to `Long.MIN_VALUE`, or perhaps we need  
> to do some complex processing to determine the correct timestamp. Further,  
> the elements could be out of order with respect to the generated timestamps.  
>  
> I tried analysing the code to work out what the behaviour in the  
> DirectRunner is when using WithTimestamps to assign the timestamps in this  
> case, but I wasn’t able to figure it out. Is the watermark advanced to the  
> latest one of the timestamps assigned so far? If so, if data is out of  
> order is the out-of-order data simply emitted late? (and does the allowed  
> skew need to be in place there to allow that?)  
>  
> The input watermark in this case is always the minimum time. Advancing the  
> watermark in any way past that would break the assumption that output WM <=  
> input WM (as written in [1]). However not advancing the watermark would  
> mean we never fire triggers which depend on it, etc.  
>  
> Am I misunderstanding something? Do the invariants in [1] only hold if we  
> are not actively messing with timestamps/watermarks by using WithTimestamps  
> or similar?  
>  
> I had a look at [2], but this merely seems to be about adjusting the  
> watermark after a “real” watermark is actually established (for example  
> reading log files, etc.)  
>  
> Any clarification would be appreciated.  
>  
> Thanks,  
> Matt  
>  
> [1] https://docs.google.com/document/d/12r7frmxNickxB5tbpuEh_n35_IJeVZn1peOrBrhhP6Y/edit#  
> [2] https://issues.apache.org/jira/browse/BEAM-644  
