Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java (original) +++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java Fri Oct 17 14:23:19 2014 @@ -24,7 +24,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.framing.MessagePublishInfo; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.AbstractServerMessageImpl; import org.apache.qpid.server.store.StoredMessage;
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original) +++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Fri Oct 17 14:23:19 2014 @@ -21,9 +21,13 @@ package org.apache.qpid.server.protocol.v0_8; import java.io.IOException; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; +import java.security.AccessControlException; import java.security.AccessController; import java.security.Principal; import java.security.PrivilegedAction; @@ -41,11 +45,11 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import javax.security.auth.Subject; +import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; import org.apache.log4j.Logger; -import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.codec.AMQDecoder; @@ -54,7 +58,6 @@ import org.apache.qpid.common.ServerProp import org.apache.qpid.framing.*; import org.apache.qpid.properties.ConnectionStartProperties; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.connection.ConnectionPrincipal; @@ -67,15 +70,15 @@ import org.apache.qpid.server.message.In import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.model.port.AmqpPort; +import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.SessionModelListener; -import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl; -import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; -import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverterRegistry; -import org.apache.qpid.server.protocol.v0_8.state.AMQState; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; +import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; +import org.apache.qpid.server.security.auth.SubjectAuthenticationResult; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; @@ -86,7 +89,9 @@ import org.apache.qpid.transport.Transpo import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.util.BytesDataOutput; -public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSession<AMQProtocolEngine> +public class AMQProtocolEngine implements ServerProtocolEngine, + AMQConnectionModel<AMQProtocolEngine, AMQChannel>, + ServerMethodProcessor<ServerChannelMethodProcessor> { private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class); @@ -94,6 +99,8 @@ public class AMQProtocolEngine implement // channels. This value must be of the form 2^x - 1. private static final int CHANNEL_CACHE_SIZE = 0xff; private static final int REUSABLE_BYTE_BUFFER_CAPACITY = 65 * 1024; + public static final String BROKER_DEBUG_BINARY_DATA_LENGTH = "broker.debug.binaryDataLength"; + public static final int DEFAULT_DEBUG_BINARY_DATA_LENGTH = 80; private final Port<?> _port; private final long _creationTime; @@ -105,13 +112,12 @@ public class AMQProtocolEngine implement private VirtualHostImpl<?,?,?> _virtualHost; - private final Map<Integer, AMQChannel<AMQProtocolEngine>> _channelMap = - new HashMap<Integer, AMQChannel<AMQProtocolEngine>>(); + private final Map<Integer, AMQChannel> _channelMap = + new HashMap<>(); private final CopyOnWriteArrayList<SessionModelListener> _sessionListeners = - new CopyOnWriteArrayList<SessionModelListener>(); + new CopyOnWriteArrayList<>(); - @SuppressWarnings("unchecked") - private final AMQChannel<AMQProtocolEngine>[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1]; + private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1]; /** * The channels that the latest call to {@link #received(ByteBuffer)} applied to. @@ -120,10 +126,8 @@ public class AMQProtocolEngine implement * * Thread-safety: guarded by {@link #_receivedLock}. */ - private final Set<AMQChannel<AMQProtocolEngine>> _channelsForCurrentMessage = - new HashSet<AMQChannel<AMQProtocolEngine>>(); - - private final AMQStateManager _stateManager; + private final Set<AMQChannel> _channelsForCurrentMessage = + new HashSet<>(); private AMQDecoder _decoder; @@ -136,14 +140,13 @@ public class AMQProtocolEngine implement /* AMQP Version for this session */ private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion(); - private MethodRegistry _methodRegistry = MethodRegistry.getMethodRegistry(_protocolVersion); + private final MethodRegistry _methodRegistry = new MethodRegistry(_protocolVersion); private final List<Action<? super AMQProtocolEngine>> _taskList = - new CopyOnWriteArrayList<Action<? super AMQProtocolEngine>>(); + new CopyOnWriteArrayList<>(); - private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<Integer, Long>(); + private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<>(); private ProtocolOutputConverter _protocolOutputConverter; private final Subject _authorizedSubject = new Subject(); - private MethodDispatcher _dispatcher; private final long _connectionID; private Object _reference = new Object(); @@ -177,6 +180,9 @@ public class AMQProtocolEngine implement private boolean _authenticated; private boolean _compressionSupported; private int _messageCompressionThreshold; + private int _currentClassId; + private int _currentMethodId; + private int _binaryDataLimit; public AMQProtocolEngine(Broker broker, final NetworkConnection network, @@ -189,11 +195,12 @@ public class AMQProtocolEngine implement _transport = transport; _maxNoOfChannels = broker.getConnection_sessionCountLimit(); _receivedLock = new ReentrantLock(); - _stateManager = new AMQStateManager(broker, this); - _decoder = new AMQDecoder(true, this); + _decoder = new BrokerDecoder(this); _connectionID = connectionId; _logSubject = new ConnectionLogSubject(this); - + _binaryDataLimit = _broker.getContextKeys(false).contains(BROKER_DEBUG_BINARY_DATA_LENGTH) + ? _broker.getContextValue(Integer.class, BROKER_DEBUG_BINARY_DATA_LENGTH) + : DEFAULT_DEBUG_BINARY_DATA_LENGTH; _authorizedSubject.getPrincipals().add(new ConnectionPrincipal(this)); runAsSubject(new PrivilegedAction<Void>() { @@ -303,28 +310,7 @@ public class AMQProtocolEngine implement _receivedLock.lock(); try { - final ArrayList<AMQDataBlock> dataBlocks = _decoder.decodeBuffer(msg); - for (AMQDataBlock dataBlock : dataBlocks) - { - try - { - dataBlockReceived(dataBlock); - } - catch(AMQConnectionException e) - { - if(_logger.isDebugEnabled()) - { - _logger.debug("Caught AMQConnectionException but will simply stop processing data blocks - the connection should already be closed.", e); - } - break; - } - catch (AMQException e) - { - _logger.error("Unexpected exception when processing datablock", e); - closeProtocolSession(); - break; - } - } + _decoder.decodeBuffer(msg); receivedComplete(); } catch (ConnectionScopedRuntimeException e) @@ -366,7 +352,7 @@ public class AMQProtocolEngine implement { RuntimeException exception = null; - for (AMQChannel<AMQProtocolEngine> channel : _channelsForCurrentMessage) + for (AMQChannel channel : _channelsForCurrentMessage) { try { @@ -391,112 +377,10 @@ public class AMQProtocolEngine implement } } - /** - * Process the data block. - * If the message is for a channel it is added to {@link #_channelsForCurrentMessage}. - * - * @throws AMQConnectionException if unable to process the data block. In this case, - * the connection is already closed by the time the exception is thrown. If any other - * type of exception is thrown, the connection is not already closed. - */ - private void dataBlockReceived(AMQDataBlock message) throws AMQException - { - if (message instanceof ProtocolInitiation) - { - protocolInitiationReceived((ProtocolInitiation) message); - - } - else if (message instanceof AMQFrame) - { - AMQFrame frame = (AMQFrame) message; - frameReceived(frame); - - } - else - { - throw new AMQException("Unknown message type: " + message.getClass().getName() + ": " + message); - } - } - /** - * Handle the supplied frame. - * Adds this frame's channel to {@link #_channelsForCurrentMessage}. - * - * @throws AMQConnectionException if unable to process the data block. In this case, - * the connection is already closed by the time the exception is thrown. If any other - * type of exception is thrown, the connection is not already closed. - */ - private void frameReceived(AMQFrame frame) throws AMQException + void channelRequiresSync(final AMQChannel amqChannel) { - int channelId = frame.getChannel(); - AMQChannel<AMQProtocolEngine> amqChannel = _channelMap.get(channelId); - if(amqChannel != null) - { - // The _receivedLock is already acquired in the caller - // It is safe to add channel - _channelsForCurrentMessage.add(amqChannel); - } - else - { - // Not an error. The frame is probably a channel Open for this channel id, which - // does not require asynchronous work therefore its absence from - // _channelsForCurrentMessage is ok. - } - - AMQBody body = frame.getBodyFrame(); - - long startTime = 0; - String frameToString = null; - if (_logger.isDebugEnabled()) - { - startTime = System.currentTimeMillis(); - frameToString = frame.toString(); - _logger.debug("RECV: " + frame); - } - - // Check that this channel is not closing - if (channelAwaitingClosure(channelId)) - { - if ((frame.getBodyFrame() instanceof ChannelCloseOkBody)) - { - if (_logger.isInfoEnabled()) - { - _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok"); - } - } - else - { - // The channel has been told to close, we don't process any more frames until - // it's closed. - return; - } - } - - try - { - body.handle(channelId, this); - } - catch(AMQConnectionException e) - { - _logger.info(e.getMessage() + " whilst processing frame: " + body); - closeConnection(channelId, e); - throw e; - } - catch (AMQException e) - { - closeChannel(channelId, e.getErrorCode() == null ? AMQConstant.INTERNAL_ERROR : e.getErrorCode(), e.getMessage()); - throw e; - } - catch (TransportException e) - { - closeChannel(channelId, AMQConstant.CHANNEL_ERROR, e.getMessage()); - throw e; - } - - if(_logger.isDebugEnabled()) - { - _logger.debug("Frame handled in " + (System.currentTimeMillis() - startTime) + " ms. Frame: " + frameToString); - } + _channelsForCurrentMessage.add(amqChannel); } private synchronized void protocolInitiationReceived(ProtocolInitiation pi) @@ -615,87 +499,6 @@ public class AMQProtocolEngine implement return buf; } - public void methodFrameReceived(int channelId, AMQMethodBody methodBody) - { - final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId, methodBody); - - try - { - try - { - boolean wasAnyoneInterested = _stateManager.methodReceived(evt); - - if (!wasAnyoneInterested) - { - throw new AMQNoMethodHandlerException(evt); - } - } - catch (AMQChannelException e) - { - if (getChannel(channelId) != null) - { - if (_logger.isInfoEnabled()) - { - _logger.info("Closing channel due to: " + e.getMessage()); - } - - writeFrame(e.getCloseFrame(channelId)); - closeChannel(channelId, e.getErrorCode() == null ? AMQConstant.INTERNAL_ERROR : e.getErrorCode(), e.getMessage()); - } - else - { - if (_logger.isDebugEnabled()) - { - _logger.debug("ChannelException occurred on non-existent channel:" + e.getMessage()); - } - - if (_logger.isInfoEnabled()) - { - _logger.info("Closing connection due to: " + e.getMessage()); - } - - AMQConnectionException ce = - evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR, - AMQConstant.CHANNEL_ERROR.getName().toString()); - - _logger.info(e.getMessage() + " whilst processing:" + methodBody); - closeConnection(channelId, ce); - } - } - catch (AMQConnectionException e) - { - _logger.info(e.getMessage() + " whilst processing:" + methodBody); - closeConnection(channelId, e); - } - } - catch (Exception e) - { - _logger.error("Unexpected exception while processing frame. Closing connection.", e); - - closeProtocolSession(); - } - } - - public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException - { - - AMQChannel<AMQProtocolEngine> channel = getAndAssertChannel(channelId); - - channel.publishContentHeader(body); - - } - - public void contentBodyReceived(int channelId, ContentBody body) throws AMQException - { - AMQChannel<AMQProtocolEngine> channel = getAndAssertChannel(channelId); - - channel.publishContentBody(body); - } - - public void heartbeatBodyReceived(int channelId, HeartbeatBody body) - { - // NO - OP - } /** * Convenience method that writes a frame to the protocol session. Equivalent to calling @@ -735,28 +538,17 @@ public class AMQProtocolEngine implement _contextKey = contextKey; } - public List<AMQChannel<AMQProtocolEngine>> getChannels() + public List<AMQChannel> getChannels() { synchronized (_channelMap) { - return new ArrayList<AMQChannel<AMQProtocolEngine>>(_channelMap.values()); - } - } - - public AMQChannel<AMQProtocolEngine> getAndAssertChannel(int channelId) throws AMQException - { - AMQChannel<AMQProtocolEngine> channel = getChannel(channelId); - if (channel == null) - { - throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId); + return new ArrayList<>(_channelMap.values()); } - - return channel; } - public AMQChannel<AMQProtocolEngine> getChannel(int channelId) + public AMQChannel getChannel(int channelId) { - final AMQChannel<AMQProtocolEngine> channel = + final AMQChannel channel = ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId); if ((channel == null) || channel.isClosing()) { @@ -773,38 +565,17 @@ public class AMQProtocolEngine implement return !_closingChannelsList.isEmpty() && _closingChannelsList.containsKey(channelId); } - public void addChannel(AMQChannel<AMQProtocolEngine> channel) throws AMQException + public void addChannel(AMQChannel channel) { - if (_closed) - { - throw new AMQException("Session is closed"); - } - final int channelId = channel.getChannelId(); - if (_closingChannelsList.containsKey(channelId)) - { - throw new AMQException("Session is marked awaiting channel close"); - } - - if (_channelMap.size() == _maxNoOfChannels) - { - String errorMessage = - toString() + ": maximum number of channels has been reached (" + _maxNoOfChannels - + "); can't create channel"; - _logger.error(errorMessage); - throw new AMQException(AMQConstant.NOT_ALLOWED, errorMessage); - } - else + synchronized (_channelMap) { - synchronized (_channelMap) + _channelMap.put(channel.getChannelId(), channel); + sessionAdded(channel); + if(_blocking) { - _channelMap.put(channel.getChannelId(), channel); - sessionAdded(channel); - if(_blocking) - { - channel.block(); - } + channel.block(); } } @@ -830,7 +601,7 @@ public class AMQProtocolEngine implement } } - public Long getMaximumNumberOfChannels() + public long getMaximumNumberOfChannels() { return _maxNoOfChannels; } @@ -840,52 +611,52 @@ public class AMQProtocolEngine implement _maxNoOfChannels = value; } - /** - * Close a specific channel. This will remove any resources used by the channel, including: <ul><li>any queue - * subscriptions (this may in turn remove queues if they are auto delete</li> </ul> - * - * @param channelId id of the channel to close - * - * @throws IllegalArgumentException if the channel id is not valid - */ - @Override - public void closeChannel(int channelId) + + void closeChannel(AMQChannel channel) + { + closeChannel(channel, null, null, false); + } + + public void closeChannelAndWriteFrame(AMQChannel channel, AMQConstant cause, String message) { - closeChannel(channelId, null, null); + writeFrame(new AMQFrame(channel.getChannelId(), + getMethodRegistry().createChannelCloseBody(cause.getCode(), + AMQShortString.validValueOf(message), + _currentClassId, + _currentMethodId))); + closeChannel(channel, cause, message, true); } public void closeChannel(int channelId, AMQConstant cause, String message) { - final AMQChannel<AMQProtocolEngine> channel = getChannel(channelId); + final AMQChannel channel = getChannel(channelId); if (channel == null) { throw new IllegalArgumentException("Unknown channel id"); } - else + closeChannel(channel, cause, message, true); + } + + void closeChannel(AMQChannel channel, AMQConstant cause, String message, boolean mark) + { + int channelId = channel.getChannelId(); + try { - try + channel.close(cause, message); + if(mark) { - channel.close(cause, message); markChannelAwaitingCloseOk(channelId); } - finally - { - removeChannel(channelId); - } + } + finally + { + removeChannel(channelId); } } + public void closeChannelOk(int channelId) { - // todo QPID-847 - This is called from two locations ChannelCloseHandler and ChannelCloseOkHandler. - // When it is the CC_OK_Handler then it makes sense to remove the channel else we will leak memory. - // We do it from the Close Handler as we are sending the OK back to the client. - // While this is AMQP spec compliant. The Java client in the event of an IllegalArgumentException - // will send a close-ok.. Where we should call removeChannel. - // However, due to the poor exception handling on the client. The client-user will be notified of the - // InvalidArgument and if they then decide to close the session/connection then the there will be time - // for that to occur i.e. a new close method be sent before the exception handling can mark the session closed. - _closingChannelsList.remove(channelId); } @@ -901,7 +672,7 @@ public class AMQProtocolEngine implement */ public void removeChannel(int channelId) { - AMQChannel<AMQProtocolEngine> session; + AMQChannel session; synchronized (_channelMap) { session = _channelMap.remove(channelId); @@ -937,7 +708,7 @@ public class AMQProtocolEngine implement */ private void closeAllChannels() { - for (AMQChannel<AMQProtocolEngine> channel : getChannels()) + for (AMQChannel channel : getChannels()) { channel.close(); } @@ -952,7 +723,6 @@ public class AMQProtocolEngine implement } /** This must be called when the session is _closed in order to free up any resources managed by the session. */ - @Override public void closeSession() { @@ -1042,13 +812,30 @@ public class AMQProtocolEngine implement private void closeConnection(int channelId, AMQConnectionException e) { - try + + if (_logger.isInfoEnabled()) { - if (_logger.isInfoEnabled()) - { - _logger.info("Closing connection due to: " + e); - } + _logger.info("Closing connection due to: " + e); + } + closeConnection(channelId, e.getCloseFrame()); + } + + void closeConnection(AMQConstant errorCode, + String message, int channelId) + { + + if (_logger.isInfoEnabled()) + { + _logger.info("Closing connection due to: " + message); + } + closeConnection(channelId, new AMQFrame(0, new ConnectionCloseBody(getProtocolVersion(), errorCode.getCode(), AMQShortString.validValueOf(message), _currentClassId, _currentMethodId))); + } + + private void closeConnection(int channelId, AMQFrame frame) + { + try + { markChannelAwaitingCloseOk(channelId); closeSession(); } @@ -1056,8 +843,7 @@ public class AMQProtocolEngine implement { try { - _stateManager.changeState(AMQState.CONNECTION_CLOSING); - writeFrame(e.getCloseFrame(channelId)); + writeFrame(frame); } finally { @@ -1068,23 +854,9 @@ public class AMQProtocolEngine implement } - @Override public void closeProtocolSession() { _network.close(); - - try - { - _stateManager.changeState(AMQState.CONNECTION_CLOSED); - } - catch (ConnectionScopedRuntimeException e) - { - _logger.info(e.getMessage()); - } - catch (TransportException e) - { - _logger.info(e.getMessage()); - } } public String toString() @@ -1174,9 +946,8 @@ public class AMQProtocolEngine implement private void setProtocolVersion(ProtocolVersion pv) { _protocolVersion = pv; - _methodRegistry = MethodRegistry.getMethodRegistry(_protocolVersion); - _protocolOutputConverter = ProtocolOutputConverterRegistry.getConverter(this); - _dispatcher = ServerMethodDispatcherImpl.createMethodDispatcher(_stateManager, _protocolVersion); + _methodRegistry.setProtocolVersion(_protocolVersion); + _protocolOutputConverter = new ProtocolOutputConverterImpl(this); } public byte getProtocolMajorVersion() @@ -1204,7 +975,7 @@ public class AMQProtocolEngine implement return _virtualHost; } - public void setVirtualHost(VirtualHostImpl<?,?,?> virtualHost) throws AMQException + public void setVirtualHost(VirtualHostImpl<?,?,?> virtualHost) { _virtualHost = virtualHost; @@ -1285,11 +1056,6 @@ public class AMQProtocolEngine implement return _methodRegistry; } - public MethodDispatcher getMethodDispatcher() - { - return _dispatcher; - } - public void closed() { try @@ -1303,14 +1069,10 @@ public class AMQProtocolEngine implement closeProtocolSession(); } } - catch (ConnectionScopedRuntimeException e) + catch (ConnectionScopedRuntimeException | TransportException e) { _logger.error("Could not close protocol engine", e); } - catch (TransportException e) - { - _logger.error("Could not close protocol engine", e); - } } public void readerIdle() @@ -1351,9 +1113,11 @@ public class AMQProtocolEngine implement { _logger.error("Exception caught in " + this + ", closing connection explicitly: " + throwable, throwable); - - MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(getProtocolVersion()); - ConnectionCloseBody closeBody = methodRegistry.createConnectionCloseBody(200, AMQShortString.validValueOf(throwable.getMessage()),0,0); + ConnectionCloseBody closeBody = _methodRegistry.createConnectionCloseBody(AMQConstant.INTERNAL_ERROR.getCode(), + AMQShortString.validValueOf( + throwable.getMessage()), + _currentClassId, + _currentMethodId); writeFrame(closeBody.generateFrame(0)); @@ -1374,11 +1138,6 @@ public class AMQProtocolEngine implement } } - public void setSender(Sender<ByteBuffer> sender) - { - // Do nothing - } - public long getReadBytes() { return _readBytes; @@ -1460,7 +1219,7 @@ public class AMQProtocolEngine implement return String.valueOf(getRemoteAddress()); } - public void closeSession(AMQChannel<AMQProtocolEngine> session, AMQConstant cause, String message) + public void closeSession(AMQChannel session, AMQConstant cause, String message) { int channelId = session.getChannelId(); closeChannel(channelId, cause, message); @@ -1470,7 +1229,7 @@ public class AMQProtocolEngine implement methodRegistry.createChannelCloseBody( cause.getCode(), AMQShortString.validValueOf(message), - 0,0); + 0, 0); writeFrame(responseBody.generateFrame(channelId)); } @@ -1478,9 +1237,8 @@ public class AMQProtocolEngine implement public void close(AMQConstant cause, String message) { closeConnection(0, new AMQConnectionException(cause, message, 0, 0, - getProtocolOutputConverter().getProtocolMajorVersion(), - getProtocolOutputConverter().getProtocolMinorVersion(), - null)); + getMethodRegistry(), + null)); } public void block() @@ -1490,7 +1248,7 @@ public class AMQProtocolEngine implement if(!_blocking) { _blocking = true; - for(AMQChannel<AMQProtocolEngine> channel : _channelMap.values()) + for(AMQChannel channel : _channelMap.values()) { channel.block(); } @@ -1505,7 +1263,7 @@ public class AMQProtocolEngine implement if(_blocking) { _blocking = false; - for(AMQChannel<AMQProtocolEngine> channel : _channelMap.values()) + for(AMQChannel channel : _channelMap.values()) { channel.unblock(); } @@ -1518,9 +1276,9 @@ public class AMQProtocolEngine implement return _closed; } - public List<AMQChannel<AMQProtocolEngine>> getSessionModels() + public List<AMQChannel> getSessionModels() { - return new ArrayList<AMQChannel<AMQProtocolEngine>>(getChannels()); + return new ArrayList<>(getChannels()); } public LogSubject getLogSubject() @@ -1609,6 +1367,382 @@ public class AMQProtocolEngine implement _deferFlush = deferFlush; } + @Override + public void receiveChannelOpen(final int channelId) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + channelId + "] ChannelOpen"); + } + + // Protect the broker against out of order frame request. + if (_virtualHost == null) + { + closeConnection(AMQConstant.COMMAND_INVALID, + "Virtualhost has not yet been set. ConnectionOpen has not been called.", channelId); + } + else if(getChannel(channelId) != null || channelAwaitingClosure(channelId)) + { + closeConnection(AMQConstant.CHANNEL_ERROR, "Channel " + channelId + " already exists", channelId); + } + else if(channelId > getMaximumNumberOfChannels()) + { + closeConnection(AMQConstant.CHANNEL_ERROR, + "Channel " + channelId + " cannot be created as the max allowed channel id is " + + getMaximumNumberOfChannels(), + channelId); + } + else + { + _logger.info("Connecting to: " + _virtualHost.getName()); + + final AMQChannel channel = new AMQChannel(this, channelId, _virtualHost.getMessageStore()); + + addChannel(channel); + + ChannelOpenOkBody response; + + + response = getMethodRegistry().createChannelOpenOkBody(); + + + writeFrame(response.generateFrame(channelId)); + } + } + + @Override + public void receiveConnectionOpen(AMQShortString virtualHostName, + AMQShortString capabilities, + boolean insist) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV ConnectionOpen[" +" virtualHost: " + virtualHostName + " capabilities: " + capabilities + " insist: " + insist + " ]"); + } + + String virtualHostStr; + if ((virtualHostName != null) && virtualHostName.charAt(0) == '/') + { + virtualHostStr = virtualHostName.toString().substring(1); + } + else + { + virtualHostStr = virtualHostName == null ? null : virtualHostName.toString(); + } + + VirtualHostImpl virtualHost = ((AmqpPort)getPort()).getVirtualHost(virtualHostStr); + + if (virtualHost == null) + { + closeConnection(AMQConstant.NOT_FOUND, + "Unknown virtual host: '" + virtualHostName + "'",0); + + } + else + { + // Check virtualhost access + if (virtualHost.getState() != State.ACTIVE) + { + closeConnection(AMQConstant.CONNECTION_FORCED, + "Virtual host '" + virtualHost.getName() + "' is not active",0); + + } + else + { + setVirtualHost(virtualHost); + try + { + virtualHost.getSecurityManager().authoriseCreateConnection(this); + if (getContextKey() == null) + { + setContextKey(new AMQShortString(Long.toString(System.currentTimeMillis()))); + } + + MethodRegistry methodRegistry = getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(virtualHostName); + + writeFrame(responseBody.generateFrame(0)); + } + catch (AccessControlException e) + { + closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(),0); + } + } + } + } + + @Override + public void receiveConnectionClose(final int replyCode, + final AMQShortString replyText, + final int classId, + final int methodId) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV ConnectionClose[" +" replyCode: " + replyCode + " replyText: " + replyText + " classId: " + classId + " methodId: " + methodId + " ]"); + } + + if (_logger.isInfoEnabled()) + { + _logger.info("ConnectionClose received with reply code/reply text " + replyCode + "/" + + replyText + " for " + this); + } + try + { + closeSession(); + } + catch (Exception e) + { + _logger.error("Error closing protocol session: " + e, e); + } + + MethodRegistry methodRegistry = getMethodRegistry(); + ConnectionCloseOkBody responseBody = methodRegistry.createConnectionCloseOkBody(); + writeFrame(responseBody.generateFrame(0)); + + closeProtocolSession(); + + } + + @Override + public void receiveConnectionCloseOk() + { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV ConnectionCloseOk"); + } + + _logger.info("Received Connection-close-ok"); + + try + { + closeSession(); + } + catch (Exception e) + { + _logger.error("Error closing protocol session: " + e, e); + } + } + + @Override + public void receiveConnectionSecureOk(final byte[] response) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV ConnectionSecureOk[ response: ******** ] "); + } + + Broker<?> broker = getBroker(); + + SubjectCreator subjectCreator = getSubjectCreator(); + + SaslServer ss = getSaslServer(); + if (ss == null) + { + closeConnection(AMQConstant.INTERNAL_ERROR, "No SASL context set up in session",0 ); + } + MethodRegistry methodRegistry = getMethodRegistry(); + SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, response); + switch (authResult.getStatus()) + { + case ERROR: + Exception cause = authResult.getCause(); + + _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage())); + + closeConnection(AMQConstant.NOT_ALLOWED, "Authentication failed",0); + + disposeSaslServer(); + break; + case SUCCESS: + if (_logger.isInfoEnabled()) + { + _logger.info("Connected as: " + authResult.getSubject()); + } + + int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE); + + if (frameMax <= 0) + { + frameMax = Integer.MAX_VALUE; + } + + ConnectionTuneBody tuneBody = + methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(), + frameMax, + broker.getConnection_heartBeatDelay()); + writeFrame(tuneBody.generateFrame(0)); + setAuthorizedSubject(authResult.getSubject()); + disposeSaslServer(); + break; + case CONTINUE: + + ConnectionSecureBody + secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge()); + writeFrame(secureBody.generateFrame(0)); + } + } + + + private void disposeSaslServer() + { + SaslServer ss = getSaslServer(); + if (ss != null) + { + setSaslServer(null); + try + { + ss.dispose(); + } + catch (SaslException e) + { + _logger.error("Error disposing of Sasl server: " + e); + } + } + } + + @Override + public void receiveConnectionStartOk(final FieldTable clientProperties, + final AMQShortString mechanism, + final byte[] response, + final AMQShortString locale) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("RECV ConnectionStartOk[" + + " clientProperties: " + + clientProperties + + " mechanism: " + + mechanism + + " response: ********" + + " locale: " + + locale + + " ]"); + } + + Broker<?> broker = getBroker(); + + _logger.info("SASL Mechanism selected: " + mechanism); + _logger.info("Locale selected: " + locale); + + SubjectCreator subjectCreator = getSubjectCreator(); + SaslServer ss = null; + try + { + ss = subjectCreator.createSaslServer(String.valueOf(mechanism), + getLocalFQDN(), + getPeerPrincipal()); + + if (ss == null) + { + closeConnection(AMQConstant.RESOURCE_ERROR, "Unable to create SASL Server:" + mechanism, 0); + + } + else + { + //save clientProperties + setClientProperties(clientProperties); + + setSaslServer(ss); + + final SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, response); + + MethodRegistry methodRegistry = getMethodRegistry(); + + switch (authResult.getStatus()) + { + case ERROR: + Exception cause = authResult.getCause(); + + _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage())); + + closeConnection(AMQConstant.NOT_ALLOWED, "Authentication failed", 0); + + disposeSaslServer(); + break; + + case SUCCESS: + if (_logger.isInfoEnabled()) + { + _logger.info("Connected as: " + authResult.getSubject()); + } + setAuthorizedSubject(authResult.getSubject()); + + int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE); + + if (frameMax <= 0) + { + frameMax = Integer.MAX_VALUE; + } + + ConnectionTuneBody + tuneBody = + methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(), + frameMax, + broker.getConnection_heartBeatDelay()); + writeFrame(tuneBody.generateFrame(0)); + break; + case CONTINUE: + ConnectionSecureBody + secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge()); + writeFrame(secureBody.generateFrame(0)); + } + } + } + catch (SaslException e) + { + disposeSaslServer(); + closeConnection(AMQConstant.INTERNAL_ERROR, "SASL error: " + e, 0); + } + } + + @Override + public void receiveConnectionTuneOk(final int channelMax, final long frameMax, final int heartbeat) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV ConnectionTuneOk[" +" channelMax: " + channelMax + " frameMax: " + frameMax + " heartbeat: " + heartbeat + " ]"); + } + + initHeartbeats(heartbeat); + + int brokerFrameMax = getBroker().getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE); + if (brokerFrameMax <= 0) + { + brokerFrameMax = Integer.MAX_VALUE; + } + + if (frameMax > (long) brokerFrameMax) + { + closeConnection(AMQConstant.SYNTAX_ERROR, + "Attempt to set max frame size to " + frameMax + + " greater than the broker will allow: " + + brokerFrameMax, 0); + } + else if (frameMax > 0 && frameMax < AMQConstant.FRAME_MIN_SIZE.getCode()) + { + closeConnection(AMQConstant.SYNTAX_ERROR, + "Attempt to set max frame size to " + frameMax + + " which is smaller than the specification defined minimum: " + + AMQConstant.FRAME_MIN_SIZE.getCode(), 0); + } + else + { + int calculatedFrameMax = frameMax == 0 ? brokerFrameMax : (int) frameMax; + setMaxFrameSize(calculatedFrameMax); + + //0 means no implied limit, except that forced by protocol limitations (0xFFFF) + setMaximumNumberOfChannels( ((channelMax == 0l) || (channelMax > 0xFFFFL)) + ? 0xFFFFL + : channelMax); + } + } + + public int getBinaryDataLimit() + { + return _binaryDataLimit; + } + public final class WriteDeliverMethod implements ClientDeliveryMethod { @@ -1656,24 +1790,31 @@ public class AMQProtocolEngine implement return _lastWriteTime.get(); } - @Override public boolean isCloseWhenNoRoute() { return _closeWhenNoRoute; } - @Override public boolean isCompressionSupported() { return _compressionSupported && _broker.isMessageCompressionEnabled(); } - @Override public int getMessageCompressionThreshold() { return _messageCompressionThreshold; } + public Broker<?> getBroker() + { + return _broker; + } + + public SubjectCreator getSubjectCreator() + { + return _broker.getSubjectCreator(getLocalAddress(), getTransport().isSecure()); + } + public EventLogger getEventLogger() { if(_virtualHost != null) @@ -1685,4 +1826,72 @@ public class AMQProtocolEngine implement return _broker.getEventLogger(); } } + + @Override + public ServerChannelMethodProcessor getChannelMethodProcessor(final int channelId) + { + ServerChannelMethodProcessor channelMethodProcessor = getChannel(channelId); + if(channelMethodProcessor == null) + { + channelMethodProcessor = (ServerChannelMethodProcessor) Proxy.newProxyInstance(ServerMethodDispatcher.class.getClassLoader(), + new Class[] { ServerChannelMethodProcessor.class }, new InvocationHandler() + { + @Override + public Object invoke(final Object proxy, final Method method, final Object[] args) + throws Throwable + { + if(method.getName().startsWith("receive")) + { + closeConnection(AMQConstant.CHANNEL_ERROR, + "Unknown channel id: " + channelId, + channelId); + return null; + } + else if(method.getName().equals("ignoreAllButCloseOk")) + { + return false; + } + return null; + } + }); + } + return channelMethodProcessor; + } + + @Override + public void receiveHeartbeat() + { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV Heartbeat"); + } + + // No op + } + + @Override + public void receiveProtocolHeader(final ProtocolInitiation protocolInitiation) + { + + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV ProtocolHeader [" + protocolInitiation + " ]"); + } + + protocolInitiationReceived(protocolInitiation); + } + + @Override + public void setCurrentMethod(final int classId, final int methodId) + { + _currentClassId = classId; + _currentMethodId = methodId; + } + + @Override + public boolean ignoreAllButCloseOk() + { + return _closing.get(); + } + } Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original) +++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Fri Oct 17 14:23:19 2014 @@ -39,7 +39,6 @@ import org.apache.qpid.server.message.Me import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; @@ -80,7 +79,7 @@ public abstract class ConsumerTarget_0_8 public static ConsumerTarget_0_8 createBrowserTarget(AMQChannel channel, AMQShortString consumerTag, FieldTable filters, - FlowCreditManager creditManager) throws AMQException + FlowCreditManager creditManager) { return new BrowserConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod()); } @@ -90,7 +89,7 @@ public abstract class ConsumerTarget_0_8 final FieldTable filters, final FlowCreditManager creditManager, final ClientDeliveryMethod deliveryMethod, - final RecordDeliveryMethod recordMethod) throws AMQException + final RecordDeliveryMethod recordMethod) { return new GetNoAckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod); } @@ -107,7 +106,6 @@ public abstract class ConsumerTarget_0_8 FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, RecordDeliveryMethod recordMethod) - throws AMQException { super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod); @@ -148,7 +146,7 @@ public abstract class ConsumerTarget_0_8 public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel, AMQShortString consumerTag, FieldTable filters, - FlowCreditManager creditManager) throws AMQException + FlowCreditManager creditManager) { return new NoAckConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod()); } @@ -171,7 +169,6 @@ public abstract class ConsumerTarget_0_8 FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, RecordDeliveryMethod recordMethod) - throws AMQException { super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod); @@ -207,7 +204,7 @@ public abstract class ConsumerTarget_0_8 long size; synchronized (getChannel()) { - getChannel().getProtocolSession().setDeferFlush(batch); + getChannel().getConnection().setDeferFlush(batch); long deliveryTag = getChannel().getNextDeliveryTag(); size = sendToClient(consumer, message, props, deliveryTag); @@ -249,7 +246,6 @@ public abstract class ConsumerTarget_0_8 FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, RecordDeliveryMethod recordMethod) - throws AMQException { super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod); } @@ -265,7 +261,6 @@ public abstract class ConsumerTarget_0_8 public static ConsumerTarget_0_8 createAckTarget(AMQChannel channel, AMQShortString consumerTag, FieldTable filters, FlowCreditManager creditManager) - throws AMQException { return new AckConsumer(channel,consumerTag,filters,creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod()); } @@ -276,7 +271,6 @@ public abstract class ConsumerTarget_0_8 FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, RecordDeliveryMethod recordMethod) - throws AMQException { return new AckConsumer(channel,consumerTag,filters,creditManager, deliveryMethod, recordMethod); } @@ -288,7 +282,6 @@ public abstract class ConsumerTarget_0_8 FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, RecordDeliveryMethod recordMethod) - throws AMQException { super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod); } @@ -308,7 +301,7 @@ public abstract class ConsumerTarget_0_8 synchronized (getChannel()) { - getChannel().getProtocolSession().setDeferFlush(batch); + getChannel().getConnection().setDeferFlush(batch); long deliveryTag = getChannel().getNextDeliveryTag(); addUnacknowledgedMessage(entry); @@ -346,7 +339,6 @@ public abstract class ConsumerTarget_0_8 FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, RecordDeliveryMethod recordMethod) - throws AMQException { super(State.ACTIVE); @@ -474,9 +466,9 @@ public abstract class ConsumerTarget_0_8 return _consumerTag; } - public AMQProtocolSession getProtocolSession() + public AMQProtocolEngine getProtocolSession() { - return _channel.getProtocolSession(); + return _channel.getConnection(); } public void restoreCredit(final ServerMessage message) @@ -525,7 +517,7 @@ public abstract class ConsumerTarget_0_8 public void confirmAutoClose() { - ProtocolOutputConverter converter = getChannel().getProtocolSession().getProtocolOutputConverter(); + ProtocolOutputConverter converter = getChannel().getConnection().getProtocolOutputConverter(); converter.confirmConsumerAutoClose(getChannel().getChannelId(), getConsumerTag()); } @@ -540,9 +532,9 @@ public abstract class ConsumerTarget_0_8 public void flushBatched() { - _channel.getProtocolSession().setDeferFlush(false); + _channel.getConnection().setDeferFlush(false); - _channel.getProtocolSession().flushBatched(); + _channel.getConnection().flushBatched(); } protected void addUnacknowledgedMessage(MessageInstance entry) Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java (original) +++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java Fri Oct 17 14:23:19 2014 @@ -20,14 +20,13 @@ */ package org.apache.qpid.server.protocol.v0_8; +import java.util.Map; + import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageInstance; -import java.util.Map; - public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor { private static final Logger _log = Logger.getLogger(ExtractResendAndRequeue.class); @@ -45,7 +44,7 @@ public class ExtractResendAndRequeue imp _msgToResend = msgToResend; } - public boolean callback(final long deliveryTag, MessageInstance message) throws AMQException + public boolean callback(final long deliveryTag, MessageInstance message) { message.setRedelivered(); Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java (original) +++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java Fri Oct 17 14:23:19 2014 @@ -20,16 +20,15 @@ */ package org.apache.qpid.server.protocol.v0_8; -import org.apache.qpid.AMQException; +import java.util.ArrayList; +import java.util.List; + import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.framing.MessagePublishInfo; import org.apache.qpid.server.message.MessageDestination; -import java.util.ArrayList; -import java.util.List; - public class IncomingMessage { @@ -58,7 +57,7 @@ public class IncomingMessage return _messagePublishInfo; } - public void addContentBodyFrame(final ContentBody contentChunk) throws AMQException + public void addContentBodyFrame(final ContentBody contentChunk) { _bodyLengthReceived += contentChunk.getSize(); _contentChunks.add(contentChunk); @@ -94,7 +93,7 @@ public class IncomingMessage _messageDestination = e; } - public int getBodyCount() throws AMQException + public int getBodyCount() { return _contentChunks.size(); } Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java (original) +++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java Fri Oct 17 14:23:19 2014 @@ -33,7 +33,7 @@ import org.apache.qpid.framing.AMQShortS import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.framing.MessagePublishInfo; import org.apache.qpid.server.message.internal.InternalMessage; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.plugin.PluggableService; @@ -135,38 +135,10 @@ public class MessageConverter_Internal_t private MessageMetaData convertMetaData(final InternalMessage serverMsg, final String bodyMimeType, final int size) { - MessagePublishInfo publishInfo = new MessagePublishInfo() - { - @Override - public AMQShortString getExchange() - { - return AMQShortString.EMPTY_STRING; - } - - @Override - public void setExchange(final AMQShortString amqShortString) - { - throw new UnsupportedOperationException(); - } - - @Override - public boolean isImmediate() - { - return false; - } - - @Override - public boolean isMandatory() - { - return false; - } - - @Override - public AMQShortString getRoutingKey() - { - return AMQShortString.valueOf(serverMsg.getInitialRoutingAddress()); - } - }; + MessagePublishInfo publishInfo = new MessagePublishInfo(AMQShortString.EMPTY_STRING, + false, + false, + AMQShortString.valueOf(serverMsg.getInitialRoutingAddress())); final BasicContentHeaderProperties props = new BasicContentHeaderProperties(); @@ -191,7 +163,7 @@ public class MessageConverter_Internal_t props.setHeaders(FieldTable.convertToFieldTable(headerProps)); - final ContentHeaderBody chb = new ContentHeaderBody(props, BASIC_CLASS_ID); + final ContentHeaderBody chb = new ContentHeaderBody(props); chb.setBodySize(size); return new MessageMetaData(publishInfo, chb, serverMsg.getArrivalTime()); } Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java (original) +++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java Fri Oct 17 14:23:19 2014 @@ -20,8 +20,13 @@ */ package org.apache.qpid.server.protocol.v0_8; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Collection; -import org.apache.qpid.AMQException; +import java.util.Set; + import org.apache.qpid.framing.AMQFrameDecodingException; import org.apache.qpid.framing.AMQProtocolVersionException; import org.apache.qpid.framing.AMQShortString; @@ -29,7 +34,7 @@ import org.apache.qpid.framing.BasicCont import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.EncodingUtils; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.framing.MessagePublishInfo; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.plugin.MessageMetaDataType; import org.apache.qpid.server.store.StorableMessageMetaData; @@ -37,12 +42,6 @@ import org.apache.qpid.server.util.ByteB import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.util.ByteBufferInputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Set; - /** * Encapsulates a publish body and a content header. In the context of the message store these are treated as a * single unit. @@ -170,33 +169,11 @@ public class MessageMetaData implements long arrivalTime = EncodingUtils.readLong(dais); MessagePublishInfo publishBody = - new MessagePublishInfo() - { + new MessagePublishInfo(exchange, + (flags & IMMEDIATE_FLAG) != 0, + (flags & MANDATORY_FLAG) != 0, + routingKey); - public AMQShortString getExchange() - { - return exchange; - } - - public void setExchange(AMQShortString exchange) - { - } - - public boolean isImmediate() - { - return (flags & IMMEDIATE_FLAG) != 0; - } - - public boolean isMandatory() - { - return (flags & MANDATORY_FLAG) != 0; - } - - public AMQShortString getRoutingKey() - { - return routingKey; - } - }; return new MessageMetaData(publishBody, chb, arrivalTime); } catch (IOException e) Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java (original) +++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java Fri Oct 17 14:23:19 2014 @@ -20,13 +20,12 @@ */ package org.apache.qpid.server.protocol.v0_8; -import org.apache.qpid.AMQException; -import org.apache.qpid.server.message.MessageInstance; -import org.apache.qpid.server.queue.QueueEntry; - import java.util.Collection; import java.util.Set; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.message.MessageInstance; + public interface UnacknowledgedMessageMap { @@ -37,12 +36,12 @@ public interface UnacknowledgedMessageMa *@param message the message being iterated over @return true to stop iteration, false to continue * @throws AMQException */ - boolean callback(final long deliveryTag, MessageInstance message) throws AMQException; + boolean callback(final long deliveryTag, MessageInstance message); void visitComplete(); } - void visit(Visitor visitor) throws AMQException; + void visit(Visitor visitor); void add(long deliveryTag, MessageInstance message); Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java (original) +++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java Fri Oct 17 14:23:19 2014 @@ -27,7 +27,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.qpid.AMQException; import org.apache.qpid.server.message.MessageInstance; public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap @@ -82,7 +81,7 @@ public class UnacknowledgedMessageMapImp } } - public void visit(Visitor visitor) throws AMQException + public void visit(Visitor visitor) { synchronized (_lock) { Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java (original) +++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java Fri Oct 17 14:23:19 2014 @@ -29,7 +29,7 @@ import java.util.Map; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.framing.MessagePublishInfo; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.message.MessageContentSource; @@ -41,7 +41,7 @@ import org.apache.qpid.test.utils.QpidTe public class AMQChannelTest extends QpidTestCase { private VirtualHostImpl _virtualHost; - private AMQProtocolSession _protocolSession; + private AMQProtocolEngine _protocolSession; private Map<Integer,String> _replies; private Broker _broker; @@ -98,18 +98,17 @@ public class AMQChannelTest extends Qpid AMQChannel channel = new AMQChannel(_protocolSession, 1, _virtualHost.getMessageStore()); channel.setLocalTransactional(); - MessagePublishInfo info = mock(MessagePublishInfo.class); + MessagePublishInfo info = new MessagePublishInfo(new AMQShortString("test"), false, false, null); ExchangeImpl e = mock(ExchangeImpl.class); ContentHeaderBody contentHeaderBody= mock(ContentHeaderBody.class); BasicContentHeaderProperties properties = mock(BasicContentHeaderProperties.class); when(contentHeaderBody.getProperties()).thenReturn(properties); - when(info.getExchange()).thenReturn(new AMQShortString("test")); when(properties.getUserId()).thenReturn(new AMQShortString(_protocolSession.getAuthorizedPrincipal().getName() + "_incorrect")); channel.setPublishFrame(info, e); channel.publishContentHeader(contentHeaderBody); - channel.commit(); + channel.commit(null, false); assertEquals("Unexpected number of replies", 1, _replies.size()); assertEquals("Message authorization passed", "Access Refused", _replies.get(403)); @@ -121,18 +120,17 @@ public class AMQChannelTest extends Qpid AMQChannel channel = new AMQChannel(_protocolSession, 1, _virtualHost.getMessageStore()); channel.setLocalTransactional(); - MessagePublishInfo info = mock(MessagePublishInfo.class); + MessagePublishInfo info = new MessagePublishInfo(new AMQShortString("test"), false, false, null); ExchangeImpl e = mock(ExchangeImpl.class); ContentHeaderBody contentHeaderBody= mock(ContentHeaderBody.class); BasicContentHeaderProperties properties = mock(BasicContentHeaderProperties.class); when(contentHeaderBody.getProperties()).thenReturn(properties); - when(info.getExchange()).thenReturn(new AMQShortString("test")); when(properties.getUserId()).thenReturn(new AMQShortString(_protocolSession.getAuthorizedPrincipal().getName())); channel.setPublishFrame(info, e); channel.publishContentHeader(contentHeaderBody); - channel.commit(); + channel.commit(null, false); assertEquals("Unexpected number of replies", 0, _replies.size()); } Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java (original) +++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java Fri Oct 17 14:23:19 2014 @@ -29,7 +29,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.framing.MessagePublishInfo; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.flow.LimitlessCreditManager; import org.apache.qpid.server.flow.Pre0_10CreditManager; @@ -51,7 +51,7 @@ public class AckTest extends QpidTestCas private ConsumerTarget_0_8 _subscriptionTarget; private ConsumerImpl _consumer; - private AMQProtocolSession _protocolSession; + private AMQProtocolEngine _protocolEngine; private TestMemoryMessageStore _messageStore; @@ -68,8 +68,8 @@ public class AckTest extends QpidTestCas super.setUp(); BrokerTestHelper.setUp(); _channel = BrokerTestHelper_0_8.createChannel(5); - _protocolSession = _channel.getProtocolSession(); - _virtualHost = _protocolSession.getVirtualHost(); + _protocolEngine = _channel.getConnection(); + _virtualHost = _protocolEngine.getVirtualHost(); _queue = BrokerTestHelper.createQueue(getTestName(), _virtualHost); _messageStore = (TestMemoryMessageStore)_virtualHost.getMessageStore(); } @@ -90,37 +90,10 @@ public class AckTest extends QpidTestCas { for (int i = 1; i <= count; i++) { - MessagePublishInfo publishBody = new MessagePublishInfo() - { - - public AMQShortString getExchange() - { - return new AMQShortString("someExchange"); - } - - public void setExchange(AMQShortString exchange) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean isImmediate() - { - return false; - } - - public boolean isMandatory() - { - return false; - } - - public AMQShortString getRoutingKey() - { - return new AMQShortString("rk"); - } - }; + MessagePublishInfo publishBody = new MessagePublishInfo(new AMQShortString("someExchange"), false, false, + new AMQShortString("rk")); BasicContentHeaderProperties b = new BasicContentHeaderProperties(); - ContentHeaderBody cb = new ContentHeaderBody(); - cb.setProperties(b); + ContentHeaderBody cb = new ContentHeaderBody(b); if (persistent) { Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java (original) +++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java Fri Oct 17 14:23:19 2014 @@ -25,6 +25,7 @@ import java.util.List; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.MessageCounter; import org.apache.qpid.server.store.MessageStore; @@ -75,7 +76,7 @@ public class AcknowledgeTest extends Qpi private InternalTestProtocolSession getSession() { - return (InternalTestProtocolSession)_channel.getProtocolSession(); + return (InternalTestProtocolSession)_channel.getConnection(); } private AMQQueue getQueue() @@ -129,7 +130,7 @@ public class AcknowledgeTest extends Qpi if (getChannel().isTransactional()) { - getChannel().commit(); + getChannel().commit(null, false); } //Ensure they are stored @@ -140,7 +141,7 @@ public class AcknowledgeTest extends Qpi //Subscribe to the queue AMQShortString subscriber = _channel.consumeFromSource(null, - Collections.singleton(_queue), + Collections.<MessageSource>singleton(_queue), true, null, true, false); getQueue().deliverAsync(); @@ -164,7 +165,7 @@ public class AcknowledgeTest extends Qpi if (getChannel().isTransactional()) { - getChannel().commit(); + getChannel().commit(null, false); } // Check Remaining Acknowledgements Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java (original) +++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java Fri Oct 17 14:23:19 2014 @@ -24,18 +24,15 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.framing.MessagePublishInfo; import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - public class BrokerTestHelper_0_8 extends BrokerTestHelper { - public static AMQChannel createChannel(int channelId, AMQProtocolSession session) throws AMQException + public static AMQChannel createChannel(int channelId, AMQProtocolEngine session) throws AMQException { AMQChannel channel = new AMQChannel(session, channelId, session.getVirtualHost().getMessageStore()); session.addChannel(channel); @@ -69,9 +66,7 @@ public class BrokerTestHelper_0_8 extend { AMQShortString routingKey = new AMQShortString(queueName); AMQShortString exchangeNameAsShortString = new AMQShortString(exchangeName); - MessagePublishInfo info = mock(MessagePublishInfo.class); - when(info.getExchange()).thenReturn(exchangeNameAsShortString); - when(info.getRoutingKey()).thenReturn(routingKey); + MessagePublishInfo info = new MessagePublishInfo(exchangeNameAsShortString, false, false, routingKey); MessageDestination destination; if(exchangeName == null || "".equals(exchangeName)) @@ -86,22 +81,20 @@ public class BrokerTestHelper_0_8 extend { channel.setPublishFrame(info, destination); - // Set the body size - ContentHeaderBody _headerBody = new ContentHeaderBody(); - _headerBody.setBodySize(0); // Set Minimum properties BasicContentHeaderProperties properties = new BasicContentHeaderProperties(); + properties.setExpiration(0L); properties.setTimestamp(System.currentTimeMillis()); // Make Message Persistent properties.setDeliveryMode((byte) 2); - _headerBody.setProperties(properties); + ContentHeaderBody headerBody = new ContentHeaderBody(properties, 0); - channel.publishContentHeader(_headerBody); + channel.publishContentHeader(headerBody); } channel.sync(); } Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java (original) +++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java Fri Oct 17 14:23:19 2014 @@ -39,14 +39,13 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.framing.MessagePublishInfo; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageContentSource; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.security.auth.UsernamePrincipal; import org.apache.qpid.server.virtualhost.VirtualHostImpl; @@ -231,7 +230,7 @@ public class InternalTestProtocolSession //Simulate the Client responding with a CloseOK // should really update the StateManger but we don't have access here // changeState(AMQState.CONNECTION_CLOSED); - ((AMQChannel)session).getProtocolSession().closeSession(); + ((AMQChannel)session).getConnection().closeSession(); } Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java (original) +++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java Fri Oct 17 14:23:19 2014 @@ -20,8 +20,6 @@ */ package org.apache.qpid.server.protocol.v0_8; -import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.test.utils.QpidTestCase; @@ -46,23 +44,16 @@ public class MaxChannelsTest extends Qpi long maxChannels = 10L; _session.setMaximumNumberOfChannels(maxChannels); - assertEquals("Number of channels not correctly set.", new Long(maxChannels), _session.getMaximumNumberOfChannels()); + assertEquals("Number of channels not correctly set.", maxChannels, _session.getMaximumNumberOfChannels()); - for (long currentChannel = 0L; currentChannel < maxChannels; currentChannel++) + for (long currentChannel = 1L; currentChannel <= maxChannels; currentChannel++) { - _session.addChannel(new AMQChannel(_session, (int) currentChannel, null)); + _session.receiveChannelOpen( (int) currentChannel); } - - try - { - _session.addChannel(new AMQChannel(_session, (int) maxChannels, null)); - fail("Cannot create more channels then maximum"); - } - catch (AMQException e) - { - assertEquals("Wrong exception received.", e.getErrorCode(), AMQConstant.NOT_ALLOWED); - } - assertEquals("Maximum number of channels not set.", new Long(maxChannels), new Long(_session.getChannels().size())); + assertFalse("Connection should not be closed after opening " + maxChannels + " channels",_session.isClosed()); + assertEquals("Maximum number of channels not set.", maxChannels, _session.getChannels().size()); + _session.receiveChannelOpen((int) maxChannels+1); + assertTrue("Connection should be closed after opening " + (maxChannels + 1) + " channels",_session.isClosed()); } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
