Author: aconway
Date: Wed Jan 25 18:48:09 2012
New Revision: 1235867

URL: http://svn.apache.org/viewvc?rev=1235867&view=rev
Log:
QPID-3603: HA backup rejects client connections.

Added:
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/ConnectionObserver.h
      - copied, changed from r1235866, qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Settings.h
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/ConnectionObservers.h   (with props)
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ConnectionExcluder.h   (with props)
Modified:
    qpid/branches/qpid-3603-2/qpid/cpp/src/Makefile.am
    qpid/branches/qpid-3603-2/qpid/cpp/src/ha.mk
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Broker.h
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Connection.cpp
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.h
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaPlugin.cpp
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Settings.h
    qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/Makefile.am
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/Makefile.am?rev=1235867&r1=1235866&r2=1235867&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/Makefile.am (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/Makefile.am Wed Jan 25 18:48:09 2012
@@ -541,6 +541,8 @@ libqpidbroker_la_SOURCES = \
   qpid/broker/Credit.h \
   qpid/broker/Credit.cpp \
   qpid/broker/ConsumerFactory.h \
+  qpid/broker/ConnectionObserver.h \
+  qpid/broker/ConnectionObservers.h \
   qpid/broker/Daemon.cpp \
   qpid/broker/Daemon.h \
   qpid/broker/Deliverable.h \

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/ha.mk
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/ha.mk?rev=1235867&r1=1235866&r2=1235867&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/ha.mk (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/ha.mk Wed Jan 25 18:48:09 2012
@@ -34,7 +34,8 @@ ha_la_SOURCES =                                       \
   qpid/ha/ReplicatingSubscription.h            \
   qpid/ha/ReplicatingSubscription.cpp          \
   qpid/ha/BrokerReplicator.cpp                 \
-  qpid/ha/BrokerReplicator.h
+  qpid/ha/BrokerReplicator.h                    \
+  qpid/ha/ConnectionExcluder.h
 
 ha_la_LIBADD = libqpidbroker.la
 ha_la_LDFLAGS = $(PLUGINLDFLAGS)

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Broker.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Broker.h?rev=1235867&r1=1235866&r2=1235867&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Broker.h Wed Jan 25 18:48:09 2012
@@ -38,6 +38,7 @@
 #include "qpid/broker/System.h"
 #include "qpid/broker/ExpiryPolicy.h"
 #include "qpid/broker/ConsumerFactory.h"
+#include "qpid/broker/ConnectionObservers.h"
 #include "qpid/management/Manageable.h"
 #include "qpid/management/ManagementAgent.h"
 #include "qmf/org/apache/qpid/broker/Broker.h"
@@ -200,6 +201,7 @@ public:
     boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
     ConnectionCounter connectionCounter;
     ConsumerFactories consumerFactories;
+    ConnectionObservers connectionObservers;
 
   public:
     virtual ~Broker();
@@ -360,6 +362,7 @@ public:
                 const std::string& connectionId);
 
     ConsumerFactories&  getConsumerFactories() { return consumerFactories; }
+    ConnectionObservers& getConnectionObservers() { return connectionObservers; }
 };
 
 }}

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Connection.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Connection.cpp?rev=1235867&r1=1235866&r2=1235867&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Connection.cpp Wed Jan 25 18:48:09 2012
@@ -19,6 +19,7 @@
  *
  */
 #include "qpid/broker/Connection.h"
+#include "qpid/broker/ConnectionObserver.h"
 #include "qpid/broker/SessionOutputException.h"
 #include "qpid/broker/SessionState.h"
 #include "qpid/broker/Bridge.h"
@@ -162,8 +163,11 @@ void Connection::received(framing::AMQFr
         recordFromServer(frame);
     else
         recordFromClient(frame);
-    if (!wasOpen && isOpen())
+    if (!wasOpen && isOpen()) {
         doIoCallbacks(); // Do any callbacks registered before we opened.
+        // FIXME aconway 2012-01-18: generic observer points.
+        broker.getConnectionObservers().connect(*this);
+    }
 }
 
 void Connection::sent(const framing::AMQFrame& frame)

