Currently in a clustered broker, the object ids used for management objects are not uniform across all nodes. This means that execution of management methods does not reliably occur on all nodes (if the id on which the method is invoked is for another object than intended on other nodes).

The root cause of this is the fact that the state dump process creates some management objects and as the traffic for that connection is not replicated these only exist on the joining node. The id allocation uses a sequential id, so these extra objects that only consume ids on a joining member skews the allocation.

Attached is a patch that fixes this. Its a bit of a hack, just to show the concept. I now plan to try and clean it up a little (essentially avoiding the special 'qpid-dump' strings everywhere). However I thought it would be worth sharing in case there are other issues or better ideas. (Alan, Ted I'm particularly keen on your viewpoints).

Index: src/qpid/cluster/DumpClient.cpp
===================================================================
--- src/qpid/cluster/DumpClient.cpp	(revision 736978)
+++ src/qpid/cluster/DumpClient.cpp	(working copy)
@@ -93,7 +93,7 @@
       connection(catchUpConnection()), shadowConnection(catchUpConnection()),
       done(ok), failed(fail)
 {
-    connection.open(url);
+    connection.open(url, "", "", "qpid-dump");
     session = connection.newSession("dump_shared");
 }
 
@@ -206,7 +206,7 @@
 
     broker::Connection& bc = dumpConnection->getBrokerConnection();
     // FIXME aconway 2008-10-20: What authentication info to use on reconnect?
-    shadowConnection.open(dumpeeUrl, bc.getUserId(), ""/*password*/, "/"/*vhost*/, bc.getFrameMax());
+    shadowConnection.open(dumpeeUrl, bc.getUserId(), ""/*password*/, "qpid-dump"/*vhost*/, bc.getFrameMax());
     bc.eachSessionHandler(boost::bind(&DumpClient::dumpSession, this, _1));
     ClusterConnectionProxy(shadowConnection).shadowReady(
         dumpConnection->getId().getMember(),
Index: src/qpid/broker/Exchange.cpp
===================================================================
--- src/qpid/broker/Exchange.cpp	(revision 736978)
+++ src/qpid/broker/Exchange.cpp	(working copy)
@@ -53,6 +53,8 @@
 }
 
 
+
+
 Exchange::PreRoute::PreRoute(Deliverable& msg, Exchange* _p):parent(_p) {
     if (parent){
         if (parent->sequence || parent->ive) parent->sequenceLock.lock();
@@ -88,7 +90,7 @@
     if (parent != 0)
     {
         ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
-        if (agent != 0)
+        if (agent != 0 && name.find("qpid-dump") == std::string::npos)
         {
             mgmtExchange = new _qmf::Exchange (agent, this, parent, _name, durable);
             agent->addObject (mgmtExchange);
@@ -104,7 +106,7 @@
     if (parent != 0)
     {
         ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
-        if (agent != 0)
+        if (agent != 0 && name.find("qpid-dump") == std::string::npos)
         {
             mgmtExchange = new _qmf::Exchange (agent, this, parent, _name, durable);
             mgmtExchange->set_arguments(args);
Index: src/qpid/broker/SessionState.cpp
===================================================================
--- src/qpid/broker/SessionState.cpp	(revision 736978)
+++ src/qpid/broker/SessionState.cpp	(working copy)
@@ -59,7 +59,7 @@
     Manageable* parent = broker.GetVhostObject ();
     if (parent != 0) {
         ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
-        if (agent != 0) {
+        if (agent != 0 && id.getName().find("dump_shared") == std::string::npos) {
             mgmtObject = new _qmf::Session
                 (agent, this, parent, getId().getName());
             mgmtObject->set_attached (0);
@@ -108,7 +108,7 @@
 void SessionState::attach(SessionHandler& h) {
     QPID_LOG(debug, getId() << ": attached on broker.");
     handler = &h;
-    if (mgmtObject != 0)
+    if (mgmtObject != 0 && h.getConnection().GetManagementObject())
     {
         mgmtObject->set_attached (1);
         mgmtObject->set_connectionRef (h.getConnection().GetManagementObject()->getObjectId());
Index: src/qpid/broker/Queue.cpp
===================================================================
--- src/qpid/broker/Queue.cpp	(revision 736978)
+++ src/qpid/broker/Queue.cpp	(working copy)
@@ -97,7 +97,7 @@
     {
         ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
 
-        if (agent != 0)
+        if (agent != 0 && name.find("qpid-dump") == std::string::npos)
         {
             mgmtObject = new _qmf::Queue(agent, this, parent, _name, _store != 0, _autodelete, _owner != 0);
 
Index: src/qpid/broker/DirectExchange.cpp
===================================================================
--- src/qpid/broker/DirectExchange.cpp	(revision 736978)
+++ src/qpid/broker/DirectExchange.cpp	(working copy)
@@ -78,9 +78,10 @@
 
         if (bk.queues.add_unless(b, MatchQueue(queue))) {
             propagate = bk.fedBinding.addOrigin(fedOrigin);
-            if (mgmtExchange != 0) {
+            _qmf::Queue* mgmtQueue = (_qmf::Queue*) queue->GetManagementObject();
+            if (mgmtExchange != 0 && mgmtQueue != 0) {
                 mgmtExchange->inc_bindingCount();
-                ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount();
+                mgmtQueue->inc_bindingCount();
             }
         } else {
             return false;
Index: src/qpid/broker/ConnectionHandler.cpp
===================================================================
--- src/qpid/broker/ConnectionHandler.cpp	(revision 736978)
+++ src/qpid/broker/ConnectionHandler.cpp	(working copy)
@@ -165,9 +165,10 @@
     connection.setHeartbeatInterval(heartbeat);
 }
 
-void ConnectionHandler::Handler::open(const string& /*virtualHost*/,
+void ConnectionHandler::Handler::open(const string& virtualHost,
                                       const framing::Array& /*capabilities*/, bool /*insist*/)
 {
+    connection.setVirtualHost(virtualHost);
     std::vector<Url> urls = connection.broker.getKnownBrokers();
     framing::Array array(0x95); // str16 array
     for (std::vector<Url>::iterator i = urls.begin(); i < urls.end(); ++i) 
@@ -179,6 +180,7 @@
         std::auto_ptr<SecurityLayer> sl = authenticator->getSecurityLayer(connection.getFrameMax());
         if (sl.get()) secured->activateSecurityLayer(sl);
     }
+    connection.opened();
 }
 
         
Index: src/qpid/broker/Connection.h
===================================================================
--- src/qpid/broker/Connection.h	(revision 736978)
+++ src/qpid/broker/Connection.h	(working copy)
@@ -67,6 +67,9 @@
     Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isLink = false);
     ~Connection ();
 
+    /** Called by ConnectionHandler when the connection handshaking has completed */
+    void opened();
+
     /** Get the SessionHandler for channel. Create if it does not already exist */
     SessionHandler& getChannel(framing::ChannelId channel);
 
Index: src/qpid/broker/ConnectionState.h
===================================================================
--- src/qpid/broker/ConnectionState.h	(revision 736978)
+++ src/qpid/broker/ConnectionState.h	(working copy)
@@ -25,6 +25,7 @@
 
 #include "qpid/sys/AggregateOutput.h"
 #include "qpid/sys/ConnectionOutputHandlerPtr.h"
+#include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/ProtocolVersion.h"
 #include "qpid/management/Manageable.h"
 #include "qpid/Url.h"
@@ -56,11 +57,13 @@
     uint16_t getHeartbeat() const { return heartbeat; }
     uint16_t getHeartbeatMax() const { return heartbeatmax; }
     uint64_t getStagingThreshold() const { return stagingThreshold; }
+    const std::string& getVirtualHost() { return virtualHost; }
 
-    void setFrameMax(uint32_t fm) { framemax = std::max(fm, (uint32_t) 4096); }
+    void setFrameMax(uint32_t fm) { framemax = fm > qpid::framing::AMQFrame::frameOverhead() ? fm : (uint32_t) 4096; }
     void setHeartbeat(uint16_t hb) { heartbeat = hb; }
     void setHeartbeatMax(uint16_t hbm) { heartbeatmax = hbm; }
     void setStagingThreshold(uint64_t st) { stagingThreshold = st; }
+    void setVirtualHost(const std::string& vh) { virtualHost = vh; }
 
     virtual void setUserId(const string& uid) {  userId = uid; }
     const string& getUserId() const { return userId; }
@@ -98,6 +101,7 @@
     bool federationLink;
     string federationPeerTag;
     std::vector<Url> knownHosts;
+    std::string virtualHost;
 };
 
 }}
Index: src/qpid/broker/Connection.cpp
===================================================================
--- src/qpid/broker/Connection.cpp	(revision 736978)
+++ src/qpid/broker/Connection.cpp	(working copy)
@@ -59,20 +59,27 @@
     agent(0),
     timer(broker_.getTimer())
 {
-    Manageable* parent = broker.GetVhostObject();
-
-    if (isLink)
+    if (isLink) {
         links.notifyConnection(mgmtId, this);
+        //consider outgoing links opened at this point; for incoming
+        //connections wait until connection.open is received
+        opened();
+    }
+}
 
+void Connection::opened()
+{
+    Manageable* parent = broker.GetVhostObject();
     if (parent != 0)
     {
         agent = ManagementAgent::Singleton::getInstance();
 		
 		
         // TODO set last bool true if system connection
-        if (agent != 0)
+        if (agent != 0 && getVirtualHost().find("qpid-dump") == std::string::npos) {
             mgmtObject = new _qmf::Connection(agent, this, parent, mgmtId, !isLink, false);
-        agent->addObject(mgmtObject);
+            agent->addObject(mgmtObject);
+        }
         ConnectionState::setUrl(mgmtId);
     }
 }

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to