A wrapper class is required for the control tuples delivery, but
Port/Operator API should use Control Tuple payload object only.
Implementation of the wrapper class may change from version to version,
but API should not be affected by the change.
I guess, assumption is that default input and output port will be
extended to provide support for the control tuples. This may cause some
backward compatibility issues. Consider scenario when a newer version of
Malhar that relies on EOF control tuple is deployed into older version
of core that does not support control tuples. In such scenario, error
will be raised only when an operator tries to emit EOF control tuple at
the end of a job. Introducing control tuple aware ports solve the early
error detection. It will require some operators to be modified to use
control tuple aware ports, but such change may help to distinguish
control tuple aware operators from their old versions.
Vlad
On 12/20/16 04:09, Bhupesh Chawda wrote:
I investigated this and seems like it is better to have a wrapper class for
the user object.
This would serve 2 purposes:
1. Allow us to distinguish a custom control tuple from other payload
tuples.
2. For the same control tuple received from different upstream
partitions, we would have some mechanism to distinguish between the two in
order to identify duplicates.
Additionally, the wrapper class needs to be part of the API as
DefaultOutputPort needs to know about it, before putting it into the sink.
We can make sure that the user is not able to extend or modify this class
in any manner.
~ Bhupesh
On Mon, Dec 19, 2016 at 12:18 PM, David Yan <[email protected]> wrote:
This C type parameter is going to fix the control tuple type at compile
time and this is actually not what we want. Note that the operator may
receive or emit multiple different control tuple types.
David
On Dec 17, 2016 3:33 AM, "Tushar Gosavi" <[email protected]> wrote:
We do not need to create an interface for data emitted through
emitControl or processed through processControl. Internally we could
wrap the user object in ControlTuple. you can add type parameter for
control tuple object on ports.
DefaultInputPort<D,C>
D is the data type and C is the control tuple type for better error
catching at compile phase.
- Tushar.
On Sat, Dec 17, 2016 at 8:35 AM, Bhupesh Chawda <[email protected]>
wrote:
Agreed Vlad and David.
I am just suggesting there should be a wrapper for the user object. It
can
be a marker interface and we can call it something else like
"CustomControl".
The user object will be wrapped in another class "ControlTuple" which
traverses the BufferServer and will perhaps be extended from the
packet/Tuple class. This class will not be exposed to the user.
~ Bhupesh
On Sat, Dec 17, 2016 at 4:11 AM, Vlad Rozov <[email protected]>
wrote:
I agree with David. Payload of the control tuple is in the userObject
and
operators/ports don't need to be exposed to the implementation of the
ControlTuple class. With the proposed interface operators developers are
free to extend ControlTuple further and I don't think that such
capability
needs to be provided. The wrapping in the ControlTuple class is
necessary
and most likely ControlTuple needs to be extended from the buffer server
Tuple. It may be good to have a common parent other than Object for all
user payloads, but it may be a marker interface as well.
Thank you,
Vlad
On 12/16/16 09:59, Bhupesh Chawda wrote:
Hi David,
Actually, I was thinking of another API class called ControlTuple,
different from the actual tuple class in buffer server or stram.
This could serve as a way for the Buffer server publisher to understand
that it is a control tuple and needs to be wrapped differently.
~ Bhupesh
On Dec 16, 2016 22:28, "David Yan" <[email protected]> wrote:
// DefaultInputPort
public void processControl(ControlTuple tuple)
{
// Default Implementation to avoid need to implement it in all
implementations
}
{code}
{code}
// DefaultOutputPort
public void emitControl(ControlTuple tuple)
{
}
I think we don't need to expose the ControlTuple class to the operator
developers because the window ID is just the current window ID when
these
methods are called. How about making them just Object? We also need to
provide the way for the user to specify custom serializer for the
control
tuple.
David
On Thu, Dec 15, 2016 at 12:43 AM, Bhupesh Chawda <
[email protected]
wrote:
Hi All,
Here are the initial interfaces:
{code}
// DefaultInputPort
public void processControl(ControlTuple tuple)
{
// Default Implementation to avoid need to implement it in all
implementations
}
{code}
{code}
// DefaultOutputPort
public void emitControl(ControlTuple tuple)
{
}
{code}
We have an option to add these methods to the interfaces - InputPort
and
OutputPort; But these would not be backward compatible and also not
consistent with the current implementation of basic data tuple flow
(as
with process() and emit()).
We also need to expose an interface / class for users to wrap their
object
and emit downstream. This should be part of API.
{code}
public class ControlTuple extends Tuple
{
Object userObject;
public ControlTuple(long windowId, Object userObject)
{
//
}
}
{code}
The emitted tuples would traverse the same flow as with other control
tuples. The plan is to intercept the control tuples in GenericNode and
use
the Reservior to emit the control tuples at the end of the window.
GenericNode seems to be the best place to buffer incoming custom
control
tuples without delivering them immediately to the operator port. Once
the
end of the window is reached, we plan to use the reservoir sink to
push
them to the port. This is different behavior than any other control
tuple
where we are changing the order of tuples in the stream. The custom
control
tuples will be buffered and not delivered to the ports until the end
of
the
window.
To accomplish this, we need to have a public method in
SweepableReservoir
which allows to put a tuple back in the sink of the reservoir.
~ Bhupesh