Author: rgodfrey
Date: Tue Jun 16 13:43:15 2015
New Revision: 1685835

URL: http://svn.apache.org/r1685835
Log:
QPID-6594 : [Java Broker] Eliminate ConnecitonRegistry - work by Rob Godfrey & 
Lorenz Quack

Removed:
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
Modified:
    
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java
    
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
    
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
    
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
    
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java

Modified: 
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java?rev=1685835&r1=1685834&r2=1685835&view=diff
==============================================================================
--- 
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java
 (original)
+++ 
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java
 Tue Jun 16 13:43:15 2015
@@ -30,7 +30,6 @@ import java.util.concurrent.ScheduledFut
 
 import com.google.common.util.concurrent.ListenableFuture;
 
-import org.apache.qpid.server.connection.IConnectionRegistry;
 import org.apache.qpid.server.exchange.ExchangeImpl;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.message.MessageDestination;
@@ -307,12 +306,6 @@ public class BDBHAReplicaVirtualHostImpl
     }
 
     @Override
-    public IConnectionRegistry getConnectionRegistry()
-    {
-        return null;
-    }
-
-    @Override
     public AMQQueue<?> getQueue(final String name)
     {
         return null;
@@ -550,6 +543,18 @@ public class BDBHAReplicaVirtualHostImpl
         return _principal;
     }
 
