2 sounds good. Have you thought about what the method would look like.

On Sat, Dec 31, 2016 at 8:29 PM, Bhupesh Chawda <bhup...@datatorrent.com>
wrote:

> Yes, that makes sense.
> We have following options:
> 1. Make the annotation false by default and force the user to forward the
> control tuples explicitly.
> 2. Annotation is true by default and static way of blocking stays as it is.
> We provide another way for blocking programmatically, perhaps by means of
> another method call on the port.
>
> ~ Bhupesh
>
> On Dec 30, 2016 00:09, "Pramod Immaneni" <pra...@datatorrent.com> wrote:
>
> > Bhupesh,
> >
> > Annotation seems like a static way to stop propagation. Give these are
> > programmatically generated I would think the operators should be able to
> > stop (consume without propagating) programmatically as well.
> >
> > Thanks
> >
> > On Thu, Dec 29, 2016 at 8:48 AM, Bhupesh Chawda <bhup...@datatorrent.com
> >
> > wrote:
> >
> > > Thanks Vlad, I am trying out the approach you mentioned regarding
> having
> > > another interface which allows sinks to put a control tuple.
> > >
> > > Regarding the delivery of control tuples, here is what I am planning to
> > do:
> > > All the control tuples which are emitted in a particular window are
> > > delivered after all the data tuples have been delivered to the
> respective
> > > ports, but before the endWindow() call. The operator can then process
> the
> > > control tuples in that window and can do any finalization in the end
> > window
> > > call. There will be no delivery of control tuples after endWindow() and
> > > before the next beginWindow() call.
> > >
> > > For handling the propagation of control tuples further in the dag, we
> are
> > > planning to have an annotation on the Output Port of the operator which
> > > would be true by default.
> > > @OutputPortFieldAnnotation(propogateControlTuples = false).
> > >
> > > ~ Bhupesh
> > >
> > >
> > > On Thu, Dec 29, 2016 at 6:24 AM, Vlad Rozov <v.ro...@datatorrent.com>
> > > wrote:
> > >
> > > > Custom control tuples are control tuples emitted by an operator
> itself
> > > and
> > > > not by the platform. Prior to the introduction of the custom control
> > > > tuples, only Apex engine itself puts control tuples into various
> sinks,
> > > so
> > > > the engine created necessary Tuple objects with the corresponding
> type
> > > > prior to calling Sink.put().
> > > >
> > > > Not all sinks need to be changed. Only control tuple aware sinks
> should
> > > > provide such functionality. In the case there is a lot of code
> > > duplication,
> > > > please create an abstract class, that other control aware sinks will
> > > extend
> > > > from.
> > > >
> > > > Thank you,
> > > >
> > > > Vlad
> > > >
> > > >
> > > > On 12/23/16 06:24, Bhupesh Chawda wrote:
> > > >
> > > >> Hi Vlad,
> > > >>
> > > >> Thanks for the pointer on delegating the wrapping of the user tuple
> to
> > > the
> > > >> control port. I was trying this out today.
> > > >> The problem I see us if we introduce a putControlTuple() method in
> > Sink,
> > > >> then a lot of the existing sinks would change. Also the changes
> seemed
> > > >> redundant as, the existing control tuples already use the put()
> method
> > > of
> > > >> sinks. So why do something special for custom control tuples?
> > > >>
> > > >> The only aspect in which the custom control tuples are different is
> > that
> > > >> these will be generated by the user and will actually be delivered
> to
> > > the
> > > >> ports in a different order. Perhaps we should be able to use the
> > > existing
> > > >> flow. The only problems as outlined before seem to be identification
> > of
> > > >> the
> > > >> user tuple as a control tuple.
> > > >>
> > > >> ~ Bhupesh
> > > >>
> > > >>
> > > >> On Thu, Dec 22, 2016 at 10:44 PM, Vlad Rozov <
> v.ro...@datatorrent.com
> > >
> > > >> wrote:
> > > >>
> > > >> Why is it necessary to wrap in the OutputPort? Can't it be delegated
> > to
> > > a
> > > >>> Sink by introducing new putControlTuple method?
> > > >>>
> > > >>> Thank you,
> > > >>>
> > > >>> Vlad
> > > >>>
> > > >>>
> > > >>> On 12/21/16 22:10, Bhupesh Chawda wrote:
> > > >>>
> > > >>> Hi Vlad,
> > > >>>>
> > > >>>> The problem in using the Tuple class as the wrapper is that the
> > Ports
> > > >>>> belong to the API and we want to wrap the payload object of the
> > > control
> > > >>>> tuple into the Tuple class which is not part of the API.
> > > >>>>
> > > >>>> The output port will just get the payload of the user control
> tuple.
> > > For
> > > >>>> example, if the user emits a Long, as a control tuple, the payload
> > > >>>> object
> > > >>>> will just be a Long object.
> > > >>>>
> > > >>>> It is necessary to bundle this Long into some recognizable object
> so
> > > >>>> that
> > > >>>> the BufferServerPublisher knows that this is a Control tuple and
> > not a
> > > >>>> regular tuple and serialize it accordingly. It is therefore
> > necessary
> > > >>>> that
> > > >>>> the tuple be part of some known hierarchy so that can be
> > distinguished
> > > >>>> from
> > > >>>> other payload tuples. Let us call this class
> ControlTupleInterface.
> > > Note
> > > >>>> that this needs to be done before the tuple is inserted into the
> > sink
> > > >>>> which
> > > >>>> is done in the port objects. Once the tuple is inserted into the
> > sink,
> > > >>>> it
> > > >>>> would seem just like any other payload tuple and cannot be
> > > >>>> distinguished.
> > > >>>>
> > > >>>> For this reason, I had something like the following in API:
> > > >>>>
> > > >>>> package com.datatorrent.api;
> > > >>>> public class ControlTupleInterface
> > > >>>> {
> > > >>>>     Object payload; // User control tuple payload. A Long() for
> > > example.
> > > >>>>     UUID id;  // Unique Id to de-duplicate in downstream operators
> > > >>>> }
> > > >>>>
> > > >>>> Regarding your suggestion on using the Tuple class as the wrapper
> > for
> > > >>>> the
> > > >>>> control tuple payload, let me mention the current scenario flow to
> > > make
> > > >>>> the
> > > >>>> discussion easier:
> > > >>>>
> > > >>>> We have a Tuple class in buffer server which is responsible for
> > > >>>> serializing
> > > >>>> the user control tuple bundling together a message type:
> > > >>>> CUSTOM_CONTROL_TUPLE_VALUE.
> > > >>>>
> > > >>>>
> > > >>>> *com.datatorrent.bufferserver.packet.Tuple|--
> > > >>>> com.datatorrent.bufferserver.packet.CustomControlTuple*
> > > >>>> We have another Tuple class in Stram which helps the
> > > >>>> BufferServerSubscriber
> > > >>>> to de-serialize the serialized tuples. We should have
> > > CustomControlTuple
> > > >>>> in
> > > >>>> stram as follows:
> > > >>>>
> > > >>>>
> > > >>>> *com.datatorrent.stram.tuple.Tuple|--
> > > >>>> com.datatorrent.stram.tuple.CustomControlTuple*This will have a
> > field
> > > >>>> for
> > > >>>>
> > > >>>> user control payload.
> > > >>>>
> > > >>>> I think we should not expose the Tuple class in stram to the API.
> > That
> > > >>>> was
> > > >>>> the main reason I introduced another class/interface
> > > >>>> ControlTupleInterface
> > > >>>> as described above.
> > > >>>>
> > > >>>> Regarding, adding methods to DefaultInputPort and
> > DefaultOutputPort, I
> > > >>>> think error detection would not be early enough if the control
> tuple
> > > is
> > > >>>> sent very late in the processing :-)
> > > >>>> Extending the ports to ControlTupleAware* should help in this
> case.
> > > >>>> However, we still need to see if there are any downsides on doing
> > > this.
> > > >>>>
> > > >>>> Thanks.
> > > >>>>
> > > >>>> ~ Bhupesh
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>> On Thu, Dec 22, 2016 at 7:26 AM, Vlad Rozov <
> > v.ro...@datatorrent.com>
> > > >>>> wrote:
> > > >>>>
> > > >>>> Hi Bhupesh,
> > > >>>>
> > > >>>>> it should not be a CustomWrapper.  The wrapper object should be
> > > >>>>> CustomControlTuple that extends Tuple. There is already code that
> > > >>>>> checks
> > > >>>>> for Tuple instance. The "unWrap" name is misleading, IMO. It
> should
> > > be
> > > >>>>> something like customControlTuple.getPayload() or
> > > >>>>> customControlTuple.getAttachment(). In the emitControl(), create
> > new
> > > >>>>> CustomControlTuple using provided payload as one of arguments. It
> > may
> > > >>>>> still
> > > >>>>> be good to use common parent other than Object for control tuple
> > > >>>>> payload
> > > >>>>> class hierarchy.
> > > >>>>>
> > > >>>>> I don't understand how adding more methods to the Default
> > > >>>>> implementation
> > > >>>>> will help with early error detection unless application or
> operator
> > > >>>>> that
> > > >>>>> relies on the custom control tuple functionality explicitly
> checks
> > > for
> > > >>>>> the
> > > >>>>> platform version at run-time or tries to emit a control tuple
> just
> > to
> > > >>>>> check
> > > >>>>> that such functionality is supported by the platform.
> > > >>>>>
> > > >>>>> Thank you,
> > > >>>>>
> > > >>>>> Vlad
> > > >>>>>
> > > >>>>> On 12/21/16 04:58, Bhupesh Chawda wrote:
> > > >>>>>
> > > >>>>> Hi Vlad.
> > > >>>>>
> > > >>>>>> Yes, the API should not change. We can take an Object instead,
> and
> > > >>>>>> later
> > > >>>>>> wrap it into the required class.
> > > >>>>>>
> > > >>>>>> Our InputPort.put and emitControl method would look something
> like
> > > the
> > > >>>>>> following where we handle the wrapping and unwrapping
> internally.
> > > >>>>>>
> > > >>>>>> public void put(T tuple)
> > > >>>>>> {
> > > >>>>>>      if (tuple instanceof CustomWrapper) {
> > > >>>>>>        processControl(tuple.unWrap());
> > > >>>>>>      }  else {
> > > >>>>>>        process(tuple)
> > > >>>>>>      }
> > > >>>>>> }
> > > >>>>>>
> > > >>>>>> emitControl(Object tuple)
> > > >>>>>> {
> > > >>>>>>      sink.put(CustomWrapper.wrap(tuple));
> > > >>>>>> }
> > > >>>>>>
> > > >>>>>> Regarding the compatibility issue, I think we have two ways of
> > doing
> > > >>>>>> it:
> > > >>>>>>
> > > >>>>>>       1. Extend DefaultInputPort and DefaultOutputPort and
> create
> > > >>>>>>       ControlAwareInput and ControlAwareOutput out of it. This
> > might
> > > >>>>>> require us
> > > >>>>>>       to additionally handle specific cases when non-compatible
> > > ports
> > > >>>>>>       (ControlAwareOutput and DefaultInput, for example) are
> > > >>>>>> connected to
> > > >>>>>> each
> > > >>>>>>       other in user apps.
> > > >>>>>>       2. Add the additional methods in the existing Default
> > > >>>>>> implementations.
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> IMO, both of these would help in early error detection.
> > > >>>>>>
> > > >>>>>> ~ Bhupesh
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> On Wed, Dec 21, 2016 at 1:36 AM, Vlad Rozov <
> > > v.ro...@datatorrent.com>
> > > >>>>>> wrote:
> > > >>>>>>
> > > >>>>>> A wrapper class is required for the control tuples delivery, but
> > > >>>>>>
> > > >>>>>> Port/Operator API should use Control Tuple payload object only.
> > > >>>>>>> Implementation of the wrapper class may change from version to
> > > >>>>>>> version,
> > > >>>>>>> but
> > > >>>>>>> API should not be affected by the change.
> > > >>>>>>>
> > > >>>>>>> I guess, assumption is that default input and output port will
> be
> > > >>>>>>> extended
> > > >>>>>>> to provide support for the control tuples. This may cause some
> > > >>>>>>> backward
> > > >>>>>>> compatibility issues. Consider scenario when a newer version of
> > > >>>>>>> Malhar
> > > >>>>>>> that
> > > >>>>>>> relies on EOF control tuple is deployed into older version of
> > core
> > > >>>>>>> that
> > > >>>>>>> does not support control tuples. In such scenario, error will
> be
> > > >>>>>>> raised
> > > >>>>>>> only when an operator tries to emit EOF control tuple at the
> end
> > > of a
> > > >>>>>>> job.
> > > >>>>>>> Introducing control tuple aware ports solve the early error
> > > >>>>>>> detection.
> > > >>>>>>> It
> > > >>>>>>> will require some operators to be modified to use control tuple
> > > aware
> > > >>>>>>> ports, but such change may help to distinguish control tuple
> > aware
> > > >>>>>>> operators from their old versions.
> > > >>>>>>>
> > > >>>>>>> Vlad
> > > >>>>>>>
> > > >>>>>>> On 12/20/16 04:09, Bhupesh Chawda wrote:
> > > >>>>>>>
> > > >>>>>>> I investigated this and seems like it is better to have a
> wrapper
> > > >>>>>>> class
> > > >>>>>>>
> > > >>>>>>> for
> > > >>>>>>>> the user object.
> > > >>>>>>>> This would serve 2 purposes:
> > > >>>>>>>>
> > > >>>>>>>>        1. Allow us to distinguish a custom control tuple from
> > > other
> > > >>>>>>>> payload
> > > >>>>>>>>        tuples.
> > > >>>>>>>>        2. For the same control tuple received from different
> > > >>>>>>>> upstream
> > > >>>>>>>>
> > > >>>>>>>>        partitions, we would have some mechanism to distinguish
> > > >>>>>>>> between
> > > >>>>>>>> the
> > > >>>>>>>> two in
> > > >>>>>>>>        order to identify duplicates.
> > > >>>>>>>>
> > > >>>>>>>> Additionally, the wrapper class needs to be part of the API as
> > > >>>>>>>> DefaultOutputPort needs to know about it, before putting it
> into
> > > the
> > > >>>>>>>> sink.
> > > >>>>>>>> We can make sure that the user is not able to extend or modify
> > > this
> > > >>>>>>>> class
> > > >>>>>>>> in any manner.
> > > >>>>>>>>
> > > >>>>>>>> ~ Bhupesh
> > > >>>>>>>>
> > > >>>>>>>> On Mon, Dec 19, 2016 at 12:18 PM, David Yan <
> david...@gmail.com
> > >
> > > >>>>>>>> wrote:
> > > >>>>>>>>
> > > >>>>>>>> This C type parameter is going to fix the control tuple type
> at
> > > >>>>>>>> compile
> > > >>>>>>>>
> > > >>>>>>>> time and this is actually not what we want. Note that the
> > operator
> > > >>>>>>>> may
> > > >>>>>>>>
> > > >>>>>>>>> receive or emit multiple different control tuple types.
> > > >>>>>>>>>
> > > >>>>>>>>> David
> > > >>>>>>>>>
> > > >>>>>>>>> On Dec 17, 2016 3:33 AM, "Tushar Gosavi" <
> > tus...@datatorrent.com
> > > >
> > > >>>>>>>>> wrote:
> > > >>>>>>>>>
> > > >>>>>>>>> We do not need to create an interface for data emitted
> through
> > > >>>>>>>>> emitControl or processed through processControl. Internally
> we
> > > >>>>>>>>> could
> > > >>>>>>>>> wrap the user object in ControlTuple. you can add type
> > parameter
> > > >>>>>>>>> for
> > > >>>>>>>>> control tuple object on ports.
> > > >>>>>>>>>
> > > >>>>>>>>> DefaultInputPort<D,C>
> > > >>>>>>>>> D is the data type and C is the control tuple type for better
> > > error
> > > >>>>>>>>> catching at compile phase.
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> - Tushar.
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> On Sat, Dec 17, 2016 at 8:35 AM, Bhupesh Chawda <
> > > >>>>>>>>> bhup...@datatorrent.com
> > > >>>>>>>>> wrote:
> > > >>>>>>>>>
> > > >>>>>>>>> Agreed Vlad and David.
> > > >>>>>>>>>
> > > >>>>>>>>> I am just suggesting there should be a wrapper for the user
> > > object.
> > > >>>>>>>>>> It
> > > >>>>>>>>>>
> > > >>>>>>>>>> can
> > > >>>>>>>>>>
> > > >>>>>>>>>> be a marker interface and we can call it something else like
> > > >>>>>>>>>
> > > >>>>>>>>> "CustomControl".
> > > >>>>>>>>>>
> > > >>>>>>>>>> The user object will be wrapped in another class
> > "ControlTuple"
> > > >>>>>>>>>> which
> > > >>>>>>>>>> traverses the BufferServer and will perhaps be extended from
> > the
> > > >>>>>>>>>> packet/Tuple class. This class will not be exposed to the
> > user.
> > > >>>>>>>>>>
> > > >>>>>>>>>> ~ Bhupesh
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>> On Sat, Dec 17, 2016 at 4:11 AM, Vlad Rozov <
> > > >>>>>>>>>> v.ro...@datatorrent.com>
> > > >>>>>>>>>>
> > > >>>>>>>>>> wrote:
> > > >>>>>>>>>>
> > > >>>>>>>>>> I agree with David. Payload of the control tuple is in the
> > > >>>>>>>>> userObject
> > > >>>>>>>>>
> > > >>>>>>>>> and
> > > >>>>>>>>>> operators/ports don't need to be exposed to the
> implementation
> > > of
> > > >>>>>>>>>> the
> > > >>>>>>>>>>
> > > >>>>>>>>>> ControlTuple class. With the proposed interface operators
> > > >>>>>>>>>> developers
> > > >>>>>>>>>>
> > > >>>>>>>>>>> are
> > > >>>>>>>>>>> free to extend ControlTuple further and I don't think that
> > such
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> capability
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> needs to be provided. The wrapping in the ControlTuple
> class
> > is
> > > >>>>>>>>>> necessary
> > > >>>>>>>>>> and most likely ControlTuple needs to be extended from the
> > > buffer
> > > >>>>>>>>>> server
> > > >>>>>>>>>>
> > > >>>>>>>>>> Tuple. It may be good to have a common parent other than
> > Object
> > > >>>>>>>>>> for
> > > >>>>>>>>>>
> > > >>>>>>>>>>> all
> > > >>>>>>>>>>> user payloads, but it may be a marker interface as well.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Thank you,
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Vlad
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> On 12/16/16 09:59, Bhupesh Chawda wrote:
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Hi David,
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Actually, I was thinking of another API class called
> > > >>>>>>>>>>> ControlTuple,
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>> different from the actual tuple class in buffer server or
> > > stram.
> > > >>>>>>>>>>>> This could serve as a way for the Buffer server publisher
> to
> > > >>>>>>>>>>>> understand
> > > >>>>>>>>>>>> that it is a control tuple and needs to be wrapped
> > > differently.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> ~ Bhupesh
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> On Dec 16, 2016 22:28, "David Yan" <david...@gmail.com>
> > > wrote:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>       // DefaultInputPort
> > > >>>>>>>>>>>>        public void processControl(ControlTuple tuple)
> > > >>>>>>>>>>>>        {
> > > >>>>>>>>>>>>          // Default Implementation to avoid need to
> > implement
> > > >>>>>>>>>>>> it in
> > > >>>>>>>>>>>> all
> > > >>>>>>>>>>>> implementations
> > > >>>>>>>>>>>>        }
> > > >>>>>>>>>>>> {code}
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> {code}
> > > >>>>>>>>>>>>       // DefaultOutputPort
> > > >>>>>>>>>>>>        public void emitControl(ControlTuple tuple)
> > > >>>>>>>>>>>>        {
> > > >>>>>>>>>>>>        }
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> I think we don't need to expose the ControlTuple class to
> > the
> > > >>>>>>>>>>>> operator
> > > >>>>>>>>>>>> developers because the window ID is just the current
> window
> > ID
> > > >>>>>>>>>>>> when
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> these
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> methods are called. How about making them just Object? We
> > also
> > > >>>>>>>>>>>
> > > >>>>>>>>>> need to
> > > >>>>>>>>>>
> > > >>>>>>>>>> provide the way for the user to specify custom serializer
> for
> > > the
> > > >>>>>>>>>>
> > > >>>>>>>>>>> control
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> tuple.
> > > >>>>>>>>>>>
> > > >>>>>>>>>> David
> > > >>>>>>>>>>
> > > >>>>>>>>>>> On Thu, Dec 15, 2016 at 12:43 AM, Bhupesh Chawda <
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> bhup...@datatorrent.com
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>
> > > >>>>>>>>>> Hi All,
> > > >>>>>>>>>>
> > > >>>>>>>>>>> Here are the initial interfaces:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> {code}
> > > >>>>>>>>>>>>>       // DefaultInputPort
> > > >>>>>>>>>>>>>        public void processControl(ControlTuple tuple)
> > > >>>>>>>>>>>>>        {
> > > >>>>>>>>>>>>>          // Default Implementation to avoid need to
> > implement
> > > >>>>>>>>>>>>> it
> > > >>>>>>>>>>>>> in
> > > >>>>>>>>>>>>> all
> > > >>>>>>>>>>>>> implementations
> > > >>>>>>>>>>>>>        }
> > > >>>>>>>>>>>>> {code}
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> {code}
> > > >>>>>>>>>>>>>       // DefaultOutputPort
> > > >>>>>>>>>>>>>        public void emitControl(ControlTuple tuple)
> > > >>>>>>>>>>>>>        {
> > > >>>>>>>>>>>>>        }
> > > >>>>>>>>>>>>> {code}
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> We have an option to add these methods to the interfaces
> -
> > > >>>>>>>>>>>>> InputPort
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> and
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> OutputPort; But these would not be backward compatible
> and
> > > also
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>> not
> > > >>>>>>>>>>> consistent with the current implementation of basic data
> > tuple
> > > >>>>>>>>>>> flow
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> (as
> > > >>>>>>>>>>>> with process() and emit()).
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>> We also need to expose an interface / class for users to
> wrap
> > > >>>>>>>>>>> their
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> object
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>> and emit downstream. This should be part of API.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> {code}
> > > >>>>>>>>>>>>> public class ControlTuple extends Tuple
> > > >>>>>>>>>>>>> {
> > > >>>>>>>>>>>>>        Object userObject;
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>        public ControlTuple(long windowId, Object
> > userObject)
> > > >>>>>>>>>>>>>        {
> > > >>>>>>>>>>>>>          //
> > > >>>>>>>>>>>>>        }
> > > >>>>>>>>>>>>> }
> > > >>>>>>>>>>>>> {code}
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> The emitted tuples would traverse the same flow as with
> > other
> > > >>>>>>>>>>>>> control
> > > >>>>>>>>>>>>> tuples. The plan is to intercept the control tuples in
> > > >>>>>>>>>>>>> GenericNode
> > > >>>>>>>>>>>>> and
> > > >>>>>>>>>>>>> use
> > > >>>>>>>>>>>>> the Reservior to emit the control tuples at the end of
> the
> > > >>>>>>>>>>>>> window.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> GenericNode seems to be the best place to buffer incoming
> > > >>>>>>>>>>>>> custom
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> control
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> tuples without delivering them immediately to the
> operator
> > > >>>>>>>>>>>> port.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>> Once
> > > >>>>>>>>>>> the
> > > >>>>>>>>>>> end of the window is reached, we plan to use the reservoir
> > sink
> > > >>>>>>>>>>> to
> > > >>>>>>>>>>> push
> > > >>>>>>>>>>> them to the port. This is different behavior than any other
> > > >>>>>>>>>>> control
> > > >>>>>>>>>>> tuple
> > > >>>>>>>>>>> where we are changing the order of tuples in the stream.
> The
> > > >>>>>>>>>>> custom
> > > >>>>>>>>>>> control
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> tuples will be buffered and not delivered to the ports
> until
> > > the
> > > >>>>>>>>>>>> end
> > > >>>>>>>>>>>> of
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> the
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>> window.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> To accomplish this, we need to have a public method in
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>> SweepableReservoir
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> which allows to put a tuple back in the sink of the
> > > reservoir.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>> ~ Bhupesh
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >
> > >
> >
>

Reply via email to