Hi lasantha,
On Sun, Jan 19, 2014 at 2:16 AM, Lasantha Fernando <[email protected]>wrote: > Hi Sinthuja, > > Thanks for the explanations. > > > On 17 January 2014 21:16, Sinthuja Ragendran <[email protected]> wrote: > >> Hi lasantha, >> >> Thanks for your suggestions. Please see my comments inline. >> >> >> On Friday, January 17, 2014, Lasantha Fernando <[email protected]> wrote: >> >>> Hi Sinthuja, >>> >>> +1 for making an asynchronous defineStream() method as well as keeping a >>> synchronous version. Would it make sense to keep the defineStream() method >>> as it is and make the new method defineStreamAsync() or similar? Am >>> suggesting this because AFAIK, some current implementations depend on >>> defineStream() being synchronous and all those places might have to be >>> refactored if the change is introduced this way. >>> >> >> IMO we should have the default behavior as asynchronous, if the user want >> synchronous call then explicitly mention them as defineStreamNow(). Here >> I'm hoping to have the method signatures as it's, hence we don't need to >> change the current data agent's code. >> > > AFAIR, there were some places where the stream is defined when the first > event comes and those methods depended on the definition of the stream > happening immediately. We might have to check those code segments and fix > them if the defineStream() behaviour is changed from synchronous to > asynchronous. > Yes, during this effort it will be handled. To defining the stream when the first event comes in the publisher worker thread will be changed to defineStreamNow() rather defineStream(). > > >>> Regarding the publish() method, can you give an example for a situation >>> where the streamId is not streamIdCache as well as stream definition not >>> being in streamDefnCache? I mean will it be a common situation? >>> >> >> It's not common, but there can be a situation like that. For example, >> user could have already defined the stream may be with different data >> publisher and then now publish the events for that stream without >> defineStream() with current data publisher. In that case we won't have >> streamID in streamIdCache and streamDefn won't be existing in >> streamDefnCache. But the incoming event is valid and should reach the >> receiver as the stream has been already defined. >> > > Another small question... How will the streamDefnCache and streamIdCache > of a particular data publisher be eventually updated regarding a stream > definition that was defined in a different data publisher? > If we go with #1 approach, with findStream() we can find the relevant streamId and update the cache. #1 approach is viable when the scenario follows the correct path (ie, Already stream is defined and publishing) as it can't reduce the unnecessary traffic at receiver. Let say a scenario due to some reason the stream is not defined (may be user fault), then it'll make findStream() for every events which makes it slow. This won't affect the invocation client thread blocking as it's asynchronouse, but there is a high possibility of buffer becomes full and events may get dropped. Note this problem only happens in the false path and success path will work fine as I explained above. If we go with #2 approach, where in the receiver needs to validate the success and failure events. IMO also #2 approach is better, but it also has it's cons as it'll fail after reaching the receiver and increase the processing of unnecessary events. Thanks, Sinthuja. > > Anyway, to me, approach#2 feels better. IMHO, the responsibility of > filtering and dropping out invalid events should be at the receiver end. > > Thanks, > Lasantha > > >>> If this happens in the case where the stream is not yet defined, can't >>> we drop the events at client side without calling the findStream() method? >>> >> >> If the stream has been already defined as explained above, then we can't >> drop the event. >> >>> >>> Also, if we are going with approach#2, will there be a buffer at >>> receiver side as well, or will the events be dropped immediately? >>> >> >> No, there is a buffer in receiver also. And during the event formatting >> time, it'll fail if the stream is not defined. So after some processing >> it'll drop, not immediately. >> >> Thanks, >> Sinthuja >> >>> >>> Thanks, >>> Lasantha >>> >>> >>> On 17 January 2014 18:08, Sinthuja Ragendran <[email protected]> wrote: >>> >>>> Hi all, >>>> >>>> I'm in the process of merging the changes of AsyncDataPublisher with >>>> DataPublisher and hence make our DataPublisher do asynchronous calls on >>>> main operations; not only publish but also in connecting to receiver, >>>> define stream, etc. This effort is for improve performance in client side >>>> and reduce the complexity to chose between what data publisher to use in >>>> users' perspective. >>>> >>>> The below shown is the expected bahaviour of the re-factored >>>> asynchronous DataPublisher. >>>> >>>> API Synchronous Data Publisher Refactored Datapublisher (Async) >>>> public DataPublisher(receiverURL, username, password); >>>> >>>> Connects to the receiver during the object instantiation time. It's >>>> blocking call, and if data receiver is down it halt until it timeout. >>>> >>>> >>>> Connects to receiver asynchronously, and during the instantiation it >>>> obtains the details and connect to the receiver in separate worker thread. >>>> Therefore regardless of data receiver is up or down, it won't affect the >>>> client and the invocation thread immediately returns back. >>>> >>>> public String defineStream(StreamDefinition streamDefinition) >>>> >>>> >>>> >>>> >>>> >>>> >>>> It's also blocking call, which halts until the stream is being defined >>>> successfully. If we consider the BAM scenario, it'll be halted until it >>>> save the stream definition in to cassandra. And if cassandra is >>>> down/slow/network latency affects the client side. But the success based on >>>> successful return of stream id OR failure based on >>>> DifferentStreamDefinitionAlreadyDefinedException, >>>> MalformedStreamDefinitionException, etc will be known to the client during >>>> the invocation since it's synchronous call. >>>> It adds the stream definition to the streamDefnCache and returns the >>>> generated streamId based on stream name and version to invocation. During >>>> the defineStream() invocation time, the stream wouldn't have actually >>>> defined and it'll be only stored in the cache. It'll actually define the >>>> stream, once the connection to the receiver was made successfully, and if >>>> any event was triggered to publish for this stream. Hence client won't be >>>> blocked at any casue during this, but always it'll be success and will not >>>> be able propagate the exception since it's asynchronous. >>>> >>>> _______________________________________________ >>>> Architecture mailing list >>>> [email protected] >>>> https://mail.wso2.org/cgi-bin/mailman/listinfo/architecture >>>> >>>> >>> >>> >>> -- >>> *Lasantha Fernando* >>> Software Engineer - Data Technologies Team >>> WSO2 Inc. http://wso2.com >>> >>> email: [email protected] >>> mobile: (+94) 71 5247551 >>> >> >> >> -- >> Sent from iPhone >> >> _______________________________________________ >> Architecture mailing list >> [email protected] >> https://mail.wso2.org/cgi-bin/mailman/listinfo/architecture >> >> > > > -- > *Lasantha Fernando* > Software Engineer - Data Technologies Team > WSO2 Inc. http://wso2.com > > email: [email protected] > mobile: (+94) 71 5247551 > > _______________________________________________ > Architecture mailing list > [email protected] > https://mail.wso2.org/cgi-bin/mailman/listinfo/architecture > > -- *Sinthuja Rajendran* Software Engineer <http://wso2.com/> WSO2, Inc.:http://wso2.com Blog: http://sinthu-rajan.blogspot.com/ Mobile: +94774273955
_______________________________________________ Architecture mailing list [email protected] https://mail.wso2.org/cgi-bin/mailman/listinfo/architecture
