Hi Shunxin,

If the watermark code in your PR is not behaving the way it should, please
do change it. Thanks!

David

On Fri, Sep 16, 2016 at 11:36 AM, Shunxin Lu <lushun...@gmail.com> wrote:

> Hi David,
>
> Thanks for the clarification. Should we update the join operator's
> watermark when a watermark arrives from one of the input streams, even
> if the watermark from another input stream has not arrived yet?
>
> Shunxin
>
> On Fri, Sep 16, 2016 at 10:59 AM, David Yan <da...@datatorrent.com> wrote:
>
> > Actually, that's not entirely true. Here are the points about the
> > watermark tuple generation of the join operator:
> >
> > 1) We keep the timestamp of the latest watermark for each input port
> >
> > 2) We keep another timestamp that is equal to the minimum of all the
> > timestamps mentioned in (1).
> >
> > 3) Upon arrival of a watermark from an input port, we update the
> > timestamp mentioned in (1), and evaluate (2). If the value of (2)
> > changes, we generate the watermark tuple with the timestamp that is
> > equal to the new value of (2).
> >
> > 4) That means initially, the watermark is only generated when we have
> > seen a watermark for all input ports. And the fact that we take the
> > smallest timestamp in (2) means we consider a window late only if all
> > input streams say that particular window is late.
> >
> > David
> >
> >
> > On Fri, Sep 16, 2016 at 10:42 AM, Shunxin Lu <lushun...@gmail.com> wrote:
> >
> > > Hi Chinmay,
> > >
> > > Based on the discussion I had with David (David, please correct me if
> > > I am wrong), the watermark for the Windowed Join Operator should indeed
> > > depend on all the input streams. If a tuple is considered late for one
> > > input stream, it should also be considered late for the whole join
> > > operator. That's why the AbstractWindowedJoinOperator always selects
> > > the watermark with the smallest timestamp from all the latest
> > > watermarks coming from upstream as its current watermark, so that it
> > > is always keeping the strictest watermark to eliminate late tuples.
> > >
> > > Shunxin
> > >
> > > On Fri, Sep 16, 2016 at 10:02 AM, David Yan <da...@datatorrent.com> wrote:
> > >
> > > > I think in theory, the watermark should be sent by the input operator,
> > > > since the input should know the criteria for lateness. Those criteria
> > > > can depend on many factors, like the time of day or the source of the
> > > > data (e.g. mobile data), about which the WindowedOperator should in
> > > > general make no assumptions.
> > > >
> > > > However, I think it's possible to implement some kind of watermark
> > > > generation in the WindowedOperator itself if that knowledge is not
> > > > available from the input. It's actually already doing that if you call
> > > > the setFixedWatermark method, which will generate a watermark tuple
> > > > downstream for each streaming window, with a timestamp based on the
> > > > time derived from the streaming window id. It's possible to add
> > > > support for heuristic watermark generation as well, and you're welcome
> > > > to take that up.
> > > >
> > > > For the Windowed Join operator, the watermark generated for downstream
> > > > depends on the watermark arriving from each input stream, and it's not
> > > > just a simple propagation. Shunxin can comment more on this.
> > > >
> > > > David
> > > >
> > > >
> > > > On Thu, Sep 15, 2016 at 11:21 PM, Chinmay Kolhatkar <chin...@apache.org> wrote:
> > > >
> > > > > Hi All,
> > > > >
> > > > > I was looking at the Windowed Operator APIs and have to mention
> > > > > they're pretty nicely done.
> > > > >
> > > > > I have a question related to watermark generation.
> > > > >
> > > > > What I understood is that, to complete processing of an event
> > > > > window, there is a provision for sending a watermark tuple from some
> > > > > previous stage in the DAG. I want to know who should be doing that
> > > > > and when it should be done.
> > > > >
> > > > > For example, I saw a PR for the Windowed Join Operator in apex-malhar
> > > > > and I would like to use it in my application. Can someone give me an
> > > > > example of what a DAG will look like with this operator, including a
> > > > > stage that generates watermarks? And how should that stage decide
> > > > > when to generate a watermark tuple?
> > > > >
> > > > > -Chinmay.
> > > > >
> > > >
> > >
> >
>
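
For readers of this thread, the four points David lists for the join operator's watermark generation can be sketched roughly as follows. This is only an illustration of the rule as described in the message, not the code in the PR: the class name JoinWatermarkTracker, the method onWatermark, and the use of Long.MIN_VALUE as a "not seen yet" marker are all hypothetical, and the sketch assumes each port's watermarks arrive in non-decreasing order.

```java
import java.util.Arrays;
import java.util.OptionalLong;

/** Hypothetical helper illustrating points (1)-(4); not the actual operator code. */
final class JoinWatermarkTracker {
    // Assumed sentinel meaning "no watermark received on this port yet".
    private static final long UNSEEN = Long.MIN_VALUE;

    // (1) Timestamp of the latest watermark for each input port.
    private final long[] latestPerPort;
    // (2) Minimum of the timestamps in (1); stays UNSEEN until every port has reported.
    private long currentMin = UNSEEN;

    JoinWatermarkTracker(int numPorts) {
        if (numPorts < 1) {
            throw new IllegalArgumentException("need at least one input port");
        }
        latestPerPort = new long[numPorts];
        Arrays.fill(latestPerPort, UNSEEN);
    }

    /**
     * (3) Record a watermark from one port and re-evaluate the minimum.
     * Returns the timestamp to emit downstream if the minimum changed,
     * or an empty value if nothing should be emitted.
     */
    OptionalLong onWatermark(int port, long timestamp) {
        latestPerPort[port] = timestamp;
        long min = Long.MAX_VALUE;
        for (long ts : latestPerPort) {
            if (ts == UNSEEN) {
                // (4) Initially, nothing is emitted until all ports have reported.
                return OptionalLong.empty();
            }
            min = Math.min(min, ts);
        }
        if (min == currentMin) {
            return OptionalLong.empty();
        }
        currentMin = min;
        return OptionalLong.of(min);
    }
}
```

With two ports, a watermark of 100 on port 0 alone produces no output; a following watermark of 50 on port 1 produces 50, the smaller of the two. Under this rule a window only becomes late downstream once every input stream's watermark has passed it.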
