Hi,

When some publishers send messages to the message broker at a high rate
and the consumers are relatively slow, there is a high chance that the
server ends up in an inconsistent state (due to TimeoutExceptions while
writing data to Cassandra) or runs out of memory under the load. In
several recent scenarios we have therefore identified the need to
introduce publisher flow control into WSO2 Message Broker, so that heavy
publisher clients are temporarily blocked until the messages already
accepted have been successfully persisted to Cassandra storage.

With the current MB architecture, flow control can be applied in the
broker in the following two ways.

1. By using the Channel.Flow command from the AMQP 0-9-1 specification.
2. By applying TCP backpressure for connection throttling in Mina.

According to the AMQP spec, "Flow control is an emergency procedure used to
halt the flow of messages from a peer. It works in the same way between
client and server and is implemented by the Channel.Flow command. Flow
control is the only mechanism that can stop an over-producing publisher."

Hence, whenever an exception occurs while adding message content, metadata
etc. into the Cassandra column families (CFs), we send a
ChannelFlowBody(boolean active) frame to the publishing client to inform it
that the broker is activating flow control. Before publishing the next
message, the client session checks whether it is flow controlled; if so, it
waits for a specified time (currently 5s) and retries. After 12 retries the
client throws an exception stating that message publishing cannot continue
due to flow control. To lift the broker-level flow control, we keep trying
to persist the already accepted messages into Cassandra, and once that
succeeds a second ChannelFlowBody frame is sent so that the blocked session
can continue publishing from then on. Note that in AMQP 0-9-1 the active
flag tells the peer whether it may send content: active=false asks the
publisher to stop, and active=true lets it resume. Flow control is applied
at the per-channel level.
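
To make the client-side wait-and-retry behaviour a bit more concrete, here
is a minimal Java sketch. It is not the actual MB or AMQP client code:
FlowControlledSession, onChannelFlow and publish are hypothetical names, and
the frame write is replaced by a counter. It assumes the AMQP 0-9-1 meaning
of the flag, i.e. active=false stops the publisher and active=true resumes
it.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical client-side session; a sketch, not the real client class. */
class FlowControlledSession {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition flowResumed = lock.newCondition();
    private final int maxRetries;   // e.g. 12 in the description above
    private final long waitMillis;  // e.g. 5000 in the description above
    private boolean blocked = false;
    private int published = 0;

    FlowControlledSession(int maxRetries, long waitMillis) {
        this.maxRetries = maxRetries;
        this.waitMillis = waitMillis;
    }

    /** Called when a Channel.Flow frame arrives (assumed: active=false means stop). */
    void onChannelFlow(boolean active) {
        lock.lock();
        try {
            blocked = !active;
            if (active) {
                flowResumed.signalAll(); // wake any publisher waiting to retry
            }
        } finally {
            lock.unlock();
        }
    }

    /** Waits while flow controlled; gives up after maxRetries waits. */
    void publish(String message) {
        lock.lock();
        try {
            int attempts = 0;
            while (blocked) {
                if (attempts >= maxRetries) {
                    throw new IllegalStateException(
                            "Publishing cannot continue: channel is flow controlled");
                }
                attempts++;
                try {
                    // Returns early if the broker lifts flow control in the meantime.
                    flowResumed.await(waitMillis, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting", e);
                }
            }
            published++; // stand-in for writing the publish frame to the channel
        } finally {
            lock.unlock();
        }
    }

    int publishedCount() {
        lock.lock();
        try {
            return published;
        } finally {
            lock.unlock();
        }
    }
}
```

With the values mentioned above this would be created as
new FlowControlledSession(12, 5000). Waiting on a condition rather than
sleeping means a publisher resumes as soon as the flag is cleared instead of
sitting out the rest of the 5s interval.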

For TCP backpressure at the Mina level, when a connection is receiving
packets at an excessive rate we stop reading from that socket temporarily
until the load drops. I asked on the Apache Mina mailing list [1] how this
can be done in Mina 1.1.7, and according to the response
ConnectionThrottleFilter can be used for this. I am going to implement this
in MB and will post progress updates here.
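
The idea of "stop reading from the socket until the load drops" can be
sketched with plain java.nio, independent of Mina. This is only an
illustration under assumptions: ReadThrottle, the watermark parameters and
the messageAccepted/messagePersisted callbacks are hypothetical and are not
part of Mina or MB. Once OP_READ is removed from the key's interest set, the
selector stops delivering read events for that connection, the kernel
receive buffer fills up, and TCP itself slows the sender down.

```java
import java.nio.channels.SelectionKey;

/** Hypothetical helper: pauses reads on one connection while too much is pending. */
class ReadThrottle {
    private final SelectionKey key;
    private final int highWatermark; // pause reading at this many pending messages
    private final int lowWatermark;  // resume reading once pending drops to this
    private int pending = 0;

    ReadThrottle(SelectionKey key, int highWatermark, int lowWatermark) {
        this.key = key;
        this.highWatermark = highWatermark;
        this.lowWatermark = lowWatermark;
    }

    /** Call when a message has been read but not yet persisted. */
    synchronized void messageAccepted() {
        pending++;
        if (pending >= highWatermark) {
            // Drop read interest: the selector no longer reports this channel readable.
            key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
        }
    }

    /** Call when a previously accepted message has been persisted. */
    synchronized void messagePersisted() {
        if (pending > 0) {
            pending--;
        }
        if (pending <= lowWatermark) {
            // Restore read interest and wake the selector so it takes effect promptly.
            key.interestOps(key.interestOps() | SelectionKey.OP_READ);
            key.selector().wakeup();
        }
    }

    synchronized boolean isReading() {
        return (key.interestOps() & SelectionKey.OP_READ) != 0;
    }
}
```

Using two watermarks instead of a single threshold avoids toggling read
interest on every message when the backlog hovers around the limit.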

We are going to include this [2] in the next release of WSO2 Message
Broker.

[1] http://www.mail-archive.com/[email protected]/msg22844.html
[2] https://wso2.org/jira/browse/MB-427

Thanks!
Ishara

-- 
Ishara Premasada
Software Engineer,
WSO2 Inc. http://wso2.com/


Blog    : http://isharapremadasa.blogspot.com/
Twitter : https://twitter.com/ishadil
Mobile  : +94 714445832
_______________________________________________
Architecture mailing list
[email protected]
https://mail.wso2.org/cgi-bin/mailman/listinfo/architecture
