Author: aconway
Date: Thu Oct 11 19:23:30 2012
New Revision: 1397243

URL: http://svn.apache.org/viewvc?rev=1397243&view=rev
Log:
Bug 860701 - QPID-4350: HA handle auto-delete queues

Subscribed auto-delete queues are deleted by the backup when it loses its
connection to the primary.
Auto-delete queues with a timeout are deleted after the timeout expires.

Modified:
    qpid/trunk/qpid/cpp/managementgen/qmfgen/templates/Event.cpp
    qpid/trunk/qpid/cpp/managementgen/qmfgen/templates/Event.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueObserver.h
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
    qpid/trunk/qpid/cpp/src/tests/brokertest.py
    qpid/trunk/qpid/cpp/src/tests/ha_test.py
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/trunk/qpid/cpp/managementgen/qmfgen/templates/Event.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/managementgen/qmfgen/templates/Event.cpp?rev=1397243&r1=1397242&r2=1397243&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/managementgen/qmfgen/templates/Event.cpp (original)
+++ qpid/trunk/qpid/cpp/managementgen/qmfgen/templates/Event.cpp Thu Oct 11 19:23:30 2012
@@ -35,8 +35,8 @@ using           qpid::management::Args;
 using           qpid::management::Mutex;
 using           std::string;
 
-string  Event/*MGEN:Event.NameCap*/::packageName  = string 
("/*MGEN:Event.NamePackageLower*/");
-string  Event/*MGEN:Event.NameCap*/::eventName    = string 
("/*MGEN:Event.Name*/");
+string  Event/*MGEN:Event.NameCap*/::PACKAGE_NAME  = string 
("/*MGEN:Event.NamePackageLower*/");
+string  Event/*MGEN:Event.NameCap*/::EVENT_NAME    = string 
("/*MGEN:Event.Name*/");
 uint8_t Event/*MGEN:Event.NameCap*/::md5Sum[16]   =
     {/*MGEN:Event.SchemaMD5*/};
 
@@ -54,7 +54,7 @@ namespace {
 
 void Event/*MGEN:Event.NameCap*/::registerSelf(ManagementAgent* agent)
 {
-    agent->registerEvent(packageName, eventName, md5Sum, writeSchema);
+    agent->registerEvent(PACKAGE_NAME, EVENT_NAME, md5Sum, writeSchema);
 }
 
 void Event/*MGEN:Event.NameCap*/::writeSchema (std::string& schema)
@@ -66,8 +66,8 @@ void Event/*MGEN:Event.NameCap*/::writeS
 
     // Schema class header:
     buf.putOctet       (CLASS_KIND_EVENT);
-    buf.putShortString (packageName); // Package Name
-    buf.putShortString (eventName);   // Event Name
+    buf.putShortString (PACKAGE_NAME); // Package Name
+    buf.putShortString (EVENT_NAME);   // Event Name
     buf.putBin128      (md5Sum);      // Schema Hash
     buf.putShort       (/*MGEN:Event.ArgCount*/); // Argument Count
 
@@ -102,5 +102,5 @@ void Event/*MGEN:Event.NameCap*/::mapEnc
 
 bool Event/*MGEN:Event.NameCap*/::match(const std::string& evt, const 
std::string& pkg)
 {
-    return eventName == evt && packageName == pkg;
+    return EVENT_NAME == evt && PACKAGE_NAME == pkg;
 }

Modified: qpid/trunk/qpid/cpp/managementgen/qmfgen/templates/Event.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/managementgen/qmfgen/templates/Event.h?rev=1397243&r1=1397242&r2=1397243&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/managementgen/qmfgen/templates/Event.h (original)
+++ qpid/trunk/qpid/cpp/managementgen/qmfgen/templates/Event.h Thu Oct 11 19:23:30 2012
@@ -33,21 +33,22 @@ QPID_BROKER_CLASS_EXTERN class Event/*MG
 {
   private:
     static void writeSchema (std::string& schema);
-    static std::string packageName;
-    static std::string eventName;
     static uint8_t md5Sum[MD5_LEN];
 
 /*MGEN:Event.ArgDeclarations*/
 
   public:
+    static std::string PACKAGE_NAME;
+    static std::string EVENT_NAME;
+
     writeSchemaCall_t getWriteSchemaCall(void) { return writeSchema; }
 
     QPID_BROKER_EXTERN 
Event/*MGEN:Event.NameCap*/(/*MGEN:Event.ConstructorArgs*/);
     QPID_BROKER_EXTERN ~Event/*MGEN:Event.NameCap*/() {};
 
     static void registerSelf(::qpid::management::ManagementAgent* agent);
-    std::string& getPackageName() const { return packageName; }
-    std::string& getEventName() const { return eventName; }
+    std::string& getPackageName() const { return PACKAGE_NAME; }
+    std::string& getEventName() const { return EVENT_NAME; }
     uint8_t* getMd5Sum() const { return md5Sum; }
     uint8_t getSeverity() const { return /*MGEN:Event.Severity*/; }
     QPID_BROKER_EXTERN void encode(std::string& buffer) const;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1397243&r1=1397242&r2=1397243&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Thu Oct 11 19:23:30 2012
@@ -1072,6 +1072,8 @@ void Queue::destroyed()
     notifyDeleted();
     {
         Mutex::ScopedLock lock(messageLock);
+        for_each(observers.begin(), observers.end(),
+                 boost::bind(&QueueObserver::destroy, _1));
         observers.clear();
     }
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueObserver.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueObserver.h?rev=1397243&r1=1397242&r2=1397243&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueObserver.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueObserver.h Thu Oct 11 19:23:30 2012
@@ -69,6 +69,7 @@ class QueueObserver
     virtual void requeued(const Message&) = 0;
     virtual void consumerAdded( const Consumer& ) {};
     virtual void consumerRemoved( const Consumer& ) {};
+    virtual void destroy() {};
  private:
 };
 }} // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1397243&r1=1397242&r2=1397243&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Thu Oct 11 19:23:30 2012
