Author: kwall
Date: Fri May 30 16:24:58 2014
New Revision: 1598658

URL: http://svn.apache.org/r1598658
Log:
QPID-5795: [Java Broker] Prevent ConnectionAdapter leak when closing a 
messaging connection

The leak was due to the fact that nothing was telling the virtualhost to 
unregister the connection child (#unregisterChild)
when the connection was closed.

* Made ConnectionAdapter responsible for causing its own deletion (when the 
underlying connection is closed).  The call to #deleted() causes the child
 to be unregistered from its parent (preventing the leak)
* Removed the now unnecessary _connectionAdapters map from the VH.  This 
needlessly duplicated information already held more generally by the ACO.
* Refactored SessionAdapter in sympathy with CA changes.  SessionsAdapters 
where _not_ being leaked as the session implementation were already telling the 
model to delete.

Modified:
    
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
    
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
    
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
    
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
    
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java

Modified: 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java?rev=1598658&r1=1598657&r2=1598658&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
 (original)
+++ 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
 Fri May 30 16:24:58 2014
@@ -21,9 +21,7 @@
 package org.apache.qpid.server.model.adapter;
 
 import java.security.Principal;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
@@ -40,22 +38,31 @@ import org.apache.qpid.server.model.Tran
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.SessionModelListener;
+import org.apache.qpid.server.util.Action;
 
 public final class ConnectionAdapter extends 
AbstractConfiguredObject<ConnectionAdapter> implements 
Connection<ConnectionAdapter>,
                                                                                
              SessionModelListener
 {
     private AMQConnectionModel _connection;
 
-    private final Map<AMQSessionModel, SessionAdapter> _sessionAdapters =
-            new HashMap<AMQSessionModel, SessionAdapter>();
-
     private State _state = State.ACTIVE;
 
     public ConnectionAdapter(final AMQConnectionModel conn)
     {
         super(parentsMap(conn.getVirtualHost()),createAttributes(conn));
         _connection = conn;
+
+        conn.addDeleteTask(new Action()
+        {
+            @Override
+            public void performAction(final Object object)
+            {
+                conn.removeDeleteTask(this);
+                deleted();
+            }
+        });
         open();
+
         conn.addSessionListener(this);
     }
 
@@ -137,23 +144,7 @@ public final class ConnectionAdapter ext
 
     public Collection<Session> getSessions()
     {
-        synchronized (_sessionAdapters)
-        {
-            return new ArrayList<Session>(_sessionAdapters.values());
-        }
-    }
-
-    /**
-     * Retrieve the SessionAdapter instance keyed by the AMQSessionModel from 
this Connection.
-     * @param session the AMQSessionModel used to index the SessionAdapter.
-     * @return the requested SessionAdapter.
-     */
-    SessionAdapter getSessionAdapter(AMQSessionModel session)
-    {
-        synchronized (_sessionAdapters)
-        {
-            return _sessionAdapters.get(session);
-        }
+        return getChildren(Session.class);
     }
 
     @StateTransition( currentState = State.ACTIVE, desiredState = 
State.DELETED)
@@ -171,19 +162,6 @@ public final class ConnectionAdapter ext
 
 
     @Override
-    public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> 
clazz)
-    {
-        if(clazz == Session.class)
-        {
-            return (Collection<C>) getSessions();
-        }
-        else
-        {
-            return Collections.emptySet();
-        }
-    }
-
-    @Override
     public <C extends ConfiguredObject> C addChild(Class<C> childClass, 
Map<String, Object> attributes, ConfiguredObject... otherParents)
     {
         if(childClass == Session.class)
@@ -236,27 +214,13 @@ public final class ConnectionAdapter ext
     @Override
     public void sessionAdded(final AMQSessionModel<?, ?> session)
     {
-        synchronized (_sessionAdapters)
-        {
-            if(!_sessionAdapters.containsKey(session))
-            {
-                SessionAdapter adapter = new SessionAdapter(this, session);
-                _sessionAdapters.put(session, adapter);
-                childAdded(adapter);
-            }
-        }
+        SessionAdapter adapter = new SessionAdapter(this, session);
+        childAdded(adapter);
     }
 
     @Override
     public void sessionRemoved(final AMQSessionModel<?, ?> session)
     {
-        synchronized (_sessionAdapters)
-        {
-            SessionAdapter adapter = _sessionAdapters.remove(session);
-            if(adapter != null)
-            {
-                childRemoved(adapter);
-            }
-        }
+        // SessionAdapter installs delete task to cause session model object 
to delete
     }
 }

Modified: 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java?rev=1598658&r1=1598657&r2=1598658&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
 (original)
+++ 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
 Fri May 30 16:24:58 2014
@@ -36,6 +36,7 @@ import org.apache.qpid.server.model.Stat
 import org.apache.qpid.server.model.StateTransition;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.ConsumerListener;
+import org.apache.qpid.server.util.Action;
 
 final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> 
implements Session<SessionAdapter>
 {
@@ -67,6 +68,16 @@ final class SessionAdapter extends Abstr
             }
         });
         session.setModelObject(this);
