Author: aconway
Date: Wed Jan 25 22:46:54 2012
New Revision: 1235976

URL: http://svn.apache.org/viewvc?rev=1235976&view=rev
Log:
QPID-3603: Change replication level names, update doc notes.

Modified:
    qpid/branches/qpid-3603-2/qpid/cpp/design_docs/new-ha-design.txt
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/BrokerReplicator.h
    qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/branches/qpid-3603-2/qpid/cpp/design_docs/new-ha-design.txt
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/design_docs/new-ha-design.txt?rev=1235976&r1=1235975&r2=1235976&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/design_docs/new-ha-design.txt (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/design_docs/new-ha-design.txt Wed Jan 25 
22:46:54 2012
@@ -327,25 +327,49 @@ Notes to seed initial user documentation
 some points mentioned in the doc may not be implemented yet.
 
 ** High Availability Overview
-Explain basic concepts: hot standby, primary/backup, replicated queue/exchange.
-Network topology: backup links, corosync, separate client/cluster networks.
+
+HA is implemented using a 'hot standby' approach. Clients are directed
+to a single "primary" broker. The primary executes client requests and
+also replicates them to one or more "backup" brokers. If the primary
+fails, one of the backups takes over the role of primary carrying on
+from where the primary left off. Clients will fail over to the new
+primary automatically and continue their work.
+
+TODO: at least once, deduplication.
+
+** Enabling replication on the client.
+
+To enable replication set the qpid.replicate argument when creating a
+queue or exchange.
+
+This can have one of 3 values
+- none: the object is not replicated
+- configuration: queues, exchanges and bindings are replicated but messages 
are not.
+- messages: configuration and messages are replicated.
+
+TODO: examples
+TODO: more options for default value of qpid.replicate
+
+A HA client connection has multiple addresses, one for each broker. If
+the it fails to connect to an address, or the connection breaks,
+it will automatically fail-over to another address.
+
+Only the primary broker accepts connections, the backup brokers abort
+connection attempts. That ensures clients connect to the primary only.
+
+TODO: using multiple-address connections, examples c++, python, java.
+
+TODO: dynamic cluster addressing?
+
+TODO: need de-duplication.
+
+** Enabling replication on the broker.
+
+Network topology: backup links, separate client/broker networks.
 Describe failover mechanisms.
 - Client view: URLs, failover, exclusion & discovery.
 - Broker view: similar.
-Role of rmganager & corosync.
-
-** Client view.
-Clients use multi-address URL in base case.
-Clients can't connect to backups, retry till they find primary.
-Only qpid.cluster-admin can connect to backup, must not mess with replicated 
queues.
-Note connection known-hosts returns client URL, as does amq.failover exchange.
-
-Creating replicated queues & exchanges:
-- qpid.replicate argument,
-- examples using addressing and qpid-config)
-
-** Configuring corosync
-Must be on same network as backup links.
+Role of rmganager
 
 ** Configuring rgmanager
 

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1235976&r1=1235975&r2=1235976&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp 
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Wed Jan 
25 22:46:54 2012
@@ -55,7 +55,7 @@ using namespace broker;
 
 namespace {
 
-const string QPID_WIRING_REPLICATOR("qpid.wiring-replicator");
+const string QPID_CONFIGURATION_REPLICATOR("qpid.configuration-replicator");
 const string QPID_REPLICATE("qpid.replicate");
 
 const string CLASS_NAME("_class_name");
@@ -113,16 +113,16 @@ template <class T> bool match(Variant::M
     return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]);
 }
 
-enum ReplicateLevel { RL_NONE=0, RL_WIRING, RL_ALL };
+enum ReplicateLevel { RL_NONE=0, RL_CONFIGURATION, RL_MESSAGES };
 const string S_NONE="none";
