I agree with David. Allowing control tuples within a window (along with
data tuples) creates a very dangerous situation in which guarantees are
impacted. It is much safer to allow control tuples (send/receive) only at
window boundaries (after END_WINDOW of window N, and before BEGIN_WINDOW
for window N+1). My take on David's list is:

1. -> window boundaries -> Strong +1; there will be a big issue with
guarantees for operators with multiple ports (see Thomas's response).
2. -> All downstream windows -> +1, but there are exceptions; a caveat
could be "only to operators that implement the control tuple
interface/listeners", which would effectively translate to "all interested
downstream operators".
3. Only input operators can create control tuples -> -1; this is restrictive,
even though 95% of the time it will most likely be input operators.
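
To make 1 and 2 concrete, here is a rough sketch of what I have in mind. It
is not engine code: ControlTupleListener, Operator, Control and
BoundaryDelivery are all hypothetical names made up for illustration, and
the real operator and engine interfaces look different. A control tuple seen
in the middle of window N is held back and handed over after END_WINDOW of N
and before BEGIN_WINDOW of N+1, and only to operators that opted in.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ControlTupleSketch {

    /** Hypothetical opt-in interface (an assumption, not an existing Apex API). */
    interface ControlTupleListener {
        void onControlTuple(long afterWindowId, Object payload);
    }

    /** Hypothetical, simplified stand-in for an operator's window callbacks. */
    interface Operator {
        void beginWindow(long windowId);
        void process(Object tuple);
        void endWindow();
    }

    /** Hypothetical wrapper marking a custom control tuple in the stream. */
    static final class Control {
        final Object payload;
        Control(Object payload) { this.payload = payload; }
    }

    /** Holds control tuples seen mid-window and releases them at the boundary. */
    static final class BoundaryDelivery {
        private final List<Operator> downstream;
        private final List<Object> pending = new ArrayList<>();

        BoundaryDelivery(List<Operator> downstream) { this.downstream = downstream; }

        void runWindow(long windowId, List<Object> events) {
            for (Operator op : downstream) op.beginWindow(windowId);
            for (Object e : events) {
                if (e instanceof Control) {
                    pending.add(((Control) e).payload); // defer, never deliver mid-window
                } else {
                    for (Operator op : downstream) op.process(e);
                }
            }
            for (Operator op : downstream) op.endWindow();
            // After END_WINDOW of window N and before BEGIN_WINDOW of window N+1.
            for (Object payload : pending) {
                for (Operator op : downstream) {
                    if (op instanceof ControlTupleListener) { // only interested operators
                        ((ControlTupleListener) op).onControlTuple(windowId, payload);
                    }
                }
            }
            pending.clear();
        }
    }

    /** Operator that opts in to control tuples and logs every callback. */
    static final class LoggingOperator implements Operator, ControlTupleListener {
        final List<String> log;
        LoggingOperator(List<String> log) { this.log = log; }
        public void beginWindow(long windowId) { log.add("BEGIN " + windowId); }
        public void process(Object tuple) { log.add("data " + tuple); }
        public void endWindow() { log.add("END"); }
        public void onControlTuple(long afterWindowId, Object payload) {
            log.add("control " + payload + " after " + afterWindowId);
        }
    }

    /** Runs two windows; the control tuple arrives between "a" and "b". */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Operator op = new LoggingOperator(log);
        BoundaryDelivery engine = new BoundaryDelivery(Arrays.<Operator>asList(op));
        engine.runWindow(1, Arrays.<Object>asList("a", new Control("EOF"), "b"));
        engine.runWindow(2, Arrays.<Object>asList("c"));
        return log;
    }

    public static void main(String[] args) {
        for (String line : demo()) System.out.println(line);
    }
}
```

In this toy run the operator sees both data tuples of window 1, then END,
and only then the control tuple, so what it observes within a window does
not depend on where the control tuple sat relative to the data. An operator
that does not implement the listener interface would simply never be called.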

Thanks,
Amol


On Mon, Jun 27, 2016 at 4:37 PM, Thomas Weise <[email protected]>
wrote:

> The windowing we discuss here is in general event time based, arrival time
> is a special case of it.
>
> I don't think state changes can be made independent of the streaming window
> boundary as it would prevent idempotent processing and transitively exactly
> once. For that to work, tuples need to be presented to the operator in a
> guaranteed order *within* the streaming window, which is not possible with
> multiple ports (and partitions).
>
> Thomas
>
> On Mon, Jun 27, 2016 at 2:53 PM, David Yan <[email protected]> wrote:
>
> > I think for session tracking, if the session boundaries are allowed to be
> > not aligned with the streaming window boundaries, the user will have a much
> > bigger problem with idempotency. And in most cases, session tracking is
> > event time based, not ingression time or processing time based, so this may
> > never be a problem. But if that ever happens, the user can always alter the
> > default 500ms width.
> >
> > David
> >
> > On Mon, Jun 27, 2016 at 2:35 PM, Vlad Rozov <[email protected]>
> > wrote:
> >
> > > Ability to send custom control tuples within window may be useful, for
> > > example, for sessions tracking, where session boundaries are not aligned
> > > with window boundaries and 500 ms latency is not acceptable for an
> > > application.
> > >
> > > Thank you,
> > >
> > > Vlad
> > >
> > >
> > > On 6/25/16 10:52, Thomas Weise wrote:
> > >
> > >> It should not matter from where the control tuple is triggered. It will be
> > >> good to have a generic mechanism to propagate it and other things can be
> > >> accomplished outside the engine. For example, the new comprehensive support
> > >> for windowing will all be in Malhar, nothing that the engine needs to know
> > >> about it except that we need the control tuple for watermark propagation
> > >> and idempotent processing.
> > >>
> > >> I also think the main difference to other tuples is the need to send it to
> > >> all partitions. Which is similar to checkpoint window tuples, but not the
> > >> same. Here, we probably also need the ability for the user to control
> > >> whether such tuple should traverse the entire DAG or not. For a batch use
> > >> case, for example, we may want to send the end of file to the next
> > >> operator, but not beyond, if the operator has asynchronous processing logic
> > >> in it.
> > >>
> > >> For any logic to be idempotent, the control tuple needs to be processed at
> > >> a window boundary. Receiving the control tuple in the window callback would
> > >> avoid having to track extra state in the operator. I don't think that's a
> > >> major issue, but what is the use case for processing a control tuple within
> > >> the window?
> > >>
> > >> Thomas
> > >>
> > >>
> > >>
> > >> On Sat, Jun 25, 2016 at 6:19 AM, Pramod Immaneni <[email protected]> wrote:
> > >>
> > >>> For the use cases you mentioned, I think 1) and 2) are more likely to
> > >>> be controlled directly by the application, 3) and 4) are more likely
> > >>> going to be triggered externally and directly handled by the engine
> > >>> and 3) is already being implemented that way (apexcore-163).
> > >>>
> > >>> The control tuples emitted by an operator would be sent to all
> > >>> downstream partitions isn't it and that would be the chief distinction
> > >>> compared to data (apart from the payload) which would get partitioned
> > >>> under normal circumstances. It would also be guaranteed that
> > >>> downstream partitions will receive control tuples only after the data
> > >>> that was sent before it so we could send it immediately when it is
> > >>> emitted as opposed to window boundaries.
> > >>>
> > >>> However during unification it is important to know if these control
> > >>> tuples have been received from all upstream partitions before
> > >>> proceeding with a control operation. One could wait till end of the
> > >>> window but that introduces a delay however small, I would like to add
> > >>> to the proposal that the platform only hand over the control tuple to
> > >>> the unifier when it has been received from all upstream partitions
> > >>> much like how end window is processed but not wait till the actual end
> > >>> of the window.
> > >>>
> > >>> Regd your concern about idempotency, we typically care about
> > >>> idempotency at a window level and doing the above will still allow the
> > >>> operators to preserve that easily.
> > >>>
> > >>> Thanks
> > >>>
> > >>> On Jun 24, 2016, at 11:22 AM, David Yan <[email protected]> wrote:
> > >>>>
> > >>>> Hi all,
> > >>>>
> > >>>> I would like to propose a new feature to the Apex core engine -- the
> > >>>> support of custom control tuples. Currently, we have control tuples such as
> > >>>> BEGIN_WINDOW, END_WINDOW, CHECKPOINT, and so on, but we don't have the
> > >>>> support for applications to insert their own control tuples. The way
> > >>>> currently to get around this is to use data tuples and have a separate port
> > >>>> for such tuples that sends tuples to all partitions of the downstream
> > >>>> operators, which is not exactly developer friendly.
> > >>>>
> > >>>> We have already seen a number of use cases that can use this feature:
> > >>>>
> > >>>> 1) Batch support: We need to tell all operators of the physical DAG when a
> > >>>> batch starts and ends, so the operators can do whatever that is needed upon
> > >>>> the start or the end of a batch.
> > >>>>
> > >>>> 2) Watermark: To support the concepts of event time windowing, the
> > >>>> watermark control tuple is needed to tell which windows should be
> > >>>> considered late.
> > >>>>
> > >>>> 3) Changing operator properties: We do have the support of changing
> > >>>> operator properties on the fly, but with a custom control tuple, the
> > >>>> command to change operator properties can be window aligned for all
> > >>>> partitions and also across the DAG.
> > >>>>
> > >>>> 4) Recording tuples: Like changing operator properties, we do have this
> > >>>> support now but only at the individual physical operator level, and without
> > >>>> control of which window to record tuples for. With a custom control tuple,
> > >>>> because a control tuple must belong to a window, all operators in the DAG
> > >>>> can start (and stop) recording for the same windows.
> > >>>>
> > >>>> I can think of two options to achieve this:
> > >>>>
> > >>>> 1) new custom control tuple type that takes user's serializable object.
> > >>>>
> > >>>> 2) piggy back the current BEGIN_WINDOW and END_WINDOW control tuples.
> > >>>>
> > >>>> Please provide your feedback. Thank you.
> > >>>>
> > >>>> David
> > >>>>
> > >>>
> > >
> >
>