+        session.addDeleteTask(new Action()
+        {
+            @Override
+            public void performAction(final Object object)
+            {
+                session.removeDeleteTask(this);
+                deleted();
+            }
+        });
+
         open();
     }
 

Modified: 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1598658&r1=1598657&r2=1598658&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
 (original)
+++ 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
 Fri May 30 16:24:58 2014
@@ -123,9 +123,6 @@ public abstract class AbstractVirtualHos
 
     private final EventLogger _eventLogger;
 
-    private final Map<AMQConnectionModel, ConnectionAdapter> 
_connectionAdapters =
-            new HashMap<AMQConnectionModel, ConnectionAdapter>();
-
     private final List<VirtualHostAlias> _aliases = new 
ArrayList<VirtualHostAlias>();
     private final AtomicBoolean _deleted = new AtomicBoolean();
     private final VirtualHostNode<?> _virtualHostNode;
@@ -362,26 +359,10 @@ public abstract class AbstractVirtualHos
 
     public Collection<Connection> getConnections()
     {
-        synchronized(_connectionAdapters)
-        {
-            return new ArrayList<Connection>(_connectionAdapters.values());
-        }
+        return getChildren(Connection.class);
 
     }
 
-    /**
-     * Retrieve the ConnectionAdapter instance keyed by the AMQConnectionModel 
from this VirtualHost.
-     * @param connection the AMQConnectionModel used to index the 
ConnectionAdapter.
-     * @return the requested ConnectionAdapter.
-     */
-    ConnectionAdapter getConnectionAdapter(AMQConnectionModel connection)
-    {
-        synchronized (_connectionAdapters)
-        {
-            return _connectionAdapters.get(connection);
-        }
-    }
-
     @Override
     public State getState()
     {
@@ -395,11 +376,7 @@ public abstract class AbstractVirtualHos
     @Override
     public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> 
clazz)
     {
-        if(clazz == Connection.class)
-        {
-            return (Collection<C>) getConnections();
-        }
-        else if(clazz == VirtualHostAlias.class)
+        if(clazz == VirtualHostAlias.class)
         {
             return (Collection<C>) getAliases();
         }
@@ -815,41 +792,15 @@ public abstract class AbstractVirtualHos
         {
             connection.block();
         }
-        ConnectionAdapter adapter = null;
-        synchronized (_connectionAdapters)
-        {
-            if(!_connectionAdapters.containsKey(connection))
-            {
-                adapter = new ConnectionAdapter(connection);
-                _connectionAdapters.put(connection, adapter);
 
-            }
-
-        }
-        if(adapter != null)
-        {
-            childAdded(adapter);
-        }
+        Connection c = new ConnectionAdapter(connection);
+        childAdded(c);
 
     }
 
     public void connectionUnregistered(final AMQConnectionModel connection)
     {
-        ConnectionAdapter adapter;
-        synchronized (_connectionAdapters)
-        {
-            adapter = _connectionAdapters.remove(connection);
-
-        }
-
-        if(adapter != null)
-        {
-            // Call getSessions() first to ensure that any SessionAdapter 
children are cleanly removed and any
-            // corresponding ConfigurationChangeListener childRemoved() 
callback is called for child SessionAdapters.
-            adapter.getSessions();
-
-            childRemoved(adapter);
-        }
+        // ConnectionAdapter installs delete task to cause connection model 
object to delete
     }
 
     public void event(final Event event)

Modified: 
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1598658&r1=1598657&r2=1598658&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
 (original)
+++ 
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
 Fri May 30 16:24:58 2014
@@ -299,7 +299,6 @@ public class ServerConnectionDelegate ex
         stopAllSubscriptions(conn, dtc);
         Session ssn = conn.getSession(dtc.getChannel());
         ((ServerSession)ssn).setClose(true);
-        ((ServerSession)ssn).getModelObject().delete();
         super.sessionDetach(conn, dtc);
     }
 

Modified: 
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1598658&r1=1598657&r2=1598658&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
 (original)
+++ 
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
 Fri May 30 16:24:58 2014
@@ -739,10 +739,6 @@ public class AMQChannel<T extends AMQPro
 
 
         _transaction.rollback();
-        if(_modelObject != null)
-        {
-            _modelObject.delete();
-        }
 
         try
         {



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to