On Thu, Jan 26, 2017 at 9:48 AM, Thomas Groh <[email protected]> wrote:

> The default watermark policy for a bounded source should be negative
> infinity until all of the data is read, then positive infinity.

Just to elaborate - there isn't a way for a bounded source to communicate a
watermark. Runners each do this internally.

> Currently, modifying the timestamp of an element from within a DoFn does
> not modify the watermark; modifying a timestamp forwards in time is
> generally "safe", as it can't cause data to move to behind the watermark -
> this is why moving elements backwards in time requires setting
> "withAllowedTimestampSkew" (which also doesn't modify the watermark, which
> means that elements that are moved backwards in time can become late and be
> dropped by a runner). I don't think we currently have any changes in-flight
> to make this configurable.

There has been one proposal to provide adequate watermark-timestamp
interaction via a new model-level AdjustTimestamps primitive, but it needs
more discussion and implementation has not begun. The related ticket also goes
into some of the same issues Thomas has described:
https://issues.apache.org/jira/browse/BEAM-644

(I am the author of the ticket, but the proposal there was developed
collaboratively so it has at least slightly more buy-in than just me :-)

Kenn

> On Wed, Jan 25, 2017 at 9:24 PM, Shen Li <[email protected]> wrote:
>
> > Hi,
> >
> > When reading from a source with no timestamp specified on elements, what
> > should be the default timestamp? I presume that it should be 0 as I saw
> > PAssertTest trying to set timestamps to very small values with 0 allowed
> > timestamp skew. Is that right?
> >
> > What about the default watermark policy?
> >
> > If a ParDo modifies the timestamp using
> > DoFnProcessContext.outputWithTimestamp, how should that affect the output
> > watermark? Say the ParDo adds 100 seconds to the timestamp of each element
> > in processElement, how could the runner know it should also add 100 seconds
> > to output timestamps?
> >
> > Thanks,
> >
> > Shen
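
For readers following the thread, the reasoning above can be sketched as a toy
model in plain Java. This is only an illustration under stated assumptions: it
does not use the Beam SDK, the class and method names (WatermarkSketch,
boundedSourceWatermark, isLate, lateAfterShift) are hypothetical, and
"late" is simplified to "timestamp behind the watermark", ignoring windowing
and allowed lateness. It shows the bounded-source default policy and why
shifting timestamps forwards is safe while shifting them backwards can make
elements late when the watermark is left unchanged.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model only; none of these names are Beam APIs. */
class WatermarkSketch {
    // Stand-ins for "negative infinity" and "positive infinity".
    static final long MIN_TS = Long.MIN_VALUE;
    static final long MAX_TS = Long.MAX_VALUE;

    /** Default policy described in the thread for a bounded source. */
    static long boundedSourceWatermark(boolean allDataRead) {
        return allDataRead ? MAX_TS : MIN_TS;
    }

    /** Simplification: an element is late if it is behind the watermark. */
    static boolean isLate(long elementTs, long watermark) {
        return elementTs < watermark;
    }

    /**
     * Shifts every timestamp by shiftMillis and returns the shifted
     * timestamps that are late relative to a watermark that is NOT adjusted.
     */
    static List<Long> lateAfterShift(List<Long> timestamps, long shiftMillis,
                                     long watermark) {
        List<Long> late = new ArrayList<>();
        for (long ts : timestamps) {
            long shifted = ts + shiftMillis;
            if (isLate(shifted, watermark)) {
                late.add(shifted);
            }
        }
        return late;
    }

    public static void main(String[] args) {
        List<Long> input = Arrays.asList(1_000L, 2_000L, 3_000L);
        long watermark = 1_000L; // every input element is on time

        // Forwards by 100 seconds: nothing can fall behind the watermark.
        System.out.println(lateAfterShift(input, 100_000L, watermark));

        // Backwards by 1.5 seconds: some elements end up behind it.
        System.out.println(lateAfterShift(input, -1_500L, watermark));
    }
}
```

In this model a forward shift never produces late elements, which matches the
"generally safe" remark in the thread. A backward shift can, which is the
case the thread says requires "withAllowedTimestampSkew" in Beam.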
