Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhostalias/VirtualHostAliasTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhostalias/VirtualHostAliasTest.java?rev=1745091&r1=1745090&r2=1745091&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhostalias/VirtualHostAliasTest.java (original) +++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhostalias/VirtualHostAliasTest.java Sun May 22 21:03:46 2016 @@ -33,6 +33,7 @@ import java.util.UUID; import org.apache.qpid.server.model.AuthenticationProvider; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfiguredObjectFactory; +import org.apache.qpid.server.model.NamedAddressSpace; import org.apache.qpid.server.model.PatternMatchingAlias; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.VirtualHost; @@ -96,38 +97,38 @@ public class VirtualHostAliasTest extend public void testDefaultAliases_VirtualHostNameAlias() { - VirtualHost<?> vhost = _port.getVirtualHost("red"); + NamedAddressSpace addressSpace = _port.getAddressSpace("red"); - assertNotNull(vhost); - assertEquals(_vhosts.get("red"), vhost); + assertNotNull(addressSpace); + assertEquals(_vhosts.get("red"), addressSpace); - vhost = _port.getVirtualHost("blue"); + addressSpace = _port.getAddressSpace("blue"); - assertNotNull(vhost); - assertEquals(_vhosts.get("blue"), vhost); + assertNotNull(addressSpace); + assertEquals(_vhosts.get("blue"), addressSpace); - vhost = _port.getVirtualHost("orange!"); + addressSpace = _port.getAddressSpace("orange!"); - assertNull(vhost); + assertNull(addressSpace); } public void testDefaultAliases_DefaultVirtualHostAlias() { // test the default vhost resolution - VirtualHost<?> vhost = _port.getVirtualHost(""); + NamedAddressSpace addressSpace = _port.getAddressSpace(""); - assertNotNull(vhost); - assertEquals(_vhosts.get("black"), vhost); + assertNotNull(addressSpace); + assertEquals(_vhosts.get("black"), addressSpace); } public void testDefaultAliases_HostNameAlias() { // 127.0.0.1 should always resolve and thus return the default vhost - VirtualHost<?> vhost = _port.getVirtualHost("127.0.0.1"); + NamedAddressSpace addressSpace = _port.getAddressSpace("127.0.0.1"); - assertNotNull(vhost); - assertEquals(_vhosts.get("black"), vhost); + assertNotNull(addressSpace); + assertEquals(_vhosts.get("black"), addressSpace); } public void testPatternMatching() @@ -139,42 +140,42 @@ public class VirtualHostAliasTest extend attributes.put(PatternMatchingAlias.VIRTUAL_HOST_NODE, _vhosts.get("purple").getParent(VirtualHostNode.class)); _port.createChild(VirtualHostAlias.class, attributes); - VirtualHost<?> vhost = _port.getVirtualHost("orange"); + NamedAddressSpace addressSpace = _port.getAddressSpace("orange"); - assertNotNull(vhost); - assertEquals(_vhosts.get("purple"), vhost); + assertNotNull(addressSpace); + assertEquals(_vhosts.get("purple"), addressSpace); - vhost = _port.getVirtualHost("pink"); + addressSpace = _port.getAddressSpace("pink"); - assertNotNull(vhost); - assertEquals(_vhosts.get("purple"), vhost); + assertNotNull(addressSpace); + assertEquals(_vhosts.get("purple"), addressSpace); - vhost = _port.getVirtualHost("pinker"); + addressSpace = _port.getAddressSpace("pinker"); - assertNotNull(vhost); - assertEquals(_vhosts.get("purple"), vhost); + assertNotNull(addressSpace); + assertEquals(_vhosts.get("purple"), addressSpace); - vhost = _port.getVirtualHost("o.*"); + addressSpace = _port.getAddressSpace("o.*"); - assertNull(vhost); + assertNull(addressSpace); } public void testPriority() { - VirtualHost<?> vhost = _port.getVirtualHost("blue"); + NamedAddressSpace addressSpace = _port.getAddressSpace("blue"); - assertNotNull(vhost); - assertEquals(_vhosts.get("blue"), vhost); + assertNotNull(addressSpace); + assertEquals(_vhosts.get("blue"), addressSpace); - vhost = _port.getVirtualHost("black"); + addressSpace = _port.getAddressSpace("black"); - assertNotNull(vhost); - assertEquals(_vhosts.get("black"), vhost); + assertNotNull(addressSpace); + assertEquals(_vhosts.get("black"), addressSpace); @@ -186,15 +187,15 @@ public class VirtualHostAliasTest extend attributes.put(PatternMatchingAlias.VIRTUAL_HOST_NODE, _vhosts.get("purple").getParent(VirtualHostNode.class)); _port.createChild(VirtualHostAlias.class, attributes); - vhost = _port.getVirtualHost("blue"); + addressSpace = _port.getAddressSpace("blue"); - assertNotNull(vhost); - assertEquals(_vhosts.get("purple"), vhost); + assertNotNull(addressSpace); + assertEquals(_vhosts.get("purple"), addressSpace); - vhost = _port.getVirtualHost("black"); + addressSpace = _port.getAddressSpace("black"); - assertNotNull(vhost); - assertEquals(_vhosts.get("purple"), vhost); + assertNotNull(addressSpace); + assertEquals(_vhosts.get("purple"), addressSpace); attributes = new HashMap<>(); @@ -207,22 +208,22 @@ public class VirtualHostAliasTest extend - vhost = _port.getVirtualHost("blue"); + addressSpace = _port.getAddressSpace("blue"); - assertNotNull(vhost); - assertEquals(_vhosts.get("red"), vhost); + assertNotNull(addressSpace); + assertEquals(_vhosts.get("red"), addressSpace); - vhost = _port.getVirtualHost("black"); + addressSpace = _port.getAddressSpace("black"); - assertNotNull(vhost); - assertEquals(_vhosts.get("purple"), vhost); + assertNotNull(addressSpace); + assertEquals(_vhosts.get("purple"), addressSpace); - vhost = _port.getVirtualHost("purple"); + addressSpace = _port.getAddressSpace("purple"); - assertNotNull(vhost); - assertEquals(_vhosts.get("red"), vhost); + assertNotNull(addressSpace); + assertEquals(_vhosts.get("red"), addressSpace);
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java?rev=1745091&r1=1745090&r2=1745091&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java Sun May 22 21:03:46 2016 @@ -162,7 +162,7 @@ public class AMQPConnection_0_10 extends } catch (StoreException e) { - if (getVirtualHost().getState() == State.ACTIVE) + if (getAddressSpace().isActive()) { throw new ServerScopedRuntimeException(e); } Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1745091&r1=1745090&r2=1745091&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Sun May 22 21:03:46 2016 @@ -228,7 +228,7 @@ public class ConsumerTarget_0_10 extends MessageConverterRegistry.getConverter(serverMsg.getClass(), MessageTransferMessage.class); - msg = (MessageTransferMessage) converter.convert(serverMsg, _session.getVirtualHost()); + msg = (MessageTransferMessage) converter.convert(serverMsg, _session.getAddressSpace()); } DeliveryProperties origDeliveryProps = msg.getHeader() == null ? null : msg.getHeader().getDeliveryProperties(); messageProps = msg.getHeader() == null ? null : msg.getHeader().getMessageProperties(); @@ -410,7 +410,7 @@ public class ConsumerTarget_0_10 extends private void forceDequeue(final MessageInstance entry, final boolean restoreCredit) { - AutoCommitTransaction dequeueTxn = new AutoCommitTransaction(_session.getVirtualHost().getMessageStore()); + AutoCommitTransaction dequeueTxn = new AutoCommitTransaction(_session.getAddressSpace().getMessageStore()); dequeueTxn.dequeue(entry.getEnqueueRecord(), new ServerTransaction.Action() { @@ -506,7 +506,7 @@ public class ConsumerTarget_0_10 extends protected EventLogger getEventLogger() { - return getSessionModel().getVirtualHost().getEventLogger(); + return getSessionModel().getAMQPConnection().getEventLogger(); } private boolean isMaxDeliveryLimitReached(MessageInstance entry) Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java?rev=1745091&r1=1745090&r2=1745091&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java Sun May 22 21:03:46 2016 @@ -25,7 +25,7 @@ import java.util.Collections; import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.message.internal.InternalMessage; -import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.model.NamedAddressSpace; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.plugin.PluggableService; import org.apache.qpid.server.store.StoredMessage; @@ -50,7 +50,7 @@ public class MessageConverter_Internal_t } @Override - public MessageTransferMessage convert(InternalMessage serverMsg, VirtualHost<?> vhost) + public MessageTransferMessage convert(InternalMessage serverMsg, NamedAddressSpace addressSpace) { return new MessageTransferMessage(convertToStoredMessage(serverMsg), null); } Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java?rev=1745091&r1=1745090&r2=1745091&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java Sun May 22 21:03:46 2016 @@ -31,7 +31,7 @@ import java.util.Map; import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.model.NamedAddressSpace; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.plugin.PluggableService; import org.apache.qpid.server.store.StoredMessage; @@ -61,7 +61,7 @@ public class MessageConverter_v0_10 impl } @Override - public MessageTransferMessage convert(ServerMessage serverMsg, VirtualHost<?> vhost) + public MessageTransferMessage convert(ServerMessage serverMsg, NamedAddressSpace addressSpace) { return new MessageTransferMessage(convertToStoredMessage(serverMsg), null); } Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java?rev=1745091&r1=1745090&r2=1745091&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java Sun May 22 21:03:46 2016 @@ -32,7 +32,7 @@ import java.util.Set; import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.internal.InternalMessage; -import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.model.NamedAddressSpace; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.plugin.PluggableService; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; @@ -58,7 +58,7 @@ public class MessageConverter_v0_10_to_I } @Override - public InternalMessage convert(MessageTransferMessage serverMessage, VirtualHost<?> vhost) + public InternalMessage convert(MessageTransferMessage serverMessage, NamedAddressSpace addressSpace) { final String mimeType = serverMessage.getMessageHeader().getMimeType(); byte[] data = new byte[(int) serverMessage.getSize()]; Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1745091&r1=1745090&r2=1745091&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Sun May 22 21:03:46 2016 @@ -41,6 +41,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.model.NamedAddressSpace; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.protocol.ConnectionClosingTicker; import org.apache.qpid.server.logging.EventLogger; @@ -143,14 +144,14 @@ public class ServerConnection extends Co return _amqpConnection; } - public VirtualHost<?> getVirtualHost() + public NamedAddressSpace getAddressSpace() { - return _amqpConnection.getVirtualHost(); + return _amqpConnection.getAddressSpace(); } - public void setVirtualHost(VirtualHost<?> virtualHost) + public void setVirtualHost(NamedAddressSpace addressSpace) { - _amqpConnection.setVirtualHost(virtualHost); + _amqpConnection.setAddressSpace(addressSpace); } public AmqpPort<?> getPort() @@ -381,10 +382,10 @@ public class ServerConnection extends Co } finally { - VirtualHost<?> virtualHost = getVirtualHost(); - if(virtualHost != null) + NamedAddressSpace addressSpace = getAddressSpace(); + if(addressSpace != null) { - virtualHost.deregisterConnection(_amqpConnection); + addressSpace.deregisterConnection(_amqpConnection); } } Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1745091&r1=1745090&r2=1745091&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java Sun May 22 21:03:46 2016 @@ -41,11 +41,9 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.common.ServerPropertyNames; import org.apache.qpid.configuration.CommonProperties; import org.apache.qpid.properties.ConnectionStartProperties; -import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.State; -import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.model.NamedAddressSpace; import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus; @@ -54,7 +52,6 @@ import org.apache.qpid.server.transport. import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException; import org.apache.qpid.transport.*; -import org.apache.qpid.transport.network.NetworkConnection; public class ServerConnectionDelegate extends ServerDelegate { @@ -219,7 +216,7 @@ public class ServerConnectionDelegate ex { final ServerConnection sconn = (ServerConnection) conn; assertState(sconn, ConnectionState.AWAIT_OPEN); - VirtualHost<?> vhost; + NamedAddressSpace addressSpace; String vhostName; if(open.hasVirtualHost()) { @@ -231,16 +228,16 @@ public class ServerConnectionDelegate ex } AmqpPort port = (AmqpPort) sconn.getPort(); - vhost = port.getVirtualHost(vhostName); + addressSpace = port.getAddressSpace(vhostName); - if(vhost != null) + if(addressSpace != null) { - if (vhost.getState() != State.ACTIVE) + if (!addressSpace.isActive()) { sconn.setState(Connection.State.CLOSING); - final String redirectHost = vhost.getRedirectHost(port); + final String redirectHost = addressSpace.getRedirectHost(port); if(redirectHost == null) { sconn.sendConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, @@ -255,8 +252,8 @@ public class ServerConnectionDelegate ex try { - sconn.setVirtualHost(vhost); - if(!vhost.authoriseCreateConnection(sconn.getAmqpConnection())) + sconn.setVirtualHost(addressSpace); + if(!addressSpace.authoriseCreateConnection(sconn.getAmqpConnection())) { sconn.setState(Connection.State.CLOSING); sconn.sendConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, "Connection not authorized"); @@ -406,7 +403,7 @@ public class ServerConnectionDelegate ex final String userId = authorizedPrincipal == null ? "" : authorizedPrincipal.getName(); final Iterator<? extends org.apache.qpid.server.model.Connection<?>> connections = - ((ServerConnection)conn).getVirtualHost().getConnections().iterator(); + ((ServerConnection)conn).getAddressSpace().getConnections().iterator(); while(connections.hasNext()) { final AMQPConnection<?> amqConnectionModel = (AMQPConnection<?>) connections.next(); Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1745091&r1=1745090&r2=1745091&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Sun May 22 21:03:46 2016 @@ -71,8 +71,8 @@ import org.apache.qpid.server.model.Brok import org.apache.qpid.server.model.ConfigurationChangeListener; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Consumer; +import org.apache.qpid.server.model.NamedAddressSpace; import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.CapacityChecker; import org.apache.qpid.server.protocol.ConsumerListener; @@ -198,11 +198,11 @@ public class ServerSession extends Sessi { getAMQPConnection().closeSessionAsync(ServerSession.this, AMQConstant.RESOURCE_ERROR, reason); } - }, getVirtualHost()); + }, getConnection().getAmqpConnection()); _blockingTimeout = ((ServerConnection)connection).getBroker().getContextValue(Long.class, Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT); - _maxUncommittedInMemorySize = getVirtualHost().getContextValue(Long.class, org.apache.qpid.server.model.Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE); + _maxUncommittedInMemorySize = getConnection().getAmqpConnection().getContextProvider().getContextValue(Long.class, org.apache.qpid.server.model.Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE); } @@ -219,7 +219,7 @@ public class ServerSession extends Sessi if (state == State.OPEN) { - getVirtualHost().getEventLogger().message(ChannelMessages.CREATE()); + getConnection().getAmqpConnection().getEventLogger().message(ChannelMessages.CREATE()); } } else @@ -306,7 +306,7 @@ public class ServerSession extends Sessi handle.flowToDisk(); if(!_uncommittedMessages.isEmpty() || _uncommittedMessageSize == handle.getMetaData().getContentSize()) { - getVirtualHost().getEventLogger() + getConnection().getAmqpConnection().getEventLogger() .message(_logSubject, ChannelMessages.LARGE_TRANSACTION_WARN(_uncommittedMessageSize)); } @@ -495,7 +495,7 @@ public class ServerSession extends Sessi } else if(_transaction instanceof DistributedTransaction) { - getVirtualHost().getDtxRegistry().endAssociations(this); + getAddressSpace().getDtxRegistry().endAssociations(this); } for(MessageDispositionChangeListener listener : _messageDispositionListenerMap.values()) @@ -514,7 +514,7 @@ public class ServerSession extends Sessi { operationalLoggingMessage = ChannelMessages.CLOSE(); } - getVirtualHost().getEventLogger().message(getLogSubject(), operationalLoggingMessage); + getConnection().getAmqpConnection().getEventLogger().message(getLogSubject(), operationalLoggingMessage); } @Override @@ -596,7 +596,7 @@ public class ServerSession extends Sessi public void selectDtx() { - _transaction = new DistributedTransaction(this, getMessageStore(), getVirtualHost()); + _transaction = new DistributedTransaction(this, getAddressSpace().getDtxRegistry()); } @@ -626,14 +626,14 @@ public class ServerSession extends Sessi public long getTimeoutDtx(Xid xid) throws UnknownDtxBranchException { - return getVirtualHost().getDtxRegistry().getTimeout(xid); + return getAddressSpace().getDtxRegistry().getTimeout(xid); } public void setTimeoutDtx(Xid xid, long timeout) throws UnknownDtxBranchException { - getVirtualHost().getDtxRegistry().setTimeout(xid, timeout); + getAddressSpace().getDtxRegistry().setTimeout(xid, timeout); } @@ -641,14 +641,14 @@ public class ServerSession extends Sessi throws UnknownDtxBranchException, IncorrectDtxStateException, StoreException, RollbackOnlyDtxException, TimeoutDtxException { - getVirtualHost().getDtxRegistry().prepare(xid); + getAddressSpace().getDtxRegistry().prepare(xid); } public void commitDtx(Xid xid, boolean onePhase) throws UnknownDtxBranchException, IncorrectDtxStateException, StoreException, RollbackOnlyDtxException, TimeoutDtxException { - getVirtualHost().getDtxRegistry().commit(xid, onePhase); + getAddressSpace().getDtxRegistry().commit(xid, onePhase); } @@ -656,18 +656,18 @@ public class ServerSession extends Sessi throws UnknownDtxBranchException, IncorrectDtxStateException, StoreException, TimeoutDtxException { - getVirtualHost().getDtxRegistry().rollback(xid); + getAddressSpace().getDtxRegistry().rollback(xid); } public void forgetDtx(Xid xid) throws UnknownDtxBranchException, IncorrectDtxStateException { - getVirtualHost().getDtxRegistry().forget(xid); + getAddressSpace().getDtxRegistry().forget(xid); } public List<Xid> recoverDtx() { - return getVirtualHost().getDtxRegistry().recover(); + return getAddressSpace().getDtxRegistry().recover(); } private DistributedTransaction assertDtxTransaction() throws DtxNotSelectedException @@ -776,12 +776,12 @@ public class ServerSession extends Sessi public MessageStore getMessageStore() { - return getVirtualHost().getMessageStore(); + return getAddressSpace().getMessageStore(); } - public VirtualHost<?> getVirtualHost() + public NamedAddressSpace getAddressSpace() { - return getConnection().getVirtualHost(); + return getConnection().getAddressSpace(); } public boolean isDurable() @@ -802,7 +802,7 @@ public class ServerSession extends Sessi } @Override - public AMQPConnection<?> getAMQPConnection() + public AMQPConnection_0_10 getAMQPConnection() { return getConnection().getAmqpConnection(); } @@ -844,7 +844,7 @@ public class ServerSession extends Sessi if(_blocking.compareAndSet(false,true)) { - getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(name)); + getConnection().getAmqpConnection().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(name)); if(getState() == State.OPEN) { getConnection().notifyWork(); @@ -872,7 +872,7 @@ public class ServerSession extends Sessi { if(_blocking.compareAndSet(true,false) && !isClosing()) { - getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_REMOVED()); + getConnection().getAmqpConnection().getEventLogger().message(_logSubject, ChannelMessages.FLOW_REMOVED()); getConnection().notifyWork(); } } @@ -914,7 +914,7 @@ public class ServerSession extends Sessi connectionId, authorizedPrincipal, remoteAddress, - getVirtualHost().getName(), + getAddressSpace().getName(), getChannel()) + "] "; } Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1745091&r1=1745090&r2=1745091&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Sun May 22 21:03:46 2016 @@ -39,8 +39,9 @@ import org.apache.qpid.bytebuffer.QpidBy import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.model.Exchange; -import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.model.NamedAddressSpace; import org.apache.qpid.server.transport.ProtocolEngine; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.store.MessageHandle; @@ -212,15 +213,15 @@ public class ServerSessionDelegate exten else { String queueName = method.getQueue(); - VirtualHost<?> vhost = getVirtualHost(session); + NamedAddressSpace addressSpace = getAddressSpace(session); final Collection<MessageSource> sources = new HashSet<>(); - final MessageSource queue = vhost.getAttainedMessageSource(queueName); + final MessageSource queue = addressSpace.getAttainedMessageSource(queueName); if(queue != null) { sources.add(queue); } - else if(vhost.getContextValue(Boolean.class, "qpid.enableMultiQueueConsumers") + else if(getContextValue(session, Boolean.class, "qpid.enableMultiQueueConsumers") && method.getArguments() != null && method.getArguments().get("x-multiqueue") instanceof Collection) { @@ -230,7 +231,7 @@ public class ServerSessionDelegate exten sourceName = sourceName.trim(); if(sourceName.length() != 0) { - MessageSource source = vhost.getAttainedMessageSource(sourceName); + MessageSource source = addressSpace.getAttainedMessageSource(sourceName); if(source == null) { sources.clear(); @@ -392,7 +393,7 @@ public class ServerSessionDelegate exten ServerSession serverSession = (ServerSession) ssn; if(serverSession.blockingTimeoutExceeded()) { - getVirtualHost(ssn).getEventLogger().message(ChannelMessages.FLOW_CONTROL_IGNORED()); + getEventLogger(ssn).message(ChannelMessages.FLOW_CONTROL_IGNORED()); serverSession.close(AMQConstant.MESSAGE_TOO_LARGE, "Session flow control was requested, but not enforced by sender"); @@ -415,10 +416,10 @@ public class ServerSessionDelegate exten final MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr); - final VirtualHost<?> virtualHost = getVirtualHost(ssn); + final NamedAddressSpace virtualHost = getAddressSpace(ssn); try { - virtualHost.getSecurityManager() + getServerConnection(ssn).getAmqpConnection().getBroker().getSecurityManager() .authorisePublish(messageMetaData.isImmediate(), messageMetaData.getRoutingKey(), destination.getName(), @@ -480,7 +481,7 @@ public class ServerSessionDelegate exten } else { - virtualHost.getEventLogger().message(ExchangeMessages.DISCARDMSG(destination.getName(), + getEventLogger(ssn).message(ExchangeMessages.DISCARDMSG(destination.getName(), messageMetaData.getRoutingKey())); } } @@ -826,7 +827,7 @@ public class ServerSessionDelegate exten public void exchangeDeclare(Session session, ExchangeDeclare method) { String exchangeName = method.getExchange(); - VirtualHost<?> virtualHost = getVirtualHost(session); + NamedAddressSpace addressSpace = getAddressSpace(session); //we must check for any unsupported arguments present and throw not-implemented if(method.hasArguments()) @@ -890,7 +891,7 @@ public class ServerSessionDelegate exten attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, method.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT); attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, method.getAlternateExchange()); - virtualHost.createChild(Exchange.class, attributes);; + addressSpace.createMessageDestination(Exchange.class, attributes);; } catch(ReservedExchangeNameException e) { @@ -958,33 +959,46 @@ public class ServerSessionDelegate exten private Exchange<?> getExchange(Session session, String exchangeName) { - return getVirtualHost(session).getAttainedChildFromAddress(Exchange.class, exchangeName); + return getExchange(getAddressSpace(session),exchangeName); + } + + private Exchange<?> getExchange(NamedAddressSpace addressSpace, String exchangeName) + { + MessageDestination destination = addressSpace.getAttainedMessageDestination(exchangeName); + return destination instanceof Exchange ? (Exchange<?>) destination : null; + } + + + private Queue<?> getQueue(NamedAddressSpace addressSpace, String name) + { + MessageSource source = addressSpace.getAttainedMessageSource(name); + return source instanceof Queue ? (Queue<?>) source : null; } private MessageDestination getDestinationForMessage(Session ssn, MessageTransfer xfr) { - VirtualHost<?> virtualHost = getVirtualHost(ssn); + NamedAddressSpace addressSpace = getAddressSpace(ssn); MessageDestination destination; if(xfr.hasDestination()) { - destination = virtualHost.getAttainedMessageDestination(xfr.getDestination()); + destination = addressSpace.getAttainedMessageDestination(xfr.getDestination()); if(destination == null) { - destination = virtualHost.getDefaultDestination(); + destination = addressSpace.getDefaultDestination(); } } else { - destination = virtualHost.getDefaultDestination(); + destination = addressSpace.getDefaultDestination(); } return destination; } - private VirtualHost<?> getVirtualHost(Session session) + private NamedAddressSpace getAddressSpace(Session session) { ServerConnection conn = getServerConnection(session); - return conn.getVirtualHost(); + return conn.getAddressSpace(); } private ServerConnection getServerConnection(Session session) @@ -992,6 +1006,16 @@ public class ServerSessionDelegate exten return (ServerConnection) session.getConnection(); } + private <T> T getContextValue(Session session, Class<T> clazz, String name) + { + return getServerConnection(session).getAmqpConnection().getContextProvider().getContextValue(clazz, name); + } + + private EventLogger getEventLogger(Session session) + { + return getServerConnection(session).getAmqpConnection().getEventLogger(); + } + @Override public void exchangeDelete(Session session, ExchangeDelete method) { @@ -1083,7 +1107,7 @@ public class ServerSessionDelegate exten public void exchangeBind(Session session, ExchangeBind method) { - VirtualHost<?> virtualHost = getVirtualHost(session); + NamedAddressSpace addressSpace = getAddressSpace(session); if (!method.hasQueue()) { @@ -1104,8 +1128,8 @@ public class ServerSessionDelegate exten { method.setBindingKey(method.getQueue()); } - Queue<?> queue = virtualHost.getAttainedChildFromAddress(Queue.class, method.getQueue()); - Exchange<?> exchange = virtualHost.getAttainedChildFromAddress(Exchange.class, exchangeName); + Queue<?> queue = getQueue(addressSpace, method.getQueue()); + Exchange<?> exchange = getExchange(addressSpace, exchangeName); if(queue == null) { exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue: '" + method.getQueue() + "' not found"); @@ -1148,7 +1172,7 @@ public class ServerSessionDelegate exten @Override public void exchangeUnbind(Session session, ExchangeUnbind method) { - VirtualHost<?> virtualHost = getVirtualHost(session); + NamedAddressSpace addressSpace = getAddressSpace(session); if (!method.hasQueue()) { @@ -1164,8 +1188,8 @@ public class ServerSessionDelegate exten } else { - Queue<?> queue = virtualHost.getAttainedChildFromAddress(Queue.class, method.getQueue()); - Exchange<?> exchange = virtualHost.getAttainedChildFromAddress(Exchange.class, method.getExchange()); + Queue<?> queue = getQueue(addressSpace, method.getQueue()); + Exchange<?> exchange = getExchange(addressSpace, method.getExchange()); if(queue == null) { exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue: '" + method.getQueue() + "' not found"); @@ -1196,7 +1220,7 @@ public class ServerSessionDelegate exten { ExchangeBoundResult result = new ExchangeBoundResult(); - VirtualHost<?> virtualHost = getVirtualHost(session); + NamedAddressSpace addressSpace = getAddressSpace(session); Exchange<?> exchange; MessageSource source; Queue<?> queue; @@ -1204,7 +1228,7 @@ public class ServerSessionDelegate exten if(!nameNullOrEmpty(method.getExchange())) { isDefaultExchange = false; - exchange = virtualHost.getAttainedChildFromAddress(Exchange.class, method.getExchange()); + exchange = getExchange(addressSpace, method.getExchange()); if(exchange == null) { @@ -1380,19 +1404,19 @@ public class ServerSessionDelegate exten private MessageSource getMessageSource(Session session, String queue) { - return getVirtualHost(session).getAttainedMessageSource(queue); + return getAddressSpace(session).getAttainedMessageSource(queue); } private Queue<?> getQueue(Session session, String queue) { - return getVirtualHost(session).getAttainedChildFromAddress(Queue.class, queue); + return getQueue(getAddressSpace(session), queue); } @Override public void queueDeclare(Session session, final QueueDeclare method) { - final VirtualHost<?> virtualHost = getVirtualHost(session); + final NamedAddressSpace addressSpace = getAddressSpace(session); String queueName = method.getQueue(); Queue<?> queue; @@ -1403,11 +1427,11 @@ public class ServerSessionDelegate exten if(method.getPassive()) { - queue = virtualHost.getAttainedChildFromAddress(Queue.class, queueName); + queue = getQueue(addressSpace, queueName); if (queue == null) { - String description = "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ")."; + String description = "Queue: " + queueName + " not found on VirtualHost(" + addressSpace + ")."; ExecutionErrorCode errorCode = ExecutionErrorCode.NOT_FOUND; exception(session, method, errorCode, description); @@ -1482,7 +1506,7 @@ public class ServerSessionDelegate exten arguments.put(Queue.DURABLE, method.getDurable()); - queue = virtualHost.createChild(Queue.class, arguments); + queue = addressSpace.createMessageSource(Queue.class, arguments); } catch(QueueExistsException qe) @@ -1541,8 +1565,6 @@ public class ServerSessionDelegate exten } else { - VirtualHost<?> virtualHost = getVirtualHost(session); - try { queue.delete(); Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegateTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegateTest.java?rev=1745091&r1=1745090&r2=1745091&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegateTest.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegateTest.java Sun May 22 21:03:46 2016 @@ -48,7 +48,7 @@ public class ServerSessionDelegateTest e _host = mock(VirtualHost.class); ServerConnection serverConnection = mock(ServerConnection.class); - doReturn(_host).when(serverConnection).getVirtualHost(); + doReturn(_host).when(serverConnection).getAddressSpace(); _session = mock(ServerSession.class); when(_session.getConnection()).thenReturn(serverConnection); @@ -61,7 +61,7 @@ public class ServerSessionDelegateTest e Exchange<?> exchange = mock(Exchange.class); when(exchange.hasBindings()).thenReturn(true); - doReturn(exchange).when(_host).getAttainedChildFromAddress(Exchange.class, getTestName()); + doReturn(exchange).when(_host).getAttainedMessageDestination(getTestName()); final ExchangeDelete method = new ExchangeDelete(getTestName(), Option.IF_UNUSED); _delegate.exchangeDelete(_session, method); @@ -82,7 +82,7 @@ public class ServerSessionDelegateTest e Exchange<?> exchange = mock(Exchange.class); when(exchange.hasBindings()).thenReturn(false); - doReturn(exchange).when(_host).getAttainedChildFromAddress(Exchange.class, getTestName()); + doReturn(exchange).when(_host).getAttainedMessageDestination(getTestName()); final ExchangeDelete method = new ExchangeDelete(getTestName(), Option.IF_UNUSED); _delegate.exchangeDelete(_session, method); Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java?rev=1745091&r1=1745090&r2=1745091&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java Sun May 22 21:03:46 2016 @@ -29,13 +29,14 @@ import javax.security.auth.Subject; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.configuration.updater.TaskExecutorImpl; +import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.BrokerModel; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.port.AmqpPort; -import org.apache.qpid.server.transport.AMQPConnection; +import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.transport.Binary; @@ -77,12 +78,17 @@ public class ServerSessionTest extends Q public void testOverlargeMessageTest() throws Exception { final Broker<?> broker = mock(Broker.class); + SecurityManager securityManager = new SecurityManager(broker, true); + when(broker.getSecurityManager()).thenReturn(securityManager); when(broker.getContextValue(eq(Long.class), eq(Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT))).thenReturn(0l); AmqpPort port = createMockPort(); final AMQPConnection_0_10 modelConnection = mock(AMQPConnection_0_10.class); - when(modelConnection.getVirtualHost()).thenReturn((VirtualHost) _virtualHost); + when(modelConnection.getAddressSpace()).thenReturn(_virtualHost); + when(modelConnection.getContextProvider()).thenReturn(_virtualHost); + when(modelConnection.getBroker()).thenReturn((Broker)broker); + when(modelConnection.getEventLogger()).thenReturn(mock(EventLogger.class)); Subject subject = new Subject(); when(modelConnection.getSubject()).thenReturn(subject); when(modelConnection.getMaxMessageSize()).thenReturn(1024l); Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1745091&r1=1745090&r2=1745091&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Sun May 22 21:03:46 2016 @@ -51,7 +51,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.AMQConnectionException; -import org.apache.qpid.QpidException; import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.exchange.ExchangeDefaults; @@ -69,6 +68,8 @@ import org.apache.qpid.server.filter.Fil import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.flow.FlowCreditManager; +import org.apache.qpid.server.logging.EventLogger; +import org.apache.qpid.server.logging.EventLoggerProvider; import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.messages.ChannelMessages; @@ -88,12 +89,12 @@ import org.apache.qpid.server.model.Cons import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.ExclusivityPolicy; import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.NamedAddressSpace; import org.apache.qpid.server.model.NoFactoryForTypeException; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.Session; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.UnknownConfiguredObjectException; -import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.CapacityChecker; import org.apache.qpid.server.protocol.ConsumerListener; @@ -120,7 +121,8 @@ import org.apache.qpid.transport.network public class AMQChannel implements AMQSessionModel<AMQChannel>, AsyncAutoCommitTransaction.FutureRecorder, - ServerChannelMethodProcessor + ServerChannelMethodProcessor, + EventLoggerProvider { public static final int DEFAULT_PREFETCH = 4096; @@ -239,7 +241,7 @@ public class AMQChannel _accessControllerContext = org.apache.qpid.server.security.SecurityManager.getAccessControlContextFromSubject(_subject); - _maxUncommittedInMemorySize = connection.getVirtualHost().getContextValue(Long.class, Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE); + _maxUncommittedInMemorySize = connection.getContextProvider().getContextValue(Long.class, Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE); _logSubject = new ChannelLogSubject(this); _messageStore = messageStore; @@ -257,14 +259,14 @@ public class AMQChannel { _connection.sendConnectionCloseAsync(AMQConstant.RESOURCE_ERROR, reason); } - }, getVirtualHost()); + }, getConnection()); AccessController.doPrivileged((new PrivilegedAction<Object>() { @Override public Object run() { - getVirtualHost().getEventLogger().message(ChannelMessages.CREATE()); + message(ChannelMessages.CREATE()); return null; } @@ -272,6 +274,11 @@ public class AMQChannel } + private void message(final LogMessage message) + { + getEventLogger().message(message); + } + public AccessControlContext getAccessControllerContext() { return _accessControllerContext; @@ -417,9 +424,9 @@ public class AMQChannel { MessagePublishInfo info = _currentMessage.getMessagePublishInfo(); String routingKey = AMQShortString.toString(info.getRoutingKey()); - VirtualHost<?> virtualHost = getVirtualHost(); + NamedAddressSpace virtualHost = getAddressSpace(); - SecurityManager securityManager = virtualHost.getSecurityManager(); + SecurityManager securityManager = getConnection().getBroker().getSecurityManager(); try { ContentHeaderBody contentHeader = _currentMessage.getContentHeader(); @@ -554,8 +561,7 @@ public class AMQChannel handle.flowToDisk(); if(!_uncommittedMessages.isEmpty() || _uncommittedMessageSize == handle.getMetaData().getContentSize()) { - getVirtualHost().getEventLogger() - .message(_logSubject, ChannelMessages.LARGE_TRANSACTION_WARN(_uncommittedMessageSize)); + messageWithSubject(ChannelMessages.LARGE_TRANSACTION_WARN(_uncommittedMessageSize)); } if(!_uncommittedMessages.isEmpty()) @@ -580,7 +586,7 @@ public class AMQChannel * Pre-requisite: the current message is judged to have no destination queues. * * @throws AMQConnectionException if the message is mandatory close-on-no-route - * @see AMQPConnection_0_8#isCloseWhenNoRoute() + * @see AMQPConnection_0_8Impl#isCloseWhenNoRoute() */ private Runnable handleUnroutableMessage(AMQMessage message) { @@ -632,7 +638,7 @@ public class AMQChannel else { - getVirtualHost().getEventLogger().message(ExchangeMessages.DISCARDMSG(exchangeName, routingKey)); + message(ExchangeMessages.DISCARDMSG(exchangeName, routingKey)); } } return returnVal; @@ -892,10 +898,15 @@ public class AMQChannel LogMessage operationalLogMessage = cause == null ? ChannelMessages.CLOSE() : ChannelMessages.CLOSE_FORCED(cause.getCode(), message); - getVirtualHost().getEventLogger().message(_logSubject, operationalLogMessage); + messageWithSubject(operationalLogMessage); } } + private void messageWithSubject(final LogMessage operationalLogMessage) + { + getEventLogger().message(_logSubject, operationalLogMessage); + } + private void unsubscribeAllConsumers() { if (_logger.isDebugEnabled()) @@ -1152,7 +1163,7 @@ public class AMQChannel // Log Flow Started before we start the subscriptions if (!suspended && _logChannelFlowMessages) { - getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW("Started")); + messageWithSubject(ChannelMessages.FLOW("Started")); } @@ -1196,7 +1207,7 @@ public class AMQChannel // stopped. if (suspended && _logChannelFlowMessages) { - getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW("Stopped")); + messageWithSubject(ChannelMessages.FLOW("Stopped")); } } @@ -1324,7 +1335,7 @@ public class AMQChannel { if (!_prefetchLoggedForChannel) { - getVirtualHost().getEventLogger().message(ChannelMessages.PREFETCH_SIZE(prefetchSize, prefetchCount)); + message(ChannelMessages.PREFETCH_SIZE(prefetchSize, prefetchCount)); _prefetchLoggedForChannel = true; } @@ -1667,8 +1678,7 @@ public class AMQChannel if(_blocking.compareAndSet(false,true)) { - getVirtualHost().getEventLogger().message(_logSubject, - ChannelMessages.FLOW_ENFORCED("** All Queues **")); + messageWithSubject(ChannelMessages.FLOW_ENFORCED("** All Queues **")); getConnection().notifyWork(); @@ -1682,7 +1692,7 @@ public class AMQChannel { if(_blockingEntities.isEmpty() && _blocking.compareAndSet(true,false)) { - getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_REMOVED()); + messageWithSubject(ChannelMessages.FLOW_REMOVED()); getConnection().notifyWork(); } } @@ -1696,7 +1706,7 @@ public class AMQChannel if(_blocking.compareAndSet(false,true)) { - getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(queue.getName())); + messageWithSubject(ChannelMessages.FLOW_ENFORCED(queue.getName())); getConnection().notifyWork(); } @@ -1709,7 +1719,7 @@ public class AMQChannel { if(_blockingEntities.isEmpty() && _blocking.compareAndSet(true,false) && !isClosing()) { - getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_REMOVED()); + messageWithSubject(ChannelMessages.FLOW_REMOVED()); getConnection().notifyWork(); } } @@ -1746,9 +1756,9 @@ public class AMQChannel return _blocking.get(); } - public VirtualHost<?> getVirtualHost() + public NamedAddressSpace getAddressSpace() { - return getConnection().getVirtualHost(); + return getConnection().getAddressSpace(); } public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) @@ -1776,10 +1786,9 @@ public class AMQChannel @Override public void performAction(final MessageInstance requeueEntry) { - getVirtualHost().getEventLogger().message(_logSubject, - ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), - requeueEntry.getOwningResource() - .getName())); + messageWithSubject(ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), + requeueEntry.getOwningResource() + .getName())); } }, null); } @@ -1802,10 +1811,9 @@ public class AMQChannel "No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag); } - getVirtualHost().getEventLogger().message(_logSubject, - ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), - queue.getName(), - msg.getInitialRoutingAddress())); + messageWithSubject(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), + queue.getName(), + msg.getInitialRoutingAddress())); } else @@ -1816,9 +1824,8 @@ public class AMQChannel "Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: " + deliveryTag); } - getVirtualHost().getEventLogger().message(_logSubject, - ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), - altExchange.getName())); + messageWithSubject(ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), + altExchange.getName())); } } } @@ -2109,7 +2116,7 @@ public class AMQChannel } AMQShortString consumerTag1 = consumerTag; - VirtualHost<?> vHost = _connection.getVirtualHost(); + NamedAddressSpace vHost = _connection.getAddressSpace(); sync(); String queueName = AMQShortString.toString(queue); @@ -2119,7 +2126,7 @@ public class AMQChannel { sources.add(queue1); } - else if (vHost.getContextValue(Boolean.class, "qpid.enableMultiQueueConsumers") + else if (_connection.getContextProvider().getContextValue(Boolean.class, "qpid.enableMultiQueueConsumers") && arguments != null && arguments.get("x-multiqueue") instanceof Collection) { @@ -2234,7 +2241,7 @@ public class AMQChannel _logger.debug("RECV[" + _channelId + "] BasicGet[" +" queue: " + queueName + " noAck: " + noAck + " ]"); } - VirtualHost<?> vHost = _connection.getVirtualHost(); + NamedAddressSpace vHost = _connection.getAddressSpace(); sync(); MessageSource queue = queueName == null ? getDefaultQueue() : vHost.getAttainedMessageSource(queueName.toString()); if (queue == null) @@ -2307,11 +2314,11 @@ public class AMQChannel - VirtualHost<?> vHost = _connection.getVirtualHost(); + NamedAddressSpace vHost = _connection.getAddressSpace(); if(blockingTimeoutExceeded()) { - getVirtualHost().getEventLogger().message(ChannelMessages.FLOW_CONTROL_IGNORED()); + message(ChannelMessages.FLOW_CONTROL_IGNORED()); closeChannel(AMQConstant.MESSAGE_TOO_LARGE, "Channel flow control was requested, but not enforced by sender"); } @@ -2354,6 +2361,12 @@ public class AMQChannel } } + @Override + public EventLogger getEventLogger() + { + return getConnection().getEventLogger(); + } + private boolean blockingTimeoutExceeded() { @@ -2689,7 +2702,7 @@ public class AMQChannel routingKey + " queue: " + queueName + " ]"); } - VirtualHost<?> virtualHost = _connection.getVirtualHost(); + NamedAddressSpace virtualHost = _connection.getAddressSpace(); MethodRegistry methodRegistry = _connection.getMethodRegistry(); sync(); @@ -2703,9 +2716,9 @@ public class AMQChannel { if (queueName == null) { - replyCode = virtualHost.getChildren(Queue.class).isEmpty() - ? ExchangeBoundOkBody.NO_BINDINGS - : ExchangeBoundOkBody.OK; + replyCode = virtualHost.hasMessageSources() + ? ExchangeBoundOkBody.OK + : ExchangeBoundOkBody.NO_BINDINGS; replyText = null; } @@ -2728,14 +2741,15 @@ public class AMQChannel { if (queueName == null) { - replyCode = virtualHost.getAttainedChildFromAddress(Queue.class, routingKey.toString()) == null - ? ExchangeBoundOkBody.NO_QUEUE_BOUND_WITH_RK - : ExchangeBoundOkBody.OK; + replyCode = virtualHost.getAttainedMessageDestination(routingKey.toString()) instanceof Queue + ? ExchangeBoundOkBody.OK + : ExchangeBoundOkBody.NO_QUEUE_BOUND_WITH_RK; replyText = null; } else { - Queue<?> queue = virtualHost.getAttainedChildFromAddress(Queue.class, queueName.toString()); + MessageDestination destination = virtualHost.getAttainedMessageDestination(queueName.toString()); + Queue<?> queue = destination instanceof Queue ? (Queue) destination : null; if (queue == null) { @@ -2754,7 +2768,7 @@ public class AMQChannel } else { - Exchange<?> exchange = virtualHost.getAttainedChildFromAddress(Exchange.class, exchangeName.toString()); + Exchange<?> exchange = getExchange(exchangeName.toString()); if (exchange == null) { @@ -2778,8 +2792,7 @@ public class AMQChannel } else { - - Queue<?> queue = virtualHost.getAttainedChildFromAddress(Queue.class, queueName.toString()); + Queue<?> queue = getQueue(queueName.toString()); if (queue == null) { replyCode = ExchangeBoundOkBody.QUEUE_NOT_FOUND; @@ -2806,7 +2819,7 @@ public class AMQChannel } else if (queueName != null) { - Queue<?> queue = virtualHost.getAttainedChildFromAddress(Queue.class, queueName.toString()); + Queue<?> queue = getQueue(queueName.toString()); if (queue == null) { replyCode = ExchangeBoundOkBody.QUEUE_NOT_FOUND; @@ -2878,7 +2891,7 @@ public class AMQChannel final AMQMethodBody declareOkBody = methodRegistry.createExchangeDeclareOkBody(); Exchange<?> exchange; - VirtualHost<?> virtualHost = _connection.getVirtualHost(); + NamedAddressSpace virtualHost = _connection.getAddressSpace(); if (isDefaultExchange(exchangeName)) { @@ -2900,7 +2913,7 @@ public class AMQChannel { if (passive) { - exchange = virtualHost.getAttainedChildFromAddress(Exchange.class, exchangeName.toString()); + exchange = getExchange(exchangeName.toString()); if (exchange == null) { closeChannel(AMQConstant.NOT_FOUND, "Unknown exchange: '" + exchangeName + "'"); @@ -2944,7 +2957,7 @@ public class AMQChannel { attributes.put(Exchange.ALTERNATE_EXCHANGE, null); } - exchange = virtualHost.createChild(Exchange.class, attributes); + exchange = virtualHost.createMessageDestination(Exchange.class, attributes); if (!nowait) { @@ -2955,7 +2968,7 @@ public class AMQChannel } catch (ReservedExchangeNameException e) { - Exchange existing = virtualHost.getAttainedChildFromAddress(Exchange.class, exchangeName.toString()); + Exchange existing = getExchange(exchangeName.toString()); if (existing != null && new AMQShortString(existing.getType()).equals(type)) { sync(); @@ -3033,7 +3046,7 @@ public class AMQChannel } - VirtualHost<?> virtualHost = _connection.getVirtualHost(); + NamedAddressSpace virtualHost = _connection.getAddressSpace(); sync(); if (isDefaultExchange(exchangeStr)) @@ -3047,7 +3060,7 @@ public class AMQChannel { final String exchangeName = exchangeStr.toString(); - final Exchange<?> exchange = virtualHost.getAttainedChildFromAddress(Exchange.class, exchangeName); + final Exchange<?> exchange = getExchange(exchangeName); if (exchange == null) { closeChannel(AMQConstant.NOT_FOUND, "No such exchange: '" + exchangeStr + "'"); @@ -3103,7 +3116,7 @@ public class AMQChannel " nowait: " + nowait + " arguments: " + argumentsTable + " ]"); } - VirtualHost<?> virtualHost = _connection.getVirtualHost(); + NamedAddressSpace virtualHost = _connection.getAddressSpace(); Queue<?> queue; if (queueName == null) { @@ -3120,7 +3133,7 @@ public class AMQChannel } else { - queue = virtualHost.getAttainedChildFromAddress(Queue.class, queueName.toString()); + queue = getQueue(queueName.toString()); routingKey = routingKey == null ? AMQShortString.EMPTY_STRING : routingKey; } @@ -3142,7 +3155,7 @@ public class AMQChannel final String exchangeName = exchange.toString(); - final Exchange<?> exch = virtualHost.getAttainedChildFromAddress(Exchange.class, exchangeName); + final Exchange<?> exch = getExchange(exchangeName); if (exch == null) { closeChannel(AMQConstant.NOT_FOUND, @@ -3212,7 +3225,7 @@ public class AMQChannel " autoDelete: " + autoDelete + " nowait: " + nowait + " arguments: " + arguments + " ]"); } - VirtualHost<?> virtualHost = _connection.getVirtualHost(); + NamedAddressSpace virtualHost = _connection.getAddressSpace(); final AMQShortString queueName; @@ -3233,7 +3246,7 @@ public class AMQChannel if (passive) { - queue = virtualHost.getAttainedChildFromAddress(Queue.class, queueName.toString()); + queue = getQueue(queueName.toString()); if (queue == null) { closeChannel(AMQConstant.NOT_FOUND, @@ -3309,7 +3322,7 @@ public class AMQChannel attributes.put(Queue.LIFETIME_POLICY, lifetimePolicy); } - queue = virtualHost.createChild(Queue.class, attributes); + queue = virtualHost.createMessageSource(Queue.class, attributes); setDefaultQueue(queue); @@ -3408,7 +3421,7 @@ public class AMQChannel _logger.debug("RECV[" + _channelId + "] QueueDelete[" +" queue: " + queueName + " ifUnused: " + ifUnused + " ifEmpty: " + ifEmpty + " nowait: " + nowait + " ]"); } - VirtualHost<?> virtualHost = _connection.getVirtualHost(); + NamedAddressSpace virtualHost = _connection.getAddressSpace(); sync(); Queue<?> queue; if (queueName == null) @@ -3419,7 +3432,7 @@ public class AMQChannel } else { - queue = virtualHost.getAttainedChildFromAddress(Queue.class, queueName.toString()); + queue = getQueue(queueName.toString()); } if (queue == null) @@ -3478,14 +3491,14 @@ public class AMQChannel _logger.debug("RECV[" + _channelId + "] QueuePurge[" +" queue: " + queueName + " nowait: " + nowait + " ]"); } - VirtualHost<?> virtualHost = _connection.getVirtualHost(); + NamedAddressSpace virtualHost = _connection.getAddressSpace(); Queue<?> queue = null; if (queueName == null && (queue = getDefaultQueue()) == null) { _connection.sendConnectionClose(AMQConstant.NOT_ALLOWED, "No queue specified.", getChannelId()); } - else if ((queueName != null) && (queue = virtualHost.getAttainedChildFromAddress(Queue.class, queueName.toString())) == null) + else if ((queueName != null) && (queue = getQueue(queueName.toString())) == null) { closeChannel(AMQConstant.NOT_FOUND, "Queue '" + queueName + "' does not exist."); } @@ -3531,13 +3544,13 @@ public class AMQChannel " arguments: " + arguments + " ]"); } - VirtualHost<?> virtualHost = _connection.getVirtualHost(); + NamedAddressSpace virtualHost = _connection.getAddressSpace(); final boolean useDefaultQueue = queueName == null; final Queue<?> queue = useDefaultQueue ? getDefaultQueue() - : virtualHost.getAttainedChildFromAddress(Queue.class, queueName.toString()); + : getQueue(queueName.toString()); if (queue == null) @@ -3557,7 +3570,7 @@ public class AMQChannel else { - final Exchange<?> exch = virtualHost.getAttainedChildFromAddress(Exchange.class, exchange.toString()); + final Exchange<?> exch = getExchange(exchange.toString()); if (exch == null) { @@ -3814,4 +3827,16 @@ public class AMQChannel { return _tag2SubscriptionTargetMap.values(); } + + private Exchange<?> getExchange(String name) + { + MessageDestination destination = getAddressSpace().getAttainedMessageDestination(name); + return destination instanceof Exchange ? (Exchange<?>) destination : null; + } + + private Queue<?> getQueue(String name) + { + MessageSource source = getAddressSpace().getAttainedMessageSource(name); + return source instanceof Queue ? (Queue<?>) source : null; + } } Added: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java?rev=1745091&view=auto ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java (added) +++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java Sun May 22 21:03:46 2016 @@ -0,0 +1,75 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_8; + +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.logging.EventLoggerProvider; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.ContextProvider; +import org.apache.qpid.server.transport.AMQPConnection; +import org.apache.qpid.server.transport.ProtocolEngine; + +interface AMQPConnection_0_8<C extends AMQPConnection_0_8<C>> extends AMQPConnection<C>, ProtocolEngine, EventLoggerProvider +{ + Broker<?> getBroker(); + + MethodRegistry getMethodRegistry(); + + void writeFrame(AMQDataBlock frame); + + void sendConnectionClose(AMQConstant errorCode, + String message, int channelId); + + boolean isCloseWhenNoRoute(); + + ContextProvider getContextProvider(); + + boolean isClosing(); + + void closeChannelOk(int channelId); + + int getBinaryDataLimit(); + + long getMaxMessageSize(); + + boolean ignoreAllButCloseOk(); + + boolean channelAwaitingClosure(int channelId); + + ProtocolVersion getProtocolVersion(); + + void closeChannel(AMQChannel amqChannel); + + boolean isSendQueueDeleteOkRegardless(); + + void closeChannelAndWriteFrame(AMQChannel amqChannel, AMQConstant cause, String message); + + ProtocolOutputConverter getProtocolOutputConverter(); + + Object getReference(); + + ClientDeliveryMethod createDeliveryMethod(int channelId); + + void setDeferFlush(boolean batch); +} Propchange: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java ------------------------------------------------------------------------------ svn:eol-style = native Copied: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java (from r1744872, qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java) URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java?p2=qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java&p1=qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java&r1=1744872&r2=1745091&rev=1745091&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java Sun May 22 21:03:46 2016 @@ -60,8 +60,8 @@ import org.apache.qpid.configuration.Com import org.apache.qpid.framing.*; import org.apache.qpid.properties.ConnectionStartProperties; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.model.NamedAddressSpace; import org.apache.qpid.server.model.Protocol; -import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.protocol.ConnectionClosingTicker; import org.apache.qpid.server.security.*; import org.apache.qpid.server.transport.AbstractAMQPConnection; @@ -72,7 +72,6 @@ import org.apache.qpid.server.logging.me import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -87,9 +86,9 @@ import org.apache.qpid.transport.ByteBuf import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.AggregateTicker; -public class AMQPConnection_0_8 - extends AbstractAMQPConnection<AMQPConnection_0_8> - implements ServerMethodProcessor<ServerChannelMethodProcessor> +public class AMQPConnection_0_8Impl + extends AbstractAMQPConnection<AMQPConnection_0_8Impl> + implements ServerMethodProcessor<ServerChannelMethodProcessor>, AMQPConnection_0_8<AMQPConnection_0_8Impl> { enum ConnectionState @@ -102,7 +101,7 @@ public class AMQPConnection_0_8 OPEN } - private static final Logger _logger = LoggerFactory.getLogger(AMQPConnection_0_8.class); + private static final Logger _logger = LoggerFactory.getLogger(AMQPConnection_0_8Impl.class); private static final String BROKER_DEBUG_BINARY_DATA_LENGTH = "broker.debug.binaryDataLength"; private static final int DEFAULT_DEBUG_BINARY_DATA_LENGTH = 80; @@ -133,7 +132,7 @@ public class AMQPConnection_0_8 private volatile ProtocolVersion _protocolVersion; private volatile MethodRegistry _methodRegistry; - private final Queue<Action<? super AMQPConnection_0_8>> _asyncTaskList = + private final Queue<Action<? super AMQPConnection_0_8Impl>> _asyncTaskList = new ConcurrentLinkedQueue<>(); private final Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<>(); @@ -166,13 +165,13 @@ public class AMQPConnection_0_8 private final int _binaryDataLimit; private volatile boolean _transportBlockedForWriting; - public AMQPConnection_0_8(Broker<?> broker, - ServerNetworkConnection network, - AmqpPort<?> port, - Transport transport, - Protocol protocol, - long connectionId, - AggregateTicker aggregateTicker) + public AMQPConnection_0_8Impl(Broker<?> broker, + ServerNetworkConnection network, + AmqpPort<?> port, + Transport transport, + Protocol protocol, + long connectionId, + AggregateTicker aggregateTicker) { super(broker, network, port, transport, protocol, connectionId, aggregateTicker); @@ -260,7 +259,7 @@ public class AMQPConnection_0_8 } catch (StoreException e) { - if (getVirtualHost().getState() == State.ACTIVE) + if (getAddressSpace().isActive()) { throw new ServerScopedRuntimeException(e); } @@ -442,7 +441,7 @@ public class AMQPConnection_0_8 } - void closeChannel(AMQChannel channel) + public void closeChannel(AMQChannel channel) { closeChannel(channel, null, null, false); } @@ -542,7 +541,7 @@ public class AMQPConnection_0_8 } } - void sendConnectionClose(AMQConstant errorCode, + public void sendConnectionClose(AMQConstant errorCode, String message, int channelId) { sendConnectionClose(channelId, new AMQFrame(0, new ConnectionCloseBody(getProtocolVersion(), errorCode.getCode(), AMQShortString.validValueOf(message), _currentClassId, _currentMethodId))); @@ -710,7 +709,7 @@ public class AMQPConnection_0_8 { performDeleteTasks(); - final VirtualHost<?> virtualHost = getVirtualHost(); + final NamedAddressSpace virtualHost = getAddressSpace(); if (virtualHost != null) { virtualHost.deregisterConnection(this); @@ -770,11 +769,11 @@ public class AMQPConnection_0_8 public void closeSessionAsync(final AMQSessionModel<?> session, final AMQConstant cause, final String message) { - addAsyncTask(new Action<AMQPConnection_0_8>() + addAsyncTask(new Action<AMQPConnection_0_8Impl>() { @Override - public void performAction(final AMQPConnection_0_8 object) + public void performAction(final AMQPConnection_0_8Impl object) { int channelId = session.getChannelId(); closeChannel(channelId, cause, message); @@ -795,10 +794,10 @@ public class AMQPConnection_0_8 @Override public void sendConnectionCloseAsync(final AMQConstant cause, final String message) { - Action<AMQPConnection_0_8> action = new Action<AMQPConnection_0_8>() + Action<AMQPConnection_0_8Impl> action = new Action<AMQPConnection_0_8Impl>() { @Override - public void performAction(final AMQPConnection_0_8 object) + public void performAction(final AMQPConnection_0_8Impl object) { AMQConnectionException e = new AMQConnectionException(cause, message, 0, 0, getMethodRegistry(), @@ -809,7 +808,7 @@ public class AMQPConnection_0_8 addAsyncTask(action); } - private void addAsyncTask(final Action<AMQPConnection_0_8> action) + private void addAsyncTask(final Action<AMQPConnection_0_8Impl> action) { _asyncTaskList.add(action); notifyWork(); @@ -879,7 +878,7 @@ public class AMQPConnection_0_8 assertState(ConnectionState.OPEN); // Protect the broker against out of order frame request. - final VirtualHost<?> virtualHost = getVirtualHost(); + final NamedAddressSpace virtualHost = getAddressSpace(); if (virtualHost == null) { sendConnectionClose(AMQConstant.COMMAND_INVALID, @@ -942,9 +941,9 @@ public class AMQPConnection_0_8 virtualHostStr = virtualHostStr.substring(1); } - VirtualHost<?> virtualHost = ((AmqpPort)getPort()).getVirtualHost(virtualHostStr); + NamedAddressSpace addressSpace = ((AmqpPort)getPort()).getAddressSpace(virtualHostStr); - if (virtualHost == null) + if (addressSpace == null) { sendConnectionClose(AMQConstant.NOT_FOUND, "Unknown virtual host: '" + virtualHostName + "'", 0); @@ -953,9 +952,9 @@ public class AMQPConnection_0_8 else { // Check virtualhost access - if (virtualHost.getState() != State.ACTIVE) + if (!addressSpace.isActive()) { - String redirectHost = virtualHost.getRedirectHost(getPort()); + String redirectHost = addressSpace.getRedirectHost(getPort()); if(redirectHost != null) { sendConnectionClose(0, new AMQFrame(0, new ConnectionRedirectBody(getProtocolVersion(), AMQShortString.valueOf(redirectHost), null))); @@ -963,7 +962,7 @@ public class AMQPConnection_0_8 else { sendConnectionClose(AMQConstant.CONNECTION_FORCED, - "Virtual host '" + virtualHost.getName() + "' is not active", 0); + "Virtual host '" + addressSpace.getName() + "' is not active", 0); } } @@ -971,9 +970,9 @@ public class AMQPConnection_0_8 { try { - setVirtualHost(virtualHost); + setAddressSpace(addressSpace); - if(virtualHost.authoriseCreateConnection(this)) + if(addressSpace.authoriseCreateConnection(this)) { MethodRegistry methodRegistry = getMethodRegistry(); AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(virtualHostName); @@ -1462,13 +1461,13 @@ public class AMQPConnection_0_8 } else if(!_asyncTaskList.isEmpty()) { - final Action<? super AMQPConnection_0_8> asyncAction = _asyncTaskList.poll(); + final Action<? super AMQPConnection_0_8Impl> asyncAction = _asyncTaskList.poll(); return new Runnable() { @Override public void run() { - asyncAction.performAction(AMQPConnection_0_8.this); + asyncAction.performAction(AMQPConnection_0_8Impl.this); } }; } Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java?rev=1745091&r1=1745090&r2=1745091&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java Sun May 22 21:03:46 2016 @@ -36,13 +36,13 @@ import org.apache.qpid.server.util.Serve public class BrokerDecoder extends ServerDecoder { private static final Logger _logger = LoggerFactory.getLogger(BrokerDecoder.class); - private final AMQPConnection_0_8 _connection; + private final AMQPConnection_0_8Impl _connection; /** * Creates a new AMQP decoder. * * @param connection */ - public BrokerDecoder(final AMQPConnection_0_8 connection) + public BrokerDecoder(final AMQPConnection_0_8Impl connection) { super(connection); _connection = connection; Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1745091&r1=1745090&r2=1745091&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Sun May 22 21:03:46 2016 @@ -149,7 +149,7 @@ public abstract class ConsumerTarget_0_8 { super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod); - _txn = new AutoCommitTransaction(channel.getVirtualHost().getMessageStore()); + _txn = new AutoCommitTransaction(channel.getAddressSpace().getMessageStore()); } /** --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
