On 06/28/2011 04:35 PM, Kerry Bonin wrote:
Regardless of how it is accomplished, more control is needed over
failover to prevent network splits. The current model is
essentially to accept the broker as a single point of failure, or
deploy Linux clustering.
Without replication or persistence of the queue state, broker failure
implies potential message loss. With persistence, the availability of
the messages is tied to the availability of a broker using that store.
Assuming you can tolerate message loss, the issue is simply to ensure
that communication remains possible, i.e. that producers and consumers
always use (or at least gravitate to) the same broker instance.
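On the client side, the reconnect options of the C++ messaging API can
already express an ordered broker list; a minimal sketch (the hostnames
and interval values are placeholders):

    #include <qpid/messaging/Connection.h>

    using namespace qpid::messaging;

    int main()
    {
        // Preferred broker first; reconnect_urls is the ordered fallback
        // list tried on failover. Hostnames here are placeholders.
        Connection connection(
            "primary:5672",
            "{reconnect: true,"
            " reconnect_urls: ['backup-1:5672', 'backup-2:5672'],"
            " reconnect_interval_min: 1, reconnect_interval_max: 30}");
        connection.open();
        // ... create sessions, senders and receivers as usual ...
        connection.close();
        return 0;
    }

That keeps each client connected to *a* broker, but it doesn't by
itself make all clients converge on the same instance.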
You could perhaps use a QMF-based approach to this.
E.g. you could have an application that connected to all the brokers in
a list, kept track of their availability, and periodically retried
connecting to any brokers in the list that were unavailable. It would
then control which of these brokers was the 'primary' and could close
all the client connections on the other brokers using QMF commands.
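To make that concrete, here is a rough sketch of how such a controller
could issue the QMF command over the messaging API, using the QMF2
map-message convention. The object-name format and the use of the
connection's remote address (as listed by qpid-stat -c) as its
management id are assumptions to verify against your broker version:

    #include <qpid/messaging/Connection.h>
    #include <qpid/messaging/Session.h>
    #include <qpid/messaging/Sender.h>
    #include <qpid/messaging/Receiver.h>
    #include <qpid/messaging/Message.h>
    #include <qpid/types/Variant.h>

    using namespace qpid::messaging;
    using qpid::types::Variant;

    // Ask a broker, via a QMF2 method request, to close one of its
    // client connections. 'mgmtId' is the connection's management id.
    void closeClientConnection(Session& session, const std::string& mgmtId)
    {
        Sender sender = session.createSender("qmf.default.direct/broker");
        Receiver replies = session.createReceiver(
            "#; {create: always, delete: always}");

        Variant::Map objectId;
        objectId["_object_name"] =
            "org.apache.qpid.broker:connection:" + mgmtId;

        Variant::Map content;
        content["_object_id"] = objectId;
        content["_method_name"] = "close"; // or "abort", with the patch below
        content["_arguments"] = Variant::Map();

        Message request;
        request.getProperties()["x-amqp-0-10.app-id"] = "qmf2";
        request.getProperties()["qmf.opcode"] = "_method_request";
        request.setReplyTo(replies.getAddress());
        encode(content, request);

        sender.send(request);
        // A real controller would fetch the response from 'replies' and
        // check for a _method_response or _exception opcode.
    }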
Of course you would need to ensure that this application didn't itself
become a single point of failure. However, it would be simple enough to
have a couple of redundant instances waiting to take over.
The one fly in the ointment at present is that a QMF close of a
connection will result in a connection exception on the associated
client rather than triggering failover. However, adding another
command, 'abort' say, that simply disconnected the client with no
explicit handshake would fix that (see the attached patch).
Does this approach sound workable for you? The benefit is that it
doesn't require any client modification and would also provide a quite
valuable tool for centralised monitoring of general failover behaviour.
I'd recommend an ordered broker list with monitoring and automatic
fallback, subject to some flap mitigation rules. I would also expose
more client state: I understand the desire to hide stuff so people
don't use unsupported interfaces, but it is useful for
serviceability and diagnostics to have a library expose basic health
information.
Yes, I agree that being able to determine the remote peer address would
be valuable, as would perhaps other aspects of retry, such as the time
since last connected. (At present you can determine whether you are
currently connected using Connection::isOpen().)
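In other words, about all a client can do today is a probe along these
lines:

    #include <iostream>
    #include <qpid/messaging/Connection.h>

    using namespace qpid::messaging;

    // Open vs. closed is the only health state currently exposed; peer
    // address, retry counts, time since last connect etc. would all be
    // useful additions.
    void logHealth(Connection& connection)
    {
        std::cout << (connection.isOpen() ? "connected" : "disconnected")
                  << std::endl;
    }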
Out of honest curiosity, why don't you like callbacks? We've had to
use threading with spin loops to get around the lack of callbacks
in the messaging APIs, and we don't like having CPU load float
high under low message load. Even though the spin-loop load drops
gracefully under pressure, it feels inelegant and raises power
consumption.
Yes, I don't like the need to spin either. I want to expose a more
general notification mechanism to avoid the need for any polling for
changes (including changes to failover/connection related state). I
prefer that to callbacks at the level the API is operating at;
callback-based approaches could then be built on top of it.
Out of curiosity, what is it you are spinning for?
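If it is simply message arrival, note that fetch() on a receiver
already blocks with a timeout, which avoids spinning; a minimal sketch:

    #include <qpid/messaging/Duration.h>
    #include <qpid/messaging/Message.h>
    #include <qpid/messaging/Receiver.h>
    #include <qpid/messaging/Session.h>

    using namespace qpid::messaging;

    void consume(Receiver& receiver, volatile bool& running)
    {
        Message message;
        while (running) {
            // Blocks for up to a second; the thread sleeps in the
            // library rather than spinning, so idle CPU use stays low.
            if (receiver.fetch(message, Duration::SECOND)) {
                // ... process the message ...
                receiver.getSession().acknowledge(message);
            }
        }
    }
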
Index: specs/management-schema.xml
===================================================================
--- specs/management-schema.xml (revision 1139306)
+++ specs/management-schema.xml (working copy)
@@ -271,6 +271,7 @@
<statistic name="msgsToClient" type="count64"/>
<method name="close"/>
+ <method name="abort"/>
</class>
<!--
Index: cpp/src/qpid/broker/Connection.h
===================================================================
--- cpp/src/qpid/broker/Connection.h (revision 1139306)
+++ cpp/src/qpid/broker/Connection.h (working copy)
@@ -172,6 +172,7 @@
ConnectionHandler adapter;
const bool isLink;
bool mgmtClosing;
+ bool mgmtAborting;
const std::string mgmtId;
sys::Mutex ioCallbackLock;
std::queue<boost::function0<void> > ioCallbacks;
Index: cpp/src/qpid/broker/Connection.cpp
===================================================================
--- cpp/src/qpid/broker/Connection.cpp (revision 1139306)
+++ cpp/src/qpid/broker/Connection.cpp (working copy)
@@ -92,6 +92,7 @@
adapter(*this, isLink_, shadow_),
isLink(isLink_),
mgmtClosing(false),
+ mgmtAborting(false),
mgmtId(mgmtId_),
mgmtObject(0),
links(broker_.getLinks()),
@@ -358,6 +359,8 @@
if (mgmtClosing) {
closed();
close(connection::CLOSE_CODE_CONNECTION_FORCED, "Closed by Management Request");
+ } else if (mgmtAborting) {
+ abort();
} else {
//then do other output as needed:
return outputTasks.doOutput();
@@ -409,6 +412,11 @@
out.activateOutput();
status = Manageable::STATUS_OK;
break;
+ case _qmf::Connection::METHOD_ABORT :
+ mgmtAborting = true;
+ out.activateOutput();
+ status = Manageable::STATUS_OK;
+ break;
}
return status;
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]