I agree with David. Allowing control tuples within a window (along with data tuples) creates a very dangerous situation in which processing guarantees are impacted. It is much safer to enable control tuples (send/receive) at window boundaries (after END_WINDOW of window N, and before BEGIN_WINDOW of window N+1). My take on David's list is:
1. -> window boundaries -> Strong +1; there will be a big issue with guarantees for operators with multiple ports (see Thomas's response).
2. -> All downstream windows -> +1, but there are exceptions; a caveat could be "only to operators that implement the control tuple interface/listeners", which could effectively translate to "all interested downstream operators".
3. Only input operators can create control tuples -> -1; this is restrictive, even though most likely 95% of the time it will be input operators.

Thanks,
Amol

On Mon, Jun 27, 2016 at 4:37 PM, Thomas Weise wrote:

> The windowing we discuss here is in general event time based, arrival time
> is a special case of it.
>
> I don't think state changes can be made independent of the streaming window
> boundary as it would prevent idempotent processing and transitively exactly
> once. For that to work, tuples need to be presented to the operator in a
> guaranteed order *within* the streaming window, which is not possible with
> multiple ports (and partitions).
>
> Thomas
>
> On Mon, Jun 27, 2016 at 2:53 PM, David Yan wrote:
>
> > I think for session tracking, if the session boundaries are allowed to be
> > not aligned with the streaming window boundaries, the user will have a
> > much bigger problem with idempotency. And in most cases, session tracking
> > is event time based, not ingression time or processing time based, so
> > this may never be a problem. But if that ever happens, the user can
> > always alter the default 500ms width.
> >
> > David
> >
> > On Mon, Jun 27, 2016 at 2:35 PM, Vlad Rozov wrote:
> >
> > > Ability to send custom control tuples within window may be useful, for
> > > example, for sessions tracking, where session boundaries are not
> > > aligned with window boundaries and 500 ms latency is not acceptable
> > > for an application.
> > >
> > > Thank you,
> > >
> > > Vlad
> > >
> > > On 6/25/16 10:52, Thomas Weise wrote:
> > >
> > >> It should not matter from where the control tuple is triggered. It
> > >> will be good to have a generic mechanism to propagate it and other
> > >> things can be accomplished outside the engine. For example, the new
> > >> comprehensive support for windowing will all be in Malhar, nothing
> > >> that the engine needs to know about it except that we need the
> > >> control tuple for watermark propagation and idempotent processing.
> > >>
> > >> I also think the main difference to other tuples is the need to send
> > >> it to all partitions. Which is similar to checkpoint window tuples,
> > >> but not the same. Here, we probably also need the ability for the
> > >> user to control whether such tuple should traverse the entire DAG or
> > >> not. For a batch use case, for example, we may want to send the end
> > >> of file to the next operator, but not beyond, if the operator has
> > >> asynchronous processing logic in it.
> > >>
> > >> For any logic to be idempotent, the control tuple needs to be
> > >> processed at a window boundary. Receiving the control tuple in the
> > >> window callback would avoid having to track extra state in the
> > >> operator. I don't think that's a major issue, but what is the use
> > >> case for processing a control tuple within the window?
> > >>
> > >> Thomas
> > >>
> > >> On Sat, Jun 25, 2016 at 6:19 AM, Pramod Immaneni wrote:
> > >>
> > >>> For the use cases you mentioned, I think 1) and 2) are more likely to
> > >>> be controlled directly by the application, 3) and 4) are more likely
> > >>> going to be triggered externally and directly handled by the engine
> > >>> and 3) is already being implemented that way (apexcore-163).
> > >>>
> > >>> The control tuples emitted by an operator would be sent to all
> > >>> downstream partitions isn't it and that would be the chief
> > >>> distinction compared to data (apart from the payload) which would get
> > >>> partitioned under normal circumstances. It would also be guaranteed
> > >>> that downstream partitions will receive control tuples only after the
> > >>> data that was sent before it so we could send it immediately when it
> > >>> is emitted as opposed to window boundaries.
> > >>>
> > >>> However during unification it is important to know if these control
> > >>> tuples have been received from all upstream partitions before
> > >>> proceeding with a control operation. One could wait till end of the
> > >>> window but that introduces a delay however small, I would like to add
> > >>> to the proposal that the platform only hand over the control tuple to
> > >>> the unifier when it has been received from all upstream partitions
> > >>> much like how end window is processed but not wait till the actual
> > >>> end of the window.
> > >>>
> > >>> Regd your concern about idempotency, we typically care about
> > >>> idempotency at a window level and doing the above will still allow
> > >>> the operators to preserve that easily.
> > >>>
> > >>> Thanks
> > >>>
> > >>> On Jun 24, 2016, at 11:22 AM, David Yan wrote:
> > >>>
> > >>>> Hi all,
> > >>>>
> > >>>> I would like to propose a new feature to the Apex core engine -- the
> > >>>> support of custom control tuples. Currently, we have control tuples
> > >>>> such as BEGIN_WINDOW, END_WINDOW, CHECKPOINT, and so on, but we
> > >>>> don't have the support for applications to insert their own control
> > >>>> tuples.
> > >>>> The way currently to get around this is to use data tuples and have
> > >>>> a separate port for such tuples that sends tuples to all partitions
> > >>>> of the downstream operators, which is not exactly developer
> > >>>> friendly.
> > >>>>
> > >>>> We have already seen a number of use cases that can use this
> > >>>> feature:
> > >>>>
> > >>>> 1) Batch support: We need to tell all operators of the physical DAG
> > >>>> when a batch starts and ends, so the operators can do whatever that
> > >>>> is needed upon the start or the end of a batch.
> > >>>>
> > >>>> 2) Watermark: To support the concepts of event time windowing, the
> > >>>> watermark control tuple is needed to tell which windows should be
> > >>>> considered late.
> > >>>>
> > >>>> 3) Changing operator properties: We do have the support of changing
> > >>>> operator properties on the fly, but with a custom control tuple, the
> > >>>> command to change operator properties can be window aligned for all
> > >>>> partitions and also across the DAG.
> > >>>>
> > >>>> 4) Recording tuples: Like changing operator properties, we do have
> > >>>> this support now but only at the individual physical operator level,
> > >>>> and without control of which window to record tuples for. With a
> > >>>> custom control tuple, because a control tuple must belong to a
> > >>>> window, all operators in the DAG can start (and stop) recording for
> > >>>> the same windows.
> > >>>>
> > >>>> I can think of two options to achieve this:
> > >>>>
> > >>>> 1) new custom control tuple type that takes user's serializable
> > >>>> object.
> > >>>>
> > >>>> 2) piggy back the current BEGIN_WINDOW and END_WINDOW control
> > >>>> tuples.
> > >>>>
> > >>>> Please provide your feedback. Thank you.
> > >>>>
> > >>>> David
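
For illustration, the window-boundary delivery favored in this thread could be sketched roughly as follows. This is a minimal, hypothetical sketch and not the Apex engine's implementation: the class and method names (WindowBoundaryControlBuffer, control, data, log) are assumptions made up for the example. A control tuple that arrives while a window is open is held back and handed over only after END_WINDOW of window N and before BEGIN_WINDOW of window N+1.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; none of these names are part of the Apex API.
class WindowBoundaryControlBuffer {
    // Control tuples that arrived inside the currently open window.
    private final List<String> pending = new ArrayList<>();
    // Order of events as the operator logic would observe them.
    private final List<String> log = new ArrayList<>();
    private boolean inWindow = false;

    void beginWindow(long windowId) {
        inWindow = true;
        log.add("BEGIN_WINDOW " + windowId);
    }

    // Data tuples are processed immediately, inside the window.
    void data(String tuple) {
        log.add("DATA " + tuple);
    }

    // A control tuple is held back while a window is open.
    void control(String payload) {
        if (inWindow) {
            pending.add(payload);
        } else {
            log.add("CONTROL " + payload);
        }
    }

    void endWindow(long windowId) {
        inWindow = false;
        log.add("END_WINDOW " + windowId);
        // Window boundary reached: deliver buffered control tuples in arrival order.
        for (String payload : pending) {
            log.add("CONTROL " + payload);
        }
        pending.clear();
    }

    List<String> log() {
        return log;
    }

    public static void main(String[] args) {
        WindowBoundaryControlBuffer op = new WindowBoundaryControlBuffer();
        op.beginWindow(1);
        op.data("a");
        op.control("watermark"); // arrives mid-window
        op.data("b");
        op.endWindow(1);
        op.beginWindow(2);
        for (String event : op.log()) {
            System.out.println(event);
        }
    }
}
```

Because the operator sees the control tuple at the same boundary no matter where it arrived inside the window, replaying window 1 yields the same sequence of events, which is the idempotency argument made in the messages above.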