Copied: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/ConnectionObserver.h (from r1235866, qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Settings.h)
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/ConnectionObserver.h?p2=qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/ConnectionObserver.h&p1=qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Settings.h&r1=1235866&r2=1235867&rev=1235867&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Settings.h (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/ConnectionObserver.h Wed Jan 25 18:48:09 2012
@@ -1,5 +1,5 @@
-#ifndef QPID_HA_SETTINGS_H
-#define QPID_HA_SETTINGS_H
+#ifndef QPID_BROKER_CONNECTIONOBSERVER_H
+#define QPID_BROKER_CONNECTIONOBSERVER_H
 
 /*
  *
@@ -22,26 +22,31 @@
  *
  */
 
-#include <string>
-
 namespace qpid {
-namespace ha {
+namespace broker {
 
-using std::string;
+class Connection;
 
 /**
- * Configurable settings for HA.
+ * Observer that is informed of connection events.  For use by
+ * plug-ins that want to be notified of, or influence, connection
+ * events.
  */
-class Settings
+class ConnectionObserver
 {
   public:
-    Settings() : enabled(false) {}
-    bool enabled;
-    string clientUrl;
-    string brokerUrl;
-    string username, password, mechanism;
-  private:
+    virtual ~ConnectionObserver() {}
+
+    /** Called when a connection is opened and authentication has been
+     * performed.
+     * @exception Throwing an exception will abort the connection.
+     */
+    virtual void connect(Connection& connection) = 0;
+
+    /** Called when a connection is torn down. */
+    virtual void disconnect(Connection& connection) = 0;
 };
-}} // namespace qpid::ha
 
-#endif  /*!QPID_HA_SETTINGS_H*/
+}} // namespace qpid::broker
+
+#endif  /*!QPID_BROKER_CONNECTIONOBSERVER_H*/

Added: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/ConnectionObservers.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/ConnectionObservers.h?rev=1235867&view=auto
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/ConnectionObservers.h (added)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/ConnectionObservers.h Wed Jan 25 18:48:09 2012
@@ -0,0 +1,56 @@
+#ifndef QPID_BROKER_CONNECTIONOBSERVERS_H
+#define QPID_BROKER_CONNECTIONOBSERVERS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ConnectionObserver.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * A collection of connection observers.
+ * Calling a ConnectionObserver function will call that function on each observer.
+ */
+class ConnectionObservers : public ConnectionObserver {
+  public:
+    // functions for managing the collection of observers
+    void add(boost::shared_ptr<ConnectionObserver> observer) {
+        observers.push_back(observer);
+    }
+
+    // implementation of ConnectionObserver interface
+    void connect(Connection& c) {
+        std::for_each(observers.begin(), observers.end(), boost::bind(&ConnectionObserver::connect, _1, boost::ref(c)));
+    }
+    void disconnect(Connection& c) {
+        std::for_each(observers.begin(), observers.end(), boost::bind(&ConnectionObserver::disconnect, _1, boost::ref(c)));
+    }
+
+  private:
+    typedef std::vector<boost::shared_ptr<ConnectionObserver> > Observers;
+    Observers observers;
+};
+
+}} // namespace qpid::broker
+
+#endif  /*!QPID_BROKER_CONNECTIONOBSERVERS_H*/