@@ -23,6 +23,7 @@
 #include "QueueReplicator.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/Connection.h"
+#include "qpid/broker/ConnectionObserver.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/QueueSettings.h"
 #include "qpid/broker/Link.h"
@@ -84,25 +85,27 @@ const string ARGS("args");
 const string ARGUMENTS("arguments");
 const string AUTODEL("autoDel");
 const string AUTODELETE("autoDelete");
-const string EXCL("excl");
-const string EXCLUSIVE("exclusive");
 const string BIND("bind");
-const string UNBIND("unbind");
 const string BINDING("binding");
+const string BINDING_KEY("bindingKey");
 const string CREATED("created");
 const string DISP("disp");
+const string DEST("dest");
 const string DURABLE("durable");
 const string EXCHANGE("exchange");
+const string EXCL("excl");
+const string EXCLUSIVE("exclusive");
 const string EXNAME("exName");
 const string EXTYPE("exType");
+const string HA_BROKER("habroker");
 const string KEY("key");
-const string BINDING_KEY("bindingKey");
 const string NAME("name");
+const string PARTIAL("partial");
 const string QNAME("qName");
 const string QUEUE("queue");
 const string TYPE("type");
-const string HA_BROKER("habroker");
-const string PARTIAL("partial");
+const string UNBIND("unbind");
+const string CONSUMER_COUNT("consumerCount");
 
 const string AGENT_EVENT_BROKER("agent.ind.event.org_apache_qpid_broker.#");
 const string AGENT_EVENT_HA("agent.ind.event.org_apache_qpid_ha.#");
@@ -122,10 +125,7 @@ const string QMF_DEFAULT_DIRECT("qmf.def
 const string _QUERY_REQUEST("_query_request");
 const string BROKER("broker");
 const string MEMBERS("members");
-
-template <class T> bool match(Variant::Map& schema) {
-    return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]);
-}
+const string AUTO_DELETE_TIMEOUT("qpid.auto_delete_timeout");
 
 void sendQuery(const string& packageName, const string& className, const 
string& queueName,
                SessionHandler& sessionHandler)
@@ -174,22 +174,78 @@ Variant::Map asMapVoid(const Variant& va
 }
 } // namespace
 
+// Listens for errors on the bridge session.
+class BrokerReplicator::ErrorListener : public SessionHandler::ErrorListener {
+  public:
+    ErrorListener(const std::string& lp, BrokerReplicator& br) :
+        logPrefix(lp), brokerReplicator(br) {}
+
+    void connectionException(framing::connection::CloseCode, const 
std::string& msg) {
+        QPID_LOG(error, logPrefix << "Connection error: " << msg);
+    }
+    void channelException(framing::session::DetachCode, const std::string& 
msg) {
+        QPID_LOG(error, logPrefix << "Channel error: " << msg);
+    }
+    void executionException(framing::execution::ErrorCode, const std::string& 
msg) {
+        QPID_LOG(error, logPrefix << "Execution error: " << msg);
+    }
+    void detach() {
+        QPID_LOG(debug, logPrefix << "Session detached.");
+    }
+
+  private:
+    std::string logPrefix;
+    BrokerReplicator& brokerReplicator;
+};
+
+class BrokerReplicator::ConnectionObserver : public broker::ConnectionObserver
+{
+  public:
+    ConnectionObserver(BrokerReplicator& br) : brokerReplicator(br) {}
+    virtual void connection(Connection&) {}
+    virtual void opened(Connection&) {}
+
+    virtual void closed(Connection& c) {
+        if (brokerReplicator.link && &c == brokerReplicator.connection)
+            brokerReplicator.disconnected();
+    }
+    virtual void forced(Connection& c, const std::string& /*message*/) { 
closed(c); }
+  private:
+    BrokerReplicator& brokerReplicator;
+};
+
+template <class E> BrokerReplicator::EventKey eventKey() {
+    return make_pair(E::PACKAGE_NAME, E::EVENT_NAME);
+}
+
 BrokerReplicator::BrokerReplicator(HaBroker& hb, const 
boost::shared_ptr<Link>& l)
     : Exchange(QPID_CONFIGURATION_REPLICATOR),
       logPrefix("Backup: "), replicationTest(hb.getReplicationTest()),
       haBroker(hb), broker(hb.getBroker()), link(l),
       initialized(false),
       alternates(hb.getBroker().getExchanges()),
