Actually, on second thought, the join accumulation interface for the non-keyed join operator can support theta joins, as long as you provide a suitable accumulation implementation for that.
David

On Mon, Sep 26, 2016 at 12:42 PM, David Yan <da...@datatorrent.com> wrote:
> Chinmay,
>
> Just to clarify, the Join Operator does not support theta joins. It only supports equi-joins on either the Window, or on both the Window and the Key.
>
> David
>
> On Sat, Sep 17, 2016 at 1:30 AM, Chinmay Kolhatkar <chin...@datatorrent.com> wrote:
>> Thanks for the information, guys.
>>
>> David, I can take a look at heuristic watermarks if I get any free cycles.
>>
>> Shunxin, does the Join operator that you're implementing support theta joins, or only a subset of theta joins?
>>
>> Thanks,
>> Chinmay.
>>
>> On Sat, Sep 17, 2016 at 1:21 AM, David Yan <da...@datatorrent.com> wrote:
>>> Hi Shunxin,
>>>
>>> If the watermark code in your PR is not behaving the way it should, please do change it. Thanks!
>>>
>>> David
>>>
>>> On Fri, Sep 16, 2016 at 11:36 AM, Shunxin Lu <lushun...@gmail.com> wrote:
>>>> Hi David,
>>>>
>>>> Thanks for the clarification. Should we update the join operator's watermark when a watermark arrives from one of the input streams, even if no watermark has arrived yet from the other input stream?
>>>>
>>>> Shunxin
>>>>
>>>> On Fri, Sep 16, 2016 at 10:59 AM, David Yan <da...@datatorrent.com> wrote:
>>>>> Actually, that's not entirely true. Here are the points about watermark tuple generation in the join operator:
>>>>>
>>>>> 1) We keep the timestamp of the latest watermark for each input port.
>>>>>
>>>>> 2) We keep another timestamp that is equal to the minimum of all the timestamps mentioned in (1).
>>>>>
>>>>> 3) Upon arrival of a watermark from an input port, we update the timestamp mentioned in (1) and re-evaluate (2). If the value of (2) changes, we generate a watermark tuple with a timestamp equal to the new value of (2).
>>>>>
>>>>> 4) That means that, initially, a watermark is only generated once we have seen a watermark on every input port. And because we take the smallest timestamp in (2), we consider a window late only if all input streams say that particular window is late.
>>>>>
>>>>> David
>>>>>
>>>>> On Fri, Sep 16, 2016 at 10:42 AM, Shunxin Lu <lushun...@gmail.com> wrote:
>>>>>> Hi Chinmay,
>>>>>>
>>>>>> Based on the discussion I had with David (David, please correct me if I am wrong), the watermark for the Windowed Join Operator should indeed depend on all the input streams. If a tuple is considered late for one input stream, it should also be considered late for the whole join operator. That's why AbstractWindowedJoinOperator always selects the watermark with the smallest timestamp among the latest watermarks coming from upstream as its current watermark, so that it always keeps the strictest watermark to eliminate late tuples.
>>>>>>
>>>>>> Shunxin
>>>>>>
>>>>>> On Fri, Sep 16, 2016 at 10:02 AM, David Yan <da...@datatorrent.com> wrote:
>>>>>>> I think that, in theory, the watermark should be sent by the input operator, since the input should know the criteria for lateness. Those criteria can depend on many factors, like the time of day or the source of the data (e.g. mobile data), that the WindowedOperator should in general make no assumptions about.
>>>>>>>
>>>>>>> However, I think it's possible to implement some kind of watermark generation in the WindowedOperator itself if that knowledge is not available from the input. It actually already does that if you call the setFixedWatermark method: for each streaming window, it generates a watermark tuple downstream, with a timestamp based on the time derived from the streaming window ID. It's possible to add support for heuristic watermark generation as well, and you're welcome to take that up.
>>>>>>>
>>>>>>> For the Windowed Join operator, the watermark generated for downstream depends on the watermark arriving from each input stream; it's not just a simple propagation. Shunxin can comment more on this.
>>>>>>>
>>>>>>> David
>>>>>>>
>>>>>>> On Thu, Sep 15, 2016 at 11:21 PM, Chinmay Kolhatkar <chin...@apache.org> wrote:
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> I was looking at the Windowed Operator APIs and have to say they're pretty nicely done.
>>>>>>>>
>>>>>>>> I have a question related to watermark generation.
>>>>>>>>
>>>>>>>> What I understood is that, to complete the processing of an event window, there is a provision for sending a watermark tuple from some earlier stage in the DAG. I want to know who should be doing that and when it should be done.
>>>>>>>>
>>>>>>>> For example, I saw a PR for the Windowed Join Operator in apex-malhar and I would like to use it in my application. Can someone give me an example of what a DAG would look like with this operator and a stage that generates watermarks? And how should that stage decide when to generate a watermark tuple?
>>>>>>>>
>>>>>>>> -Chinmay.
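Chinmay's question of how an upstream stage should decide which watermark to emit, together with David's mention of fixed and heuristic watermark generation, can be illustrated with a minimal sketch. The `WatermarkPolicy` interface and both classes below are hypothetical and are not Apex Malhar APIs. `FixedDelayWatermark` only mimics the idea behind `setFixedWatermark` (a watermark trailing the processing time by a constant), and `BoundedOutOfOrdernessWatermark` is one common heuristic, assumed here, in which the watermark trails the largest event time seen so far by a fixed bound.

```java
/**
 * Hypothetical strategy an upstream (input) stage could consult to decide
 * which watermark timestamp to emit. Not an Apex Malhar API.
 */
interface WatermarkPolicy {
  /** Called for every data tuple with its event time in milliseconds. */
  void observe(long eventTimeMillis);

  /** Called once per streaming window; returns the watermark to emit, or Long.MIN_VALUE for none. */
  long watermarkAt(long processingTimeMillis);
}

/** Watermark trails processing time by a constant delay (the idea behind setFixedWatermark, not its code). */
class FixedDelayWatermark implements WatermarkPolicy {
  private final long delayMillis;

  FixedDelayWatermark(long delayMillis) {
    this.delayMillis = delayMillis;
  }

  @Override
  public void observe(long eventTimeMillis) {
    // Ignores the data: lateness depends only on processing time.
  }

  @Override
  public long watermarkAt(long processingTimeMillis) {
    return processingTimeMillis - delayMillis;
  }
}

/** Assumed heuristic: watermark trails the largest event time seen so far by a fixed bound. */
class BoundedOutOfOrdernessWatermark implements WatermarkPolicy {
  private final long maxLatenessMillis;
  private long maxEventTimeMillis = Long.MIN_VALUE;  // no tuple seen yet

  BoundedOutOfOrdernessWatermark(long maxLatenessMillis) {
    this.maxLatenessMillis = maxLatenessMillis;
  }

  @Override
  public void observe(long eventTimeMillis) {
    maxEventTimeMillis = Math.max(maxEventTimeMillis, eventTimeMillis);
  }

  @Override
  public long watermarkAt(long processingTimeMillis) {
    if (maxEventTimeMillis == Long.MIN_VALUE) {
      return Long.MIN_VALUE;  // nothing observed yet, so no watermark
    }
    // Never decreases, because maxEventTimeMillis only grows.
    return maxEventTimeMillis - maxLatenessMillis;
  }
}
```

An input operator could call `observe` for each tuple it emits and send a watermark tuple carrying `watermarkAt(...)` once per streaming window, for example from its `endWindow()` callback. The fixed policy ignores the data entirely, so its lateness decisions depend only on processing time; the heuristic adapts to the data but classifies as late any tuple that arrives more than `maxLatenessMillis` behind the newest event time seen.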
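The four rules David lists for the join operator's outgoing watermark (latest watermark per port, minimum across ports, emit only when that minimum changes, nothing until every port has reported) can be sketched as a small self-contained class. `JoinWatermarkTracker` and `onWatermark` are hypothetical names for illustration; this is not the code of `AbstractWindowedJoinOperator`, and it assumes input ports are numbered from 0.

```java
import java.util.OptionalLong;

/**
 * Hypothetical helper (not Apex Malhar code) applying the join operator's
 * watermark rules: keep the latest watermark per input port, track their
 * minimum, and emit downstream only when that minimum changes.
 */
class JoinWatermarkTracker {
  private final Long[] latestPerPort;  // rule 1: null until a port reports
  private Long currentMin;             // rule 2: null until every port reports

  JoinWatermarkTracker(int numPorts) {
    latestPerPort = new Long[numPorts];
  }

  /** Returns the watermark to send downstream, or empty if none should be sent. */
  OptionalLong onWatermark(int port, long timestamp) {
    latestPerPort[port] = timestamp;  // rule 3: update this port's timestamp ...
    long min = Long.MAX_VALUE;
    for (Long ts : latestPerPort) {
      if (ts == null) {
        return OptionalLong.empty();  // rule 4: some port has not reported yet
      }
      min = Math.min(min, ts);
    }
    if (currentMin != null && currentMin == min) {
      return OptionalLong.empty();  // ... and emit only if the minimum changed
    }
    currentMin = min;
    return OptionalLong.of(min);
  }
}
```

With two ports, a watermark of 100 on port 0 alone produces nothing; a following 80 on port 1 emits 80; raising port 0 to 120 emits nothing because the minimum is still 80; raising port 1 to 110 then emits 110.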
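David's point that the non-keyed join's accumulation interface could support theta joins, given a suitable accumulation, can be illustrated with a nested-loop sketch. `ThetaJoinAccumulation`, `JoinState`, and their methods are hypothetical: they only loosely mirror the accumulate/merge/output pattern of a windowed accumulation and are not the actual Malhar interface. The state for one window keeps every tuple from both inputs, and the output is every pair that satisfies an arbitrary predicate `theta`.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

/** Accumulated state for one window: every tuple seen on each side of the join. */
class JoinState<L, R> {
  final List<L> left = new ArrayList<>();
  final List<R> right = new ArrayList<>();
}

/** Hypothetical non-keyed join accumulation that supports an arbitrary join condition. */
class ThetaJoinAccumulation<L, R> {
  private final BiPredicate<L, R> theta;  // join condition, e.g. (l, r) -> l < r

  ThetaJoinAccumulation(BiPredicate<L, R> theta) {
    this.theta = theta;
  }

  JoinState<L, R> defaultAccumulatedValue() {
    return new JoinState<>();
  }

  JoinState<L, R> accumulateLeft(JoinState<L, R> acc, L tuple) {
    acc.left.add(tuple);
    return acc;
  }

  JoinState<L, R> accumulateRight(JoinState<L, R> acc, R tuple) {
    acc.right.add(tuple);
    return acc;
  }

  /** Combines two partial states, e.g. when windows are merged; mutates and returns a. */
  JoinState<L, R> merge(JoinState<L, R> a, JoinState<L, R> b) {
    a.left.addAll(b.left);
    a.right.addAll(b.right);
    return a;
  }

  /** Nested-loop theta join: every (left, right) pair for which theta holds. */
  List<Map.Entry<L, R>> getOutput(JoinState<L, R> acc) {
    List<Map.Entry<L, R>> out = new ArrayList<>();
    for (L l : acc.left) {
      for (R r : acc.right) {
        if (theta.test(l, r)) {
          out.add(new AbstractMap.SimpleImmutableEntry<>(l, r));
        }
      }
    }
    return out;
  }
}
```

With `theta` set to equality, this reduces to the equi-join David says the current operator supports. The nested loop costs time proportional to the product of the two sides' sizes per window, which is the usual price of a general theta join.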