+1 to manage propagation at an operator level. An operator is either
control tuple aware and needs to manage how control tuples are routed
from input ports to output ports, or it is not. In the latter case it does
not matter how many input and output ports the operator has, and it is
the Apex platform's responsibility to route control tuples. I don't see a
use case where an operator that is not aware of a control tuple needs to
manage one or more input ports (or, similarly, output ports) differently
than others.
In general, an operator is aware only of specific control tuples
(for example end of batch or end of file), and for control tuples that
it was not enabled for, the behavior should be exactly the same as if
the operator were not control tuple aware, meaning that those control
tuples should be propagated from input ports to output ports by the
platform. There should be a way to let the platform know which
control tuples an operator is aware of and can handle. This can be done
both by an API call and by an annotation.
Thank you,
Vlad
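To make the annotation-plus-API-call idea concrete, here is a minimal Java sketch of how a platform might decide, per control tuple, whether to route it itself. Every name in it (`HandlesControlTuples`, `ControlTupleAware`, `PropagationPolicy` and the payload classes) is hypothetical and not part of the Apex API; the sketch only assumes that any control tuple type the operator has not declared falls back to platform propagation.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical control tuple payload types.
final class EndOfFile {}
final class EndOfBatch {}

// Hypothetical static declaration: control tuple types the operator handles itself.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface HandlesControlTuples {
  Class<?>[] value();
}

// Hypothetical API-call declaration, for operators that prefer to declare in code.
interface ControlTupleAware {
  Set<Class<?>> handledControlTuples();
}

@HandlesControlTuples(EndOfFile.class)
class LineFileReader {}

class BatchAwareOperator implements ControlTupleAware {
  @Override
  public Set<Class<?>> handledControlTuples() {
    return Collections.<Class<?>>singleton(EndOfBatch.class);
  }
}

class PassThroughFilter {}

final class PropagationPolicy {
  // Union of the types declared through the API call and through the annotation.
  static Set<Class<?>> handledTypes(Object operator) {
    Set<Class<?>> types = new HashSet<>();
    if (operator instanceof ControlTupleAware) {
      types.addAll(((ControlTupleAware) operator).handledControlTuples());
    }
    HandlesControlTuples declared = operator.getClass().getAnnotation(HandlesControlTuples.class);
    if (declared != null) {
      types.addAll(Arrays.asList(declared.value()));
    }
    return types;
  }

  // The platform routes a control tuple itself unless the operator declared it handles that type.
  static boolean platformPropagates(Object operator, Object controlPayload) {
    for (Class<?> handled : handledTypes(operator)) {
      if (handled.isInstance(controlPayload)) {
        return false;
      }
    }
    return true;
  }
}
```

With this split, `LineFileReader` takes over end-of-file tuples but still gets end-of-batch tuples propagated for free, which is the "exactly the same as if the operator were not control tuple aware" behavior described above.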
On 1/5/17 13:04, Bhupesh Chawda wrote:
Agreed Thomas.
I was referring to the persona of the operator developer. The user of the
operator would not be doing anything related to the propagation of control
tuples. Actually, the behavior of the operator wrt. propagation of control
tuples would be part of the operator documentation.
Also, we are providing options for the developer to route the flow of
control tuples in code during the development of the operator. The
annotations would actually help achieve it in an easier way.
~ Bhupesh
On Jan 5, 2017 21:40, "Thomas Weise" wrote:
I think it is important to be clear on the roles with regard to this
functionality. The user of the operator should not have to do anything to
get it to work. So while I suggested considering attributes earlier, there
should not be any need for the user to set those. The operator needs to
work as is.
The persona concerned with propagation of control tuples is the operator
developer. I think the clear way for the operator developer to override the
propagation behavior is in code and if that is possible there is no need
for other things such as attributes or other port level settings.
Thomas
On Wed, Jan 4, 2017 at 10:20 PM, Bhupesh Chawda wrote:
I think we all agree on the use case for selective propagation. The
question is about where to have the control - at the operator level or at
the port level.
For this ability, we have the following options:
1. Operator disables the propagation on selected output ports. Other
output ports propagate by default.
2. Operator disables propagation for the entire operator (by means of an
attribute). Operator developer explicitly emits the received control
tuples on selected output ports.
If the decision is to completely block the propagation, then Option 2 is
easier to use, as just an attribute needs to be set, as opposed to Option 1,
where the user needs to set the annotation on each output port.
However, if selective propagation is needed, Option 1 would just need the
user to disable propagation on certain ports, and the rest are propagated by
default, while Option 2 requires the user to explicitly emit the control
tuples.
~ Bhupesh
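The two options can be compared side by side in a small Java sketch. `SketchOperator`, `SketchOutputPort` and their fields are hypothetical stand-ins for Apex ports and attributes: the per-port `propagateControlTuples` field models Option 1 (and could just as well be flipped at run time through a method such as the `setPropagateControlTuples(boolean)` proposed later in the thread), while `implicitPropagation` models the operator-level attribute of Option 2.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical output port; `delivered` stands in for what reaches the downstream operator.
class SketchOutputPort {
  final List<Object> delivered = new ArrayList<>();
  // Option 1: per-port switch, on by default.
  boolean propagateControlTuples = true;

  // Explicit emission by operator code is always delivered.
  void emitControl(Object payload) {
    delivered.add(payload);
  }
}

class SketchOperator {
  final SketchOutputPort dataOut = new SketchOutputPort();
  final SketchOutputPort metaOut = new SketchOutputPort();
  // Option 2: operator-wide switch (an attribute in the proposal).
  boolean implicitPropagation = true;

  // What the platform might do when an upstream control tuple reaches this operator.
  void onUpstreamControl(Object payload) {
    if (implicitPropagation) {
      for (SketchOutputPort port : new SketchOutputPort[] {dataOut, metaOut}) {
        if (port.propagateControlTuples) {
          port.delivered.add(payload);
        }
      }
    } else {
      handleControl(payload);
    }
  }

  // Option 2: with implicit propagation off, developer code decides where the tuple goes.
  void handleControl(Object payload) {
    metaOut.emitControl(payload);
  }
}
```

Both configurations can produce the same routing (control tuple on the metadata port only); the difference is whether the developer sets a per-port flag or writes forwarding code.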
On Thu, Jan 5, 2017 at 3:46 AM, Thomas Weise wrote:
Yes, I think that for any of these cases the operator developer will turn
off implicit propagation for the operator and then write the code to route
or create control tuples as needed.
Thomas
On Wed, Jan 4, 2017 at 12:59 PM, Amol Kekre wrote:
I agree that by default the propagation must be implicit, i.e. if the
operator does nothing, the control tuple propagates. I do think users
should have control over deciding to "not propagate" or "create new", and
in these cases they would need to do something explicit (override)?
The following cases come to mind:
1. Sole consumer of a particular control signal (for example end of file)
2. Creator of a particular control signal (start of file, or a signal to
pause on something etc.)
3. One port on a data pipeline and another port on a metadata pipeline
In the above cases, emit will be decided on an output port. #1 is the only
place where all output ports will disable the tuple; #2 and #3 most likely
will be selective.
Thks
Amol
On Wed, Jan 4, 2017 at 12:25 PM, Thomas Weise wrote:
I think there is (1) implicit propagation, just like other control tuples,
where the operator code isn't involved, and (2) the case where the operator
developer wants to decide how control tuples are created or routed, and will
receive and emit them on the output ports as desired.
I don't see a use case for hybrid approaches? Maybe propagation does not
need to be tied to ports at all, maybe just by annotation at the operator
level?
Thomas
On Wed, Jan 4, 2017 at 10:59 AM, Bhupesh Chawda wrote:
Wouldn't having this with output ports give finer control over the
propagation of control tuples?
We might have an operator with two output ports that feed two different
pipelines downstream. We would be able to say that one pipeline gets the
control tuples and the other doesn't.
~ Bhupesh
On Jan 4, 2017 11:55 PM, "Thomas Weise" wrote:
I'm referring to the operator that needs to make the decision to propagate
or not. The tuples come from an input port, so it seems appropriate to say
"don't propagate control tuples from this port", no matter how many output
ports there are.
Output ports are there for an operator to emit new tuples; in the case you
are discussing, you don't emit new control tuples.
Thomas
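For contrast, the input-port variant can be sketched as follows: the setting lives on the input port the control tuple arrived on, and every output port receives the same treatment. `SketchInputPort` and `JoinOperator` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical input port carrying a per-port propagation setting.
class SketchInputPort {
  final boolean propagateControlTuples;

  SketchInputPort(boolean propagateControlTuples) {
    this.propagateControlTuples = propagateControlTuples;
  }
}

class JoinOperator {
  final SketchInputPort left = new SketchInputPort(true);
  final SketchInputPort right = new SketchInputPort(false);
  final List<Object> out1 = new ArrayList<>();
  final List<Object> out2 = new ArrayList<>();

  // The decision depends only on the arriving input port; all outputs are treated alike.
  void onControl(SketchInputPort from, Object payload) {
    if (from.propagateControlTuples) {
      out1.add(payload);
      out2.add(payload);
    }
  }
}
```

This variant cannot send a control tuple to one output and not the other, which is exactly the case raised in the reply below.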
On Wed, Jan 4, 2017 at 9:39 AM, Bhupesh Chawda wrote:
Hi Thomas,
Are you suggesting an attribute on the input port for controlling the
propagation of control tuples to downstream operators?
I think it would be better to do it on the output port, since the decision
to block the propagation will be made at the upstream operator rather than
at the downstream one.
Also, we need another way of controlling the propagation at run time, and
hence I was thinking about a method call on the output port, in addition
to the annotation on the output port (which is the static way).
Please correct me if I have misunderstood your question.
~ Bhupesh
On Wed, Jan 4, 2017 at 7:26 PM, Thomas Weise wrote:
Wouldn't it be more intuitive to control this with an attribute on the
input port?
On Tue, Jan 3, 2017 at 11:06 PM, Bhupesh Chawda wrote:
Hi Pramod,
I was thinking of a method setPropagateControlTuples(boolean propagate) on
the output port of the operator.
The operator could disable this in the code at any point in time.
Note, however, that this is only meant to block the propagation of control
tuples from upstream. Any control tuples emitted explicitly by the operator
would still be emitted and sent to the downstream operators.
Please see
https://github.com/apache/apex-core/pull/440/files#diff-8aa0ca1a3e645fa60e9b376c118c00a3R68
in the PR.
~ Bhupesh
On Wed, Jan 4, 2017 at 6:53 AM, Pramod Immaneni wrote:
2 sounds good. Have you thought about what the method would look like?
On Sat, Dec 31, 2016 at 8:29 PM, Bhupesh Chawda wrote:
Yes, that makes sense.
We have the following options:
1. Make the annotation false by default and force the user to forward the
control tuples explicitly.
2. Annotation is true by default and the static way of blocking stays as
it is. We provide another way of blocking programmatically, perhaps by
means of another method call on the port.
~ Bhupesh
On Dec 30, 2016 00:09, "Pramod Immaneni" wrote:
Bhupesh,
An annotation seems like a static way to stop propagation. Given that
these are programmatically generated, I would think the operators should
be able to stop them (consume without propagating) programmatically as
well.
Thanks
On Thu, Dec 29, 2016 at 8:48 AM, Bhupesh Chawda wrote:
Thanks Vlad, I am trying out the approach you mentioned regarding having
another interface which allows sinks to put a control tuple.
Regarding the delivery of control tuples, here is what I am planning to do:
All the control tuples which are emitted in a particular window are
delivered after all the data tuples have been delivered to the respective
ports, but before the endWindow() call. The operator can then process the
control tuples in that window and can do any finalization in the
endWindow() call. There will be no delivery of control tuples after
endWindow() and before the next beginWindow() call.
For handling the propagation of control tuples further in the DAG, we are
planning to have an annotation on the output port of the operator, which
would be true by default and could be turned off per port:
@OutputPortFieldAnnotation(propogateControlTuples = false).
~ Bhupesh
On Thu, Dec 29, 2016 at 6:24 AM, Vlad Rozov wrote:
Custom control tuples are control tuples emitted by an operator itself and
not by the platform. Prior to the introduction of custom control tuples,
only the Apex engine itself put control tuples into various sinks, so the
engine created the necessary Tuple objects with the corresponding type
prior to calling Sink.put().
Not all sinks need to be changed. Only control tuple aware sinks should
provide such functionality. If this leads to a lot of code duplication,
please create an abstract class that other control tuple aware sinks will
extend from.
Thank you,
Vlad
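A minimal sketch of the structure described above, assuming a simplified sink contract rather than the real Apex `Sink` interface: only sinks that opt in expose a control path, and the shared wrapping logic lives in one abstract base class. `SketchSink`, `ControlAwareSink`, `CustomControl`, `AbstractControlAwareSink` and `RecordingSink` are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal sink contract (not the real com.datatorrent.api.Sink).
interface SketchSink<T> {
  void put(T tuple);
}

// Only sinks that opt in expose a control path.
interface ControlAwareSink<T> extends SketchSink<T> {
  void putControl(Object payload);
}

// Hypothetical wrapper that marks a payload as a custom control tuple.
final class CustomControl {
  final Object payload;

  CustomControl(Object payload) {
    this.payload = payload;
  }
}

// Wrapping logic shared by all control aware sinks, so it is not duplicated in each one.
abstract class AbstractControlAwareSink<T> implements ControlAwareSink<T> {
  @Override
  public final void putControl(Object payload) {
    deliver(new CustomControl(payload));
  }

  @Override
  public final void put(T tuple) {
    deliver(tuple);
  }

  // Concrete sinks only decide where tuples go.
  protected abstract void deliver(Object tupleOrControl);
}

class RecordingSink<T> extends AbstractControlAwareSink<T> {
  final List<Object> received = new ArrayList<>();

  @Override
  protected void deliver(Object tupleOrControl) {
    received.add(tupleOrControl);
  }
}
```

Existing sinks that implement only `SketchSink` are untouched, which addresses the concern that adding a control method would force changes on every sink.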
On 12/23/16 06:24, Bhupesh Chawda wrote:
Hi Vlad,
Thanks for the pointer on delegating the wrapping of the user tuple to the
control port. I was trying this out today.
The problem I see is that if we introduce a putControlTuple() method in
Sink, then a lot of the existing sinks would change. Also, the changes
seemed redundant, as the existing control tuples already use the put()
method of sinks. So why do something special for custom control tuples?
The only aspect in which the custom control tuples are different is that
these will be generated by the user and will actually be delivered to the
ports in a different order. Perhaps we should be able to use the existing
flow. The only problem, as outlined before, seems to be the identification
of the user tuple as a control tuple.
~ Bhupesh
On Thu, Dec 22, 2016 at 10:44 PM, Vlad Rozov wrote:
Why is it necessary to wrap in the OutputPort? Can't it be delegated to a
Sink by introducing a new putControlTuple method?
Thank you,
Vlad
On 12/21/16 22:10, Bhupesh Chawda wrote:
Hi Vlad,
The problem in using the Tuple class as the wrapper is that the Ports
belong to the API, and we want to wrap the payload object of the control
tuple into the Tuple class, which is not part of the API.
The output port will just get the payload of the user control tuple. For
example, if the user emits a Long as a control tuple, the payload object
will just be a Long object.
It is necessary to bundle this Long into some recognizable object so that
the BufferServerPublisher knows that this is a control tuple and not a
regular tuple, and serializes it accordingly. It is therefore necessary
that the tuple be part of some known hierarchy so that it can be
distinguished from other payload tuples. Let us call this class
ControlTupleInterface.
Note that this needs to be done before the tuple is inserted into the
sink, which is done in the port objects. Once the tuple is inserted into
the sink, it would look just like any other payload tuple and could not
be distinguished.
For this reason, I had something like the following in the API:
package com.datatorrent.api;

import java.util.UUID;

public class ControlTupleInterface
{
  Object payload; // User control tuple payload, for example a Long
  UUID id;        // Unique id used to de-duplicate in downstream operators
}
Regarding your suggestion on using the Tuple class as the wrapper for the
control tuple payload, let me describe the current flow to make the
discussion easier:
We have a Tuple class in the buffer server which is responsible for
serializing the user control tuple, bundling it together with a message
type: CUSTOM_CONTROL_TUPLE_VALUE.
com.datatorrent.bufferserver.packet.Tuple
  |-- com.datatorrent.bufferserver.packet.CustomControlTuple
We have another Tuple class in Stram which helps the
BufferServerSubscriber to de-serialize the serialized tuples. We should
have CustomControlTuple in Stram as follows:
com.datatorrent.stram.tuple.Tuple
  |-- com.datatorrent.stram.tuple.CustomControlTuple
This will have a field for the user control payload.
I think we should not expose the Tuple class in Stram to the API. That
was the main reason I introduced another class/interface,
ControlTupleInterface, as described above.
Regarding adding methods to DefaultInputPort and DefaultOutputPort, I
think error detection would not be early enough if the control tuple is
sent very late in the processing :-)
Extending the ports to ControlTupleAware* should help in this case.
However, we still need to see if there are any downsides to doing this.
Thanks.
~ Bhupesh
On Thu, Dec 22, 2016 at 7:26 AM, Vlad Rozov wrote:
Hi Bhupesh,
It should not be a CustomWrapper. The wrapper object should be
CustomControlTuple, which extends Tuple. There is already code that checks
for Tuple instances. The "unWrap" name is misleading, IMO. It should be
something like customControlTuple.getPayload() or
customControlTuple.getAttachment(). In emitControl(), create a new
CustomControlTuple using the provided payload as one of the arguments. It
may still be good to use a common parent other than Object for the control
tuple payload class hierarchy.
I don't understand how adding more methods to the Default implementation
will help with early error detection, unless the application or operator
that relies on the custom control tuple functionality explicitly checks
the platform version at run time, or tries to emit a control tuple just to
check that such functionality is supported by the platform.
Thank you,
Vlad
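The suggestion above can be sketched in Java as follows. `Tuple` here is a hypothetical stand-in for the engine's tuple base class, and `SketchPorts` collapses an output port, a sink and an input port into one object for brevity; only the `CustomControlTuple` and `getPayload()` naming comes from the message above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the engine's Tuple base class.
abstract class Tuple {
  final long windowId;

  Tuple(long windowId) {
    this.windowId = windowId;
  }
}

// Wrapper as suggested: extends Tuple and exposes the user payload via getPayload().
final class CustomControlTuple extends Tuple {
  private final Object payload;

  CustomControlTuple(long windowId, Object payload) {
    super(windowId);
    this.payload = payload;
  }

  Object getPayload() {
    return payload;
  }
}

class SketchPorts {
  final List<Object> sink = new ArrayList<>();
  final List<Object> processedData = new ArrayList<>();
  final List<Object> processedControl = new ArrayList<>();
  long currentWindowId = 1;

  // Output side: the operator passes a plain payload; the port creates the wrapper.
  void emitControl(Object payload) {
    sink.add(new CustomControlTuple(currentWindowId, payload));
  }

  // Input side: wrappers are routed to control processing, everything else is data.
  void put(Object tuple) {
    if (tuple instanceof CustomControlTuple) {
      processedControl.add(((CustomControlTuple) tuple).getPayload());
    } else {
      processedData.add(tuple);
    }
  }
}
```

The operator-facing methods only ever see the raw payload, so the wrapper class can change between versions without touching the API.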
On 12/21/16 04:58, Bhupesh Chawda wrote:
Hi Vlad.
Yes, the API should not change. We can take an Object instead, and later
wrap it into the required class.
Our InputPort.put and emitControl methods would look something like the
following, where we handle the wrapping and unwrapping internally:
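public void put(T tuple)
{
  if (tuple instanceof CustomWrapper) {
    // A wrapped control tuple: pass only the user payload to processControl()
    processControl(((CustomWrapper)tuple).unWrap());
  } else {
    process(tuple);
  }
}
public void emitControl(Object tuple)
{
  // Wrap the user payload so that it can be recognized downstream
  sink.put(CustomWrapper.wrap(tuple));
}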
Regarding the compatibility issue, I think we have two ways of doing it:
1. Extend DefaultInputPort and DefaultOutputPort and create
ControlAwareInput and ControlAwareOutput out of them. This might require
us to additionally handle specific cases when non-compatible ports
(ControlAwareOutput and DefaultInput, for example) are connected to each
other in user apps.
2. Add the additional methods in the existing Default implementations.
IMO, both of these would help in early error detection.
~ Bhupesh
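The non-compatible connection case in option 1 could be detected when the DAG is validated, before launch, rather than when the first control tuple is emitted. The sketch below is a hypothetical illustration: `ControlAwareInput` and `ControlAwareOutput` follow the names proposed above, `StreamSpec` and `DagValidator` are invented for the example, and whether a flagged stream should be rejected or handled specially is left open, as in the thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical port types; the ControlAware* names follow the proposal above.
class DefaultInput {}
class DefaultOutput {}
class ControlAwareInput extends DefaultInput {}
class ControlAwareOutput extends DefaultOutput {}

// Hypothetical description of one stream in the DAG.
final class StreamSpec {
  final String name;
  final DefaultOutput source;
  final DefaultInput sink;

  StreamSpec(String name, DefaultOutput source, DefaultInput sink) {
    this.name = name;
    this.source = source;
    this.sink = sink;
  }
}

final class DagValidator {
  // Returns streams whose source may emit control tuples that the sink port cannot process.
  static List<String> mismatchedStreams(List<StreamSpec> streams) {
    List<String> flagged = new ArrayList<>();
    for (StreamSpec s : streams) {
      if (s.source instanceof ControlAwareOutput && !(s.sink instanceof ControlAwareInput)) {
        flagged.add(s.name);
      }
    }
    return flagged;
  }
}
```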
On Wed, Dec 21, 2016 at 1:36 AM, Vlad Rozov wrote:
A wrapper class is required for the control tuple delivery, but the
Port/Operator API should use the control tuple payload object only. The
implementation of the wrapper class may change from version to version,
but the API should not be affected by the change.
I guess the assumption is that the default input and output ports will be
extended to provide support for control tuples. This may cause some
backward compatibility issues. Consider a scenario where a newer version
of Malhar that relies on the EOF control tuple is deployed into an older
version of core that does not support control tuples. In such a scenario,
an error will be raised only when an operator tries to emit the EOF
control tuple at the end of a job. Introducing control tuple aware ports
solves the problem of early error detection. It will require some
operators to be modified to use control tuple aware ports, but such a
change may help to distinguish control tuple aware operators from their
old versions.
Vlad
On 12/20/16 04:09, Bhupesh Chawda wrote:
I investigated this, and it seems better to have a wrapper class for the
user object. This would serve 2 purposes:
1. Allow us to distinguish a custom control tuple from other payload
tuples.
2. For the same control tuple received from different upstream
partitions, we would have some mechanism to distinguish between the two
in order to identify duplicates.
Additionally, the wrapper class needs to be part of the API, as
DefaultOutputPort needs to know about it before putting it into the sink.
We can make sure that the user is not able to extend or modify this class
in any manner.
~ Bhupesh
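Purpose 2 can be illustrated with a small de-duplication sketch. It assumes, hypothetically, that the id is assigned once when the control tuple is first created, that it is preserved as the tuple passes through each upstream partition, and that the duplicates arrive within the same window; `WrappedControl` and `ControlDeduplicator` are illustrative names only.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;

// Hypothetical wrapper: the id travels with the control tuple through every partition.
final class WrappedControl {
  final UUID id;
  final Object payload;

  WrappedControl(UUID id, Object payload) {
    this.id = id;
    this.payload = payload;
  }
}

// Downstream side: the same control tuple may arrive once per upstream partition.
class ControlDeduplicator {
  private final Set<UUID> seenThisWindow = new HashSet<>();
  final List<Object> delivered = new ArrayList<>();

  void receive(WrappedControl tuple) {
    // Set.add() returns false when the id was already seen in this window.
    if (seenThisWindow.add(tuple.id)) {
      delivered.add(tuple.payload);
    }
  }

  // Assumes duplicates never span windows, so the id set can be reset here.
  void endWindow() {
    seenThisWindow.clear();
  }
}
```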
On Mon, Dec 19, 2016 at 12:18 PM, David Yan wrote:
This C type parameter is going to fix the control tuple type at compile
time, and this is actually not what we want. Note that the operator may
receive or emit multiple different control tuple types.
David
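The trade-off can be seen in a short sketch: with `DefaultInputPort<D,C>` a port would be tied to a single control type `C`, whereas an `Object`-typed `processControl` lets one port dispatch on several unrelated payload types. `StartOfFile`, `EndOfFile` and `FileAwareInputPort` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical payload types that one port might receive.
final class StartOfFile {
  final String path;

  StartOfFile(String path) {
    this.path = path;
  }
}

final class EndOfFile {
  final String path;

  EndOfFile(String path) {
    this.path = path;
  }
}

// With a single type parameter C, these unrelated types could not share one port.
class FileAwareInputPort {
  final List<String> events = new ArrayList<>();

  void processControl(Object payload) {
    if (payload instanceof StartOfFile) {
      events.add("start " + ((StartOfFile) payload).path);
    } else if (payload instanceof EndOfFile) {
      events.add("end " + ((EndOfFile) payload).path);
    } else {
      // Types this port does not know about are simply recorded here.
      events.add("ignored " + payload.getClass().getSimpleName());
    }
  }
}
```

The cost is the loss of the compile-time checking the type parameter would give; a common marker supertype for payloads, as also suggested in this thread, would be a middle ground.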
On Dec 17, 2016 3:33 AM, "Tushar Gosavi" wrote:
We do not need to create an interface for data emitted through
emitControl or processed through processControl. Internally we could wrap
the user object in ControlTuple. You can add a type parameter for the
control tuple object on ports, DefaultInputPort<D,C>, where D is the data
type and C is the control tuple type, for better error catching at compile
time.
- Tushar.
On Sat, Dec 17, 2016 at 8:35 AM, Bhupesh Chawda wrote:
Agreed Vlad and David.
I am just suggesting there should be a wrapper for the user object. It
can be a marker interface, and we can call it something else, like
"CustomControl".
The user object will be wrapped in another class, "ControlTuple", which
traverses the BufferServer and will perhaps be extended from the
packet/Tuple class. This class will not be exposed to the user.
~ Bhupesh
On Sat, Dec 17, 2016 at 4:11 AM, Vlad Rozov wrote:
I agree with David. The payload of the control tuple is in the userObject,
and operators/ports don't need to be exposed to the implementation of the
ControlTuple class. With the proposed interface, operator developers are
free to extend ControlTuple further, and I don't think that such a
capability needs to be provided. The wrapping in the ControlTuple class is
necessary, and most likely ControlTuple needs to be extended from the
buffer server Tuple. It may be good to have a common parent other than
Object for all user payloads, but it may be a marker interface as well.
Thank you,
Vlad
On 12/16/16 09:59, Bhupesh Chawda wrote:
Hi David,
Actually, I was thinking of another API class called ControlTuple,
different from the actual tuple class in the buffer server or Stram. This
could serve as a way for the buffer server publisher to understand that
it is a control tuple and needs to be wrapped differently.
~ Bhupesh
On Dec 16, 2016 22:28, "David Yan" wrote:
{code}
// DefaultInputPort
public void processControl(ControlTuple tuple)
{
  // Default implementation to avoid the need to implement it in all implementations
}
{code}
{code}
// DefaultOutputPort
public void emitControl(ControlTuple tuple)
{
}
{code}
I think we don't need to expose the ControlTuple class to the operator
developers, because the window ID is just the current window ID when
these methods are called. How about making them just Object? We also need
to provide a way for the user to specify a custom serializer for the
control tuple.
David
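As an illustration of the custom serializer mentioned above, here is a hypothetical contract with a user-supplied implementation for a `Long` payload. `ControlTupleSerializer` and `LongControlSerializer` are invented for this sketch and are not an existing Apex interface; only `java.nio.ByteBuffer` is a real library class.

```java
import java.nio.ByteBuffer;

// Hypothetical serializer contract for control tuple payloads.
interface ControlTupleSerializer<T> {
  byte[] toBytes(T payload);

  T fromBytes(byte[] bytes);
}

// Example user-supplied serializer for a Long payload, such as a batch id.
final class LongControlSerializer implements ControlTupleSerializer<Long> {
  @Override
  public byte[] toBytes(Long payload) {
    // Fixed 8-byte big-endian encoding.
    return ByteBuffer.allocate(Long.BYTES).putLong(payload).array();
  }

  @Override
  public Long fromBytes(byte[] bytes) {
    return ByteBuffer.wrap(bytes).getLong();
  }
}
```

Keeping the serializer keyed on the payload type, rather than on the wrapper, fits the idea that operators only ever see the payload object.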
On Thu, Dec 15, 2016 at 12:43 AM, Bhupesh Chawda wrote:
Hi All,
Here are the initial interfaces:
{code}
// DefaultInputPort
public void processControl(ControlTuple tuple)
{
  // Default implementation to avoid the need to implement it in all implementations
}
{code}
{code}
// DefaultOutputPort
public void emitControl(ControlTuple tuple)
{
}
{code}
We have an option to add these methods to the interfaces InputPort and
OutputPort, but these would not be backward compatible and would also not
be consistent with the current implementation of the basic data tuple flow
(as with process() and emit()).
We also need to expose an interface / class for users to wrap their
object and emit it downstream. This should be part of the API.
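{code}
public class ControlTuple extends Tuple
{
  Object userObject;

  public ControlTuple(long windowId, Object userObject)
  {
    // windowId handling depends on the Tuple base class and is omitted here
    this.userObject = userObject;
  }
}
{code}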
The emitted tuples would traverse the same flow as other control tuples.
The plan is to intercept the control tuples in GenericNode and use the
reservoir to emit the control tuples at the end of the window.
GenericNode seems to be the best place to buffer incoming custom control
tuples without delivering them immediately to the operator port. Once the
end of the window is reached, we plan to use the reservoir sink to push
them to the port. This behavior differs from that of any other control
tuple in that we are changing the order of tuples in the stream: the
custom control tuples will be buffered and not delivered to the ports
until the end of the window.
To accomplish this, we need a public method in SweepableReservoir which
allows putting a tuple back into the sink of the reservoir.
~ Bhupesh
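The GenericNode plan can be sketched with a deque standing in for the reservoir. `SketchReservoir.putBack()` plays the role of the public "put back" method proposed for `SweepableReservoir`, and the node holds custom control tuples aside until the end-window marker arrives, then re-queues them ahead of it. All classes here are hypothetical simplifications, not Apex engine code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical markers standing in for engine tuple types.
final class EndWindow {}

final class CustomControl {
  final Object payload;

  CustomControl(Object payload) {
    this.payload = payload;
  }
}

class SketchReservoir {
  private final Deque<Object> queue = new ArrayDeque<>();

  void put(Object tuple) {
    queue.addLast(tuple);
  }

  // Returns null when the reservoir is empty.
  Object sweep() {
    return queue.pollFirst();
  }

  // The proposed "put back" operation: re-insert at the head of the reservoir.
  void putBack(Object tuple) {
    queue.addFirst(tuple);
  }
}

class SketchGenericNode {
  final List<String> deliveredToPort = new ArrayList<>();
  private final List<Object> held = new ArrayList<>();
  private boolean releasing;

  void run(SketchReservoir reservoir) {
    Object t;
    while ((t = reservoir.sweep()) != null) {
      if (t instanceof CustomControl && !releasing) {
        held.add(t); // buffer instead of delivering mid-window
      } else if (t instanceof EndWindow && !held.isEmpty()) {
        // Put END_WINDOW back first, then the held tuples in reverse,
        // so the head of the reservoir becomes: held (original order), END_WINDOW.
        reservoir.putBack(t);
        for (int i = held.size() - 1; i >= 0; i--) {
          reservoir.putBack(held.get(i));
        }
        held.clear();
        releasing = true;
      } else if (t instanceof EndWindow) {
        deliveredToPort.add("endWindow");
        releasing = false;
      } else if (t instanceof CustomControl) {
        deliveredToPort.add("control " + ((CustomControl) t).payload);
      } else {
        deliveredToPort.add("data " + t);
      }
    }
  }
}
```

For a window containing `d1, control A, d2, control B, END_WINDOW`, the port sees both data tuples first, then both control tuples in their original order, and only then `endWindow`, which matches the delivery contract described later in the thread.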