-      cleaner(*this)
+      cleaner(*this),
+      connection(0)
 {
+    broker.getConnectionObservers().add(
+        boost::shared_ptr<broker::ConnectionObserver>(new 
ConnectionObserver(*this)));
     getArgs().setString(QPID_REPLICATE, printable(NONE).str());
+
+    dispatch[eventKey<EventQueueDeclare>()] = 
&BrokerReplicator::doEventQueueDeclare;
+    dispatch[eventKey<EventQueueDelete>()] = 
&BrokerReplicator::doEventQueueDelete;
+    dispatch[eventKey<EventExchangeDeclare>()] = 
&BrokerReplicator::doEventExchangeDeclare;
+    dispatch[eventKey<EventExchangeDelete>()] = 
&BrokerReplicator::doEventExchangeDelete;
+    dispatch[eventKey<EventBind>()] = &BrokerReplicator::doEventBind;
+    dispatch[eventKey<EventUnbind>()] = &BrokerReplicator::doEventUnbind;
+    dispatch[eventKey<EventMembersUpdate>()] = 
&BrokerReplicator::doEventMembersUpdate;
+    dispatch[eventKey<EventSubscribe>()] = &BrokerReplicator::doEventSubscribe;
 }
 
 void BrokerReplicator::initialize() {
     // Can't do this in the constructor because we need a shared_ptr to this.
     types::Uuid uuid(true);
     const std::string name(QPID_CONFIGURATION_REPLICATOR + ".bridge." + 
uuid.str());
-    broker.getLinks().declare(
+    std::pair<Bridge::shared_ptr, bool> result = broker.getLinks().declare(
         name,               // name for bridge
         *link,              // parent
         false,              // durable
@@ -206,13 +262,28 @@ void BrokerReplicator::initialize() {
         // calls are run.
         boost::bind(&BrokerReplicator::initializeBridge, shared_from_this(), 
_1, _2)
     );
+    result.first->setErrorListener(
+        boost::shared_ptr<ErrorListener>(new ErrorListener(logPrefix, *this)));
 }
 
 BrokerReplicator::~BrokerReplicator() { shutdown(); }
 
+namespace {
+void collectQueueReplicators(
+    const boost::shared_ptr<Exchange> ex, 
set<boost::shared_ptr<QueueReplicator> >& collect)
+{
+    boost::shared_ptr<QueueReplicator> 
qr(boost::dynamic_pointer_cast<QueueReplicator>(ex));
+    if (qr) collect.insert(qr);
+}
+} // namespace
+
 void BrokerReplicator::shutdown() {
     QPID_LOG(debug, logPrefix << "BrokerReplicator shutting down.");
-    broker.getQueues().eachQueue(boost::bind(&BrokerReplicator::deactivate, 
this, _1));
+    set<boost::shared_ptr<QueueReplicator> > collect;
+    broker.getExchanges().eachExchange(
+        boost::bind(&collectQueueReplicators, _1, boost::ref(collect)));
+    for_each(collect.begin(), collect.end(),
+             boost::bind(&QueueReplicator::deactivate, _1));
 }
 
 // This is called in the connection IO thread when the bridge is started.
@@ -221,7 +292,8 @@ void BrokerReplicator::initializeBridge(
     // exchanges etc. We know link->getConnection() is non-zero because we are
     // being called in the connections thread context.
     //
-    assert(link->getConnection());
+    connection = link->getConnection();
+    assert(connection);
     userId = link->getConnection()->getUserId();
     remoteHost = link->getConnection()->getUrl();
 
@@ -280,13 +352,9 @@ void BrokerReplicator::route(Deliverable
                 QPID_LOG(trace, "Broker replicator event: " << map);
                 Variant::Map& schema = map[SCHEMA_ID].asMap();
                 Variant::Map& values = map[VALUES].asMap();
-                if      (match<EventQueueDeclare>(schema)) 
doEventQueueDeclare(values);
-                else if (match<EventQueueDelete>(schema)) 
doEventQueueDelete(values);
-                else if (match<EventExchangeDeclare>(schema)) 
doEventExchangeDeclare(values);
-                else if (match<EventExchangeDelete>(schema)) 
doEventExchangeDelete(values);
-                else if (match<EventBind>(schema)) doEventBind(values);
-                else if (match<EventUnbind>(schema)) doEventUnbind(values);
-                else if (match<EventMembersUpdate>(schema)) 
doEventMembersUpdate(values);
+                EventKey key(schema[PACKAGE_NAME], schema[CLASS_NAME]);
+                EventDispatchMap::iterator i = dispatch.find(key);
+                if (i != dispatch.end()) (this->*(i->second))(values);
             }
         } else if (msg.getMessage().getPropertyAsString(QMF_OPCODE) == 
QUERY_RESPONSE) {
             for (Variant::List::iterator i = list.begin(); i != list.end(); 
++i) {
@@ -337,10 +405,8 @@ void BrokerReplicator::doEventQueueDecla
             QPID_LOG(warning, logPrefix << "Replacing exsiting queue: " << 
name);
             deleteQueue(name);
         }
-        settings.populate(args, settings.storeSettings);
-        CreateQueueResult result = createQueue(
-            name, values[DURABLE].asBool(), autoDel, args, 
values[ALTEX].asString());
-        assert(result.second);  // Should be created since we destroed the 
previous queue above.
+        replicateQueue(name, values[DURABLE].asBool(), autoDel, args,
+                       values[ALTEX].asString());
     }
 }
 
@@ -447,6 +513,14 @@ void BrokerReplicator::doEventMembersUpd
     haBroker.setMembership(members);
 }
 