Propchange: 
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/ConnectionObservers.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/ConnectionObservers.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ConnectionExcluder.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ConnectionExcluder.h?rev=1235867&view=auto
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ConnectionExcluder.h (added)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ConnectionExcluder.h Wed Jan 25 18:48:09 2012
@@ -0,0 +1,70 @@
+#ifndef QPID_HA_CONNECTIONEXCLUDER_H
+#define QPID_HA_CONNECTIONEXCLUDER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/broker/ConnectionObserver.h"
+#include "qpid/broker/Connection.h"
+#include <boost/function.hpp>
+#include <sstream>
+
+namespace qpid {
+namespace ha {
+
+/**
+ * Exclude normal connections to a backup broker.
+ * Connections as ha-admin user are allowed.
+ */
+class ConnectionExcluder : public broker::ConnectionObserver
+{
+  public:
+    typedef boost::function<bool()> PrimaryTest;
+
+    ConnectionExcluder(string adminUser_, PrimaryTest isPrimary_)
+        : adminUser(adminUser_), isPrimary(isPrimary_) {}
+
+    void connect(broker::Connection& connection) {
+        if (!isPrimary() && !connection.isLink()
+            && !connection.isAuthenticatedUser(adminUser))
+        {
+            throw Exception(
+                QPID_MSG(
+                    "HA: Backup broker rejected connection "
+                    << connection.getMgmtId() << " by user " << connection.getUserId()
+                    << ". Only " << adminUser << " can connect to a backup."));
+        }
+        else {
+            QPID_LOG(debug, "HA: Backup broker accepted connection"
+                     << connection.getMgmtId() << " by user "
+                     << connection.getUserId());
+        }
+    }
+
+    void disconnect(broker::Connection&) {}
+
+  private:
+    string adminUser;
+    PrimaryTest isPrimary;
+};
+}} // namespace qpid::ha
+
+#endif  /*!QPID_HA_CONNECTIONEXCLUDER_H*/

Propchange: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ConnectionExcluder.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ConnectionExcluder.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1235867&r1=1235866&r2=1235867&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp Wed Jan 25 18:48:09 2012
@@ -19,6 +19,7 @@
  *
  */
 #include "Backup.h"
+#include "ConnectionExcluder.h"
 #include "HaBroker.h"
 #include "Settings.h"
 #include "ReplicatingSubscription.h"
@@ -60,15 +61,20 @@ HaBroker::HaBroker(broker::Broker& b, co
         ma->addObject(mgmtObject);
     }
     // FIXME aconway 2011-11-22: temporary hack to identify primary.
-    bool isPrimary = (s.brokerUrl == "primary");
-    QPID_LOG(notice, "HA: " << (isPrimary ? "Primary" : "Backup")
+    bool primary = (s.brokerUrl == "primary");
+    QPID_LOG(notice, "HA: " << (primary ? "Primary" : "Backup")
              << " initialized: client-url=" << clientUrl
              << " broker-url=" << brokerUrl);
-    if (!isPrimary) backup.reset(new Backup(broker, s));
+    if (!primary) backup.reset(new Backup(broker, s));
     // Register a factory for replicating subscriptions.
     broker.getConsumerFactories().add(
         boost::shared_ptr<ReplicatingSubscription::Factory>(
             new ReplicatingSubscription::Factory()));
+    // Register a connection excluder
+    broker.getConnectionObservers().add(
+        boost::shared_ptr<broker::ConnectionObserver>(
+            new ConnectionExcluder(
+                s.adminUser, boost::bind(&HaBroker::isPrimary, this))));
 }
 
 HaBroker::~HaBroker() {}
@@ -87,4 +93,8 @@ Manageable::status_t HaBroker::Managemen
     return Manageable::STATUS_OK;
 }
 
+bool HaBroker::isPrimary() const {
+    return !backup.get();       // TODO aconway 2012-01-18: temporary test.
+}
+
 }} // namespace qpid::ha

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.h?rev=1235867&r1=1235866&r2=1235867&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.h (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.h Wed Jan 25 18:48:09 2012
@@ -52,6 +52,7 @@ class HaBroker : public management::Mana
     management::Manageable::status_t ManagementMethod (
         uint32_t methodId, management::Args& args, std::string& text);
 
+    bool isPrimary() const;
   private:
     broker::Broker& broker;
     Url clientUrl, brokerUrl;

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaPlugin.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaPlugin.cpp?rev=1235867&r1=1235866&r2=1235867&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaPlugin.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaPlugin.cpp Wed Jan 25 18:48:09 2012
@@ -37,6 +37,7 @@ struct Options : public qpid::Options {
             ("ha-username", optValue(settings.username, "USER"), "Username for connections between brokers")
             ("ha-password", optValue(settings.password, "PASS"), "Password for connections between brokers")
             ("ha-mechanism", optValue(settings.mechanism, "MECH"), "Authentication mechanism for connections between brokers")
+            ("ha-admin-user", optValue(settings.adminUser, "USER"), "User allowed to perform HA administration tasks")
             ;
     }
 };

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Settings.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Settings.h?rev=1235867&r1=1235866&r2=1235867&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Settings.h (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Settings.h Wed Jan 25 18:48:09 2012
@@ -35,11 +35,12 @@ using std::string;
 class Settings
 {
   public:
-    Settings() : enabled(false) {}
+    Settings() : enabled(false), adminUser("qpid-ha-admin") {}
     bool enabled;
     string clientUrl;
     string brokerUrl;
     string username, password, mechanism;
+    string adminUser;
   private:
 };
 }} // namespace qpid::ha

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py?rev=1235867&r1=1235866&r2=1235867&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py Wed Jan 25 18:48:09 2012
@@ -19,7 +19,7 @@
 #
 
 import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil
