+1 for the plan.

I would be interested in contributing to this feature.

~ Bhupesh

On Nov 29, 2016 03:26, "Sandesh Hegde" <sand...@datatorrent.com> wrote:

> I am interested in contributing to this feature.
>
> On Mon, Nov 28, 2016 at 1:54 PM David Yan <da...@datatorrent.com> wrote:
>
> > I think we should probably go ahead with option 1, since it works for
> > most use cases and prevents developers from shooting themselves in the
> > foot in terms of idempotency.
> >
> > We can add a configuration property that enables option 2 later if we
> > have concrete use cases that call for it.
> >
> > Please share your thoughts if you don't agree with this plan. Also,
> > please indicate if you're interested in contributing to this feature.
> >
> > David
> >
> > On Sun, Nov 27, 2016 at 9:02 PM, Bhupesh Chawda <bhup...@datatorrent.com>
> > wrote:
> >
> > > It appears that option 1 is favored, because no use case that needs
> > > option 2 has come up.
> > >
> > > However, option 2 is problematic in specific cases, such as the presence
> > > of multiple input ports. In a linear DAG, where control tuples flow in
> > > order with the data tuples, it should not be difficult to guarantee
> > > idempotency. For example, if an operator's behavior can change several
> > > times during a single window, it should not have to wait until the end
> > > of the window for those changes to take effect. Since we don't have a
> > > concrete use case right now, perhaps we do not want to go down that road
> > > yet. This feature should be made available through a platform attribute
> > > (maybe at a later point in time) whose default is option 1.
> > >
> > > I think option 1 is a suitable starting point for implementing this
> > > feature, and we should proceed with it.
> > >
> > > ~ Bhupesh
> > >
> > >
> > >
> > > On Fri, Nov 11, 2016 at 12:59 AM, David Yan <da...@datatorrent.com>
> > > wrote:
> > >
> > > > Good question, Tushar. The callback should be called only once.
> > > > The way to implement this is to keep a list of control tuple hashes for
> > > > the given streaming window and invoke the callback only when the
> > > > operator has not seen that tuple before.
> > > >
> > > > Other thoughts?
> > > >
> > > > David
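A minimal sketch of the per-window deduplication described above, in Java (the language Apex is written in). The class and method names are hypothetical and not part of the Apex API. Instead of a list of raw hash codes, which could collide, it keeps the current window's control tuples in a HashSet, which uses hashCode() for lookup and equals() to confirm a match; this assumes control tuple types implement both, and that copies arriving over different paths compare equal. The main method replays the A(1) X B(4) X C(2) case from the point of view of one C partition.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical helper, not part of the Apex API: fires a callback at most once per
// distinct control tuple within a streaming window.
class ControlTupleDeduplicator<T> {
  private final Set<T> seenInWindow = new HashSet<>();
  private final Consumer<T> callback;

  ControlTupleDeduplicator(Consumer<T> callback) {
    this.callback = Objects.requireNonNull(callback);
  }

  // Called at the start of each streaming window: forget tuples seen in earlier windows.
  void beginWindow(long windowId) {
    seenInWindow.clear();
  }

  // Returns true if the callback fired, false if an equal tuple was already seen.
  boolean onControlTuple(T tuple) {
    // Set.add returns false when an equal element is already present.
    if (seenInWindow.add(tuple)) {
      callback.accept(tuple);
      return true;
    }
    return false;
  }

  public static void main(String[] args) {
    int[] calls = {0};
    ControlTupleDeduplicator<String> dedup = new ControlTupleDeduplicator<>(t -> calls[0]++);
    dedup.beginWindow(1);
    // A(1) X B(4) X C(2): one C partition receives A's tuple once per B partition.
    for (int bPartition = 0; bPartition < 4; bPartition++) {
      dedup.onControlTuple("control-from-A");
    }
    System.out.println(calls[0]); // prints 1
  }
}
```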
> > > >
> > > > On Thu, Nov 10, 2016 at 9:32 AM, Tushar Gosavi <tus...@datatorrent.com>
> > > > wrote:
> > > >
> > > > > Hi David,
> > > > >
> > > > > What would the behavior be for a DAG with the following operators,
> > > > > where the number in brackets is the number of partitions and X is
> > > > > NxM partitioning?
> > > > > A(1) X B(4) X C(2)
> > > > >
> > > > > If A sends a control tuple, it will be sent to all 4 partitions of B,
> > > > > and from each partition of B it goes to C, i.e. each partition of C
> > > > > will receive the same control tuple originating from A multiple times
> > > > > (once per upstream partition of C). In this case, will the callback
> > > > > function get called multiple times or just once?
> > > > >
> > > > > -Tushar.
> > > > >
> > > > >
> > > > > On Fri, Nov 4, 2016 at 12:14 AM, David Yan <da...@datatorrent.com>
> > > > > wrote:
> > > > > > Hi Bhupesh,
> > > > > >
> > > > > > Since each input port has its own incoming control tuples, I would
> > > > > > imagine there would be an additional DefaultInputPort.processControl
> > > > > > method that operator developers can override.
> > > > > > If we go for option 1, my thinking is that the control tuples would
> > > > > > always be delivered at the next window boundary, even if the emit
> > > > > > method is called within a window.
> > > > > >
> > > > > > David
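A sketch of what a per-port control callback could look like under option 1, with option 2 kept behind a switch as suggested earlier in the thread. Everything here is hypothetical: ControlAwareInputPort, processControl and DeliveryMode are illustrative names rather than the DefaultInputPort API, and onEndWindow() stands in for whatever the engine would actually do at the window boundary.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch, not the Apex DefaultInputPort API.
class ControlAwareInputPort<D, C> {
  enum DeliveryMode { WINDOW_BOUNDARY, IMMEDIATE } // option 1, option 2

  private final DeliveryMode mode;
  private final Consumer<D> process;        // data tuple callback
  private final Consumer<C> processControl; // control tuple callback
  private final List<C> pendingControl = new ArrayList<>();

  ControlAwareInputPort(DeliveryMode mode, Consumer<D> process, Consumer<C> processControl) {
    this.mode = mode;
    this.process = process;
    this.processControl = processControl;
  }

  void onData(D tuple) {
    process.accept(tuple);
  }

  void onControl(C tuple) {
    if (mode == DeliveryMode.IMMEDIATE) {
      processControl.accept(tuple); // option 2: deliver in arrival order
    } else {
      pendingControl.add(tuple);    // option 1: hold until the window boundary
    }
  }

  // Stand-in for the engine reaching the end of the current streaming window.
  void onEndWindow() {
    for (C tuple : pendingControl) {
      processControl.accept(tuple);
    }
    pendingControl.clear();
  }

  static List<String> run(DeliveryMode mode) {
    List<String> log = new ArrayList<>();
    ControlAwareInputPort<String, String> port =
        new ControlAwareInputPort<>(mode, d -> log.add("data:" + d), c -> log.add("control:" + c));
    port.onData("a");
    port.onControl("eof");
    port.onData("b");
    port.onEndWindow();
    return log;
  }

  public static void main(String[] args) {
    System.out.println(run(DeliveryMode.WINDOW_BOUNDARY)); // [data:a, data:b, control:eof]
    System.out.println(run(DeliveryMode.IMMEDIATE));       // [data:a, control:eof, data:b]
  }
}
```

With WINDOW_BOUNDARY as the default, a data tuple that arrives after a control tuple in the same window is still processed before it, which is the loss of ordering between regular and control tuples raised in the next message.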
> > > > > >
> > > > > > On Thu, Nov 3, 2016 at 1:46 AM, Bhupesh Chawda <bhup...@datatorrent.com>
> > > > > > wrote:
> > > > > >
> > > > > >> I have a question regarding the callback for a control tuple. Will it
> > > > > >> be similar to the InputPort::process() method, something like
> > > > > >> InputPort::processControlTuple(t)? Or will it be a method of the
> > > > > >> operator, similar to beginWindow()?
> > > > > >>
> > > > > >> When we say that the control tuple will be delivered at the window
> > > > > >> boundary, does that mean all control tuples emitted in that window
> > > > > >> will be processed together at the end of the window? This would imply
> > > > > >> that there is no ordering between regular tuples and control tuples.
> > > > > >>
> > > > > >> I think we should get started with option 1 (control tuples at window
> > > > > >> boundaries), which seems to handle most of the use cases. For the
> > > > > >> cases that require option 2, we can always build on it.
> > > > > >>
> > > > > >> ~ Bhupesh
> > > > > >>
> > > > > >> On Thu, Nov 3, 2016 at 1:35 PM, Thomas Weise <t...@apache.org>
> > > > > >> wrote:
> > > > > >>
> > > > > >> > I don't see how that would work. Suppose you have a file splitter
> > > > > >> > and multiple partitions of block readers. The "end of file" event
> > > > > >> > cannot be processed downstream until all block readers are done. I
> > > > > >> > also think that this is related to the batch demarcation
> > > > > >> > discussion, and there should be a single generalized mechanism to
> > > > > >> > support this.
> > > > > >> >
> > > > > >> >
> > > > > >> > On Wed, Nov 2, 2016 at 10:51 PM, Pramod Immaneni <pra...@datatorrent.com>
> > > > > >> > wrote:
> > > > > >> >
> > > > > >> > > Suppose I am processing data in a file and I want to do something
> > > > > >> > > at the end of the file at the output operator: I would send an
> > > > > >> > > end-of-file control tuple and act on it when I receive it at the
> > > > > >> > > output. In a single window I may end up processing multiple
> > > > > >> > > files. If I don't have multiple ports and logical paths through
> > > > > >> > > the DAG (multiple partitions are OK), I can process the end of
> > > > > >> > > each file immediately and also know which file was closed,
> > > > > >> > > without sending the extra identifying information in the
> > > > > >> > > end-of-file tuple that I would need if I were collecting all of
> > > > > >> > > them and processing them at the end of the window.
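A sketch of the scenario above, assuming in-order, immediate delivery (option 2) on a single logical path. EofAwareWriter is a hypothetical operator: data tuples carry only a line of text and the end-of-file control tuple carries no file name, yet the operator can tell which lines each EOF closes, because everything received before the EOF belongs to the current file.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical output operator for the end-of-file scenario.
class EofAwareWriter {
  private final List<String> currentFileLines = new ArrayList<>();
  private final List<Integer> closedFileLineCounts = new ArrayList<>();

  // Data tuple: one line of the file currently being written.
  void processData(String line) {
    currentFileLines.add(line);
  }

  // End-of-file control tuple, delivered in order right after the file's last line.
  // No file name is needed: everything buffered so far belongs to the file being closed.
  void processEndOfFile() {
    closedFileLineCounts.add(currentFileLines.size());
    currentFileLines.clear();
  }

  List<Integer> closedFileLineCounts() {
    return closedFileLineCounts;
  }

  public static void main(String[] args) {
    EofAwareWriter writer = new EofAwareWriter();
    // Two files processed within the same streaming window.
    writer.processData("a1");
    writer.processData("a2");
    writer.processEndOfFile();
    writer.processData("b1");
    writer.processEndOfFile();
    System.out.println(writer.closedFileLineCounts()); // [2, 1]
  }
}
```

Under option 1, both end-of-file tuples would only arrive at the window boundary, after all three lines, so each would need to carry a file identifier for the operator to attribute them correctly.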
> > > > > >> > >
> > > > > >> > > On Wed, Nov 2, 2016 at 2:45 PM, Thomas Weise <t...@apache.org>
> > > > > >> > > wrote:
> > > > > >> > >
> > > > > >> > > > The use cases listed in the original discussion don't call for
> > > > > >> > > > option 2. It seems to come with additional complexity and
> > > > > >> > > > implementation cost.
> > > > > >> > > >
> > > > > >> > > > Can those in favor of option 2 please also provide the use case
> > > > > >> > > > for it?
> > > > > >> > > >
> > > > > >> > > > Thanks,
> > > > > >> > > > Thomas
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > > > On Wed, Nov 2, 2016 at 10:36 PM, Siyuan Hua <siy...@datatorrent.com>
> > > > > >> > > > wrote:
> > > > > >> > > >
> > > > > >> > > > > I will vote for approach 1.
> > > > > >> > > > >
> > > > > >> > > > > First of all, that one sounds easier to do. And I think
> > > > > >> > > > > idempotency is important. It may come at the cost of higher
> > > > > >> > > > > latency, but I think that is OK.
> > > > > >> > > > >
> > > > > >> > > > > In addition, if users do need real-time control tuple
> > > > > >> > > > > processing in the future, we can always add the option on
> > > > > >> > > > > top of it.
> > > > > >> > > > >
> > > > > >> > > > > So I vote for 1.
> > > > > >> > > > >
> > > > > >> > > > > Thanks,
> > > > > >> > > > > Siyuan
> > > > > >> > > > >
> > > > > >> > > > > On Wed, Nov 2, 2016 at 1:28 PM, Pradeep A. Dalvi <p...@apache.org>
> > > > > >> > > > > wrote:
> > > > > >> > > > >
> > > > > >> > > > > > As a rule of thumb in any real-time operating system, control
> > > > > >> > > > > > tuples should always be handled using priority queues.
> > > > > >> > > > > >
> > > > > >> > > > > > We may try to control priorities by defining levels. And
> > > > > >> > > > > > control tuples shall not be delivered at window boundaries.
> > > > > >> > > > > >
> > > > > >> > > > > > In short, control tuples shall never be treated like any other
> > > > > >> > > > > > tuples in real-time systems.
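A sketch of the priority-queue idea using java.util.PriorityQueue. The priority levels and class names are hypothetical. A sequence number breaks ties so that tuples of the same level keep FIFO order, because PriorityQueue gives no ordering guarantee among equal elements. Letting a control tuple overtake data tuples already queued is exactly the reordering the other replies treat as a risk to idempotency, so this only illustrates the suggestion.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

class PriorityTupleQueue {
  static final int CONTROL = 0; // hypothetical levels: lower value is more urgent
  static final int DATA = 1;

  private static final class Pending {
    final int level;
    final long seq;      // arrival order, used to break ties within a level
    final String payload;

    Pending(int level, long seq, String payload) {
      this.level = level;
      this.seq = seq;
      this.payload = payload;
    }
  }

  private final PriorityQueue<Pending> queue = new PriorityQueue<>(
      Comparator.<Pending>comparingInt(p -> p.level).thenComparingLong(p -> p.seq));
  private long nextSeq = 0;

  void offer(int level, String payload) {
    queue.add(new Pending(level, nextSeq++, payload));
  }

  // Removes all pending tuples in priority order (then FIFO within a level).
  List<String> drain() {
    List<String> out = new ArrayList<>();
    while (!queue.isEmpty()) {
      out.add(queue.poll().payload);
    }
    return out;
  }

  public static void main(String[] args) {
    PriorityTupleQueue q = new PriorityTupleQueue();
    q.offer(DATA, "d1");
    q.offer(DATA, "d2");
    q.offer(CONTROL, "pause");
    q.offer(DATA, "d3");
    System.out.println(q.drain()); // [pause, d1, d2, d3]
  }
}
```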
> > > > > >> > > > > >
> > > > > >> > > > > > On Thursday, November 3, 2016, David Yan <da...@datatorrent.com>
> > > > > >> > > > > > wrote:
> > > > > >> > > > > >
> > > > > >> > > > > > > Hi all,
> > > > > >> > > > > > >
> > > > > >> > > > > > > I would like to renew the discussion of control tuples.
> > > > > >> > > > > > >
> > > > > >> > > > > > > Last time, we were in a debate about whether:
> > > > > >> > > > > > >
> > > > > >> > > > > > > 1) the platform should enforce that control tuples are
> > > > > >> > > > > > > delivered at window boundaries only
> > > > > >> > > > > > >
> > > > > >> > > > > > > or:
> > > > > >> > > > > > >
> > > > > >> > > > > > > 2) the platform should deliver control tuples just like other
> > > > > >> > > > > > > tuples, and it's the operator developers' choice whether to
> > > > > >> > > > > > > handle the control tuples as they arrive or delay the
> > > > > >> > > > > > > processing until the next window boundary.
> > > > > >> > > > > > >
> > > > > >> > > > > > > To summarize the pros and cons:
> > > > > >> > > > > > >
> > > > > >> > > > > > > Approach 1: If processing control tuples changes the behavior
> > > > > >> > > > > > > of the operator and idempotency needs to be preserved, the
> > > > > >> > > > > > > processing must be done at window boundaries. This approach
> > > > > >> > > > > > > will save operator developers the headache of ensuring that.
> > > > > >> > > > > > > However, it will take the choice away from operator
> > > > > >> > > > > > > developers who just need to process the control tuples as
> > > > > >> > > > > > > soon as possible.
> > > > > >> > > > > > >
> > > > > >> > > > > > > Approach 2: The operator has a chance to process control
> > > > > >> > > > > > > tuples immediately. This would be useful if latency is valued
> > > > > >> > > > > > > more than correctness. However, this would open the
> > > > > >> > > > > > > possibility for operator developers to shoot themselves in
> > > > > >> > > > > > > the foot. This is especially true if there are multiple input
> > > > > >> > > > > > > ports, as there is no easy way to guarantee processing order
> > > > > >> > > > > > > across multiple input ports.
> > > > > >> > > > > > >
> > > > > >> > > > > > > We would like to arrive at a consensus and close this
> > > > > >> > > > > > > discussion soon this time, so we can start work on this
> > > > > >> > > > > > > important feature.
> > > > > >> > > > > > >
> > > > > >> > > > > > > Thanks!
> > > > > >> > > > > > >
> > > > > >> > > > > > > David
> > > > > >> > > > > > >
> > > > > >> > > > > > > On Tue, Jun 28, 2016 at 10:04 AM, Vlad Rozov <v.ro...@datatorrent.com>
> > > > > >> > > > > > > wrote:
> > > > > >> > > > > > >
> > > > > >> > > > > > > > It is not clear how an operator will emit a custom control
> > > > > >> > > > > > > > tuple at window boundaries. One way is to cache/accumulate
> > > > > >> > > > > > > > control tuples in the operator output port until the window
> > > > > >> > > > > > > > closes (END_WINDOW is inserted into the output sink), or
> > > > > >> > > > > > > > only allow an operator to emit control tuples inside
> > > > > >> > > > > > > > endWindow(). The latter is a slight variation of the output
> > > > > >> > > > > > > > port caching behavior, with the only difference that now the
> > > > > >> > > > > > > > operator itself is responsible for caching/accumulating
> > > > > >> > > > > > > > control tuples. Note that in many cases it will be necessary
> > > > > >> > > > > > > > to postpone emitting payload tuples that logically come
> > > > > >> > > > > > > > after the custom control tuple until the next window begins.
> > > > > >> > > > > > > >
> > > > > >> > > > > > > > IMO, that is too restrictive. In a case where an input
> > > > > >> > > > > > > > operator uses push instead of poll (for example, it provides
> > > > > >> > > > > > > > an endpoint where remote agents may connect and publish/push
> > > > > >> > > > > > > > data), control tuples may be used for
> > > > > >> > > > > > > > connect/disconnect/watermark broadcast to (partitioned)
> > > > > >> > > > > > > > downstream operators. In this case the platform just needs
> > > > > >> > > > > > > > to guarantee an order barrier (any tuple emitted prior to a
> > > > > >> > > > > > > > control tuple needs to be delivered prior to the control
> > > > > >> > > > > > > > tuple).
> > > > > >> > > > > > > >
> > > > > >> > > > > > > > Thank you,
> > > > > >> > > > > > > >
> > > > > >> > > > > > > > Vlad
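A sketch of the order barrier described above, combined with the broadcast behavior discussed elsewhere in the thread: data tuples go to a single downstream partition, control tuples go to all of them. BroadcastingOutputPort is hypothetical and routes by String.hashCode() purely for illustration. Because each partition queue is FIFO and the control tuple is appended to every queue at the moment it is emitted, every tuple emitted before it is delivered before it on every partition.

```java
import java.util.ArrayList;
import java.util.List;

class BroadcastingOutputPort {
  // One FIFO queue per downstream partition.
  private final List<List<String>> partitionQueues = new ArrayList<>();

  BroadcastingOutputPort(int partitions) {
    for (int i = 0; i < partitions; i++) {
      partitionQueues.add(new ArrayList<>());
    }
  }

  // Data tuples are partitioned: each goes to exactly one downstream partition.
  void emitData(String tuple) {
    int partition = Math.floorMod(tuple.hashCode(), partitionQueues.size());
    partitionQueues.get(partition).add(tuple);
  }

  // Control tuples are broadcast, appended behind whatever each queue already holds.
  void emitControl(String controlTuple) {
    for (List<String> queue : partitionQueues) {
      queue.add(controlTuple);
    }
  }

  List<String> queue(int partition) {
    return partitionQueues.get(partition);
  }

  public static void main(String[] args) {
    BroadcastingOutputPort port = new BroadcastingOutputPort(2);
    port.emitData("x");
    port.emitData("y");
    port.emitControl("WATERMARK");
    port.emitData("z");
    System.out.println("0: " + port.queue(0)); // 0: [x, WATERMARK, z]
    System.out.println("1: " + port.queue(1)); // 1: [y, WATERMARK]
  }
}
```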
> > > > > >> > > > > > > >
> > > > > >> > > > > > > >
> > > > > >> > > > > > > >
> > > > > >> > > > > > > > On 6/27/16 19:36, Amol Kekre wrote:
> > > > > >> > > > > > > >
> > > > > >> > > > > > > >> I agree with David. Allowing control tuples within a window
> > > > > >> > > > > > > >> (along with data tuples) creates a very dangerous situation
> > > > > >> > > > > > > >> where guarantees are impacted. It is much safer to enable
> > > > > >> > > > > > > >> control tuples (send/receive) at window boundaries (after
> > > > > >> > > > > > > >> END_WINDOW of window N, and before BEGIN_WINDOW for window
> > > > > >> > > > > > > >> N+1). My take on David's list is:
> > > > > >> > > > > > > >>
> > > > > >> > > > > > > >> 1. -> window boundaries -> Strong +1; there will be a big
> > > > > >> > > > > > > >> issue with guarantees for operators with multiple ports (see
> > > > > >> > > > > > > >> Thomas's response).
> > > > > >> > > > > > > >> 2. -> All downstream windows -> +1, but there are
> > > > > >> > > > > > > >> situations; a caveat could be "only to operators that
> > > > > >> > > > > > > >> implement control tuple interface/listeners", which could
> > > > > >> > > > > > > >> effectively translate to "all interested downstream
> > > > > >> > > > > > > >> operators".
> > > > > >> > > > > > > >> 3. Only input operators can create control tuples -> -1; it
> > > > > >> > > > > > > >> is restrictive, even though most likely 95% of the time it
> > > > > >> > > > > > > >> will be input operators.
> > > > > >> operators
> > > > > >> > > > > > > >>
> > > > > >> > > > > > > >> Thks,
> > > > > >> > > > > > > >> Amol
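A sketch of the output-port caching approach mentioned above, producing the placement proposed here: control tuples emitted during window N are held back and written right after END_WINDOW(N), before BEGIN_WINDOW(N+1). WindowAlignedOutputPort and the string encoding of the stream are hypothetical; only the BEGIN_WINDOW/END_WINDOW names come from the thread.

```java
import java.util.ArrayList;
import java.util.List;

class WindowAlignedOutputPort {
  private final List<String> stream = new ArrayList<>();         // what downstream sees
  private final List<String> pendingControl = new ArrayList<>(); // held until END_WINDOW

  void beginWindow(long windowId) {
    stream.add("BEGIN_WINDOW(" + windowId + ")");
  }

  void emitData(String tuple) {
    stream.add(tuple);
  }

  // Control tuples are cached instead of being written immediately.
  void emitControl(String tuple) {
    pendingControl.add(tuple);
  }

  // Flush cached control tuples after END_WINDOW(N), so they precede BEGIN_WINDOW(N+1).
  void endWindow(long windowId) {
    stream.add("END_WINDOW(" + windowId + ")");
    stream.addAll(pendingControl);
    pendingControl.clear();
  }

  List<String> stream() {
    return stream;
  }

  public static void main(String[] args) {
    WindowAlignedOutputPort port = new WindowAlignedOutputPort();
    port.beginWindow(1);
    port.emitData("d1");
    port.emitControl("EOF");
    port.emitData("d2");
    port.endWindow(1);
    port.beginWindow(2);
    System.out.println(port.stream());
    // [BEGIN_WINDOW(1), d1, d2, END_WINDOW(1), EOF, BEGIN_WINDOW(2)]
  }
}
```

Note that d2, emitted after EOF, still lands before it; keeping the logical order would require postponing d2 to the next window, which is the restriction pointed out in the message above.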
> > > > > >> > > > > > > >>
> > > > > >> > > > > > > >>
> > > > > >> > > > > > > >> On Mon, Jun 27, 2016 at 4:37 PM, Thomas Weise <tho...@datatorrent.com>
> > > > > >> > > > > > > >> wrote:
> > > > > >> > > > > > > >>
> > > > > >> > > > > > > >>> The windowing we discuss here is in general event-time
> > > > > >> > > > > > > >>> based; arrival time is a special case of it.
> > > > > >> > > > > > > >>>
> > > > > >> > > > > > > >>> I don't think state changes can be made independent of the
> > > > > >> > > > > > > >>> streaming window boundary, as that would prevent idempotent
> > > > > >> > > > > > > >>> processing and, transitively, exactly-once. For that to
> > > > > >> > > > > > > >>> work, tuples need to be presented to the operator in a
> > > > > >> > > > > > > >>> guaranteed order *within* the streaming window, which is not
> > > > > >> > > > > > > >>> possible with multiple ports (and partitions).
> > > > > >> > > > > > > >>>
> > > > > >> > > > > > > >>> Thomas
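A small, hypothetical illustration of the point above. An operator sums values arriving on port 1, multiplied by a factor that a control tuple on port 2 can change. The engine does not fix how the two ports interleave, so a replay may see a different order. Applying the control tuple immediately makes the window's result depend on that order; deferring it to the window boundary does not.

```java
import java.util.List;

class InterleavingDemo {
  // A tuple arriving at the operator: data on port 1, or a factor-changing control tuple on port 2.
  static final class Event {
    final boolean control;
    final int value;

    Event(boolean control, int value) {
      this.control = control;
      this.value = value;
    }
  }

  // Sum of one streaming window's data tuples, each multiplied by the current factor.
  static long sumForWindow(List<Event> arrivalOrder, boolean deferControl) {
    int factor = 1;
    long sum = 0;
    for (Event e : arrivalOrder) {
      if (!e.control) {
        sum += (long) factor * e.value;
      } else if (!deferControl) {
        factor = e.value; // applied immediately, mid-window
      }
      // When deferred, the new factor only takes effect from the next window,
      // so it does not change this window's sum.
    }
    return sum;
  }

  public static void main(String[] args) {
    Event d1 = new Event(false, 10);
    Event d2 = new Event(false, 20);
    Event setFactor = new Event(true, 3);
    // Port 1 always delivers d1 before d2; port 2's control tuple may land anywhere.
    List<Event> runA = List.of(d1, setFactor, d2);
    List<Event> runB = List.of(d1, d2, setFactor);
    System.out.println(sumForWindow(runA, false) + " vs " + sumForWindow(runB, false)); // 70 vs 30
    System.out.println(sumForWindow(runA, true) + " vs " + sumForWindow(runB, true));   // 30 vs 30
  }
}
```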
> > > > > >> > > > > > > >>>
> > > > > >> > > > > > > >>> On Mon, Jun 27, 2016 at 2:53 PM, David Yan <da...@datatorrent.com>
> > > > > >> > > > > > > >>> wrote:
> > > > > >> > > > > > > >>>
> > > > > >> > > > > > > >>>> I think for session tracking, if the session boundaries are
> > > > > >> > > > > > > >>>> allowed to be unaligned with the streaming window
> > > > > >> > > > > > > >>>> boundaries, the user will have a much bigger problem with
> > > > > >> > > > > > > >>>> idempotency. And in most cases, session tracking is based on
> > > > > >> > > > > > > >>>> event time, not ingestion time or processing time, so this
> > > > > >> > > > > > > >>>> may never be a problem. But if it ever happens, the user can
> > > > > >> > > > > > > >>>> always alter the default 500 ms streaming window width.
> > > > > >> > > > > > > >>>>
> > > > > >> > > > > > > >>>> David
> > > > > >> > > > > > > >>>>
> > > > > >> > > > > > > >>>> On Mon, Jun 27, 2016 at 2:35 PM, Vlad Rozov <v.ro...@datatorrent.com>
> > > > > >> > > > > > > >>>> wrote:
> > > > > >> > > > > > > >>>>
> > > > > >> > > > > > > >>>>> The ability to send custom control tuples within a window
> > > > > >> > > > > > > >>>>> may be useful, for example, for session tracking, where
> > > > > >> > > > > > > >>>>> session boundaries are not aligned with window boundaries
> > > > > >> > > > > > > >>>>> and 500 ms latency is not acceptable for an application.
> > > > > >> > > > > > > >>>>>
> > > > > >> > > > > > > >>>>> Thank you,
> > > > > >> > > > > > > >>>>>
> > > > > >> > > > > > > >>>>> Vlad
> > > > > >> > > > > > > >>>>>
> > > > > >> > > > > > > >>>>>
> > > > > >> > > > > > > >>>>> On 6/25/16 10:52, Thomas Weise wrote:
> > > > > >> > > > > > > >>>>>
> > > > > >> > > > > > > >>>>>> It should not matter where the control tuple is triggered
> > > > > >> > > > > > > >>>>>> from. It will be good to have a generic mechanism to
> > > > > >> > > > > > > >>>>>> propagate it, and other things can be accomplished outside
> > > > > >> > > > > > > >>>>>> the engine. For example, the new comprehensive support for
> > > > > >> > > > > > > >>>>>> windowing will all be in Malhar; the engine does not need to
> > > > > >> > > > > > > >>>>>> know anything about it, except that we need the control
> > > > > >> > > > > > > >>>>>> tuple for watermark propagation and idempotent processing.
> > > > > >> > > > > > > >>>>>>
> > > > > >> > > > > > > >>>>>> I also think the main difference from other tuples is the
> > > > > >> > > > > > > >>>>>> need to send it to all partitions. This is similar to
> > > > > >> > > > > > > >>>>>> checkpoint window tuples, but not the same. Here, we
> > > > > >> > > > > > > >>>>>> probably also need the ability for the user to control
> > > > > >> > > > > > > >>>>>> whether such a tuple should traverse the entire DAG or not.
> > > > > >> > > > > > > >>>>>> For a batch use case, for example, we may want to send the
> > > > > >> > > > > > > >>>>>> end of file to the next operator, but not beyond, if the
> > > > > >> > > > > > > >>>>>> operator has asynchronous processing logic in it.
> > > > > >> > > > > > > >>>>>>
> > > > > >> > > > > > > >>>>>> For any logic to be idempotent, the control tuple needs to
> > > > > >> > > > > > > >>>>>> be processed at a window boundary. Receiving the control
> > > > > >> > > > > > > >>>>>> tuple in the window callback would avoid having to track
> > > > > >> > > > > > > >>>>>> extra state in the operator. I don't think that's a major
> > > > > >> > > > > > > >>>>>> issue, but what is the use case for processing a control
> > > > > >> > > > > > > >>>>>> tuple within the window?
> > > > > >> > > > > > > >>>>>>
> > > > > >> > > > > > > >>>>>> Thomas
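A sketch of letting the sender bound how far a control tuple travels, as in the end-of-file example above. The Scope enum, the operator names and the linear DAG are hypothetical; the message only states the requirement.

```java
import java.util.ArrayList;
import java.util.List;

class ControlTupleScope {
  // Hypothetical propagation scopes for a control tuple.
  enum Scope { NEXT_OPERATOR, ENTIRE_DAG }

  // Returns the operators of a linear DAG that receive a control tuple emitted by
  // the first operator in the list.
  static List<String> deliver(List<String> linearDag, Scope scope) {
    List<String> received = new ArrayList<>();
    for (int i = 1; i < linearDag.size(); i++) {
      received.add(linearDag.get(i));
      if (scope == Scope.NEXT_OPERATOR) {
        break; // stop after the immediate downstream operator
      }
    }
    return received;
  }

  public static void main(String[] args) {
    List<String> dag = List.of("splitter", "blockReader", "writer", "committer");
    System.out.println(deliver(dag, Scope.NEXT_OPERATOR)); // [blockReader]
    System.out.println(deliver(dag, Scope.ENTIRE_DAG));    // [blockReader, writer, committer]
  }
}
```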
> > > > > >> > > > > > > >>>>>>
> > > > > >> > > > > > > >>>>>>
> > > > > >> > > > > > > >>>>>>
> > > > > >> > > > > > > >>>>>> On Sat, Jun 25, 2016 at 6:19 AM, Pramod Immaneni <pra...@datatorrent.com>
> > > > > >> > > > > > > >>>>>> wrote:
> > > > > >> > > > > > > >>>>>>
> > > > > >> > > > > > > >>>>>>> For the use cases you mentioned, I think 1) and 2) are more
> > > > > >> > > > > > > >>>>>>> likely to be controlled directly by the application, while
> > > > > >> > > > > > > >>>>>>> 3) and 4) are more likely to be triggered externally and
> > > > > >> > > > > > > >>>>>>> handled directly by the engine; 3) is already being
> > > > > >> > > > > > > >>>>>>> implemented that way (apexcore-163).
> > > > > >> > > > > > > >>>>>>>
> > > > > >> > > > > > > >>>>>>> The control tuples emitted by an operator would be sent to
> > > > > >> > > > > > > >>>>>>> all downstream partitions, wouldn't they? That would be the
> > > > > >> > > > > > > >>>>>>> chief distinction compared to data (apart from the
> > > > > >> > > > > > > >>>>>>> payload), which would get partitioned under normal
> > > > > >> > > > > > > >>>>>>> circumstances. It would also be guaranteed that downstream
> > > > > >> > > > > > > >>>>>>> partitions will receive control tuples only after the data
> > > > > >> > > > > > > >>>>>>> that was sent before them, so we could send them
> > > > > >> > > > > > > >>>>>>> immediately when they are emitted, as opposed to at window
> > > > > >> > > > > > > >>>>>>> boundaries.
> > > > > >> > > > > > > >>>>>>>
> > > > > >> > > > > > > >>>>>>> However, during unification it is important to know whether
> > > > > >> > > > > > > >>>>>>> these control tuples have been received from all upstream
> > > > > >> > > > > > > >>>>>>> partitions before proceeding with a control operation. One
> > > > > >> > > > > > > >>>>>>> could wait until the end of the window, but that introduces
> > > > > >> > > > > > > >>>>>>> a delay, however small. I would like to add to the proposal
> > > > > >> > > > > > > >>>>>>> that the platform hand over the control tuple to the unifier
> > > > > >> > > > > > > >>>>>>> only when it has been received from all upstream
> > > > > >> > > > > > > >>>>>>> partitions, much like how end window is processed, but
> > > > > >> > > > > > > >>>>>>> without waiting until the actual end of the window.
> > > > > >> > > > > > > >>>>>>>
> > > > > >> > > > > > > >>>>>>> Regarding your concern about idempotency, we typically care
> > > > > >> > > > > > > >>>>>>> about idempotency at a window level, and doing the above
> > > > > >> > > > > > > >>>>>>> will still allow the operators to preserve that easily.
> > > > > >> > > > > > > >>>>>>>
> > > > > >> > > > > > > >>>>>>> Thanks
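A sketch of the unifier-side barrier proposed above: the control tuple is handed to the unifier only once it has arrived from every upstream partition, without waiting for END_WINDOW. ControlTupleBarrier is hypothetical; it matches tuples by equals()/hashCode() and counts distinct upstream partition ids, so a repeat from the same partition does not count twice. The same structure would fit the case, raised later in the thread, of an end-of-file event that must wait for all block-reader partitions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

class ControlTupleBarrier<T> {
  private final int upstreamPartitions;
  private final Consumer<T> handOverToUnifier;
  // For each pending control tuple, the upstream partitions it has arrived from so far.
  private final Map<T, Set<Integer>> arrivals = new HashMap<>();

  ControlTupleBarrier(int upstreamPartitions, Consumer<T> handOverToUnifier) {
    this.upstreamPartitions = upstreamPartitions;
    this.handOverToUnifier = handOverToUnifier;
  }

  void onControlTuple(int upstreamPartition, T tuple) {
    Set<Integer> seen = arrivals.computeIfAbsent(tuple, k -> new HashSet<>());
    seen.add(upstreamPartition);
    if (seen.size() == upstreamPartitions) {
      arrivals.remove(tuple); // barrier complete: forget this tuple
      handOverToUnifier.accept(tuple);
    }
  }

  public static void main(String[] args) {
    List<String> delivered = new ArrayList<>();
    ControlTupleBarrier<String> barrier = new ControlTupleBarrier<>(3, t -> delivered.add(t));
    barrier.onControlTuple(0, "EOF:file1");
    barrier.onControlTuple(1, "EOF:file1");
    barrier.onControlTuple(1, "EOF:file1"); // repeat from partition 1 is not counted twice
    System.out.println(delivered); // []
    barrier.onControlTuple(2, "EOF:file1");
    System.out.println(delivered); // [EOF:file1]
  }
}
```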
> > > > > >> > > > > > > >>>>>>>
> > > > > >> > > > > > > >>>>>>> On Jun 24, 2016, at 11:22 AM, David Yan <da...@datatorrent.com>
> > > > > >> > > > > > > >>>>>>> wrote:
> > > > > >> > > > > > > >>>>>>>
> > > > > >> > > > > > > >>>>>>>> Hi all,
> > > > > >> > > > > > > >>>>>>>>
> > > > > >> > > > > > > >>>>>>>> I would like to propose a new feature for the Apex core
> > > > > >> > > > > > > >>>>>>>> engine: support for custom control tuples. Currently, we
> > > > > >> > > > > > > >>>>>>>> have control tuples such as BEGIN_WINDOW, END_WINDOW,
> > > > > >> > > > > > > >>>>>>>> CHECKPOINT, and so on, but we don't support applications
> > > > > >> > > > > > > >>>>>>>> inserting their own control tuples. The current workaround
> > > > > >> > > > > > > >>>>>>>> is to use data tuples and have a separate port for such
> > > > > >> > > > > > > >>>>>>>> tuples that sends them to all partitions of the downstream
> > > > > >> > > > > > > >>>>>>>> operators, which is not exactly developer friendly.
> > > > > >> > > > > > > >>>>>>>>
> > > > > >> > > > > > > >>>>>>>> We have already seen a number of use cases that can use
> > > > > >> > > > > > > >>>>>>>> this feature:
> > > > > >> > > > > > > >>>>>>>>
> > > > > >> > > > > > > >>>>>>>> 1) Batch support: We need to tell all operators of the
> > > > > >> > > > > > > >>>>>>>> physical DAG when a batch starts and ends, so the operators
> > > > > >> > > > > > > >>>>>>>> can do whatever is needed upon the start or the end of a
> > > > > >> > > > > > > >>>>>>>> batch.
> > > > > >> > > > > > > >>>>>>>>
> > > > > >> > > > > > > >>>>>>>> 2) Watermark: To support the concepts of event time
> > > > > >> > > > > > > >>>>>>>> windowing, the watermark control tuple is needed to tell
> > > > > >> > > > > > > >>>>>>>> which windows should be considered late.
> > > > > >> > > > > > > >>>>>>>>
> > > > > >> > > > > > > >>>>>>>> 3) Changing operator properties: We do support changing
> > > > > >> > > > > > > >>>>>>>> operator properties on the fly, but with a custom control
> > > > > >> > > > > > > >>>>>>>> tuple, the command to change operator properties can be
> > > > > >> > > > > > > >>>>>>>> window aligned for all partitions and also across the DAG.
> > > > > >> > > > > > > >>>>>>>>
> > > > > >> > > > > > > >>>>>>>> 4) Recording tuples: Like changing operator properties, we
> > > > > >> > > > > > > >>>>>>>> do have this support now, but only at the individual
> > > > > >> > > > > > > >>>>>>>> physical operator level, and without control of which
> > > > > >> > > > > > > >>>>>>>> window to record tuples for. With a custom control tuple,
> > > > > >> > > > > > > >>>>>>>> because a control tuple must belong to a window, all
> > > > > >> > > > > > > >>>>>>>> operators in the DAG can start (and stop) recording for
> > > > > >> > > > > > > >>>>>>>> the same windows.
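A sketch of use case 2 under one common reading of watermarks: once a watermark with timestamp W has arrived, an event-time window [start, start + width) with start + width <= W is treated as closed, and further events for it are late. WatermarkTracker and this exact lateness rule are assumptions for illustration.

```java
class WatermarkTracker {
  private long watermark = Long.MIN_VALUE;

  // Called when a watermark control tuple arrives; the watermark only moves forward.
  void onWatermark(long timestampMillis) {
    watermark = Math.max(watermark, timestampMillis);
  }

  // An event-time window [start, start + width) is late once the watermark reaches its end.
  boolean isLate(long windowStartMillis, long windowWidthMillis) {
    return windowStartMillis + windowWidthMillis <= watermark;
  }

  public static void main(String[] args) {
    WatermarkTracker tracker = new WatermarkTracker();
    tracker.onWatermark(60_000);
    System.out.println(tracker.isLate(0, 60_000));      // true: window [0, 60000) has closed
    System.out.println(tracker.isLate(60_000, 60_000)); // false: window [60000, 120000) is open
  }
}
```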
> > > > > >> > > > > > > >>>>>>>>
> > > > > >> > > > > > > >>>>>>>> I can think of two options to achieve this:
> > > > > >> > > > > > > >>>>>>>>
> > > > > >> > > > > > > >>>>>>>> 1) a new custom control tuple type that takes the user's
> > > > > >> > > > > > > >>>>>>>> serializable object
> > > > > >> > > > > > > >>>>>>>> 2) piggyback on the current BEGIN_WINDOW and END_WINDOW
> > > > > >> > > > > > > >>>>>>>> control tuples
> > > > > >> > > > > > > >>>>>>>>
> > > > > >> > > > > > > >>>>>>>> Please provide your feedback. Thank you.
> > > > > >> > > > > > > >>>>>>>>
> > > > > >> > > > > > > >>>>>>>> David
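A sketch of option 1: a dedicated control tuple type that carries a user-supplied Serializable payload together with the id of the streaming window it belongs to. CustomControlTuple and BatchBoundary are hypothetical names, and plain Java serialization is used only to show that such a tuple can round-trip like any other serializable object; it says nothing about how the engine would actually encode it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class CustomControlTuple implements Serializable {
  private static final long serialVersionUID = 1L;

  final long windowId;        // a control tuple belongs to a streaming window
  final Serializable payload; // arbitrary user object

  CustomControlTuple(long windowId, Serializable payload) {
    this.windowId = windowId;
    this.payload = payload;
  }

  // Example user payload for the "batch support" use case.
  static final class BatchBoundary implements Serializable {
    private static final long serialVersionUID = 1L;
    final String batchId;
    final boolean start;

    BatchBoundary(String batchId, boolean start) {
      this.batchId = batchId;
      this.start = start;
    }
  }

  static byte[] serialize(Object o) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(o);
    }
    return bytes.toByteArray();
  }

  static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
      return in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    CustomControlTuple sent = new CustomControlTuple(42L, new BatchBoundary("batch-7", false));
    CustomControlTuple received = (CustomControlTuple) deserialize(serialize(sent));
    BatchBoundary b = (BatchBoundary) received.payload;
    System.out.println(received.windowId + " " + b.batchId + " start=" + b.start);
    // 42 batch-7 start=false
  }
}
```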
> > > > > >> > > > > > > >>>>>>>>
> > > > > >> > > > > > > >>>>>>>>
> > > > > >> > > > > > > >
> > > > > >> > > > > > >
> > > > > >> > > > > >
> > > > > >> > > > >
> > > > > >> > > >
> > > > > >> > >
> > > > > >> >
> > > > > >>
> > > > >
> > > >
> > >
> >
>