+void BrokerReplicator::doEventSubscribe(Variant::Map& values) {
+    // Ignore queue replicator subscriptions.
+    if (QueueReplicator::isReplicatorName(values[DEST].asString())) return;
+    QPID_LOG(debug, logPrefix << "Subscribe event: " << values[QNAME]);
+    boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(values[QNAME]);
+    if (qr) qr->setSubscribed();
+}
+
 namespace {
 
 // Get the alternate exchange from the exchange field of a queue or exchange 
response.
@@ -484,12 +558,15 @@ void BrokerReplicator::doResponseQueue(V
     }
     framing::FieldTable args;
     qpid::amqp_0_10::translate(argsMap, args);
-    CreateQueueResult result =
-        createQueue(name, values[DURABLE].asBool(), 
values[AUTODELETE].asBool(), args,
-                    getAltExchange(values[ALTEXCHANGE]));
-    // It is normal for the queue to already exist if we are failing over.
-    if (!result.second)
-        QPID_LOG(debug, logPrefix << "Queue already replicated: " << name);
+    boost::shared_ptr<QueueReplicator> qr = replicateQueue(
+        name, values[DURABLE].asBool(), values[AUTODELETE].asBool(), args,
+        getAltExchange(values[ALTEXCHANGE]));
+    if (qr) {
+        Variant::Map::const_iterator i = values.find(CONSUMER_COUNT);
+        if (i != values.end() && isIntegerType(i->second.getType())) {
+            if (i->second.asInt64()) qr->setSubscribed();
+        }
+    }
 }
 
 void BrokerReplicator::doResponseExchange(Variant::Map& values) {
@@ -580,7 +657,8 @@ void BrokerReplicator::doResponseHaBroke
     }
 }
 