+    @Override
+    public void registerConnection(final Connection<?> connection)
+    {
+        throwUnsupportedForReplica();
+    }
+
+    @Override
+    public void deregisterConnection(final Connection<?> connection)
+    {
+        throwUnsupportedForReplica();
+    }
+
     private void throwUnsupportedForReplica()
     {
         throw new IllegalStateException("The virtual host state of " + 
getState()

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java?rev=1685835&r1=1685834&r2=1685835&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
 Tue Jun 16 13:43:15 2015
@@ -22,6 +22,8 @@ package org.apache.qpid.server.model;
 
 import java.util.Collection;
 
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+
 @ManagedObject( creatable = false )
 public interface Connection<X extends Connection<X>> extends 
ConfiguredObject<X>
 {
@@ -104,5 +106,5 @@ public interface Connection<X extends Co
     Collection<Session> getSessions();
 
 
-
+    AMQConnectionModel<?,?> getUnderlyingConnection();
 }

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java?rev=1685835&r1=1685834&r2=1685835&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
 Tue Jun 16 13:43:15 2015
@@ -27,11 +27,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
-import org.apache.qpid.server.configuration.updater.TaskExecutor;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.virtualhost.VirtualHostPrincipal;
 
 @ManagedObject( defaultType = "ProvidedStore", description = 
VirtualHost.CLASS_DESCRIPTION)
 public interface VirtualHost<X extends VirtualHost<X, Q, E>, Q extends 
Queue<?>, E extends Exchange<?> > extends ConfiguredObject<X>
@@ -150,7 +148,6 @@ public interface VirtualHost<X extends V
 
     //children
     Collection<VirtualHostAlias> getAliases();
-    Collection<Connection> getConnections();
     Collection<Q> getQueues();
     Collection<E> getExchanges();
 
@@ -162,6 +159,9 @@ public interface VirtualHost<X extends V
 
     Collection<String> getExchangeTypeNames();
 
+    Collection<Connection> getConnections();
+
+
     void start();
 
     void stop();
@@ -170,6 +170,9 @@ public interface VirtualHost<X extends V
 
     Principal getPrincipal();
 
+    void registerConnection(Connection<?> connection);
+    void deregisterConnection(Connection<?> connection);
+
     public static interface Transaction
     {
         void dequeue(MessageInstance entry);

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java?rev=1685835&r1=1685834&r2=1685835&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
 Tue Jun 16 13:43:15 2015
@@ -42,6 +42,7 @@ import org.apache.qpid.server.model.Sess
 import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.StateTransition;
 import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.SessionModelListener;
@@ -78,6 +79,11 @@ public final class ConnectionAdapter ext
         setState(State.ACTIVE);
     }
 
+    public void virtualHostAssociated()
+    {
+        _underlyingConnection.getVirtualHost().registerConnection(this);
+    }
+
     private static Map<String, Object> createAttributes(final 
AMQConnectionModel _connection)
     {
         Map<String,Object> attributes = new HashMap<String, Object>();
@@ -154,6 +160,12 @@ public final class ConnectionAdapter ext
         return _underlyingConnection.getPort();
     }
 
+    public VirtualHost<?,?,?> getVirtualHost()
+    {
+        return _underlyingConnection.getVirtualHost();
+    }
+
+
     public Collection<Session> getSessions()
     {
         return getChildren(Session.class);
@@ -292,4 +304,10 @@ public final class ConnectionAdapter ext
     {
         // SessionAdapter installs delete task to cause session model object 
to delete
     }
+
+    @Override
+    public AMQConnectionModel<?,?> getUnderlyingConnection()
+    {
+        return _underlyingConnection;
+    }
 }

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1685835&r1=1685834&r2=1685835&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
 Tue Jun 16 13:43:15 2015
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.server.virtualhost;
 
+import static java.util.Collections.newSetFromMap;
+
 import java.io.File;
 import java.security.Principal;
 import java.security.PrivilegedAction;
@@ -28,12 +30,14 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -52,10 +56,9 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.exchange.ExchangeDefaults;
 import 
org.apache.qpid.pool.SuppressingInheritedAccessControlContextThreadFactory;
+import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.configuration.BrokerProperties;
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
-import org.apache.qpid.server.connection.ConnectionRegistry;
-import org.apache.qpid.server.connection.IConnectionRegistry;
 import org.apache.qpid.server.exchange.DefaultDestination;
 import org.apache.qpid.server.exchange.ExchangeImpl;
 import org.apache.qpid.server.logging.EventLogger;
@@ -69,7 +72,6 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.*;
 import org.apache.qpid.server.model.Connection;
-import org.apache.qpid.server.model.adapter.ConnectionAdapter;
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.plugin.ConnectionValidator;
 import org.apache.qpid.server.plugin.QpidServiceLoader;
@@ -101,10 +103,13 @@ import org.apache.qpid.server.util.Conne
 import org.apache.qpid.server.util.MapValueConverter;
 
 public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> 
extends AbstractConfiguredObject<X>
-        implements VirtualHostImpl<X, AMQQueue<?>, ExchangeImpl<?>>, 
IConnectionRegistry.RegistryChangeListener, EventListener
+        implements VirtualHostImpl<X, AMQQueue<?>, ExchangeImpl<?>>, 
EventListener
 {
     private final Collection<ConnectionValidator> _connectionValidators = new 
ArrayList<>();
 
+
+    private final Set<Connection<?>> _connections = newSetFromMap(new 
ConcurrentHashMap<Connection<?>, Boolean>());
+
     private static enum BlockingType { STORE, FILESYSTEM };
 
     private static final String USE_ASYNC_RECOVERY = 
"use_async_message_store_recovery";
@@ -122,8 +127,6 @@ public abstract class AbstractVirtualHos
 
     private final Broker<?> _broker;
 
-    private final ConnectionRegistry _connectionRegistry;
-
     private final DtxRegistry _dtxRegistry;
 
     private final SystemNodeRegistry _systemNodeRegistry = new 
SystemNodeRegistry();
@@ -210,9 +213,6 @@ public abstract class AbstractVirtualHos
 
         _eventLogger.message(VirtualHostMessages.CREATED(getName()));
 
-        _connectionRegistry = new ConnectionRegistry();
-        _connectionRegistry.addRegistryChangeListener(this);
-
         _defaultDestination = new DefaultDestination(this);
 
         _messagesDelivered = new StatisticsCounter("messages-delivered-" + 
getName());
@@ -464,11 +464,6 @@ public abstract class AbstractVirtualHos
         return _messageStoreLogSubject;
     }
 
-    public IConnectionRegistry getConnectionRegistry()
-    {
-        return _connectionRegistry;
-    }
-
     public Collection<Connection> getConnections()
     {
         return getChildren(Connection.class);
@@ -871,7 +866,7 @@ public abstract class AbstractVirtualHos
     protected void onClose()
     {
         //Stop Connections
-        _connectionRegistry.close();
+        closeConnections("VirtualHost is closing");
         _dtxRegistry.close();
         closeMessageStore();
         shutdownHouseKeeping();
@@ -885,6 +880,40 @@ public abstract class AbstractVirtualHos
         stopLogging(_virtualHostLoggersToClose);
     }
 
+
+    public void closeConnections(final String replyText)
+    {
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Closing connection registry :" + 
_connections.size() + " connections.");
+        }
+        for(Connection conn : _connections)
+        {
+            conn.getUnderlyingConnection().stop();
+        }
+
+        while (!_connections.isEmpty())
+        {
+            Iterator<Connection<?>> itr = _connections.iterator();
+            while(itr.hasNext())
+            {
+                Connection<?> connection = itr.next();
+                try
+                {
+                    
connection.getUnderlyingConnection().closeAsync(AMQConstant.CONNECTION_FORCED, 
replyText);
+                }
+                catch (Exception e)
+                {
+                    _logger.warn("Exception closing connection " + 
connection.getName() + " from " + connection.getRemoteAddress(), e);
+                }
+                finally
+                {
+                    itr.remove();
+                }
+            }
+        }
+    }
+
     private void closeMessageStore()
     {
         if (getMessageStore() != null)
@@ -952,9 +981,9 @@ public abstract class AbstractVirtualHos
         _messagesReceived.reset();
         _dataReceived.reset();
 
-        for (AMQConnectionModel connection : 
_connectionRegistry.getConnections())
+        for (Connection<?> connection : _connections)
         {
-            connection.resetStatistics();
+            connection.getUnderlyingConnection().resetStatistics();
         }
     }
 
@@ -976,14 +1005,14 @@ public abstract class AbstractVirtualHos
 
     private void block(BlockingType blockingType)
     {
-        synchronized (_connectionRegistry)
+        synchronized (_connections)
         {
             _blockingReasons.add(blockingType);
             if(!_blocked.compareAndSet(false,true))
             {
-                for(AMQConnectionModel conn : 
_connectionRegistry.getConnections())
+                for(Connection<?> conn : _connections)
                 {
-                    conn.block();
+                    conn.getUnderlyingConnection().block();
                 }
             }
         }
@@ -993,38 +1022,19 @@ public abstract class AbstractVirtualHos
     private void unblock(BlockingType blockingType)
     {
 
-        synchronized (_connectionRegistry)
+        synchronized (_connections)
         {
             _blockingReasons.remove(blockingType);
             if(_blockingReasons.isEmpty() && 
_blocked.compareAndSet(true,false))
             {
-                for(AMQConnectionModel conn : 
_connectionRegistry.getConnections())
+                for(Connection<?> conn : _connections)
                 {
-                    conn.unblock();
+                    conn.getUnderlyingConnection().unblock();
                 }
             }
         }
     }
 
-    public void connectionRegistered(final AMQConnectionModel connection)
-    {
-        if(_blocked.get())
-        {
-            connection.block();
-        }
-
-        ConnectionAdapter c = new ConnectionAdapter(connection);
-        c.create();
-        childAdded(c);
-        connection.setScheduler(_networkConnectionScheduler);
-
-    }
-
-    public void connectionUnregistered(final AMQConnectionModel connection)
-    {
-        // ConnectionAdapter installs delete task to cause connection model 
object to delete
-    }
-
     public void event(final Event event)
     {
         switch(event)
@@ -1117,13 +1127,13 @@ public abstract class AbstractVirtualHos
                     }
                 }
             }
