2 sounds good. Have you thought about what the method would look like. On Sat, Dec 31, 2016 at 8:29 PM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
> Yes, that makes sense. > We have following options: > 1. Make the annotation false by default and force the user to forward the > control tuples explicitly. > 2. Annotation is true by default and static way of blocking stays as it is. > We provide another way for blocking programmatically, perhaps by means of > another method call on the port. > > ~ Bhupesh > > On Dec 30, 2016 00:09, "Pramod Immaneni" <pra...@datatorrent.com> wrote: > > > Bhupesh, > > > > Annotation seems like a static way to stop propagation. Give these are > > programmatically generated I would think the operators should be able to > > stop (consume without propagating) programmatically as well. > > > > Thanks > > > > On Thu, Dec 29, 2016 at 8:48 AM, Bhupesh Chawda <bhup...@datatorrent.com > > > > wrote: > > > > > Thanks Vlad, I am trying out the approach you mentioned regarding > having > > > another interface which allows sinks to put a control tuple. > > > > > > Regarding the delivery of control tuples, here is what I am planning to > > do: > > > All the control tuples which are emitted in a particular window are > > > delivered after all the data tuples have been delivered to the > respective > > > ports, but before the endWindow() call. The operator can then process > the > > > control tuples in that window and can do any finalization in the end > > window > > > call. There will be no delivery of control tuples after endWindow() and > > > before the next beginWindow() call. > > > > > > For handling the propagation of control tuples further in the dag, we > are > > > planning to have an annotation on the Output Port of the operator which > > > would be true by default. > > > @OutputPortFieldAnnotation(propogateControlTuples = false). > > > > > > ~ Bhupesh > > > > > > > > > On Thu, Dec 29, 2016 at 6:24 AM, Vlad Rozov <v.ro...@datatorrent.com> > > > wrote: > > > > > > > Custom control tuples are control tuples emitted by an operator > itself > > > and > > > > not by the platform. Prior to the introduction of the custom control > > > > tuples, only Apex engine itself puts control tuples into various > sinks, > > > so > > > > the engine created necessary Tuple objects with the corresponding > type > > > > prior to calling Sink.put(). > > > > > > > > Not all sinks need to be changed. Only control tuple aware sinks > should > > > > provide such functionality. In the case there is a lot of code > > > duplication, > > > > please create an abstract class, that other control aware sinks will > > > extend > > > > from. > > > > > > > > Thank you, > > > > > > > > Vlad > > > > > > > > > > > > On 12/23/16 06:24, Bhupesh Chawda wrote: > > > > > > > >> Hi Vlad, > > > >> > > > >> Thanks for the pointer on delegating the wrapping of the user tuple > to > > > the > > > >> control port. I was trying this out today. > > > >> The problem I see us if we introduce a putControlTuple() method in > > Sink, > > > >> then a lot of the existing sinks would change. Also the changes > seemed > > > >> redundant as, the existing control tuples already use the put() > method > > > of > > > >> sinks. So why do something special for custom control tuples? > > > >> > > > >> The only aspect in which the custom control tuples are different is > > that > > > >> these will be generated by the user and will actually be delivered > to > > > the > > > >> ports in a different order. Perhaps we should be able to use the > > > existing > > > >> flow. The only problems as outlined before seem to be identification > > of > > > >> the > > > >> user tuple as a control tuple. > > > >> > > > >> ~ Bhupesh > > > >> > > > >> > > > >> On Thu, Dec 22, 2016 at 10:44 PM, Vlad Rozov < > v.ro...@datatorrent.com > > > > > > >> wrote: > > > >> > > > >> Why is it necessary to wrap in the OutputPort? Can't it be delegated > > to > > > a > > > >>> Sink by introducing new putControlTuple method? > > > >>> > > > >>> Thank you, > > > >>> > > > >>> Vlad > > > >>> > > > >>> > > > >>> On 12/21/16 22:10, Bhupesh Chawda wrote: > > > >>> > > > >>> Hi Vlad, > > > >>>> > > > >>>> The problem in using the Tuple class as the wrapper is that the > > Ports > > > >>>> belong to the API and we want to wrap the payload object of the > > > control > > > >>>> tuple into the Tuple class which is not part of the API. > > > >>>> > > > >>>> The output port will just get the payload of the user control > tuple. > > > For > > > >>>> example, if the user emits a Long, as a control tuple, the payload > > > >>>> object > > > >>>> will just be a Long object. > > > >>>> > > > >>>> It is necessary to bundle this Long into some recognizable object > so > > > >>>> that > > > >>>> the BufferServerPublisher knows that this is a Control tuple and > > not a > > > >>>> regular tuple and serialize it accordingly. It is therefore > > necessary > > > >>>> that > > > >>>> the tuple be part of some known hierarchy so that can be > > distinguished > > > >>>> from > > > >>>> other payload tuples. Let us call this class > ControlTupleInterface. > > > Note > > > >>>> that this needs to be done before the tuple is inserted into the > > sink > > > >>>> which > > > >>>> is done in the port objects. Once the tuple is inserted into the > > sink, > > > >>>> it > > > >>>> would seem just like any other payload tuple and cannot be > > > >>>> distinguished. > > > >>>> > > > >>>> For this reason, I had something like the following in API: > > > >>>> > > > >>>> package com.datatorrent.api; > > > >>>> public class ControlTupleInterface > > > >>>> { > > > >>>> Object payload; // User control tuple payload. A Long() for > > > example. > > > >>>> UUID id; // Unique Id to de-duplicate in downstream operators > > > >>>> } > > > >>>> > > > >>>> Regarding your suggestion on using the Tuple class as the wrapper > > for > > > >>>> the > > > >>>> control tuple payload, let me mention the current scenario flow to > > > make > > > >>>> the > > > >>>> discussion easier: > > > >>>> > > > >>>> We have a Tuple class in buffer server which is responsible for > > > >>>> serializing > > > >>>> the user control tuple bundling together a message type: > > > >>>> CUSTOM_CONTROL_TUPLE_VALUE. > > > >>>> > > > >>>> > > > >>>> *com.datatorrent.bufferserver.packet.Tuple|-- > > > >>>> com.datatorrent.bufferserver.packet.CustomControlTuple* > > > >>>> We have another Tuple class in Stram which helps the > > > >>>> BufferServerSubscriber > > > >>>> to de-serialize the serialized tuples. We should have > > > CustomControlTuple > > > >>>> in > > > >>>> stram as follows: > > > >>>> > > > >>>> > > > >>>> *com.datatorrent.stram.tuple.Tuple|-- > > > >>>> com.datatorrent.stram.tuple.CustomControlTuple*This will have a > > field > > > >>>> for > > > >>>> > > > >>>> user control payload. > > > >>>> > > > >>>> I think we should not expose the Tuple class in stram to the API. > > That > > > >>>> was > > > >>>> the main reason I introduced another class/interface > > > >>>> ControlTupleInterface > > > >>>> as described above. > > > >>>> > > > >>>> Regarding, adding methods to DefaultInputPort and > > DefaultOutputPort, I > > > >>>> think error detection would not be early enough if the control > tuple > > > is > > > >>>> sent very late in the processing :-) > > > >>>> Extending the ports to ControlTupleAware* should help in this > case. > > > >>>> However, we still need to see if there are any downsides on doing > > > this. > > > >>>> > > > >>>> Thanks. > > > >>>> > > > >>>> ~ Bhupesh > > > >>>> > > > >>>> > > > >>>> > > > >>>> > > > >>>> On Thu, Dec 22, 2016 at 7:26 AM, Vlad Rozov < > > v.ro...@datatorrent.com> > > > >>>> wrote: > > > >>>> > > > >>>> Hi Bhupesh, > > > >>>> > > > >>>>> it should not be a CustomWrapper. The wrapper object should be > > > >>>>> CustomControlTuple that extends Tuple. There is already code that > > > >>>>> checks > > > >>>>> for Tuple instance. The "unWrap" name is misleading, IMO. It > should > > > be > > > >>>>> something like customControlTuple.getPayload() or > > > >>>>> customControlTuple.getAttachment(). In the emitControl(), create > > new > > > >>>>> CustomControlTuple using provided payload as one of arguments. It > > may > > > >>>>> still > > > >>>>> be good to use common parent other than Object for control tuple > > > >>>>> payload > > > >>>>> class hierarchy. > > > >>>>> > > > >>>>> I don't understand how adding more methods to the Default > > > >>>>> implementation > > > >>>>> will help with early error detection unless application or > operator > > > >>>>> that > > > >>>>> relies on the custom control tuple functionality explicitly > checks > > > for > > > >>>>> the > > > >>>>> platform version at run-time or tries to emit a control tuple > just > > to > > > >>>>> check > > > >>>>> that such functionality is supported by the platform. > > > >>>>> > > > >>>>> Thank you, > > > >>>>> > > > >>>>> Vlad > > > >>>>> > > > >>>>> On 12/21/16 04:58, Bhupesh Chawda wrote: > > > >>>>> > > > >>>>> Hi Vlad. > > > >>>>> > > > >>>>>> Yes, the API should not change. We can take an Object instead, > and > > > >>>>>> later > > > >>>>>> wrap it into the required class. > > > >>>>>> > > > >>>>>> Our InputPort.put and emitControl method would look something > like > > > the > > > >>>>>> following where we handle the wrapping and unwrapping > internally. > > > >>>>>> > > > >>>>>> public void put(T tuple) > > > >>>>>> { > > > >>>>>> if (tuple instanceof CustomWrapper) { > > > >>>>>> processControl(tuple.unWrap()); > > > >>>>>> } else { > > > >>>>>> process(tuple) > > > >>>>>> } > > > >>>>>> } > > > >>>>>> > > > >>>>>> emitControl(Object tuple) > > > >>>>>> { > > > >>>>>> sink.put(CustomWrapper.wrap(tuple)); > > > >>>>>> } > > > >>>>>> > > > >>>>>> Regarding the compatibility issue, I think we have two ways of > > doing > > > >>>>>> it: > > > >>>>>> > > > >>>>>> 1. Extend DefaultInputPort and DefaultOutputPort and > create > > > >>>>>> ControlAwareInput and ControlAwareOutput out of it. This > > might > > > >>>>>> require us > > > >>>>>> to additionally handle specific cases when non-compatible > > > ports > > > >>>>>> (ControlAwareOutput and DefaultInput, for example) are > > > >>>>>> connected to > > > >>>>>> each > > > >>>>>> other in user apps. > > > >>>>>> 2. Add the additional methods in the existing Default > > > >>>>>> implementations. > > > >>>>>> > > > >>>>>> > > > >>>>>> IMO, both of these would help in early error detection. > > > >>>>>> > > > >>>>>> ~ Bhupesh > > > >>>>>> > > > >>>>>> > > > >>>>>> > > > >>>>>> > > > >>>>>> On Wed, Dec 21, 2016 at 1:36 AM, Vlad Rozov < > > > v.ro...@datatorrent.com> > > > >>>>>> wrote: > > > >>>>>> > > > >>>>>> A wrapper class is required for the control tuples delivery, but > > > >>>>>> > > > >>>>>> Port/Operator API should use Control Tuple payload object only. > > > >>>>>>> Implementation of the wrapper class may change from version to > > > >>>>>>> version, > > > >>>>>>> but > > > >>>>>>> API should not be affected by the change. > > > >>>>>>> > > > >>>>>>> I guess, assumption is that default input and output port will > be > > > >>>>>>> extended > > > >>>>>>> to provide support for the control tuples. This may cause some > > > >>>>>>> backward > > > >>>>>>> compatibility issues. Consider scenario when a newer version of > > > >>>>>>> Malhar > > > >>>>>>> that > > > >>>>>>> relies on EOF control tuple is deployed into older version of > > core > > > >>>>>>> that > > > >>>>>>> does not support control tuples. In such scenario, error will > be > > > >>>>>>> raised > > > >>>>>>> only when an operator tries to emit EOF control tuple at the > end > > > of a > > > >>>>>>> job. > > > >>>>>>> Introducing control tuple aware ports solve the early error > > > >>>>>>> detection. > > > >>>>>>> It > > > >>>>>>> will require some operators to be modified to use control tuple > > > aware > > > >>>>>>> ports, but such change may help to distinguish control tuple > > aware > > > >>>>>>> operators from their old versions. > > > >>>>>>> > > > >>>>>>> Vlad > > > >>>>>>> > > > >>>>>>> On 12/20/16 04:09, Bhupesh Chawda wrote: > > > >>>>>>> > > > >>>>>>> I investigated this and seems like it is better to have a > wrapper > > > >>>>>>> class > > > >>>>>>> > > > >>>>>>> for > > > >>>>>>>> the user object. > > > >>>>>>>> This would serve 2 purposes: > > > >>>>>>>> > > > >>>>>>>> 1. Allow us to distinguish a custom control tuple from > > > other > > > >>>>>>>> payload > > > >>>>>>>> tuples. > > > >>>>>>>> 2. For the same control tuple received from different > > > >>>>>>>> upstream > > > >>>>>>>> > > > >>>>>>>> partitions, we would have some mechanism to distinguish > > > >>>>>>>> between > > > >>>>>>>> the > > > >>>>>>>> two in > > > >>>>>>>> order to identify duplicates. > > > >>>>>>>> > > > >>>>>>>> Additionally, the wrapper class needs to be part of the API as > > > >>>>>>>> DefaultOutputPort needs to know about it, before putting it > into > > > the > > > >>>>>>>> sink. > > > >>>>>>>> We can make sure that the user is not able to extend or modify > > > this > > > >>>>>>>> class > > > >>>>>>>> in any manner. > > > >>>>>>>> > > > >>>>>>>> ~ Bhupesh > > > >>>>>>>> > > > >>>>>>>> On Mon, Dec 19, 2016 at 12:18 PM, David Yan < > david...@gmail.com > > > > > > >>>>>>>> wrote: > > > >>>>>>>> > > > >>>>>>>> This C type parameter is going to fix the control tuple type > at > > > >>>>>>>> compile > > > >>>>>>>> > > > >>>>>>>> time and this is actually not what we want. Note that the > > operator > > > >>>>>>>> may > > > >>>>>>>> > > > >>>>>>>>> receive or emit multiple different control tuple types. > > > >>>>>>>>> > > > >>>>>>>>> David > > > >>>>>>>>> > > > >>>>>>>>> On Dec 17, 2016 3:33 AM, "Tushar Gosavi" < > > tus...@datatorrent.com > > > > > > > >>>>>>>>> wrote: > > > >>>>>>>>> > > > >>>>>>>>> We do not need to create an interface for data emitted > through > > > >>>>>>>>> emitControl or processed through processControl. Internally > we > > > >>>>>>>>> could > > > >>>>>>>>> wrap the user object in ControlTuple. you can add type > > parameter > > > >>>>>>>>> for > > > >>>>>>>>> control tuple object on ports. > > > >>>>>>>>> > > > >>>>>>>>> DefaultInputPort<D,C> > > > >>>>>>>>> D is the data type and C is the control tuple type for better > > > error > > > >>>>>>>>> catching at compile phase. > > > >>>>>>>>> > > > >>>>>>>>> > > > >>>>>>>>> - Tushar. > > > >>>>>>>>> > > > >>>>>>>>> > > > >>>>>>>>> On Sat, Dec 17, 2016 at 8:35 AM, Bhupesh Chawda < > > > >>>>>>>>> bhup...@datatorrent.com > > > >>>>>>>>> wrote: > > > >>>>>>>>> > > > >>>>>>>>> Agreed Vlad and David. > > > >>>>>>>>> > > > >>>>>>>>> I am just suggesting there should be a wrapper for the user > > > object. > > > >>>>>>>>>> It > > > >>>>>>>>>> > > > >>>>>>>>>> can > > > >>>>>>>>>> > > > >>>>>>>>>> be a marker interface and we can call it something else like > > > >>>>>>>>> > > > >>>>>>>>> "CustomControl". > > > >>>>>>>>>> > > > >>>>>>>>>> The user object will be wrapped in another class > > "ControlTuple" > > > >>>>>>>>>> which > > > >>>>>>>>>> traverses the BufferServer and will perhaps be extended from > > the > > > >>>>>>>>>> packet/Tuple class. This class will not be exposed to the > > user. > > > >>>>>>>>>> > > > >>>>>>>>>> ~ Bhupesh > > > >>>>>>>>>> > > > >>>>>>>>>> > > > >>>>>>>>>> On Sat, Dec 17, 2016 at 4:11 AM, Vlad Rozov < > > > >>>>>>>>>> v.ro...@datatorrent.com> > > > >>>>>>>>>> > > > >>>>>>>>>> wrote: > > > >>>>>>>>>> > > > >>>>>>>>>> I agree with David. Payload of the control tuple is in the > > > >>>>>>>>> userObject > > > >>>>>>>>> > > > >>>>>>>>> and > > > >>>>>>>>>> operators/ports don't need to be exposed to the > implementation > > > of > > > >>>>>>>>>> the > > > >>>>>>>>>> > > > >>>>>>>>>> ControlTuple class. With the proposed interface operators > > > >>>>>>>>>> developers > > > >>>>>>>>>> > > > >>>>>>>>>>> are > > > >>>>>>>>>>> free to extend ControlTuple further and I don't think that > > such > > > >>>>>>>>>>> > > > >>>>>>>>>>> capability > > > >>>>>>>>>>> > > > >>>>>>>>>>> needs to be provided. The wrapping in the ControlTuple > class > > is > > > >>>>>>>>>> necessary > > > >>>>>>>>>> and most likely ControlTuple needs to be extended from the > > > buffer > > > >>>>>>>>>> server > > > >>>>>>>>>> > > > >>>>>>>>>> Tuple. It may be good to have a common parent other than > > Object > > > >>>>>>>>>> for > > > >>>>>>>>>> > > > >>>>>>>>>>> all > > > >>>>>>>>>>> user payloads, but it may be a marker interface as well. > > > >>>>>>>>>>> > > > >>>>>>>>>>> Thank you, > > > >>>>>>>>>>> > > > >>>>>>>>>>> Vlad > > > >>>>>>>>>>> > > > >>>>>>>>>>> > > > >>>>>>>>>>> On 12/16/16 09:59, Bhupesh Chawda wrote: > > > >>>>>>>>>>> > > > >>>>>>>>>>> Hi David, > > > >>>>>>>>>>> > > > >>>>>>>>>>> Actually, I was thinking of another API class called > > > >>>>>>>>>>> ControlTuple, > > > >>>>>>>>>>> > > > >>>>>>>>>>>> different from the actual tuple class in buffer server or > > > stram. > > > >>>>>>>>>>>> This could serve as a way for the Buffer server publisher > to > > > >>>>>>>>>>>> understand > > > >>>>>>>>>>>> that it is a control tuple and needs to be wrapped > > > differently. > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> ~ Bhupesh > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> On Dec 16, 2016 22:28, "David Yan" <david...@gmail.com> > > > wrote: > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> // DefaultInputPort > > > >>>>>>>>>>>> public void processControl(ControlTuple tuple) > > > >>>>>>>>>>>> { > > > >>>>>>>>>>>> // Default Implementation to avoid need to > > implement > > > >>>>>>>>>>>> it in > > > >>>>>>>>>>>> all > > > >>>>>>>>>>>> implementations > > > >>>>>>>>>>>> } > > > >>>>>>>>>>>> {code} > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> {code} > > > >>>>>>>>>>>> // DefaultOutputPort > > > >>>>>>>>>>>> public void emitControl(ControlTuple tuple) > > > >>>>>>>>>>>> { > > > >>>>>>>>>>>> } > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> I think we don't need to expose the ControlTuple class to > > the > > > >>>>>>>>>>>> operator > > > >>>>>>>>>>>> developers because the window ID is just the current > window > > ID > > > >>>>>>>>>>>> when > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> these > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> methods are called. How about making them just Object? We > > also > > > >>>>>>>>>>> > > > >>>>>>>>>> need to > > > >>>>>>>>>> > > > >>>>>>>>>> provide the way for the user to specify custom serializer > for > > > the > > > >>>>>>>>>> > > > >>>>>>>>>>> control > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> tuple. > > > >>>>>>>>>>> > > > >>>>>>>>>> David > > > >>>>>>>>>> > > > >>>>>>>>>>> On Thu, Dec 15, 2016 at 12:43 AM, Bhupesh Chawda < > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> bhup...@datatorrent.com > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> wrote: > > > >>>>>>>>>>> > > > >>>>>>>>>> Hi All, > > > >>>>>>>>>> > > > >>>>>>>>>>> Here are the initial interfaces: > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> {code} > > > >>>>>>>>>>>>> // DefaultInputPort > > > >>>>>>>>>>>>> public void processControl(ControlTuple tuple) > > > >>>>>>>>>>>>> { > > > >>>>>>>>>>>>> // Default Implementation to avoid need to > > implement > > > >>>>>>>>>>>>> it > > > >>>>>>>>>>>>> in > > > >>>>>>>>>>>>> all > > > >>>>>>>>>>>>> implementations > > > >>>>>>>>>>>>> } > > > >>>>>>>>>>>>> {code} > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> {code} > > > >>>>>>>>>>>>> // DefaultOutputPort > > > >>>>>>>>>>>>> public void emitControl(ControlTuple tuple) > > > >>>>>>>>>>>>> { > > > >>>>>>>>>>>>> } > > > >>>>>>>>>>>>> {code} > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> We have an option to add these methods to the interfaces > - > > > >>>>>>>>>>>>> InputPort > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> and > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> OutputPort; But these would not be backward compatible > and > > > also > > > >>>>>>>>>>>> > > > >>>>>>>>>>> not > > > >>>>>>>>>>> consistent with the current implementation of basic data > > tuple > > > >>>>>>>>>>> flow > > > >>>>>>>>>>> > > > >>>>>>>>>>> (as > > > >>>>>>>>>>>> with process() and emit()). > > > >>>>>>>>>>>> > > > >>>>>>>>>>> We also need to expose an interface / class for users to > wrap > > > >>>>>>>>>>> their > > > >>>>>>>>>>> > > > >>>>>>>>>>> object > > > >>>>>>>>>>>> > > > >>>>>>>>>>>>> and emit downstream. This should be part of API. > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> {code} > > > >>>>>>>>>>>>> public class ControlTuple extends Tuple > > > >>>>>>>>>>>>> { > > > >>>>>>>>>>>>> Object userObject; > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> public ControlTuple(long windowId, Object > > userObject) > > > >>>>>>>>>>>>> { > > > >>>>>>>>>>>>> // > > > >>>>>>>>>>>>> } > > > >>>>>>>>>>>>> } > > > >>>>>>>>>>>>> {code} > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> The emitted tuples would traverse the same flow as with > > other > > > >>>>>>>>>>>>> control > > > >>>>>>>>>>>>> tuples. The plan is to intercept the control tuples in > > > >>>>>>>>>>>>> GenericNode > > > >>>>>>>>>>>>> and > > > >>>>>>>>>>>>> use > > > >>>>>>>>>>>>> the Reservior to emit the control tuples at the end of > the > > > >>>>>>>>>>>>> window. > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> GenericNode seems to be the best place to buffer incoming > > > >>>>>>>>>>>>> custom > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> control > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> tuples without delivering them immediately to the > operator > > > >>>>>>>>>>>> port. > > > >>>>>>>>>>>> > > > >>>>>>>>>>> Once > > > >>>>>>>>>>> the > > > >>>>>>>>>>> end of the window is reached, we plan to use the reservoir > > sink > > > >>>>>>>>>>> to > > > >>>>>>>>>>> push > > > >>>>>>>>>>> them to the port. This is different behavior than any other > > > >>>>>>>>>>> control > > > >>>>>>>>>>> tuple > > > >>>>>>>>>>> where we are changing the order of tuples in the stream. > The > > > >>>>>>>>>>> custom > > > >>>>>>>>>>> control > > > >>>>>>>>>>> > > > >>>>>>>>>>> tuples will be buffered and not delivered to the ports > until > > > the > > > >>>>>>>>>>>> end > > > >>>>>>>>>>>> of > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> the > > > >>>>>>>>>>>> > > > >>>>>>>>>>> window. > > > >>>>>>>>>>> > > > >>>>>>>>>>> To accomplish this, we need to have a public method in > > > >>>>>>>>>>>> > > > >>>>>>>>>>>>> SweepableReservoir > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> which allows to put a tuple back in the sink of the > > > reservoir. > > > >>>>>>>>>>>> > > > >>>>>>>>>>> ~ Bhupesh > > > >>>>>>>>>>> > > > >>>>>>>>>>> > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> > > > > > > > > > >