-void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& 
queue)
+boost::shared_ptr<QueueReplicator> BrokerReplicator::startQueueReplicator(
+    const boost::shared_ptr<Queue>& queue)
 {
     if (replicationTest.replicateLevel(queue->getSettings().storeSettings) == 
ALL) {
         boost::shared_ptr<QueueReplicator> qr(
@@ -588,30 +666,22 @@ void BrokerReplicator::startQueueReplica
         if (!broker.getExchanges().registerExchange(qr))
             throw Exception(QPID_MSG("Duplicate queue replicator " << 
qr->getName()));
         qr->activate();
+        return qr;
     }
+    return boost::shared_ptr<QueueReplicator>();
 }
 
-void BrokerReplicator::deactivateQueue(const std::string& queueName) {
-    boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(queueName);
-    if (qr) {
-        qr->deactivate();
-        // QueueReplicator's bridge is now queued for destruction but may not
-        // actually be destroyed.
-        broker.getExchanges().destroy(qr->getName());
-    }
-}
-
-void BrokerReplicator::deactivate(boost::shared_ptr<broker::Queue> q) {
-    deactivateQueue(q->getName());
-}
-
-void BrokerReplicator::deleteQueue(const std::string& name) {
-    deactivateQueue(name);
-    try {
+void BrokerReplicator::deleteQueue(const std::string& name, bool purge) {
+    boost::shared_ptr<QueueReplicator> qr(findQueueReplicator(name));
+    if (qr) qr->deactivate();
+    Queue::shared_ptr queue = broker.getQueues().find(name);
+    if (queue) {
+        // Purge before deleting to ensure that we don't reroute any
+        // messages. Any reroutes will be done at the primary and
+        // replicated as normal.
+        if (purge) queue->purge(0, boost::shared_ptr<Exchange>());
         broker.deleteQueue(name, userId, remoteHost);
         QPID_LOG(debug, logPrefix << "Queue deleted: " << name);
-    } catch (const framing::NotFoundException&) {
-        QPID_LOG(debug, logPrefix << "Queue not found for deletion: " << name);
     }
 }
 
@@ -624,7 +694,7 @@ void BrokerReplicator::deleteExchange(co
     }
 }
 
-BrokerReplicator::CreateQueueResult BrokerReplicator::createQueue(
+boost::shared_ptr<QueueReplicator> BrokerReplicator::replicateQueue(
     const std::string& name,
     bool durable,
     bool autodelete,
@@ -641,14 +711,15 @@ BrokerReplicator::CreateQueueResult Brok
             string(), // Set alternate exchange below
             userId,
             remoteHost);
-    boost::shared_ptr<Queue> queue = result.first;
-    if (!findQueueReplicator(queue->getName())) startQueueReplicator(queue);
-    if (result.second && !alternateExchange.empty()) {
-        alternates.setAlternate(
-            alternateExchange,
-            boost::bind(&Queue::setAlternateExchange, result.first, _1));
+    boost::shared_ptr<QueueReplicator> qr;
+    if (!findQueueReplicator(name)) qr = startQueueReplicator(result.first);
+    if (result.second) {
+        if (!alternateExchange.empty()) {
+            alternates.setAlternate(
+                alternateExchange, boost::bind(&Queue::setAlternateExchange, 
result.first, _1));
+        }
     }
-    return result;
+    return qr;
 }
 
 BrokerReplicator::CreateExchangeResult BrokerReplicator::createExchange(
@@ -720,8 +791,39 @@ void BrokerReplicator::Cleaner::cleanExc
 
 void BrokerReplicator::Cleaner::cleanQueues() {
     for_each(queues.begin(), queues.end(),
-             boost::bind(&BrokerReplicator::deleteQueue, &brokerReplicator, 
_1));
+             boost::bind(&BrokerReplicator::deleteQueue, &brokerReplicator, 
_1, true));
     queues.clear();
 }
 
+void BrokerReplicator::autoDeleteCheck(
+    boost::shared_ptr<Exchange> ex, set<string>& result)
+{
+    boost::shared_ptr<QueueReplicator> 
qr(boost::dynamic_pointer_cast<QueueReplicator>(ex));
+    if (!qr) return;
+    assert(qr);
+    if (qr->getQueue()->isAutoDelete() && qr->isSubscribed()) {
+        if (qr->getQueue()->getSettings().autoDeleteDelay) {
+            // Start the auto-delete timer
+            Queue::tryAutoDelete(broker, qr->getQueue(), remoteHost, userId);
+        }
+        else {
+            // Mark for immediate deletion.
+            result.insert(qr->getQueue()->getName());
+        }
+    }
+}
+
+void BrokerReplicator::disconnected() {
+    QPID_LOG(info, logPrefix << "Disconnected");
+    connection = 0;
+    // Clean up auto-delete queues
+    set<string> deleteQueues;
+    
broker.getExchanges().eachExchange(boost::bind(&BrokerReplicator::autoDeleteCheck,
+                                           this, _1, 
boost::ref(deleteQueues)));
+    // Don't purge before deleting, the primary is gone so we need to
+    // reroute the deleted messages.
+    for_each(deleteQueues.begin(), deleteQueues.end(),
+             boost::bind(&BrokerReplicator::deleteQueue, this, _1, false));
+}
+
 }} // namespace broker

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h?rev=1397243&r1=1397242&r2=1397243&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h Thu Oct 11 19:23:30 2012
@@ -40,6 +40,7 @@ class Broker;
 class Link;
 class Bridge;
 class SessionHandler;
+class Connection;
 }
 
 namespace framing {
@@ -83,6 +84,13 @@ class BrokerReplicator : public broker::
     typedef std::pair<boost::shared_ptr<broker::Queue>, bool> 
CreateQueueResult;
     typedef std::pair<boost::shared_ptr<broker::Exchange>, bool> 
CreateExchangeResult;
 
+    typedef std::pair<std::string,std::string> EventKey;
+    typedef void (BrokerReplicator::*DispatchFunction)(types::Variant::Map&);
+    typedef std::map<EventKey, DispatchFunction> EventDispatchMap;
+
+    typedef std::map<std::string, QueueReplicatorPtr> QueueReplicatorMap;
+
+
     /** Keep track of queues and exchanges that need to be cleaned up. */
     class Cleaner {
       public:
@@ -94,7 +102,7 @@ class BrokerReplicator : public broker::
         // Forget a queue/exchange that does not need cleaning
         void forgetExchange(const std::string& name);
         void forgetQueue(const std::string& name);
-
+        // Clean up queues/exchange that are no longer on primary
         void cleanExchanges();
         void cleanQueues();
 
@@ -110,6 +118,9 @@ class BrokerReplicator : public broker::
     };
   friend class Cleaner;
 
+    class ErrorListener;
+    class ConnectionObserver;
+
     void initializeBridge(broker::Bridge&, broker::SessionHandler&);
 
     void doEventQueueDeclare(types::Variant::Map& values);
@@ -119,6 +130,7 @@ class BrokerReplicator : public broker::
     void doEventBind(types::Variant::Map&);
     void doEventUnbind(types::Variant::Map&);
     void doEventMembersUpdate(types::Variant::Map&);
+    void doEventSubscribe(types::Variant::Map&);
 
     void doResponseQueue(types::Variant::Map& values);
     void doResponseExchange(types::Variant::Map& values);
@@ -126,9 +138,9 @@ class BrokerReplicator : public broker::
     void doResponseHaBroker(types::Variant::Map& values);
 
     QueueReplicatorPtr findQueueReplicator(const std::string& qname);
-    void startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
+    QueueReplicatorPtr startQueueReplicator(const 
boost::shared_ptr<broker::Queue>&);
 
-    CreateQueueResult createQueue(
+    QueueReplicatorPtr replicateQueue(
         const std::string& name,
         bool durable,
         bool autodelete,
@@ -142,11 +154,13 @@ class BrokerReplicator : public broker::
         const qpid::framing::FieldTable& args,
         const std::string& alternateExchange);
 
-    void deactivateQueue(const std::string& name);
-    void deactivate(boost::shared_ptr<broker::Queue> q);
-    void deleteQueue(const std::string& name);
+    bool deactivate(boost::shared_ptr<broker::Exchange> ex, bool destroy);
+    void deleteQueue(const std::string& name, bool purge=true);
     void deleteExchange(const std::string& name);
 
+    void autoDeleteCheck(boost::shared_ptr<broker::Exchange>, 
std::set<std::string>&);
+    void disconnected();
+
     std::string logPrefix;
     std::string userId, remoteHost;
     ReplicationTest replicationTest;
@@ -159,6 +173,8 @@ class BrokerReplicator : public broker::
     typedef std::set<std::string> StringSet;
     StringSet replicatedExchanges; // exchanges that have been replicated.
     Cleaner cleaner;
+    broker::Connection* connection;
+    EventDispatchMap dispatch;
 };
 }} // namespace qpid::broker
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1397243&r1=1397242&r2=1397243&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Thu Oct 11 19:23:30 2012
@@ -26,6 +26,7 @@
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/Link.h"
 #include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueObserver.h"
 #include "qpid/broker/QueueRegistry.h"
 #include "qpid/broker/SessionHandler.h"
 #include "qpid/broker/SessionHandler.h"