-from qpid.messaging import Message, NotFound
+from qpid.messaging import Message, NotFound, ConnectionError
 from brokertest import *
 from threading import Thread, Lock, Condition
 from logging import getLogger
@@ -54,6 +54,10 @@ class ShortTests(BrokerTest):
             self.fail("Should not have been replicated: %s"%(address))
         except NotFound: pass
 
+    def connect_admin(self, backup, **kwargs):
+        """Connect to a backup broker as the admin user"""
+        return backup.connect(username="qpid-ha-admin", password="dummy", mechanism="PLAIN", **kwargs)
+
     def test_replication(self):
         """Test basic replication of wiring and messages before and
         after backup has connected"""
@@ -116,7 +120,7 @@ class ShortTests(BrokerTest):
         setup(p, "2", primary)
 
         # Verify the data on the backup
-        b = backup.connect().session()
+        b = self.connect_admin(backup, ).session()
         verify(b, "1", p)
         verify(b, "2", p)
 
@@ -162,10 +166,10 @@ class ShortTests(BrokerTest):
         s.sync()
 
         msgs = [str(i) for i in range(30)]
-        b1 = backup1.connect().session()
+        b1 = self.connect_admin(backup1).session()
         self.wait(b1, "q");
         self.assert_browse_retry(b1, "q", msgs)
-        b2 = backup2.connect().session()
+        b2 = self.connect_admin(backup2).session()
         self.wait(b2, "q");
         self.assert_browse_retry(b2, "q", msgs)
 
@@ -192,14 +196,26 @@ class ShortTests(BrokerTest):
             self.assertEqual(receiver.wait(), 0)
             expect = [long(i) for i in range(991, 1001)]
             sn = lambda m: m.properties["sn"]
-            self.assert_browse_retry(backup1.connect().session(), "q", expect, transform=sn)
-            self.assert_browse_retry(backup2.connect().session(), "q", expect, transform=sn)
+            self.assert_browse_retry(self.connect_admin(backup1).session(), "q", expect, transform=sn)
+            self.assert_browse_retry(self.connect_admin(backup2).session(), "q", expect, transform=sn)
         except:
             print self.browse(primary.connect().session(), "q", transform=sn)
-            print self.browse(backup1.connect().session(), "q", transform=sn)
-            print self.browse(backup2.connect().session(), "q", transform=sn)
+            print self.browse(self.connect_admin(backup1).session(), "q", transform=sn)
+            print self.browse(self.connect_admin(backup2).session(), "q", transform=sn)
             raise
 
+    def test_exclude(self):
+        """Verify that backup rejects connections"""
+        primary = self.ha_broker(name="primary", broker_url="primary") # Temp hack to identify primary
+        backup = self.ha_broker(name="backup", broker_url=primary.host_port())
+        # Admin is allowed
+        self.connect_admin(backup)
+        # Others are not
+        try:
+            backup.connect()
+            self.fail("Expected connection to backup to fail")
+        except ConnectionError: pass
+
 if __name__ == "__main__":
     shutil.rmtree("brokertest.tmp", True)
     os.execvp("qpid-python-test", ["qpid-python-test", "-m", "ha_tests"] + sys.argv[1:])



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to