Author: aconway
Date: Wed Jan 25 18:48:18 2012
New Revision: 1235868
URL: http://svn.apache.org/viewvc?rev=1235868&view=rev
Log:
QPID-3603: Refactor LinkRegistry to use a ConnectionObserver.
Modified:
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Broker.h
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Connection.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/ConnectionObserver.h
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/ConnectionObservers.h
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ConnectionExcluder.h
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Broker.h
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Broker.h?rev=1235868&r1=1235867&r2=1235868&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Broker.h Wed Jan 25
18:48:18 2012
@@ -179,6 +179,7 @@ public:
std::auto_ptr<MessageStore> store;
AclModule* acl;
DataDir dataDir;
+ ConnectionObservers connectionObservers;
QueueRegistry queues;
ExchangeRegistry exchanges;
@@ -201,7 +202,6 @@ public:
boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
ConnectionCounter connectionCounter;
ConsumerFactories consumerFactories;
- ConnectionObservers connectionObservers;
public:
virtual ~Broker();
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Connection.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Connection.cpp?rev=1235868&r1=1235867&r2=1235868&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Connection.cpp Wed Jan
25 18:48:18 2012
@@ -104,8 +104,7 @@ Connection::Connection(ConnectionOutputH
outboundTracker(*this)
{
outboundTracker.wrap(out);
- if (link)
- links.notifyConnection(mgmtId, this);
+ broker.getConnectionObservers().connection(*this);
// In a cluster, allow adding the management object to be delayed.
if (!delayManagement) addManagementObject();
if (!isShadow()) broker.getConnectionCounter().inc_connectionCount();
@@ -143,8 +142,7 @@ Connection::~Connection()
if (!link && isClusterSafe())
agent->raiseEvent(_qmf::EventClientDisconnect(mgmtId,
ConnectionState::getUserId()));
}
- if (link)
- links.notifyClosed(mgmtId);
+ broker.getConnectionObservers().closed(*this);
if (heartbeatTimer)
heartbeatTimer->cancel();
@@ -165,8 +163,7 @@ void Connection::received(framing::AMQFr
recordFromClient(frame);
if (!wasOpen && isOpen()) {
doIoCallbacks(); // Do any callbacks registered before we opened.
- // FIXME aconway 2012-01-18: generic observer points.
- broker.getConnectionObservers().connect(*this);
+ broker.getConnectionObservers().opened(*this);
}
}
@@ -267,8 +264,7 @@ string Connection::getAuthCredentials()
void Connection::notifyConnectionForced(const string& text)
{
- if (link)
- links.notifyConnectionForced(mgmtId, text);
+ broker.getConnectionObservers().forced(*this, text);
}
void Connection::setUserId(const string& userId)
Modified:
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/ConnectionObserver.h
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/ConnectionObserver.h?rev=1235868&r1=1235867&r2=1235868&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/ConnectionObserver.h
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/ConnectionObserver.h Wed
Jan 25 18:48:18 2012
@@ -37,14 +37,19 @@ class ConnectionObserver
public:
virtual ~ConnectionObserver() {}
- /** Called when a connection is opened and authentication has been
- * performed.
+ /** Called when a connection is first established. */
+ virtual void connection(Connection&) {}
+
+ /** Called when the opening negotiation is done and the connection is
authenticated.
* @exception Throwing an exception will abort the connection.
*/
- virtual void connect(Connection& connection) = 0;
+ virtual void opened(Connection&) {}
+
+ /** Called when a connection is closed. */
+ virtual void closed(Connection&) {}
- /** Called when a connection is torn down. */
- virtual void disconnect(Connection& connection) = 0;
+ /** Called when a connection is forced closed. */
+ virtual void forced(Connection&, const std::string& /*message*/) {}
};
}} // namespace qpid::broker
Modified:
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/ConnectionObservers.h
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/ConnectionObservers.h?rev=1235868&r1=1235867&r2=1235868&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/ConnectionObservers.h
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/ConnectionObservers.h
Wed Jan 25 18:48:18 2012
@@ -38,17 +38,29 @@ class ConnectionObservers : public Conne
observers.push_back(observer);
}
- // implementation of ConnectionObserver interface
- void connect(Connection& c) {
- std::for_each(observers.begin(), observers.end(),
boost::bind(&ConnectionObserver::connect, _1, boost::ref(c)));
+ void connection(Connection& c) {
+ each(boost::bind(&ConnectionObserver::connection, _1, boost::ref(c)));
}
- void disconnect(Connection& c) {
- std::for_each(observers.begin(), observers.end(),
boost::bind(&ConnectionObserver::disconnect, _1, boost::ref(c)));
+
+ void opened(Connection& c) {
+ each(boost::bind(&ConnectionObserver::opened, _1, boost::ref(c)));
+ }
+
+ void closed(Connection& c) {
+ each(boost::bind(&ConnectionObserver::closed, _1, boost::ref(c)));
+ }
+
+ void forced(Connection& c, const std::string& text) {
+ each(boost::bind(&ConnectionObserver::forced, _1, boost::ref(c),
text));
}
private:
typedef std::vector<boost::shared_ptr<ConnectionObserver> > Observers;
Observers observers;
+
+ template <class F> void each(F f) {
+ std::for_each(observers.begin(), observers.end(), f);
+ }
};
}} // namespace qpid::broker
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/LinkRegistry.cpp?rev=1235868&r1=1235867&r2=1235868&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/LinkRegistry.cpp Wed Jan
25 18:48:18 2012
@@ -48,6 +48,16 @@ LinkRegistry::LinkRegistry () :
{
}
+namespace {
+struct ConnectionObserverImpl : public ConnectionObserver {
+ LinkRegistry& links;
+ ConnectionObserverImpl(LinkRegistry& l) : links(l) {}
+ void connection(Connection& c) { links.notifyConnection(c.getMgmtId(),
&c); }
+ void closed(Connection& c) { links.notifyClosed(c.getMgmtId()); }
+ void forced(Connection& c, const string& text) {
links.notifyConnectionForced(c.getMgmtId(), text); }
+};
+}
+
LinkRegistry::LinkRegistry (Broker* _broker) :
broker(_broker), timer(&broker->getTimer()),
maintenanceTask(new Periodic(*this)),
@@ -55,6 +65,8 @@ LinkRegistry::LinkRegistry (Broker* _bro
realm(broker->getOptions().realm)
{
timer->add(maintenanceTask);
+ broker->getConnectionObservers().add(
+ boost::shared_ptr<ConnectionObserver>(new
ConnectionObserverImpl(*this)));
}
LinkRegistry::~LinkRegistry()
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ConnectionExcluder.h
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ConnectionExcluder.h?rev=1235868&r1=1235867&r2=1235868&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ConnectionExcluder.h
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ConnectionExcluder.h Wed Jan
25 18:48:18 2012
@@ -42,7 +42,7 @@ class ConnectionExcluder : public broker
ConnectionExcluder(string adminUser_, PrimaryTest isPrimary_)
: adminUser(adminUser_), isPrimary(isPrimary_) {}
- void connect(broker::Connection& connection) {
+ void opened(broker::Connection& connection) {
if (!isPrimary() && !connection.isLink()
&& !connection.isAuthenticatedUser(adminUser))
{
@@ -59,8 +59,6 @@ class ConnectionExcluder : public broker
}
}
- void disconnect(broker::Connection&) {}
-
private:
string adminUser;
PrimaryTest isPrimary;
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]