@@ -55,6 +56,10 @@ std::string QueueReplicator::replicatorN
     return QPID_REPLICATOR_ + queueName;
 }
 
+bool QueueReplicator::isReplicatorName(const std::string& name) {
+    return name.compare(0, QPID_REPLICATOR_.size(), QPID_REPLICATOR_) == 0;
+}
+
 bool QueueReplicator::isEventKey(const std::string key) {
     const std::string& prefix = QPID_HA_EVENT_PREFIX;
     bool ret = key.size() > prefix.size() && key.compare(0, prefix.size(), 
prefix) == 0;
@@ -74,19 +79,33 @@ class QueueReplicator::ErrorListener : p
         QPID_LOG(error, logPrefix << "Execution error: " << msg);
     }
     void detach() {
-        QPID_LOG(error, logPrefix << "Unexpectedly detached.");
+        QPID_LOG(debug, logPrefix << "Session detached");
     }
   private:
     std::string logPrefix;
 };
 
+class QueueReplicator::QueueObserver : public broker::QueueObserver {
+  public:
+    QueueObserver(boost::shared_ptr<QueueReplicator> qr) : queueReplicator(qr) 
{}
+    void enqueued(const Message&) {}
+    void dequeued(const Message&) {}
+    void acquired(const Message&) {}
+    void requeued(const Message&) {}
+    void consumerAdded( const Consumer& ) {}
+    void consumerRemoved( const Consumer& ) {}
+    void destroy() { queueReplicator->deactivate(); }
+  private:
+    boost::shared_ptr<QueueReplicator> queueReplicator;
+};
+
 QueueReplicator::QueueReplicator(HaBroker& hb,
                                  boost::shared_ptr<Queue> q,
                                  boost::shared_ptr<Link> l)
     : Exchange(replicatorName(q->getName()), 0, q->getBroker()),
       haBroker(hb),
       logPrefix("Backup queue "+q->getName()+": "),
