Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl.java?rev=1628473&r1=1628472&r2=1628473&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl.java Tue Sep 30 15:30:43 2014 @@ -20,92 +20,71 @@ */ package org.apache.qpid.server.protocol.v0_8.handler; +import java.security.AccessControlException; +import java.util.Collection; +import java.util.EnumSet; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.UUID; +import javax.security.sasl.SaslException; +import javax.security.sasl.SaslServer; + +import org.apache.log4j.Logger; + +import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; +import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.*; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.consumer.ConsumerImpl; +import org.apache.qpid.server.exchange.ExchangeImpl; +import org.apache.qpid.server.filter.AMQInvalidArgumentException; +import org.apache.qpid.server.flow.FlowCreditManager; +import org.apache.qpid.server.flow.MessageOnlyCreditManager; +import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageDestination; +import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.message.MessageSource; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.ExclusivityPolicy; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.NoFactoryForTypeException; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.UnknownConfiguredObjectException; +import org.apache.qpid.server.model.port.AmqpPort; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.v0_8.AMQChannel; +import org.apache.qpid.server.protocol.v0_8.AMQMessage; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; +import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod; +import org.apache.qpid.server.protocol.v0_8.ConsumerTarget_0_8; +import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod; +import org.apache.qpid.server.protocol.v0_8.state.AMQState; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueArgumentsConverter; +import org.apache.qpid.server.security.SubjectCreator; +import org.apache.qpid.server.security.auth.SubjectAuthenticationResult; +import org.apache.qpid.server.virtualhost.ExchangeExistsException; +import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException; +import org.apache.qpid.server.virtualhost.QueueExistsException; +import org.apache.qpid.server.virtualhost.RequiredExchangeException; +import org.apache.qpid.server.virtualhost.ReservedExchangeNameException; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; public class ServerMethodDispatcherImpl implements MethodDispatcher { - private final AMQProtocolSession<?> _connection; - - private static interface DispatcherFactory - { - public MethodDispatcher createMethodDispatcher(AMQProtocolSession<?> connection); - } - - private static final Map<ProtocolVersion, DispatcherFactory> _dispatcherFactories = - new HashMap<ProtocolVersion, DispatcherFactory>(); - - - static - { - _dispatcherFactories.put(ProtocolVersion.v8_0, - new DispatcherFactory() - { - public MethodDispatcher createMethodDispatcher(AMQProtocolSession<?> connection) - { - return new ServerMethodDispatcherImpl_8_0(connection); - } - }); - - _dispatcherFactories.put(ProtocolVersion.v0_9, - new DispatcherFactory() - { - public MethodDispatcher createMethodDispatcher(AMQProtocolSession<?> connection) - { - return new ServerMethodDispatcherImpl_0_9(connection); - } - }); - _dispatcherFactories.put(ProtocolVersion.v0_91, - new DispatcherFactory() - { - public MethodDispatcher createMethodDispatcher(AMQProtocolSession<?> connection) - { - return new ServerMethodDispatcherImpl_0_91(connection); - } - }); - - } - - - private static final AccessRequestHandler _accessRequestHandler = AccessRequestHandler.getInstance(); - private static final ChannelCloseHandler _channelCloseHandler = ChannelCloseHandler.getInstance(); - private static final ChannelOpenHandler _channelOpenHandler = ChannelOpenHandler.getInstance(); - private static final ChannelCloseOkHandler _channelCloseOkHandler = ChannelCloseOkHandler.getInstance(); - private static final ConnectionCloseMethodHandler _connectionCloseMethodHandler = ConnectionCloseMethodHandler.getInstance(); - private static final ConnectionCloseOkMethodHandler _connectionCloseOkMethodHandler = ConnectionCloseOkMethodHandler.getInstance(); - private static final ConnectionOpenMethodHandler _connectionOpenMethodHandler = ConnectionOpenMethodHandler.getInstance(); - private static final ConnectionTuneOkMethodHandler _connectionTuneOkMethodHandler = ConnectionTuneOkMethodHandler.getInstance(); - private static final ConnectionSecureOkMethodHandler _connectionSecureOkMethodHandler = ConnectionSecureOkMethodHandler.getInstance(); - private static final ConnectionStartOkMethodHandler _connectionStartOkMethodHandler = ConnectionStartOkMethodHandler.getInstance(); - private static final ExchangeDeclareHandler _exchangeDeclareHandler = ExchangeDeclareHandler.getInstance(); - private static final ExchangeDeleteHandler _exchangeDeleteHandler = ExchangeDeleteHandler.getInstance(); - private static final ExchangeBoundHandler _exchangeBoundHandler = ExchangeBoundHandler.getInstance(); - private static final BasicAckMethodHandler _basicAckMethodHandler = BasicAckMethodHandler.getInstance(); - private static final BasicRecoverMethodHandler _basicRecoverMethodHandler = BasicRecoverMethodHandler.getInstance(); - private static final BasicConsumeMethodHandler _basicConsumeMethodHandler = BasicConsumeMethodHandler.getInstance(); - private static final BasicGetMethodHandler _basicGetMethodHandler = BasicGetMethodHandler.getInstance(); - private static final BasicCancelMethodHandler _basicCancelMethodHandler = BasicCancelMethodHandler.getInstance(); - private static final BasicPublishMethodHandler _basicPublishMethodHandler = BasicPublishMethodHandler.getInstance(); - private static final BasicQosHandler _basicQosHandler = BasicQosHandler.getInstance(); - private static final QueueBindHandler _queueBindHandler = QueueBindHandler.getInstance(); - private static final QueueDeclareHandler _queueDeclareHandler = QueueDeclareHandler.getInstance(); - private static final QueueDeleteHandler _queueDeleteHandler = QueueDeleteHandler.getInstance(); - private static final QueuePurgeHandler _queuePurgeHandler = QueuePurgeHandler.getInstance(); - private static final ChannelFlowHandler _channelFlowHandler = ChannelFlowHandler.getInstance(); - private static final TxSelectHandler _txSelectHandler = TxSelectHandler.getInstance(); - private static final TxCommitHandler _txCommitHandler = TxCommitHandler.getInstance(); - private static final TxRollbackHandler _txRollbackHandler = TxRollbackHandler.getInstance(); - private static final BasicRejectMethodHandler _basicRejectMethodHandler = BasicRejectMethodHandler.getInstance(); + private static final Logger _logger = Logger.getLogger(ServerMethodDispatcherImpl.class); + private final AMQProtocolSession<?> _connection; public static MethodDispatcher createMethodDispatcher(AMQProtocolSession<?> connection) { - return _dispatcherFactories.get(connection.getProtocolVersion()).createMethodDispatcher(connection); + return new ServerMethodDispatcherImpl(connection); } @@ -122,61 +101,618 @@ public class ServerMethodDispatcherImpl public boolean dispatchAccessRequest(AccessRequestBody body, int channelId) throws AMQException { - _accessRequestHandler.methodReceived(getConnection(), body, channelId); + final AMQChannel channel = _connection.getChannel(channelId); + if (channel == null) + { + throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); + } + + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + + if(ProtocolVersion.v0_91.equals(_connection.getProtocolVersion()) ) + { + throw new AMQException(AMQConstant.COMMAND_INVALID, "AccessRequest not present in AMQP versions other than 0-8, 0-9"); + } + + // We don't implement access control class, but to keep clients happy that expect it + // always use the "0" ticket. + AccessRequestOkBody response = methodRegistry.createAccessRequestOkBody(0); + channel.sync(); + _connection.writeFrame(response.generateFrame(channelId)); return true; } public boolean dispatchBasicAck(BasicAckBody body, int channelId) throws AMQException { - _basicAckMethodHandler.methodReceived(getConnection(), body, channelId); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Ack(Tag:" + body.getDeliveryTag() + ":Mult:" + body.getMultiple() + ") received on channel " + channelId); + } + + final AMQChannel channel = _connection.getChannel(channelId); + + if (channel == null) + { + throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); + } + + // this method throws an AMQException if the delivery tag is not known + channel.acknowledgeMessage(body.getDeliveryTag(), body.getMultiple()); return true; } public boolean dispatchBasicCancel(BasicCancelBody body, int channelId) throws AMQException { - _basicCancelMethodHandler.methodReceived(getConnection(), body, channelId); + final AMQChannel channel = _connection.getChannel(channelId); + + + if (channel == null) + { + throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); + } + + if (_logger.isDebugEnabled()) + { + _logger.debug("BasicCancel: for:" + body.getConsumerTag() + + " nowait:" + body.getNowait()); + } + + channel.unsubscribeConsumer(body.getConsumerTag()); + if (!body.getNowait()) + { + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + BasicCancelOkBody cancelOkBody = methodRegistry.createBasicCancelOkBody(body.getConsumerTag()); + channel.sync(); + _connection.writeFrame(cancelOkBody.generateFrame(channelId)); + } return true; } public boolean dispatchBasicConsume(BasicConsumeBody body, int channelId) throws AMQException { - _basicConsumeMethodHandler.methodReceived(getConnection(), body, channelId); + AMQChannel channel = _connection.getChannel(channelId); + VirtualHostImpl<?,?,?> vHost = _connection.getVirtualHost(); + + if (channel == null) + { + throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); + } + else + { + channel.sync(); + String queueName = body.getQueue() == null ? null : body.getQueue().asString(); + if (_logger.isDebugEnabled()) + { + _logger.debug("BasicConsume: from '" + queueName + + "' for:" + body.getConsumerTag() + + " nowait:" + body.getNowait() + + " args:" + body.getArguments()); + } + + MessageSource queue = queueName == null ? channel.getDefaultQueue() : vHost.getQueue(queueName); + final Collection<MessageSource> sources = new HashSet<>(); + if(queue != null) + { + sources.add(queue); + } + else if(vHost.getContextValue(Boolean.class, "qpid.enableMultiQueueConsumers") + && body.getArguments() != null + && body.getArguments().get("x-multiqueue") instanceof Collection) + { + for(Object object : (Collection<Object>) body.getArguments().get("x-multiqueue")) + { + String sourceName = String.valueOf(object); + sourceName = sourceName.trim(); + if(sourceName.length() != 0) + { + MessageSource source = vHost.getMessageSource(sourceName); + if(source == null) + { + sources.clear(); + break; + } + else + { + sources.add(source); + } + } + } + queueName = body.getArguments().get("x-multiqueue").toString(); + } + + if (sources.isEmpty()) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("No queue for '" + queueName + "'"); + } + if (queueName != null) + { + String msg = "No such queue, '" + queueName + "'"; + throw body.getChannelException(AMQConstant.NOT_FOUND, msg, _connection.getMethodRegistry()); + } + else + { + String msg = "No queue name provided, no default queue defined."; + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, msg, _connection.getMethodRegistry()); + } + } + else + { + final AMQShortString consumerTagName; + + if (body.getConsumerTag() != null) + { + consumerTagName = body.getConsumerTag().intern(false); + } + else + { + consumerTagName = null; + } + + try + { + if(consumerTagName == null || channel.getSubscription(consumerTagName) == null) + { + + AMQShortString consumerTag = channel.consumeFromSource(consumerTagName, + sources, + !body.getNoAck(), + body.getArguments(), + body.getExclusive(), + body.getNoLocal()); + if (!body.getNowait()) + { + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createBasicConsumeOkBody(consumerTag); + _connection.writeFrame(responseBody.generateFrame(channelId)); + + } + } + else + { + AMQShortString msg = AMQShortString.validValueOf("Non-unique consumer tag, '" + body.getConsumerTag() + "'"); + + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), // replyCode + msg, // replytext + body.getClazz(), + body.getMethod()); + _connection.writeFrame(responseBody.generateFrame(0)); + } + + } + catch (AMQInvalidArgumentException ise) + { + _logger.debug("Closing connection due to invalid selector"); + + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createChannelCloseBody(AMQConstant.ARGUMENT_INVALID.getCode(), + AMQShortString.validValueOf(ise.getMessage()), + body.getClazz(), + body.getMethod()); + _connection.writeFrame(responseBody.generateFrame(channelId)); + + + } + catch (AMQQueue.ExistingExclusiveConsumer e) + { + throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, + "Cannot subscribe to queue " + + queue.getName() + + " as it already has an existing exclusive consumer", + _connection.getMethodRegistry()); + } + catch (AMQQueue.ExistingConsumerPreventsExclusive e) + { + throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, + "Cannot subscribe to queue " + + queue.getName() + + " exclusively as it already has a consumer", + _connection.getMethodRegistry()); + } + catch (AccessControlException e) + { + throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, + "Cannot subscribe to queue " + + queue.getName() + + " permission denied", _connection.getMethodRegistry()); + } + catch (MessageSource.ConsumerAccessRefused consumerAccessRefused) + { + throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, + "Cannot subscribe to queue " + + queue.getName() + + " as it already has an incompatible exclusivity policy", + _connection.getMethodRegistry()); + } + + } + } return true; } public boolean dispatchBasicGet(BasicGetBody body, int channelId) throws AMQException { - _basicGetMethodHandler.methodReceived(getConnection(), body, channelId); + + VirtualHostImpl vHost = _connection.getVirtualHost(); + + AMQChannel channel = _connection.getChannel(channelId); + if (channel == null) + { + throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); + } + else + { + channel.sync(); + AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueue(body.getQueue().toString()); + if (queue == null) + { + _logger.info("No queue for '" + body.getQueue() + "'"); + if(body.getQueue()!=null) + { + throw body.getConnectionException(AMQConstant.NOT_FOUND, + "No such queue, '" + body.getQueue() + "'", + _connection.getMethodRegistry()); + } + else + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, + "No queue name provided, no default queue defined.", + _connection.getMethodRegistry()); + } + } + else + { + + try + { + if (!performGet(queue, _connection, channel, !body.getNoAck())) + { + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + + BasicGetEmptyBody responseBody = methodRegistry.createBasicGetEmptyBody(null); + + + _connection.writeFrame(responseBody.generateFrame(channelId)); + } + } + catch (AccessControlException e) + { + throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, + e.getMessage(), _connection.getMethodRegistry()); + } + catch (MessageSource.ExistingExclusiveConsumer e) + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, + "Queue has an exclusive consumer", + _connection.getMethodRegistry()); + } + catch (MessageSource.ExistingConsumerPreventsExclusive e) + { + throw body.getConnectionException(AMQConstant.INTERNAL_ERROR, + "The GET request has been evaluated as an exclusive consumer, " + + "this is likely due to a programming error in the Qpid broker", + _connection.getMethodRegistry()); + } + catch (MessageSource.ConsumerAccessRefused consumerAccessRefused) + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, + "Queue has an incompatible exclusivit policy", + _connection.getMethodRegistry()); + } + } + } return true; } + public static boolean performGet(final AMQQueue queue, + final AMQProtocolSession session, + final AMQChannel channel, + final boolean acks) + throws AMQException, MessageSource.ExistingConsumerPreventsExclusive, + MessageSource.ExistingExclusiveConsumer, MessageSource.ConsumerAccessRefused + { + + final FlowCreditManager singleMessageCredit = new MessageOnlyCreditManager(1L); + + final GetDeliveryMethod getDeliveryMethod = + new GetDeliveryMethod(singleMessageCredit, session, channel, queue); + final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod() + { + + public void recordMessageDelivery(final ConsumerImpl sub, final MessageInstance entry, final long deliveryTag) + { + channel.addUnacknowledgedMessage(entry, deliveryTag, null); + } + }; + + ConsumerTarget_0_8 target; + EnumSet<ConsumerImpl.Option> options = EnumSet.of(ConsumerImpl.Option.TRANSIENT, ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES); + if(acks) + { + + target = ConsumerTarget_0_8.createAckTarget(channel, + AMQShortString.EMPTY_STRING, null, + singleMessageCredit, getDeliveryMethod, getRecordMethod); + } + else + { + target = ConsumerTarget_0_8.createGetNoAckTarget(channel, + AMQShortString.EMPTY_STRING, null, + singleMessageCredit, getDeliveryMethod, getRecordMethod); + } + + ConsumerImpl sub = queue.addConsumer(target, null, AMQMessage.class, "", options); + sub.flush(); + sub.close(); + return(getDeliveryMethod.hasDeliveredMessage()); + + + } + + + private static class GetDeliveryMethod implements ClientDeliveryMethod + { + + private final FlowCreditManager _singleMessageCredit; + private final AMQProtocolSession _session; + private final AMQChannel _channel; + private final AMQQueue _queue; + private boolean _deliveredMessage; + + public GetDeliveryMethod(final FlowCreditManager singleMessageCredit, + final AMQProtocolSession session, + final AMQChannel channel, final AMQQueue queue) + { + _singleMessageCredit = singleMessageCredit; + _session = session; + _channel = channel; + _queue = queue; + } + + @Override + public long deliverToClient(final ConsumerImpl sub, final ServerMessage message, + final InstanceProperties props, final long deliveryTag) + { + _singleMessageCredit.useCreditForMessage(message.getSize()); + long size =_session.getProtocolOutputConverter().writeGetOk(message, + props, + _channel.getChannelId(), + deliveryTag, + _queue.getQueueDepthMessages()); + + _deliveredMessage = true; + return size; + } + + public boolean hasDeliveredMessage() + { + return _deliveredMessage; + } + } + public boolean dispatchBasicPublish(BasicPublishBody body, int channelId) throws AMQException { - _basicPublishMethodHandler.methodReceived(getConnection(), body, channelId); + if (_logger.isDebugEnabled()) + { + _logger.debug("Publish received on channel " + channelId); + } + + AMQShortString exchangeName = body.getExchange(); + VirtualHostImpl vHost = _connection.getVirtualHost(); + + // TODO: check the delivery tag field details - is it unique across the broker or per subscriber? + + MessageDestination destination; + + if (exchangeName == null || AMQShortString.EMPTY_STRING.equals(exchangeName)) + { + destination = vHost.getDefaultDestination(); + } + else + { + destination = vHost.getMessageDestination(exchangeName.toString()); + } + + // if the exchange does not exist we raise a channel exception + if (destination == null) + { + throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange name", + _connection.getMethodRegistry()); + } + else + { + // The partially populated BasicDeliver frame plus the received route body + // is stored in the channel. Once the final body frame has been received + // it is routed to the exchange. + AMQChannel channel = _connection.getChannel(channelId); + + if (channel == null) + { + throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); + } + + MessagePublishInfo info = new MessagePublishInfo(body.getExchange(), + body.getImmediate(), + body.getMandatory(), + body.getRoutingKey()); + info.setExchange(exchangeName); + try + { + channel.setPublishFrame(info, destination); + } + catch (AccessControlException e) + { + throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, + e.getMessage(), + _connection.getMethodRegistry()); + } + } return true; } public boolean dispatchBasicQos(BasicQosBody body, int channelId) throws AMQException { - _basicQosHandler.methodReceived(getConnection(), body, channelId); + AMQChannel channel = _connection.getChannel(channelId); + if (channel == null) + { + throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); + } + channel.sync(); + channel.setCredit(body.getPrefetchSize(), body.getPrefetchCount()); + + + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createBasicQosOkBody(); + _connection.writeFrame(responseBody.generateFrame(channelId)); + return true; } public boolean dispatchBasicRecover(BasicRecoverBody body, int channelId) throws AMQException { - _basicRecoverMethodHandler.methodReceived(getConnection(), body, channelId); + _logger.debug("Recover received on protocol session " + _connection + + " and channel " + channelId); + AMQChannel channel = _connection.getChannel(channelId); + + + if (channel == null) + { + throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); + } + + channel.resend(); + + // Qpid 0-8 hacks a synchronous -ok onto recover. + // In Qpid 0-9 we create a separate sync-recover, sync-recover-ok pair to be "more" compliant + if(_connection.getProtocolVersion().equals(ProtocolVersion.v8_0)) + { + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + AMQMethodBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody(); + channel.sync(); + _connection.writeFrame(recoverOk.generateFrame(channelId)); + + } + return true; } public boolean dispatchBasicReject(BasicRejectBody body, int channelId) throws AMQException { - _basicRejectMethodHandler.methodReceived(getConnection(), body, channelId); + + AMQChannel channel = _connection.getChannel(channelId); + + if (channel == null) + { + throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); + } + + if (_logger.isDebugEnabled()) + { + _logger.debug("Rejecting:" + body.getDeliveryTag() + + ": Requeue:" + body.getRequeue() + + " on channel:" + channel.debugIdentity()); + } + + long deliveryTag = body.getDeliveryTag(); + + MessageInstance message = channel.getUnacknowledgedMessageMap().get(deliveryTag); + + if (message == null) + { + _logger.warn("Dropping reject request as message is null for tag:" + deliveryTag); + } + else + { + + if (message.getMessage() == null) + { + _logger.warn("Message has already been purged, unable to Reject."); + } + else + { + + if (_logger.isDebugEnabled()) + { + _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage() + + ": Requeue:" + body.getRequeue() + + " on channel:" + channel.debugIdentity()); + } + + if (body.getRequeue()) + { + //this requeue represents a message rejected from the pre-dispatch queue + //therefore we need to amend the delivery counter. + message.decrementDeliveryCount(); + + channel.requeue(deliveryTag); + } + else + { + // Since the Java client abuses the reject flag for requeing after rollback, we won't set reject here + // as it would prevent redelivery + // message.reject(); + + final boolean maxDeliveryCountEnabled = channel.isMaxDeliveryCountEnabled(deliveryTag); + _logger.debug("maxDeliveryCountEnabled: " + + maxDeliveryCountEnabled + + " deliveryTag " + + deliveryTag); + if (maxDeliveryCountEnabled) + { + final boolean deliveredTooManyTimes = channel.isDeliveredTooManyTimes(deliveryTag); + _logger.debug("deliveredTooManyTimes: " + + deliveredTooManyTimes + + " deliveryTag " + + deliveryTag); + if (deliveredTooManyTimes) + { + channel.deadLetter(body.getDeliveryTag()); + } + else + { + //this requeue represents a message rejected because of a recover/rollback that we + //are not ready to DLQ. We rely on the reject command to resend from the unacked map + //and therefore need to increment the delivery counter so we cancel out the effect + //of the AMQChannel#resend() decrement. + message.incrementDeliveryCount(); + } + } + else + { + channel.requeue(deliveryTag); + } + } + } + } return true; } public boolean dispatchChannelOpen(ChannelOpenBody body, int channelId) throws AMQException { - _channelOpenHandler.methodReceived(getConnection(), body, channelId); + VirtualHostImpl virtualHost = _connection.getVirtualHost(); + + // Protect the broker against out of order frame request. + if (virtualHost == null) + { + throw new AMQException(AMQConstant.COMMAND_INVALID, "Virtualhost has not yet been set. ConnectionOpen has not been called.", null); + } + _logger.info("Connecting to: " + virtualHost.getName()); + + final AMQChannel channel = new AMQChannel(_connection, channelId, virtualHost.getMessageStore()); + + _connection.addChannel(channel); + + ChannelOpenOkBody response; + + + response = _connection.getMethodRegistry().createChannelOpenOkBody(); + + + _connection.writeFrame(response.generateFrame(channelId)); return true; } @@ -186,20 +722,6 @@ public class ServerMethodDispatcherImpl throw new UnexpectedMethodException(body); } - @Override - public boolean dispatchQueueUnbindOk(final QueueUnbindOkBody body, final int channelId) - throws AMQException - { - throw new UnexpectedMethodException(body); - } - - @Override - public boolean dispatchBasicRecoverSyncOk(final BasicRecoverSyncOkBody body, - final int channelId) - throws AMQException - { - throw new UnexpectedMethodException(body); - } public boolean dispatchBasicCancelOk(BasicCancelOkBody body, int channelId) throws AMQException { @@ -238,21 +760,64 @@ public class ServerMethodDispatcherImpl public boolean dispatchChannelClose(ChannelCloseBody body, int channelId) throws AMQException { - _channelCloseHandler.methodReceived(getConnection(), body, channelId); + + if (_logger.isInfoEnabled()) + { + _logger.info("Received channel close for id " + channelId + + " citing class " + body.getClassId() + + " and method " + body.getMethodId()); + } + + + AMQChannel channel = _connection.getChannel(channelId); + + if (channel == null) + { + throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, + "Trying to close unknown channel", + _connection.getMethodRegistry()); + } + channel.sync(); + _connection.closeChannel(channelId); + // Client requested closure so we don't wait for ok we send it + _connection.closeChannelOk(channelId); + + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + ChannelCloseOkBody responseBody = methodRegistry.createChannelCloseOkBody(); + _connection.writeFrame(responseBody.generateFrame(channelId)); return true; } public boolean dispatchChannelCloseOk(ChannelCloseOkBody body, int channelId) throws AMQException { - _channelCloseOkHandler.methodReceived(getConnection(), body, channelId); + + _logger.info("Received channel-close-ok for channel-id " + channelId); + + // Let the Protocol Session know the channel is now closed. + _connection.closeChannelOk(channelId); return true; } public boolean dispatchChannelFlow(ChannelFlowBody body, int channelId) throws AMQException { - _channelFlowHandler.methodReceived(getConnection(), body, channelId); + final AMQProtocolSession<?> connection = getConnection(); + + + AMQChannel channel = connection.getChannel(channelId); + + if (channel == null) + { + throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); + } + channel.sync(); + channel.setSuspended(!body.getActive()); + _logger.debug("Channel.Flow for channel " + channelId + ", active=" + body.getActive()); + + MethodRegistry methodRegistry = connection.getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createChannelFlowOkBody(body.getActive()); + connection.writeFrame(responseBody.generateFrame(channelId)); return true; } @@ -269,23 +834,103 @@ public class ServerMethodDispatcherImpl public boolean dispatchConnectionOpen(ConnectionOpenBody body, int channelId) throws AMQException { - _connectionOpenMethodHandler.methodReceived(getConnection(), body, channelId); + + //ignore leading '/' + String virtualHostName; + if ((body.getVirtualHost() != null) && body.getVirtualHost().charAt(0) == '/') + { + virtualHostName = new StringBuilder(body.getVirtualHost().subSequence(1, body.getVirtualHost().length())).toString(); + } + else + { + virtualHostName = body.getVirtualHost() == null ? null : String.valueOf(body.getVirtualHost()); + } + + VirtualHostImpl virtualHost = ((AmqpPort) _connection.getPort()).getVirtualHost(virtualHostName); + + if (virtualHost == null) + { + throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown virtual host: '" + virtualHostName + "'", + _connection.getMethodRegistry()); + } + else + { + // Check virtualhost access + if (virtualHost.getState() != State.ACTIVE) + { + throw body.getConnectionException(AMQConstant.CONNECTION_FORCED, + "Virtual host '" + virtualHost.getName() + "' is not active", + _connection.getMethodRegistry()); + } + + _connection.setVirtualHost(virtualHost); + try + { + virtualHost.getSecurityManager().authoriseCreateConnection(_connection); + } + catch (AccessControlException e) + { + throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, + e.getMessage(), + _connection.getMethodRegistry()); + } + + // See Spec (0.8.2). Section 3.1.2 Virtual Hosts + if (_connection.getContextKey() == null) + { + _connection.setContextKey(new AMQShortString(Long.toString(System.currentTimeMillis()))); + } + + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(body.getVirtualHost()); + + _connection.changeState(AMQState.CONNECTION_OPEN); + + _connection.writeFrame(responseBody.generateFrame(channelId)); + } return true; } public boolean dispatchConnectionClose(ConnectionCloseBody body, int channelId) throws AMQException { - _connectionCloseMethodHandler.methodReceived(getConnection(), body, channelId); + if (_logger.isInfoEnabled()) + { + _logger.info("ConnectionClose received with reply code/reply text " + body.getReplyCode() + "/" + + body.getReplyText() + " for " + _connection); + } + try + { + _connection.closeSession(); + } + catch (Exception e) + { + _logger.error("Error closing protocol session: " + e, e); + } + + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + ConnectionCloseOkBody responseBody = methodRegistry.createConnectionCloseOkBody(); + _connection.writeFrame(responseBody.generateFrame(channelId)); + + _connection.closeProtocolSession(); + return true; } public boolean dispatchConnectionCloseOk(ConnectionCloseOkBody body, int channelId) throws AMQException { - _connectionCloseOkMethodHandler.methodReceived( - getConnection(), - body, channelId); + _logger.info("Received Connection-close-ok"); + + try + { + _connection.changeState(AMQState.CONNECTION_CLOSED); + _connection.closeSession(); + } + catch (Exception e) + { + _logger.error("Error closing protocol session: " + e, e); + } return true; } @@ -368,92 +1013,1246 @@ public class ServerMethodDispatcherImpl public boolean dispatchConnectionSecureOk(ConnectionSecureOkBody body, int channelId) throws AMQException { - _connectionSecureOkMethodHandler.methodReceived( - getConnection(), - body, channelId); + Broker<?> broker = _connection.getBroker(); + + SubjectCreator subjectCreator = _connection.getSubjectCreator(); + + SaslServer ss = _connection.getSaslServer(); + if (ss == null) + { + throw new AMQException("No SASL context set up in session"); + } + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, body.getResponse()); + switch (authResult.getStatus()) + { + case ERROR: + Exception cause = authResult.getCause(); + + _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage())); + + // This should be abstracted + _connection.changeState(AMQState.CONNECTION_CLOSING); + + ConnectionCloseBody connectionCloseBody = + methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), + AMQConstant.NOT_ALLOWED.getName(), + body.getClazz(), + body.getMethod()); + + _connection.writeFrame(connectionCloseBody.generateFrame(0)); + disposeSaslServer(_connection); + break; + case SUCCESS: + if (_logger.isInfoEnabled()) + { + _logger.info("Connected as: " + authResult.getSubject()); + } + _connection.changeState(AMQState.CONNECTION_NOT_TUNED); + + int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE); + + if(frameMax <= 0) + { + frameMax = Integer.MAX_VALUE; + } + + ConnectionTuneBody tuneBody = + methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(), + frameMax, + broker.getConnection_heartBeatDelay()); + _connection.writeFrame(tuneBody.generateFrame(0)); + _connection.setAuthorizedSubject(authResult.getSubject()); + disposeSaslServer(_connection); + break; + case CONTINUE: + _connection.changeState(AMQState.CONNECTION_NOT_AUTH); + + ConnectionSecureBody + secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge()); + _connection.writeFrame(secureBody.generateFrame(0)); + } return true; } + private void disposeSaslServer(AMQProtocolSession ps) + { + SaslServer ss = ps.getSaslServer(); + if (ss != null) + { + ps.setSaslServer(null); + try + { + ss.dispose(); + } + catch (SaslException e) + { + _logger.error("Error disposing of Sasl server: " + e); + } + } + } + public boolean dispatchConnectionStartOk(ConnectionStartOkBody body, int channelId) throws AMQException { - _connectionStartOkMethodHandler.methodReceived( - getConnection(), - body, channelId); + Broker<?> broker = _connection.getBroker(); + + _logger.info("SASL Mechanism selected: " + body.getMechanism()); + _logger.info("Locale selected: " + body.getLocale()); + + SubjectCreator subjectCreator = _connection.getSubjectCreator(); + SaslServer ss = null; + try + { + ss = subjectCreator.createSaslServer(String.valueOf(body.getMechanism()), + _connection.getLocalFQDN(), + _connection.getPeerPrincipal()); + + if (ss == null) + { + throw body.getConnectionException(AMQConstant.RESOURCE_ERROR, + "Unable to create SASL Server:" + body.getMechanism(), + _connection.getMethodRegistry()); + } + + _connection.setSaslServer(ss); + + final SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, body.getResponse()); + //save clientProperties + _connection.setClientProperties(body.getClientProperties()); + + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + + switch (authResult.getStatus()) + { + case ERROR: + Exception cause = authResult.getCause(); + + _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage())); + + _connection.changeState(AMQState.CONNECTION_CLOSING); + + ConnectionCloseBody closeBody = + methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), // replyCode + AMQConstant.NOT_ALLOWED.getName(), + body.getClazz(), + body.getMethod()); + + _connection.writeFrame(closeBody.generateFrame(0)); + disposeSaslServer(_connection); + break; + + case SUCCESS: + if (_logger.isInfoEnabled()) + { + _logger.info("Connected as: " + authResult.getSubject()); + } + _connection.setAuthorizedSubject(authResult.getSubject()); + + _connection.changeState(AMQState.CONNECTION_NOT_TUNED); + int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE); + + if(frameMax <= 0) + { + frameMax = Integer.MAX_VALUE; + } + + ConnectionTuneBody + tuneBody = methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(), + frameMax, + broker.getConnection_heartBeatDelay()); + _connection.writeFrame(tuneBody.generateFrame(0)); + break; + case CONTINUE: + _connection.changeState(AMQState.CONNECTION_NOT_AUTH); + + ConnectionSecureBody + secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge()); + _connection.writeFrame(secureBody.generateFrame(0)); + } + } + catch (SaslException e) + { + disposeSaslServer(_connection); + throw new AMQException("SASL error: " + e, e); + } return true; } public boolean dispatchConnectionTuneOk(ConnectionTuneOkBody body, int channelId) throws AMQException { - _connectionTuneOkMethodHandler.methodReceived(getConnection(), body, channelId); + final AMQProtocolSession<?> connection = getConnection(); + + if (_logger.isDebugEnabled()) + { + _logger.debug(body); + } + connection.changeState(AMQState.CONNECTION_NOT_OPENED); + + connection.initHeartbeats(body.getHeartbeat()); + + int brokerFrameMax = connection.getBroker().getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE); + if(brokerFrameMax <= 0) + { + brokerFrameMax = Integer.MAX_VALUE; + } + + if(body.getFrameMax() > (long) brokerFrameMax) + { + throw new AMQConnectionException(AMQConstant.SYNTAX_ERROR, + "Attempt to set max frame size to " + body.getFrameMax() + + " greater than the broker will allow: " + + brokerFrameMax, + body.getClazz(), body.getMethod(), + connection.getMethodRegistry(),null); + } + else if(body.getFrameMax() > 0 && body.getFrameMax() < AMQConstant.FRAME_MIN_SIZE.getCode()) + { + throw new AMQConnectionException(AMQConstant.SYNTAX_ERROR, + "Attempt to set max frame size to " + body.getFrameMax() + + " which is smaller than the specification definined minimum: " + + AMQConstant.FRAME_MIN_SIZE.getCode(), + body.getClazz(), body.getMethod(), + connection.getMethodRegistry(),null); + } + int frameMax = body.getFrameMax() == 0 ? brokerFrameMax : (int) body.getFrameMax(); + connection.setMaxFrameSize(frameMax); + + long maxChannelNumber = body.getChannelMax(); + //0 means no implied limit, except that forced by protocol limitations (0xFFFF) + connection.setMaximumNumberOfChannels(maxChannelNumber == 0 ? 0xFFFFL : maxChannelNumber); return true; } + public static final int OK = 0; + public static final int EXCHANGE_NOT_FOUND = 1; + public static final int QUEUE_NOT_FOUND = 2; + public static final int NO_BINDINGS = 3; + public static final int QUEUE_NOT_BOUND = 4; + public static final int NO_QUEUE_BOUND_WITH_RK = 5; + public static final int SPECIFIC_QUEUE_NOT_BOUND_WITH_RK = 6; + public boolean dispatchExchangeBound(ExchangeBoundBody body, int channelId) throws AMQException { - _exchangeBoundHandler.methodReceived(getConnection(), body, channelId); + VirtualHostImpl virtualHost = _connection.getVirtualHost(); + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + + final AMQChannel channel = _connection.getChannel(channelId); + if (channel == null) + { + throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); + } + channel.sync(); + + + AMQShortString exchangeName = body.getExchange(); + AMQShortString queueName = body.getQueue(); + AMQShortString routingKey = body.getRoutingKey(); + ExchangeBoundOkBody response; + + if(isDefaultExchange(exchangeName)) + { + if(routingKey == null) + { + if(queueName == null) + { + response = methodRegistry.createExchangeBoundOkBody(virtualHost.getQueues().isEmpty() ? NO_BINDINGS : OK, null); + } + else + { + AMQQueue queue = virtualHost.getQueue(queueName.toString()); + if (queue == null) + { + + response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode + AMQShortString.validValueOf("Queue '" + queueName + "' not found")); // replyText + } + else + { + response = methodRegistry.createExchangeBoundOkBody(OK, null); + } + } + } + else + { + if(queueName == null) + { + response = methodRegistry.createExchangeBoundOkBody(virtualHost.getQueue(routingKey.toString()) == null ? NO_QUEUE_BOUND_WITH_RK : OK, null); + } + else + { + AMQQueue queue = virtualHost.getQueue(queueName.toString()); + if (queue == null) + { + + response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode + AMQShortString.validValueOf("Queue '" + queueName + "' not found")); // replyText + } + else + { + response = methodRegistry.createExchangeBoundOkBody(queueName.equals(routingKey) ? OK : SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, null); + } + } + } + } + else + { + ExchangeImpl exchange = virtualHost.getExchange(exchangeName.toString()); + if (exchange == null) + { + + + response = methodRegistry.createExchangeBoundOkBody(EXCHANGE_NOT_FOUND, + AMQShortString.validValueOf("Exchange '" + exchangeName + "' not found")); + } + else if (routingKey == null) + { + if (queueName == null) + { + if (exchange.hasBindings()) + { + response = methodRegistry.createExchangeBoundOkBody(OK, null); + } + else + { + + response = methodRegistry.createExchangeBoundOkBody(NO_BINDINGS, // replyCode + null); // replyText + } + } + else + { + + AMQQueue queue = virtualHost.getQueue(queueName.toString()); + if (queue == null) + { + + response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode + AMQShortString.validValueOf("Queue '" + queueName + "' not found")); // replyText + } + else + { + if (exchange.isBound(queue)) + { + + response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode + null); // replyText + } + else + { + + response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_BOUND, // replyCode + AMQShortString.validValueOf("Queue '" + queueName + "' not bound to exchange '" + exchangeName + "'")); // replyText + } + } + } + } + else if (queueName != null) + { + AMQQueue queue = virtualHost.getQueue(queueName.toString()); + if (queue == null) + { + + response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode + AMQShortString.validValueOf("Queue '" + queueName + "' not found")); // replyText + } + else + { + String bindingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().asString(); + if (exchange.isBound(bindingKey, queue)) + { + + response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode + null); // replyText + } + else + { + + String message = "Queue '" + queueName + "' not bound with routing key '" + + body.getRoutingKey() + "' to exchange '" + exchangeName + "'"; + + response = methodRegistry.createExchangeBoundOkBody(SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, // replyCode + AMQShortString.validValueOf(message)); // replyText + } + } + } + else + { + if (exchange.isBound(body.getRoutingKey() == null ? "" : body.getRoutingKey().asString())) + { + + response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode + null); // replyText + } + else + { + + response = methodRegistry.createExchangeBoundOkBody(NO_QUEUE_BOUND_WITH_RK, // replyCode + AMQShortString.validValueOf("No queue bound with routing key '" + body.getRoutingKey() + + "' to exchange '" + exchangeName + "'")); // replyText + } + } + } + _connection.writeFrame(response.generateFrame(channelId)); return true; } public boolean dispatchExchangeDeclare(ExchangeDeclareBody body, int channelId) throws AMQException { - _exchangeDeclareHandler.methodReceived(getConnection(), body, channelId); + VirtualHostImpl virtualHost = _connection.getVirtualHost(); + final AMQChannel channel = _connection.getChannel(channelId); + if (channel == null) + { + throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); + } + + final AMQShortString exchangeName = body.getExchange(); + if (_logger.isDebugEnabled()) + { + _logger.debug("Request to declare exchange of type " + body.getType() + " with name " + exchangeName); + } + + ExchangeImpl exchange; + + if(isDefaultExchange(exchangeName)) + { + if(!new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_CLASS).equals(body.getType())) + { + throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare default exchange: " + + " of type " + + ExchangeDefaults.DIRECT_EXCHANGE_CLASS + + " to " + body.getType() +".", + body.getClazz(), body.getMethod(), + _connection.getMethodRegistry(),null); + } + } + else + { + if (body.getPassive()) + { + exchange = virtualHost.getExchange(exchangeName.toString()); + if(exchange == null) + { + throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + exchangeName, + _connection.getMethodRegistry()); + } + else if (!(body.getType() == null || body.getType().length() ==0) && !exchange.getType().equals(body.getType().asString())) + { + + throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + + exchangeName + " of type " + exchange.getType() + + " to " + body.getType() +".", + body.getClazz(), body.getMethod(), + _connection.getMethodRegistry(),null); + } + + } + else + { + try + { + String name = exchangeName == null ? null : exchangeName.intern().toString(); + String type = body.getType() == null ? null : body.getType().intern().toString(); + + Map<String,Object> attributes = new HashMap<String, Object>(); + if(body.getArguments() != null) + { + attributes.putAll(FieldTable.convertToMap(body.getArguments())); + } + attributes.put(org.apache.qpid.server.model.Exchange.ID, null); + attributes.put(org.apache.qpid.server.model.Exchange.NAME,name); + attributes.put(org.apache.qpid.server.model.Exchange.TYPE,type); + attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, body.getDurable()); + attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, + body.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT); + if(!attributes.containsKey(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE)) + { + attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null); + } + exchange = virtualHost.createExchange(attributes); + + } + catch(ReservedExchangeNameException e) + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, + "Attempt to declare exchange: " + exchangeName + + " which begins with reserved prefix.", + _connection.getMethodRegistry()); + + } + catch(ExchangeExistsException e) + { + exchange = e.getExistingExchange(); + if(!new AMQShortString(exchange.getType()).equals(body.getType())) + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + + exchangeName + " of type " + + exchange.getType() + + " to " + body.getType() + ".", + _connection.getMethodRegistry()); + } + } + catch(NoFactoryForTypeException e) + { + throw body.getConnectionException(AMQConstant.COMMAND_INVALID, + "Unknown exchange type '" + + e.getType() + + "' for exchange '" + + exchangeName + + "'", + _connection.getMethodRegistry()); + } + catch (AccessControlException e) + { + throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, + e.getMessage(), + _connection.getMethodRegistry()); + } + catch (UnknownConfiguredObjectException e) + { + // note - since 0-8/9/9-1 can't set the alt. exchange this exception should never occur + throw body.getConnectionException(AMQConstant.NOT_FOUND, + "Unknown alternate exchange " + + (e.getName() != null + ? "name: \"" + e.getName() + "\"" + : "id: " + e.getId()), + _connection.getMethodRegistry()); + } + catch (IllegalArgumentException e) + { + throw body.getConnectionException(AMQConstant.COMMAND_INVALID, + "Error creating exchange '" + + exchangeName + + "': " + + e.getMessage(), + _connection.getMethodRegistry()); + } + } + } + + if(!body.getNowait()) + { + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createExchangeDeclareOkBody(); + channel.sync(); + _connection.writeFrame(responseBody.generateFrame(channelId)); + } return true; } public boolean dispatchExchangeDelete(ExchangeDeleteBody body, int channelId) throws AMQException { - _exchangeDeleteHandler.methodReceived(getConnection(), body, channelId); + VirtualHostImpl virtualHost = _connection.getVirtualHost(); + final AMQChannel channel = _connection.getChannel(channelId); + if (channel == null) + { + throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); + } + channel.sync(); + try + { + + if(isDefaultExchange(body.getExchange())) + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, + "Default Exchange cannot be deleted", + _connection.getMethodRegistry()); + } + + final String exchangeName = body.getExchange().toString(); + + final ExchangeImpl exchange = virtualHost.getExchange(exchangeName); + if(exchange == null) + { + throw body.getChannelException(AMQConstant.NOT_FOUND, "No such exchange: " + body.getExchange(), + _connection.getMethodRegistry()); + } + + virtualHost.removeExchange(exchange, !body.getIfUnused()); + + ExchangeDeleteOkBody responseBody = _connection.getMethodRegistry().createExchangeDeleteOkBody(); + + _connection.writeFrame(responseBody.generateFrame(channelId)); + } + + catch (ExchangeIsAlternateException e) + { + throw body.getChannelException(AMQConstant.NOT_ALLOWED, "Exchange in use as an alternate exchange", + _connection.getMethodRegistry()); + + } + catch (RequiredExchangeException e) + { + throw body.getChannelException(AMQConstant.NOT_ALLOWED, + "Exchange '" + body.getExchange() + "' cannot be deleted", + _connection.getMethodRegistry()); + } + catch (AccessControlException e) + { + throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, + e.getMessage(), + _connection.getMethodRegistry()); + } return true; } + private boolean isDefaultExchange(final AMQShortString exchangeName) + { + return exchangeName == null || exchangeName.equals(AMQShortString.EMPTY_STRING); + } + public boolean dispatchQueueBind(QueueBindBody body, int channelId) throws AMQException { - _queueBindHandler.methodReceived(getConnection(), body, channelId); + VirtualHostImpl virtualHost = _connection.getVirtualHost(); + AMQChannel channel = _connection.getChannel(channelId); + + if (channel == null) + { + throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); + } + + final AMQQueue queue; + final AMQShortString routingKey; + + final AMQShortString queueName = body.getQueue(); + + if (queueName == null) + { + + queue = channel.getDefaultQueue(); + + if (queue == null) + { + throw body.getChannelException(AMQConstant.NOT_FOUND, + "No default queue defined on channel and queue was null", + _connection.getMethodRegistry()); + } + + if (body.getRoutingKey() == null) + { + routingKey = AMQShortString.valueOf(queue.getName()); + } + else + { + routingKey = body.getRoutingKey().intern(); + } + } + else + { + queue = virtualHost.getQueue(queueName.toString()); + routingKey = body.getRoutingKey() == null ? AMQShortString.EMPTY_STRING : body.getRoutingKey().intern(); + } + + if (queue == null) + { + throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + queueName + " does not exist.", + _connection.getMethodRegistry()); + } + + if(isDefaultExchange(body.getExchange())) + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, + "Cannot bind the queue " + queueName + " to the default exchange", + _connection.getMethodRegistry()); + } + + final String exchangeName = body.getExchange().toString(); + + final ExchangeImpl exch = virtualHost.getExchange(exchangeName); + if (exch == null) + { + throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + exchangeName + " does not exist.", + _connection.getMethodRegistry()); + } + + + try + { + + Map<String,Object> arguments = FieldTable.convertToMap(body.getArguments()); + String bindingKey = String.valueOf(routingKey); + + if (!exch.isBound(bindingKey, arguments, queue)) + { + + if(!exch.addBinding(bindingKey, queue, arguments) && ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(exch.getType())) + { + exch.replaceBinding(bindingKey, queue, arguments); + } + } + } + catch (AccessControlException e) + { + throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, + e.getMessage(), + _connection.getMethodRegistry()); + } + + if (_logger.isInfoEnabled()) + { + _logger.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey); + } + if (!body.getNowait()) + { + channel.sync(); + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody(); + _connection.writeFrame(responseBody.generateFrame(channelId)); + + } return true; } public boolean dispatchQueueDeclare(QueueDeclareBody body, int channelId) throws AMQException { - _queueDeclareHandler.methodReceived(getConnection(), body, channelId); + final AMQSessionModel session = _connection.getChannel(channelId); + VirtualHostImpl virtualHost = _connection.getVirtualHost(); + + final AMQShortString queueName; + + // if we aren't given a queue name, we create one which we return to the client + if ((body.getQueue() == null) || (body.getQueue().length() == 0)) + { + queueName = new AMQShortString("tmp_" + UUID.randomUUID()); + } + else + { + queueName = body.getQueue().intern(); + } + + AMQQueue queue; + + //TODO: do we need to check that the queue already exists with exactly the same "configuration"? + + AMQChannel channel = _connection.getChannel(channelId); + + if (channel == null) + { + throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); + } + + if(body.getPassive()) + { + queue = virtualHost.getQueue(queueName.toString()); + if (queue == null) + { + String msg = "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ")."; + throw body.getChannelException(AMQConstant.NOT_FOUND, msg, _connection.getMethodRegistry()); + } + else + { + if (!queue.verifySessionAccess(channel)) + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, + "Queue " + + queue.getName() + + " is exclusive, but not created on this Connection.", + _connection.getMethodRegistry()); + } + + //set this as the default queue on the channel: + channel.setDefaultQueue(queue); + } + } + else + { + + try + { + + queue = createQueue(channel, queueName, body, virtualHost, _connection); + + } + catch(QueueExistsException qe) + { + + queue = qe.getExistingQueue(); + + if (!queue.verifySessionAccess(channel)) + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, + "Queue " + + queue.getName() + + " is exclusive, but not created on this Connection.", + _connection.getMethodRegistry()); + } + else if(queue.isExclusive() != body.getExclusive()) + { + + throw body.getChannelException(AMQConstant.ALREADY_EXISTS, + "Cannot re-declare queue '" + + queue.getName() + + "' with different exclusivity (was: " + + queue.isExclusive() + + " requested " + + body.getExclusive() + + ")", + _connection.getMethodRegistry()); + } + else if((body.getAutoDelete() && queue.getLifetimePolicy() != LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS) + || (!body.getAutoDelete() && queue.getLifetimePolicy() != ((body.getExclusive() && !body.getDurable()) ? LifetimePolicy.DELETE_ON_CONNECTION_CLOSE : LifetimePolicy.PERMANENT))) + { + throw body.getChannelException(AMQConstant.ALREADY_EXISTS, + "Cannot re-declare queue '" + + queue.getName() + + "' with different lifetime policy (was: " + + queue.getLifetimePolicy() + + " requested autodelete: " + + body.getAutoDelete() + + ")", + _connection.getMethodRegistry()); + } + else if(queue.isDurable() != body.getDurable()) + { + throw body.getChannelException(AMQConstant.ALREADY_EXISTS, + "Cannot re-declare queue '" + + queue.getName() + + "' with different durability (was: " + + queue.isDurable() + + " requested " + + body.getDurable() + + ")", + _connection.getMethodRegistry()); + } + + } + catch (AccessControlException e) + { + throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, + e.getMessage(), + _connection.getMethodRegistry()); + } + + //set this as the default queue on the channel: + channel.setDefaultQueue(queue); + } + + if (!body.getNowait()) + { + channel.sync(); + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + QueueDeclareOkBody responseBody = + methodRegistry.createQueueDeclareOkBody(queueName, + queue.getQueueDepthMessages(), + queue.getConsumerCount()); + _connection.writeFrame(responseBody.generateFrame(channelId)); + + _logger.info("Queue " + queueName + " declared successfully"); + } return true; } + protected AMQQueue createQueue(final AMQChannel channel, final AMQShortString queueName, + QueueDeclareBody body, + final VirtualHostImpl virtualHost, + final AMQProtocolSession session) + throws AMQException, QueueExistsException + { + + final boolean durable = body.getDurable(); + final boolean autoDelete = body.getAutoDelete(); + final boolean exclusive = body.getExclusive(); + + + Map<String, Object> attributes = + QueueArgumentsConverter.convertWireArgsToModel(FieldTable.convertToMap(body.getArguments())); + final String queueNameString = AMQShortString.toString(queueName); + attributes.put(Queue.NAME, queueNameString); + attributes.put(Queue.ID, UUID.randomUUID()); + attributes.put(Queue.DURABLE, durable); + + LifetimePolicy lifetimePolicy; + ExclusivityPolicy exclusivityPolicy; + + if(exclusive) + { + lifetimePolicy = autoDelete + ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS + : durable ? LifetimePolicy.PERMANENT : LifetimePolicy.DELETE_ON_CONNECTION_CLOSE; + exclusivityPolicy = durable ? ExclusivityPolicy.CONTAINER : ExclusivityPolicy.CONNECTION; + } + else + { + lifetimePolicy = autoDelete ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS : LifetimePolicy.PERMANENT; + exclusivityPolicy = ExclusivityPolicy.NONE; + } + + attributes.put(Queue.EXCLUSIVE, exclusivityPolicy); + attributes.put(Queue.LIFETIME_POLICY, lifetimePolicy); + + + final AMQQueue queue = virtualHost.createQueue(attributes); + + return queue; + } + public boolean dispatchQueueDelete(QueueDeleteBody body, int channelId) throws AMQException { - _queueDeleteHandler.methodReceived(getConnection(), body, channelId); + VirtualHostImpl virtualHost = _connection.getVirtualHost(); + + AMQChannel channel = _connection.getChannel(channelId); + + if (channel == null) + { + throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); + } + channel.sync(); + AMQQueue queue; + if (body.getQueue() == null) + { + + //get the default queue on the channel: + queue = channel.getDefaultQueue(); + } + else + { + queue = virtualHost.getQueue(body.getQueue().toString()); + } + + if (queue == null) + { + throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.", + _connection.getMethodRegistry()); + + } + else + { + if (body.getIfEmpty() && !queue.isEmpty()) + { + throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is not empty.", + _connection.getMethodRegistry()); + } + else if (body.getIfUnused() && !queue.isUnused()) + { + // TODO - Error code + throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is still used.", + _connection.getMethodRegistry()); + } + else + { + if (!queue.verifySessionAccess(channel)) + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, + "Queue " + + queue.getName() + + " is exclusive, but not created on this Connection.", + _connection.getMethodRegistry()); + } + + int purged = 0; + try + { + purged = virtualHost.removeQueue(queue); + } + catch (AccessControlException e) + { + throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, + e.getMessage(),
[... 318 lines stripped ...] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
