Hi Chinmay,

The Join Operator should be able to support any user-defined join
operation. You just need to create a class that implements the
JoinAccumulation interface and defines the behavior you want, then set the
accumulation after creating a Join operator instance. Please take a look
at Combine and PojoInnerJoin in the package
org.apache.apex.malhar.lib.window.impl, and see how the accumulation is used
in WindowedJoinOperatorTestApplication.
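To illustrate how arbitrary join logic can live inside an accumulation, here
is a minimal Java sketch of a theta join (a join on an arbitrary predicate
rather than key equality). It uses a hypothetical, simplified
ThetaJoinAccumulation interface defined in the sketch itself; the real
JoinAccumulation interface has its own method names and signatures, so
Combine and PojoInnerJoin remain the reference for the actual contract.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

/** Usage: joins left values with right values where left < right (a non-equi join). */
class ThetaJoinDemo {
  public static void main(String[] args) {
    PredicateJoin<Integer, Integer> join = new PredicateJoin<>((l, r) -> l < r);
    JoinState<Integer, Integer> acc = join.defaultAccumulatedValue();
    for (int l : new int[] {1, 5, 9}) {
      acc = join.accumulateLeft(acc, l);
    }
    for (int r : new int[] {4, 8}) {
      acc = join.accumulateRight(acc, r);
    }
    System.out.println(join.getOutput(acc));
  }
}

/** HYPOTHETICAL simplified accumulation contract, not the real Malhar JoinAccumulation. */
interface ThetaJoinAccumulation<L, R, A, O> {
  A defaultAccumulatedValue();          // empty state for a new window
  A accumulateLeft(A acc, L left);      // fold in a tuple from the left input
  A accumulateRight(A acc, R right);    // fold in a tuple from the right input
  O getOutput(A acc);                   // produce the join result for the window
}

/** Per-window state: every tuple seen so far on each side. */
class JoinState<L, R> {
  final List<L> left = new ArrayList<>();
  final List<R> right = new ArrayList<>();
}

/** Theta join: emits every (left, right) pair for which the predicate holds. */
class PredicateJoin<L, R>
    implements ThetaJoinAccumulation<L, R, JoinState<L, R>, List<Map.Entry<L, R>>> {
  private final BiPredicate<L, R> theta;

  PredicateJoin(BiPredicate<L, R> theta) {
    this.theta = theta;
  }

  @Override
  public JoinState<L, R> defaultAccumulatedValue() {
    return new JoinState<>();
  }

  @Override
  public JoinState<L, R> accumulateLeft(JoinState<L, R> acc, L left) {
    acc.left.add(left);
    return acc;
  }

  @Override
  public JoinState<L, R> accumulateRight(JoinState<L, R> acc, R right) {
    acc.right.add(right);
    return acc;
  }

  @Override
  public List<Map.Entry<L, R>> getOutput(JoinState<L, R> acc) {
    List<Map.Entry<L, R>> out = new ArrayList<>();
    for (L l : acc.left) {
      for (R r : acc.right) {
        if (theta.test(l, r)) {
          out.add(Map.entry(l, r));   // Map.entry needs Java 9+ and non-null values
        }
      }
    }
    return out;
  }
}
```

The nested loop compares every left tuple with every right tuple in the
window, which is what a general theta predicate requires; an equi-join could
instead index one side by key to avoid the quadratic cost.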

Thanks,
Shunxin

On Sat, Sep 17, 2016 at 1:30 AM, Chinmay Kolhatkar <chin...@datatorrent.com>
wrote:

> Thanks for the information guys.
>
> David, I can take a look at heuristic watermark if I get any free cycles.
>
> Shunxin, does the Join operator that you're implementing support theta joins,
> or only a subset of theta joins?
>
> Thanks,
> Chinmay.
>
>
>
> On Sat, Sep 17, 2016 at 1:21 AM, David Yan <da...@datatorrent.com> wrote:
>
> > Hi Shunxin,
> >
> > If the watermark code in your PR is not behaving the way it should,
> > please do change it. Thanks!
> >
> > David
> >
> > On Fri, Sep 16, 2016 at 11:36 AM, Shunxin Lu <lushun...@gmail.com>
> > wrote:
> >
> > > Hi David,
> > >
> > > Thanks for the clarification. Should we update the watermark for the join
> > > operator when a watermark arrives from one of the input streams, even if
> > > the watermark from the other input stream has not arrived yet?
> > >
> > > Shunxin
> > >
> > > On Fri, Sep 16, 2016 at 10:59 AM, David Yan <da...@datatorrent.com>
> > > wrote:
> > >
> > > > Actually, that's not entirely true. Here are the points about the
> > > > watermark tuple generation of the join operator:
> > > >
> > > > 1) We keep the timestamp of the latest watermark for each input port.
> > > >
> > > > 2) We keep another timestamp that is equal to the minimum of all the
> > > > timestamps mentioned in (1).
> > > >
> > > > 3) Upon arrival of a watermark from an input port, we update the
> > > > timestamp mentioned in (1) and re-evaluate (2). If the value of (2)
> > > > changes, we generate a watermark tuple with a timestamp equal to the
> > > > new value of (2).
> > > >
> > > > 4) That means that, initially, a watermark is only generated once we
> > > > have seen a watermark on all input ports. And because we take the
> > > > smallest timestamp in (2), we consider a window late only if all input
> > > > streams say that particular window is late.
> > > >
> > > > David
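David's four steps can be sketched as a small tracker. This is a minimal
Java illustration assuming integer port indices and millisecond timestamps;
JoinWatermarkTracker and onWatermark are hypothetical names, not the
AbstractWindowedJoinOperator API.

```java
import java.util.Arrays;
import java.util.OptionalLong;

/** Usage: two input ports whose watermarks arrive out of step. */
class JoinWatermarkDemo {
  public static void main(String[] args) {
    JoinWatermarkTracker tracker = new JoinWatermarkTracker(2);
    System.out.println(tracker.onWatermark(0, 100));  // port 1 has not reported yet
    System.out.println(tracker.onWatermark(1, 80));   // both ports seen, min is 80
    System.out.println(tracker.onWatermark(0, 120));  // min is still 80, nothing new
    System.out.println(tracker.onWatermark(1, 130));  // min advances to 120
  }
}

/**
 * Sketch of steps (1)-(4): keep the latest watermark per input port and emit a
 * downstream watermark only when the minimum over all ports changes.
 * Assumes real timestamps are never Long.MIN_VALUE (used as "not seen yet").
 */
class JoinWatermarkTracker {
  private static final long UNSEEN = Long.MIN_VALUE;
  private final long[] latestPerPort;   // (1) latest watermark per input port
  private long combined = UNSEEN;       // (2) minimum over all ports, once known

  JoinWatermarkTracker(int numPorts) {
    latestPerPort = new long[numPorts];
    Arrays.fill(latestPerPort, UNSEEN);
  }

  /** (3) Records a port's watermark; returns the watermark to send downstream, if any. */
  OptionalLong onWatermark(int port, long timestamp) {
    latestPerPort[port] = timestamp;
    long min = Long.MAX_VALUE;
    for (long ts : latestPerPort) {
      if (ts == UNSEEN) {
        return OptionalLong.empty();    // (4) wait until every port has reported
      }
      min = Math.min(min, ts);
    }
    if (min != combined) {              // emit only when the minimum changes
      combined = min;
      return OptionalLong.of(min);
    }
    return OptionalLong.empty();
  }
}
```

In the demo, the first watermark on port 0 produces nothing because port 1
has not reported yet, which is the behavior described in point 4.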
> > > >
> > > >
> > > > On Fri, Sep 16, 2016 at 10:42 AM, Shunxin Lu <lushun...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Chinmay,
> > > > >
> > > > > Based on the discussion I had with David (David, please correct me
> > > > > if I am wrong), the watermark for the Windowed Join Operator should
> > > > > indeed depend on all the input streams. If a tuple is considered late
> > > > > for one input stream, it should also be considered late for the whole
> > > > > join operator. That's why AbstractWindowedJoinOperator always selects
> > > > > the watermark with the smallest timestamp among the latest watermarks
> > > > > coming from upstream as its current watermark, so that it always keeps
> > > > > the strictest watermark to eliminate late tuples.
> > > > >
> > > > > Shunxin
> > > > >
> > > > > On Fri, Sep 16, 2016 at 10:02 AM, David Yan <da...@datatorrent.com>
> > > > > wrote:
> > > > >
> > > > > > In theory, I think the watermark should be sent by the input
> > > > > > operator, because the input has knowledge of the lateness criteria.
> > > > > > Those criteria can depend on many factors, such as the time of day or
> > > > > > the source of the data (e.g. mobile data), that the WindowedOperator
> > > > > > should in general make no assumptions about.
> > > > > >
> > > > > > However, I think it's possible to implement some kind of watermark
> > > > > > generation in the WindowedOperator itself if that knowledge is not
> > > > > > available from the input. It already does that if you call the
> > > > > > setFixedWatermark method: for each streaming window, it sends a
> > > > > > watermark tuple downstream with a timestamp based on the time derived
> > > > > > from the streaming window id. It's possible to add support for
> > > > > > heuristic watermark generation as well, and you're welcome to take
> > > > > > that up.
> > > > > >
> > > > > > For the Windowed Join operator, the watermark generated for
> > > > > > downstream depends on the watermark arriving from each input stream;
> > > > > > it's not simply propagated. Shunxin can comment more on this.
> > > > > >
> > > > > > David
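As one possible heuristic for the input-side watermark generation discussed
above (and for the question of when a stage should emit a watermark), an
input operator could track the largest event time it has seen and, at the
end of each streaming window, emit that value minus an allowed lateness. The
sketch assumes event times in milliseconds and a fixed allowed lateness;
HeuristicWatermarkGenerator, onEvent and endWindow are hypothetical names.
It is not how setFixedWatermark works, since that method derives its
timestamp from the streaming window id rather than from event times.

```java
import java.util.OptionalLong;

/** Usage: out-of-order events within one streaming window. */
class HeuristicWatermarkDemo {
  public static void main(String[] args) {
    HeuristicWatermarkGenerator gen = new HeuristicWatermarkGenerator(5_000);
    gen.onEvent(10_000);
    gen.onEvent(12_000);
    gen.onEvent(11_000);                 // out of order, but within allowed lateness
    System.out.println(gen.endWindow()); // watermark = 12_000 - 5_000
    System.out.println(gen.endWindow()); // no new events, so no new watermark
  }
}

/**
 * HYPOTHETICAL input-side generator: watermark = max event time seen minus a
 * fixed allowed lateness, proposed once per streaming window.
 */
class HeuristicWatermarkGenerator {
  private final long allowedLatenessMillis;
  private long maxEventTime = Long.MIN_VALUE;   // MIN_VALUE means "no events yet"
  private long lastEmitted = Long.MIN_VALUE;

  HeuristicWatermarkGenerator(long allowedLatenessMillis) {
    this.allowedLatenessMillis = allowedLatenessMillis;
  }

  /** Called for every tuple the input operator emits. */
  void onEvent(long eventTimeMillis) {
    maxEventTime = Math.max(maxEventTime, eventTimeMillis);
  }

  /** Called at the end of each streaming window; returns a watermark only if it advances. */
  OptionalLong endWindow() {
    if (maxEventTime == Long.MIN_VALUE) {
      return OptionalLong.empty();               // nothing seen, nothing to promise
    }
    long candidate = maxEventTime - allowedLatenessMillis;
    if (candidate > lastEmitted) {
      lastEmitted = candidate;
      return OptionalLong.of(candidate);
    }
    return OptionalLong.empty();
  }
}
```

The trade-off is the usual one: a larger allowed lateness drops fewer
out-of-order tuples but delays window results downstream.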
> > > > > >
> > > > > >
> > > > > > On Thu, Sep 15, 2016 at 11:21 PM, Chinmay Kolhatkar <chin...@apache.org>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi All,
> > > > > > >
> > > > > > > I was looking at the Windowed Operator APIs, and I have to say
> > > > > > > they're very nicely done.
> > > > > > >
> > > > > > > I have a question related to watermark generation.
> > > > > > >
> > > > > > > My understanding is that, to complete the processing of an event
> > > > > > > window, there is a provision for sending a watermark tuple from some
> > > > > > > upstream stage in the DAG. I want to know who should be doing that
> > > > > > > and when it should be done.
> > > > > > >
> > > > > > > For example, I saw a PR for the Windowed Join Operator in
> > > > > > > apex-malhar, and I would like to use it in my application. Can
> > > > > > > someone give me an example of what a DAG with this operator would
> > > > > > > look like, including a stage that generates watermarks? And how
> > > > > > > should that stage decide when to generate a watermark tuple?
> > > > > > >
> > > > > > > -Chinmay.
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
