Author: aconway
Date: Wed Oct 24 14:33:25 2012
New Revision: 1401711

URL: http://svn.apache.org/viewvc?rev=1401711&view=rev
Log:
Bug:868364 - QPID-4391: HA ignore stale responses.

Related issue discovered while fixing this bug:

The BrokerReplicator pulls management events and query responses from different
queues, and there is no co-ordination between them. If a response is processed
late, after the create and delete events for a queue, it will incorrectly
re-create the deleted queue.

This patch ignores responses if we have already seen an event for the queue or
exchange.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1401711&r1=1401710&r2=1401711&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Wed Oct 24 14:33:25 
2012
@@ -214,17 +214,75 @@ class BrokerReplicator::ConnectionObserv
     BrokerReplicator& brokerReplicator;
 };
 
-template <class E> pair<string,string> eventKey() {
+/** Keep track of queues or exchanges during the update process to solve 2
+ * problems.
+ *
+ * 1. Once all responses are processed, remove any queues/exchanges
+ * that were not mentioned as they no longer exist on the primary.
+ *
+ * 2. During the update if we see an event for an object we should
+ * ignore any subsequent responses for that object as they are out
+ * of date.
+ */
+class BrokerReplicator::UpdateTracker {
+  public:
+    typedef std::set<std::string> Names;
+    typedef boost::function<void (const std::string&)> CleanFn;
+
+    UpdateTracker(CleanFn f, const ReplicationTest& rt) : cleanFn(f), 
repTest(rt) {}
+
+    /** Destructor cleans up remaining initial queues. */
+    ~UpdateTracker() {
+        // Don't throw in a destructor.
+        try { for_each(initial.begin(), initial.end(), cleanFn); }
+        catch (const std::exception& e) {
+            QPID_LOG(error, "Error in cleanup of lost objects: " << e.what());
+        }
+    }
+
+    /** Add an exchange name */
+    void addExchange(Exchange::shared_ptr ex)  {
+        if (repTest.isReplicated(CONFIGURATION, *ex)) 
initial.insert(ex->getName());
+    }
+
+    /** Add a queue name. */
+    void addQueue(Queue::shared_ptr q) {
+        if (repTest.isReplicated(CONFIGURATION, *q)) 
initial.insert(q->getName());
+    }
+
+    /** Received an event for name */
+    void event(const std::string& name) {
+        initial.erase(name); // no longer a candidate for deleting
+        events.insert(name); // we have seen an event for this name
+    }
+
+    /** Received a response for name.
+     *@return true if this response should be processed, false if we have
+     *already seen an event for this object.
+     */
+    bool response(const std::string& name) {
+        initial.erase(name); // no longer a candidate for deleting
+        return events.find(name) == events.end(); // true if no event seen yet.
+    }
+
+  private:
+    Names initial, events;
+    CleanFn cleanFn;
+    ReplicationTest repTest;
+};
+
+template <class E> BrokerReplicator::EventKey eventKey() {
     return make_pair(E::PACKAGE_NAME, E::EVENT_NAME);
 }
 
 BrokerReplicator::BrokerReplicator(HaBroker& hb, const 
boost::shared_ptr<Link>& l)
     : Exchange(QPID_CONFIGURATION_REPLICATOR),
       logPrefix("Backup: "), replicationTest(hb.getReplicationTest()),
-      haBroker(hb), broker(hb.getBroker()), link(l),
+      haBroker(hb), broker(hb.getBroker()),
+      exchanges(broker.getExchanges()), queues(broker.getQueues()),
+      link(l),
       initialized(false),
       alternates(hb.getBroker().getExchanges()),
-      cleaner(*this),
       connection(0)
 {
     broker.getConnectionObservers().add(
@@ -298,9 +356,16 @@ void BrokerReplicator::initializeBridge(
              << " status:" << printable(haBroker.getStatus()));
     initialized = true;
 
-    // Scan for existing replicated queues and exchanges. Any that have not 
been seen by
-    // the time all reponses are received will be cleaned up.
-    cleaner.start();
+    exchangeTracker.reset(
+        new UpdateTracker(boost::bind(&BrokerReplicator::deleteExchange, this, 
_1),
+                          replicationTest));
+    exchanges.eachExchange(
+        boost::bind(&UpdateTracker::addExchange, exchangeTracker.get(), _1));
+
+    queueTracker.reset(
+        new UpdateTracker(boost::bind(&BrokerReplicator::deleteQueue, this, 
_1, true),
+                          replicationTest));
+    queues.eachQueue(boost::bind(&UpdateTracker::addQueue, queueTracker.get(), 
_1));
 
     framing::AMQP_ServerProxy peer(sessionHandler.out);
     const qmf::org::apache::qpid::broker::ArgsLinkBridge& 
args(bridge.getArgs());
@@ -369,12 +434,12 @@ void BrokerReplicator::route(Deliverable
             }
             if (MessageTransfer::isLastQMFResponse(msg.getMessage(), 
EXCHANGE)) {
                 QPID_LOG(debug, logPrefix << "All exchange responses 
received.")
-                cleaner.cleanExchanges(); // Clean up exchanges that no longer 
exist in the primary
+                exchangeTracker.reset(); // Clean up exchanges that no longer 
exist in the primary
                 alternates.clear();
             }
             if (MessageTransfer::isLastQMFResponse(msg.getMessage(), QUEUE)) {
-                QPID_LOG(debug, logPrefix << "All queue responses received.")
-                cleaner.cleanQueues(); // Clean up queues that no longer exist 
in the primary
+                QPID_LOG(debug, logPrefix << "All queue responses received.");
+                queueTracker.reset(); // Clean up queues that no longer exist 
in the primary
             }
         }
     } catch (const std::exception& e) {
@@ -394,12 +459,12 @@ void BrokerReplicator::doEventQueueDecla
         string name = values[QNAME].asString();
         QueueSettings settings(values[DURABLE].asBool(), 
values[AUTODEL].asBool());
         QPID_LOG(debug, logPrefix << "Queue declare event: " << name);
-        cleaner.forgetQueue(name);
+        if (queueTracker.get()) queueTracker->event(name);
         framing::FieldTable args;
         qpid::amqp_0_10::translate(argsMap, args);
         // If we already have a queue with this name, replace it.
         // The queue was definitely created on the primary.
-        if (broker.getQueues().find(name)) {
+        if (queues.find(name)) {
             QPID_LOG(warning, logPrefix << "Replacing exsiting queue: " << 
name);
             deleteQueue(name);
         }
@@ -412,7 +477,7 @@ boost::shared_ptr<QueueReplicator> Broke
     const std::string& qname)
 {
     string rname = QueueReplicator::replicatorName(qname);
-    boost::shared_ptr<broker::Exchange> ex = broker.getExchanges().find(rname);
+    boost::shared_ptr<broker::Exchange> ex = exchanges.find(rname);
     return boost::dynamic_pointer_cast<QueueReplicator>(ex);
 }
 
@@ -420,10 +485,10 @@ void BrokerReplicator::doEventQueueDelet
     // The remote queue has already been deleted so replicator
     // sessions may be closed by a "queue deleted" exception.
     string name = values[QNAME].asString();
-    boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
+    boost::shared_ptr<Queue> queue = queues.find(name);
     if (queue && 
replicationTest.replicateLevel(queue->getSettings().storeSettings)) {
         QPID_LOG(debug, logPrefix << "Queue delete event: " << name);
-        cleaner.forgetQueue(name);
+        if (queueTracker.get()) queueTracker->event(name);
         deleteQueue(name);
     }
 }
@@ -434,12 +499,12 @@ void BrokerReplicator::doEventExchangeDe
     if (values[DISP] == CREATED && replicationTest.replicateLevel(argsMap)) {
         string name = values[EXNAME].asString();
         QPID_LOG(debug, logPrefix << "Exchange declare event: " << name);
-        cleaner.forgetExchange(name);
+        if (exchangeTracker.get()) exchangeTracker->event(name);
         framing::FieldTable args;
         qpid::amqp_0_10::translate(argsMap, args);
         // If we already have a exchange with this name, replace it.
         // The exchange was definitely created on the primary.
-        if (broker.getExchanges().find(name)) {
+        if (exchanges.find(name)) {
             deleteExchange(name);
             QPID_LOG(warning, logPrefix << "Replaced existing exchange: " << 
name);
         }
@@ -453,14 +518,14 @@ void BrokerReplicator::doEventExchangeDe
 
 void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) {
     string name = values[EXNAME].asString();
-    boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name);
+    boost::shared_ptr<Exchange> exchange = exchanges.find(name);
     if (!exchange) {
         QPID_LOG(warning, logPrefix << "Exchange delete event, not found: " << 
name);
     } else if (!replicationTest.replicateLevel(exchange->getArgs())) {
         QPID_LOG(warning, logPrefix << "Exchange delete event, not replicated: 
" << name);
     } else {
         QPID_LOG(debug, logPrefix << "Exchange delete event:" << name);
-        cleaner.forgetExchange(name);
+        if (exchangeTracker.get()) exchangeTracker->event(name);
         deleteExchange(name);
         replicatedExchanges.erase(name);
     }
@@ -468,9 +533,9 @@ void BrokerReplicator::doEventExchangeDe
 
 void BrokerReplicator::doEventBind(Variant::Map& values) {
     boost::shared_ptr<Exchange> exchange =
-        broker.getExchanges().find(values[EXNAME].asString());
+        exchanges.find(values[EXNAME].asString());
     boost::shared_ptr<Queue> queue =
-        broker.getQueues().find(values[QNAME].asString());
+        queues.find(values[QNAME].asString());
     // We only replicate binds for a replicated queue to replicated
     // exchange that both exist locally.
     if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
@@ -488,9 +553,9 @@ void BrokerReplicator::doEventBind(Varia
 
 void BrokerReplicator::doEventUnbind(Variant::Map& values) {
     boost::shared_ptr<Exchange> exchange =
-        broker.getExchanges().find(values[EXNAME].asString());
+        exchanges.find(values[EXNAME].asString());
     boost::shared_ptr<Queue> queue =
-        broker.getQueues().find(values[QNAME].asString());
+        queues.find(values[QNAME].asString());
     // We only replicate unbinds for a replicated queue to replicated
     // exchange that both exist locally.
     if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
@@ -544,7 +609,9 @@ void BrokerReplicator::doResponseQueue(V
             values[EXCLUSIVE].asBool()))
         return;
     string name(values[NAME].asString());
-    cleaner.forgetQueue(name);
+    if (!queueTracker.get())
+        throw Exception(QPID_MSG("Unexpected queue response: " << values));
+    if (!queueTracker->response(name)) return; // Response is out-of-date
     QPID_LOG(debug, logPrefix << "Queue response: " << name);
     if (broker.getQueues().find(name)) { // Already exists
         if (findQueueReplicator(name))
@@ -571,7 +638,9 @@ void BrokerReplicator::doResponseExchang
     Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
     if (!replicationTest.replicateLevel(argsMap)) return;
     string name = values[NAME].asString();
-    cleaner.forgetExchange(name);
+    if (!exchangeTracker.get())
+        throw Exception(QPID_MSG("Unexpected exchange response: " << values));
+    if (!exchangeTracker->response(name)) return; // Response is out of date.
     QPID_LOG(debug, logPrefix << "Exchange response: " << name);
     if (broker.getExchanges().find(name)) {
         if (replicatedExchanges.find(name) != replicatedExchanges.end())
@@ -615,8 +684,8 @@ const std::string QUEUE_REF("queueRef");
 void BrokerReplicator::doResponseBind(Variant::Map& values) {
     std::string exName = getRefName(EXCHANGE_REF_PREFIX, values[EXCHANGE_REF]);
     std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]);
-    boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(exName);
-    boost::shared_ptr<Queue> queue = broker.getQueues().find(qName);
+    boost::shared_ptr<Exchange> exchange = exchanges.find(exName);
+    boost::shared_ptr<Queue> queue = queues.find(qName);
 
     // Automatically replicate binding if queue and exchange exist and are 
replicated
     if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
@@ -661,7 +730,7 @@ boost::shared_ptr<QueueReplicator> Broke
     if (replicationTest.replicateLevel(queue->getSettings().storeSettings) == 
ALL) {
         boost::shared_ptr<QueueReplicator> qr(
             new QueueReplicator(haBroker, queue, link));
-        if (!broker.getExchanges().registerExchange(qr))
+        if (!exchanges.registerExchange(qr))
             throw Exception(QPID_MSG("Duplicate queue replicator " << 
qr->getName()));
         qr->activate();
         return qr;
@@ -670,7 +739,7 @@ boost::shared_ptr<QueueReplicator> Broke
 }
 
 void BrokerReplicator::deleteQueue(const std::string& name, bool purge) {
-    Queue::shared_ptr queue = broker.getQueues().find(name);
+    Queue::shared_ptr queue = queues.find(name);
     if (queue) {
         // Purge before deleting to ensure that we don't reroute any
         // messages. Any reroutes will be done at the primary and
@@ -749,48 +818,6 @@ bool BrokerReplicator::isBound(boost::sh
 
 string BrokerReplicator::getType() const { return 
QPID_CONFIGURATION_REPLICATOR; }
 
-
-// BrokerReplicator::Cleaner
-
-BrokerReplicator::Cleaner::Cleaner(BrokerReplicator& br) : 
brokerReplicator(br) {}
-
-void BrokerReplicator::Cleaner::start() {
-    queues.clear();
-    exchanges.clear();
-    
brokerReplicator.broker.getExchanges().eachExchange(boost::bind(&Cleaner::addExchange,
 this, _1));
-    
brokerReplicator.broker.getQueues().eachQueue(boost::bind(&Cleaner::addQueue, 
this, _1));
-}
-
-void 
BrokerReplicator::Cleaner::addExchange(boost::shared_ptr<broker::Exchange> ex) {
-    if (brokerReplicator.replicationTest.isReplicated(CONFIGURATION, *ex))
-        exchanges.insert(ex->getName());
-}
-
-void BrokerReplicator::Cleaner::addQueue(boost::shared_ptr<broker::Queue> q) {
-    if (brokerReplicator.replicationTest.isReplicated(CONFIGURATION, *q))
-        queues.insert(q->getName());
-}
-
-void BrokerReplicator::Cleaner::forgetExchange(const std::string& name) {
-    exchanges.erase(name);
-}
-
-void BrokerReplicator::Cleaner::forgetQueue(const std::string& name) {
-    queues.erase(name);
-}
-
-void BrokerReplicator::Cleaner::cleanExchanges() {
-    for_each(exchanges.begin(), exchanges.end(),
-             boost::bind(&BrokerReplicator::deleteExchange, &brokerReplicator, 
_1));
-    exchanges.clear();
-}
-
-void BrokerReplicator::Cleaner::cleanQueues() {
-    for_each(queues.begin(), queues.end(),
-             boost::bind(&BrokerReplicator::deleteQueue, &brokerReplicator, 
_1, true));
-    queues.clear();
-}
-
 void BrokerReplicator::autoDeleteCheck(
     boost::shared_ptr<Exchange> ex, set<string>& result)
 {
@@ -814,7 +841,7 @@ void BrokerReplicator::disconnected() {
     connection = 0;
     // Clean up auto-delete queues
     set<string> deleteQueues;
-    
broker.getExchanges().eachExchange(boost::bind(&BrokerReplicator::autoDeleteCheck,
+    exchanges.eachExchange(boost::bind(&BrokerReplicator::autoDeleteCheck,
                                            this, _1, 
boost::ref(deleteQueues)));
     // Don't purge before deleting, the primary is gone so we need to
     // reroute the deleted messages.

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h?rev=1401711&r1=1401710&r2=1401711&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h Wed Oct 24 14:33:25 2012
@@ -41,6 +41,8 @@ class Link;
 class Bridge;
 class SessionHandler;
 class Connection;
+class QueueRegistry;
+class ExchangeRegistry;
 }
 
 namespace framing {
@@ -90,34 +92,7 @@ class BrokerReplicator : public broker::
 
     typedef std::map<std::string, QueueReplicatorPtr> QueueReplicatorMap;
 
-
-    /** Keep track of queues and exchanges that need to be cleaned up. */
-    class Cleaner {
-      public:
-        Cleaner(BrokerReplicator&);
-
-        /** Scan for existing queues and exchanges. */
-        void start();
-
-        // Forget a queue/exchange that does not need cleaning
-        void forgetExchange(const std::string& name);
-        void forgetQueue(const std::string& name);
-        // Clean up queues/exchange that are no longer on primary
-        void cleanExchanges();
-        void cleanQueues();
-
-      private:
-        typedef std::set<std::string> Names;
-
-        // add a queue/exchange that may need cleaning.
-        void addExchange(boost::shared_ptr<broker::Exchange>);
-        void addQueue(boost::shared_ptr<broker::Queue>);
-
-        BrokerReplicator& brokerReplicator;
-        Names queues, exchanges;
-    };
-  friend class Cleaner;
-
+    class UpdateTracker;
     class ErrorListener;
     class ConnectionObserver;
 
@@ -166,15 +141,18 @@ class BrokerReplicator : public broker::
     ReplicationTest replicationTest;
     HaBroker& haBroker;
     broker::Broker& broker;
+    broker::ExchangeRegistry& exchanges;
+    broker::QueueRegistry& queues;
     boost::shared_ptr<broker::Link> link;
     bool initialized;
     AlternateExchangeSetter alternates;
     qpid::Address primary;
     typedef std::set<std::string> StringSet;
     StringSet replicatedExchanges; // exchanges that have been replicated.
-    Cleaner cleaner;
     broker::Connection* connection;
     EventDispatchMap dispatch;
+    std::auto_ptr<UpdateTracker> queueTracker;
+    std::auto_ptr<UpdateTracker> exchangeTracker;
 };
 }} // namespace qpid::broker
 

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1401711&r1=1401710&r2=1401711&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Wed Oct 24 14:33:25 2012
@@ -29,7 +29,21 @@ from logging import getLogger, WARN, ERR
 from qpidtoollibs import BrokerAgent
 from uuid import UUID
 
-class ReplicationTests(BrokerTest):
+log = getLogger(__name__)
+
+def grep(filename, regexp):
+    for line in open(filename).readlines():
+        if (regexp.search(line)): return True
+    return False
+
+class HaBrokerTest(BrokerTest):
+    """Base class for HA broker tests"""
+    def assert_log_no_errors(self, broker):
+        log = broker.get_log()
+        if grep(log, re.compile("] error|] critical")):
+            self.fail("Errors in log file %s"%(log))
+
+class ReplicationTests(HaBrokerTest):
     """Correctness tests for  HA replication."""
 
     def test_replication(self):
@@ -774,6 +788,19 @@ acl deny all all
         cluster.start()
         send_ttl_messages()
 
+    def test_stale_response(self):
+        """Check for race condition where a stale response is processed after 
an
+        event for the same queue/exchange """
+        cluster = HaCluster(self, 2)
+        s = cluster[0].connect().session()
+        s.sender("keep;{create:always}") # Leave this queue in place.
+        for i in xrange(100):            # FIXME aconway 2012-10-23: ??? IS 
this an issue?
+            s.sender("deleteme%s;{create:always,delete:always}"%(i)).close()
+        # It is possible for the backup to attempt to subscribe after the queue
+        # is deleted. This is not an error, but is logged as an error on the 
primary.
+        # The backup does not log this as an error so we only check the backup 
log for errors.
+        self.assert_log_no_errors(cluster[1])
+
 def fairshare(msgs, limit, levels):
     """
     Generator to return prioritised messages in expected order for a given 
fairshare limit
@@ -808,7 +835,7 @@ def priority_level(value, levels):
     offset = 5-math.ceil(levels/2.0)
     return min(max(value - offset, 0), levels-1)
 
-class LongTests(BrokerTest):
+class LongTests(HaBrokerTest):
     """Tests that can run for a long time if -DDURATION=<minutes> is set"""
 
     def duration(self):
@@ -891,7 +918,7 @@ class LongTests(BrokerTest):
             if unexpected_dead:
                 raise Exception("Brokers not running: %s"%unexpected_dead)
 
-class RecoveryTests(BrokerTest):
+class RecoveryTests(HaBrokerTest):
     """Tests for recovery after a failure."""
 
     def test_queue_hold(self):



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to