Hi,

When certain publishers send messages to the message broker at a high rate and the consumers are relatively slow, there is a high possibility that the server goes into an inconsistent state (due to TimeoutExceptions when writing data into Cassandra) or runs out of memory under the load. We have seen in several recent scenarios that publisher flow control needs to be introduced into WSO2 Message Broker, so that heavy publisher clients are temporarily blocked until the already accepted messages are successfully persisted into the Cassandra storage.

With the current MB architecture, flow control can be applied in the broker in the following two ways.

1. By using the Channel.Flow command in the AMQP 0-9-1 specification.
2. By applying TCP backpressure for connection throttling in Mina.

According to the AMQP spec, "Flow control is an emergency procedure used to halt the flow of messages from a peer. It works in the same way between client and server and is implemented by the Channel.Flow command. Flow control is the only mechanism that can stop an over-producing publisher."

Hence, whenever an exception occurs while adding message content, metadata, etc. into the Cassandra CFs, we send a ChannelFlowBody(boolean active) frame to the publishing client to inform it that the broker is activating flow control. The client session checks whether it is flow controlled when publishing the next message, and if flow control is enforced it waits for a specified time (currently 5s) and retries. After retrying 12 times, the client throws an exception stating that message publishing cannot continue due to flow control.

To remove the broker-level flow control, we continuously try to persist the already accepted messages into Cassandra, and once that is done a ChannelFlowBody frame is sent again with active=false, after which the blocked session can continue publishing. This is applied at the per-channel level.

Regarding TCP backpressure at the Mina level for throttling: when packets are arriving at an excessive rate, we stop reading from that socket temporarily until the load is reduced. I have asked how this can be done in Mina 1.1.7 on the Apache Mina mailing list [1], and according to the response ConnectionThrottleFilter can be used for this. I am going to implement this in MB and will update the progress here.

We are going to add this [2] to the next release of WSO2 Message Broker.
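
To make the client-side behaviour described above concrete, here is a minimal sketch of the wait-and-retry check performed before each publish. It is only an illustration and not the actual MB or client code: the class name FlowControlGate and its methods are hypothetical, and how the Channel.Flow frame's active flag maps onto setBlocked() is left to the frame handler. The 5s wait and 12 retries from the description are passed in as constructor arguments.

```java
// Hypothetical illustration only: not the actual WSO2 MB or AMQP client class.
class FlowControlGate {
    private final long waitMillis;   // how long to wait between checks (5000 in the description)
    private final int maxRetries;    // how many waits before giving up (12 in the description)
    private boolean blocked;         // true while the broker has flow control in force

    FlowControlGate(long waitMillis, int maxRetries) {
        if (waitMillis <= 0 || maxRetries < 0) {
            throw new IllegalArgumentException("waitMillis must be > 0 and maxRetries >= 0");
        }
        this.waitMillis = waitMillis;
        this.maxRetries = maxRetries;
    }

    // Assumed to be called by the handler that receives the Channel.Flow frame.
    synchronized void setBlocked(boolean blocked) {
        this.blocked = blocked;
        if (!blocked) {
            notifyAll(); // wake publishers so they do not sit out the full wait
        }
    }

    // Called before each publish. Returns the number of waits that were needed,
    // or throws once the retry budget is used up while still blocked.
    synchronized int awaitPermission() {
        int retries = 0;
        while (blocked) {
            if (retries == maxRetries) {
                throw new IllegalStateException(
                        "Message publishing cannot continue due to flow control");
            }
            try {
                // May return early on notifyAll() or a spurious wakeup;
                // either way it is counted as one retry in this sketch.
                wait(waitMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while flow controlled", e);
            }
            retries++;
        }
        return retries;
    }
}
```

A publisher would create one gate per channel, e.g. new FlowControlGate(5000, 12), and call awaitPermission() before sending each message. With those values a publisher gives up after roughly 60 seconds of continuous flow control.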

[1] http://www.mail-archive.com/[email protected]/msg22844.html
[2] https://wso2.org/jira/browse/MB-427

Thanks!
Ishara

--
Ishara Premasada
Software Engineer, WSO2 Inc. http://wso2.com/
Blog : http://isharapremadasa.blogspot.com/
Twitter : https://twitter.com/ishadil
Mobile : +94 714445832
_______________________________________________
Architecture mailing list
[email protected]
https://mail.wso2.org/cgi-bin/mailman/listinfo/architecture
