Hi Chinmay,

Hope I am not too late.

If I understand correctly, you need a watermark tuple that is consistent
across the two inputs the join operator receives. One way, as David
suggested, is to let the Windowed Operator itself generate the watermark
tuple.

One such heuristic is to look at the latest timestamp among the incoming
tuples and generate a watermark that is a fixed distance behind the ideal
watermark. In other words, once you have the watermark time =
(latest timestamp seen across all ports - fixed delay), you can assume a
watermark tuple with that timestamp. This can be done at the end of the
window.
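
To make the heuristic above concrete, here is a minimal Java sketch. It is
only an illustration and not code from Malhar: the class name
HeuristicWatermarkTracker and its methods observe() and endWindow() are made
up for this mail. I am also assuming that timestamps are epoch milliseconds,
that "end of the window" means the end of each streaming window, and that a
watermark is only worth emitting when it moves forward.

```java
import java.util.OptionalLong;

/**
 * Hypothetical helper (not part of Apex Malhar) that derives a heuristic
 * watermark as: latest timestamp seen across all ports - fixed delay.
 */
public class HeuristicWatermarkTracker {
    private final long fixedDelayMillis;
    private boolean seenAnyTuple = false;
    private long latestTimestamp = 0L;
    private boolean emittedAny = false;
    private long lastEmitted = 0L;

    public HeuristicWatermarkTracker(long fixedDelayMillis) {
        if (fixedDelayMillis < 0) {
            throw new IllegalArgumentException("fixed delay must be >= 0");
        }
        this.fixedDelayMillis = fixedDelayMillis;
    }

    /** Call for every data tuple, regardless of which input port it came from. */
    public void observe(long eventTimeMillis) {
        if (!seenAnyTuple || eventTimeMillis > latestTimestamp) {
            latestTimestamp = eventTimeMillis;
            seenAnyTuple = true;
        }
    }

    /**
     * Call at the end of a window. Returns the timestamp of the watermark
     * to assume, or empty if no tuple has been seen yet or the watermark
     * would not advance (assumption: watermarks never move backwards).
     */
    public OptionalLong endWindow() {
        if (!seenAnyTuple) {
            return OptionalLong.empty();
        }
        long candidate = latestTimestamp - fixedDelayMillis;
        if (emittedAny && candidate <= lastEmitted) {
            return OptionalLong.empty();
        }
        lastEmitted = candidate;
        emittedAny = true;
        return OptionalLong.of(candidate);
    }
}
```

A join operator would call observe() from the process method of each input
port and endWindow() once per window, treating a returned value as if a
watermark tuple with that timestamp had arrived. Because a single tracker
sees the tuples of all ports, both inputs are judged against the same
watermark.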

~ Bhupesh

On Sun, Sep 18, 2016 at 6:13 AM, Shunxin Lu <lushun...@gmail.com> wrote:

> Hi Chinmay,
>
> The Join Operator should be able to support any user-defined join
> operation. You just need to create a class implementing the
> JoinAccumulation interface and define the behavior you want, then set the
> accumulation after creating a Join operator instance. Please take a look
> at the Combine and PojoInnerJoin in package
> org.apache.apex.malhar.lib.window.impl, and see how to use the
> accumulation in WindowedJoinOperatorTestApplication.
>
> Thanks,
> Shunxin
>
> On Sat, Sep 17, 2016 at 1:30 AM, Chinmay Kolhatkar <
> chin...@datatorrent.com>
> wrote:
>
> > Thanks for the information guys.
> >
> > David, I can take a look at heuristic watermark if I get any free cycles.
> >
> > Shunxin, does the Join operator that you're implementing support theta
> > join, or only a subset of theta join?
> >
> > Thanks,
> > Chinmay.
> >
> >
> >
> > On Sat, Sep 17, 2016 at 1:21 AM, David Yan <da...@datatorrent.com>
> wrote:
> >
> > > Hi Shunxin,
> > >
> > > If the watermark code in your PR is not behaving the way it should,
> > > please do change it. Thanks!
> > >
> > > David
> > >
> > > On Fri, Sep 16, 2016 at 11:36 AM, Shunxin Lu <lushun...@gmail.com>
> > wrote:
> > >
> > > > Hi David,
> > > >
> > > > Thanks for the clarification. Should we update the watermark for the
> > > > join operator when a watermark arrives from one of the input streams,
> > > > even if the watermark from the other input stream has not arrived yet?
> > > >
> > > > Shunxin
> > > >
> > > > On Fri, Sep 16, 2016 at 10:59 AM, David Yan <da...@datatorrent.com>
> > > wrote:
> > > >
> > > > > Actually, that's not entirely true. Here are the points about the
> > > > > watermark tuple generation of the join operator:
> > > > >
> > > > > 1) We keep the timestamp of the latest watermark for each input port.
> > > > >
> > > > > 2) We keep another timestamp that is equal to the minimum of all the
> > > > > timestamps mentioned in (1).
> > > > >
> > > > > 3) Upon arrival of a watermark from an input port, we update the
> > > > > timestamp mentioned in (1), and evaluate (2). If the value of (2)
> > > > > changes, we generate the watermark tuple with the timestamp that is
> > > > > equal to the new value of (2).
> > > > >
> > > > > 4) That means initially, the watermark is only generated when we have
> > > > > seen a watermark for all input ports. And the fact that we take the
> > > > > smallest timestamp in (2) means we consider a window late only if all
> > > > > input streams say that particular window is late.
> > > > >
> > > > > David
> > > > >
> > > > >
> > > > > On Fri, Sep 16, 2016 at 10:42 AM, Shunxin Lu <lushun...@gmail.com>
> > > > wrote:
> > > > >
> > > > > > Hi Chinmay,
> > > > > >
> > > > > > Based on the discussion I had with David, and David please correct
> > > > > > me if I am wrong, the watermark for the Windowed Join Operator
> > > > > > should indeed depend on all the input streams. If a tuple is
> > > > > > considered late for one input stream, it should also be considered
> > > > > > late for the whole join operator. That's why the
> > > > > > AbstractWindowedJoinOperator always selects the watermark with the
> > > > > > smallest timestamp from all the latest watermarks coming from
> > > > > > upstream as its current watermark, so that it always keeps the
> > > > > > strictest watermark to eliminate late tuples.
> > > > > >
> > > > > > Shunxin
> > > > > >
> > > > > > On Fri, Sep 16, 2016 at 10:02 AM, David Yan <
> da...@datatorrent.com
> > >
> > > > > wrote:
> > > > > >
> > > > > > > I think in theory, the watermark should be sent by the input
> > > > > > > operator, since the input should have knowledge of the criteria
> > > > > > > for lateness. Those criteria can depend on many factors, like the
> > > > > > > time of day or the source of the data (e.g. mobile data), that
> > > > > > > the WindowedOperator should in general make no assumptions about.
> > > > > > >
> > > > > > > However, I think it's possible to implement some kind of watermark
> > > > > > > generation in the WindowedOperator itself if that knowledge is not
> > > > > > > available from the input. It's actually already doing that if you
> > > > > > > call the setFixedWatermark method, which will generate a watermark
> > > > > > > tuple downstream for each streaming window, with a timestamp based
> > > > > > > on the time derived from the streaming window id. It's possible to
> > > > > > > add support for heuristic watermark generation as well, and you're
> > > > > > > welcome to take that up.
> > > > > > >
> > > > > > > For the Windowed Join operator, the watermark generated for
> > > > > > > downstream depends on the watermark arriving from each input
> > > > > > > stream, and it's not just a simple propagation. Shunxin can
> > > > > > > comment more on this.
> > > > > > >
> > > > > > > David
> > > > > > >
> > > > > > >
> > > > > > > On Thu, Sep 15, 2016 at 11:21 PM, Chinmay Kolhatkar <
> > > > > chin...@apache.org>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi All,
> > > > > > > >
> > > > > > > > I was looking at the Windowed Operator APIs and have to mention
> > > > > > > > they're pretty nicely done.
> > > > > > > >
> > > > > > > > I have a question related to watermark generation.
> > > > > > > >
> > > > > > > > What I understood is that, to complete the processing of an
> > > > > > > > event window, there is a provision for sending a watermark tuple
> > > > > > > > from some previous stage in the DAG. I want to know who should
> > > > > > > > be doing that and when it should be done.
> > > > > > > >
> > > > > > > > For example, I saw a PR for the Windowed Join Operator in
> > > > > > > > apex-malhar and I would like to use it in my application. Can
> > > > > > > > someone give me an example of what a DAG with this operator, and
> > > > > > > > with a stage that generates watermarks, would look like? And how
> > > > > > > > should that stage decide when to generate a watermark tuple?
> > > > > > > >
> > > > > > > > -Chinmay.
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