-      queue(q), link(l), brokerInfo(hb.getBrokerInfo())
+      queue(q), link(l), brokerInfo(hb.getBrokerInfo()), subscribed(false)
 {
     args.setString(QPID_REPLICATE, printable(NONE).str());
     Uuid uuid(true);
@@ -118,18 +137,21 @@ void QueueReplicator::activate() {
     bridge = result.first;
     bridge->setErrorListener(
         boost::shared_ptr<ErrorListener>(new ErrorListener(logPrefix)));
+    boost::shared_ptr<QueueObserver> observer(new 
QueueObserver(shared_from_this()));
+    queue->addObserver(observer);
 }
 
 QueueReplicator::~QueueReplicator() { deactivate(); }
 
 void QueueReplicator::deactivate() {
-    // destroy the route
+    QPID_LOG(debug, logPrefix << "Deactivated");
     sys::Mutex::ScopedLock l(lock);
-    if (bridge) {
-        bridge->close();
-        bridge.reset();
-        QPID_LOG(debug, logPrefix << "Deactivated bridge " << bridgeName);
-    }
+    if (bridge) bridge->close();
+    // Need to drop shared pointers to avoid pointer cycles keeping this in 
memory.
+    queue.reset();
+    link.reset();
+    bridge.reset();
+    getBroker()->getExchanges().destroy(getName());
 }
 
 // Called in a broker connection thread when the bridge is created.

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h?rev=1397243&r1=1397242&r2=1397243&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h Thu Oct 11 19:23:30 2012
@@ -58,6 +58,8 @@ class QueueReplicator : public broker::E
     static const std::string DEQUEUE_EVENT_KEY;
     static const std::string POSITION_EVENT_KEY;
     static std::string replicatorName(const std::string& queueName);
+    static bool isReplicatorName(const std::string&);
+
     /** Test if a string is an event key */
     static bool isEventKey(const std::string key);
 
@@ -77,8 +79,16 @@ class QueueReplicator : public broker::E
     void route(broker::Deliverable&);
     bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, 
const framing::FieldTable* const);
 
+    // Set if the queue has ever been subscribed to, used for auto-delete 
cleanup.
+    void setSubscribed() { subscribed = true; }
+    bool isSubscribed() { return subscribed; }
+
+    boost::shared_ptr<broker::Queue> getQueue() const { return queue; }
+
   private:
     class ErrorListener;
+    class QueueObserver;
+
     void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& 
sessionHandler);
     void dequeue(framing::SequenceNumber, sys::Mutex::ScopedLock&);
 
@@ -90,6 +100,7 @@ class QueueReplicator : public broker::E
     boost::shared_ptr<broker::Link> link;
     boost::shared_ptr<broker::Bridge> bridge;
     BrokerInfo brokerInfo;
+    bool subscribed;
 };
 
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1397243&r1=1397242&r2=1397243&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Thu Oct 11 19:23:30 2012
@@ -377,7 +377,14 @@ bool ReplicatingSubscription::doDispatch
         Mutex::ScopedLock l(lock);
         if (!dequeues.empty()) sendDequeueEvent(l);
     }
-    return ConsumerImpl::doDispatch();
+    try {
+        return ConsumerImpl::doDispatch();
+    }
+    catch (const std::exception& e) {
+        // FIXME aconway 2012-10-05: detect queue deletion, no warning.
+        QPID_LOG(warning, logPrefix << " exception in dispatch: " << e.what());
+        return false;
+    }
 }
 
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/tests/brokertest.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/brokertest.py?rev=1397243&r1=1397242&r2=1397243&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/brokertest.py Thu Oct 11 19:23:30 2012
@@ -203,8 +203,8 @@ class Popen(subprocess.Popen):
         self.wait()
 
     def kill(self):
-        try:
-            subprocess.Popen.kill(self)
+        self.expect = EXPECT_EXIT_FAIL
+        try: subprocess.Popen.kill(self)
         except AttributeError:          # No terminate method
             try:
                 os.kill( self.pid , signal.SIGKILL)

Modified: qpid/trunk/qpid/cpp/src/tests/ha_test.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_test.py?rev=1397243&r1=1397242&r2=1397243&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_test.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_test.py Thu Oct 11 19:23:30 2012
@@ -129,6 +129,16 @@ class HaBroker(Broker):
         assert retry(try_get_status, timeout=20), "%s expected=%r, actual=%r"%(
             self, status, self._status)
 
