Hi Sinthuja,

+1 for making defineStream() asynchronous while also keeping a synchronous version. Would it make sense to keep the defineStream() method as it is and name the new method defineStreamAsync() or similar? I am suggesting this because, AFAIK, some current implementations depend on defineStream() being synchronous, and all those places would have to be refactored if the change is introduced this way.
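A minimal sketch of the split suggested above: defineStream() keeps its blocking contract and a separate defineStreamAsync() is added. The class name DataPublisherSketch, the CompletableFuture return type, the "name:version" stream ID format and the defineOnReceiver() placeholder are all assumptions for illustration, not the actual DataPublisher API.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch; not the real DataPublisher class.
final class DataPublisherSketch {

    // Hypothetical minimal stand-in for the real StreamDefinition class.
    static final class StreamDefinition {
        final String name;
        final String version;

        StreamDefinition(String name, String version) {
            this.name = name;
            this.version = version;
        }
    }

    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    // Streams the (simulated) receiver has accepted, keyed by stream ID.
    private final Map<String, StreamDefinition> definedStreams = new ConcurrentHashMap<>();

    // Existing contract kept unchanged: blocks until the stream is defined,
    // so current callers that rely on synchronous behaviour keep working.
    String defineStream(StreamDefinition definition) {
        return defineOnReceiver(definition);
    }

    // New non-blocking variant: returns immediately; the stream ID, or any
    // exception thrown by defineOnReceiver(), is delivered through the future.
    CompletableFuture<String> defineStreamAsync(StreamDefinition definition) {
        return CompletableFuture.supplyAsync(() -> defineOnReceiver(definition), worker);
    }

    // Hypothetical placeholder for the remote call to the receiver; the
    // "name:version" stream ID format is an assumption.
    private String defineOnReceiver(StreamDefinition definition) {
        String streamId = definition.name + ":" + definition.version;
        definedStreams.put(streamId, definition);
        return streamId;
    }

    // Stops the worker thread so the JVM can exit.
    void shutdown() {
        worker.shutdown();
    }
}
```

With this naming, existing callers compile and behave exactly as before, while new callers can chain on the returned future (for example with thenAccept()) instead of blocking.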
Regarding the publish() method, can you give an example of a situation where the streamId is not in streamIdCache and the stream definition is not in streamDefnCache either? I mean, will it be a common situation? If this happens when the stream is not yet defined, can't we drop the events at the client side without calling the findStream() method? Also, if we go with approach #2, will there be a buffer at the receiver side as well, or will the events be dropped immediately?

Thanks,
Lasantha

On 17 January 2014 18:08, Sinthuja Ragendran <[email protected]> wrote:

> Hi all,
>
> I'm in the process of merging the changes of AsyncDataPublisher into
> DataPublisher, so that our DataPublisher makes asynchronous calls for its
> main operations: not only publish, but also connecting to the receiver,
> defining streams, etc. The aim is to improve performance on the client side
> and to spare users from having to choose which data publisher to use.
>
> Below is the expected behaviour of the refactored asynchronous
> DataPublisher, compared with the current synchronous one.
>
> API: public DataPublisher(receiverURL, username, password);
>
> - Synchronous DataPublisher: Connects to the receiver when the object is
>   instantiated. This is a blocking call; if the data receiver is down, it
>   halts until it times out.
> - Refactored DataPublisher (async): Connects to the receiver
>   asynchronously. During instantiation it only obtains the connection
>   details and connects to the receiver in a separate worker thread, so
>   whether the data receiver is up or down does not affect the client, and
>   the invoking thread returns immediately.
>
> API: public String defineStream(StreamDefinition streamDefinition)
>
> - Synchronous DataPublisher: Also a blocking call, which halts until the
>   stream has been defined successfully. In the BAM scenario, it blocks
>   until the stream definition is saved into Cassandra, so if Cassandra is
>   down or slow, or there is network latency, the client side is affected.
>   However, success (the stream ID is returned) or failure
>   (DifferentStreamDefinitionAlreadyDefinedException,
>   MalformedStreamDefinitionException, etc.) is known to the client at
>   invocation time, since it is a synchronous call.
> - Refactored DataPublisher (async): Adds the stream definition to
>   streamDefnCache and returns the streamId generated from the stream name
>   and version. When defineStream() returns, the stream has not actually
>   been defined; it is only stored in the cache. The stream is actually
>   defined once the connection to the receiver has been made and an event
>   is triggered to be published for this stream. Hence the client is never
>   blocked by this call, but the call always succeeds and cannot propagate
>   exceptions, since it is asynchronous.
>
> API: public String defineStreamNow(StreamDefinition streamDefinition)
>
> - Synchronous DataPublisher: Not applicable.
> - Refactored DataPublisher (async): New method that performs defineStream()
>   synchronously, for users who want to define a stream as a blocking call.
>
> API: public void publish(Event event)
>
> - Synchronous DataPublisher: Already asynchronous. The invoking thread puts
>   the event into a queue and returns; a queue worker thread consumes the
>   queue and publishes the events.
> - Refactored DataPublisher (async): Asynchronous as in the default
>   behaviour, with additional functionality. Events are buffered if the
>   connection is lost or not yet established, and the publisher
>   automatically reconnects when the data receiver becomes available again
>   and sends the events buffered during that period. For example, if
>   publish() is called before the connection to the receiver is
>   established, the events are buffered (the buffer size is configurable)
>   and published by a worker thread once the connection is established.
>   Also, before actually publishing an event, we need to make sure the
>   stream has been defined.
>   There are two caches used here: streamDefnCache and streamIdCache.
>   streamDefnCache holds the stream definitions that still need to be
>   defined, and streamIdCache holds the streamIds of streams that have
>   actually been defined/saved with the receiver.
>
>   *The worker thread first checks whether the streamId exists in
>   streamIdCache; if it does, it simply publishes the event. If it does
>   not, it checks streamDefnCache for a stream that needs to be defined
>   before the event is published; if one is there, it defines the stream,
>   removes the definition from streamDefnCache and adds the streamId to
>   streamIdCache. We still need to decide what to do if the streamId is
>   not in streamIdCache and the stream definition is not in
>   streamDefnCache either. The possibilities are:
>   1) Call findStream() to check whether the stream is already defined. If
>      it is, send the events; otherwise drop them at the client side itself.
>      (Pros: avoids unnecessary traffic to the receiver for wrong events.
>      Cons: findStream() is also an expensive call, and until the stream is
>      defined, every event will undergo this check.)
>   2) Publish the events regardless, and let them be dropped after they
>      reach the receiver and are validated, before processing.*
>
> API: public String findStream(String name, String version)
>
> - Synchronous DataPublisher: Retrieves the streamId for a stream.
> - Refactored DataPublisher (async): This API will be deprecated, since the
>   streamId is a combination of the stream name and version.
>
> API: public static validateStreamDefinition(String streamDefn)
>
> - Synchronous DataPublisher: Not applicable.
> - Refactored DataPublisher (async): A synchronous call that validates the
>   stream definition string and checks whether a stream with the same name
>   and version is already defined.
>
> Above are the major changes in the behaviour of the APIs. The highlighted
> part under publish() still needs to be decided. I'd appreciate any
> feedback on the above.
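A minimal sketch of the worker-thread check described above, with both options for the undecided case made switchable. Event, StreamDefinition, the Receiver interface, its isDefined() method (standing in for findStream()), the UnknownStreamPolicy enum and the in-memory receiver are hypothetical stand-ins, and the "name:version" stream ID format is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class PublishWorkerSketch {

    // Hypothetical minimal stand-ins for the real StreamDefinition and Event classes.
    static final class StreamDefinition {
        final String name;
        final String version;
        StreamDefinition(String name, String version) { this.name = name; this.version = version; }
        String streamId() { return name + ":" + version; } // assumed "name:version" format
    }

    static final class Event {
        final String streamId;
        Event(String streamId) { this.streamId = streamId; }
    }

    // Hypothetical view of the receiver as seen by the worker thread.
    interface Receiver {
        String defineStream(StreamDefinition definition);
        boolean isDefined(String streamId); // stands in for findStream(name, version)
        void publish(Event event);
    }

    // The two options listed for a stream that is in neither cache.
    enum UnknownStreamPolicy { FIND_STREAM_OR_DROP, SEND_AND_LET_RECEIVER_DROP }

    // Definitions registered through defineStream() but not yet sent to the receiver.
    final Map<String, StreamDefinition> streamDefnCache = new ConcurrentHashMap<>();
    // Stream IDs known to be defined on the receiver.
    final Set<String> streamIdCache = ConcurrentHashMap.newKeySet();

    private final Receiver receiver;
    private final UnknownStreamPolicy policy;

    PublishWorkerSketch(Receiver receiver, UnknownStreamPolicy policy) {
        this.receiver = receiver;
        this.policy = policy;
    }

    // Called by the queue worker for each event; returns false if the event was dropped on the client.
    boolean process(Event event) {
        String id = event.streamId;
        if (!streamIdCache.contains(id)) {
            StreamDefinition pending = streamDefnCache.get(id);
            if (pending != null) {
                // Define first, then update the caches, so a failed call
                // leaves the definition in streamDefnCache for a retry.
                receiver.defineStream(pending);
                streamIdCache.add(id);
                streamDefnCache.remove(id);
            } else if (policy == UnknownStreamPolicy.FIND_STREAM_OR_DROP) {
                // Option 1: one (expensive) lookup; a hit is cached so later events skip it.
                if (!receiver.isDefined(id)) {
                    return false;
                }
                streamIdCache.add(id);
            }
            // Option 2 (SEND_AND_LET_RECEIVER_DROP): send as-is; the receiver validates it.
        }
        receiver.publish(event);
        return true;
    }

    // In-memory fake receiver, only for trying the sketch out (not thread-safe).
    static final class InMemoryReceiver implements Receiver {
        final Set<String> defined = ConcurrentHashMap.newKeySet();
        final List<Event> received = new ArrayList<>();
        int lookups = 0;

        public String defineStream(StreamDefinition definition) {
            defined.add(definition.streamId());
            return definition.streamId();
        }

        public boolean isDefined(String streamId) {
            lookups++;
            return defined.contains(streamId);
        }

        public void publish(Event event) {
            received.add(event);
        }
    }
}
```

With option 1, only a successful lookup is cached, so every event for a stream that is never defined still pays for a lookup, which is the cost listed under Cons above.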
>
> Thanks,
> Sinthuja
>
> --
> *Sinthuja Rajendran*
> Software Engineer <http://wso2.com/>
> WSO2, Inc.: http://wso2.com
>
> Blog: http://sinthu-rajan.blogspot.com/
> Mobile: +94774273955
>
> _______________________________________________
> Architecture mailing list
> [email protected]
> https://mail.wso2.org/cgi-bin/mailman/listinfo/architecture

--
*Lasantha Fernando*
Software Engineer - Data Technologies Team
WSO2 Inc. http://wso2.com
email: [email protected]
mobile: (+94) 71 5247551
