Hi All,

I have created a review-only PR based on the discussion so far. This will also help make the discussion easier, and we can continue with the review in parallel.

Here is the PR: https://github.com/apache/apex-core/pull/440

Please help review this. I am still working on documentation and tests.

~ Bhupesh

On Sun, Jan 1, 2017 at 9:59 AM, Bhupesh Chawda <[email protected]> wrote:

> Yes, that makes sense.
> We have the following options:
> 1. Make the annotation false by default and force the user to forward the control tuples explicitly.
> 2. Annotation is true by default and the static way of blocking stays as it is. We provide another way for blocking programmatically, perhaps by means of another method call on the port.
>
> ~ Bhupesh
>
> On Dec 30, 2016 00:09, "Pramod Immaneni" <[email protected]> wrote:
>
>> Bhupesh,
>>
>> Annotation seems like a static way to stop propagation. Given these are programmatically generated, I would think the operators should be able to stop (consume without propagating) programmatically as well.
>>
>> Thanks
>>
>> On Thu, Dec 29, 2016 at 8:48 AM, Bhupesh Chawda <[email protected]> wrote:
>>
>> > Thanks Vlad, I am trying out the approach you mentioned regarding having another interface which allows sinks to put a control tuple.
>> >
>> > Regarding the delivery of control tuples, here is what I am planning to do:
>> > All the control tuples which are emitted in a particular window are delivered after all the data tuples have been delivered to the respective ports, but before the endWindow() call. The operator can then process the control tuples in that window and can do any finalization in the end window call. There will be no delivery of control tuples after endWindow() and before the next beginWindow() call.
>> >
>> > For handling the propagation of control tuples further in the dag, we are planning to have an annotation on the Output Port of the operator which would be true by default.
>> > @OutputPortFieldAnnotation(propogateControlTuples = false).
>> >
>> > ~ Bhupesh
>> >
>> > On Thu, Dec 29, 2016 at 6:24 AM, Vlad Rozov <[email protected]> wrote:
>> >
>> > > Custom control tuples are control tuples emitted by an operator itself and not by the platform. Prior to the introduction of the custom control tuples, only the Apex engine itself put control tuples into various sinks, so the engine created the necessary Tuple objects with the corresponding type prior to calling Sink.put().
>> > >
>> > > Not all sinks need to be changed. Only control tuple aware sinks should provide such functionality. In case there is a lot of code duplication, please create an abstract class that other control aware sinks will extend from.
>> > >
>> > > Thank you,
>> > >
>> > > Vlad
>> > >
>> > > On 12/23/16 06:24, Bhupesh Chawda wrote:
>> > >
>> > >> Hi Vlad,
>> > >>
>> > >> Thanks for the pointer on delegating the wrapping of the user tuple to the control port. I was trying this out today.
>> > >> The problem I see is that if we introduce a putControlTuple() method in Sink, then a lot of the existing sinks would change. Also, the changes seemed redundant, as the existing control tuples already use the put() method of sinks. So why do something special for custom control tuples?
>> > >>
>> > >> The only aspect in which the custom control tuples are different is that these will be generated by the user and will actually be delivered to the ports in a different order. Perhaps we should be able to use the existing flow. The only problem, as outlined before, seems to be identification of the user tuple as a control tuple.
>> > >>
>> > >> ~ Bhupesh
>> > >>
>> > >> On Thu, Dec 22, 2016 at 10:44 PM, Vlad Rozov <[email protected]> wrote:
>> > >>
>> > >>> Why is it necessary to wrap in the OutputPort? Can't it be delegated to a Sink by introducing a new putControlTuple method?
>> > >>>
>> > >>> Thank you,
>> > >>>
>> > >>> Vlad
>> > >>>
>> > >>> On 12/21/16 22:10, Bhupesh Chawda wrote:
>> > >>>
>> > >>>> Hi Vlad,
>> > >>>>
>> > >>>> The problem in using the Tuple class as the wrapper is that the Ports belong to the API and we want to wrap the payload object of the control tuple into the Tuple class, which is not part of the API.
>> > >>>>
>> > >>>> The output port will just get the payload of the user control tuple. For example, if the user emits a Long as a control tuple, the payload object will just be a Long object.
>> > >>>>
>> > >>>> It is necessary to bundle this Long into some recognizable object so that the BufferServerPublisher knows that this is a control tuple and not a regular tuple, and serializes it accordingly. It is therefore necessary that the tuple be part of some known hierarchy so that it can be distinguished from other payload tuples. Let us call this class ControlTupleInterface. Note that this needs to be done before the tuple is inserted into the sink, which is done in the port objects. Once the tuple is inserted into the sink, it would seem just like any other payload tuple and cannot be distinguished.
>> > >>>>
>> > >>>> For this reason, I had something like the following in API:
>> > >>>>
>> > >>>> package com.datatorrent.api;
>> > >>>>
>> > >>>> import java.util.UUID;
>> > >>>>
>> > >>>> public class ControlTupleInterface
>> > >>>> {
>> > >>>>   Object payload; // User control tuple payload. A Long() for example.
>> > >>>>   UUID id;        // Unique Id to de-duplicate in downstream operators
>> > >>>> }
>> > >>>>
>> > >>>> Regarding your suggestion on using the Tuple class as the wrapper for the control tuple payload, let me mention the current scenario flow to make the discussion easier:
>> > >>>>
>> > >>>> We have a Tuple class in buffer server which is responsible for serializing the user control tuple, bundling together a message type: CUSTOM_CONTROL_TUPLE_VALUE.
>> > >>>>
>> > >>>> com.datatorrent.bufferserver.packet.Tuple
>> > >>>> |-- com.datatorrent.bufferserver.packet.CustomControlTuple
>> > >>>>
>> > >>>> We have another Tuple class in Stram which helps the BufferServerSubscriber to de-serialize the serialized tuples. We should have CustomControlTuple in stram as follows:
>> > >>>>
>> > >>>> com.datatorrent.stram.tuple.Tuple
>> > >>>> |-- com.datatorrent.stram.tuple.CustomControlTuple
>> > >>>>
>> > >>>> This will have a field for user control payload.
>> > >>>>
>> > >>>> I think we should not expose the Tuple class in stram to the API. That was the main reason I introduced another class/interface ControlTupleInterface as described above.
>> > >>>>
>> > >>>> Regarding adding methods to DefaultInputPort and DefaultOutputPort, I think error detection would not be early enough if the control tuple is sent very late in the processing :-)
>> > >>>> Extending the ports to ControlTupleAware* should help in this case. However, we still need to see if there are any downsides to doing this.
>> > >>>>
>> > >>>> Thanks.
>> > >>>>
>> > >>>> ~ Bhupesh
>> > >>>>
>> > >>>> On Thu, Dec 22, 2016 at 7:26 AM, Vlad Rozov <[email protected]> wrote:
>> > >>>>
>> > >>>>> Hi Bhupesh,
>> > >>>>>
>> > >>>>> it should not be a CustomWrapper. The wrapper object should be CustomControlTuple that extends Tuple. There is already code that checks for Tuple instance. The "unWrap" name is misleading, IMO. It should be something like customControlTuple.getPayload() or customControlTuple.getAttachment(). In the emitControl(), create new CustomControlTuple using provided payload as one of arguments. It may still be good to use common parent other than Object for control tuple payload class hierarchy.
>> > >>>>>
>> > >>>>> I don't understand how adding more methods to the Default implementation will help with early error detection unless application or operator that relies on the custom control tuple functionality explicitly checks for the platform version at run-time or tries to emit a control tuple just to check that such functionality is supported by the platform.
>> > >>>>>
>> > >>>>> Thank you,
>> > >>>>>
>> > >>>>> Vlad
>> > >>>>>
>> > >>>>> On 12/21/16 04:58, Bhupesh Chawda wrote:
>> > >>>>>
>> > >>>>>> Hi Vlad.
>> > >>>>>>
>> > >>>>>> Yes, the API should not change. We can take an Object instead, and later wrap it into the required class.
>> > >>>>>>
>> > >>>>>> Our InputPort.put and emitControl method would look something like the following where we handle the wrapping and unwrapping internally.
>> > >>>>>>
>> > >>>>>> public void put(T tuple)
>> > >>>>>> {
>> > >>>>>>   if (tuple instanceof CustomWrapper) {
>> > >>>>>>     // Custom control tuple: unwrap and hand the payload to the control path
>> > >>>>>>     processControl(((CustomWrapper)tuple).unWrap());
>> > >>>>>>   } else {
>> > >>>>>>     process(tuple);
>> > >>>>>>   }
>> > >>>>>> }
>> > >>>>>>
>> > >>>>>> public void emitControl(Object tuple)
>> > >>>>>> {
>> > >>>>>>   sink.put(CustomWrapper.wrap(tuple));
>> > >>>>>> }
>> > >>>>>>
>> > >>>>>> Regarding the compatibility issue, I think we have two ways of doing it:
>> > >>>>>>
>> > >>>>>> 1. Extend DefaultInputPort and DefaultOutputPort and create ControlAwareInput and ControlAwareOutput out of it. This might require us to additionally handle specific cases when non-compatible ports (ControlAwareOutput and DefaultInput, for example) are connected to each other in user apps.
>> > >>>>>> 2. Add the additional methods in the existing Default implementations.
>> > >>>>>>
>> > >>>>>> IMO, both of these would help in early error detection.
>> > >>>>>>
>> > >>>>>> ~ Bhupesh
>> > >>>>>>
>> > >>>>>> On Wed, Dec 21, 2016 at 1:36 AM, Vlad Rozov <[email protected]> wrote:
>> > >>>>>>
>> > >>>>>>> A wrapper class is required for the control tuples delivery, but Port/Operator API should use Control Tuple payload object only. Implementation of the wrapper class may change from version to version, but API should not be affected by the change.
>> > >>>>>>>
>> > >>>>>>> I guess the assumption is that default input and output port will be extended to provide support for the control tuples. This may cause some backward compatibility issues. Consider a scenario when a newer version of Malhar that relies on EOF control tuple is deployed into an older version of core that does not support control tuples. In such a scenario, an error will be raised only when an operator tries to emit EOF control tuple at the end of a job. Introducing control tuple aware ports solves the early error detection. It will require some operators to be modified to use control tuple aware ports, but such change may help to distinguish control tuple aware operators from their old versions.
>> > >>>>>>>
>> > >>>>>>> Vlad
>> > >>>>>>>
>> > >>>>>>> On 12/20/16 04:09, Bhupesh Chawda wrote:
>> > >>>>>>>
>> > >>>>>>>> I investigated this and it seems better to have a wrapper class for the user object.
>> > >>>>>>>> This would serve 2 purposes:
>> > >>>>>>>>
>> > >>>>>>>> 1. Allow us to distinguish a custom control tuple from other payload tuples.
>> > >>>>>>>> 2. For the same control tuple received from different upstream partitions, we would have some mechanism to distinguish between the two in order to identify duplicates.
>> > >>>>>>>>
>> > >>>>>>>> Additionally, the wrapper class needs to be part of the API as DefaultOutputPort needs to know about it, before putting it into the sink. We can make sure that the user is not able to extend or modify this class in any manner.
>> > >>>>>>>>
>> > >>>>>>>> ~ Bhupesh
>> > >>>>>>>>
>> > >>>>>>>> On Mon, Dec 19, 2016 at 12:18 PM, David Yan <[email protected]> wrote:
>> > >>>>>>>>
>> > >>>>>>>>> This C type parameter is going to fix the control tuple type at compile time and this is actually not what we want. Note that the operator may receive or emit multiple different control tuple types.
>> > >>>>>>>>>
>> > >>>>>>>>> David
>> > >>>>>>>>>
>> > >>>>>>>>> On Dec 17, 2016 3:33 AM, "Tushar Gosavi" <[email protected]> wrote:
>> > >>>>>>>>>
>> > >>>>>>>>>> We do not need to create an interface for data emitted through emitControl or processed through processControl. Internally we could wrap the user object in ControlTuple. You can add a type parameter for the control tuple object on ports.
>> > >>>>>>>>>>
>> > >>>>>>>>>> DefaultInputPort<D,C>
>> > >>>>>>>>>> D is the data type and C is the control tuple type for better error catching at compile phase.
>> > >>>>>>>>>>
>> > >>>>>>>>>> - Tushar.
>> > >>>>>>>>>>
>> > >>>>>>>>>> On Sat, Dec 17, 2016 at 8:35 AM, Bhupesh Chawda <[email protected]> wrote:
>> > >>>>>>>>>>
>> > >>>>>>>>>>> Agreed Vlad and David.
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> I am just suggesting there should be a wrapper for the user object. It can be a marker interface and we can call it something else like "CustomControl".
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> The user object will be wrapped in another class "ControlTuple" which traverses the BufferServer and will perhaps be extended from the packet/Tuple class. This class will not be exposed to the user.
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> ~ Bhupesh
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> On Sat, Dec 17, 2016 at 4:11 AM, Vlad Rozov <[email protected]> wrote:
>> > >>>>>>>>>>>
>> > >>>>>>>>>>>> I agree with David. Payload of the control tuple is in the userObject and operators/ports don't need to be exposed to the implementation of the ControlTuple class. With the proposed interface, operator developers are free to extend ControlTuple further and I don't think that such capability needs to be provided. The wrapping in the ControlTuple class is necessary and most likely ControlTuple needs to be extended from the buffer server Tuple. It may be good to have a common parent other than Object for all user payloads, but it may be a marker interface as well.
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> Thank you,
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> Vlad
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> On 12/16/16 09:59, Bhupesh Chawda wrote:
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>> Hi David,
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> Actually, I was thinking of another API class called ControlTuple, different from the actual tuple class in buffer server or stram. This could serve as a way for the Buffer server publisher to understand that it is a control tuple and needs to be wrapped differently.
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> ~ Bhupesh
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> On Dec 16, 2016 22:28, "David Yan" <[email protected]> wrote:
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> {code}
>> > >>>>>>>>>>>>>> // DefaultInputPort
>> > >>>>>>>>>>>>>> public void processControl(ControlTuple tuple)
>> > >>>>>>>>>>>>>> {
>> > >>>>>>>>>>>>>>   // Default Implementation to avoid need to implement it in all implementations
>> > >>>>>>>>>>>>>> }
>> > >>>>>>>>>>>>>> {code}
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> {code}
>> > >>>>>>>>>>>>>> // DefaultOutputPort
>> > >>>>>>>>>>>>>> public void emitControl(ControlTuple tuple)
>> > >>>>>>>>>>>>>> {
>> > >>>>>>>>>>>>>> }
>> > >>>>>>>>>>>>>> {code}
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> I think we don't need to expose the ControlTuple class to the operator developers because the window ID is just the current window ID when these methods are called. How about making them just Object? We also need to provide a way for the user to specify a custom serializer for the control tuple.
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> David
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> On Thu, Dec 15, 2016 at 12:43 AM, Bhupesh Chawda <[email protected]> wrote:
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> Hi All,
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> Here are the initial interfaces:
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> {code}
>> > >>>>>>>>>>>>>>> // DefaultInputPort
>> > >>>>>>>>>>>>>>> public void processControl(ControlTuple tuple)
>> > >>>>>>>>>>>>>>> {
>> > >>>>>>>>>>>>>>>   // Default Implementation to avoid need to implement it in all implementations
>> > >>>>>>>>>>>>>>> }
>> > >>>>>>>>>>>>>>> {code}
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> {code}
>> > >>>>>>>>>>>>>>> // DefaultOutputPort
>> > >>>>>>>>>>>>>>> public void emitControl(ControlTuple tuple)
>> > >>>>>>>>>>>>>>> {
>> > >>>>>>>>>>>>>>> }
>> > >>>>>>>>>>>>>>> {code}
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> We have an option to add these methods to the interfaces InputPort and OutputPort, but these would not be backward compatible and also not consistent with the current implementation of basic data tuple flow (as with process() and emit()).
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> We also need to expose an interface / class for users to wrap their object and emit downstream. This should be part of API.
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> {code}
>> > >>>>>>>>>>>>>>> public class ControlTuple extends Tuple
>> > >>>>>>>>>>>>>>> {
>> > >>>>>>>>>>>>>>>   Object userObject;
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>   public ControlTuple(long windowId, Object userObject)
>> > >>>>>>>>>>>>>>>   {
>> > >>>>>>>>>>>>>>>     //
>> > >>>>>>>>>>>>>>>   }
>> > >>>>>>>>>>>>>>> }
>> > >>>>>>>>>>>>>>> {code}
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> The emitted tuples would traverse the same flow as with other control tuples. The plan is to intercept the control tuples in GenericNode and use the Reservoir to emit the control tuples at the end of the window.
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> GenericNode seems to be the best place to buffer incoming custom control tuples without delivering them immediately to the operator port. Once the end of the window is reached, we plan to use the reservoir sink to push them to the port. This is different behavior from any other control tuple, since here we are changing the order of tuples in the stream. The custom control tuples will be buffered and not delivered to the ports until the end of the window.
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> To accomplish this, we need to have a public method in SweepableReservoir which allows putting a tuple back in the sink of the reservoir.
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> ~ Bhupesh
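
For anyone reading the thread without the PR open, the delivery order discussed above (data tuples first, buffered custom control tuples next, then endWindow()) can be illustrated with a small standalone sketch. This is only an illustration under assumptions and not the code in the PR: SketchPort, ControlTupleBuffer, DeliveryOrderDemo, onData() and onCustomControl() are hypothetical names made up here, and nothing in it uses the real GenericNode, SweepableReservoir or DefaultInputPort classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an input port; NOT the Apex DefaultInputPort.
interface SketchPort {
  void process(Object tuple);          // regular data tuple
  void processControl(Object payload); // custom control tuple payload
}

// Hypothetical stand-in for the buffering that the thread proposes for GenericNode.
class ControlTupleBuffer {
  private final List<Object> pending = new ArrayList<>(); // control tuples held for this window
  private final SketchPort port;
  private final List<String> log; // records window boundaries for the demo

  ControlTupleBuffer(SketchPort port, List<String> log) {
    this.port = port;
    this.log = log;
  }

  void beginWindow() {
    log.add("beginWindow");
  }

  // Data tuples are delivered to the port immediately.
  void onData(Object tuple) {
    port.process(tuple);
  }

  // Custom control tuples are only buffered while the window is open.
  void onCustomControl(Object payload) {
    pending.add(payload);
  }

  // At the end of the window: flush buffered control tuples first, then end the window.
  void endWindow() {
    for (Object payload : pending) {
      port.processControl(payload);
    }
    pending.clear();
    log.add("endWindow");
  }
}

class DeliveryOrderDemo {
  static List<String> run() {
    List<String> log = new ArrayList<>();
    SketchPort port = new SketchPort() {
      @Override
      public void process(Object tuple) {
        log.add("data:" + tuple);
      }

      @Override
      public void processControl(Object payload) {
        log.add("control:" + payload);
      }
    };
    ControlTupleBuffer node = new ControlTupleBuffer(port, log);
    node.beginWindow();
    node.onData("a");
    node.onCustomControl(42L); // arrives between two data tuples
    node.onData("b");
    node.endWindow();
    return log;
  }

  public static void main(String[] args) {
    // prints [beginWindow, data:a, data:b, control:42, endWindow]
    System.out.println(run());
  }
}
```

Even though the control payload arrives between the two data tuples, the port sees it only after both, and still before the window ends. De-duplication across upstream partitions and propagation to downstream ports are left out of this sketch on purpose.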
