Currently, in a clustered broker, the object ids used for management
objects are not uniform across all nodes. This means that management
methods are not reliably executed on all nodes: the id on which a
method is invoked may refer to a different object on other nodes than
the one intended.
The root cause is that the state dump process creates some management
objects and, because the traffic for that connection is not replicated,
these exist only on the joining node. Id allocation is sequential, so
these extra objects, which consume ids only on the joining member, skew
the allocation.
Attached is a patch that fixes this. It's a bit of a hack, just to show
the concept. I now plan to clean it up a little (essentially by
avoiding the special 'qpid-dump' strings everywhere). However, I thought
it would be worth sharing in case there are other issues or better
ideas. (Alan, Ted: I'm particularly keen on your viewpoints.)
Index: src/qpid/cluster/DumpClient.cpp
===================================================================
--- src/qpid/cluster/DumpClient.cpp (revision 736978)
+++ src/qpid/cluster/DumpClient.cpp (working copy)
@@ -93,7 +93,7 @@
connection(catchUpConnection()), shadowConnection(catchUpConnection()),
done(ok), failed(fail)
{
- connection.open(url);
+ connection.open(url, "", "", "qpid-dump");
session = connection.newSession("dump_shared");
}
@@ -206,7 +206,7 @@
broker::Connection& bc = dumpConnection->getBrokerConnection();
// FIXME aconway 2008-10-20: What authentication info to use on reconnect?
- shadowConnection.open(dumpeeUrl, bc.getUserId(), ""/*password*/, "/"/*vhost*/, bc.getFrameMax());
+ shadowConnection.open(dumpeeUrl, bc.getUserId(), ""/*password*/, "qpid-dump"/*vhost*/, bc.getFrameMax());
bc.eachSessionHandler(boost::bind(&DumpClient::dumpSession, this, _1));
ClusterConnectionProxy(shadowConnection).shadowReady(
dumpConnection->getId().getMember(),
Index: src/qpid/broker/Exchange.cpp
===================================================================
--- src/qpid/broker/Exchange.cpp (revision 736978)
+++ src/qpid/broker/Exchange.cpp (working copy)
@@ -53,6 +53,8 @@
}
+
+
Exchange::PreRoute::PreRoute(Deliverable& msg, Exchange* _p):parent(_p) {
if (parent){
if (parent->sequence || parent->ive) parent->sequenceLock.lock();
@@ -88,7 +90,7 @@
if (parent != 0)
{
ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
- if (agent != 0)
+ if (agent != 0 && name.find("qpid-dump") == std::string::npos)
{
mgmtExchange = new _qmf::Exchange (agent, this, parent, _name, durable);
agent->addObject (mgmtExchange);
@@ -104,7 +106,7 @@
if (parent != 0)
{
ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
- if (agent != 0)
+ if (agent != 0 && name.find("qpid-dump") == std::string::npos)
{
mgmtExchange = new _qmf::Exchange (agent, this, parent, _name, durable);
mgmtExchange->set_arguments(args);
Index: src/qpid/broker/SessionState.cpp
===================================================================
--- src/qpid/broker/SessionState.cpp (revision 736978)
+++ src/qpid/broker/SessionState.cpp (working copy)
@@ -59,7 +59,7 @@
Manageable* parent = broker.GetVhostObject ();
if (parent != 0) {
ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
- if (agent != 0) {
+ if (agent != 0 && id.getName().find("dump_shared") == std::string::npos) {
mgmtObject = new _qmf::Session
(agent, this, parent, getId().getName());
mgmtObject->set_attached (0);
@@ -108,7 +108,7 @@
void SessionState::attach(SessionHandler& h) {
QPID_LOG(debug, getId() << ": attached on broker.");
handler = &h;
- if (mgmtObject != 0)
+ if (mgmtObject != 0 && h.getConnection().GetManagementObject())
{
mgmtObject->set_attached (1);
mgmtObject->set_connectionRef (h.getConnection().GetManagementObject()->getObjectId());
Index: src/qpid/broker/Queue.cpp
===================================================================
--- src/qpid/broker/Queue.cpp (revision 736978)
+++ src/qpid/broker/Queue.cpp (working copy)
@@ -97,7 +97,7 @@
{
ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
- if (agent != 0)
+ if (agent != 0 && name.find("qpid-dump") == std::string::npos)
{
mgmtObject = new _qmf::Queue(agent, this, parent, _name, _store != 0, _autodelete, _owner != 0);
Index: src/qpid/broker/DirectExchange.cpp
===================================================================
--- src/qpid/broker/DirectExchange.cpp (revision 736978)
+++ src/qpid/broker/DirectExchange.cpp (working copy)
@@ -78,9 +78,10 @@
if (bk.queues.add_unless(b, MatchQueue(queue))) {
propagate = bk.fedBinding.addOrigin(fedOrigin);
- if (mgmtExchange != 0) {
+ _qmf::Queue* mgmtQueue = (_qmf::Queue*) queue->GetManagementObject();
+ if (mgmtExchange != 0 && mgmtQueue != 0) {
mgmtExchange->inc_bindingCount();
- ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount();
+ mgmtQueue->inc_bindingCount();
}
} else {
return false;
Index: src/qpid/broker/ConnectionHandler.cpp
===================================================================
--- src/qpid/broker/ConnectionHandler.cpp (revision 736978)
+++ src/qpid/broker/ConnectionHandler.cpp (working copy)
@@ -165,9 +165,10 @@
connection.setHeartbeatInterval(heartbeat);
}
-void ConnectionHandler::Handler::open(const string& /*virtualHost*/,
+void ConnectionHandler::Handler::open(const string& virtualHost,
const framing::Array& /*capabilities*/, bool /*insist*/)
{
+ connection.setVirtualHost(virtualHost);
std::vector<Url> urls = connection.broker.getKnownBrokers();
framing::Array array(0x95); // str16 array
for (std::vector<Url>::iterator i = urls.begin(); i < urls.end(); ++i)
@@ -179,6 +180,7 @@
std::auto_ptr<SecurityLayer> sl = authenticator->getSecurityLayer(connection.getFrameMax());
if (sl.get()) secured->activateSecurityLayer(sl);
}
+ connection.opened();
}
Index: src/qpid/broker/Connection.h
===================================================================
--- src/qpid/broker/Connection.h (revision 736978)
+++ src/qpid/broker/Connection.h (working copy)
@@ -67,6 +67,9 @@
Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isLink = false);
~Connection ();
+ /** Called by ConnectionHandler when the connection handshaking has completed */
+ void opened();
+
/** Get the SessionHandler for channel. Create if it does not already exist */
SessionHandler& getChannel(framing::ChannelId channel);
Index: src/qpid/broker/ConnectionState.h
===================================================================
--- src/qpid/broker/ConnectionState.h (revision 736978)
+++ src/qpid/broker/ConnectionState.h (working copy)
@@ -25,6 +25,7 @@
#include "qpid/sys/AggregateOutput.h"
#include "qpid/sys/ConnectionOutputHandlerPtr.h"
+#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/management/Manageable.h"
#include "qpid/Url.h"
@@ -56,11 +57,13 @@
uint16_t getHeartbeat() const { return heartbeat; }
uint16_t getHeartbeatMax() const { return heartbeatmax; }
uint64_t getStagingThreshold() const { return stagingThreshold; }
+ const std::string& getVirtualHost() { return virtualHost; }
- void setFrameMax(uint32_t fm) { framemax = std::max(fm, (uint32_t) 4096); }
+ void setFrameMax(uint32_t fm) { framemax = fm > qpid::framing::AMQFrame::frameOverhead() ? fm : (uint32_t) 4096; }
void setHeartbeat(uint16_t hb) { heartbeat = hb; }
void setHeartbeatMax(uint16_t hbm) { heartbeatmax = hbm; }
void setStagingThreshold(uint64_t st) { stagingThreshold = st; }
+ void setVirtualHost(const std::string& vh) { virtualHost = vh; }
virtual void setUserId(const string& uid) { userId = uid; }
const string& getUserId() const { return userId; }
@@ -98,6 +101,7 @@
bool federationLink;
string federationPeerTag;
std::vector<Url> knownHosts;
+ std::string virtualHost;
};
}}
Index: src/qpid/broker/Connection.cpp
===================================================================
--- src/qpid/broker/Connection.cpp (revision 736978)
+++ src/qpid/broker/Connection.cpp (working copy)
@@ -59,20 +59,27 @@
agent(0),
timer(broker_.getTimer())
{
- Manageable* parent = broker.GetVhostObject();
-
- if (isLink)
+ if (isLink) {
links.notifyConnection(mgmtId, this);
+ //consider outgoing links opened at this point; for incoming
+ //connections wait until connection.open is received
+ opened();
+ }
+}
+void Connection::opened()
+{
+ Manageable* parent = broker.GetVhostObject();
if (parent != 0)
{
agent = ManagementAgent::Singleton::getInstance();
// TODO set last bool true if system connection
- if (agent != 0)
+ if (agent != 0 && getVirtualHost().find("qpid-dump") == std::string::npos) {
mgmtObject = new _qmf::Connection(agent, this, parent, mgmtId, !isLink, false);
- agent->addObject(mgmtObject);
+ agent->addObject(mgmtObject);
+ }
ConnectionState::setUrl(mgmtId);
}
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]