Author: aconway
Date: Fri Jan 27 19:14:07 2012
New Revision: 1236846

URL: http://svn.apache.org/viewvc?rev=1236846&view=rev
Log:
QPID-3603: Get rid of broker_url="primary" hack, promote primaries via management.

Modified:
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/ConnectionObservers.h
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.h
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ConnectionExcluder.h
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.h
    qpid/branches/qpid-3603-2/qpid/cpp/src/tests/brokertest.py
    qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py
    qpid/branches/qpid-3603-2/qpid/tools/src/py/qpid-ha-status

Modified: 
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/ConnectionObservers.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/ConnectionObservers.h?rev=1236846&r1=1236845&r2=1236846&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/ConnectionObservers.h 
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/ConnectionObservers.h 
Fri Jan 27 19:14:07 2012
@@ -23,6 +23,9 @@
  */
 
 #include "ConnectionObserver.h"
+#include "qpid/sys/Mutex.h"
+#include <set>
+#include <algorithm>
 
 namespace qpid {
 namespace broker {
@@ -30,12 +33,18 @@ namespace broker {
 /**
  * A collection of connection observers.
  * Calling a ConnectionObserver function will call that function on each observer.
+ * THREAD SAFE.
  */
 class ConnectionObservers : public ConnectionObserver {
   public:
-    // functions for managing the collection of observers
     void add(boost::shared_ptr<ConnectionObserver> observer) {
-        observers.push_back(observer);
+        sys::Mutex::ScopedLock l(lock);
+        observers.insert(observer);
+    }
+
+    void remove(boost::shared_ptr<ConnectionObserver> observer) {
+        sys::Mutex::ScopedLock l(lock);
+        observers.erase(observer);
     }
 
     void connection(Connection& c) {
@@ -55,10 +64,12 @@ class ConnectionObservers : public Conne
     }
 
   private:
-    typedef std::vector<boost::shared_ptr<ConnectionObserver> > Observers;
+    typedef std::set<boost::shared_ptr<ConnectionObserver> > Observers;
+    sys::Mutex lock;
     Observers observers;
 
     template <class F> void each(F f) {
+        sys::Mutex::ScopedLock l(lock);
         std::for_each(observers.begin(), observers.end(), f);
     }
 };

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1236846&r1=1236845&r2=1236846&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp Fri Jan 27 
19:14:07 2012
@@ -22,6 +22,7 @@
 #include "Settings.h"
 #include "BrokerReplicator.h"
 #include "ReplicatingSubscription.h"
+#include "ConnectionExcluder.h"
 #include "qpid/Url.h"
 #include "qpid/amqp_0_10/Codecs.h"
 #include "qpid/broker/Bridge.h"
@@ -42,11 +43,12 @@ using namespace broker;
 using types::Variant;
 using std::string;
 
-Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s) {
+Backup::Backup(broker::Broker& b, const Settings& s) :
+    broker(b), settings(s), excluder(new ConnectionExcluder())
+{
     Url url(s.brokerUrl);
     string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
 
-    // FIXME aconway 2011-11-17: TBD: link management, discovery, fail-over.
     // Declare the link
     std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare(
         url[0].host, url[0].port, protocol,
@@ -54,9 +56,16 @@ Backup::Backup(broker::Broker& b, const 
         s.mechanism, s.username, s.password);
     assert(result.second);  // FIXME aconway 2011-11-23: error handling
     link = result.first;
-    boost::shared_ptr<BrokerReplicator> wr(new BrokerReplicator(link));
-    broker.getExchanges().registerExchange(wr);
+
+    replicator.reset(new BrokerReplicator(link));
+    broker.getExchanges().registerExchange(replicator);
+
+    broker.getConnectionObservers().add(excluder);
 }
 
+Backup::~Backup() {
+    broker.getExchanges().destroy(replicator->getName());
+    broker.getConnectionObservers().remove(excluder); // Allows client connections.
+}
 
 }} // namespace qpid::ha

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.h?rev=1236846&r1=1236845&r2=1236846&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.h (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.h Fri Jan 27 19:14:07 
2012
@@ -27,6 +27,7 @@
 #include <boost/shared_ptr.hpp>
 
 namespace qpid {
+
 namespace broker {
 class Broker;
 class Link;
@@ -34,6 +35,8 @@ class Link;
 
 namespace ha {
 class Settings;
+class ConnectionExcluder;
+class BrokerReplicator;
 
 /**
  * State associated with a backup broker. Manages connections to primary.
@@ -45,12 +48,16 @@ class Backup
 {
   public:
     Backup(broker::Broker&, const Settings&);
+    ~Backup();
 
   private:
     broker::Broker& broker;
     Settings settings;
     boost::shared_ptr<broker::Link> link;
+    boost::shared_ptr<BrokerReplicator> replicator;
+    boost::shared_ptr<ConnectionExcluder> excluder;
 };
+
 }} // namespace qpid::ha
 
 #endif  /*!QPID_HA_BACKUP_H*/

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp?rev=1236846&r1=1236845&r2=1236846&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp 
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp Fri 
Jan 27 19:14:07 2012
@@ -27,11 +27,10 @@
 namespace qpid {
 namespace ha {
 
-ConnectionExcluder::ConnectionExcluder(PrimaryTest isPrimary_) : isPrimary(isPrimary_) {}
+ConnectionExcluder::ConnectionExcluder() {}
 
 void ConnectionExcluder::opened(broker::Connection& connection) {
-    if (!isPrimary() && !connection.isLink()
-        && !connection.getClientProperties().isSet(ADMIN_TAG))
+    if (!connection.isLink() && !connection.getClientProperties().isSet(ADMIN_TAG))
         throw Exception(
             QPID_MSG("HA: Backup broker rejected connection " << connection.getMgmtId()));
 }

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ConnectionExcluder.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ConnectionExcluder.h?rev=1236846&r1=1236845&r2=1236846&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ConnectionExcluder.h 
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ConnectionExcluder.h Fri Jan 
27 19:14:07 2012
@@ -41,15 +41,12 @@ namespace ha {
 class ConnectionExcluder : public broker::ConnectionObserver
 {
   public:
-    typedef boost::function<bool()> PrimaryTest;
-
-    ConnectionExcluder(PrimaryTest isPrimary_);
+    ConnectionExcluder();
 
     void opened(broker::Connection& connection);
 
   private:
     static const std::string ADMIN_TAG;
-    PrimaryTest isPrimary;
 };
 
 }} // namespace qpid::ha

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1236846&r1=1236845&r2=1236846&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp Fri Jan 27 
19:14:07 2012
@@ -55,22 +55,16 @@ HaBroker::HaBroker(broker::Broker& b, co
       settings(s),
       clientUrl(url(s.clientUrl, "ha-client-url")),
       brokerUrl(url(s.brokerUrl, "ha-broker-url")),
+      backup(new Backup(b, s)),
       mgmtObject(0)
 {
-    // FIXME aconway 2011-11-22: temporary hack to identify primary.
-    bool primary = (settings.brokerUrl == PRIMARY);
-    QPID_LOG(notice, "HA: " << (primary ? "Primary" : "Backup")
-             << " initialized: client-url=" << clientUrl
+    // Note all HA brokers start out in backup mode.
+    QPID_LOG(notice, "HA: Backup initialized: client-url=" << clientUrl
              << " broker-url=" << brokerUrl);
-    if (!primary) backup.reset(new Backup(broker, s));
     // Register a factory for replicating subscriptions.
     broker.getConsumerFactories().add(
         boost::shared_ptr<ReplicatingSubscription::Factory>(
             new ReplicatingSubscription::Factory()));
-    // Register a connection excluder
-    broker.getConnectionObservers().add(
-        boost::shared_ptr<broker::ConnectionObserver>(
-            new ConnectionExcluder(boost::bind(&HaBroker::isPrimary, this))));
 
     ManagementAgent* ma = broker.getManagementAgent();
     if (!ma)
@@ -78,8 +72,7 @@ HaBroker::HaBroker(broker::Broker& b, co
     if (ma) {
         _qmf::Package  packageInit(ma);
         mgmtObject = new _qmf::HaBroker(ma, this);
-        // FIXME aconway 2011-11-11: Placeholder - initialize cluster role.
-        mgmtObject->set_status(isPrimary() ? PRIMARY : BACKUP);
+        mgmtObject->set_status(BACKUP);
         ma->addObject(mgmtObject);
     }
 }
@@ -87,21 +80,22 @@ HaBroker::HaBroker(broker::Broker& b, co
 HaBroker::~HaBroker() {}
 
 Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, string&) {
+    sys::Mutex::ScopedLock l(lock);
     switch (methodId) {
       case _qmf::HaBroker::METHOD_SETSTATUS: {
           std::string status = dynamic_cast<_qmf::ArgsHaBrokerSetStatus&>(args).i_status;
           if (status == PRIMARY) {
-              if (!isPrimary()) {
+              if (backup.get()) {
+                  // FIXME aconway 2012-01-26: create primary state before resetting backup
+                  // as it allows client connections.
                   backup.reset();
-                  QPID_LOG(notice, "HA Primary: promoted from backup");
+                  QPID_LOG(notice, "HA: Primary promoted from backup");
               }
           } else if (status == BACKUP) {
-              if (isPrimary()) {
-                  backup.reset(new Backup(broker, settings));
-                  QPID_LOG(notice, "HA Backup: demoted from primary.");
-              }
+              if (!backup.get())
+                  throw Exception("HA: Primary cannot be demoted");
           } else {
-              QPID_LOG(error, "Attempt to set invalid HA status: " << status);
+              throw Exception("Invalid HA status: "+status);
           }
           mgmtObject->set_status(status);
           break;
@@ -112,8 +106,4 @@ Manageable::status_t HaBroker::Managemen
     return Manageable::STATUS_OK;
 }
 
-bool HaBroker::isPrimary() const {
-    return !backup.get();       // TODO aconway 2012-01-18: temporary test.
-}
-
 }} // namespace qpid::ha

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.h?rev=1236846&r1=1236845&r2=1236846&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.h (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.h Fri Jan 27 
19:14:07 2012
@@ -24,6 +24,7 @@
 
 #include "Settings.h"
 #include "qpid/Url.h"
+#include "qpid/sys/Mutex.h"
 #include "qmf/org/apache/qpid/ha/HaBroker.h"
 #include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetStatus.h"
 #include "qpid/management/Manageable.h"
@@ -52,8 +53,8 @@ class HaBroker : public management::Mana
     management::Manageable::status_t ManagementMethod (
         uint32_t methodId, management::Args& args, std::string& text);
 
-    bool isPrimary() const;
   private:
+    sys::Mutex lock;
     broker::Broker& broker;
     Settings settings;
     Url clientUrl, brokerUrl;

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/tests/brokertest.py
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/tests/brokertest.py?rev=1236846&r1=1236845&r2=1236846&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/tests/brokertest.py Fri Jan 27 
19:14:07 2012
@@ -513,7 +513,7 @@ class BrokerTest(TestCase):
         finally: r.close()
         return contents
 
-    def assert_browse(self, session, queue, expect_contents, timeout=0, transform=lambda d:m.content):
+    def assert_browse(self, session, queue, expect_contents, timeout=0, transform=lambda m: m.content):
         """Assert that the contents of messages on queue (as retrieved
         using session and timeout) exactly match the strings in
         expect_contents"""

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py?rev=1236846&r1=1236845&r2=1236846&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py Fri Jan 27 
19:14:07 2012
@@ -56,8 +56,8 @@ class ShortTests(BrokerTest):
         self.wait(bs, address)
         bs.connection.close()
 
-    def set_ha_status(self, address, status):
-        os.system("qpid-ha-status %s %s"%(address, status))
+    def promote(self, broker):
+        os.system("qpid-ha-status %s primary"%(broker.host_port()))
 
     def assert_missing(self, session, address):
         try:
@@ -122,7 +122,8 @@ class ShortTests(BrokerTest):
             b.sender(prefix+"e4").send(Message("drop2")) # Verify unbind.
             self.assert_browse_retry(b, prefix+"q4", ["6","7"])
 
-        primary = self.ha_broker(name="primary", broker_url="primary") # Temp hack to identify primary
+        primary = self.ha_broker(name="primary")
+        self.promote(primary)
         p = primary.connect().session()
 
         # Create config, send messages before starting the backup, to test catch-up replication.
@@ -164,7 +165,8 @@ class ShortTests(BrokerTest):
     def test_sync(self):
         def queue(name, replicate):
             return "%s;{create:always,%s}"%(name, self.qpid_replicate(replicate))
-        primary = self.ha_broker(name="primary", broker_url="primary") # Temp hack to identify primary
+        primary = self.ha_broker(name="primary")
+        self.promote(primary)
         p = primary.connect().session()
         s = p.sender(queue("q","messages"))
         for m in [str(i) for i in range(0,10)]: s.send(m)
@@ -186,7 +188,8 @@ class ShortTests(BrokerTest):
 
     def test_send_receive(self):
         """Verify sequence numbers of messages sent by qpid-send"""
-        primary = self.ha_broker(name="primary", broker_url="primary")
+        primary = self.ha_broker(name="primary")
+        self.promote(primary)
         backup1 = self.ha_broker(name="backup1", broker_url=primary.host_port())
         backup2 = self.ha_broker(name="backup2", broker_url=primary.host_port())
         sender = self.popen(
@@ -219,7 +222,8 @@ class ShortTests(BrokerTest):
     def test_failover(self):
         """Verify that backups rejects connections and that fail-over works in python client"""
         getLogger().setLevel(ERROR) # Disable WARNING log messages due to failover
-        primary = self.ha_broker(name="primary", expect=EXPECT_EXIT_FAIL, broker_url="primary") # Temp hack to identify primary
+        primary = self.ha_broker(name="primary", expect=EXPECT_EXIT_FAIL)
+        self.promote(primary)
         backup = self.ha_broker(name="backup", broker_url=primary.host_port())
         # Check that backup rejects normal connections
         try:
@@ -237,12 +241,13 @@ class ShortTests(BrokerTest):
         sender.send("foo")
         primary.kill()
         assert retry(lambda: not is_running(primary.pid))
-        self.set_ha_status(backup.host_port(), "primary")         # Promote the backup
+        self.promote(backup)
         self.assert_browse_retry(s, "q", ["foo"])
         c.close()
 
     def test_failover_cpp(self):
-        primary = self.ha_broker(name="primary", expect=EXPECT_EXIT_FAIL, broker_url="primary") # Temp hack to identify primary
+        primary = self.ha_broker(name="primary", expect=EXPECT_EXIT_FAIL)
+        self.promote(primary)
         backup = self.ha_broker(name="backup", broker_url=primary.host_port())
         url="%s,%s"%(primary.host_port(), backup.host_port())
         primary.connect().session().sender("q;{create:always,%s}"%(self.qpid_replicate()))
@@ -257,7 +262,7 @@ class ShortTests(BrokerTest):
 
         primary.kill()
         assert retry(lambda: not is_running(primary.pid)) # Wait for primary to die
-        self.set_ha_status(backup.host_port(), "primary")
+        self.promote(backup)
         n = receiver.received       # Make sure we are still running
         assert retry(lambda: receiver.received > n + 10)
         sender.stop()

Modified: qpid/branches/qpid-3603-2/qpid/tools/src/py/qpid-ha-status
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/tools/src/py/qpid-ha-status?rev=1236846&r1=1236845&r2=1236846&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/tools/src/py/qpid-ha-status (original)
+++ qpid/branches/qpid-3603-2/qpid/tools/src/py/qpid-ha-status Fri Jan 27 
19:14:07 2012
@@ -38,11 +38,8 @@ def validate_status(value):
 class HaBroker:
     def __init__(self, broker, session):
         self.session = session
-        try:
-           self.qmf_broker = self.session.addBroker(
-                   broker, client_properties={"qpid.ha-admin":1})
-        except Exception, e:
-            raise Exception("Can't connect to %s: %s"%(broker,e))
+        self.qmf_broker = self.session.addBroker(
+            broker, client_properties={"qpid.ha-admin":1})
         ha_brokers=self.session.getObjects(_class="habroker", _package="org.apache.qpid.ha")
         if (not ha_brokers): raise Exception("Broker does not have HA enabled.")
         self.ha_broker = ha_brokers[0];



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org

Reply via email to