There is not guarantee about the ordering of events within a streaming window with multiple upstream partitions. This would require a synchronization logic similar to what the streaming window provides, hence I would expect it to be best supported as part of the same window synchronization.
On Wed, Nov 2, 2016 at 10:46 PM, Pramod Immaneni <pra...@datatorrent.com> wrote: > With option 2, users can still do idempotent processing by delaying their > processing of the control tuples to end window. They have the flexibility > with this option. In the usual scenarios, you will have one port and given > that control tuples will be sent to all partitions, all the data sent > before the control tuple will arrive before the control tuple for all > downstream partitions and users can still do idempotent processing. For the > case with multiple ports you can delay processing till end window. > > On Wed, Nov 2, 2016 at 2:33 PM, Amol Kekre <a...@datatorrent.com> wrote: > > > A feature that incurs risk with processing order, and more so with > > idempotency is a big enough reason to worry about with option 2. Is there > > is a critical use case that needs this feature? > > > > Thks > > Amol > > > > > > On Wed, Nov 2, 2016 at 1:25 PM, Pramod Immaneni <pra...@datatorrent.com> > > wrote: > > > > > I like approach 2 as it gives more flexibility and also allows for > > > low-latency options. I think the following are important as well. > > > > > > 1. Delivering control tuples to all downstream partitions. > > > 2. What mechanism will the operator developer use to send the control > > > tuple? Will it be an additional mehod on the output port? > > > > > > Thanks > > > > > > On Wed, Nov 2, 2016 at 1:16 PM, David Yan <da...@datatorrent.com> > wrote: > > > > > > > Hi all, > > > > > > > > I would like to renew the discussion of control tuples. > > > > > > > > Last time, we were in a debate about whether: > > > > > > > > 1) the platform should enforce that control tuples are delivered at > > > window > > > > boundaries only > > > > > > > > or: > > > > > > > > 2) the platform should deliver control tuples just as other tuples > and > > > it's > > > > the operator developers' choice whether to handle the control tuples > as > > > > they arrive or delay the processing till the next window boundary. > > > > > > > > To summarize the pros and cons: > > > > > > > > Approach 1: If processing control tuples results in changes of the > > > behavior > > > > of the operator, if idempotency needs to be preserved, the processing > > > must > > > > be done at window boundaries. This approach will save the operator > > > > developers headache to ensure that. However, this will take away the > > > > choices from the operator developer if they just need to process the > > > > control tuples as soon as possible. > > > > > > > > Approach 2: The operator has a chance to immediately process control > > > > tuples. This would be useful if latency is more valued than > > correctness. > > > > However, if this would open the possibility for operator developers > to > > > > shoot themselves in the foot. This is especially true if there are > > > multiple > > > > input ports. as there is no easy way to guarantee processing order > for > > > > multiple input ports. > > > > > > > > We would like to arrive to a consensus and close this discussion soon > > > this > > > > time so we can start the work on this important feature. > > > > > > > > Thanks! > > > > > > > > David > > > > > > > > On Tue, Jun 28, 2016 at 10:04 AM, Vlad Rozov < > v.ro...@datatorrent.com> > > > > wrote: > > > > > > > > > It is not clear how operator will emit custom control tuple at > window > > > > > boundaries. One way is to cache/accumulate control tuples in the > > > operator > > > > > output port till window closes (END_WINDOW is inserted into the > > output > > > > > sink) or only allow an operator to emit control tuples inside the > > > > > endWindow(). The later is a slight variation of the operator output > > > port > > > > > caching behavior with the only difference that now the operator > > itself > > > is > > > > > responsible for caching/accumulating control tuples. Note that in > > many > > > > > cases it will be necessary to postpone emitting payload tuples that > > > > > logically come after the custom control tuple till the next window > > > > begins. > > > > > > > > > > IMO, that too restrictive and in a case where input operator uses a > > > push > > > > > instead of a poll (for example, it provides an end point where > remote > > > > > agents may connect and publish/push data), control tuples may be > used > > > for > > > > > connect/disconnect/watermark broadcast to (partitioned) downstream > > > > > operators. In this case the platform just need to guarantee order > > > barrier > > > > > (any tuple emitted prior to a control tuple needs to be delivered > > prior > > > > to > > > > > the control tuple). > > > > > > > > > > Thank you, > > > > > > > > > > Vlad > > > > > > > > > > > > > > > > > > > > On 6/27/16 19:36, Amol Kekre wrote: > > > > > > > > > >> I agree with David. Allowing control tuples within a window (along > > > with > > > > >> data tuples) creates very dangerous situation where guarantees are > > > > >> impacted. It is much safer to enable control tuples (send/receive) > > at > > > > >> window boundaries (after END_WINDOW of window N, and before > > > BEGIN_WINDOW > > > > >> for window N+1). My take on David's list is > > > > >> > > > > >> 1. -> window boundaries -> Strong +1; there will be a big issue > with > > > > >> guarantees for operators with multiple ports. (see Thomas's > > response) > > > > >> 2. -> All downstream windows -> +1, but there are situations; a > > caveat > > > > >> could be "only to operators that implement control tuple > > > > >> interface/listeners", which could effectively translates to "all > > > > >> interested > > > > >> downstream operators" > > > > >> 3. Only Input operator can create control tuples -> -1; is > > restrictive > > > > >> even > > > > >> though most likely 95% of the time it will be input operators > > > > >> > > > > >> Thks, > > > > >> Amol > > > > >> > > > > >> > > > > >> On Mon, Jun 27, 2016 at 4:37 PM, Thomas Weise < > > tho...@datatorrent.com > > > > > > > > >> wrote: > > > > >> > > > > >> The windowing we discuss here is in general event time based, > > arrival > > > > time > > > > >>> is a special case of it. > > > > >>> > > > > >>> I don't think state changes can be made independent of the > > streaming > > > > >>> window > > > > >>> boundary as it would prevent idempotent processing and > transitively > > > > >>> exactly > > > > >>> once. For that to work, tuples need to be presented to the > operator > > > in > > > > a > > > > >>> guaranteed order *within* the streaming window, which is not > > possible > > > > >>> with > > > > >>> multiple ports (and partitions). > > > > >>> > > > > >>> Thomas > > > > >>> > > > > >>> On Mon, Jun 27, 2016 at 2:53 PM, David Yan < > da...@datatorrent.com> > > > > >>> wrote: > > > > >>> > > > > >>> I think for session tracking, if the session boundaries are > allowed > > > to > > > > be > > > > >>>> not aligned with the streaming window boundaries, the user will > > > have a > > > > >>>> > > > > >>> much > > > > >>> > > > > >>>> bigger problem with idempotency. And in most cases, session > > tracking > > > > is > > > > >>>> event time based, not ingression time or processing time based, > so > > > > this > > > > >>>> > > > > >>> may > > > > >>> > > > > >>>> never be a problem. But if that ever happens, the user can > always > > > > alter > > > > >>>> > > > > >>> the > > > > >>> > > > > >>>> default 500ms width. > > > > >>>> > > > > >>>> David > > > > >>>> > > > > >>>> On Mon, Jun 27, 2016 at 2:35 PM, Vlad Rozov < > > > v.ro...@datatorrent.com> > > > > >>>> wrote: > > > > >>>> > > > > >>>> Ability to send custom control tuples within window may be > useful, > > > for > > > > >>>>> example, for sessions tracking, where session boundaries are > not > > > > >>>>> > > > > >>>> aligned > > > > >>> > > > > >>>> with window boundaries and 500 ms latency is not acceptable for > an > > > > >>>>> application. > > > > >>>>> > > > > >>>>> Thank you, > > > > >>>>> > > > > >>>>> Vlad > > > > >>>>> > > > > >>>>> > > > > >>>>> On 6/25/16 10:52, Thomas Weise wrote: > > > > >>>>> > > > > >>>>> It should not matter from where the control tuple is triggered. > > It > > > > >>>>>> > > > > >>>>> will > > > > >>> > > > > >>>> be > > > > >>>> > > > > >>>>> good to have a generic mechanism to propagate it and other > things > > > can > > > > >>>>>> > > > > >>>>> be > > > > >>> > > > > >>>> accomplished outside the engine. For example, the new > > comprehensive > > > > >>>>>> support > > > > >>>>>> for windowing will all be in Malhar, nothing that the engine > > needs > > > > to > > > > >>>>>> > > > > >>>>> know > > > > >>>> > > > > >>>>> about it except that we need the control tuple for watermark > > > > >>>>>> > > > > >>>>> propagation > > > > >>> > > > > >>>> and idempotent processing. > > > > >>>>>> > > > > >>>>>> I also think the main difference to other tuples is the need > to > > > send > > > > >>>>>> > > > > >>>>> it > > > > >>> > > > > >>>> to > > > > >>>> > > > > >>>>> all partitions. Which is similar to checkpoint window tuples, > but > > > not > > > > >>>>>> > > > > >>>>> the > > > > >>>> > > > > >>>>> same. Here, we probably also need the ability for the user to > > > control > > > > >>>>>> whether such tuple should traverse the entire DAG or not. For > a > > > > batch > > > > >>>>>> > > > > >>>>> use > > > > >>>> > > > > >>>>> case, for example, we may want to send the end of file to the > > next > > > > >>>>>> operator, but not beyond, if the operator has asynchronous > > > > processing > > > > >>>>>> logic > > > > >>>>>> in it. > > > > >>>>>> > > > > >>>>>> For any logic to be idempotent, the control tuple needs to be > > > > >>>>>> > > > > >>>>> processed > > > > >>> > > > > >>>> at > > > > >>>> > > > > >>>>> a window boundary. Receiving the control tuple in the window > > > callback > > > > >>>>>> would > > > > >>>>>> avoid having to track extra state in the operator. I don't > think > > > > >>>>>> > > > > >>>>> that's > > > > >>> > > > > >>>> a > > > > >>>> > > > > >>>>> major issue, but what is the use case for processing a control > > > tuple > > > > >>>>>> within > > > > >>>>>> the window? > > > > >>>>>> > > > > >>>>>> Thomas > > > > >>>>>> > > > > >>>>>> > > > > >>>>>> > > > > >>>>>> On Sat, Jun 25, 2016 at 6:19 AM, Pramod Immaneni < > > > > >>>>>> > > > > >>>>> pra...@datatorrent.com> > > > > >>>> > > > > >>>>> wrote: > > > > >>>>>> > > > > >>>>>> For the use cases you mentioned, I think 1) and 2) are more > > likely > > > > to > > > > >>>>>> > > > > >>>>>>> be controlled directly by the application, 3) and 4) are more > > > > likely > > > > >>>>>>> going to be triggered externally and directly handled by the > > > engine > > > > >>>>>>> and 3) is already being implemented that way (apexcore-163). > > > > >>>>>>> > > > > >>>>>>> The control tuples emitted by an operator would be sent to > all > > > > >>>>>>> downstream partitions isn't it and that would be the chief > > > > >>>>>>> > > > > >>>>>> distinction > > > > >>> > > > > >>>> compared to data (apart from the payload) which would get > > > partitioned > > > > >>>>>>> under normal circumstances. It would also be guaranteed that > > > > >>>>>>> downstream partitions will receive control tuples only after > > the > > > > data > > > > >>>>>>> that was sent before it so we could send it immediately when > it > > > is > > > > >>>>>>> emitted as opposed to window boundaries. > > > > >>>>>>> > > > > >>>>>>> However during unification it is important to know if these > > > control > > > > >>>>>>> tuples have been received from all upstream partitions before > > > > >>>>>>> proceeding with a control operation. One could wait till end > of > > > the > > > > >>>>>>> window but that introduces a delay however small, I would > like > > to > > > > add > > > > >>>>>>> to the proposal that the platform only hand over the control > > > tuple > > > > to > > > > >>>>>>> the unifier when it has been received from all upstream > > > partitions > > > > >>>>>>> much like how end window is processed but not wait till the > > > actual > > > > >>>>>>> > > > > >>>>>> end > > > > >>> > > > > >>>> of the window. > > > > >>>>>>> > > > > >>>>>>> Regd your concern about idempotency, we typically care about > > > > >>>>>>> idempotency at a window level and doing the above will still > > > allow > > > > >>>>>>> > > > > >>>>>> the > > > > >>> > > > > >>>> operators to preserve that easily. > > > > >>>>>>> > > > > >>>>>>> Thanks > > > > >>>>>>> > > > > >>>>>>> On Jun 24, 2016, at 11:22 AM, David Yan < > da...@datatorrent.com > > > > > > > >>>>>>> > > > > >>>>>> wrote: > > > > >>> > > > > >>>> Hi all, > > > > >>>>>>>> > > > > >>>>>>>> I would like to propose a new feature to the Apex core > engine > > -- > > > > the > > > > >>>>>>>> support of custom control tuples. Currently, we have control > > > > tuples > > > > >>>>>>>> > > > > >>>>>>> such > > > > >>>> > > > > >>>>> as > > > > >>>>>>> > > > > >>>>>>> BEGIN_WINDOW, END_WINDOW, CHECKPOINT, and so on, but we don't > > > have > > > > >>>>>>>> > > > > >>>>>>> the > > > > >>> > > > > >>>> support for applications to insert their own control tuples. The > > way > > > > >>>>>>>> currently to get around this is to use data tuples and have > a > > > > >>>>>>>> > > > > >>>>>>> separate > > > > >>> > > > > >>>> port > > > > >>>>>>> > > > > >>>>>>> for such tuples that sends tuples to all partitions of the > > > > >>>>>>>> > > > > >>>>>>> downstream > > > > >>> > > > > >>>> operators, which is not exactly developer friendly. > > > > >>>>>>>> > > > > >>>>>>>> We have already seen a number of use cases that can use this > > > > >>>>>>>> > > > > >>>>>>> feature: > > > > >>> > > > > >>>> 1) Batch support: We need to tell all operators of the physical > > DAG > > > > >>>>>>>> > > > > >>>>>>> when > > > > >>>> > > > > >>>>> a > > > > >>>>>>> > > > > >>>>>>> batch starts and ends, so the operators can do whatever that > is > > > > >>>>>>>> > > > > >>>>>>> needed > > > > >>> > > > > >>>> upon > > > > >>>>>>> > > > > >>>>>>> the start or the end of a batch. > > > > >>>>>>>> > > > > >>>>>>>> 2) Watermark: To support the concepts of event time > windowing, > > > the > > > > >>>>>>>> watermark control tuple is needed to tell which windows > should > > > be > > > > >>>>>>>> considered late. > > > > >>>>>>>> > > > > >>>>>>>> 3) Changing operator properties: We do have the support of > > > > changing > > > > >>>>>>>> operator properties on the fly, but with a custom control > > tuple, > > > > the > > > > >>>>>>>> command to change operator properties can be window aligned > > for > > > > all > > > > >>>>>>>> partitions and also across the DAG. > > > > >>>>>>>> > > > > >>>>>>>> 4) Recording tuples: Like changing operator properties, we > do > > > have > > > > >>>>>>>> > > > > >>>>>>> this > > > > >>>> > > > > >>>>> support now but only at the individual physical operator level, > > and > > > > >>>>>>>> > > > > >>>>>>>> without > > > > >>>>>>> > > > > >>>>>>> control of which window to record tuples for. With a custom > > > control > > > > >>>>>>>> > > > > >>>>>>>> tuple, > > > > >>>>>>> > > > > >>>>>>> because a control tuple must belong to a window, all > operators > > in > > > > >>>>>>>> > > > > >>>>>>> the > > > > >>> > > > > >>>> DAG > > > > >>>>>>>> can start (and stop) recording for the same windows. > > > > >>>>>>>> > > > > >>>>>>>> I can think of two options to achieve this: > > > > >>>>>>>> > > > > >>>>>>>> 1) new custom control tuple type that takes user's > > serializable > > > > >>>>>>>> > > > > >>>>>>> object. > > > > >>>> > > > > >>>>> 2) piggy back the current BEGIN_WINDOW and END_WINDOW control > > > > >>>>>>>> > > > > >>>>>>> tuples. > > > > >>> > > > > >>>> Please provide your feedback. Thank you. > > > > >>>>>>>> > > > > >>>>>>>> David > > > > >>>>>>>> > > > > >>>>>>>> > > > > > > > > > > > > > > >