Actually, that's not entirely true. Here is how the join operator generates
watermark tuples:

1) We keep the timestamp of the latest watermark for each input port

2) We keep another timestamp that is equal to the minimum of all the
timestamps mentioned in (1).

3) Upon arrival of a watermark from an input port, we update the timestamp
mentioned in (1), and evaluate (2). If the value of (2) changes, we
generate the watermark tuple with the timestamp that is equal to the new
value of (2).

4) That means that, initially, a watermark is generated only once we have
seen a watermark from every input port. And because we take the smallest
timestamp in (2), we consider a window late only if all input streams say
that particular window is late.
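
As an illustration of points (1) to (4), here is a minimal, self-contained
sketch in Java. It is not the code in AbstractWindowedJoinOperator; the class
name MinWatermarkTracker, the method onWatermark and the UNSEEN sentinel are
hypothetical names made up for this example, and it assumes input ports are
identified by an integer index.

```java
import java.util.Arrays;
import java.util.OptionalLong;

// Hypothetical helper, for illustration only (not part of apex-malhar).
final class MinWatermarkTracker {
    // Sentinel meaning "no watermark seen yet on this port" (an assumption of this sketch).
    private static final long UNSEEN = Long.MIN_VALUE;

    // (1) Timestamp of the latest watermark for each input port.
    private final long[] latest;
    // (2) Minimum of all the timestamps in (1); UNSEEN until every port has reported.
    private long current = UNSEEN;

    MinWatermarkTracker(int numPorts) {
        latest = new long[numPorts];
        Arrays.fill(latest, UNSEEN);
    }

    // (3) Called when a watermark arrives on a port. Returns the timestamp of the
    // watermark to emit downstream, or empty if nothing should be emitted.
    OptionalLong onWatermark(int port, long timestamp) {
        latest[port] = timestamp;
        long min = Long.MAX_VALUE;
        for (long t : latest) {
            if (t == UNSEEN) {
                // (4) Some port has not sent a watermark yet, so emit nothing.
                return OptionalLong.empty();
            }
            min = Math.min(min, t);
        }
        if (min == current) {
            // The value of (2) did not change, so emit nothing.
            return OptionalLong.empty();
        }
        current = min;
        return OptionalLong.of(min);
    }
}
```

With two ports, a watermark on port 0 alone emits nothing. The first
watermark on port 1 then emits the smaller of the two timestamps, and later
watermarks emit only when that minimum moves.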

David


On Fri, Sep 16, 2016 at 10:42 AM, Shunxin Lu <lushun...@gmail.com> wrote:

> Hi Chinmay,
>
> Based on the discussion I had with David (David, please correct me if I
> am wrong), the watermark for the Windowed Join Operator should indeed
> depend on all the input streams. If a tuple is considered late for one
> input stream, it should also be considered late for the whole join
> operator. That's why in the AbstractWindowedJoinOperator, it always selects
> the watermark with the smallest timestamp from all the latest watermarks
> coming from upstreams as its current watermark, so that it can make sure
> that it's always keeping the strictest watermark to eliminate late tuples.
>
> Shunxin
>
> On Fri, Sep 16, 2016 at 10:02 AM, David Yan <da...@datatorrent.com> wrote:
>
> > I think in theory, the watermark should be sent by the input operator,
> > since the input should know the criteria for lateness. Those criteria
> > can depend on many factors, such as the time of day or the source of
> > the data (e.g. mobile data), that the WindowedOperator should in
> > general make no assumptions about.
> >
> > However, I think it's possible to implement some kind of watermark
> > generation in the WindowedOperator itself if that knowledge is not
> > available from the input. It's actually already doing that if you call
> > the setFixedWatermark method, which will generate a watermark tuple
> > downstream for each streaming window, with a timestamp that is based
> > on the time derived from the streaming window id. It's possible to add
> > support for heuristic watermark generation as well, and you're welcome
> > to take that up.
> >
> > For the Windowed Join operator, the watermark generated for downstream
> > depends on the watermarks arriving from each input stream; it is not
> > simply propagated. Shunxin can comment more on this.
> >
> > David
> >
> >
> > On Thu, Sep 15, 2016 at 11:21 PM, Chinmay Kolhatkar <chin...@apache.org>
> > wrote:
> >
> > > Hi All,
> > >
> > > > I was looking at the Windowed Operator APIs and have to mention
> > > > they're pretty nicely done.
> > >
> > > I have a question related to watermark generation.
> > >
> > > > What I understood is that, to complete the processing of an event
> > > > window, there is a provision for sending a watermark tuple from some
> > > > previous stage in the DAG. I want to know who should be doing that
> > > > and when it should be done.
> > >
> > > > For example, I saw a PR for the Windowed Join Operator in apex-malhar
> > > > and I would like to use it in my application. Can someone give me an
> > > > example of what a DAG looks like with this operator and a stage that
> > > > generates watermarks? And how should that stage decide when to
> > > > generate a watermark tuple?
> > >
> > > -Chinmay.
> > >
> >
>
