Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java?rev=1631137&r1=1631136&r2=1631137&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java Sat Oct 11 23:46:39 2014 @@ -22,12 +22,6 @@ package org.apache.qpid.server.protocol. import java.security.AccessControlException; import java.security.PrivilegedAction; -import java.util.Collection; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.UUID; import javax.security.auth.Subject; import javax.security.sasl.SaslException; @@ -37,44 +31,20 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; -import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.*; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.consumer.ConsumerImpl; -import org.apache.qpid.server.exchange.ExchangeImpl; -import org.apache.qpid.server.filter.AMQInvalidArgumentException; -import org.apache.qpid.server.flow.FlowCreditManager; -import org.apache.qpid.server.flow.MessageOnlyCreditManager; -import org.apache.qpid.server.message.InstanceProperties; -import org.apache.qpid.server.message.MessageDestination; -import org.apache.qpid.server.message.MessageInstance; -import org.apache.qpid.server.message.MessageSource; -import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.ExclusivityPolicy; -import org.apache.qpid.server.model.LifetimePolicy; -import org.apache.qpid.server.model.NoFactoryForTypeException; -import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.State; -import org.apache.qpid.server.model.UnknownConfiguredObjectException; import org.apache.qpid.server.model.port.AmqpPort; -import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.auth.SubjectAuthenticationResult; -import org.apache.qpid.server.virtualhost.ExchangeExistsException; -import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException; -import org.apache.qpid.server.virtualhost.QueueExistsException; -import org.apache.qpid.server.virtualhost.RequiredExchangeException; -import org.apache.qpid.server.virtualhost.ReservedExchangeNameException; import org.apache.qpid.server.virtualhost.VirtualHostImpl; public class ServerMethodDispatcherImpl implements MethodDispatcher { private static final Logger _logger = Logger.getLogger(ServerMethodDispatcherImpl.class); - private final AMQProtocolSession<?> _connection; + private final AMQProtocolEngine _connection; private static interface ChannelAction @@ -82,19 +52,19 @@ public class ServerMethodDispatcherImpl void onChannel(ChannelMethodProcessor channel); } - public static MethodDispatcher createMethodDispatcher(AMQProtocolSession<?> connection) + public static MethodDispatcher createMethodDispatcher(AMQProtocolEngine connection) { return new ServerMethodDispatcherImpl(connection); } - public ServerMethodDispatcherImpl(AMQProtocolSession<?> connection) + public ServerMethodDispatcherImpl(AMQProtocolEngine connection) { _connection = connection; } - protected final AMQProtocolSession<?> getConnection() + protected final AMQProtocolEngine getConnection() { return _connection; } @@ -104,7 +74,7 @@ public class ServerMethodDispatcherImpl final AMQChannel channel = _connection.getChannel(channelId); if (channel == null) { - // TODO throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); + closeConnection(AMQConstant.CHANNEL_ERROR, "Unknown channel id: " + channelId); } else { @@ -121,595 +91,169 @@ public class ServerMethodDispatcherImpl } - public boolean dispatchAccessRequest(AccessRequestBody body, int channelId) throws AMQException + public boolean dispatchAccessRequest(final AccessRequestBody body, int channelId) { - final AMQChannel channel = _connection.getChannel(channelId); - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - - if(ProtocolVersion.v0_91.equals(_connection.getProtocolVersion()) ) - { - throw new AMQException(AMQConstant.COMMAND_INVALID, "AccessRequest not present in AMQP versions other than 0-8, 0-9"); - } - - // We don't implement access control class, but to keep clients happy that expect it - // always use the "0" ticket. - AccessRequestOkBody response = methodRegistry.createAccessRequestOkBody(0); - channel.sync(); - _connection.writeFrame(response.generateFrame(channelId)); - return true; - } - - public boolean dispatchBasicAck(BasicAckBody body, int channelId) throws AMQException - { - - if (_logger.isDebugEnabled()) - { - _logger.debug("Ack(Tag:" + body.getDeliveryTag() + ":Mult:" + body.getMultiple() + ") received on channel " + channelId); - } - - final AMQChannel channel = _connection.getChannel(channelId); - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveAccessRequest(body.getRealm(), + body.getExclusive(), + body.getPassive(), + body.getActive(), + body.getWrite(), + body.getRead()); + } + } + ); - // this method throws an AMQException if the delivery tag is not known - channel.acknowledgeMessage(body.getDeliveryTag(), body.getMultiple()); return true; } - public boolean dispatchBasicCancel(BasicCancelBody body, int channelId) throws AMQException + public boolean dispatchBasicAck(final BasicAckBody body, int channelId) { - final AMQChannel channel = _connection.getChannel(channelId); - - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - - if (_logger.isDebugEnabled()) - { - _logger.debug("BasicCancel: for:" + body.getConsumerTag() + - " nowait:" + body.getNowait()); - } + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveBasicAck(body.getDeliveryTag(), body.getMultiple()); + } + } + ); - channel.unsubscribeConsumer(body.getConsumerTag()); - if (!body.getNowait()) - { - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - BasicCancelOkBody cancelOkBody = methodRegistry.createBasicCancelOkBody(body.getConsumerTag()); - channel.sync(); - _connection.writeFrame(cancelOkBody.generateFrame(channelId)); - } return true; } - public boolean dispatchBasicConsume(BasicConsumeBody body, int channelId) throws AMQException + public boolean dispatchBasicCancel(final BasicCancelBody body, int channelId) { - AMQChannel channel = _connection.getChannel(channelId); - VirtualHostImpl<?,?,?> vHost = _connection.getVirtualHost(); - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - else - { - channel.sync(); - String queueName = body.getQueue() == null ? null : body.getQueue().asString(); - if (_logger.isDebugEnabled()) - { - _logger.debug("BasicConsume: from '" + queueName + - "' for:" + body.getConsumerTag() + - " nowait:" + body.getNowait() + - " args:" + body.getArguments()); - } - - MessageSource queue = queueName == null ? channel.getDefaultQueue() : vHost.getQueue(queueName); - final Collection<MessageSource> sources = new HashSet<>(); - if(queue != null) - { - sources.add(queue); - } - else if(vHost.getContextValue(Boolean.class, "qpid.enableMultiQueueConsumers") - && body.getArguments() != null - && body.getArguments().get("x-multiqueue") instanceof Collection) - { - for(Object object : (Collection<Object>) body.getArguments().get("x-multiqueue")) - { - String sourceName = String.valueOf(object); - sourceName = sourceName.trim(); - if(sourceName.length() != 0) - { - MessageSource source = vHost.getMessageSource(sourceName); - if(source == null) - { - sources.clear(); - break; - } - else - { - sources.add(source); - } - } - } - queueName = body.getArguments().get("x-multiqueue").toString(); - } - - if (sources.isEmpty()) - { - if (_logger.isDebugEnabled()) - { - _logger.debug("No queue for '" + queueName + "'"); - } - if (queueName != null) - { - String msg = "No such queue, '" + queueName + "'"; - throw body.getChannelException(AMQConstant.NOT_FOUND, msg, _connection.getMethodRegistry()); - } - else - { - String msg = "No queue name provided, no default queue defined."; - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, msg, _connection.getMethodRegistry()); - } - } - else - { - final AMQShortString consumerTagName; - - if (body.getConsumerTag() != null) - { - consumerTagName = body.getConsumerTag().intern(false); - } - else - { - consumerTagName = null; - } - - try - { - if(consumerTagName == null || channel.getSubscription(consumerTagName) == null) - { - - AMQShortString consumerTag = channel.consumeFromSource(consumerTagName, - sources, - !body.getNoAck(), - body.getArguments(), - body.getExclusive(), - body.getNoLocal()); - if (!body.getNowait()) - { - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createBasicConsumeOkBody(consumerTag); - _connection.writeFrame(responseBody.generateFrame(channelId)); - - } - } - else - { - AMQShortString msg = AMQShortString.validValueOf("Non-unique consumer tag, '" + body.getConsumerTag() + "'"); - - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), // replyCode - msg, // replytext - body.getClazz(), - body.getMethod()); - _connection.writeFrame(responseBody.generateFrame(0)); - } - - } - catch (AMQInvalidArgumentException ise) - { - _logger.debug("Closing connection due to invalid selector"); - - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createChannelCloseBody(AMQConstant.ARGUMENT_INVALID.getCode(), - AMQShortString.validValueOf(ise.getMessage()), - body.getClazz(), - body.getMethod()); - _connection.writeFrame(responseBody.generateFrame(channelId)); - - - } - catch (AMQQueue.ExistingExclusiveConsumer e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, - "Cannot subscribe to queue " - + queue.getName() - + " as it already has an existing exclusive consumer", - _connection.getMethodRegistry()); - } - catch (AMQQueue.ExistingConsumerPreventsExclusive e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, - "Cannot subscribe to queue " - + queue.getName() - + " exclusively as it already has a consumer", - _connection.getMethodRegistry()); - } - catch (AccessControlException e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, - "Cannot subscribe to queue " - + queue.getName() - + " permission denied", _connection.getMethodRegistry()); - } - catch (MessageSource.ConsumerAccessRefused consumerAccessRefused) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, - "Cannot subscribe to queue " - + queue.getName() - + " as it already has an incompatible exclusivity policy", - _connection.getMethodRegistry()); - } - - } - } + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveBasicCancel(body.getConsumerTag(), + body.getNowait() + ); + } + } + ); return true; } - public boolean dispatchBasicGet(BasicGetBody body, int channelId) throws AMQException + public boolean dispatchBasicConsume(final BasicConsumeBody body, int channelId) { - VirtualHostImpl vHost = _connection.getVirtualHost(); - - AMQChannel channel = _connection.getChannel(channelId); - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - else - { - channel.sync(); - AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueue(body.getQueue().toString()); - if (queue == null) - { - _logger.info("No queue for '" + body.getQueue() + "'"); - if(body.getQueue()!=null) - { - throw body.getConnectionException(AMQConstant.NOT_FOUND, - "No such queue, '" + body.getQueue() + "'", - _connection.getMethodRegistry()); - } - else - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "No queue name provided, no default queue defined.", - _connection.getMethodRegistry()); - } - } - else - { - - try - { - if (!performGet(queue, _connection, channel, !body.getNoAck())) - { - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - - BasicGetEmptyBody responseBody = methodRegistry.createBasicGetEmptyBody(null); + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveBasicConsume(body.getQueue(), body.getConsumerTag(), + body.getNoLocal(), body.getNoAck(), + body.getExclusive(), body.getNowait(), + body.getArguments()); + } + } + ); - _connection.writeFrame(responseBody.generateFrame(channelId)); - } - } - catch (AccessControlException e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, - e.getMessage(), _connection.getMethodRegistry()); - } - catch (MessageSource.ExistingExclusiveConsumer e) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Queue has an exclusive consumer", - _connection.getMethodRegistry()); - } - catch (MessageSource.ExistingConsumerPreventsExclusive e) - { - throw body.getConnectionException(AMQConstant.INTERNAL_ERROR, - "The GET request has been evaluated as an exclusive consumer, " + - "this is likely due to a programming error in the Qpid broker", - _connection.getMethodRegistry()); - } - catch (MessageSource.ConsumerAccessRefused consumerAccessRefused) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Queue has an incompatible exclusivit policy", - _connection.getMethodRegistry()); - } - } - } return true; } - public static boolean performGet(final AMQQueue queue, - final AMQProtocolSession session, - final AMQChannel channel, - final boolean acks) - throws AMQException, MessageSource.ExistingConsumerPreventsExclusive, - MessageSource.ExistingExclusiveConsumer, MessageSource.ConsumerAccessRefused + private void closeConnection(final AMQConstant constant, + final String message) { - - final FlowCreditManager singleMessageCredit = new MessageOnlyCreditManager(1L); - - final GetDeliveryMethod getDeliveryMethod = - new GetDeliveryMethod(singleMessageCredit, session, channel, queue); - final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod() - { - - public void recordMessageDelivery(final ConsumerImpl sub, final MessageInstance entry, final long deliveryTag) - { - channel.addUnacknowledgedMessage(entry, deliveryTag, null); - } - }; - - ConsumerTarget_0_8 target; - EnumSet<ConsumerImpl.Option> options = EnumSet.of(ConsumerImpl.Option.TRANSIENT, ConsumerImpl.Option.ACQUIRES, - ConsumerImpl.Option.SEES_REQUEUES); - if(acks) - { - - target = ConsumerTarget_0_8.createAckTarget(channel, - AMQShortString.EMPTY_STRING, null, - singleMessageCredit, getDeliveryMethod, getRecordMethod); - } - else - { - target = ConsumerTarget_0_8.createGetNoAckTarget(channel, - AMQShortString.EMPTY_STRING, null, - singleMessageCredit, getDeliveryMethod, getRecordMethod); - } - - ConsumerImpl sub = queue.addConsumer(target, null, AMQMessage.class, "", options); - sub.flush(); - sub.close(); - return(getDeliveryMethod.hasDeliveredMessage()); - - + _connection.closeConnection(constant, message, 0); } - - private static class GetDeliveryMethod implements ClientDeliveryMethod + public boolean dispatchBasicGet(final BasicGetBody body, int channelId) { - private final FlowCreditManager _singleMessageCredit; - private final AMQProtocolSession _session; - private final AMQChannel _channel; - private final AMQQueue _queue; - private boolean _deliveredMessage; - - public GetDeliveryMethod(final FlowCreditManager singleMessageCredit, - final AMQProtocolSession session, - final AMQChannel channel, final AMQQueue queue) - { - _singleMessageCredit = singleMessageCredit; - _session = session; - _channel = channel; - _queue = queue; - } - - @Override - public long deliverToClient(final ConsumerImpl sub, final ServerMessage message, - final InstanceProperties props, final long deliveryTag) - { - _singleMessageCredit.useCreditForMessage(message.getSize()); - long size =_session.getProtocolOutputConverter().writeGetOk(message, - props, - _channel.getChannelId(), - deliveryTag, - _queue.getQueueDepthMessages()); - - _deliveredMessage = true; - return size; - } - - public boolean hasDeliveredMessage() - { - return _deliveredMessage; - } + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveBasicGet(body.getQueue(), body.getNoAck()); + } + } + ); + return true; } - public boolean dispatchBasicPublish(BasicPublishBody body, int channelId) throws AMQException + public boolean dispatchBasicPublish(final BasicPublishBody body, int channelId) { - if (_logger.isDebugEnabled()) - { - _logger.debug("Publish received on channel " + channelId); - } - - AMQShortString exchangeName = body.getExchange(); - VirtualHostImpl vHost = _connection.getVirtualHost(); - - // TODO: check the delivery tag field details - is it unique across the broker or per subscriber? - - MessageDestination destination; - - if (exchangeName == null || AMQShortString.EMPTY_STRING.equals(exchangeName)) - { - destination = vHost.getDefaultDestination(); - } - else - { - destination = vHost.getMessageDestination(exchangeName.toString()); - } - - // if the exchange does not exist we raise a channel exception - if (destination == null) - { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange name", - _connection.getMethodRegistry()); - } - else - { - // The partially populated BasicDeliver frame plus the received route body - // is stored in the channel. Once the final body frame has been received - // it is routed to the exchange. - AMQChannel channel = _connection.getChannel(channelId); - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveBasicPublish(body.getExchange(), body.getRoutingKey(), + body.getMandatory(), body.getImmediate()); + } + } + ); - MessagePublishInfo info = new MessagePublishInfo(body.getExchange(), - body.getImmediate(), - body.getMandatory(), - body.getRoutingKey()); - info.setExchange(exchangeName); - try - { - channel.setPublishFrame(info, destination); - } - catch (AccessControlException e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, - e.getMessage(), - _connection.getMethodRegistry()); - } - } return true; } - public boolean dispatchBasicQos(BasicQosBody body, int channelId) throws AMQException + public boolean dispatchBasicQos(final BasicQosBody body, int channelId) { - AMQChannel channel = _connection.getChannel(channelId); - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - channel.sync(); - channel.setCredit(body.getPrefetchSize(), body.getPrefetchCount()); - - - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createBasicQosOkBody(); - _connection.writeFrame(responseBody.generateFrame(channelId)); + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveBasicQos(body.getPrefetchSize(), body.getPrefetchCount(), + body.getGlobal()); + } + } + ); return true; } - public boolean dispatchBasicRecover(BasicRecoverBody body, int channelId) throws AMQException + public boolean dispatchBasicRecover(final BasicRecoverBody body, int channelId) { - _logger.debug("Recover received on protocol session " + _connection - + " and channel " + channelId); - AMQChannel channel = _connection.getChannel(channelId); - - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - - channel.resend(); + final boolean sync = _connection.getProtocolVersion().equals(ProtocolVersion.v8_0); - // Qpid 0-8 hacks a synchronous -ok onto recover. - // In Qpid 0-9 we create a separate sync-recover, sync-recover-ok pair to be "more" compliant - if(_connection.getProtocolVersion().equals(ProtocolVersion.v8_0)) - { - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - AMQMethodBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody(); - channel.sync(); - _connection.writeFrame(recoverOk.generateFrame(channelId)); - - } + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveBasicRecover(body.getRequeue(), sync); + } + } + ); return true; } - public boolean dispatchBasicReject(BasicRejectBody body, int channelId) throws AMQException + public boolean dispatchBasicReject(final BasicRejectBody body, int channelId) throws AMQException { - AMQChannel channel = _connection.getChannel(channelId); - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - - if (_logger.isDebugEnabled()) - { - _logger.debug("Rejecting:" + body.getDeliveryTag() + - ": Requeue:" + body.getRequeue() + - " on channel:" + channel.debugIdentity()); - } - - long deliveryTag = body.getDeliveryTag(); - - MessageInstance message = channel.getUnacknowledgedMessageMap().get(deliveryTag); - - if (message == null) - { - _logger.warn("Dropping reject request as message is null for tag:" + deliveryTag); - } - else - { - - if (message.getMessage() == null) - { - _logger.warn("Message has already been purged, unable to Reject."); - } - else - { - - if (_logger.isDebugEnabled()) - { - _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage() + - ": Requeue:" + body.getRequeue() + - " on channel:" + channel.debugIdentity()); - } - - if (body.getRequeue()) - { - //this requeue represents a message rejected from the pre-dispatch queue - //therefore we need to amend the delivery counter. - message.decrementDeliveryCount(); + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveBasicReject(body.getDeliveryTag(), body.getRequeue()); + } + } + ); - channel.requeue(deliveryTag); - } - else - { - // Since the Java client abuses the reject flag for requeing after rollback, we won't set reject here - // as it would prevent redelivery - // message.reject(); - - final boolean maxDeliveryCountEnabled = channel.isMaxDeliveryCountEnabled(deliveryTag); - _logger.debug("maxDeliveryCountEnabled: " - + maxDeliveryCountEnabled - + " deliveryTag " - + deliveryTag); - if (maxDeliveryCountEnabled) - { - final boolean deliveredTooManyTimes = channel.isDeliveredTooManyTimes(deliveryTag); - _logger.debug("deliveredTooManyTimes: " - + deliveredTooManyTimes - + " deliveryTag " - + deliveryTag); - if (deliveredTooManyTimes) - { - channel.deadLetter(body.getDeliveryTag()); - } - else - { - //this requeue represents a message rejected because of a recover/rollback that we - //are not ready to DLQ. We rely on the reject command to resend from the unacked map - //and therefore need to increment the delivery counter so we cancel out the effect - //of the AMQChannel#resend() decrement. - message.incrementDeliveryCount(); - } - } - else - { - channel.requeue(deliveryTag); - } - } - } - } return true; } @@ -720,7 +264,9 @@ public class ServerMethodDispatcherImpl // Protect the broker against out of order frame request. if (virtualHost == null) { - throw new AMQException(AMQConstant.COMMAND_INVALID, "Virtualhost has not yet been set. ConnectionOpen has not been called.", null); + throw new AMQException(AMQConstant.COMMAND_INVALID, + "Virtualhost has not yet been set. ConnectionOpen has not been called.", + null); } _logger.info("Connecting to: " + virtualHost.getName()); @@ -783,30 +329,17 @@ public class ServerMethodDispatcherImpl public boolean dispatchChannelClose(ChannelCloseBody body, int channelId) throws AMQException { - if (_logger.isInfoEnabled()) - { - _logger.info("Received channel close for id " + channelId - + " citing class " + body.getClassId() + - " and method " + body.getMethodId()); - } - + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveChannelClose(); + } + } + ); - AMQChannel channel = _connection.getChannel(channelId); - - if (channel == null) - { - throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, - "Trying to close unknown channel", - _connection.getMethodRegistry()); - } - channel.sync(); - _connection.closeChannel(channelId); - // Client requested closure so we don't wait for ok we send it - _connection.closeChannelOk(channelId); - - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - ChannelCloseOkBody responseBody = methodRegistry.createChannelCloseOkBody(); - _connection.writeFrame(responseBody.generateFrame(channelId)); return true; } @@ -814,32 +347,34 @@ public class ServerMethodDispatcherImpl public boolean dispatchChannelCloseOk(ChannelCloseOkBody body, int channelId) throws AMQException { - _logger.info("Received channel-close-ok for channel-id " + channelId); + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveChannelCloseOk(); + } + } + ); - // Let the Protocol Session know the channel is now closed. - _connection.closeChannelOk(channelId); return true; } - public boolean dispatchChannelFlow(ChannelFlowBody body, int channelId) throws AMQException - { - final AMQProtocolSession<?> connection = getConnection(); - - - AMQChannel channel = connection.getChannel(channelId); - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); - } - channel.sync(); - channel.setSuspended(!body.getActive()); - _logger.debug("Channel.Flow for channel " + channelId + ", active=" + body.getActive()); - - MethodRegistry methodRegistry = connection.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createChannelFlowOkBody(body.getActive()); - connection.writeFrame(responseBody.generateFrame(channelId)); + public boolean dispatchChannelFlow(final ChannelFlowBody body, int channelId) throws AMQException + { + + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveChannelFlow(body.getActive()); + } + } + ); return true; } @@ -861,7 +396,8 @@ public class ServerMethodDispatcherImpl String virtualHostName; if ((body.getVirtualHost() != null) && body.getVirtualHost().charAt(0) == '/') { - virtualHostName = new StringBuilder(body.getVirtualHost().subSequence(1, body.getVirtualHost().length())).toString(); + virtualHostName = + new StringBuilder(body.getVirtualHost().subSequence(1, body.getVirtualHost().length())).toString(); } else { @@ -872,41 +408,41 @@ public class ServerMethodDispatcherImpl if (virtualHost == null) { - throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown virtual host: '" + virtualHostName + "'", - _connection.getMethodRegistry()); + closeConnection(AMQConstant.NOT_FOUND, + "Unknown virtual host: '" + virtualHostName + "'"); + } else { // Check virtualhost access if (virtualHost.getState() != State.ACTIVE) { - throw body.getConnectionException(AMQConstant.CONNECTION_FORCED, - "Virtual host '" + virtualHost.getName() + "' is not active", - _connection.getMethodRegistry()); - } + closeConnection(AMQConstant.CONNECTION_FORCED, + "Virtual host '" + virtualHost.getName() + "' is not active" + ); - _connection.setVirtualHost(virtualHost); - try - { - virtualHost.getSecurityManager().authoriseCreateConnection(_connection); - } - catch (AccessControlException e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, - e.getMessage(), - _connection.getMethodRegistry()); } - - // See Spec (0.8.2). Section 3.1.2 Virtual Hosts - if (_connection.getContextKey() == null) + else { - _connection.setContextKey(new AMQShortString(Long.toString(System.currentTimeMillis()))); - } + _connection.setVirtualHost(virtualHost); + try + { + virtualHost.getSecurityManager().authoriseCreateConnection(_connection); + if (_connection.getContextKey() == null) + { + _connection.setContextKey(new AMQShortString(Long.toString(System.currentTimeMillis()))); + } - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(body.getVirtualHost()); + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(body.getVirtualHost()); - _connection.writeFrame(responseBody.generateFrame(channelId)); + _connection.writeFrame(responseBody.generateFrame(channelId)); + } + catch (AccessControlException e) + { + closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage()); + } + } } return true; } @@ -1067,7 +603,7 @@ public class ServerMethodDispatcherImpl int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE); - if(frameMax <= 0) + if (frameMax <= 0) { frameMax = Integer.MAX_VALUE; } @@ -1089,12 +625,12 @@ public class ServerMethodDispatcherImpl return true; } - private void disposeSaslServer(AMQProtocolSession ps) + private void disposeSaslServer(AMQProtocolEngine connection) { - SaslServer ss = ps.getSaslServer(); + SaslServer ss = connection.getSaslServer(); if (ss != null) { - ps.setSaslServer(null); + connection.setSaslServer(null); try { ss.dispose(); @@ -1123,60 +659,66 @@ public class ServerMethodDispatcherImpl if (ss == null) { - throw body.getConnectionException(AMQConstant.RESOURCE_ERROR, - "Unable to create SASL Server:" + body.getMechanism(), - _connection.getMethodRegistry()); + closeConnection(AMQConstant.RESOURCE_ERROR, + "Unable to create SASL Server:" + body.getMechanism() + ); + } + else + { - _connection.setSaslServer(ss); + _connection.setSaslServer(ss); - final SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, body.getResponse()); - //save clientProperties - _connection.setClientProperties(body.getClientProperties()); + final SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, body.getResponse()); + //save clientProperties + _connection.setClientProperties(body.getClientProperties()); - MethodRegistry methodRegistry = _connection.getMethodRegistry(); + MethodRegistry methodRegistry = _connection.getMethodRegistry(); - switch (authResult.getStatus()) - { - case ERROR: - Exception cause = authResult.getCause(); + switch (authResult.getStatus()) + { + case ERROR: + Exception cause = authResult.getCause(); - _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage())); + _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage())); - ConnectionCloseBody closeBody = - methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), // replyCode - AMQConstant.NOT_ALLOWED.getName(), - body.getClazz(), - body.getMethod()); - - _connection.writeFrame(closeBody.generateFrame(0)); - disposeSaslServer(_connection); - break; + ConnectionCloseBody closeBody = + methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), + // replyCode + AMQConstant.NOT_ALLOWED.getName(), + body.getClazz(), + body.getMethod()); + + _connection.writeFrame(closeBody.generateFrame(0)); + disposeSaslServer(_connection); + break; - case SUCCESS: - if (_logger.isInfoEnabled()) - { - _logger.info("Connected as: " + authResult.getSubject()); - } - _connection.setAuthorizedSubject(authResult.getSubject()); + case SUCCESS: + if (_logger.isInfoEnabled()) + { + _logger.info("Connected as: " + authResult.getSubject()); + } + _connection.setAuthorizedSubject(authResult.getSubject()); - int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE); + int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE); - if(frameMax <= 0) - { - frameMax = Integer.MAX_VALUE; - } + if (frameMax <= 0) + { + frameMax = Integer.MAX_VALUE; + } - ConnectionTuneBody - tuneBody = methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(), - frameMax, - broker.getConnection_heartBeatDelay()); - _connection.writeFrame(tuneBody.generateFrame(0)); - break; - case CONTINUE: - ConnectionSecureBody - secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge()); - _connection.writeFrame(secureBody.generateFrame(0)); + ConnectionTuneBody + tuneBody = + methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(), + frameMax, + broker.getConnection_heartBeatDelay()); + _connection.writeFrame(tuneBody.generateFrame(0)); + break; + case CONTINUE: + ConnectionSecureBody + secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge()); + _connection.writeFrame(secureBody.generateFrame(0)); + } } } catch (SaslException e) @@ -1189,38 +731,33 @@ public class ServerMethodDispatcherImpl public boolean dispatchConnectionTuneOk(ConnectionTuneOkBody body, int channelId) throws AMQException { - final AMQProtocolSession<?> connection = getConnection(); - - if (_logger.isDebugEnabled()) - { - _logger.debug(body); - } + final AMQProtocolEngine connection = getConnection(); connection.initHeartbeats(body.getHeartbeat()); int brokerFrameMax = connection.getBroker().getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE); - if(brokerFrameMax <= 0) + if (brokerFrameMax <= 0) { brokerFrameMax = Integer.MAX_VALUE; } - if(body.getFrameMax() > (long) brokerFrameMax) + if (body.getFrameMax() > (long) brokerFrameMax) { throw new AMQConnectionException(AMQConstant.SYNTAX_ERROR, "Attempt to set max frame size to " + body.getFrameMax() + " greater than the broker will allow: " + brokerFrameMax, body.getClazz(), body.getMethod(), - connection.getMethodRegistry(),null); + connection.getMethodRegistry(), null); } - else if(body.getFrameMax() > 0 && body.getFrameMax() < AMQConstant.FRAME_MIN_SIZE.getCode()) + else if (body.getFrameMax() > 0 && body.getFrameMax() < AMQConstant.FRAME_MIN_SIZE.getCode()) { throw new AMQConnectionException(AMQConstant.SYNTAX_ERROR, "Attempt to set max frame size to " + body.getFrameMax() + " which is smaller than the specification definined minimum: " + AMQConstant.FRAME_MIN_SIZE.getCode(), body.getClazz(), body.getMethod(), - connection.getMethodRegistry(),null); + connection.getMethodRegistry(), null); } int frameMax = body.getFrameMax() == 0 ? brokerFrameMax : (int) body.getFrameMax(); connection.setMaxFrameSize(frameMax); @@ -1231,1035 +768,246 @@ public class ServerMethodDispatcherImpl return true; } - public static final int OK = 0; - public static final int EXCHANGE_NOT_FOUND = 1; - public static final int QUEUE_NOT_FOUND = 2; - public static final int NO_BINDINGS = 3; - public static final int QUEUE_NOT_BOUND = 4; - public static final int NO_QUEUE_BOUND_WITH_RK = 5; - public static final int SPECIFIC_QUEUE_NOT_BOUND_WITH_RK = 6; - - public boolean dispatchExchangeBound(ExchangeBoundBody body, int channelId) throws AMQException + public boolean dispatchExchangeBound(final ExchangeBoundBody body, int channelId) { - VirtualHostImpl virtualHost = _connection.getVirtualHost(); - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - - final AMQChannel channel = _connection.getChannel(channelId); - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - channel.sync(); - - - AMQShortString exchangeName = body.getExchange(); - AMQShortString queueName = body.getQueue(); - AMQShortString routingKey = body.getRoutingKey(); - ExchangeBoundOkBody response; - - if(isDefaultExchange(exchangeName)) - { - if(routingKey == null) - { - if(queueName == null) - { - response = methodRegistry.createExchangeBoundOkBody(virtualHost.getQueues().isEmpty() ? NO_BINDINGS : OK, null); - } - else - { - AMQQueue queue = virtualHost.getQueue(queueName.toString()); - if (queue == null) - { - - response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode - AMQShortString.validValueOf("Queue '" + queueName + "' not found")); // replyText - } - else - { - response = methodRegistry.createExchangeBoundOkBody(OK, null); - } - } - } - else - { - if(queueName == null) - { - response = methodRegistry.createExchangeBoundOkBody(virtualHost.getQueue(routingKey.toString()) == null ? NO_QUEUE_BOUND_WITH_RK : OK, null); - } - else - { - AMQQueue queue = virtualHost.getQueue(queueName.toString()); - if (queue == null) - { - - response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode - AMQShortString.validValueOf("Queue '" + queueName + "' not found")); // replyText - } - else - { - response = methodRegistry.createExchangeBoundOkBody(queueName.equals(routingKey) ? OK : SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, null); - } - } - } - } - else - { - ExchangeImpl exchange = virtualHost.getExchange(exchangeName.toString()); - if (exchange == null) - { - - - response = methodRegistry.createExchangeBoundOkBody(EXCHANGE_NOT_FOUND, - AMQShortString.validValueOf("Exchange '" + exchangeName + "' not found")); - } - else if (routingKey == null) - { - if (queueName == null) - { - if (exchange.hasBindings()) - { - response = methodRegistry.createExchangeBoundOkBody(OK, null); - } - else - { - - response = methodRegistry.createExchangeBoundOkBody(NO_BINDINGS, // replyCode - null); // replyText - } - } - else - { - - AMQQueue queue = virtualHost.getQueue(queueName.toString()); - if (queue == null) - { - - response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode - AMQShortString.validValueOf("Queue '" + queueName + "' not found")); // replyText - } - else - { - if (exchange.isBound(queue)) - { - - response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode - null); // replyText - } - else - { - - response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_BOUND, // replyCode - AMQShortString.validValueOf("Queue '" + queueName + "' not bound to exchange '" + exchangeName + "'")); // replyText - } - } - } - } - else if (queueName != null) - { - AMQQueue queue = virtualHost.getQueue(queueName.toString()); - if (queue == null) - { - - response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode - AMQShortString.validValueOf("Queue '" + queueName + "' not found")); // replyText - } - else - { - String bindingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().asString(); - if (exchange.isBound(bindingKey, queue)) - { + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveExchangeBound(body.getExchange(), body.getQueue(), body.getRoutingKey()); + } + } + ); - response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode - null); // replyText - } - else - { + return true; + } - String message = "Queue '" + queueName + "' not bound with routing key '" + - body.getRoutingKey() + "' to exchange '" + exchangeName + "'"; + public boolean dispatchExchangeDeclare(final ExchangeDeclareBody body, int channelId) + { + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveExchangeDeclare(body.getExchange(), body.getType(), + body.getPassive(), + body.getDurable(), + body.getAutoDelete(), + body.getInternal(), + body.getNowait(), + body.getArguments()); + } + } + ); - response = methodRegistry.createExchangeBoundOkBody(SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, // replyCode - AMQShortString.validValueOf(message)); // replyText - } - } - } - else - { - if (exchange.isBound(body.getRoutingKey() == null ? "" : body.getRoutingKey().asString())) - { + return true; + } - response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode - null); // replyText - } - else - { + public boolean dispatchExchangeDelete(final ExchangeDeleteBody body, int channelId) + { + + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveExchangeDelete(body.getExchange(), + body.getIfUnused(), + body.getNowait()); + } + } + ); - response = methodRegistry.createExchangeBoundOkBody(NO_QUEUE_BOUND_WITH_RK, // replyCode - AMQShortString.validValueOf("No queue bound with routing key '" + body.getRoutingKey() + - "' to exchange '" + exchangeName + "'")); // replyText - } - } - } - _connection.writeFrame(response.generateFrame(channelId)); return true; } - public boolean dispatchExchangeDeclare(ExchangeDeclareBody body, int channelId) throws AMQException + private boolean isDefaultExchange(final AMQShortString exchangeName) { - VirtualHostImpl virtualHost = _connection.getVirtualHost(); - final AMQChannel channel = _connection.getChannel(channelId); - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - - final AMQShortString exchangeName = body.getExchange(); - if (_logger.isDebugEnabled()) - { - _logger.debug("Request to declare exchange of type " + body.getType() + " with name " + exchangeName); - } - - ExchangeImpl exchange; - - if(isDefaultExchange(exchangeName)) - { - if(!new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_CLASS).equals(body.getType())) - { - throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare default exchange: " - + " of type " - + ExchangeDefaults.DIRECT_EXCHANGE_CLASS - + " to " + body.getType() +".", - body.getClazz(), body.getMethod(), - _connection.getMethodRegistry(),null); - } - } - else - { - if (body.getPassive()) - { - exchange = virtualHost.getExchange(exchangeName.toString()); - if(exchange == null) - { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + exchangeName, - _connection.getMethodRegistry()); - } - else if (!(body.getType() == null || body.getType().length() ==0) && !exchange.getType().equals(body.getType().asString())) - { + return exchangeName == null || exchangeName.equals(AMQShortString.EMPTY_STRING); + } - throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + - exchangeName + " of type " + exchange.getType() - + " to " + body.getType() +".", - body.getClazz(), body.getMethod(), - _connection.getMethodRegistry(),null); - } + public boolean dispatchQueueBind(final QueueBindBody body, int channelId) + { + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveQueueBind(body.getQueue(), + body.getExchange(), + body.getRoutingKey(), + body.getNowait(), + body.getArguments()); + } + } + ); + + return true; + } + + public boolean dispatchQueueDeclare(final QueueDeclareBody body, int channelId) + { + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveQueueDeclare(body.getQueue(), + body.getPassive(), + body.getDurable(), + body.getExclusive(), + body.getAutoDelete(), + body.getNowait(), + body.getArguments()); + } + } + ); + + return true; + } + + public boolean dispatchQueueDelete(final QueueDeleteBody body, int channelId) + { + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveQueueDelete(body.getQueue(), + body.getIfUnused(), + body.getIfEmpty(), + body.getNowait()); + } + } + ); + + return true; + } + + public boolean dispatchQueuePurge(final QueuePurgeBody body, int channelId) throws AMQException + { + + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveQueuePurge(body.getQueue(), + body.getNowait()); + } + } + ); - } - else - { - try - { - String name = exchangeName == null ? null : exchangeName.intern().toString(); - String type = body.getType() == null ? null : body.getType().intern().toString(); + return true; + } - Map<String,Object> attributes = new HashMap<String, Object>(); - if(body.getArguments() != null) - { - attributes.putAll(FieldTable.convertToMap(body.getArguments())); - } - attributes.put(org.apache.qpid.server.model.Exchange.ID, null); - attributes.put(org.apache.qpid.server.model.Exchange.NAME,name); - attributes.put(org.apache.qpid.server.model.Exchange.TYPE,type); - attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, body.getDurable()); - attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, - body.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT); - if(!attributes.containsKey(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE)) - { - attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null); - } - exchange = virtualHost.createExchange(attributes); - } - catch(ReservedExchangeNameException e) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Attempt to declare exchange: " + exchangeName + - " which begins with reserved prefix.", - _connection.getMethodRegistry()); + public boolean dispatchTxCommit(TxCommitBody body, final int channelId) throws AMQException + { - } - catch(ExchangeExistsException e) - { - exchange = e.getExistingExchange(); - if(!new AMQShortString(exchange.getType()).equals(body.getType())) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " - + exchangeName + " of type " - + exchange.getType() - + " to " + body.getType() + ".", - _connection.getMethodRegistry()); - } - } - catch(NoFactoryForTypeException e) - { - throw body.getConnectionException(AMQConstant.COMMAND_INVALID, - "Unknown exchange type '" - + e.getType() - + "' for exchange '" - + exchangeName - + "'", - _connection.getMethodRegistry()); - } - catch (AccessControlException e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, - e.getMessage(), - _connection.getMethodRegistry()); - } - catch (UnknownConfiguredObjectException e) - { - // note - since 0-8/9/9-1 can't set the alt. exchange this exception should never occur - throw body.getConnectionException(AMQConstant.NOT_FOUND, - "Unknown alternate exchange " - + (e.getName() != null - ? "name: \"" + e.getName() + "\"" - : "id: " + e.getId()), - _connection.getMethodRegistry()); - } - catch (IllegalArgumentException e) - { - throw body.getConnectionException(AMQConstant.COMMAND_INVALID, - "Error creating exchange '" - + exchangeName - + "': " - + e.getMessage(), - _connection.getMethodRegistry()); - } - } - } + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveTxCommit(); + } + } + ); - if(!body.getNowait()) - { - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createExchangeDeclareOkBody(); - channel.sync(); - _connection.writeFrame(responseBody.generateFrame(channelId)); - } return true; } - public boolean dispatchExchangeDelete(ExchangeDeleteBody body, int channelId) throws AMQException + public boolean dispatchTxRollback(TxRollbackBody body, final int channelId) throws AMQException { - VirtualHostImpl virtualHost = _connection.getVirtualHost(); - final AMQChannel channel = _connection.getChannel(channelId); - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - channel.sync(); - try - { - - if(isDefaultExchange(body.getExchange())) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Default Exchange cannot be deleted", - _connection.getMethodRegistry()); - } - - final String exchangeName = body.getExchange().toString(); - - final ExchangeImpl exchange = virtualHost.getExchange(exchangeName); - if(exchange == null) - { - throw body.getChannelException(AMQConstant.NOT_FOUND, "No such exchange: " + body.getExchange(), - _connection.getMethodRegistry()); - } - virtualHost.removeExchange(exchange, !body.getIfUnused()); - - ExchangeDeleteOkBody responseBody = _connection.getMethodRegistry().createExchangeDeleteOkBody(); + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveTxRollback(); + } + } + ); + return true; + } - _connection.writeFrame(responseBody.generateFrame(channelId)); - } + public boolean dispatchTxSelect(TxSelectBody body, int channelId) throws AMQException + { + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveTxSelect(); + } + } + ); + return true; + } - catch (ExchangeIsAlternateException e) - { - throw body.getChannelException(AMQConstant.NOT_ALLOWED, "Exchange in use as an alternate exchange", - _connection.getMethodRegistry()); + public boolean dispatchBasicRecoverSync(final BasicRecoverSyncBody body, int channelId) throws AMQException + { + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveBasicRecover(body.getRequeue(), true); + } + } + ); - } - catch (RequiredExchangeException e) - { - throw body.getChannelException(AMQConstant.NOT_ALLOWED, - "Exchange '" + body.getExchange() + "' cannot be deleted", - _connection.getMethodRegistry()); - } - catch (AccessControlException e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, - e.getMessage(), - _connection.getMethodRegistry()); - } return true; } - private boolean isDefaultExchange(final AMQShortString exchangeName) + public boolean dispatchBasicRecoverSyncOk(BasicRecoverSyncOkBody body, int channelId) throws AMQException { - return exchangeName == null || exchangeName.equals(AMQShortString.EMPTY_STRING); + throw new UnexpectedMethodException(body); } - public boolean dispatchQueueBind(QueueBindBody body, int channelId) throws AMQException + @Override + public boolean dispatchChannelAlert(final ChannelAlertBody body, final int channelId) + throws AMQException { - VirtualHostImpl virtualHost = _connection.getVirtualHost(); - AMQChannel channel = _connection.getChannel(channelId); - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - - final AMQQueue queue; - final AMQShortString routingKey; - - final AMQShortString queueName = body.getQueue(); - - if (queueName == null) - { - - queue = channel.getDefaultQueue(); - - if (queue == null) - { - throw body.getChannelException(AMQConstant.NOT_FOUND, - "No default queue defined on channel and queue was null", - _connection.getMethodRegistry()); - } + throw new UnexpectedMethodException(body); + } - if (body.getRoutingKey() == null) - { - routingKey = AMQShortString.valueOf(queue.getName()); - } - else - { - routingKey = body.getRoutingKey().intern(); - } - } - else - { - queue = virtualHost.getQueue(queueName.toString()); - routingKey = body.getRoutingKey() == null ? AMQShortString.EMPTY_STRING : body.getRoutingKey().intern(); - } + public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } - if (queue == null) - { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + queueName + " does not exist.", - _connection.getMethodRegistry()); - } - - if(isDefaultExchange(body.getExchange())) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Cannot bind the queue " + queueName + " to the default exchange", - _connection.getMethodRegistry()); - } - - final String exchangeName = body.getExchange().toString(); - - final ExchangeImpl exch = virtualHost.getExchange(exchangeName); - if (exch == null) - { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + exchangeName + " does not exist.", - _connection.getMethodRegistry()); - } - - - try - { - - Map<String,Object> arguments = FieldTable.convertToMap(body.getArguments()); - String bindingKey = String.valueOf(routingKey); - - if (!exch.isBound(bindingKey, arguments, queue)) - { - - if(!exch.addBinding(bindingKey, queue, arguments) && ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(exch.getType())) - { - exch.replaceBinding(bindingKey, queue, arguments); - } - } - } - catch (AccessControlException e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, - e.getMessage(), - _connection.getMethodRegistry()); - } - - if (_logger.isInfoEnabled()) - { - _logger.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey); - } - if (!body.getNowait()) - { - channel.sync(); - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody(); - _connection.writeFrame(responseBody.generateFrame(channelId)); - - } - return true; - } - - public boolean dispatchQueueDeclare(QueueDeclareBody body, int channelId) throws AMQException + public boolean dispatchQueueUnbind(final QueueUnbindBody body, int channelId) throws AMQException { - final AMQSessionModel session = _connection.getChannel(channelId); - VirtualHostImpl virtualHost = _connection.getVirtualHost(); - - final AMQShortString queueName; - - // if we aren't given a queue name, we create one which we return to the client - if ((body.getQueue() == null) || (body.getQueue().length() == 0)) - { - queueName = new AMQShortString("tmp_" + UUID.randomUUID()); - } - else - { - queueName = body.getQueue().intern(); - } - - AMQQueue queue; - - //TODO: do we need to check that the queue already exists with exactly the same "configuration"? - - AMQChannel channel = _connection.getChannel(channelId); - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - - if(body.getPassive()) - { - queue = virtualHost.getQueue(queueName.toString()); - if (queue == null) - { - String msg = "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ")."; - throw body.getChannelException(AMQConstant.NOT_FOUND, msg, _connection.getMethodRegistry()); - } - else - { - if (!queue.verifySessionAccess(channel)) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Queue " - + queue.getName() - + " is exclusive, but not created on this Connection.", - _connection.getMethodRegistry()); - } - - //set this as the default queue on the channel: - channel.setDefaultQueue(queue); - } - } - else - { - - try - { - - queue = createQueue(channel, queueName, body, virtualHost, _connection); - - } - catch(QueueExistsException qe) - { - - queue = qe.getExistingQueue(); - - if (!queue.verifySessionAccess(channel)) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Queue " - + queue.getName() - + " is exclusive, but not created on this Connection.", - _connection.getMethodRegistry()); - } - else if(queue.isExclusive() != body.getExclusive()) - { - - throw body.getChannelException(AMQConstant.ALREADY_EXISTS, - "Cannot re-declare queue '" - + queue.getName() - + "' with different exclusivity (was: " - + queue.isExclusive() - + " requested " - + body.getExclusive() - + ")", - _connection.getMethodRegistry()); - } - else if((body.getAutoDelete() && queue.getLifetimePolicy() != LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS) - || (!body.getAutoDelete() && queue.getLifetimePolicy() != ((body.getExclusive() && !body.getDurable()) ? LifetimePolicy.DELETE_ON_CONNECTION_CLOSE : LifetimePolicy.PERMANENT))) - {
[... 478 lines stripped ...] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
