It is not clear how operator will emit custom control tuple at window
boundaries. One way is to cache/accumulate control tuples in the
operator output port till window closes (END_WINDOW is inserted into the
output sink) or only allow an operator to emit control tuples inside the
endWindow(). The later is a slight variation of the operator output port
caching behavior with the only difference that now the operator itself
is responsible for caching/accumulating control tuples. Note that in
many cases it will be necessary to postpone emitting payload tuples that
logically come after the custom control tuple till the next window begins.
IMO, that too restrictive and in a case where input operator uses a push
instead of a poll (for example, it provides an end point where remote
agents may connect and publish/push data), control tuples may be used
for connect/disconnect/watermark broadcast to (partitioned) downstream
operators. In this case the platform just need to guarantee order
barrier (any tuple emitted prior to a control tuple needs to be
delivered prior to the control tuple).
Thank you,
Vlad
On 6/27/16 19:36, Amol Kekre wrote:
I agree with David. Allowing control tuples within a window (along with
data tuples) creates very dangerous situation where guarantees are
impacted. It is much safer to enable control tuples (send/receive) at
window boundaries (after END_WINDOW of window N, and before BEGIN_WINDOW
for window N+1). My take on David's list is
1. -> window boundaries -> Strong +1; there will be a big issue with
guarantees for operators with multiple ports. (see Thomas's response)
2. -> All downstream windows -> +1, but there are situations; a caveat
could be "only to operators that implement control tuple
interface/listeners", which could effectively translates to "all interested
downstream operators"
3. Only Input operator can create control tuples -> -1; is restrictive even
though most likely 95% of the time it will be input operators
Thks,
Amol
On Mon, Jun 27, 2016 at 4:37 PM, Thomas Weise <[email protected]>
wrote:
The windowing we discuss here is in general event time based, arrival time
is a special case of it.
I don't think state changes can be made independent of the streaming window
boundary as it would prevent idempotent processing and transitively exactly
once. For that to work, tuples need to be presented to the operator in a
guaranteed order *within* the streaming window, which is not possible with
multiple ports (and partitions).
Thomas
On Mon, Jun 27, 2016 at 2:53 PM, David Yan <[email protected]> wrote:
I think for session tracking, if the session boundaries are allowed to be
not aligned with the streaming window boundaries, the user will have a
much
bigger problem with idempotency. And in most cases, session tracking is
event time based, not ingression time or processing time based, so this
may
never be a problem. But if that ever happens, the user can always alter
the
default 500ms width.
David
On Mon, Jun 27, 2016 at 2:35 PM, Vlad Rozov <[email protected]>
wrote:
Ability to send custom control tuples within window may be useful, for
example, for sessions tracking, where session boundaries are not
aligned
with window boundaries and 500 ms latency is not acceptable for an
application.
Thank you,
Vlad
On 6/25/16 10:52, Thomas Weise wrote:
It should not matter from where the control tuple is triggered. It
will
be
good to have a generic mechanism to propagate it and other things can
be
accomplished outside the engine. For example, the new comprehensive
support
for windowing will all be in Malhar, nothing that the engine needs to
know
about it except that we need the control tuple for watermark
propagation
and idempotent processing.
I also think the main difference to other tuples is the need to send
it
to
all partitions. Which is similar to checkpoint window tuples, but not
the
same. Here, we probably also need the ability for the user to control
whether such tuple should traverse the entire DAG or not. For a batch
use
case, for example, we may want to send the end of file to the next
operator, but not beyond, if the operator has asynchronous processing
logic
in it.
For any logic to be idempotent, the control tuple needs to be
processed
at
a window boundary. Receiving the control tuple in the window callback
would
avoid having to track extra state in the operator. I don't think
that's
a
major issue, but what is the use case for processing a control tuple
within
the window?
Thomas
On Sat, Jun 25, 2016 at 6:19 AM, Pramod Immaneni <
[email protected]>
wrote:
For the use cases you mentioned, I think 1) and 2) are more likely to
be controlled directly by the application, 3) and 4) are more likely
going to be triggered externally and directly handled by the engine
and 3) is already being implemented that way (apexcore-163).
The control tuples emitted by an operator would be sent to all
downstream partitions isn't it and that would be the chief
distinction
compared to data (apart from the payload) which would get partitioned
under normal circumstances. It would also be guaranteed that
downstream partitions will receive control tuples only after the data
that was sent before it so we could send it immediately when it is
emitted as opposed to window boundaries.
However during unification it is important to know if these control
tuples have been received from all upstream partitions before
proceeding with a control operation. One could wait till end of the
window but that introduces a delay however small, I would like to add
to the proposal that the platform only hand over the control tuple to
the unifier when it has been received from all upstream partitions
much like how end window is processed but not wait till the actual
end
of the window.
Regd your concern about idempotency, we typically care about
idempotency at a window level and doing the above will still allow
the
operators to preserve that easily.
Thanks
On Jun 24, 2016, at 11:22 AM, David Yan <[email protected]>
wrote:
Hi all,
I would like to propose a new feature to the Apex core engine -- the
support of custom control tuples. Currently, we have control tuples
such
as
BEGIN_WINDOW, END_WINDOW, CHECKPOINT, and so on, but we don't have
the
support for applications to insert their own control tuples. The way
currently to get around this is to use data tuples and have a
separate
port
for such tuples that sends tuples to all partitions of the
downstream
operators, which is not exactly developer friendly.
We have already seen a number of use cases that can use this
feature:
1) Batch support: We need to tell all operators of the physical DAG
when
a
batch starts and ends, so the operators can do whatever that is
needed
upon
the start or the end of a batch.
2) Watermark: To support the concepts of event time windowing, the
watermark control tuple is needed to tell which windows should be
considered late.
3) Changing operator properties: We do have the support of changing
operator properties on the fly, but with a custom control tuple, the
command to change operator properties can be window aligned for all
partitions and also across the DAG.
4) Recording tuples: Like changing operator properties, we do have
this
support now but only at the individual physical operator level, and
without
control of which window to record tuples for. With a custom control
tuple,
because a control tuple must belong to a window, all operators in
the
DAG
can start (and stop) recording for the same windows.
I can think of two options to achieve this:
1) new custom control tuple type that takes user's serializable
object.
2) piggy back the current BEGIN_WINDOW and END_WINDOW control
tuples.
Please provide your feedback. Thank you.
David