On Thu, Jan 26, 2017 at 9:48 AM, Thomas Groh <[email protected]>
wrote:
>
> The default watermark policy for a bounded source should be negative
> infinity until all of the data is read, then positive infinity.


To elaborate: a bounded source has no way to communicate a watermark, so
each runner implements this policy internally.
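
For illustration, the policy Thomas describes could be modeled roughly as
below. This is a minimal sketch in plain Java, not Beam API: the class name,
its methods, and the use of Long.MIN_VALUE / Long.MAX_VALUE as stand-ins for
negative and positive infinity are all assumptions made for the example.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical model, not a Beam class: tracks the watermark a runner
// might hold internally while draining a bounded source.
class BoundedWatermarkSketch {
    // Assumed stand-ins for negative and positive infinity on the time axis.
    static final long MIN_TIMESTAMP = Long.MIN_VALUE;
    static final long MAX_TIMESTAMP = Long.MAX_VALUE;

    private final Iterator<String> remaining;

    BoundedWatermarkSketch(List<String> elements) {
        this.remaining = elements.iterator();
    }

    // Reads one element if any is left; returns null once exhausted.
    String readNext() {
        return remaining.hasNext() ? remaining.next() : null;
    }

    // Negative infinity until every element has been read, then positive infinity.
    long watermark() {
        return remaining.hasNext() ? MIN_TIMESTAMP : MAX_TIMESTAMP;
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch source = new BoundedWatermarkSketch(List.of("a", "b"));
        System.out.println(source.watermark() == MIN_TIMESTAMP); // true: data remains
        source.readNext();
        source.readNext();
        System.out.println(source.watermark() == MAX_TIMESTAMP); // true: source drained
    }
}
```

The point of the sketch is that the watermark jumps straight from one extreme
to the other; the source itself never reports an intermediate value.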

> Currently, modifying the timestamp of an element from within a DoFn does
> not modify the watermark; modifying a timestamp forwards in time is
> generally "safe", as it can't cause data to move to behind the watermark -
> this is why moving elements backwards in time requires setting
> "withAllowedTimestampSkew" (which also doesn't modify the watermark, which
> means that elements that are moved backwards in time can become late and be
> dropped by a runner). I don't think we currently have any changes in-flight
> to make this configurable.
>
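
A rough way to picture the rule quoted above, as a minimal sketch in plain
Java rather than Beam API. The class and method names are hypothetical, and
"late" is simplified to "behind the watermark", ignoring windowing and
allowed lateness.

```java
// Hypothetical model in plain Java, not Beam API.
class TimestampShiftSketch {

    // Returns the shifted timestamp. Forward moves always pass; backward
    // moves are rejected when they exceed the allowed skew.
    static long shift(long inputMillis, long deltaMillis, long allowedSkewMillis) {
        long outputMillis = inputMillis + deltaMillis;
        if (outputMillis < inputMillis - allowedSkewMillis) {
            throw new IllegalArgumentException(
                "timestamp moved back further than the allowed skew");
        }
        return outputMillis;
    }

    // Simplification (assumption): an element is late if it is behind the watermark.
    static boolean isLate(long timestampMillis, long watermarkMillis) {
        return timestampMillis < watermarkMillis;
    }

    public static void main(String[] args) {
        long watermark = 1_000L; // not changed by any of the shifts below
        long input = 1_500L;

        // Moving forward by 100 seconds cannot put the element behind the watermark.
        long forward = shift(input, 100_000L, 0L);
        System.out.println(isLate(forward, watermark));  // false

        // Moving backward within the allowed skew is accepted, but the element
        // now sits behind the unchanged watermark.
        long backward = shift(input, -800L, 1_000L);
        System.out.println(isLate(backward, watermark)); // true
    }
}
```

In both cases the watermark stays where it was, which is why the backward
shift can turn an on-time element into a late one.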

There has been one proposal to provide adequate watermark-timestamp
interaction via a new model-level AdjustTimestamps primitive, but it needs
more discussion and implementation has not begun. The ticket,
https://issues.apache.org/jira/browse/BEAM-644, also goes into some of the
same issues Thomas has described.

(I am the author of the ticket, but the proposal there was developed
collaboratively so it has at least slightly more buy-in than just me :-)

Kenn


> On Wed, Jan 25, 2017 at 9:24 PM, Shen Li <[email protected]> wrote:
>
> > Hi,
> >
> > When reading from a source with no timestamp specified on elements, what
> > should be the default timestamp? I presume that it should be 0 as I saw
> > PAssertTest trying to set timestamps to very small values with 0 allowed
> > timestamp skew. Is that right?
> >
> > What about the default watermark policy?
> >
> > If a ParDo modifies the timestamp using
> > DoFnProcessContext.outputWithTimestamp, how should that affect the output
> > watermark? Say the ParDo adds 100 seconds to the timestamp of each element
> > in processElement, how could the runner know it should also add 100 seconds
> > to output timestamps?
> >
> > Thanks,
> >
> > Shen
> >
>