-const string S_WIRING="wiring";
-const string S_ALL="all";
+const string S_CONFIGURATION="configuration";
+const string S_MESSAGES="messages";
 
 ReplicateLevel replicateLevel(const string& level) {
-    ReplicateLevel rl = RL_NONE;
-    if (level == S_WIRING) rl = RL_WIRING;
-    else if (level == S_ALL) rl = RL_ALL;
-    return rl;
+    if (level == S_NONE) return RL_NONE;
+    if (level == S_CONFIGURATION) return RL_CONFIGURATION;
+    if (level == S_MESSAGES) return RL_MESSAGES;
+    throw Exception("Invalid value for "+QPID_REPLICATE+": "+level);
 }
 
 ReplicateLevel replicateLevel(const framing::FieldTable& f) {
@@ -184,15 +184,15 @@ Variant::Map asMapVoid(const Variant& va
 BrokerReplicator::~BrokerReplicator() {}
 
 BrokerReplicator::BrokerReplicator(const boost::shared_ptr<Link>& l)
-    : Exchange(QPID_WIRING_REPLICATOR), broker(*l->getBroker()), link(l)
+    : Exchange(QPID_CONFIGURATION_REPLICATOR), broker(*l->getBroker()), link(l)
 {
     QPID_LOG(info, "HA: Backup replicating from " <<
              link->getTransport() << ":" << link->getHost() << ":" << 
link->getPort());
     broker.getLinks().declare(
         link->getHost(), link->getPort(),
         false,              // durable
-        QPID_WIRING_REPLICATOR, // src
-        QPID_WIRING_REPLICATOR, // dest
+        QPID_CONFIGURATION_REPLICATOR, // src
+        QPID_CONFIGURATION_REPLICATOR, // dest
         "",                 // key
         false,              // isQueue
         false,              // isLocal
@@ -222,7 +222,7 @@ void BrokerReplicator::initializeBridge(
     sendQuery(QUEUE, queueName, sessionHandler);
     sendQuery(EXCHANGE, queueName, sessionHandler);
     sendQuery(BINDING, queueName, sessionHandler);
-    QPID_LOG(debug, "HA: Backup activated wiring bridge: " << queueName);
+    QPID_LOG(debug, "HA: Backup activated configuration bridge: " << 
queueName);
 }
 
 // FIXME aconway 2011-12-02: error handling in route.
@@ -481,7 +481,7 @@ void BrokerReplicator::doResponseBind(Va
 }
 
 void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& 
queue) {
-    if (replicateLevel(queue->getSettings()) == RL_ALL) {
+    if (replicateLevel(queue->getSettings()) == RL_MESSAGES) {
         boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, 
link));
         broker.getExchanges().registerExchange(qr);
         qr->activate();
@@ -492,6 +492,6 @@ bool BrokerReplicator::bind(boost::share
 bool BrokerReplicator::unbind(boost::shared_ptr<Queue>, const string&, const 
framing::FieldTable*) { return false; }
 bool BrokerReplicator::isBound(boost::shared_ptr<Queue>, const string* const, 
const framing::FieldTable* const) { return false; }
 
-string BrokerReplicator::getType() const { return QPID_WIRING_REPLICATOR; }
+string BrokerReplicator::getType() const { return 
QPID_CONFIGURATION_REPLICATOR; }
 
 }} // namespace broker

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/BrokerReplicator.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/BrokerReplicator.h?rev=1235976&r1=1235975&r2=1235976&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/BrokerReplicator.h (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/BrokerReplicator.h Wed Jan 
25 22:46:54 2012
@@ -38,7 +38,7 @@ class SessionHandler;
 namespace ha {
 
 /**
- * Replicate wiring on a backup broker.
+ * Replicate configuration on a backup broker.
  *
  * Implemented as an exchange that subscribes to receive QMF
  * configuration events from the primary. It configures local queues

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py?rev=1235976&r1=1235975&r2=1235976&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py Wed Jan 25 
22:46:54 2012
@@ -39,7 +39,7 @@ class ShortTests(BrokerTest):
                                   ] + args,
                       **kwargs)
 
-    # FIXME aconway 2011-11-15: work around async wiring replication.
+    # FIXME aconway 2011-11-15: work around async configuration replication.
     # Wait for an address to become valid.
     def wait(self, session, address):
         def check():
@@ -70,7 +70,7 @@ class ShortTests(BrokerTest):
         return backup.connect(client_properties={"qpid.ha-admin":1}, **kwargs)
 
     def test_replication(self):
-        """Test basic replication of wiring and messages before and
+        """Test basic replication of configuration and messages before and
         after backup has connected"""
 
         def queue(name, replicate):
@@ -80,30 +80,30 @@ class ShortTests(BrokerTest):
             
return"%s;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':%s},
 type:'fanout'},x-bindings:[{exchange:'%s',queue:'%s'}]}}"%(name, replicate, 
name, bindq)
         def setup(p, prefix, primary):
             """Create config, send messages on the primary p"""
-            s = p.sender(queue(prefix+"q1", "all"))
+            s = p.sender(queue(prefix+"q1", "messages"))
             for m in ["a", "b", "1"]: s.send(Message(m))
             # Test replication of dequeue
             self.assertEqual(p.receiver(prefix+"q1").fetch(timeout=0).content, 
"a")
             p.acknowledge()
-            p.sender(queue(prefix+"q2", "wiring")).send(Message("2"))
+            p.sender(queue(prefix+"q2", "configuration")).send(Message("2"))
             p.sender(queue(prefix+"q3", "none")).send(Message("3"))
-            p.sender(exchange(prefix+"e1", "all", 
prefix+"q1")).send(Message("4"))
-            p.sender(exchange(prefix+"e2", "all", 
prefix+"q2")).send(Message("5"))
+            p.sender(exchange(prefix+"e1", "messages", 
prefix+"q1")).send(Message("4"))
+            p.sender(exchange(prefix+"e2", "messages", 
prefix+"q2")).send(Message("5"))
             # Test  unbind
-            p.sender(queue(prefix+"q4", "all")).send(Message("6"))
-            s3 = p.sender(exchange(prefix+"e4", "all", prefix+"q4"))
+            p.sender(queue(prefix+"q4", "messages")).send(Message("6"))
+            s3 = p.sender(exchange(prefix+"e4", "messages", prefix+"q4"))
             s3.send(Message("7"))
             # Use old connection to unbind
             us = primary.connect_old().session(str(qpid.datatypes.uuid4()))
             us.exchange_unbind(exchange=prefix+"e4", binding_key="", 
queue=prefix+"q4")
             p.sender(prefix+"e4").send(Message("drop1")) # Should be dropped
             # FIXME aconway 2011-11-24: need a marker so we can wait till sync 
is done.
-            p.sender(queue(prefix+"x", "wiring"))
+            p.sender(queue(prefix+"x", "configuration"))
 
         def verify(b, prefix, p):
             """Verify setup was replicated to backup b"""
 
-            # FIXME aconway 2011-11-21: wait for wiring to replicate.
+            # FIXME aconway 2011-11-21: wait for configuration to replicate.
             self.wait(b, prefix+"x");
             # FIXME aconway 2011-11-24: assert_browse_retry to deal with async 
replication.
             self.assert_browse_retry(b, prefix+"q1", ["b", "1", "4"])
@@ -112,11 +112,11 @@ class ShortTests(BrokerTest):
             p.acknowledge()
             self.assert_browse_retry(b, prefix+"q1", ["1", "4"])
 
-            self.assert_browse_retry(b, prefix+"q2", []) # wiring only
+            self.assert_browse_retry(b, prefix+"q2", []) # configuration only
             self.assert_missing(b, prefix+"q3")
             b.sender(prefix+"e1").send(Message(prefix+"e1")) # Verify binds 
with replicate=all
             self.assert_browse_retry(b, prefix+"q1", ["1", "4", prefix+"e1"])
-            b.sender(prefix+"e2").send(Message(prefix+"e2")) # Verify binds 
with replicate=wiring
+            b.sender(prefix+"e2").send(Message(prefix+"e2")) # Verify binds 
with replicate=configuration
             self.assert_browse_retry(b, prefix+"q2", [prefix+"e2"])
 
             b.sender(prefix+"e4").send(Message("drop2")) # Verify unbind.
@@ -136,7 +136,7 @@ class ShortTests(BrokerTest):
         verify(b, "1", p)
         verify(b, "2", p)
         # Test a series of messages, enqueue all then dequeue all.
-        s = p.sender(queue("foo","all"))
+        s = p.sender(queue("foo","messages"))
         self.wait(b, "foo")
         msgs = [str(i) for i in range(10)]
         for m in msgs: s.send(Message(m))
@@ -158,7 +158,7 @@ class ShortTests(BrokerTest):
             self.assert_browse_retry(p, "foo", msgs[i+1:])
             self.assert_browse_retry(b, "foo", msgs[i+1:])
 
-    def qpid_replicate(self, value="all"):
+    def qpid_replicate(self, value="messages"):
         return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value
 
     def test_sync(self):
@@ -166,7 +166,7 @@ class ShortTests(BrokerTest):
             return "%s;{create:always,%s}"%(name, 
self.qpid_replicate(replicate))
         primary = self.ha_broker(name="primary", broker_url="primary") # Temp 
hack to identify primary
         p = primary.connect().session()
-        s = p.sender(queue("q","all"))
+        s = p.sender(queue("q","messages"))
         for m in [str(i) for i in range(0,10)]: s.send(m)
         s.sync()
         backup1 = self.ha_broker(name="backup1", 
broker_url=primary.host_port())
@@ -192,14 +192,14 @@ class ShortTests(BrokerTest):
         sender = self.popen(
             ["qpid-send",
              "--broker", primary.host_port(),
-             "--address", "q;{create:always,%s}"%(self.qpid_replicate("all")),
+             "--address", 
"q;{create:always,%s}"%(self.qpid_replicate("messages")),
              "--messages=1000",
              "--content-string=x"
              ])
         receiver = self.popen(
             ["qpid-receive",
              "--broker", primary.host_port(),
-             "--address", "q;{create:always,%s}"%(self.qpid_replicate("all")),
+             "--address", 
"q;{create:always,%s}"%(self.qpid_replicate("messages")),
              "--messages=990",
              "--timeout=10"
              ])



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to