Author: rgodfrey
Date: Tue Jun 16 13:43:15 2015
New Revision: 1685835
URL: http://svn.apache.org/r1685835
Log:
QPID-6594 : [Java Broker] Eliminate ConnectionRegistry - work by Rob Godfrey &
Lorenz Quack
Removed:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
Modified:
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
Modified:
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java?rev=1685835&r1=1685834&r2=1685835&view=diff
==============================================================================
---
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java
(original)
+++
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java
Tue Jun 16 13:43:15 2015
@@ -30,7 +30,6 @@ import java.util.concurrent.ScheduledFut
import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.qpid.server.connection.IConnectionRegistry;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.message.MessageDestination;
@@ -307,12 +306,6 @@ public class BDBHAReplicaVirtualHostImpl
}
@Override
- public IConnectionRegistry getConnectionRegistry()
- {
- return null;
- }
-
- @Override
public AMQQueue<?> getQueue(final String name)
{
return null;
@@ -550,6 +543,18 @@ public class BDBHAReplicaVirtualHostImpl
return _principal;
}
+ @Override
+ public void registerConnection(final Connection<?> connection)
+ {
+ throwUnsupportedForReplica();
+ }
+
+ @Override
+ public void deregisterConnection(final Connection<?> connection)
+ {
+ throwUnsupportedForReplica();
+ }
+
private void throwUnsupportedForReplica()
{
throw new IllegalStateException("The virtual host state of " +
getState()
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java?rev=1685835&r1=1685834&r2=1685835&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
Tue Jun 16 13:43:15 2015
@@ -22,6 +22,8 @@ package org.apache.qpid.server.model;
import java.util.Collection;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+
@ManagedObject( creatable = false )
public interface Connection<X extends Connection<X>> extends
ConfiguredObject<X>
{
@@ -104,5 +106,5 @@ public interface Connection<X extends Co
Collection<Session> getSessions();
-
+ AMQConnectionModel<?,?> getUnderlyingConnection();
}
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java?rev=1685835&r1=1685834&r2=1685835&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
Tue Jun 16 13:43:15 2015
@@ -27,11 +27,9 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
-import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.virtualhost.VirtualHostPrincipal;
@ManagedObject( defaultType = "ProvidedStore", description =
VirtualHost.CLASS_DESCRIPTION)
public interface VirtualHost<X extends VirtualHost<X, Q, E>, Q extends
Queue<?>, E extends Exchange<?> > extends ConfiguredObject<X>
@@ -150,7 +148,6 @@ public interface VirtualHost<X extends V
//children
Collection<VirtualHostAlias> getAliases();
- Collection<Connection> getConnections();
Collection<Q> getQueues();
Collection<E> getExchanges();
@@ -162,6 +159,9 @@ public interface VirtualHost<X extends V
Collection<String> getExchangeTypeNames();
+ Collection<Connection> getConnections();
+
+
void start();
void stop();
@@ -170,6 +170,9 @@ public interface VirtualHost<X extends V
Principal getPrincipal();
+ void registerConnection(Connection<?> connection);
+ void deregisterConnection(Connection<?> connection);
+
public static interface Transaction
{
void dequeue(MessageInstance entry);
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java?rev=1685835&r1=1685834&r2=1685835&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
Tue Jun 16 13:43:15 2015
@@ -42,6 +42,7 @@ import org.apache.qpid.server.model.Sess
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.StateTransition;
import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.SessionModelListener;
@@ -78,6 +79,11 @@ public final class ConnectionAdapter ext
setState(State.ACTIVE);
}
+ public void virtualHostAssociated()
+ {
+ _underlyingConnection.getVirtualHost().registerConnection(this);
+ }
+
private static Map<String, Object> createAttributes(final
AMQConnectionModel _connection)
{
Map<String,Object> attributes = new HashMap<String, Object>();
@@ -154,6 +160,12 @@ public final class ConnectionAdapter ext
return _underlyingConnection.getPort();
}
+ public VirtualHost<?,?,?> getVirtualHost()
+ {
+ return _underlyingConnection.getVirtualHost();
+ }
+
+
public Collection<Session> getSessions()
{
return getChildren(Session.class);
@@ -292,4 +304,10 @@ public final class ConnectionAdapter ext
{
// SessionAdapter installs delete task to cause session model object
to delete
}
+
+ @Override
+ public AMQConnectionModel<?,?> getUnderlyingConnection()
+ {
+ return _underlyingConnection;
+ }
}
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1685835&r1=1685834&r2=1685835&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
Tue Jun 16 13:43:15 2015
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.virtualhost;
+import static java.util.Collections.newSetFromMap;
+
import java.io.File;
import java.security.Principal;
import java.security.PrivilegedAction;
@@ -28,12 +30,14 @@ import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -52,10 +56,9 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.exchange.ExchangeDefaults;
import
org.apache.qpid.pool.SuppressingInheritedAccessControlContextThreadFactory;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
-import org.apache.qpid.server.connection.ConnectionRegistry;
-import org.apache.qpid.server.connection.IConnectionRegistry;
import org.apache.qpid.server.exchange.DefaultDestination;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.logging.EventLogger;
@@ -69,7 +72,6 @@ import org.apache.qpid.server.message.Me
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.*;
import org.apache.qpid.server.model.Connection;
-import org.apache.qpid.server.model.adapter.ConnectionAdapter;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.plugin.ConnectionValidator;
import org.apache.qpid.server.plugin.QpidServiceLoader;
@@ -101,10 +103,13 @@ import org.apache.qpid.server.util.Conne
import org.apache.qpid.server.util.MapValueConverter;
public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>>
extends AbstractConfiguredObject<X>
- implements VirtualHostImpl<X, AMQQueue<?>, ExchangeImpl<?>>,
IConnectionRegistry.RegistryChangeListener, EventListener
+ implements VirtualHostImpl<X, AMQQueue<?>, ExchangeImpl<?>>,
EventListener
{
private final Collection<ConnectionValidator> _connectionValidators = new
ArrayList<>();
+
+ private final Set<Connection<?>> _connections = newSetFromMap(new
ConcurrentHashMap<Connection<?>, Boolean>());
+
private static enum BlockingType { STORE, FILESYSTEM };
private static final String USE_ASYNC_RECOVERY =
"use_async_message_store_recovery";
@@ -122,8 +127,6 @@ public abstract class AbstractVirtualHos
private final Broker<?> _broker;
- private final ConnectionRegistry _connectionRegistry;
-
private final DtxRegistry _dtxRegistry;
private final SystemNodeRegistry _systemNodeRegistry = new
SystemNodeRegistry();
@@ -210,9 +213,6 @@ public abstract class AbstractVirtualHos
_eventLogger.message(VirtualHostMessages.CREATED(getName()));
- _connectionRegistry = new ConnectionRegistry();
- _connectionRegistry.addRegistryChangeListener(this);
-
_defaultDestination = new DefaultDestination(this);
_messagesDelivered = new StatisticsCounter("messages-delivered-" +
getName());
@@ -464,11 +464,6 @@ public abstract class AbstractVirtualHos
return _messageStoreLogSubject;
}
- public IConnectionRegistry getConnectionRegistry()
- {
- return _connectionRegistry;
- }
-
public Collection<Connection> getConnections()
{
return getChildren(Connection.class);
@@ -871,7 +866,7 @@ public abstract class AbstractVirtualHos
protected void onClose()
{
//Stop Connections
- _connectionRegistry.close();
+ closeConnections("VirtualHost is closing");
_dtxRegistry.close();
closeMessageStore();
shutdownHouseKeeping();
@@ -885,6 +880,40 @@ public abstract class AbstractVirtualHos
stopLogging(_virtualHostLoggersToClose);
}
+
+ public void closeConnections(final String replyText)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Closing connection registry :" +
_connections.size() + " connections.");
+ }
+ for(Connection conn : _connections)
+ {
+ conn.getUnderlyingConnection().stop();
+ }
+
+ while (!_connections.isEmpty())
+ {
+ Iterator<Connection<?>> itr = _connections.iterator();
+ while(itr.hasNext())
+ {
+ Connection<?> connection = itr.next();
+ try
+ {
+
connection.getUnderlyingConnection().closeAsync(AMQConstant.CONNECTION_FORCED,
replyText);
+ }
+ catch (Exception e)
+ {
+ _logger.warn("Exception closing connection " +
connection.getName() + " from " + connection.getRemoteAddress(), e);
+ }
+ finally
+ {
+ itr.remove();
+ }
+ }
+ }
+ }
+
private void closeMessageStore()
{
if (getMessageStore() != null)
@@ -952,9 +981,9 @@ public abstract class AbstractVirtualHos
_messagesReceived.reset();
_dataReceived.reset();
- for (AMQConnectionModel connection :
_connectionRegistry.getConnections())
+ for (Connection<?> connection : _connections)
{
- connection.resetStatistics();
+ connection.getUnderlyingConnection().resetStatistics();
}
}
@@ -976,14 +1005,14 @@ public abstract class AbstractVirtualHos
private void block(BlockingType blockingType)
{
- synchronized (_connectionRegistry)
+ synchronized (_connections)
{
_blockingReasons.add(blockingType);
if(!_blocked.compareAndSet(false,true))
{
- for(AMQConnectionModel conn :
_connectionRegistry.getConnections())
+ for(Connection<?> conn : _connections)
{
- conn.block();
+ conn.getUnderlyingConnection().block();
}
}
}
@@ -993,38 +1022,19 @@ public abstract class AbstractVirtualHos
private void unblock(BlockingType blockingType)
{
- synchronized (_connectionRegistry)
+ synchronized (_connections)
{
_blockingReasons.remove(blockingType);
if(_blockingReasons.isEmpty() &&
_blocked.compareAndSet(true,false))
{
- for(AMQConnectionModel conn :
_connectionRegistry.getConnections())
+ for(Connection<?> conn : _connections)
{
- conn.unblock();
+ conn.getUnderlyingConnection().unblock();
}
}
}
}
- public void connectionRegistered(final AMQConnectionModel connection)
- {
- if(_blocked.get())
- {
- connection.block();
- }
-
- ConnectionAdapter c = new ConnectionAdapter(connection);
- c.create();
- childAdded(c);
- connection.setScheduler(_networkConnectionScheduler);
-
- }
-
- public void connectionUnregistered(final AMQConnectionModel connection)
- {
- // ConnectionAdapter installs delete task to cause connection model
object to delete
- }
-
public void event(final Event event)
{
switch(event)
@@ -1117,13 +1127,13 @@ public abstract class AbstractVirtualHos
}
}
}
- for (AMQConnectionModel<?,?> connection :
getConnectionRegistry().getConnections())
+ for (Connection<?> connection : _connections)
{
if (_logger.isDebugEnabled())
{
_logger.debug("Checking for long running open transactions
on connection " + connection);
}
- for (AMQSessionModel<?,?> session :
connection.getSessionModels())
+ for (AMQSessionModel<?,?> session :
connection.getUnderlyingConnection().getSessionModels())
{
if (_logger.isDebugEnabled())
{
@@ -1351,7 +1361,7 @@ public abstract class AbstractVirtualHos
@Override
public long getConnectionCount()
{
- return getConnectionRegistry().getConnections().size();
+ return _connections.size();
}
@Override
@@ -1665,6 +1675,30 @@ public abstract class AbstractVirtualHos
return _principal;
}
+ @Override
+ public void registerConnection(final Connection<?> connection)
+ {
+ childAdded(connection);
+
+ _connections.add(connection);
+
+ AMQConnectionModel<?,?> underlyingConnection =
connection.getUnderlyingConnection();
+ if(_blocked.get())
+ {
+ underlyingConnection.block();
+ }
+
+ underlyingConnection.setScheduler(_networkConnectionScheduler);
+
+ }
+
+ @Override
+ public void deregisterConnection(final Connection<?> connection)
+ {
+ _connections.remove(connection);
+ }
+
+
private long calculateTotalEnqueuedSize(final Collection<AMQQueue<?>>
queues)
{
long total = 0;
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java?rev=1685835&r1=1685834&r2=1685835&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
Tue Jun 16 13:43:15 2015
@@ -25,7 +25,6 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ScheduledFuture;
-import org.apache.qpid.server.connection.IConnectionRegistry;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.EventLoggerProvider;
@@ -40,8 +39,6 @@ import org.apache.qpid.server.security.S
import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.transport.NetworkConnectionScheduler;
-import org.apache.qpid.server.transport.NonBlockingConnection;
import org.apache.qpid.server.txn.DtxRegistry;
public interface VirtualHostImpl< X extends VirtualHostImpl<X,Q,E>, Q extends
AMQQueue<?>, E extends ExchangeImpl<?> >
@@ -51,8 +48,6 @@ public interface VirtualHostImpl< X exte
{
String DEFAULT_DLE_NAME_SUFFIX = "_DLE";
- IConnectionRegistry getConnectionRegistry();
-
String getName();
Q getQueue(String name);
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java?rev=1685835&r1=1685834&r2=1685835&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java
Tue Jun 16 13:43:15 2015
@@ -31,7 +31,6 @@ import java.util.concurrent.ScheduledFut
import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.qpid.server.connection.IConnectionRegistry;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.message.MessageDestination;
@@ -308,12 +307,6 @@ class RedirectingVirtualHostImpl
}
@Override
- public IConnectionRegistry getConnectionRegistry()
- {
- return null;
- }
-
- @Override
public AMQQueue<?> getQueue(final String name)
{
return null;
@@ -552,6 +545,18 @@ class RedirectingVirtualHostImpl
return _principal;
}
+ @Override
+ public void registerConnection(final Connection<?> connection)
+ {
+ throwUnsupportedForRedirector();
+ }
+
+ @Override
+ public void deregisterConnection(final Connection<?> connection)
+ {
+ throwUnsupportedForRedirector();
+ }
+
private void throwUnsupportedForRedirector()
{
throw new IllegalStateException("The virtual host state of " +
getState()
Modified:
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java?rev=1685835&r1=1685834&r2=1685835&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
(original)
+++
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
Tue Jun 16 13:43:15 2015
@@ -52,7 +52,7 @@ import org.apache.qpid.protocol.AMQConst
import
org.apache.qpid.server.configuration.store.StoreConfigurationChangeListener;
import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
-import
org.apache.qpid.server.connection.IConnectionRegistry.RegistryChangeListener;
+import org.apache.qpid.server.model.adapter.ConnectionAdapter;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
@@ -230,10 +230,11 @@ public class VirtualHostTest extends Qpi
assertEquals("Unexpected state", State.ACTIVE, virtualHost.getState());
AMQConnectionModel connection =
createMockProtocolConnection(virtualHost);
-
assertEquals("Unexpected number of connections before connection
registered", 0, virtualHost.getChildren(Connection.class).size());
- ((RegistryChangeListener)virtualHost).connectionRegistered(connection);
+ ConnectionAdapter modelConnection = new ConnectionAdapter(connection);
+ modelConnection.create();
+ virtualHost.registerConnection(modelConnection);
assertEquals("Unexpected number of connections after connection
registered", 1, virtualHost.getChildren(
Connection.class).size());
@@ -256,10 +257,13 @@ public class VirtualHostTest extends Qpi
assertEquals("Unexpected state", State.ACTIVE, virtualHost.getState());
AMQConnectionModel connection =
createMockProtocolConnection(virtualHost);
+ assertEquals("Unexpected number of connections before connection
registered",
+ 0,
+ virtualHost.getChildren(Connection.class).size());
- assertEquals("Unexpected number of connections before connection
registered", 0, virtualHost.getChildren(Connection.class).size());
-
- ((RegistryChangeListener)virtualHost).connectionRegistered(connection);
+ ConnectionAdapter modelConnection = new ConnectionAdapter(connection);
+ modelConnection.create();
+ virtualHost.registerConnection(modelConnection);
assertEquals("Unexpected number of connections after connection
registered", 1, virtualHost.getChildren(Connection.class).size());
Modified:
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1685835&r1=1685834&r2=1685835&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
Tue Jun 16 13:43:15 2015
@@ -44,6 +44,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.model.adapter.ConnectionAdapter;
import org.apache.qpid.server.protocol.ConnectionClosingTicker;
import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.connection.ConnectionPrincipal;
@@ -109,6 +110,7 @@ public class ServerConnection extends Co
private ProtocolEngine_0_10 _protocolEngine;
private boolean _ignoreFutureInput;
private boolean _ignoreAllButConnectionCloseOk;
+ private ConnectionAdapter _adapter;
public ServerConnection(final long connectionId,
Broker<?> broker,
@@ -173,14 +175,17 @@ public class ServerConnection extends Co
true,
true));
- getVirtualHost().getConnectionRegistry().registerConnection(this);
+ _adapter = new ConnectionAdapter(this);
+ _adapter.create();
+ _adapter.virtualHostAssociated();
+
}
if (state == State.CLOSE_RCVD || state == State.CLOSED || state ==
State.CLOSING)
{
- if(_virtualHost != null)
+ if(_adapter != null)
{
-
_virtualHost.getConnectionRegistry().deregisterConnection(this);
+ _virtualHost.deregisterConnection(_adapter);
}
}
if(state == State.CLOSING)
Modified:
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1685835&r1=1685834&r2=1685835&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
Tue Jun 16 13:43:15 2015
@@ -367,11 +367,12 @@ public class ServerConnectionDelegate ex
final Principal authorizedPrincipal = sconn.getAuthorizedPrincipal();
final String userId = authorizedPrincipal == null ? "" :
authorizedPrincipal.getName();
- final Iterator<AMQConnectionModel> connections =
-
((ServerConnection)conn).getVirtualHost().getConnectionRegistry().getConnections().iterator();
+ final Iterator<org.apache.qpid.server.model.Connection> connections =
+
((ServerConnection)conn).getVirtualHost().getConnections().iterator();
while(connections.hasNext())
{
- final AMQConnectionModel amqConnectionModel = connections.next();
+ final org.apache.qpid.server.model.Connection<?> modelConnnection
= connections.next();
+ final AMQConnectionModel amqConnectionModel =
modelConnnection.getUnderlyingConnection();
if (amqConnectionModel instanceof ServerConnection)
{
ServerConnection otherConnection =
(ServerConnection)amqConnectionModel;
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1685835&r1=1685834&r2=1685835&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
Tue Jun 16 13:43:15 2015
@@ -60,6 +60,7 @@ import org.apache.qpid.common.ServerProp
import org.apache.qpid.framing.*;
import org.apache.qpid.properties.ConnectionStartProperties;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.model.adapter.ConnectionAdapter;
import org.apache.qpid.server.protocol.ConnectionClosingTicker;
import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.configuration.BrokerProperties;
@@ -105,6 +106,7 @@ public class AMQProtocolEngine implement
private final AggregateTicker _aggregateTicker;
+ private ConnectionAdapter _adapter;
enum ConnectionState
{
@@ -849,7 +851,7 @@ public class AMQProtocolEngine implement
{
if (_virtualHost != null)
{
-
_virtualHost.getConnectionRegistry().deregisterConnection(this);
+ _virtualHost.deregisterConnection(_adapter);
}
closeAllChannels();
}
@@ -1056,9 +1058,9 @@ public class AMQProtocolEngine implement
public void setVirtualHost(VirtualHostImpl<?,?,?> virtualHost)
{
_virtualHost = virtualHost;
-
- _virtualHost.getConnectionRegistry().registerConnection(this);
-
+ _adapter = new ConnectionAdapter(this);
+ _adapter.create();
+ _adapter.virtualHostAssociated();
_messageCompressionThreshold =
virtualHost.getContextValue(Integer.class,
Broker.MESSAGE_COMPRESSION_THRESHOLD_SIZE);
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1685835&r1=1685834&r2=1685835&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
Tue Jun 16 13:43:15 2015
@@ -51,6 +51,7 @@ import org.apache.qpid.server.connection
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.model.adapter.ConnectionAdapter;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -110,7 +111,7 @@ public class Connection_1_0 implements C
private boolean _closedOnOpen;
-
+ private ConnectionAdapter _adapter;
public Connection_1_0(Broker<?> broker,
@@ -171,7 +172,10 @@ public class Connection_1_0 implements C
}
else
{
- _vhost.getConnectionRegistry().registerConnection(this);
+ _adapter = new ConnectionAdapter(this);
+ _adapter.create();
+ _adapter.virtualHostAssociated();
+
}
}
}
@@ -262,7 +266,7 @@ public class Connection_1_0 implements C
if(_vhost != null)
{
- _vhost.getConnectionRegistry().deregisterConnection(this);
+ _vhost.deregisterConnection(_adapter);
}
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java?rev=1685835&r1=1685834&r2=1685835&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
Tue Jun 16 13:43:15 2015
@@ -23,7 +23,6 @@ package org.apache.qpid.server.protocol.
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@@ -40,8 +39,8 @@ import java.util.Map;
import java.util.UUID;
import javax.security.auth.Subject;
-import javax.security.sasl.Sasl;
+import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.virtualhost.VirtualHostPrincipal;
import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
@@ -54,25 +53,19 @@ import org.apache.qpid.amqp_1_0.type.Sym
import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
import org.apache.qpid.amqp_1_0.type.security.SaslInit;
import org.apache.qpid.amqp_1_0.type.transport.Open;
-import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.configuration.updater.TaskExecutorImpl;
-import org.apache.qpid.server.connection.ConnectionRegistry;
-import org.apache.qpid.server.connection.IConnectionRegistry;
import org.apache.qpid.server.model.AuthenticationProvider;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.BrokerModel;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Transport;
-import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import
org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManager;
import
org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManagerFactory;
import
org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManagerImpl;
-import org.apache.qpid.server.transport.NetworkConnectionScheduler;
import org.apache.qpid.server.transport.NonBlockingConnection;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -90,8 +83,8 @@ public class ProtocolEngine_1_0_0Test ex
private AuthenticationProvider _authenticationProvider;
private List<ByteBuffer> _sentBuffers;
private FrameWriter _frameWriter;
- private IConnectionRegistry _connectionRegistry;
- private Connection_1_0 _connection;
+ private Connection _connection;
+ private VirtualHostImpl _virtualHost;
@Override
public void setUp() throws Exception
@@ -109,22 +102,21 @@ public class ProtocolEngine_1_0_0Test ex
_subjectCreator = mock(SubjectCreator.class);
_authenticationProvider = mock(AuthenticationProvider.class);
when(_port.getAuthenticationProvider()).thenReturn(_authenticationProvider);
- VirtualHostImpl virtualHost = mock(VirtualHostImpl.class);
-
- _connectionRegistry = mock(IConnectionRegistry.class);
- final ArgumentCaptor<AMQConnectionModel> connectionCaptor =
ArgumentCaptor.forClass(AMQConnectionModel.class);
+ _virtualHost = mock(VirtualHostImpl.class);
+ when(_virtualHost.getChildExecutor()).thenReturn(taskExecutor);
+ when(_virtualHost.getModel()).thenReturn(BrokerModel.getInstance());
+ final ArgumentCaptor<Connection> connectionCaptor =
ArgumentCaptor.forClass(Connection.class);
doAnswer(new Answer()
{
@Override
public Object answer(final InvocationOnMock invocation) throws
Throwable
{
- _connection = (Connection_1_0) connectionCaptor.getValue();
+ _connection = connectionCaptor.getValue();
return null;
}
-
}).when(_connectionRegistry).registerConnection(connectionCaptor.capture());
-
when(virtualHost.getConnectionRegistry()).thenReturn(_connectionRegistry);
-
when(virtualHost.getPrincipal()).thenReturn(mock(VirtualHostPrincipal.class));
- when(_port.getVirtualHost(anyString())).thenReturn(virtualHost);
+ }).when(_virtualHost).registerConnection(connectionCaptor.capture());
+
when(_virtualHost.getPrincipal()).thenReturn(mock(VirtualHostPrincipal.class));
+ when(_port.getVirtualHost(anyString())).thenReturn(_virtualHost);
when(_authenticationProvider.getSubjectCreator(anyBoolean())).thenReturn(_subjectCreator);
final ArgumentCaptor<Principal> userCaptor =
ArgumentCaptor.forClass(Principal.class);
@@ -179,8 +171,8 @@ public class ProtocolEngine_1_0_0Test ex
buf.flip();
_protocolEngine_1_0_0.received(buf);
-
verify(_connectionRegistry).registerConnection(any(AMQConnectionModel.class));
- AuthenticatedPrincipal principal = (AuthenticatedPrincipal)
_connection.getAuthorizedPrincipal();
+ verify(_virtualHost).registerConnection(any(Connection.class));
+ AuthenticatedPrincipal principal = (AuthenticatedPrincipal)
_connection.getUnderlyingConnection().getAuthorizedPrincipal();
assertNotNull(principal);
assertEquals(principal, new
AuthenticatedPrincipal(AnonymousAuthenticationManager.ANONYMOUS_PRINCIPAL));
}
@@ -202,7 +194,7 @@ public class ProtocolEngine_1_0_0Test ex
buf.flip();
_protocolEngine_1_0_0.received(buf);
- verify(_connectionRegistry,
never()).registerConnection(any(AMQConnectionModel.class));
+ verify(_virtualHost,
never()).registerConnection(any(Connection.class));
verify(_networkConnection).close();
}
@@ -232,8 +224,8 @@ public class ProtocolEngine_1_0_0Test ex
buf.flip();
_protocolEngine_1_0_0.received(buf);
-
verify(_connectionRegistry).registerConnection(any(AMQConnectionModel.class));
- AuthenticatedPrincipal authPrincipal = (AuthenticatedPrincipal)
_connection.getAuthorizedPrincipal();
+ verify(_virtualHost).registerConnection(any(Connection.class));
+ AuthenticatedPrincipal authPrincipal = (AuthenticatedPrincipal)
_connection.getUnderlyingConnection().getAuthorizedPrincipal();
assertNotNull(authPrincipal);
assertEquals(authPrincipal, new AuthenticatedPrincipal(principal));
}
@@ -274,8 +266,8 @@ public class ProtocolEngine_1_0_0Test ex
buf.flip();
_protocolEngine_1_0_0.received(buf);
-
verify(_connectionRegistry).registerConnection(any(AMQConnectionModel.class));
- AuthenticatedPrincipal principal = (AuthenticatedPrincipal)
_connection.getAuthorizedPrincipal();
+ verify(_virtualHost).registerConnection(any(Connection.class));
+ AuthenticatedPrincipal principal = (AuthenticatedPrincipal)
_connection.getUnderlyingConnection().getAuthorizedPrincipal();
assertNotNull(principal);
assertEquals(principal, new
AuthenticatedPrincipal(AnonymousAuthenticationManager.ANONYMOUS_PRINCIPAL));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org