Author: kwall
Date: Fri May 30 16:24:58 2014
New Revision: 1598658
URL: http://svn.apache.org/r1598658
Log:
QPID-5795: [Java Broker] Prevent ConnectionAdapter leak when closing a
messaging connection
The leak was due to the fact that nothing was telling the virtualhost to
unregister the connection child (#unregisterChild)
when the connection was closed.
* Made ConnectionAdapter responsible for causing its own deletion (when the
underlying connection is closed). The call to #deleted() causes the child
to be unregistered from its parent (preventing the leak).
* Removed the now unnecessary _connectionAdapters map from the virtual host
(AbstractVirtualHost). This needlessly duplicated information already held
more generally by AbstractConfiguredObject (ACO).
* Refactored SessionAdapter in sympathy with CA changes. SessionAdapters
were _not_ being leaked, as the session implementations were already telling the
model to delete.
Modified:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
Modified:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java?rev=1598658&r1=1598657&r2=1598658&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
(original)
+++
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
Fri May 30 16:24:58 2014
@@ -21,9 +21,7 @@
package org.apache.qpid.server.model.adapter;
import java.security.Principal;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@@ -40,22 +38,31 @@ import org.apache.qpid.server.model.Tran
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.SessionModelListener;
+import org.apache.qpid.server.util.Action;
public final class ConnectionAdapter extends
AbstractConfiguredObject<ConnectionAdapter> implements
Connection<ConnectionAdapter>,
SessionModelListener
{
private AMQConnectionModel _connection;
- private final Map<AMQSessionModel, SessionAdapter> _sessionAdapters =
- new HashMap<AMQSessionModel, SessionAdapter>();
-
private State _state = State.ACTIVE;
public ConnectionAdapter(final AMQConnectionModel conn)
{
super(parentsMap(conn.getVirtualHost()),createAttributes(conn));
_connection = conn;
+
+ conn.addDeleteTask(new Action()
+ {
+ @Override
+ public void performAction(final Object object)
+ {
+ conn.removeDeleteTask(this);
+ deleted();
+ }
+ });
open();
+
conn.addSessionListener(this);
}
@@ -137,23 +144,7 @@ public final class ConnectionAdapter ext
public Collection<Session> getSessions()
{
- synchronized (_sessionAdapters)
- {
- return new ArrayList<Session>(_sessionAdapters.values());
- }
- }
-
- /**
- * Retrieve the SessionAdapter instance keyed by the AMQSessionModel from
this Connection.
- * @param session the AMQSessionModel used to index the SessionAdapter.
- * @return the requested SessionAdapter.
- */
- SessionAdapter getSessionAdapter(AMQSessionModel session)
- {
- synchronized (_sessionAdapters)
- {
- return _sessionAdapters.get(session);
- }
+ return getChildren(Session.class);
}
@StateTransition( currentState = State.ACTIVE, desiredState =
State.DELETED)
@@ -171,19 +162,6 @@ public final class ConnectionAdapter ext
@Override
- public <C extends ConfiguredObject> Collection<C> getChildren(Class<C>
clazz)
- {
- if(clazz == Session.class)
- {
- return (Collection<C>) getSessions();
- }
- else
- {
- return Collections.emptySet();
- }
- }
-
- @Override
public <C extends ConfiguredObject> C addChild(Class<C> childClass,
Map<String, Object> attributes, ConfiguredObject... otherParents)
{
if(childClass == Session.class)
@@ -236,27 +214,13 @@ public final class ConnectionAdapter ext
@Override
public void sessionAdded(final AMQSessionModel<?, ?> session)
{
- synchronized (_sessionAdapters)
- {
- if(!_sessionAdapters.containsKey(session))
- {
- SessionAdapter adapter = new SessionAdapter(this, session);
- _sessionAdapters.put(session, adapter);
- childAdded(adapter);
- }
- }
+ SessionAdapter adapter = new SessionAdapter(this, session);
+ childAdded(adapter);
}
@Override
public void sessionRemoved(final AMQSessionModel<?, ?> session)
{
- synchronized (_sessionAdapters)
- {
- SessionAdapter adapter = _sessionAdapters.remove(session);
- if(adapter != null)
- {
- childRemoved(adapter);
- }
- }
+ // SessionAdapter installs delete task to cause session model object
to delete
}
}
Modified:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java?rev=1598658&r1=1598657&r2=1598658&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
(original)
+++
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
Fri May 30 16:24:58 2014
@@ -36,6 +36,7 @@ import org.apache.qpid.server.model.Stat
import org.apache.qpid.server.model.StateTransition;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.ConsumerListener;
+import org.apache.qpid.server.util.Action;
final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter>
implements Session<SessionAdapter>
{
@@ -67,6 +68,16 @@ final class SessionAdapter extends Abstr
}
});
session.setModelObject(this);
+ session.addDeleteTask(new Action()
+ {
+ @Override
+ public void performAction(final Object object)
+ {
+ session.removeDeleteTask(this);
+ deleted();
+ }
+ });
+
open();
}
Modified:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1598658&r1=1598657&r2=1598658&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
(original)
+++
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
Fri May 30 16:24:58 2014
@@ -123,9 +123,6 @@ public abstract class AbstractVirtualHos
private final EventLogger _eventLogger;
- private final Map<AMQConnectionModel, ConnectionAdapter>
_connectionAdapters =
- new HashMap<AMQConnectionModel, ConnectionAdapter>();
-
private final List<VirtualHostAlias> _aliases = new
ArrayList<VirtualHostAlias>();
private final AtomicBoolean _deleted = new AtomicBoolean();
private final VirtualHostNode<?> _virtualHostNode;
@@ -362,26 +359,10 @@ public abstract class AbstractVirtualHos
public Collection<Connection> getConnections()
{
- synchronized(_connectionAdapters)
- {
- return new ArrayList<Connection>(_connectionAdapters.values());
- }
+ return getChildren(Connection.class);
}
- /**
- * Retrieve the ConnectionAdapter instance keyed by the AMQConnectionModel
from this VirtualHost.
- * @param connection the AMQConnectionModel used to index the
ConnectionAdapter.
- * @return the requested ConnectionAdapter.
- */
- ConnectionAdapter getConnectionAdapter(AMQConnectionModel connection)
- {
- synchronized (_connectionAdapters)
- {
- return _connectionAdapters.get(connection);
- }
- }
-
@Override
public State getState()
{
@@ -395,11 +376,7 @@ public abstract class AbstractVirtualHos
@Override
public <C extends ConfiguredObject> Collection<C> getChildren(Class<C>
clazz)
{
- if(clazz == Connection.class)
- {
- return (Collection<C>) getConnections();
- }
- else if(clazz == VirtualHostAlias.class)
+ if(clazz == VirtualHostAlias.class)
{
return (Collection<C>) getAliases();
}
@@ -815,41 +792,15 @@ public abstract class AbstractVirtualHos
{
connection.block();
}
- ConnectionAdapter adapter = null;
- synchronized (_connectionAdapters)
- {
- if(!_connectionAdapters.containsKey(connection))
- {
- adapter = new ConnectionAdapter(connection);
- _connectionAdapters.put(connection, adapter);
- }
-
- }
- if(adapter != null)
- {
- childAdded(adapter);
- }
+ Connection c = new ConnectionAdapter(connection);
+ childAdded(c);
}
public void connectionUnregistered(final AMQConnectionModel connection)
{
- ConnectionAdapter adapter;
- synchronized (_connectionAdapters)
- {
- adapter = _connectionAdapters.remove(connection);
-
- }
-
- if(adapter != null)
- {
- // Call getSessions() first to ensure that any SessionAdapter
children are cleanly removed and any
- // corresponding ConfigurationChangeListener childRemoved()
callback is called for child SessionAdapters.
- adapter.getSessions();
-
- childRemoved(adapter);
- }
+ // ConnectionAdapter installs delete task to cause connection model
object to delete
}
public void event(final Event event)
Modified:
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1598658&r1=1598657&r2=1598658&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
(original)
+++
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
Fri May 30 16:24:58 2014
@@ -299,7 +299,6 @@ public class ServerConnectionDelegate ex
stopAllSubscriptions(conn, dtc);
Session ssn = conn.getSession(dtc.getChannel());
((ServerSession)ssn).setClose(true);
- ((ServerSession)ssn).getModelObject().delete();
super.sessionDetach(conn, dtc);
}
Modified:
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1598658&r1=1598657&r2=1598658&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
(original)
+++
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
Fri May 30 16:24:58 2014
@@ -739,10 +739,6 @@ public class AMQChannel<T extends AMQPro
_transaction.rollback();
- if(_modelObject != null)
- {
- _modelObject.delete();
- }
try
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]