Author: aconway
Date: Thu Jan 19 23:09:26 2012
New Revision: 1233689

URL: http://svn.apache.org/viewvc?rev=1233689&view=rev
Log:
QPID-3603: Replicate unbind events.

Modified:
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/BrokerReplicator.h
    qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1233689&r1=1233688&r2=1233689&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Thu Jan 19 23:09:26 2012
@@ -30,6 +30,7 @@
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/framing/MessageTransferBody.h"
 #include "qmf/org/apache/qpid/broker/EventBind.h"
+#include "qmf/org/apache/qpid/broker/EventUnbind.h"
 #include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
 #include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
 #include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
@@ -41,6 +42,7 @@ namespace qpid {
 namespace ha {
 
 using qmf::org::apache::qpid::broker::EventBind;
+using qmf::org::apache::qpid::broker::EventUnbind;
 using qmf::org::apache::qpid::broker::EventExchangeDeclare;
 using qmf::org::apache::qpid::broker::EventExchangeDelete;
 using qmf::org::apache::qpid::broker::EventQueueDeclare;
@@ -70,6 +72,7 @@ const string ARGUMENTS("arguments");
 const string AUTODEL("autoDel");
 const string AUTODELETE("autoDelete");
 const string BIND("bind");
+const string UNBIND("unbind");
 const string BINDING("binding");
 const string CREATED("created");
 const string DISP("disp");
@@ -171,6 +174,12 @@ void sendQuery(const string className, c
     sessionHandler.out->handle(header);
     sessionHandler.out->handle(content);
 }
+
+// Void value: leave outArgs untouched. Otherwise translate value.asMap() into outArgs.
+void translate(const Variant& value, framing::FieldTable& outArgs) {
+    if (!value.isVoid()) amqp_0_10::translate(value.asMap(), outArgs);
+}
+
 } // namespace
 
 BrokerReplicator::~BrokerReplicator() {}
@@ -237,21 +246,19 @@ void BrokerReplicator::route(Deliverable
                 else if (match<EventExchangeDeclare>(schema)) 
doEventExchangeDeclare(values);
                 else if (match<EventExchangeDelete>(schema)) 
doEventExchangeDelete(values);
                 else if (match<EventBind>(schema)) doEventBind(values);
-                // FIXME aconway 2011-11-21: handle unbind & all other 
relevant events.
+                else if (match<EventUnbind>(schema)) doEventUnbind(values);
             }
         } else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) {
             for (Variant::List::iterator i = list.begin(); i != list.end(); 
++i) {
                 string type = i->asMap()[SCHEMA_ID].asMap()[CLASS_NAME];
                 Variant::Map& values = i->asMap()[VALUES].asMap();
                 framing::FieldTable args;
-                amqp_0_10::translate(values[ARGUMENTS].asMap(), args);
+                translate(values[ARGUMENTS].asMap(), args);
                 if      (type == QUEUE) doResponseQueue(values);
                 else if (type == EXCHANGE) doResponseExchange(values);
                 else if (type == BINDING) doResponseBind(values);
-                else QPID_LOG(error, "HA: Backup received unknown response: 
type=" << type
+                else QPID_LOG(error, "HA: Backup received unknown response 
type=" << type
                               << " values=" << values);
-
-                // FIXME aconway 2011-12-06: handle all relevant response 
types.
             }
         } else QPID_LOG(error, "HA: Backup received unexpected message: " << 
*headers);
     } catch (const std::exception& e) {
@@ -264,7 +271,7 @@ void BrokerReplicator::doEventQueueDecla
     Variant::Map argsMap = values[ARGS].asMap();
     if (values[DISP] == CREATED && replicateLevel(argsMap)) {
         framing::FieldTable args;
-        amqp_0_10::translate(argsMap, args);
+        translate(argsMap, args);
         std::pair<boost::shared_ptr<Queue>, bool> result =
             broker.createQueue(
                 name,
@@ -312,7 +319,7 @@ void BrokerReplicator::doEventExchangeDe
     if (values[DISP] == CREATED && replicateLevel(argsMap)) {
         string name = values[EXNAME].asString();
         framing::FieldTable args;
-        amqp_0_10::translate(argsMap, args);
+        translate(argsMap, args);
         if (broker.createExchange(
                 name,
                 values[EXTYPE].asString(),
@@ -356,7 +363,7 @@ void BrokerReplicator::doEventBind(Varia
         queue && replicateLevel(queue->getSettings()))
     {
         framing::FieldTable args;
-        amqp_0_10::translate(values[ARGS].asMap(), args);
+        translate(values[ARGS].asMap(), args);
         string key = values[KEY].asString();
         QPID_LOG(debug, "HA: Backup replicated binding exchange=" << 
exchange->getName()
                  << " queue=" << queue->getName()
@@ -365,12 +372,32 @@ void BrokerReplicator::doEventBind(Varia
     }
 }
 
+// Apply a replicated unbind event (EXNAME, QNAME, KEY, ARGS in values) to the local broker.
+void BrokerReplicator::doEventUnbind(Variant::Map& values) {
+    boost::shared_ptr<Exchange> exchange =
+        broker.getExchanges().find(values[EXNAME].asString());
+    boost::shared_ptr<Queue> queue =
+        broker.getQueues().find(values[QNAME].asString());
+    // Only unbind when a replicated exchange and a replicated queue both exist locally.
+    if (exchange && replicateLevel(exchange->getArgs()) &&
+        queue && replicateLevel(queue->getSettings()))
+    {
+        framing::FieldTable args;
+        // Pass the Variant itself: calling asMap() here would bypass translate()'s void check.
+        translate(values[ARGS], args);
+        string key = values[KEY].asString();
+        QPID_LOG(debug, "HA: Backup replicated unbinding exchange=" << exchange->getName()
+                 << " queue=" << queue->getName() << " key=" << key);
+        exchange->unbind(queue, key, &args);
+    }
+}
+
 void BrokerReplicator::doResponseQueue(Variant::Map& values) {
     // FIXME aconway 2011-11-22: more flexible ways & defaults to indicate 
replication
     Variant::Map argsMap(values[ARGUMENTS].asMap());
     if (!replicateLevel(argsMap)) return;
     framing::FieldTable args;
-    amqp_0_10::translate(argsMap, args);
+    translate(argsMap, args);
     string name(values[NAME].asString());
     std::pair<boost::shared_ptr<Queue>, bool> result =
         broker.createQueue(
@@ -396,7 +423,7 @@ void BrokerReplicator::doResponseExchang
     Variant::Map argsMap(values[ARGUMENTS].asMap());
     if (!replicateLevel(argsMap)) return;
     framing::FieldTable args;
-    amqp_0_10::translate(argsMap, args);
+    translate(argsMap, args);
     if (broker.createExchange(
             values[NAME].asString(),
             values[TYPE].asString(),
@@ -445,7 +472,7 @@ void BrokerReplicator::doResponseBind(Va
         queue && replicateLevel(queue->getSettings()))
     {
         framing::FieldTable args;
-        amqp_0_10::translate(values[ARGUMENTS].asMap(), args);
+        translate(values[ARGUMENTS].asMap(), args);
         string key = values[KEY].asString();
         exchange->bind(queue, key, &args);
         QPID_LOG(debug, "HA: Backup catch-up binding: exchange=" << 
exchange->getName()

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/BrokerReplicator.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/BrokerReplicator.h?rev=1233689&r1=1233688&r2=1233689&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/BrokerReplicator.h (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/BrokerReplicator.h Thu Jan 19 23:09:26 2012
@@ -63,14 +63,18 @@ class BrokerReplicator : public broker::
 
   private:
     void initializeBridge(broker::Bridge&, broker::SessionHandler&);
+
     void doEventQueueDeclare(types::Variant::Map& values);
     void doEventQueueDelete(types::Variant::Map& values);
     void doEventExchangeDeclare(types::Variant::Map& values);
     void doEventExchangeDelete(types::Variant::Map& values);
     void doEventBind(types::Variant::Map&);
+    void doEventUnbind(types::Variant::Map&);
+
     void doResponseQueue(types::Variant::Map& values);
     void doResponseExchange(types::Variant::Map& values);
     void doResponseBind(types::Variant::Map& values);
+
     void startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
 
     broker::Broker& broker;

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py?rev=1233689&r1=1233688&r2=1233689&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py Thu Jan 19 23:09:26 2012
@@ -55,12 +55,15 @@ class ShortTests(BrokerTest):
         except NotFound: pass
 
     def test_replication(self):
+        """Test basic replication of wiring and messages before and
+        after backup has connected"""
+
         def queue(name, replicate):
             return 
"%s;{create:always,node:{x-declare:{arguments:{'qpid.replicate':%s}}}}"%(name, 
replicate)
 
         def exchange(name, replicate, bindq):
             
return"%s;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':%s},
 type:'fanout'},x-bindings:[{exchange:'%s',queue:'%s'}]}}"%(name, replicate, 
name, bindq)
-        def setup(p, prefix):
+        def setup(p, prefix, primary):
             """Create config, send messages on the primary p"""
             s = p.sender(queue(prefix+"q1", "all"))
             for m in ["a", "b", "1"]: s.send(Message(m))
@@ -71,6 +74,14 @@ class ShortTests(BrokerTest):
             p.sender(queue(prefix+"q3", "none")).send(Message("3"))
             p.sender(exchange(prefix+"e1", "all", 
prefix+"q1")).send(Message("4"))
             p.sender(exchange(prefix+"e2", "all", 
prefix+"q2")).send(Message("5"))
+            # Test  unbind
+            p.sender(queue(prefix+"q4", "all")).send(Message("6"))
+            s3 = p.sender(exchange(prefix+"e4", "all", prefix+"q4"))
+            s3.send(Message("7"))
+            # Use old connection to unbind
+            us = primary.connect_old().session(str(qpid.datatypes.uuid4()))
+            us.exchange_unbind(exchange=prefix+"e4", binding_key="", 
queue=prefix+"q4")
+            p.sender(prefix+"e4").send(Message("drop1")) # Should be dropped
             # FIXME aconway 2011-11-24: need a marker so we can wait till sync 
is done.
             p.sender(queue(prefix+"x", "wiring"))
 
@@ -93,13 +104,16 @@ class ShortTests(BrokerTest):
             b.sender(prefix+"e2").send(Message(prefix+"e2")) # Verify binds 
with replicate=wiring
             self.assert_browse_retry(b, prefix+"q2", [prefix+"e2"])
 
+            b.sender(prefix+"e4").send(Message("drop2")) # Verify unbind.
+            self.assert_browse_retry(b, prefix+"q4", ["6","7"])
+
         primary = self.ha_broker(name="primary", broker_url="primary") # Temp 
hack to identify primary
         p = primary.connect().session()
         # Create config, send messages before starting the backup, to test 
catch-up replication.
-        setup(p, "1")
+        setup(p, "1", primary)
         backup  = self.ha_broker(name="backup", broker_url=primary.host_port())
         # Create config, send messages after starting the backup, to test 
steady-state replication.
-        setup(p, "2")
+        setup(p, "2", primary)
 
         # Verify the data on the backup
         b = backup.connect().session()



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to