Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java?rev=1687962&r1=1687961&r2=1687962&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java Sat Jun 27 21:13:25 2015 @@ -48,13 +48,14 @@ import org.apache.qpid.server.model.Stat import org.apache.qpid.server.model.VirtualHostAlias; import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.model.port.AmqpPort; -import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.LinkRegistry; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.transport.AMQPConnection; +import org.apache.qpid.server.transport.AbstractAMQPConnection; import org.apache.qpid.server.txn.DtxRegistry; import org.apache.qpid.server.virtualhost.*; @@ -502,7 +503,7 @@ class RedirectingVirtualHostImpl } @Override - public boolean authoriseCreateConnection(final AMQConnectionModel<?, ?> connection) + public boolean authoriseCreateConnection(final AMQPConnection<?> connection) { return false; } @@ -549,13 +550,13 @@ class RedirectingVirtualHostImpl } @Override - public void registerConnection(final Connection<?> connection) + public void registerConnection(final AMQPConnection<?> connection) { throwUnsupportedForRedirector(); } @Override - public void 
deregisterConnection(final Connection<?> connection) + public void deregisterConnection(final AMQPConnection<?> connection) { throwUnsupportedForRedirector(); }
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/connection/ConnectionVersionValidatorTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/connection/ConnectionVersionValidatorTest.java?rev=1687962&r1=1687961&r2=1687962&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/connection/ConnectionVersionValidatorTest.java (original) +++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/connection/ConnectionVersionValidatorTest.java Sat Jun 27 21:13:25 2015 @@ -40,14 +40,14 @@ import org.apache.qpid.server.logging.Lo import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.transport.AMQPConnection; import org.apache.qpid.test.utils.QpidTestCase; public class ConnectionVersionValidatorTest extends QpidTestCase { private VirtualHost _virtualHostMock; - private AMQConnectionModel _connectionMock; + private AMQPConnection _connectionMock; private EventLogger _eventLoggerMock; private ConnectionVersionValidator _connectionValidator; @@ -57,7 +57,7 @@ public class ConnectionVersionValidatorT _connectionValidator = new ConnectionVersionValidator(); _virtualHostMock = mock(VirtualHost.class); - _connectionMock = mock(AMQConnectionModel.class); + _connectionMock = mock(AMQPConnection.class); _eventLoggerMock = mock(EventLogger.class); Broker brokerMock = mock(Broker.class); Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java?rev=1687962&r1=1687961&r2=1687962&view=diff 
============================================================================== --- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java (original) +++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java Sat Jun 27 21:13:25 2015 @@ -24,15 +24,21 @@ package org.apache.qpid.server.consumer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.lang.reflect.Type; import java.net.SocketAddress; +import java.security.AccessControlException; import java.security.Principal; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor; import org.apache.qpid.server.configuration.updater.TaskExecutor; @@ -43,20 +49,23 @@ import org.apache.qpid.server.logging.Lo import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.BrokerModel; +import org.apache.qpid.server.model.ConfigurationChangeListener; +import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.ConfiguredObjectFactory; import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl; import org.apache.qpid.server.model.Consumer; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Model; import org.apache.qpid.server.model.Session; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.port.AmqpPort; -import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; 
import org.apache.qpid.server.protocol.ConsumerListener; -import org.apache.qpid.server.transport.ProtocolEngine; -import org.apache.qpid.server.protocol.SessionModelListener; +import org.apache.qpid.server.store.ConfiguredObjectRecord; +import org.apache.qpid.server.transport.AMQPConnection; +import org.apache.qpid.server.transport.AbstractAMQPConnection; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.transport.NetworkConnectionScheduler; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.StateChangeListener; @@ -314,7 +323,7 @@ public class MockConsumer implements Con } - private static class MockSessionModel implements AMQSessionModel + private static class MockSessionModel implements AMQSessionModel<MockSessionModel> { private final UUID _id = UUID.randomUUID(); private Session _modelObject; @@ -338,9 +347,9 @@ public class MockConsumer implements Con } @Override - public AMQConnectionModel getConnectionModel() + public AMQPConnection<?> getAMQPConnection() { - return new MockConnectionModel(); + return null; } @Override @@ -499,11 +508,6 @@ public class MockConsumer implements Con } - @Override - public int compareTo(final Object o) - { - return 0; - } @Override public void transportStateChanged() @@ -528,9 +532,15 @@ public class MockConsumer implements Con { } + + @Override + public int compareTo(final AMQSessionModel o) + { + return 0; + } } - private static class MockConnectionModel implements AMQConnectionModel + private static class MockConnectionModel implements AMQPConnection<MockConnectionModel> { @Override @@ -544,81 +554,112 @@ public class MockConsumer implements Con } @Override - public StatisticsCounter getMessageDeliveryStatistics() + public void closeAsync(AMQConstant cause, String message) + { + } + + @Override + public void closeSessionAsync(AMQSessionModel<?> session, AMQConstant cause, + String message) + { + } + + @Override + 
public long getConnectionId() + { + return 0; + } + + @Override + public void block() { - return null; } @Override - public StatisticsCounter getMessageReceiptStatistics() + public void unblock() + { + + } + + @Override + public String getRemoteAddressString() + { + return "remoteAddress:1234"; + } + + public SocketAddress getRemoteSocketAddress() { return null; } @Override - public StatisticsCounter getDataDeliveryStatistics() + public String getClientId() { return null; } @Override - public StatisticsCounter getDataReceiptStatistics() + public String getRemoteContainerName() { return null; } @Override - public void resetStatistics() + public void notifyWork() { } @Override - public void closeAsync(AMQConstant cause, String message) + public boolean isMessageAssignmentSuspended() { + return false; } @Override - public void closeSessionAsync(AMQSessionModel session, AMQConstant cause, - String message) + public boolean hasSessionWithName(final byte[] name) { + return false; } @Override - public long getConnectionId() + public void setScheduler(final NetworkConnectionScheduler networkConnectionScheduler) { - return 0; + } @Override - public List<AMQSessionModel> getSessionModels() + public String getClientVersion() { return null; } @Override - public void block() + public boolean isIncoming() { + return false; } @Override - public void unblock() + public String getLocalAddress() { + return null; } @Override - public LogSubject getLogSubject() + public String getPrincipal() { return null; } @Override - public String getRemoteAddressString() + public String getRemoteAddress() { - return "remoteAddress:1234"; + return null; } - public SocketAddress getRemoteAddress() + @Override + public String getRemoteProcessName() { return null; } @@ -630,130 +671,395 @@ public class MockConsumer implements Con } @Override - public String getClientId() + public long getSessionCountLimit() + { + return 0; + } + + @Override + public Principal getAuthorizedPrincipal() { return null; } 
@Override - public String getRemoteContainerName() + public AmqpPort<?> getPort() { return null; } @Override - public void addSessionListener(final SessionModelListener listener) + public long getBytesIn() + { + return 0; + } + + @Override + public long getBytesOut() { + return 0; + } + @Override + public long getMessagesIn() + { + return 0; } @Override - public void removeSessionListener(final SessionModelListener listener) + public long getMessagesOut() { + return 0; + } + @Override + public long getLastIoTime() + { + return 0; } @Override - public void notifyWork() + public int getSessionCount() { + return 0; + } + @Override + public Collection<Session> getSessions() + { + return null; } @Override - public boolean isMessageAssignmentSuspended() + public AbstractAMQPConnection<?> getUnderlyingConnection() + { + return null; + } + + @Override + public Transport getTransport() + { + return null; + } + + @Override + public boolean isConnectionStopped() { return false; } @Override - public ProtocolEngine getProtocolEngine() + public String getVirtualHostName() { return null; } @Override - public void setScheduler(final NetworkConnectionScheduler networkConnectionScheduler) + public VirtualHost<?, ?, ?> getVirtualHost() + { + return null; + } + + @Override + public void addDeleteTask(final Action task) { } @Override - public String getClientVersion() + public void removeDeleteTask(final Action task) + { + + } + + + @Override + public UUID getId() { return null; } @Override - public String getClientProduct() + public String getName() { return null; } @Override - public Principal getAuthorizedPrincipal() + public String getDescription() { return null; } @Override - public long getSessionCountLimit() + public String getType() + { + return null; + } + + @Override + public Map<String, String> getContext() + { + return null; + } + + @Override + public <T> T getContextValue(final Class<T> clazz, final String propertyName) + { + return null; + } + + @Override + public <T> T 
getContextValue(final Class<T> clazz, final Type t, final String propertyName) + { + return null; + } + + @Override + public Set<String> getContextKeys(final boolean excludeSystem) + { + return null; + } + + @Override + public String getLastUpdatedBy() + { + return null; + } + + @Override + public long getLastUpdatedTime() { return 0; } @Override - public long getLastIoTime() + public String getCreatedBy() + { + return null; + } + + @Override + public long getCreatedTime() { return 0; } @Override - public AmqpPort<?> getPort() + public org.apache.qpid.server.model.State getDesiredState() { return null; } @Override - public Transport getTransport() + public org.apache.qpid.server.model.State getState() { return null; } @Override - public void stop() + public void addChangeListener(final ConfigurationChangeListener listener) { + } @Override - public boolean isStopped() + public boolean removeChangeListener(final ConfigurationChangeListener listener) { return false; } @Override - public String getVirtualHostName() + public <T extends ConfiguredObject> T getParent(final Class<T> clazz) { return null; } @Override - public VirtualHost<?, ?, ?> getVirtualHost() + public boolean isDurable() + { + return false; + } + + @Override + public LifetimePolicy getLifetimePolicy() { return null; } @Override - public void addDeleteTask(final Action task) + public Collection<String> getAttributeNames() { + return null; + } + @Override + public Object getAttribute(final String name) + { + return null; } @Override - public void removeDeleteTask(final Action task) + public Map<String, Object> getActualAttributes() + { + return null; + } + + @Override + public Object setAttribute(final String name, final Object expected, final Object desired) + throws IllegalStateException, AccessControlException, IllegalArgumentException + { + return null; + } + + @Override + public Map<String, Number> getStatistics() + { + return null; + } + + @Override + public <C extends ConfiguredObject> 
Collection<C> getChildren(final Class<C> clazz) + { + return null; + } + + @Override + public <C extends ConfiguredObject> C getChildById(final Class<C> clazz, final UUID id) + { + return null; + } + + @Override + public <C extends ConfiguredObject> C getChildByName(final Class<C> clazz, final String name) + { + return null; + } + + @Override + public <C extends ConfiguredObject> C createChild(final Class<C> childClass, + final Map<String, Object> attributes, + final ConfiguredObject... otherParents) + { + return null; + } + + @Override + public <C extends ConfiguredObject> ListenableFuture<C> createChildAsync(final Class<C> childClass, + final Map<String, Object> attributes, + final ConfiguredObject... otherParents) + { + return null; + } + + @Override + public void setAttributes(final Map<String, Object> attributes) + throws IllegalStateException, AccessControlException, IllegalArgumentException { } + @Override + public ListenableFuture<Void> setAttributesAsync(final Map<String, Object> attributes) + throws IllegalStateException, AccessControlException, IllegalArgumentException + { + return null; + } + + @Override + public Class<? extends ConfiguredObject> getCategoryClass() + { + return null; + } + @Override + public Class<? 
extends ConfiguredObject> getTypeClass() + { + return null; + } + + @Override + public boolean managesChildStorage() + { + return false; + } + + @Override + public <C extends ConfiguredObject<C>> C findConfiguredObject(final Class<C> clazz, final String name) + { + return null; + } + + @Override + public ConfiguredObjectRecord asObjectRecord() + { + return null; + } + + @Override + public void open() + { + + } + + @Override + public ListenableFuture<Void> openAsync() + { + return null; + } + + @Override + public void close() + { + + } + + @Override + public ListenableFuture<Void> closeAsync() + { + return null; + } + + @Override + public ListenableFuture<Void> deleteAsync() + { + return null; + } + + @Override + public TaskExecutor getTaskExecutor() + { + return null; + } + + @Override + public TaskExecutor getChildExecutor() + { + return null; + } + + @Override + public ConfiguredObjectFactory getObjectFactory() + { + return null; + } + + @Override + public Model getModel() + { + return null; + } + + @Override + public void delete() + { + + } + + @Override + public void decryptSecrets() + { + + } } } Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/CompositeFilterTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/CompositeFilterTest.java?rev=1687962&r1=1687961&r2=1687962&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/CompositeFilterTest.java (original) +++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/CompositeFilterTest.java Sat Jun 27 21:13:25 2015 @@ -27,8 +27,6 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import java.util.Arrays; - import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.filter.Filter; import 
ch.qos.logback.core.spi.FilterReply; Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java?rev=1687962&r1=1687961&r2=1687962&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java (original) +++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java Sat Jun 27 21:13:25 2015 @@ -21,13 +21,13 @@ package org.apache.qpid.server.logging.actors; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.transport.AMQPConnection; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHostImpl; public abstract class BaseConnectionActorTestCase extends BaseActorTestCase { - private AMQConnectionModel _session; + private AMQPConnection<?> _connection; private VirtualHostImpl _virtualHost; @Override @@ -35,7 +35,7 @@ public abstract class BaseConnectionActo { super.setUp(); BrokerTestHelper.setUp(); - _session = BrokerTestHelper.createConnection(); + _connection = BrokerTestHelper.createConnection(); _virtualHost = BrokerTestHelper.createVirtualHost("test"); } @@ -53,9 +53,9 @@ public abstract class BaseConnectionActo { _virtualHost.close(); } - if (_session != null) + if (_connection != null) { - _session.closeAsync(AMQConstant.CONNECTION_FORCED, ""); + _connection.closeAsync(AMQConstant.CONNECTION_FORCED, ""); } } finally @@ -65,9 +65,9 @@ public abstract class BaseConnectionActo } } - public AMQConnectionModel getConnection() + public AMQPConnection<?> getConnection() { - return _session; + return _connection; } } Modified: 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java?rev=1687962&r1=1687961&r2=1687962&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java (original) +++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java Sat Jun 27 21:13:25 2015 @@ -38,7 +38,7 @@ public class ChannelLogSubjectTest exten super.setUp(); AMQSessionModel session = mock(AMQSessionModel.class); - when(session.getConnectionModel()).thenReturn(getConnection()); + when(session.getAMQPConnection()).thenReturn(getConnection()); when(session.getChannelId()).thenReturn(_channelID); _subject = new ChannelLogSubject(session); } Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java?rev=1687962&r1=1687961&r2=1687962&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java (original) +++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java Sat Jun 27 21:13:25 2015 @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.logging.subjects; -import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.transport.AMQPConnection; import java.security.Principal; @@ -38,7 +38,7 @@ public class ConnectionLogSubjectTest ex private static final String IP_STRING = "127.0.0.1:1"; private 
static final String VHOST = "test"; - private AMQConnectionModel _connection; + private AMQPConnection _connection; @Override public void setUp() throws Exception @@ -48,7 +48,7 @@ public class ConnectionLogSubjectTest ex final Principal principal = mock(Principal.class); when(principal.getName()).thenReturn(USER); - _connection = mock(AMQConnectionModel.class); + _connection = mock(AMQPConnection.class); when(_connection.getConnectionId()).thenReturn(CONNECTION_ID); when(_connection.getAuthorizedPrincipal()).thenReturn(principal); when(_connection.getRemoteAddressString()).thenReturn("/"+IP_STRING); @@ -66,7 +66,7 @@ public class ConnectionLogSubjectTest ex verifyConnection(CONNECTION_ID, USER, IP_STRING, VHOST, message); } - public AMQConnectionModel getConnection() + public AMQPConnection getConnection() { return _connection; } Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java?rev=1687962&r1=1687961&r2=1687962&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java (original) +++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java Sat Jun 27 21:13:25 2015 @@ -53,12 +53,12 @@ import org.apache.qpid.protocol.AMQConst import org.apache.qpid.server.configuration.store.StoreConfigurationChangeListener; import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor; import org.apache.qpid.server.configuration.updater.TaskExecutor; -import org.apache.qpid.server.model.adapter.ConnectionAdapter; -import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.ConfiguredObjectRecord; import 
org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; +import org.apache.qpid.server.transport.AMQPConnection; +import org.apache.qpid.server.transport.AbstractAMQPConnection; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.TestMemoryVirtualHost; @@ -230,10 +230,10 @@ public class VirtualHostTest extends Qpi VirtualHost<?, ?, ?> virtualHost = createVirtualHost(virtualHostName); assertEquals("Unexpected state", State.ACTIVE, virtualHost.getState()); - AMQConnectionModel connection = createMockProtocolConnection(virtualHost); + AbstractAMQPConnection connection = createMockProtocolConnection(virtualHost); assertEquals("Unexpected number of connections before connection registered", 0, virtualHost.getConnectionCount()); - Connection modelConnection = mock(Connection.class); + AMQPConnection modelConnection = mock(AMQPConnection.class); when(modelConnection.getUnderlyingConnection()).thenReturn(connection); when(modelConnection.closeAsync()).thenReturn(Futures.immediateFuture(null)); virtualHost.registerConnection(modelConnection); @@ -257,12 +257,12 @@ public class VirtualHostTest extends Qpi VirtualHost<?, ?, ?> virtualHost = createVirtualHost(virtualHostName); assertEquals("Unexpected state", State.ACTIVE, virtualHost.getState()); - AMQConnectionModel connection = createMockProtocolConnection(virtualHost); + AbstractAMQPConnection connection = createMockProtocolConnection(virtualHost); assertEquals("Unexpected number of connections before connection registered", 0, virtualHost.getConnectionCount()); - Connection modelConnection = mock(Connection.class); + AMQPConnection modelConnection = mock(AMQPConnection.class); when(modelConnection.getUnderlyingConnection()).thenReturn(connection); virtualHost.registerConnection(modelConnection); @@ -397,9 +397,9 @@ public class VirtualHostTest extends Qpi return 
host; } - private AMQConnectionModel createMockProtocolConnection(final VirtualHost<?, ?, ?> virtualHost) + private AbstractAMQPConnection createMockProtocolConnection(final VirtualHost<?, ?, ?> virtualHost) { - final AMQConnectionModel connection = mock(AMQConnectionModel.class); + final AbstractAMQPConnection connection = mock(AbstractAMQPConnection.class); final List<Action<?>> tasks = new ArrayList<>(); final ArgumentCaptor<Action> deleteTaskCaptor = ArgumentCaptor.forClass(Action.class); Answer answer = new Answer() Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/SecurityManagerTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/SecurityManagerTest.java?rev=1687962&r1=1687961&r2=1687962&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/SecurityManagerTest.java (original) +++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/SecurityManagerTest.java Sat Jun 27 21:13:25 2015 @@ -59,13 +59,13 @@ import org.apache.qpid.server.model.Virt import org.apache.qpid.server.model.VirtualHostLogger; import org.apache.qpid.server.model.VirtualHostLoggerFilter; import org.apache.qpid.server.model.VirtualHostNode; -import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.queue.QueueConsumer; import org.apache.qpid.server.security.access.ObjectProperties; import org.apache.qpid.server.security.access.ObjectProperties.Property; import org.apache.qpid.server.security.access.ObjectType; import org.apache.qpid.server.security.access.Operation; import org.apache.qpid.server.security.access.OperationLoggingDetails; +import org.apache.qpid.server.transport.AMQPConnection; import org.apache.qpid.test.utils.QpidTestCase; public class SecurityManagerTest extends QpidTestCase @@ -190,7 +190,7 @@ public class 
SecurityManagerTest extends public void testAuthoriseCreateConnection() { - AMQConnectionModel<?,?> connection = mock(AMQConnectionModel.class); + AMQPConnection<?> connection = mock(AMQPConnection.class); when(connection.getVirtualHostName()).thenReturn(TEST_VIRTUAL_HOST); ObjectProperties properties = new ObjectProperties(); Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java?rev=1687962&r1=1687961&r2=1687962&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java (original) +++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java Sat Jun 27 21:13:25 2015 @@ -43,12 +43,12 @@ import org.apache.qpid.server.model.Syst import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.VirtualHostNode; -import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.transport.AMQPConnection; import org.apache.qpid.server.virtualhost.AbstractVirtualHost; import org.apache.qpid.server.virtualhost.QueueExistsException; import org.apache.qpid.server.virtualhost.TestMemoryVirtualHost; @@ -181,34 +181,34 @@ public class BrokerTestHelper return createVirtualHost(attributes, broker, defaultVHN); } - public static AMQSessionModel<?,?> createSession(int channelId, AMQConnectionModel<?,?> connection) + public static AMQSessionModel<?> createSession(int channelId, 
AMQPConnection<?> connection) { @SuppressWarnings("rawtypes") AMQSessionModel session = mock(AMQSessionModel.class); - when(session.getConnectionModel()).thenReturn(connection); + when(session.getAMQPConnection()).thenReturn(connection); when(session.getChannelId()).thenReturn(channelId); return session; } - public static AMQSessionModel<?,?> createSession(int channelId) throws Exception + public static AMQSessionModel<?> createSession(int channelId) throws Exception { - AMQConnectionModel<?,?> session = createConnection(); + AMQPConnection<?> session = createConnection(); return createSession(channelId, session); } - public static AMQSessionModel<?,?> createSession() throws Exception + public static AMQSessionModel<?> createSession() throws Exception { return createSession(1); } - public static AMQConnectionModel<?,?> createConnection() throws Exception + public static AMQPConnection<?> createConnection() throws Exception { return createConnection("test"); } - public static AMQConnectionModel<?,?> createConnection(String hostName) throws Exception + public static AMQPConnection<?> createConnection(String hostName) throws Exception { - return mock(AMQConnectionModel.class); + return mock(AMQPConnection.class); } public static ExchangeImpl<?> createExchange(String hostName, final boolean durable, final EventLogger eventLogger) throws Exception Modified: qpid/java/trunk/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControl.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControl.java?rev=1687962&r1=1687961&r2=1687962&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControl.java (original) +++ 
qpid/java/trunk/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControl.java Sat Jun 27 21:13:25 2015 @@ -175,7 +175,7 @@ public class DefaultAccessControl implem Set<ConnectionPrincipal> principals = subject.getPrincipals(ConnectionPrincipal.class); if(!principals.isEmpty()) { - SocketAddress address = principals.iterator().next().getConnection().getRemoteAddress(); + SocketAddress address = principals.iterator().next().getConnection().getRemoteSocketAddress(); if(address instanceof InetSocketAddress) { addressOfClient = ((InetSocketAddress) address).getAddress(); Modified: qpid/java/trunk/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControlTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControlTest.java?rev=1687962&r1=1687961&r2=1687962&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControlTest.java (original) +++ qpid/java/trunk/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControlTest.java Sat Jun 27 21:13:25 2015 @@ -35,7 +35,6 @@ import org.apache.qpid.server.connection import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.EventLoggerProvider; import org.apache.qpid.server.logging.UnitTestMessageLogger; -import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.security.Result; import org.apache.qpid.server.security.access.ObjectProperties; import org.apache.qpid.server.security.access.ObjectType; @@ -44,6 +43,7 @@ import org.apache.qpid.server.security.a import org.apache.qpid.server.security.access.config.Rule; import 
org.apache.qpid.server.security.access.config.RuleSet; import org.apache.qpid.server.security.auth.TestPrincipalUtils; +import org.apache.qpid.server.transport.AMQPConnection; import org.apache.qpid.test.utils.QpidTestCase; /** @@ -239,8 +239,8 @@ public class DefaultAccessControlTest ex final InetAddress inetAddress = InetAddress.getLocalHost(); final InetSocketAddress inetSocketAddress = new InetSocketAddress(inetAddress, 1); - AMQConnectionModel connectionModel = mock(AMQConnectionModel.class); - when(connectionModel.getRemoteAddress()).thenReturn(inetSocketAddress); + AMQPConnection connectionModel = mock(AMQPConnection.class); + when(connectionModel.getRemoteSocketAddress()).thenReturn(inetSocketAddress); subject.getPrincipals().add(new ConnectionPrincipal(connectionModel)); @@ -269,8 +269,8 @@ public class DefaultAccessControlTest ex final InetAddress inetAddress = InetAddress.getLocalHost(); final InetSocketAddress inetSocketAddress = new InetSocketAddress(inetAddress, 1); - AMQConnectionModel connectionModel = mock(AMQConnectionModel.class); - when(connectionModel.getRemoteAddress()).thenReturn(inetSocketAddress); + AMQPConnection connectionModel = mock(AMQPConnection.class); + when(connectionModel.getRemoteSocketAddress()).thenReturn(inetSocketAddress); subject.getPrincipals().add(new ConnectionPrincipal(connectionModel)); Added: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java?rev=1687962&view=auto ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java (added) +++ 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java Sat Jun 27 21:13:25 2015 @@ -0,0 +1,371 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_10; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.security.Principal; +import java.security.PrivilegedAction; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import javax.security.auth.Subject; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.model.port.AmqpPort; +import org.apache.qpid.server.security.SubjectCreator; +import org.apache.qpid.server.transport.AbstractAMQPConnection; +import org.apache.qpid.server.transport.ProtocolEngine; +import org.apache.qpid.server.consumer.ConsumerImpl; +import 
org.apache.qpid.server.logging.messages.ConnectionMessages; +import org.apache.qpid.server.model.Consumer; +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.util.Action; +import org.apache.qpid.transport.ByteBufferSender; +import org.apache.qpid.transport.ConnectionDelegate; +import org.apache.qpid.transport.Constant; +import org.apache.qpid.transport.network.AggregateTicker; +import org.apache.qpid.transport.network.InputHandler; +import org.apache.qpid.transport.network.NetworkConnection; + + +public class AMQPConnection_0_10 extends AbstractAMQPConnection<AMQPConnection_0_10> +{ + private static final Logger _logger = LoggerFactory.getLogger(AMQPConnection_0_10.class); + private final InputHandler _inputHandler; + + + private final NetworkConnection _network; + private ServerConnection _connection; + + private long _createTime = System.currentTimeMillis(); + private volatile long _lastReadTime = _createTime; + private volatile long _lastWriteTime = _createTime; + private volatile boolean _transportBlockedForWriting; + + private final AtomicReference<Thread> _messageAssignmentSuspended = new AtomicReference<>(); + + private final AtomicBoolean _stateChanged = new AtomicBoolean(); + private final AtomicReference<Action<ProtocolEngine>> _workListener = new AtomicReference<>(); + + + public AMQPConnection_0_10(final Broker<?> broker, + NetworkConnection network, + final AmqpPort<?> port, + final Transport transport, + final long id, + final AggregateTicker aggregateTicker) + { + super(broker, network, port, transport, id, aggregateTicker); + + _connection = new ServerConnection(id, broker, port, transport); + _connection.setAmqpConnection(this); + SocketAddress address = network.getLocalAddress(); + String fqdn = null; + + if (address instanceof InetSocketAddress) + { + fqdn = ((InetSocketAddress) address).getHostName(); + } + SubjectCreator subjectCreator = 
port.getAuthenticationProvider().getSubjectCreator(transport.isSecure()); + ConnectionDelegate connDelegate = new ServerConnectionDelegate(broker, fqdn, subjectCreator); + + _connection.setConnectionDelegate(connDelegate); + _connection.setRemoteAddress(network.getRemoteAddress()); + _connection.setLocalAddress(network.getLocalAddress()); + + _inputHandler = new InputHandler(new ServerAssembler(_connection)); + _network = network; + + Subject.doAs(getSubject(), new PrivilegedAction<Object>() + { + @Override + public Object run() + { + _connection.getEventLogger().message(ConnectionMessages.OPEN(null, null, null, null, false, false, false, false)); + + _connection.setNetworkConnection(_network); + ServerDisassembler disassembler = new ServerDisassembler(wrapSender(_network.getSender()), Constant.MIN_MAX_FRAME_SIZE); + _connection.setSender(disassembler); + _connection.addFrameSizeObserver(disassembler); + // FIXME Two log messages to maintain compatibility with earlier protocol versions + _connection.getEventLogger().message(ConnectionMessages.OPEN(null, "0-10", null, null, false, true, false, false)); + + return null; + } + }); + } + + @Override + public boolean isMessageAssignmentSuspended() + { + Thread lock = _messageAssignmentSuspended.get(); + return lock != null && _messageAssignmentSuspended.get() != Thread.currentThread(); + } + + @Override + public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended) + { + _messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null); + + for(AMQSessionModel<?> session : _connection.getSessionModels()) + { + for (Consumer<?> consumer : session.getConsumers()) + { + ConsumerImpl consumerImpl = (ConsumerImpl) consumer; + if (!messageAssignmentSuspended) + { + consumerImpl.getTarget().notifyCurrentState(); + } + else + { + // ensure that by the time the method returns, no consumer can be in the process of + // delivering a message. 
+ consumerImpl.getSendLock(); + consumerImpl.releaseSendLock(); + } + } + } + } + + + private ByteBufferSender wrapSender(final ByteBufferSender sender) + { + return new ByteBufferSender() + { + @Override + public void send(ByteBuffer msg) + { + _lastWriteTime = System.currentTimeMillis(); + sender.send(msg); + } + + @Override + public void flush() + { + sender.flush(); + + } + + @Override + public void close() + { + sender.close(); + } + }; + } + + @Override + public long getLastReadTime() + { + return _lastReadTime; + } + + @Override + public long getLastWriteTime() + { + return _lastWriteTime; + } + + public void received(final ByteBuffer buf) + { + Subject.doAs(_connection.getAuthorizedSubject(), new PrivilegedAction<Object>() + { + @Override + public Object run() + { + _lastReadTime = System.currentTimeMillis(); + if (_connection.getAuthorizedPrincipal() == null && + (_lastReadTime - _createTime) > _connection.getPort().getContextValue(Long.class, + Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY)) + { + + _logger.warn("Connection has taken more than " + + _connection.getPort() + .getContextValue(Long.class, Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY) + + "ms to establish identity. 
Closing as possible DoS."); + _connection.getEventLogger().message(ConnectionMessages.IDLE_CLOSE()); + _network.close(); + + } + _inputHandler.received(buf); + _connection.receivedComplete(); + return null; + } + }); + } + + @Override + public void encryptedTransport() + { + } + + public void writerIdle() + { + _connection.doHeartBeat(); + } + + public void readerIdle() + { + Subject.doAs(_connection.getAuthorizedSubject(), new PrivilegedAction<Object>() + { + @Override + public Object run() + { + _connection.getEventLogger().message(ConnectionMessages.IDLE_CLOSE()); + _network.close(); + return null; + } + }); + + } + + public String getAddress() + { + return _network.getRemoteAddress().toString(); + } + + @Override + public void closed() + { + _inputHandler.closed(); + } + + @Override + protected void performDeleteTasks() + { + super.performDeleteTasks(); + } + + @Override + public boolean isTransportBlockedForWriting() + { + return _transportBlockedForWriting; + } + + @Override + public void setTransportBlockedForWriting(final boolean blocked) + { + _transportBlockedForWriting = blocked; + _connection.transportStateChanged(); + } + + @Override + public void processPending() + { + _connection.processPending(); + } + + @Override + public boolean hasWork() + { + return _stateChanged.get(); + } + + @Override + public void notifyWork() + { + _stateChanged.set(true); + + final Action<ProtocolEngine> listener = _workListener.get(); + if(listener != null) + { + listener.performAction(this); + } + } + + public void clearWork() + { + _stateChanged.set(false); + } + + public void setWorkListener(final Action<ProtocolEngine> listener) + { + _workListener.set(listener); + } + + public boolean hasSessionWithName(final byte[] name) + { + return _connection.hasSessionWithName(name); + } + + public void closeAsync(final AMQConstant cause, final String message) + { + _connection.closeAsync(cause, message); + } + + public Principal getAuthorizedPrincipal() + { + return 
_connection.getAuthorizedPrincipal(); + } + + public void closeSessionAsync(final AMQSessionModel<?> session, + final AMQConstant cause, final String message) + { + _connection.closeSessionAsync((ServerSession)session, cause, message); + } + + public void block() + { + _connection.block(); + } + + public String getRemoteContainerName() + { + return _connection.getRemoteContainerName(); + } + + public VirtualHost<?, ?, ?> getVirtualHost() + { + return _connection.getVirtualHost(); + } + + public List<ServerSession> getSessionModels() + { + return _connection.getSessionModels(); + } + + public void unblock() + { + _connection.unblock(); + } + + public LogSubject getLogSubject() + { + return _connection.getLogSubject(); + } + + public long getSessionCountLimit() + { + return _connection.getSessionCountLimit(); + } + +} Propchange: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java ------------------------------------------------------------------------------ svn:executable = * Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1687962&r1=1687961&r2=1687962&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java 
Sat Jun 27 21:13:25 2015 @@ -107,7 +107,7 @@ public class ConsumerTarget_0_10 extends @Override public boolean doIsSuspended() { - return getState()!=State.ACTIVE || _deleted.get() || _session.isClosing() || _session.getConnectionModel().isStopped(); // TODO check for Session suspension + return getState()!=State.ACTIVE || _deleted.get() || _session.isClosing() || _session.getAMQPConnection().isConnectionStopped(); // TODO check for Session suspension } public boolean close() @@ -559,10 +559,10 @@ public class ConsumerTarget_0_10 extends switch(flowMode) { case CREDIT: - _creditManager = new CreditCreditManager(0l, 0l, _session.getConnection().getProtocolEngine()); + _creditManager = new CreditCreditManager(0l, 0l, _session.getConnection().getAmqpConnection()); break; case WINDOW: - _creditManager = new WindowCreditManager(0l, 0l, _session.getConnection().getProtocolEngine()); + _creditManager = new WindowCreditManager(0l, 0l, _session.getConnection().getAmqpConnection()); break; default: // this should never happen, as 0-10 is finalised and so the enum should never change Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java?rev=1687962&r1=1687961&r2=1687962&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java Sat Jun 27 21:13:25 2015 @@ -20,9 +20,6 @@ */ package org.apache.qpid.server.protocol.v0_10; -import java.net.InetSocketAddress; -import java.net.SocketAddress; - import 
org.apache.qpid.server.transport.ProtocolEngine; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Protocol; @@ -30,8 +27,6 @@ import org.apache.qpid.server.model.Tran import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.plugin.PluggableService; import org.apache.qpid.server.plugin.ProtocolEngineCreator; -import org.apache.qpid.server.security.SubjectCreator; -import org.apache.qpid.transport.ConnectionDelegate; import org.apache.qpid.transport.network.AggregateTicker; import org.apache.qpid.transport.network.NetworkConnection; @@ -72,25 +67,11 @@ public class ProtocolEngineCreator_0_10 Transport transport, long id, final AggregateTicker aggregateTicker) { - String fqdn = null; - SocketAddress address = network.getLocalAddress(); - if (address instanceof InetSocketAddress) - { - fqdn = ((InetSocketAddress) address).getHostName(); - } - SubjectCreator subjectCreator = port.getAuthenticationProvider().getSubjectCreator(transport.isSecure()); - ConnectionDelegate connDelegate = new ServerConnectionDelegate(broker, fqdn, subjectCreator); - - ServerConnection conn = new ServerConnection(id, broker, port, transport); - - conn.setConnectionDelegate(connDelegate); - conn.setRemoteAddress(network.getRemoteAddress()); - conn.setLocalAddress(network.getLocalAddress()); - - ProtocolEngine_0_10 protocolEngine = new ProtocolEngine_0_10(conn, network, aggregateTicker); - conn.setProtocolEngine(protocolEngine); - return protocolEngine; + final AMQPConnection_0_10 protocolEngine_0_10 = + new AMQPConnection_0_10(broker, network, port, transport, id, aggregateTicker); + protocolEngine_0_10.create(); + return protocolEngine_0_10; } @Override Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1687962&r1=1687961&r2=1687962&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Sat Jun 27 21:13:25 2015 @@ -34,7 +34,6 @@ import java.util.Iterator; import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -44,23 +43,16 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.model.adapter.ConnectionAdapter; import org.apache.qpid.server.protocol.ConnectionClosingTicker; -import org.apache.qpid.server.transport.ProtocolEngine; -import org.apache.qpid.server.connection.ConnectionPrincipal; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.model.port.AmqpPort; -import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.protocol.SessionModelListener; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; -import org.apache.qpid.server.stats.StatisticsCounter; -import org.apache.qpid.server.transport.NetworkConnectionScheduler; import 
org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHostImpl; @@ -75,7 +67,7 @@ import org.apache.qpid.transport.Option; import org.apache.qpid.transport.ProtocolEvent; import org.apache.qpid.transport.Session; -public class ServerConnection extends Connection implements AMQConnectionModel<ServerConnection, ServerSession>, +public class ServerConnection extends Connection implements //AMQConnectionModel<ServerConnection, ServerSession>, LogSubject, AuthorizationHolder { private static final Logger LOGGER = LoggerFactory.getLogger(ServerConnection.class); @@ -83,9 +75,7 @@ public class ServerConnection extends Co private final Broker<?> _broker; private AtomicBoolean _logClosed = new AtomicBoolean(false); - private final Subject _authorizedSubject = new Subject(); private Principal _authorizedPrincipal = null; - private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; private final long _connectionId; private final Object _reference = new Object(); private VirtualHostImpl<?,?,?> _virtualHost; @@ -94,23 +84,15 @@ public class ServerConnection extends Co private boolean _blocking; private final Transport _transport; - private final CopyOnWriteArrayList<Action<? super ServerConnection>> _connectionCloseTaskList = - new CopyOnWriteArrayList<Action<? super ServerConnection>>(); - private final Queue<Action<? 
super ServerConnection>> _asyncTaskList = new ConcurrentLinkedQueue<>(); - private final CopyOnWriteArrayList<SessionModelListener> _sessionListeners = - new CopyOnWriteArrayList<SessionModelListener>(); - - private volatile boolean _stopped; private int _messageCompressionThreshold; private final int _maxMessageSize; - private ProtocolEngine_0_10 _protocolEngine; + private AMQPConnection_0_10 _amqpConnection; private boolean _ignoreFutureInput; private boolean _ignoreAllButConnectionCloseOk; - private ConnectionAdapter _adapter; public ServerConnection(final long connectionId, Broker<?> broker, @@ -118,7 +100,6 @@ public class ServerConnection extends Co final Transport transport) { _connectionId = connectionId; - _authorizedSubject.getPrincipals().add(new ConnectionPrincipal(this)); _broker = broker; _port = port; @@ -127,13 +108,6 @@ public class ServerConnection extends Co int maxMessageSize = port.getContextValue(Integer.class, AmqpPort.PORT_MAX_MESSAGE_SIZE); _maxMessageSize = (maxMessageSize > 0) ? 
maxMessageSize : Integer.MAX_VALUE; - - _messagesDelivered = new StatisticsCounter("messages-delivered-" + getConnectionId()); - _dataDelivered = new StatisticsCounter("data-delivered-" + getConnectionId()); - _messagesReceived = new StatisticsCounter("messages-received-" + getConnectionId()); - _dataReceived = new StatisticsCounter("data-received-" + getConnectionId()); - _adapter = new ConnectionAdapter(this); - _adapter.create(); } public Object getReference() @@ -177,19 +151,19 @@ public class ServerConnection extends Co true, true)); - _adapter.virtualHostAssociated(); + _amqpConnection.virtualHostAssociated(); } if (state == State.CLOSE_RCVD || state == State.CLOSED || state == State.CLOSING) { if(_virtualHost != null) { - _virtualHost.deregisterConnection(_adapter); + _virtualHost.deregisterConnection(_amqpConnection); } } if(state == State.CLOSING) { - getProtocolEngine().getAggregateTicker().addTicker(new ConnectionClosingTicker(System.currentTimeMillis() + CLOSE_OK_TIMEOUT, getNetworkConnection())); + getAmqpConnection().getAggregateTicker().addTicker(new ConnectionClosingTicker(System.currentTimeMillis() + CLOSE_OK_TIMEOUT, getNetworkConnection())); } if (state == State.CLOSED) { @@ -211,26 +185,14 @@ public class ServerConnection extends Co return (ServerConnectionDelegate) super.getConnectionDelegate(); } - public void setConnectionDelegate(ServerConnectionDelegate delegate) - { - super.setConnectionDelegate(delegate); - } - - @Override - public ProtocolEngine getProtocolEngine() - { - return _protocolEngine; - } - - @Override - public void setScheduler(final NetworkConnectionScheduler networkConnectionScheduler) + public AMQPConnection_0_10 getAmqpConnection() { - _protocolEngine.setScheduler(networkConnectionScheduler); + return _amqpConnection; } - public void setProtocolEngine(final ProtocolEngine_0_10 serverProtocolEngine) + public void setAmqpConnection(final AMQPConnection_0_10 serverProtocolEngine) { - _protocolEngine = serverProtocolEngine; + 
_amqpConnection = serverProtocolEngine; } public VirtualHostImpl<?,?,?> getVirtualHost() @@ -249,39 +211,19 @@ public class ServerConnection extends Co { _messageCompressionThreshold = Integer.MAX_VALUE; } - _authorizedSubject.getPrincipals().add(_virtualHost.getPrincipal()); + _amqpConnection.getSubject().getPrincipals().add(_virtualHost.getPrincipal()); } - @Override - public String getVirtualHostName() - { - return _virtualHost == null ? null : _virtualHost.getName(); - } - - @Override public AmqpPort<?> getPort() { return _port; } - @Override public Transport getTransport() { return _transport; } - @Override - public void stop() - { - _stopped = true; - } - - @Override - public boolean isStopped() - { - return _stopped; - } - public void closeSessionAsync(final ServerSession session, final AMQConstant cause, final String message) { addAsyncTask(new Action<ServerConnection>() @@ -345,7 +287,7 @@ public class ServerConnection extends Co Subject subject; if (event.isConnectionControl()) { - subject = _authorizedSubject; + subject = _amqpConnection.getSubject(); } else { @@ -356,7 +298,7 @@ public class ServerConnection extends Co } else { - subject = _authorizedSubject; + subject = _amqpConnection.getSubject(); } } @@ -446,10 +388,7 @@ public class ServerConnection extends Co protected void performDeleteTasks() { - for(Action<? 
super ServerConnection> task : _connectionCloseTaskList) - { - task.performAction(this); - } + _amqpConnection.performDeleteTasks(); } public synchronized void block() @@ -480,7 +419,7 @@ public class ServerConnection extends Co public synchronized void registerSession(final Session ssn) { super.registerSession(ssn); - sessionAdded((ServerSession)ssn); + _amqpConnection.sessionAdded((ServerSession) ssn); if(_blocking) { ((ServerSession)ssn).block(); @@ -490,13 +429,13 @@ public class ServerConnection extends Co @Override public synchronized void removeSession(final Session ssn) { - sessionRemoved((ServerSession)ssn); + _amqpConnection.sessionRemoved((ServerSession) ssn); super.removeSession(ssn); } public List<ServerSession> getSessionModels() { - List<ServerSession> sessions = new ArrayList<ServerSession>(); + List<ServerSession> sessions = new ArrayList<>(); for (Session ssn : getChannels()) { sessions.add((ServerSession) ssn); @@ -504,54 +443,13 @@ public class ServerConnection extends Co return sessions; } - public void registerMessageDelivered(long messageSize) - { - _messagesDelivered.registerEvent(1L); - _dataDelivered.registerEvent(messageSize); - _virtualHost.registerMessageDelivered(messageSize); - } - - public void registerMessageReceived(long messageSize, long timestamp) - { - _messagesReceived.registerEvent(1L, timestamp); - _dataReceived.registerEvent(messageSize, timestamp); - _virtualHost.registerMessageReceived(messageSize, timestamp); - } - - public StatisticsCounter getMessageReceiptStatistics() - { - return _messagesReceived; - } - - public StatisticsCounter getDataReceiptStatistics() - { - return _dataReceived; - } - - public StatisticsCounter getMessageDeliveryStatistics() - { - return _messagesDelivered; - } - - public StatisticsCounter getDataDeliveryStatistics() - { - return _dataDelivered; - } - - public void resetStatistics() - { - _messagesDelivered.reset(); - _dataDelivered.reset(); - _messagesReceived.reset(); - _dataReceived.reset(); 
- } /** * @return authorizedSubject */ public Subject getAuthorizedSubject() { - return _authorizedSubject; + return _amqpConnection.getSubject(); } /** @@ -568,7 +466,7 @@ public class ServerConnection extends Co } else { - _authorizedSubject.getPrincipals().addAll(authorizedSubject.getPrincipals()); + getAuthorizedSubject().getPrincipals().addAll(authorizedSubject.getPrincipals()); _authorizedPrincipal = AuthenticatedPrincipal.getAuthenticatedPrincipalFromSubject(authorizedSubject); } @@ -586,13 +484,7 @@ public class ServerConnection extends Co public String getRemoteAddressString() { - return String.valueOf(getRemoteAddress()); - } - - @Override - public String getRemoteProcessPid() - { - return getConnectionDelegate().getRemoteProcessPid(); + return String.valueOf(getRemoteSocketAddress()); } @Override @@ -636,59 +528,22 @@ public class ServerConnection extends Co super.send(event); } - public long getLastIoTime() - { - return _lastIoTime.longValue(); - } - - @Override public String getClientId() { return getConnectionDelegate().getClientId(); } - @Override public String getRemoteContainerName() { return getConnectionDelegate().getClientId(); } - @Override - public void addSessionListener(final SessionModelListener listener) - { - _sessionListeners.add(listener); - } - - @Override - public void removeSessionListener(final SessionModelListener listener) - { - _sessionListeners.remove(listener); - } - - private void sessionAdded(final AMQSessionModel<?,?> session) - { - for(SessionModelListener l : _sessionListeners) - { - l.sessionAdded(session); - } - } - private void sessionRemoved(final AMQSessionModel<?,?> session) - { - for(SessionModelListener l : _sessionListeners) - { - l.sessionRemoved(session); - } - } - - - @Override public String getClientVersion() { return getConnectionDelegate().getClientVersion(); } - @Override public String getClientProduct() { return getConnectionDelegate().getClientProduct(); @@ -721,24 +576,12 @@ public class ServerConnection 
extends Co super.doHeartBeat(); } - @Override - public void addDeleteTask(final Action<? super ServerConnection> task) - { - _connectionCloseTaskList.add(task); - } - private void addAsyncTask(final Action<ServerConnection> action) { _asyncTaskList.add(action); notifyWork(); } - @Override - public void removeDeleteTask(final Action<? super ServerConnection> task) - { - _connectionCloseTaskList.remove(task); - } - public int getMessageCompressionThreshold() { return _messageCompressionThreshold; @@ -757,26 +600,18 @@ public class ServerConnection extends Co } } - @Override public void notifyWork() { - _protocolEngine.notifyWork(); - } - - - @Override - public boolean isMessageAssignmentSuspended() - { - return _protocolEngine.isMessageAssignmentSuspended(); + _amqpConnection.notifyWork(); } public void processPending() { - List<? extends AMQSessionModel<?,?>> sessionsWithPending = new ArrayList<>(getSessionModels()); + List<? extends AMQSessionModel<?>> sessionsWithPending = new ArrayList<>(getSessionModels()); while(!sessionsWithPending.isEmpty()) { - final Iterator<? extends AMQSessionModel<?, ?>> iter = sessionsWithPending.iterator(); - AMQSessionModel<?, ?> session; + final Iterator<? 
extends AMQSessionModel<?>> iter = sessionsWithPending.iterator(); + AMQSessionModel<?> session; while(iter.hasNext()) { session = iter.next(); Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1687962&r1=1687961&r2=1687962&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java Sat Jun 27 21:13:25 2015 @@ -45,10 +45,10 @@ import org.apache.qpid.server.configurat import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.port.AmqpPort; -import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus; import org.apache.qpid.server.security.auth.SubjectAuthenticationResult; +import org.apache.qpid.server.transport.AMQPConnection; import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.transport.*; import org.apache.qpid.transport.network.NetworkConnection; @@ -211,7 +211,7 @@ public class ServerConnectionDelegate ex sconn.setVirtualHost(vhost); try { - if(!vhost.authoriseCreateConnection(sconn)) + if(!vhost.authoriseCreateConnection(sconn.getAmqpConnection())) { sconn.setState(Connection.State.CLOSING); sconn.sendConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, "Connection not authorized"); @@ -367,23 +367,19 @@ public class ServerConnectionDelegate ex final Principal 
authorizedPrincipal = sconn.getAuthorizedPrincipal(); final String userId = authorizedPrincipal == null ? "" : authorizedPrincipal.getName(); - final Iterator<org.apache.qpid.server.model.Connection<?>> connections = + final Iterator<? extends org.apache.qpid.server.model.Connection<?>> connections = ((ServerConnection)conn).getVirtualHost().getConnections().iterator(); while(connections.hasNext()) { final org.apache.qpid.server.model.Connection<?> modelConnnection = connections.next(); - final AMQConnectionModel amqConnectionModel = modelConnnection.getUnderlyingConnection(); - if (amqConnectionModel instanceof ServerConnection) - { - ServerConnection otherConnection = (ServerConnection)amqConnectionModel; + final AMQPConnection<?> amqConnectionModel = modelConnnection.getUnderlyingConnection(); - final String userName = amqConnectionModel.getAuthorizedPrincipal() == null - ? "" - : amqConnectionModel.getAuthorizedPrincipal().getName(); - if (userId.equals(userName) && otherConnection.hasSessionWithName(name)) - { - return false; - } + final String userName = amqConnectionModel.getAuthorizedPrincipal() == null + ? 
"" + : amqConnectionModel.getAuthorizedPrincipal().getName(); + if (userId.equals(userName) && amqConnectionModel.hasSessionWithName(name)) + { + return false; } } return true; @@ -402,10 +398,20 @@ public class ServerConnectionDelegate ex _compressionSupported = Boolean.parseBoolean(String.valueOf(compressionSupported)); } + final AMQPConnection_0_10 protocolEngine = ((ServerConnection) conn).getAmqpConnection(); + protocolEngine.setClientId(getStringClientProperty(ConnectionStartProperties.CLIENT_ID_0_10)); + protocolEngine.setClientProduct(getStringClientProperty(ConnectionStartProperties.PRODUCT)); + protocolEngine.setClientVersion(getStringClientProperty(ConnectionStartProperties.VERSION_0_10)); + protocolEngine.setRemoteProcessPid(getStringClientProperty(ConnectionStartProperties.PID)); } super.connectionStartOk(conn, ok); } + private String getStringClientProperty(final String name) + { + return (_clientProperties == null || _clientProperties.get(name) == null) ? null : String.valueOf(_clientProperties.get(name)); + } + public Map<String,Object> getClientProperties() { return _clientProperties; Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1687962&r1=1687961&r2=1687962&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Sat Jun 27 21:13:25 2015 @@ -75,6 +75,7 @@ import org.apache.qpid.server.queue.AMQQ import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.store.MessageStore; import 
org.apache.qpid.server.store.StoreException; +import org.apache.qpid.server.transport.AMQPConnection; import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogResource; @@ -113,7 +114,7 @@ import org.apache.qpid.transport.network public class ServerSession extends Session implements AuthorizationHolder, - AMQSessionModel<ServerSession,ServerConnection>, LogSubject, AsyncAutoCommitTransaction.FutureRecorder, + AMQSessionModel<ServerSession>, LogSubject, AsyncAutoCommitTransaction.FutureRecorder, Deletable<ServerSession> { @@ -192,7 +193,7 @@ public class ServerSession extends Sessi @Override public void doTimeoutAction(String reason) { - getConnectionModel().closeSessionAsync(ServerSession.this, AMQConstant.RESOURCE_ERROR, reason); + getAMQPConnection().closeSessionAsync(ServerSession.this, AMQConstant.RESOURCE_ERROR, reason); } }, getVirtualHost()); @@ -275,7 +276,7 @@ public class ServerSession extends Sessi message.getInitialRoutingAddress(), instanceProperties, _transaction, _checkCapacityAction ); - getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime()); + getAMQPConnection().registerMessageReceived(message.getSize(), message.getArrivalTime()); incrementOutstandingTxnsIfNecessary(); incrementUncommittedMessageSize(message.getStoredMessage()); return enqueues; @@ -321,7 +322,7 @@ public class ServerSession extends Sessi public void sendMessage(MessageTransfer xfr, Runnable postIdSettingAction) { - getConnectionModel().registerMessageDelivered(xfr.getBodySize()); + getAMQPConnection().registerMessageDelivered(xfr.getBodySize()); invoke(xfr, postIdSettingAction); } @@ -787,9 +788,10 @@ public class ServerSession extends Sessi return _id; } - public ServerConnection getConnectionModel() + @Override + public AMQPConnection<?> getAMQPConnection() { - return getConnection(); + return getConnection().getAmqpConnection(); } public String 
getClientID() @@ -897,7 +899,7 @@ public class ServerSession extends Sessi ? getConnection().getConnectionId() : -1; - String remoteAddress = String.valueOf(getConnection().getRemoteAddress()); + String remoteAddress = String.valueOf(getConnection().getRemoteSocketAddress()); return "[" + MessageFormat.format(CHANNEL_FORMAT, connectionId, @@ -1188,13 +1190,13 @@ public class ServerSession extends Sessi @Override public void addTicker(final Ticker ticker) { - getConnection().getProtocolEngine().getAggregateTicker().addTicker(ticker); + getConnection().getAmqpConnection().getAggregateTicker().addTicker(ticker); } @Override public void removeTicker(final Ticker ticker) { - getConnection().getProtocolEngine().getAggregateTicker().removeTicker(ticker); + getConnection().getAmqpConnection().getAggregateTicker().removeTicker(ticker); } Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1687962&r1=1687961&r2=1687962&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Sat Jun 27 21:13:25 2015 @@ -255,7 +255,7 @@ public class ServerSessionDelegate exten } else { - ProtocolEngine protocolEngine = getServerConnection(session).getProtocolEngine(); + ProtocolEngine protocolEngine = getServerConnection(session).getAmqpConnection(); FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L, protocolEngine); FilterManager filterManager = null; Modified: 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java?rev=1687962&r1=1687961&r2=1687962&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java Sat Jun 27 21:13:25 2015 @@ -25,7 +25,8 @@ import static org.mockito.Mockito.when; import java.util.ArrayList; import java.util.List; -import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor; +import javax.security.auth.Subject; + import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.configuration.updater.TaskExecutorImpl; import org.apache.qpid.server.model.Broker; @@ -80,15 +81,17 @@ public class ServerSessionTest extends Q AmqpPort amqpPort = createMockPort(AmqpPort.DEFAULT_MAX_MESSAGE_SIZE); ServerConnection connection = new ServerConnection(1, broker, amqpPort, Transport.TCP); - final ProtocolEngine_0_10 protocolEngine = mock(ProtocolEngine_0_10.class); - connection.setProtocolEngine(protocolEngine); + final AMQPConnection_0_10 protocolEngine = mock(AMQPConnection_0_10.class); + Subject subject = new Subject(); + when(protocolEngine.getSubject()).thenReturn(subject); + connection.setAmqpConnection(protocolEngine); connection.setVirtualHost(_virtualHost); ServerSession session1 = new ServerSession(connection, new ServerSessionDelegate(), new Binary(getName().getBytes()), 0); // create a session with the same name but on a different connection ServerConnection connection2 = new ServerConnection(2, broker, amqpPort, Transport.TCP); - 
connection2.setProtocolEngine(protocolEngine); + connection2.setAmqpConnection(protocolEngine); connection2.setVirtualHost(_virtualHost); ServerSession session2 = new ServerSession(connection2, new ServerSessionDelegate(), new Binary(getName().getBytes()), 0); @@ -105,8 +108,11 @@ public class ServerSessionTest extends Q AmqpPort port = createMockPort(1024); ServerConnection connection = new ServerConnection(1, broker, port, Transport.TCP); - final ProtocolEngine_0_10 protocolEngine = mock(ProtocolEngine_0_10.class); - connection.setProtocolEngine(protocolEngine); + final AMQPConnection_0_10 protocolEngine = mock(AMQPConnection_0_10.class); + Subject subject = new Subject(); + when(protocolEngine.getSubject()).thenReturn(subject); + + connection.setAmqpConnection(protocolEngine); connection.setVirtualHost(_virtualHost); final List<Method> invokedMethods = new ArrayList<>(); ServerSession session = new ServerSession(connection, new ServerSessionDelegate(), --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org
