Based on the discussion I had with David (David, please correct me if I
am wrong), the watermark for the Windowed Join Operator should indeed
depend on all the input streams. If a tuple is considered late for one
input stream, it should also be considered late for the whole join
operator. That's why AbstractWindowedJoinOperator always selects, as its
current watermark, the smallest timestamp among the latest watermarks
received from each upstream input, so that it always keeps the
strictest watermark for eliminating late tuples.
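A minimal Java sketch of the rule described above, assuming a join with a fixed number of inputs and millisecond event-time watermarks. `JoinWatermarkTracker` and its methods are hypothetical names for illustration, not the actual `AbstractWindowedJoinOperator` code: it remembers the latest watermark from each input and reports the smallest one as the join's watermark. Treating a tuple as late only when its timestamp is strictly below that watermark is also an assumption.

```java
import java.util.Arrays;

/**
 * Hypothetical sketch: tracks the latest watermark seen on each input and
 * exposes the minimum of them as the join operator's current watermark.
 */
class JoinWatermarkTracker {
  // Latest watermark received on each input; Long.MIN_VALUE means "none yet".
  private final long[] latestWatermarks;

  JoinWatermarkTracker(int numInputs) {
    latestWatermarks = new long[numInputs];
    Arrays.fill(latestWatermarks, Long.MIN_VALUE);
  }

  /** Records a watermark from one input; an input's watermark never moves backwards. */
  void onWatermark(int input, long timestamp) {
    latestWatermarks[input] = Math.max(latestWatermarks[input], timestamp);
  }

  /** The smallest of the per-input watermarks. */
  long currentWatermark() {
    long min = Long.MAX_VALUE;
    for (long w : latestWatermarks) {
      min = Math.min(min, w);
    }
    return min;
  }

  /** A tuple is late for the whole join if it is behind the join's watermark. */
  boolean isLate(long eventTime) {
    return eventTime < currentWatermark();
  }
}

class JoinWatermarkDemo {
  public static void main(String[] args) {
    JoinWatermarkTracker tracker = new JoinWatermarkTracker(2);
    tracker.onWatermark(0, 1_000L);
    tracker.onWatermark(1, 700L);
    System.out.println(tracker.currentWatermark()); // 700
    System.out.println(tracker.isLate(800L));       // false
    System.out.println(tracker.isLate(650L));       // true
  }
}
```

Until every input has reported at least one watermark, the minimum stays at `Long.MIN_VALUE`, so nothing is treated as late; a slow input holds back the join's watermark instead of a faster input declaring the slow input's tuples late.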
On Fri, Sep 16, 2016 at 10:02 AM, David Yan <da...@datatorrent.com> wrote:
> I think that, in theory, the watermark should be sent by the input operator,
> because the input has knowledge of the lateness criteria, which can depend on
> many factors, such as the time of day or the source of the data (e.g. mobile
> data), that the WindowedOperator should in general make no assumptions
> about.
> However, I think it's possible to implement some kind of watermark
> generation in the WindowedOperator itself if that knowledge is not
> available from the input. It's actually already doing that if you call
> the setFixedWatermark method, which makes the operator emit a watermark
> tuple downstream for each streaming window, with a timestamp based on the
> time derived from the streaming window id. It would be possible to add
> support for heuristic watermark generation as well, and you're welcome to
> take that up.
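The two strategies mentioned above, a fixed watermark and a heuristic one, can be sketched as follows. This is hypothetical: `WatermarkGenerator` and both classes are illustrative names, the fixed variant assumes the watermark is simply the window-derived time minus a constant lag, and the heuristic variant (one possible approach, not an existing Malhar feature) trails the largest event time seen so far by an allowed lateness. An operator would call `watermarkAt` once per streaming window, for example from its end-of-window hook.

```java
/**
 * Hypothetical sketch of per-streaming-window watermark generation.
 * Names and formulas are assumptions, not the Malhar implementation.
 */
interface WatermarkGenerator {
  /** Observes the event time of an incoming data tuple. */
  void onTuple(long eventTimeMillis);

  /** Returns the watermark to emit at the end of a streaming window. */
  long watermarkAt(long windowTimeMillis);
}

/** Fixed: trails the time derived from the streaming window by a constant lag. */
class FixedWatermarkGenerator implements WatermarkGenerator {
  private final long lagMillis;

  FixedWatermarkGenerator(long lagMillis) {
    this.lagMillis = lagMillis;
  }

  @Override
  public void onTuple(long eventTimeMillis) {
    // Data tuples do not influence a fixed watermark.
  }

  @Override
  public long watermarkAt(long windowTimeMillis) {
    return windowTimeMillis - lagMillis;
  }
}

/** Heuristic: trails the largest event time seen so far by an allowed lateness. */
class MaxEventTimeWatermarkGenerator implements WatermarkGenerator {
  private final long allowedLatenessMillis;
  private long maxEventTime = Long.MIN_VALUE;
  private long lastWatermark = Long.MIN_VALUE;

  MaxEventTimeWatermarkGenerator(long allowedLatenessMillis) {
    this.allowedLatenessMillis = allowedLatenessMillis;
  }

  @Override
  public void onTuple(long eventTimeMillis) {
    maxEventTime = Math.max(maxEventTime, eventTimeMillis);
  }

  @Override
  public long watermarkAt(long windowTimeMillis) {
    // windowTimeMillis is unused here; only observed event times matter.
    if (maxEventTime != Long.MIN_VALUE) {
      // Never let the watermark move backwards.
      lastWatermark = Math.max(lastWatermark, maxEventTime - allowedLatenessMillis);
    }
    return lastWatermark;
  }
}

class WatermarkGeneratorDemo {
  public static void main(String[] args) {
    WatermarkGenerator fixed = new FixedWatermarkGenerator(5_000L);
    System.out.println(fixed.watermarkAt(60_000L)); // 55000

    WatermarkGenerator heuristic = new MaxEventTimeWatermarkGenerator(2_000L);
    for (long t : new long[] {10_000L, 12_000L, 9_000L}) {
      heuristic.onTuple(t);
    }
    System.out.println(heuristic.watermarkAt(60_000L)); // 10000
  }
}
```

The fixed variant needs no knowledge of the data, which suits the case where the input cannot supply lateness information; the heuristic variant adapts to the data but can only guess how late stragglers may be.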
> For the Windowed Join operator, the watermark generated for downstream
> depends on the watermark arriving from each input stream; it's not just
> a simple propagation. Shunxin can comment more on this.
> On Thu, Sep 15, 2016 at 11:21 PM, Chinmay Kolhatkar <chin...@apache.org> wrote:
> > Hi All,
> > I was looking at the Windowed Operator APIs, and I have to say they're
> > nicely done.
> > I have a question related to watermark generation.
> > My understanding is that, to complete processing of an event window,
> > there is a provision for sending a watermark tuple from some earlier stage
> > in the DAG. I want to know who should be doing that and when it should be
> > done.
> > For example, I saw a PR for the Windowed Join Operator in apex-malhar, and I
> > would like to use it in my application. Can someone give me an example of
> > what a DAG would look like with this operator and a stage that generates
> > watermarks? And how should that stage decide when to generate a watermark
> > tuple?
> > -Chinmay.
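To make the question concrete, here is a self-contained Java simulation (no Apex APIs) of a two-stage pipeline: an input stage that knows its own lateness bound emits a watermark at the end of each batch, and a downstream windowed stage emits a tumbling window's count only after the watermark passes that window's end. All class names, the 1-second window, and the 500 ms lateness bound are assumptions for illustration; in a real application both stages would be operators connected in the DAG.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical downstream stage: counts tuples in tumbling event-time windows
 * and emits a window's result only once a watermark has passed its end.
 */
class TumblingCountStage {
  private final long windowMillis;
  // Open windows keyed by start time; TreeMap keeps them in start order.
  private final TreeMap<Long, Long> openCounts = new TreeMap<>();
  private final List<String> emitted = new ArrayList<>();
  private long watermark = Long.MIN_VALUE;
  private int lateDropped = 0;

  TumblingCountStage(long windowMillis) {
    this.windowMillis = windowMillis;
  }

  void onData(long eventTime) {
    if (eventTime < watermark) {
      lateDropped++; // behind the watermark: its window may already be emitted
      return;
    }
    long start = eventTime - Math.floorMod(eventTime, windowMillis);
    openCounts.merge(start, 1L, Long::sum);
  }

  void onWatermark(long timestamp) {
    watermark = Math.max(watermark, timestamp);
    // Window [start, start + windowMillis) is complete once the watermark reaches its end.
    while (!openCounts.isEmpty() && openCounts.firstKey() + windowMillis <= watermark) {
      Map.Entry<Long, Long> done = openCounts.pollFirstEntry();
      emitted.add("[" + done.getKey() + "," + (done.getKey() + windowMillis)
          + ") count=" + done.getValue());
    }
  }

  List<String> emitted() { return emitted; }

  int lateDropped() { return lateDropped; }
}

/** Hypothetical upstream input stage that knows its own lateness bound. */
class PipelineDemo {
  public static void main(String[] args) {
    TumblingCountStage windows = new TumblingCountStage(1_000L);
    long allowedLateness = 500L;
    long maxSeen = Long.MIN_VALUE;
    // Each inner array stands in for the tuples of one streaming window.
    long[][] batches = { {100, 900, 1200}, {1500, 700, 2600}, {300, 3100} };
    for (long[] batch : batches) {
      for (long t : batch) {
        maxSeen = Math.max(maxSeen, t);
        windows.onData(t);
      }
      // At the end of each batch the input stage emits a watermark downstream.
      windows.onWatermark(maxSeen - allowedLateness);
    }
    windows.emitted().forEach(System.out::println);
    System.out.println("late dropped: " + windows.lateDropped());
  }
}
```

Running `PipelineDemo` prints `[0,1000) count=3`, `[1000,2000) count=2` and `late dropped: 1`: the tuple with time 300 arrives after the watermark has reached 2100 and is dropped, while the windows starting at 2000 and 3000 stay open until a later watermark closes them. In this sketch the input stage answers "who" (the stage that knows the lateness bound) and "when" (once per batch, i.e. per streaming window).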