Robbie/All
I've attached an update of this work incorporating Robbie's suggestions
for improving the getSessionAdapter() helper method.
Does that cover what you were thinking?
Frase
On 23/04/13 21:13, Robbie Gemmell wrote:
On 23 April 2013 18:59, Fraser Adams <[email protected]> wrote:
On 22/04/13 22:26, Robbie Gemmell wrote:
Looking this over it seems like it would work, but I can't say I am a big
fan of inspecting potentially every connection during every consumer
creation and removal.
Yeah I have to agree with you there, I wasn't wildly keen on it myself but
I couldn't see an obvious alternative when I was trying this stuff out.
Some of the associations get a bit convoluted, don't they :-) Character
building, I think it's called.
An alternative might be: the SessionModel interface has a method to get
the associated ConnectionModel, with which you could directly query for the
associated Connection[Adapter] from the Virtualhost since it already
maintains that mapping.
So I guess that you mean org.apache.qpid.server.protocol.AMQSessionModel?
So I've got a reference to that passed into
the getSessionAdapter(AMQSessionModel session) helper method. The
AMQSessionModel interface method you are alluding to is
public AMQConnectionModel getConnectionModel();
Is that correct?
Yep, those are the bits I meant.
Looking at VirtualHostAdapter.java there doesn't currently seem to be an
accessor to lookup ConnectionAdapter by AMQConnectionModel so I'd have to
add that, true? Clearly trivial given that the Map and registration stuff
already exist.
But it definitely seems better to do that than iterate through Connections
especially for non-trivial numbers of connections.
Yes, it's not ideal adding the method, but I think it's better than the
alternative. We can try to do something nicer later when we will be looking
to tidy that stuff up anyway.
I probably won't have time to revise this until Friday due to work and
family commitments, so if you can bear with me 'til then, I'll post a
revised patch using this approach.
Thanks for the feedback.
No problem, thanks for the patch :)
Frase
---------------------------------------------------------------------
To unsubscribe, e-mail:
[email protected]
For additional commands, e-mail: [email protected]
Index: qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
===================================================================
--- qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java (revision 1469902)
+++ qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java (working copy)
@@ -68,20 +68,37 @@
{
if(!actualSessions.contains(session))
{
- _sessionAdapters.remove(session);
+ SessionAdapter adapter = _sessionAdapters.remove(session);
+ childRemoved(adapter); // Trigger corresponding ConfigurationChangeListener childRemoved() callback.
}
}
for(AMQSessionModel session : actualSessions)
{
if(!_sessionAdapters.containsKey(session))
{
- _sessionAdapters.put(session, new SessionAdapter(session, getTaskExecutor()));
+ SessionAdapter adapter = new SessionAdapter(session, getTaskExecutor());
+ _sessionAdapters.put(session, adapter);
+ childAdded(adapter); // Trigger corresponding ConfigurationChangeListener childAdded() callback.
}
}
return new ArrayList<Session>(_sessionAdapters.values());
}
}
+ /**
+ * Retrieve the SessionAdapter instance keyed by the AMQSessionModel from this Connection.
+ * @param session the AMQSessionModel used to index the SessionAdapter.
+ * @return the requested SessionAdapter.
+ */
+ SessionAdapter getSessionAdapter(AMQSessionModel session)
+ {
+ synchronized (_sessionAdapters)
+ {
+ getSessions(); // Call getSessions() first to ensure _sessionAdapters state is up to date with actualSessions.
+ return _sessionAdapters.get(session);
+ }
+ }
+
public void delete()
{
try
Index: qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
===================================================================
--- qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java (revision 1469902)
+++ qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java (working copy)
@@ -21,8 +21,10 @@
package org.apache.qpid.server.model.adapter;
import java.security.AccessControlException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -34,6 +36,7 @@
import org.apache.qpid.server.model.Statistics;
import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -44,6 +47,7 @@
private AMQSessionModel _session;
private SessionStatistics _statistics;
+ private Map<Subscription, ConsumerAdapter> _consumerAdapters = new HashMap<Subscription, ConsumerAdapter>();
public SessionAdapter(final AMQSessionModel session, TaskExecutor taskExecutor)
{
@@ -54,7 +58,10 @@
public Collection<Consumer> getSubscriptions()
{
- return null; //TODO
+ synchronized (_consumerAdapters)
+ {
+ return new ArrayList<Consumer>(_consumerAdapters.values());
+ }
}
public Collection<Publisher> getPublishers()
@@ -111,6 +118,37 @@
return 0; //TODO
}
+ /**
+ * Register a ConsumerAdapter (Subscription) with this Session keyed by the Subscription.
+ * @param subscription the org.apache.qpid.server.subscription.Subscription used to key the ConsumerAdapter.
+ * @param adapter the registered ConsumerAdapter.
+ */
+ void subscriptionRegistered(Subscription subscription, ConsumerAdapter adapter)
+ {
+ synchronized (_consumerAdapters)
+ {
+ _consumerAdapters.put(subscription, adapter);
+ }
+ childAdded(adapter);
+ }
+
+ /**
+ * Unregister a ConsumerAdapter (Subscription) with this Session keyed by the Subscription.
+ * @param subscription the org.apache.qpid.server.subscription.Subscription used to key the ConsumerAdapter.
+ */
+ void subscriptionUnregistered(Subscription subscription)
+ {
+ ConsumerAdapter adapter = null;
+ synchronized (_consumerAdapters)
+ {
+ adapter = _consumerAdapters.remove(subscription);
+ }
+ if (adapter != null)
+ {
+ childRemoved(adapter);
+ }
+ }
+
@Override
public Collection<String> getAttributeNames()
{
Index: qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
===================================================================
--- qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java (revision 1469902)
+++ qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java (working copy)
@@ -34,13 +34,17 @@
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObjectFinder;
import org.apache.qpid.server.model.Consumer;
+import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.IllegalStateTransitionException;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.QueueNotificationListener;
+import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.Statistics;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.*;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.util.MapValueConverter;
@@ -91,6 +95,38 @@
_queue.setNotificationListener(this);
}
+ /**
+ * Helper method to retrieve the SessionAdapter keyed by the AMQSessionModel.
+ * This method first finds the ConnectionAdapter associated with the Session from this QueueAdapter's parent vhost
+ * then it does a lookup on that to find the SessionAdapter keyed by the requested AMQSessionModel instance.
+ * @param session the AMQSessionModel used to index the SessionAdapter.
+ * @return the requested SessionAdapter or null if it can't be found.
+ */
+ private SessionAdapter getSessionAdapter(AMQSessionModel session)
+ {
+ // Retrieve the ConnectionModel associated with the SessionModel as a key to lookup the ConnectionAdapter.
+ AMQConnectionModel connectionKey = session.getConnectionModel();
+
+ // Lookup the ConnectionAdapter, from which we should be able to retrieve the SessionAdapter we really want.
+ ConnectionAdapter connectionAdapter = _vhost.getConnectionAdapter(connectionKey);
+ if (connectionAdapter == null)
+ {
+ return null; // If we can't find an associated ConnectionAdapter the SessionAdapter is a lost cause.
+ }
+ else
+ { // With a good ConnectionAdapter we can finally try to find the SessionAdapter we are actually looking for.
+ SessionAdapter sessionAdapter = connectionAdapter.getSessionAdapter(session);
+ if (sessionAdapter == null)
+ {
+ return null; // If the SessionAdapter isn't associated with the selected ConnectionAdapter give up.
+ }
+ else
+ {
+ return sessionAdapter;
+ }
+ }
+ }
+
private void populateConsumers()
{
Collection<org.apache.qpid.server.subscription.Subscription> actualSubscriptions = _queue.getConsumers();
@@ -102,7 +138,13 @@
{
if(!_consumerAdapters.containsKey(subscription))
{
- _consumerAdapters.put(subscription, new ConsumerAdapter(this, subscription));
+ SessionAdapter sessionAdapter = getSessionAdapter(subscription.getSessionModel());
+ ConsumerAdapter adapter = new ConsumerAdapter(this, sessionAdapter, subscription);
+ _consumerAdapters.put(subscription, adapter);
+ if (sessionAdapter != null)
+ { // Register ConsumerAdapter with the SessionAdapter.
+ sessionAdapter.subscriptionRegistered(subscription, adapter);
+ }
}
}
}
@@ -571,9 +613,13 @@
{
if(!_consumerAdapters.containsKey(subscription))
{
- adapter = new ConsumerAdapter(this, subscription);
- _consumerAdapters.put(subscription,adapter);
- // TODO - register with session
+ SessionAdapter sessionAdapter = getSessionAdapter(subscription.getSessionModel());
+ adapter = new ConsumerAdapter(this, sessionAdapter, subscription);
+ _consumerAdapters.put(subscription, adapter);
+ if (sessionAdapter != null)
+ { // Register ConsumerAdapter with the SessionAdapter.
+ sessionAdapter.subscriptionRegistered(subscription, adapter);
+ }
}
}
if(adapter != null)
@@ -589,10 +635,14 @@
synchronized (_consumerAdapters)
{
adapter = _consumerAdapters.remove(subscription);
- // TODO - register with session
}
if(adapter != null)
{
+ SessionAdapter sessionAdapter = getSessionAdapter(subscription.getSessionModel());
+ if (sessionAdapter != null)
+ { // Unregister ConsumerAdapter with the SessionAdapter.
+ sessionAdapter.subscriptionUnregistered(subscription);
+ }
childRemoved(adapter);
}
}
Index: qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
===================================================================
--- qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java (revision 1469902)
+++ qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java (working copy)
@@ -214,6 +214,19 @@
}
+ /**
+ * Retrieve the ConnectionAdapter instance keyed by the AMQConnectionModel from this VirtualHost.
+ * @param connection the AMQConnectionModel used to index the ConnectionAdapter.
+ * @return the requested ConnectionAdapter.
+ */
+ ConnectionAdapter getConnectionAdapter(AMQConnectionModel connection)
+ {
+ synchronized (_connectionAdapters)
+ {
+ return _connectionAdapters.get(connection);
+ }
+ }
+
public Collection<Queue> getQueues()
{
synchronized(_queueAdapters)
@@ -644,6 +657,10 @@
if(adapter != null)
{
+ // Call getSessions() first to ensure that any SessionAdapter children are cleanly removed and any
+ // corresponding ConfigurationChangeListener childRemoved() callback is called for child SessionAdapters.
+ adapter.getSessions();
+
childRemoved(adapter);
}
}
Index: qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java
===================================================================
--- qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java (revision 1469902)
+++ qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java (working copy)
@@ -37,9 +37,11 @@
{
private final Subscription _subscription;
private final QueueAdapter _queue;
+ private final SessionAdapter _session;
private final ConsumerStatistics _statistics;
- public ConsumerAdapter(final QueueAdapter queueAdapter, final Subscription subscription)
+ public ConsumerAdapter(final QueueAdapter queueAdapter, final SessionAdapter sessionAdapter,
+ final Subscription subscription)
{
super(UUIDGenerator.generateConsumerUUID(queueAdapter.getVirtualHost().getName(),
queueAdapter.getName(),
@@ -48,6 +50,7 @@
subscription.getConsumerName()), queueAdapter.getTaskExecutor());
_subscription = subscription;
_queue = queueAdapter;
+ _session = sessionAdapter;
_statistics = new ConsumerStatistics();
//TODO
}
Index: qpid/tools/src/java/src/qpid-broker-plugins-management-qmf2/java/org/apache/qpid/server/qmf2/QmfManagementAgent.java
===================================================================
--- qpid/tools/src/java/src/qpid-broker-plugins-management-qmf2/java/org/apache/qpid/server/qmf2/QmfManagementAgent.java (revision 1469902)
+++ qpid/tools/src/java/src/qpid-broker-plugins-management-qmf2/java/org/apache/qpid/server/qmf2/QmfManagementAgent.java (working copy)
@@ -169,6 +169,8 @@
_agent.registerEventClass(org.apache.qpid.server.qmf2.agentdata.Subscription.getSubscribeSchema());
_agent.registerEventClass(org.apache.qpid.server.qmf2.agentdata.Subscription.getUnsubscribeSchema());
+ _agent.registerObjectClass(org.apache.qpid.server.qmf2.agentdata.Session.getSchema());
+
// Initialise QmfAgentData Objects and track changes to the broker Management Objects.
registerConfigurationChangeListeners();
}
@@ -176,10 +178,12 @@
catch (QmfException qmfe)
{
_log.info("QmfException {} caught in QmfManagementAgent Constructor", qmfe.getMessage());
+ _agent = null; // Causes isConnected() to be false and thus prevents the "QMF2 Management Ready" message.
}
catch (Exception e)
{
_log.info("Exception {} caught in QmfManagementAgent Constructor", e.getMessage());
+ _agent = null; // Causes isConnected() to be false and thus prevents the "QMF2 Management Ready" message.
}
}
@@ -214,7 +218,7 @@
for (VirtualHost vhost : _broker.getVirtualHosts())
{
// We don't add QmfAgentData VirtualHost objects. Possibly TODO, but it's a bit awkward at the moment
- // becase (as of Qpid 0.20) the C++ broker doesn't *seem* to do much with them and the command line
+ // because (as of Qpid 0.20) the C++ broker doesn't *seem* to do much with them and the command line
// tools such as qpid-config don't appear to be VirtualHost aware. A way to stay compatible is to
// mark queues, exchanges etc with [vhost:<vhost-name>/]<object-name> (see Constructor comments).
vhost.addChangeListener(this);
@@ -227,7 +231,6 @@
{
childAdded(connection, session);
- // session.getSubscriptions() returns null in Qpid 0.23 TODO fix that.
if (session.getSubscriptions() != null)
{
for (Consumer subscription : session.getSubscriptions())
@@ -330,7 +333,7 @@
* QMF2 Management Object if one doesn't already exist. In most cases it's a one-to-one mapping, but for
* Binding for example the Binding child is added to both Queue and Exchange so we only create the Binding
* QMF2 Management Object once and add the queueRef and exchangeRef reference properties referencing the Queue
- * and Exchange parent Objects respectively.
+ * and Exchange parent Objects respectively. Similarly for Consumer (AKA Subscription).
* <p>
* This method is also responsible for raising the appropriate QMF2 Events when Management Objects are created.
* @param object the parent object that the child is being added to.
@@ -362,8 +365,16 @@
agentConnection = false; // Only ignore the first Connection, which is the one from the Agent.
}
else if (child instanceof Session)
- { // TODO
-
+ {
+ if (!_objects.containsKey(child))
+ {
+ QmfAgentData ref = _objects.get(object); // Get the Connection QmfAgentData so we can get connectionRef.
+ if (ref != null)
+ {
+ data = new org.apache.qpid.server.qmf2.agentdata.Session((Session)child, ref.getObjectId());
+ _objects.put(child, data);
+ }
+ }
}
else if (child instanceof Exchange)
{
@@ -449,17 +460,15 @@
{
subscription.setQueueRef(ref.getObjectId(), (Queue)object);
// Raise a Subscribe Event - N.B. Need to do it *after* we've set the queueRef.
- _agent.raiseEvent(((org.apache.qpid.server.qmf2.agentdata.Subscription)data).createSubscribeEvent());
+ _agent.raiseEvent(subscription.createSubscribeEvent());
}
- else if (object instanceof Session) // Won't get called in Qpid 0.20.
- { // TODO the association between Session and Subscription isn't implemented in the 0.20 Java Broker.
- //System.out.println("subscription.setSessionRef");
+ else if (object instanceof Session)
+ {
subscription.setSessionRef(ref.getObjectId());
}
}
}
-
try
{
// If we've created new QmfAgentData we register it with the Agent.
@@ -503,8 +512,8 @@
_agent.raiseEvent(((org.apache.qpid.server.qmf2.agentdata.Connection)data).createClientDisconnectEvent());
}
else if (child instanceof Session)
- { // TODO
-
+ {
+ // no-op, don't need to do anything specific when Session is removed.
}
else if (child instanceof Exchange)
{
@@ -588,5 +597,4 @@
}
}
}
-
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]