+    def wait_queue(self, queue, timeout=1):
+        """ Wait for queue to be visible via QMF"""
+        agent = self.agent()
+        assert retry(lambda: agent.getQueue(queue) is not None, timeout=timeout)
+
+    def wait_no_queue(self, queue, timeout=1):
+        """ Wait for queue to be invisible via QMF"""
+        agent = self.agent()
+        assert retry(lambda: agent.getQueue(queue) is None, timeout=timeout)
+
     # FIXME aconway 2012-05-01: do direct python call to qpid-config code.
     def qpid_config(self, args):
         assert subprocess.call(
@@ -185,6 +195,9 @@ class HaBroker(Broker):
     def ready(self):
         return Broker.ready(self, client_properties={"qpid.ha-admin":1})
 
+    def kill(self):
+        self._agent = None
+        return Broker.kill(self)
 
 class HaCluster(object):
     _cluster_count = 0
@@ -225,7 +238,6 @@ class HaCluster(object):
 
     def kill(self, i, promote_next=True):
         """Kill broker i, promote broker i+1"""
-        self[i].expect = EXPECT_EXIT_FAIL
         self[i].kill()
         if promote_next: self[(i+1) % len(self)].promote()
 
@@ -254,12 +266,11 @@ class HaCluster(object):
     def __getitem__(self,index): return self._brokers[index]
     def __iter__(self): return self._brokers.__iter__()
 
+
 def wait_address(session, address):
     """Wait for an address to become valid."""
     def check():
-        try:
-            session.sender(address)
-            return True
+        try: session.sender(address); return True
         except NotFound: return False
     assert retry(check), "Timed out waiting for address %s"%(address)
 
@@ -269,3 +280,5 @@ def valid_address(session, address):
         session.receiver(address)
         return True
     except NotFound: return False
+
+

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1397243&r1=1397242&r2=1397243&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Thu Oct 11 19:23:30 2012
@@ -652,9 +652,9 @@ acl deny all all
         self.assertRaises(NotFound, s.receiver, ("e2"));
 
 
-    def test_auto_delete_qpid_4285(self):
-        """Regression test for QPID-4285: an auto delete queue gets stuck in
-        a partially deleted state and causes replication errors."""
+    def test_delete_qpid_4285(self):
+        """Regression test for QPID-4285: on deleting a queue it gets stuck in a
+        partially deleted state and causes replication errors."""
         cluster = HaCluster(self,2)
         cluster[1].wait_status("ready")
         s = cluster[0].connect().session()
@@ -669,6 +669,72 @@ acl deny all all
         except NotFound: pass
         assert not cluster[1].agent().getQueue("q") # Should not be in QMF
 
+    def alt_setup(self, session, suffix):
+        # Create exchange to use as alternate and a queue bound to it.
+        # altex exchange: acts as alternate exchange
+        session.sender("altex%s;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}"%(suffix))
+        # altq queue bound to altex, collect re-routed messages.
+        session.sender("altq%s;{create:always,node:{x-bindings:[{exchange:'altex%s',queue:altq%s}]}}"%(suffix,suffix,suffix))
+
+    def test_auto_delete_close(self):
+        """Verify auto-delete queues are deleted on backup if auto-deleted
+        on primary"""
+        cluster=HaCluster(self, 2)
+        p = cluster[0].connect().session()
+        self.alt_setup(p, "1")
+        r = p.receiver("adq1;{create:always,node:{x-declare:{auto-delete:True,alternate-exchange:'altex1'}}}", capacity=1)
+        s = p.sender("adq1")
+        for m in ["aa","bb","cc"]: s.send(m)
+        p.sender("adq2;{create:always,node:{x-declare:{auto-delete:True}}}")
+        cluster[1].wait_queue("adq1")
+        cluster[1].wait_queue("adq2")
+        r.close()               # trigger auto-delete of adq1
+        cluster[1].wait_no_queue("adq1")
+        cluster[1].assert_browse_backup("altq1", ["aa","bb","cc"])
+        cluster[1].wait_queue("adq2")
+
+    def test_auto_delete_crash(self):
+        """Verify auto-delete queues are deleted on backup if the primary crashes"""
+        cluster=HaCluster(self, 2)
+        p = cluster[0].connect().session()
+        self.alt_setup(p,"1")
+
+        # adq1 is subscribed so will be auto-deleted.
+        r = p.receiver("adq1;{create:always,node:{x-declare:{auto-delete:True,alternate-exchange:'altex1'}}}", capacity=1)
+        s = p.sender("adq1")
+        for m in ["aa","bb","cc"]: s.send(m)
+        # adq2 is subscribed after cluster[2] starts.
+        p.sender("adq2;{create:always,node:{x-declare:{auto-delete:True}}}")
+        # adq3 is never subscribed.
+        p.sender("adq3;{create:always,node:{x-declare:{auto-delete:True}}}")
+
+        cluster.start()
+        cluster[2].wait_status("ready")
+
+        p.receiver("adq2")      # Subscribed after cluster[2] joined
+
+        for q in ["adq1","adq2","adq3","altq1"]: cluster[1].wait_queue(q)
+        for q in ["adq1","adq2","adq3","altq1"]: cluster[2].wait_queue(q)
+        cluster[0].kill()
+
+        cluster[1].wait_no_queue("adq1")
+        cluster[1].wait_no_queue("adq2")
+        cluster[1].wait_queue("adq3")
+
+        cluster[2].wait_no_queue("adq1")
+        cluster[2].wait_no_queue("adq2")
+        cluster[2].wait_queue("adq3")
+
+        cluster[1].assert_browse_backup("altq1", ["aa","bb","cc"])
+        cluster[2].assert_browse_backup("altq1", ["aa","bb","cc"])
+
+    def test_auto_delete_timeout(self):
+        cluster = HaCluster(self, 2)
+        s = cluster[0].connect().session().receiver("q;{create:always,node:{x-declare:{auto-delete:True,arguments:{'qpid.auto_delete_timeout':1}}}}")
+        cluster[1].wait_queue("q")
+        cluster[0].kill()
+        cluster[1].wait_queue("q")    # Not timed out yet
+        cluster[1].wait_no_queue("q") # Wait for timeout
 
 def fairshare(msgs, limit, levels):
     """



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to