-            for (AMQConnectionModel<?,?> connection : 
getConnectionRegistry().getConnections())
+            for (Connection<?> connection : _connections)
             {
                 if (_logger.isDebugEnabled())
                 {
                     _logger.debug("Checking for long running open transactions 
on connection " + connection);
                 }
-                for (AMQSessionModel<?,?> session : 
connection.getSessionModels())
+                for (AMQSessionModel<?,?> session : 
connection.getUnderlyingConnection().getSessionModels())
                 {
                     if (_logger.isDebugEnabled())
                     {
@@ -1351,7 +1361,7 @@ public abstract class AbstractVirtualHos
     @Override
     public long getConnectionCount()
     {
-        return getConnectionRegistry().getConnections().size();
+        return _connections.size();
     }
 
     @Override
@@ -1665,6 +1675,30 @@ public abstract class AbstractVirtualHos
         return _principal;
     }
 
+    @Override
+    public void registerConnection(final Connection<?> connection)
+    {
+        childAdded(connection);
+
+        _connections.add(connection);
+
+        AMQConnectionModel<?,?> underlyingConnection = 
connection.getUnderlyingConnection();
+        if(_blocked.get())
+        {
+            underlyingConnection.block();
+        }
+
+        underlyingConnection.setScheduler(_networkConnectionScheduler);
+
+    }
+
+    @Override
+    public void deregisterConnection(final Connection<?> connection)
+    {
+        _connections.remove(connection);
+    }
+
+
     private long calculateTotalEnqueuedSize(final Collection<AMQQueue<?>> 
queues)
     {
         long total = 0;

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java?rev=1685835&r1=1685834&r2=1685835&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
 Tue Jun 16 13:43:15 2015
@@ -25,7 +25,6 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ScheduledFuture;
 
-import org.apache.qpid.server.connection.IConnectionRegistry;
 import org.apache.qpid.server.exchange.ExchangeImpl;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.EventLoggerProvider;
@@ -40,8 +39,6 @@ import org.apache.qpid.server.security.S
 import org.apache.qpid.server.stats.StatisticsGatherer;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.transport.NetworkConnectionScheduler;
-import org.apache.qpid.server.transport.NonBlockingConnection;
 import org.apache.qpid.server.txn.DtxRegistry;
 
 public interface VirtualHostImpl< X extends VirtualHostImpl<X,Q,E>, Q extends 
AMQQueue<?>, E extends ExchangeImpl<?> >
@@ -51,8 +48,6 @@ public interface VirtualHostImpl< X exte
 {
     String DEFAULT_DLE_NAME_SUFFIX = "_DLE";
 
-    IConnectionRegistry getConnectionRegistry();
-
     String getName();
 
     Q getQueue(String name);

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java?rev=1685835&r1=1685834&r2=1685835&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java
 Tue Jun 16 13:43:15 2015
@@ -31,7 +31,6 @@ import java.util.concurrent.ScheduledFut
 
 import com.google.common.util.concurrent.ListenableFuture;
 
-import org.apache.qpid.server.connection.IConnectionRegistry;
 import org.apache.qpid.server.exchange.ExchangeImpl;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.message.MessageDestination;
@@ -308,12 +307,6 @@ class RedirectingVirtualHostImpl
     }
 
     @Override
-    public IConnectionRegistry getConnectionRegistry()
-    {
-        return null;
-    }
-
-    @Override
     public AMQQueue<?> getQueue(final String name)
     {
         return null;
@@ -552,6 +545,18 @@ class RedirectingVirtualHostImpl
         return _principal;
     }
 
+    @Override
+    public void registerConnection(final Connection<?> connection)
+    {
+        throwUnsupportedForRedirector();
+    }
+
+    @Override
+    public void deregisterConnection(final Connection<?> connection)
+    {
+        throwUnsupportedForRedirector();
+    }
+
     private void throwUnsupportedForRedirector()
     {
         throw new IllegalStateException("The virtual host state of " + 
getState()

Modified: 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java?rev=1685835&r1=1685834&r2=1685835&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
 Tue Jun 16 13:43:15 2015
@@ -52,7 +52,7 @@ import org.apache.qpid.protocol.AMQConst
 import 
org.apache.qpid.server.configuration.store.StoreConfigurationChangeListener;
 import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
-import 
org.apache.qpid.server.connection.IConnectionRegistry.RegistryChangeListener;
+import org.apache.qpid.server.model.adapter.ConnectionAdapter;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.store.ConfiguredObjectRecord;
@@ -230,10 +230,11 @@ public class VirtualHostTest extends Qpi
         assertEquals("Unexpected state", State.ACTIVE, virtualHost.getState());
 
         AMQConnectionModel connection = 
createMockProtocolConnection(virtualHost);
-
         assertEquals("Unexpected number of connections before connection 
registered", 0, virtualHost.getChildren(Connection.class).size());
 
-        ((RegistryChangeListener)virtualHost).connectionRegistered(connection);
+        ConnectionAdapter modelConnection = new ConnectionAdapter(connection);
+        modelConnection.create();
+        virtualHost.registerConnection(modelConnection);
 
         assertEquals("Unexpected number of connections after connection 
registered", 1, virtualHost.getChildren(
                 Connection.class).size());
@@ -256,10 +257,13 @@ public class VirtualHostTest extends Qpi
         assertEquals("Unexpected state", State.ACTIVE, virtualHost.getState());
 
         AMQConnectionModel connection = 
createMockProtocolConnection(virtualHost);
+        assertEquals("Unexpected number of connections before connection 
registered",
+                     0,
+                     virtualHost.getChildren(Connection.class).size());
 
-        assertEquals("Unexpected number of connections before connection 
registered", 0, virtualHost.getChildren(Connection.class).size());
-
-        ((RegistryChangeListener)virtualHost).connectionRegistered(connection);
+        ConnectionAdapter modelConnection = new ConnectionAdapter(connection);
+        modelConnection.create();
+        virtualHost.registerConnection(modelConnection);
 
         assertEquals("Unexpected number of connections after connection 
registered", 1, virtualHost.getChildren(Connection.class).size());
 

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1685835&r1=1685834&r2=1685835&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
 Tue Jun 16 13:43:15 2015
@@ -44,6 +44,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.model.adapter.ConnectionAdapter;
 import org.apache.qpid.server.protocol.ConnectionClosingTicker;
 import org.apache.qpid.server.transport.ProtocolEngine;
 import org.apache.qpid.server.connection.ConnectionPrincipal;
@@ -109,6 +110,7 @@ public class ServerConnection extends Co
     private ProtocolEngine_0_10 _protocolEngine;
     private boolean _ignoreFutureInput;
     private boolean _ignoreAllButConnectionCloseOk;
+    private ConnectionAdapter _adapter;
 
     public ServerConnection(final long connectionId,
                             Broker<?> broker,
@@ -173,14 +175,17 @@ public class ServerConnection extends Co
                                                              true,
                                                              true));
 
-            getVirtualHost().getConnectionRegistry().registerConnection(this);
+            _adapter = new ConnectionAdapter(this);
+            _adapter.create();
+            _adapter.virtualHostAssociated();
+
         }
 
         if (state == State.CLOSE_RCVD || state == State.CLOSED || state == 
State.CLOSING)
         {
-            if(_virtualHost != null)
+            if(_adapter != null)
             {
-                
_virtualHost.getConnectionRegistry().deregisterConnection(this);
+                _virtualHost.deregisterConnection(_adapter);
             }
         }
         if(state == State.CLOSING)

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1685835&r1=1685834&r2=1685835&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
 Tue Jun 16 13:43:15 2015
@@ -367,11 +367,12 @@ public class ServerConnectionDelegate ex
         final Principal authorizedPrincipal = sconn.getAuthorizedPrincipal();
         final String userId = authorizedPrincipal == null ? "" : 
authorizedPrincipal.getName();
 
-        final Iterator<AMQConnectionModel> connections =
-                        
((ServerConnection)conn).getVirtualHost().getConnectionRegistry().getConnections().iterator();
+        final Iterator<org.apache.qpid.server.model.Connection> connections =
+                        
((ServerConnection)conn).getVirtualHost().getConnections().iterator();
         while(connections.hasNext())
         {
-            final AMQConnectionModel amqConnectionModel = connections.next();
+            final org.apache.qpid.server.model.Connection<?> modelConnnection 
= connections.next();
+            final AMQConnectionModel amqConnectionModel = 
modelConnnection.getUnderlyingConnection();
             if (amqConnectionModel instanceof ServerConnection)
             {
                 ServerConnection otherConnection = 
(ServerConnection)amqConnectionModel;

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1685835&r1=1685834&r2=1685835&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
 Tue Jun 16 13:43:15 2015
@@ -60,6 +60,7 @@ import org.apache.qpid.common.ServerProp
 import org.apache.qpid.framing.*;
 import org.apache.qpid.properties.ConnectionStartProperties;
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.model.adapter.ConnectionAdapter;
 import org.apache.qpid.server.protocol.ConnectionClosingTicker;
 import org.apache.qpid.server.transport.ProtocolEngine;
 import org.apache.qpid.server.configuration.BrokerProperties;
@@ -105,6 +106,7 @@ public class AMQProtocolEngine implement
 
 
     private final AggregateTicker _aggregateTicker;
+    private ConnectionAdapter _adapter;
 
     enum ConnectionState
     {
@@ -849,7 +851,7 @@ public class AMQProtocolEngine implement
             {
                 if (_virtualHost != null)
                 {
-                    
_virtualHost.getConnectionRegistry().deregisterConnection(this);
+                    _virtualHost.deregisterConnection(_adapter);
                 }
                 closeAllChannels();
             }
@@ -1056,9 +1058,9 @@ public class AMQProtocolEngine implement
     public void setVirtualHost(VirtualHostImpl<?,?,?> virtualHost)
     {
         _virtualHost = virtualHost;
-
-        _virtualHost.getConnectionRegistry().registerConnection(this);
-
+        _adapter = new ConnectionAdapter(this);
+        _adapter.create();
+        _adapter.virtualHostAssociated();
 
         _messageCompressionThreshold = 
virtualHost.getContextValue(Integer.class,
                                                                    
Broker.MESSAGE_COMPRESSION_THRESHOLD_SIZE);

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1685835&r1=1685834&r2=1685835&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
 Tue Jun 16 13:43:15 2015
@@ -51,6 +51,7 @@ import org.apache.qpid.server.connection
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.model.adapter.ConnectionAdapter;
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -110,7 +111,7 @@ public class Connection_1_0 implements C
 
 
     private boolean _closedOnOpen;
-
+    private ConnectionAdapter _adapter;
 
 
     public Connection_1_0(Broker<?> broker,
@@ -171,7 +172,10 @@ public class Connection_1_0 implements C
             }
             else
             {
-                _vhost.getConnectionRegistry().registerConnection(this);
+                _adapter = new ConnectionAdapter(this);
+                _adapter.create();
+                _adapter.virtualHostAssociated();
+
             }
         }
     }
@@ -262,7 +266,7 @@ public class Connection_1_0 implements C
 
         if(_vhost != null)
         {
-            _vhost.getConnectionRegistry().deregisterConnection(this);
+            _vhost.deregisterConnection(_adapter);
         }
 
 

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java?rev=1685835&r1=1685834&r2=1685835&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
 Tue Jun 16 13:43:15 2015
@@ -23,7 +23,6 @@ package org.apache.qpid.server.protocol.
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -40,8 +39,8 @@ import java.util.Map;
 import java.util.UUID;
 
 import javax.security.auth.Subject;
-import javax.security.sasl.Sasl;
 
+import org.apache.qpid.server.model.Connection;
 import org.apache.qpid.server.virtualhost.VirtualHostPrincipal;
 import org.mockito.ArgumentCaptor;
 import org.mockito.invocation.InvocationOnMock;
@@ -54,25 +53,19 @@ import org.apache.qpid.amqp_1_0.type.Sym
 import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
 import org.apache.qpid.amqp_1_0.type.security.SaslInit;
 import org.apache.qpid.amqp_1_0.type.transport.Open;
-import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
 import org.apache.qpid.server.configuration.updater.TaskExecutorImpl;
-import org.apache.qpid.server.connection.ConnectionRegistry;
-import org.apache.qpid.server.connection.IConnectionRegistry;
 import org.apache.qpid.server.model.AuthenticationProvider;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.BrokerModel;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.Transport;
-import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.security.SubjectCreator;
 import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
 import 
org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManager;
 import 
org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManagerFactory;
 import 
org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManagerImpl;
-import org.apache.qpid.server.transport.NetworkConnectionScheduler;
 import org.apache.qpid.server.transport.NonBlockingConnection;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 import org.apache.qpid.test.utils.QpidTestCase;
@@ -90,8 +83,8 @@ public class ProtocolEngine_1_0_0Test ex
     private AuthenticationProvider _authenticationProvider;
     private List<ByteBuffer> _sentBuffers;
     private FrameWriter _frameWriter;
-    private IConnectionRegistry _connectionRegistry;
-    private Connection_1_0 _connection;
+    private Connection _connection;
+    private VirtualHostImpl _virtualHost;
 
     @Override
     public void setUp() throws Exception
@@ -109,22 +102,21 @@ public class ProtocolEngine_1_0_0Test ex
         _subjectCreator = mock(SubjectCreator.class);
         _authenticationProvider = mock(AuthenticationProvider.class);
         
when(_port.getAuthenticationProvider()).thenReturn(_authenticationProvider);
-        VirtualHostImpl virtualHost = mock(VirtualHostImpl.class);
-
-        _connectionRegistry = mock(IConnectionRegistry.class);
-        final ArgumentCaptor<AMQConnectionModel> connectionCaptor = 
ArgumentCaptor.forClass(AMQConnectionModel.class);
+        _virtualHost = mock(VirtualHostImpl.class);
+        when(_virtualHost.getChildExecutor()).thenReturn(taskExecutor);
+        when(_virtualHost.getModel()).thenReturn(BrokerModel.getInstance());
+        final ArgumentCaptor<Connection> connectionCaptor = 
ArgumentCaptor.forClass(Connection.class);
         doAnswer(new Answer()
         {
             @Override
             public Object answer(final InvocationOnMock invocation) throws 
Throwable
             {
-                _connection = (Connection_1_0) connectionCaptor.getValue();
+                _connection = connectionCaptor.getValue();
                 return null;
             }
-        
}).when(_connectionRegistry).registerConnection(connectionCaptor.capture());
-        
when(virtualHost.getConnectionRegistry()).thenReturn(_connectionRegistry);
-        
when(virtualHost.getPrincipal()).thenReturn(mock(VirtualHostPrincipal.class));
-        when(_port.getVirtualHost(anyString())).thenReturn(virtualHost);
+        }).when(_virtualHost).registerConnection(connectionCaptor.capture());
+        
when(_virtualHost.getPrincipal()).thenReturn(mock(VirtualHostPrincipal.class));
+        when(_port.getVirtualHost(anyString())).thenReturn(_virtualHost);
         
when(_authenticationProvider.getSubjectCreator(anyBoolean())).thenReturn(_subjectCreator);
 
         final ArgumentCaptor<Principal> userCaptor = 
ArgumentCaptor.forClass(Principal.class);
@@ -179,8 +171,8 @@ public class ProtocolEngine_1_0_0Test ex
         buf.flip();
         _protocolEngine_1_0_0.received(buf);
 
-        
verify(_connectionRegistry).registerConnection(any(AMQConnectionModel.class));
-        AuthenticatedPrincipal principal = (AuthenticatedPrincipal) 
_connection.getAuthorizedPrincipal();
+        verify(_virtualHost).registerConnection(any(Connection.class));
+        AuthenticatedPrincipal principal = (AuthenticatedPrincipal) 
_connection.getUnderlyingConnection().getAuthorizedPrincipal();
         assertNotNull(principal);
         assertEquals(principal, new 
AuthenticatedPrincipal(AnonymousAuthenticationManager.ANONYMOUS_PRINCIPAL));
     }
@@ -202,7 +194,7 @@ public class ProtocolEngine_1_0_0Test ex
         buf.flip();
         _protocolEngine_1_0_0.received(buf);
 
-        verify(_connectionRegistry, 
never()).registerConnection(any(AMQConnectionModel.class));
+        verify(_virtualHost, 
never()).registerConnection(any(Connection.class));
         verify(_networkConnection).close();
     }
 
@@ -232,8 +224,8 @@ public class ProtocolEngine_1_0_0Test ex
         buf.flip();
         _protocolEngine_1_0_0.received(buf);
 
-        
verify(_connectionRegistry).registerConnection(any(AMQConnectionModel.class));
-        AuthenticatedPrincipal authPrincipal = (AuthenticatedPrincipal) 
_connection.getAuthorizedPrincipal();
+        verify(_virtualHost).registerConnection(any(Connection.class));
+        AuthenticatedPrincipal authPrincipal = (AuthenticatedPrincipal) 
_connection.getUnderlyingConnection().getAuthorizedPrincipal();
         assertNotNull(authPrincipal);
         assertEquals(authPrincipal, new AuthenticatedPrincipal(principal));
     }
@@ -274,8 +266,8 @@ public class ProtocolEngine_1_0_0Test ex
         buf.flip();
         _protocolEngine_1_0_0.received(buf);
 
-        
verify(_connectionRegistry).registerConnection(any(AMQConnectionModel.class));
-        AuthenticatedPrincipal principal = (AuthenticatedPrincipal) 
_connection.getAuthorizedPrincipal();
+        verify(_virtualHost).registerConnection(any(Connection.class));
+        AuthenticatedPrincipal principal = (AuthenticatedPrincipal) 
_connection.getUnderlyingConnection().getAuthorizedPrincipal();
         assertNotNull(principal);
         assertEquals(principal, new 
AuthenticatedPrincipal(AnonymousAuthenticationManager.ANONYMOUS_PRINCIPAL));
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to