Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original) +++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Fri Oct 17 14:23:19 2014 @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.protocol.v0_8; +import static org.apache.qpid.transport.util.Functions.hex; + import java.nio.ByteBuffer; import java.security.AccessControlException; import java.security.PrivilegedAction; @@ -47,14 +49,8 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.*; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.TransactionTimeoutHelper; import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction; @@ -62,6 +58,7 @@ import org.apache.qpid.server.configurat import org.apache.qpid.server.connection.SessionPrincipal; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.consumer.ConsumerTarget; +import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.filter.AMQInvalidArgumentException; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; @@ -69,6 +66,7 @@ import org.apache.qpid.server.filter.Fil import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.filter.SimpleFilterManager; import org.apache.qpid.server.flow.FlowCreditManager; +import org.apache.qpid.server.flow.MessageOnlyCreditManager; import org.apache.qpid.server.flow.Pre0_10CreditManager; import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; @@ -85,13 +83,18 @@ import org.apache.qpid.server.model.Conf import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Consumer; import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.ExclusivityPolicy; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.NoFactoryForTypeException; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.Session; import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.UnknownConfiguredObjectException; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.CapacityChecker; import org.apache.qpid.server.protocol.ConsumerListener; -import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; @@ -103,12 +106,18 @@ import org.apache.qpid.server.txn.LocalT import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; +import org.apache.qpid.server.virtualhost.ExchangeExistsException; +import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException; +import org.apache.qpid.server.virtualhost.QueueExistsException; +import org.apache.qpid.server.virtualhost.RequiredExchangeException; +import org.apache.qpid.server.virtualhost.ReservedExchangeNameException; import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.transport.TransportException; -public class AMQChannel<T extends AMQProtocolSession<T>> - implements AMQSessionModel<AMQChannel<T>,T>, - AsyncAutoCommitTransaction.FutureRecorder +public class AMQChannel + implements AMQSessionModel<AMQChannel, AMQProtocolEngine>, + AsyncAutoCommitTransaction.FutureRecorder, + ServerChannelMethodProcessor { public static final int DEFAULT_PREFETCH = 4096; @@ -159,7 +168,7 @@ public class AMQChannel<T extends AMQPro private final AtomicLong _txnRejects = new AtomicLong(0); private final AtomicLong _txnCount = new AtomicLong(0); - private final T _session; + private final AMQProtocolEngine _connection; private AtomicBoolean _closing = new AtomicBoolean(false); private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<Object>()); @@ -181,8 +190,8 @@ public class AMQChannel<T extends AMQPro private final TransactionTimeoutHelper _transactionTimeoutHelper; private final UUID _id = UUID.randomUUID(); - private final List<Action<? super AMQChannel<T>>> _taskList = - new CopyOnWriteArrayList<Action<? super AMQChannel<T>>>(); + private final List<Action<? super AMQChannel>> _taskList = + new CopyOnWriteArrayList<Action<? super AMQChannel>>(); private final CapacityCheckAction _capacityCheckAction = new CapacityCheckAction(); @@ -194,15 +203,14 @@ public class AMQChannel<T extends AMQPro private Session<?> _modelObject; - public AMQChannel(T session, int channelId, final MessageStore messageStore) - throws AMQException + public AMQChannel(AMQProtocolEngine connection, int channelId, final MessageStore messageStore) { - _session = session; + _connection = connection; _channelId = channelId; - _subject = new Subject(false, session.getAuthorizedSubject().getPrincipals(), - session.getAuthorizedSubject().getPublicCredentials(), - session.getAuthorizedSubject().getPrivateCredentials()); + _subject = new Subject(false, connection.getAuthorizedSubject().getPrincipals(), + connection.getAuthorizedSubject().getPublicCredentials(), + connection.getAuthorizedSubject().getPrivateCredentials()); _subject.getPrincipals().add(new SessionPrincipal(this)); _logSubject = new ChannelLogSubject(this); @@ -211,7 +219,7 @@ public class AMQChannel<T extends AMQPro // by default the session is non-transactional _transaction = new AsyncAutoCommitTransaction(_messageStore, this); - _clientDeliveryMethod = session.createDeliveryMethod(_channelId); + _clientDeliveryMethod = connection.createDeliveryMethod(_channelId); _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject, new CloseAction() { @@ -242,6 +250,52 @@ public class AMQChannel<T extends AMQPro } + private boolean performGet(final AMQQueue queue, + final boolean acks) + throws MessageSource.ExistingConsumerPreventsExclusive, + MessageSource.ExistingExclusiveConsumer, MessageSource.ConsumerAccessRefused + { + + final FlowCreditManager singleMessageCredit = new MessageOnlyCreditManager(1L); + + final GetDeliveryMethod getDeliveryMethod = + new GetDeliveryMethod(singleMessageCredit, queue); + final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod() + { + + public void recordMessageDelivery(final ConsumerImpl sub, + final MessageInstance entry, + final long deliveryTag) + { + addUnacknowledgedMessage(entry, deliveryTag, null); + } + }; + + ConsumerTarget_0_8 target; + EnumSet<ConsumerImpl.Option> options = EnumSet.of(ConsumerImpl.Option.TRANSIENT, ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES); + if (acks) + { + + target = ConsumerTarget_0_8.createAckTarget(this, + AMQShortString.EMPTY_STRING, null, + singleMessageCredit, getDeliveryMethod, getRecordMethod); + } + else + { + target = ConsumerTarget_0_8.createGetNoAckTarget(this, + AMQShortString.EMPTY_STRING, null, + singleMessageCredit, getDeliveryMethod, getRecordMethod); + } + + ConsumerImpl sub = queue.addConsumer(target, null, AMQMessage.class, "", options); + sub.flush(); + sub.close(); + return getDeliveryMethod.hasDeliveredMessage(); + + + } + /** Sets this channel to be part of a local transaction */ public void setLocalTransactional() { @@ -250,7 +304,7 @@ public class AMQChannel<T extends AMQPro @Override public long getActivityTime() { - return _session.getLastReceivedTime(); + return _connection.getLastReceivedTime(); } }); _txnStarts.incrementAndGet(); @@ -324,27 +378,18 @@ public class AMQChannel<T extends AMQPro } public void publishContentHeader(ContentHeaderBody contentHeaderBody) - throws AMQException { - if (_currentMessage == null) + if (_logger.isDebugEnabled()) { - throw new AMQException("Received content header without previously receiving a BasicPublish frame"); + _logger.debug("Content header received on channel " + _channelId); } - else - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Content header received on channel " + _channelId); - } - _currentMessage.setContentHeaderBody(contentHeaderBody); + _currentMessage.setContentHeaderBody(contentHeaderBody); - deliverCurrentMessageIfComplete(); - } + deliverCurrentMessageIfComplete(); } private void deliverCurrentMessageIfComplete() - throws AMQException { // check and deliver if header says body length is zero if (_currentMessage.allContentReceived()) @@ -355,7 +400,7 @@ public class AMQChannel<T extends AMQPro final MessageMetaData messageMetaData = new MessageMetaData(_currentMessage.getMessagePublishInfo(), _currentMessage.getContentHeader(), - getProtocolSession().getLastReceivedTime()); + getConnection().getLastReceivedTime()); final StoredMessage<MessageMetaData> handle = _messageStore.addMessage(messageMetaData); final AMQMessage amqMessage = createAMQMessage(_currentMessage, handle); @@ -430,7 +475,7 @@ public class AMQChannel<T extends AMQPro { long bodySize = _currentMessage.getSize(); long timestamp = _currentMessage.getContentHeader().getProperties().getTimestamp(); - _session.registerMessageReceived(bodySize, timestamp); + _connection.registerMessageReceived(bodySize, timestamp); _currentMessage = null; } } @@ -443,13 +488,13 @@ public class AMQChannel<T extends AMQPro * Pre-requisite: the current message is judged to have no destination queues. * * @throws AMQConnectionException if the message is mandatory close-on-no-route - * @see AMQProtocolSession#isCloseWhenNoRoute() + * @see AMQProtocolEngine#isCloseWhenNoRoute() */ - private void handleUnroutableMessage(AMQMessage message) throws AMQConnectionException + private void handleUnroutableMessage(AMQMessage message) { boolean mandatory = message.isMandatory(); String description = currentMessageDescription(); - boolean closeOnNoRoute = _session.isCloseWhenNoRoute(); + boolean closeOnNoRoute = _connection.isCloseWhenNoRoute(); if(_logger.isDebugEnabled()) { @@ -458,29 +503,29 @@ public class AMQChannel<T extends AMQPro description, mandatory, isTransactional(), closeOnNoRoute)); } - if (mandatory && isTransactional() && _session.isCloseWhenNoRoute()) - { - throw new AMQConnectionException( - AMQConstant.NO_ROUTE, - "No route for message " + currentMessageDescription(), - 0, 0, // default class and method ids - getProtocolSession().getProtocolVersion().getMajorVersion(), - getProtocolSession().getProtocolVersion().getMinorVersion(), - (Throwable) null); - } - - if (mandatory || message.isImmediate()) + if (mandatory && isTransactional() && _connection.isCloseWhenNoRoute()) { - _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE, "No Route for message " + currentMessageDescription(), message)); + _connection.closeConnection(AMQConstant.NO_ROUTE, + "No route for message " + currentMessageDescription(), _channelId); } else { - AMQShortString exchangeName = _currentMessage.getExchangeName(); - AMQShortString routingKey = _currentMessage.getMessagePublishInfo().getRoutingKey(); + if (mandatory || message.isImmediate()) + { + _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE, + "No Route for message " + + currentMessageDescription(), + message)); + } + else + { + AMQShortString exchangeName = _currentMessage.getExchangeName(); + AMQShortString routingKey = _currentMessage.getMessagePublishInfo().getRoutingKey(); - getVirtualHost().getEventLogger().message( - ExchangeMessages.DISCARDMSG(exchangeName == null ? null : exchangeName.asString(), - routingKey == null ? null : routingKey.asString())); + getVirtualHost().getEventLogger().message( + ExchangeMessages.DISCARDMSG(exchangeName == null ? null : exchangeName.asString(), + routingKey == null ? null : routingKey.asString())); + } } } @@ -499,13 +544,8 @@ public class AMQChannel<T extends AMQPro : _currentMessage.getMessagePublishInfo().getRoutingKey().toString()); } - public void publishContentBody(ContentBody contentBody) throws AMQException + public void publishContentBody(ContentBody contentBody) { - if (_currentMessage == null) - { - throw new AMQException("Received content body without previously receiving a Content Header"); - } - if (_logger.isDebugEnabled()) { _logger.debug(debugIdentity() + " content body received on channel " + _channelId); @@ -517,13 +557,6 @@ public class AMQChannel<T extends AMQPro deliverCurrentMessageIfComplete(); } - catch (AMQException e) - { - // we want to make sure we don't keep a reference to the message in the - // event of an error - _currentMessage = null; - throw e; - } catch (RuntimeException e) { // we want to make sure we don't keep a reference to the message in the @@ -566,9 +599,10 @@ public class AMQChannel<T extends AMQPro */ public AMQShortString consumeFromSource(AMQShortString tag, Collection<MessageSource> sources, boolean acks, FieldTable filters, boolean exclusive, boolean noLocal) - throws AMQException, MessageSource.ExistingConsumerPreventsExclusive, - MessageSource.ExistingExclusiveConsumer, AMQInvalidArgumentException, - MessageSource.ConsumerAccessRefused + throws MessageSource.ExistingConsumerPreventsExclusive, + MessageSource.ExistingExclusiveConsumer, + AMQInvalidArgumentException, + MessageSource.ConsumerAccessRefused, ConsumerTagInUseException { if (tag == null) { @@ -577,7 +611,7 @@ public class AMQChannel<T extends AMQPro if (_tag2SubscriptionTargetMap.containsKey(tag)) { - throw new AMQException("Consumer already exists with same tag: " + tag); + throw new ConsumerTagInUseException("Consumer already exists with same tag: " + tag); } ConsumerTarget_0_8 target; @@ -649,27 +683,11 @@ public class AMQChannel<T extends AMQPro } } } - catch (AccessControlException e) - { - _tag2SubscriptionTargetMap.remove(tag); - throw e; - } - catch (MessageSource.ExistingExclusiveConsumer e) - { - _tag2SubscriptionTargetMap.remove(tag); - throw e; - } - catch (MessageSource.ExistingConsumerPreventsExclusive e) - { - _tag2SubscriptionTargetMap.remove(tag); - throw e; - } - catch (AMQInvalidArgumentException e) - { - _tag2SubscriptionTargetMap.remove(tag); - throw e; - } - catch (MessageSource.ConsumerAccessRefused e) + catch (AccessControlException + | MessageSource.ExistingExclusiveConsumer + | MessageSource.ExistingConsumerPreventsExclusive + | AMQInvalidArgumentException + | MessageSource.ConsumerAccessRefused e) { _tag2SubscriptionTargetMap.remove(tag); throw e; @@ -730,7 +748,7 @@ public class AMQChannel<T extends AMQPro unsubscribeAllConsumers(); - for (Action<? super AMQChannel<T>> task : _taskList) + for (Action<? super AMQChannel> task : _taskList) { task.performAction(this); } @@ -897,9 +915,8 @@ public class AMQChannel<T extends AMQPro /** * Called to resend all outstanding unacknowledged messages to this same channel. * - * @throws AMQException When something goes wrong. */ - public void resend() throws AMQException + public void resend() { @@ -985,9 +1002,8 @@ public class AMQChannel<T extends AMQPro * @param multiple if true will acknowledge all messages up to an including the delivery tag. if false only * acknowledges the single message specified by the delivery tag * - * @throws AMQException if the delivery tag is unknown (e.g. not outstanding) on this channel */ - public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException + public void acknowledgeMessage(long deliveryTag, boolean multiple) { Collection<MessageInstance> ackedMessages = getAckedMessages(deliveryTag, multiple); _transaction.dequeue(ackedMessages, new MessageAcknowledgeAction(ackedMessages)); @@ -1084,22 +1100,13 @@ public class AMQChannel<T extends AMQPro public boolean isSuspended() { - return _suspended.get() || _closing.get() || _session.isClosing(); + return _suspended.get() || _closing.get() || _connection.isClosing(); } - public void commit() throws AMQException - { - commit(null, false); - } - - public void commit(final Runnable immediateAction, boolean async) throws AMQException + public void commit(final Runnable immediateAction, boolean async) { - if (!isTransactional()) - { - throw new AMQException("Fatal error: commit called on non-transactional channel"); - } if(async && _transaction instanceof LocalTransaction) { @@ -1132,17 +1139,8 @@ public class AMQChannel<T extends AMQPro } } - public void rollback() throws AMQException - { - rollback(NULL_TASK); - } - - public void rollback(Runnable postRollbackTask) throws AMQException + public void rollback(Runnable postRollbackTask) { - if (!isTransactional()) - { - throw new AMQException("Fatal error: commit called on non-transactional channel"); - } // stop all subscriptions _rollingBack = true; @@ -1200,7 +1198,7 @@ public class AMQChannel<T extends AMQPro public String toString() { - return "("+ _suspended.get() + ", " + _closing.get() + ", " + _session.isClosing() + ") "+"["+_session.toString()+":"+_channelId+"]"; + return "("+ _suspended.get() + ", " + _closing.get() + ", " + _connection.isClosing() + ") "+"["+ _connection.toString()+":"+_channelId+"]"; } public void setDefaultQueue(AMQQueue queue) @@ -1219,9 +1217,9 @@ public class AMQChannel<T extends AMQPro return _closing.get(); } - public AMQProtocolSession getProtocolSession() + public AMQProtocolEngine getConnection() { - return _session; + return _connection; } public FlowCreditManager getCreditManager() @@ -1261,13 +1259,9 @@ public class AMQChannel<T extends AMQPro private AMQMessage createAMQMessage(IncomingMessage incomingMessage, StoredMessage<MessageMetaData> handle) - throws AMQException { - AMQMessage message = new AMQMessage(handle, _session.getReference()); - - final BasicContentHeaderProperties properties = - incomingMessage.getContentHeader().getProperties(); + AMQMessage message = new AMQMessage(handle, _connection.getReference()); return message; } @@ -1275,7 +1269,7 @@ public class AMQChannel<T extends AMQPro private boolean checkMessageUserId(ContentHeaderBody header) { AMQShortString userID = header.getProperties().getUserId(); - return (!_messageAuthorizationRequired || _session.getAuthorizedPrincipal().getName().equals(userID == null? "" : userID.toString())); + return (!_messageAuthorizationRequired || _connection.getAuthorizedPrincipal().getName().equals(userID == null? "" : userID.toString())); } @@ -1286,14 +1280,14 @@ public class AMQChannel<T extends AMQPro } @Override - public T getConnectionModel() + public AMQProtocolEngine getConnectionModel() { - return _session; + return _connection; } public String getClientID() { - return String.valueOf(_session.getContextKey()); + return String.valueOf(_connection.getContextKey()); } public LogSubject getLogSubject() @@ -1308,13 +1302,13 @@ public class AMQChannel<T extends AMQPro } @Override - public void addDeleteTask(final Action<? super AMQChannel<T>> task) + public void addDeleteTask(final Action<? super AMQChannel> task) { _taskList.add(task); } @Override - public void removeDeleteTask(final Action<? super AMQChannel<T>> task) + public void removeDeleteTask(final Action<? super AMQChannel> task) { _taskList.remove(task); } @@ -1324,6 +1318,46 @@ public class AMQChannel<T extends AMQPro return _subject; } + public boolean hasCurrentMessage() + { + return _currentMessage != null; + } + + private class GetDeliveryMethod implements ClientDeliveryMethod + { + + private final FlowCreditManager _singleMessageCredit; + private final AMQQueue _queue; + private boolean _deliveredMessage; + + public GetDeliveryMethod(final FlowCreditManager singleMessageCredit, + final AMQQueue queue) + { + _singleMessageCredit = singleMessageCredit; + _queue = queue; + } + + @Override + public long deliverToClient(final ConsumerImpl sub, final ServerMessage message, + final InstanceProperties props, final long deliveryTag) + { + _singleMessageCredit.useCreditForMessage(message.getSize()); + long size = _connection.getProtocolOutputConverter().writeGetOk(message, + props, + AMQChannel.this.getChannelId(), + deliveryTag, + _queue.getQueueDepthMessages()); + + _deliveredMessage = true; + return size; + } + + public boolean hasDeliveredMessage() + { + return _deliveredMessage; + } + } + private class ImmediateAction implements Action<MessageInstance> { @@ -1352,7 +1386,7 @@ public class AMQChannel<T extends AMQPro public void postCommit() { final ProtocolOutputConverter outputConverter = - _session.getProtocolOutputConverter(); + _connection.getProtocolOutputConverter(); outputConverter.writeReturn(message.getMessagePublishInfo(), message.getContentHeaderBody(), @@ -1475,7 +1509,7 @@ public class AMQChannel<T extends AMQPro public void postCommit() { AMQMessage message = _reference.getMessage(); - _session.getProtocolOutputConverter().writeReturn(message.getMessagePublishInfo(), + _connection.getProtocolOutputConverter().writeReturn(message.getMessagePublishInfo(), message.getContentHeaderBody(), message, _channelId, @@ -1544,7 +1578,7 @@ public class AMQChannel<T extends AMQPro @Override public Object getConnectionReference() { - return getProtocolSession().getReference(); + return getConnection().getReference(); } public int getUnacknowledgedMessageCount() @@ -1554,9 +1588,9 @@ public class AMQChannel<T extends AMQPro private void flow(boolean flow) { - MethodRegistry methodRegistry = _session.getMethodRegistry(); + MethodRegistry methodRegistry = _connection.getMethodRegistry(); AMQMethodBody responseBody = methodRegistry.createChannelFlowBody(flow); - _session.writeFrame(responseBody.generateFrame(_channelId)); + _connection.writeFrame(responseBody.generateFrame(_channelId)); } @Override @@ -1567,7 +1601,7 @@ public class AMQChannel<T extends AMQPro public VirtualHostImpl getVirtualHost() { - return getProtocolSession().getVirtualHost(); + return getConnection().getVirtualHost(); } public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) @@ -1581,11 +1615,11 @@ public class AMQChannel<T extends AMQPro */ private void closeConnection(String reason) throws AMQException { - Lock receivedLock = _session.getReceivedLock(); + Lock receivedLock = _connection.getReceivedLock(); receivedLock.lock(); try { - _session.close(AMQConstant.RESOURCE_ERROR, reason); + _connection.close(AMQConstant.RESOURCE_ERROR, reason); } finally { @@ -1593,7 +1627,7 @@ public class AMQChannel<T extends AMQPro } } - public void deadLetter(long deliveryTag) throws AMQException + public void deadLetter(long deliveryTag) { final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap(); final MessageInstance rejectedQueueEntry = unackedMap.remove(deliveryTag); @@ -1816,4 +1850,1521 @@ public class AMQChannel<T extends AMQPro return 0L; } } + + @Override + public void receiveAccessRequest(final AMQShortString realm, + final boolean exclusive, + final boolean passive, + final boolean active, final boolean write, final boolean read) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] AccessRequest[" +" realm: " + realm + + " exclusive: " + exclusive + + " passive: " + passive + + " active: " + active + + " write: " + write + " read: " + read + " ]"); + } + + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + + if (ProtocolVersion.v0_91.equals(_connection.getProtocolVersion())) + { + _connection.closeConnection(AMQConstant.COMMAND_INVALID, + "AccessRequest not present in AMQP versions other than 0-8, 0-9", + _channelId); + } + else + { + // We don't implement access control class, but to keep clients happy that expect it + // always use the "0" ticket. + AccessRequestOkBody response = methodRegistry.createAccessRequestOkBody(0); + sync(); + _connection.writeFrame(response.generateFrame(_channelId)); + } + } + + @Override + public void receiveBasicAck(final long deliveryTag, final boolean multiple) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] BasicAck[" +" deliveryTag: " + deliveryTag + " multiple: " + multiple + " ]"); + } + + acknowledgeMessage(deliveryTag, multiple); + } + + @Override + public void receiveBasicCancel(final AMQShortString consumerTag, final boolean nowait) + { + + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] BasicCancel[" +" consumerTag: " + consumerTag + " noWait: " + nowait + " ]"); + } + + unsubscribeConsumer(consumerTag); + if (!nowait) + { + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + BasicCancelOkBody cancelOkBody = methodRegistry.createBasicCancelOkBody(consumerTag); + sync(); + _connection.writeFrame(cancelOkBody.generateFrame(_channelId)); + } + } + + @Override + public void receiveBasicConsume(final AMQShortString queue, + final AMQShortString consumerTag, + final boolean noLocal, + final boolean noAck, + final boolean exclusive, final boolean nowait, final FieldTable arguments) + { + + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] BasicConsume[" +" queue: " + queue + + " consumerTag: " + consumerTag + + " noLocal: " + noLocal + + " noAck: " + noAck + + " exclusive: " + exclusive + " nowait: " + nowait + " arguments: " + arguments + " ]"); + } + + AMQShortString consumerTag1 = consumerTag; + VirtualHostImpl<?, ?, ?> vHost = _connection.getVirtualHost(); + sync(); + String queueName = queue == null ? null : queue.asString(); + + MessageSource queue1 = queueName == null ? getDefaultQueue() : vHost.getQueue(queueName); + final Collection<MessageSource> sources = new HashSet<>(); + if (queue1 != null) + { + sources.add(queue1); + } + else if (vHost.getContextValue(Boolean.class, "qpid.enableMultiQueueConsumers") + && arguments != null + && arguments.get("x-multiqueue") instanceof Collection) + { + for (Object object : (Collection<Object>) arguments.get("x-multiqueue")) + { + String sourceName = String.valueOf(object); + sourceName = sourceName.trim(); + if (sourceName.length() != 0) + { + MessageSource source = vHost.getMessageSource(sourceName); + if (source == null) + { + sources.clear(); + break; + } + else + { + sources.add(source); + } + } + } + queueName = arguments.get("x-multiqueue").toString(); + } + + if (sources.isEmpty()) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("No queue for '" + queueName + "'"); + } + if (queueName != null) + { + closeChannel(AMQConstant.NOT_FOUND, "No such queue, '" + queueName + "'"); + } + else + { + _connection.closeConnection(AMQConstant.NOT_ALLOWED, + "No queue name provided, no default queue defined.", _channelId); + } + } + else + { + try + { + consumerTag1 = consumeFromSource(consumerTag1, + sources, + !noAck, + arguments, + exclusive, + noLocal); + if (!nowait) + { + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createBasicConsumeOkBody(consumerTag1); + _connection.writeFrame(responseBody.generateFrame(_channelId)); + + } + } + catch (ConsumerTagInUseException cte) + { + + _connection.closeConnection(AMQConstant.NOT_ALLOWED, + "Non-unique consumer tag, '" + consumerTag1 + + "'", _channelId); + } + catch (AMQInvalidArgumentException ise) + { + _connection.closeConnection(AMQConstant.ARGUMENT_INVALID, ise.getMessage(), _channelId); + + + } + catch (AMQQueue.ExistingExclusiveConsumer e) + { + _connection.closeConnection(AMQConstant.ACCESS_REFUSED, + "Cannot subscribe to queue " + + queue1.getName() + + " as it already has an existing exclusive consumer", _channelId); + + } + catch (AMQQueue.ExistingConsumerPreventsExclusive e) + { + _connection.closeConnection(AMQConstant.ACCESS_REFUSED, + "Cannot subscribe to queue " + + queue1.getName() + + " exclusively as it already has a consumer", _channelId); + + } + catch (AccessControlException e) + { + _connection.closeConnection(AMQConstant.ACCESS_REFUSED, "Cannot subscribe to queue " + + queue1.getName() + + " permission denied", _channelId); + + } + catch (MessageSource.ConsumerAccessRefused consumerAccessRefused) + { + _connection.closeConnection(AMQConstant.ACCESS_REFUSED, + "Cannot subscribe to queue " + + queue1.getName() + + " as it already has an incompatible exclusivity policy", _channelId); + + } + + } + } + + @Override + public void receiveBasicGet(final AMQShortString queueName, final boolean noAck) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] BasicGet[" +" queue: " + queueName + " noAck: " + noAck + " ]"); + } + + VirtualHostImpl vHost = _connection.getVirtualHost(); + sync(); + AMQQueue queue = queueName == null ? getDefaultQueue() : vHost.getQueue(queueName.toString()); + if (queue == null) + { + _logger.info("No queue for '" + queueName + "'"); + if (queueName != null) + { + _connection.closeConnection(AMQConstant.NOT_FOUND, "No such queue, '" + queueName + "'", _channelId); + + } + else + { + _connection.closeConnection(AMQConstant.NOT_ALLOWED, + "No queue name provided, no default queue defined.", _channelId); + + } + } + else + { + + try + { + if (!performGet(queue, !noAck)) + { + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + + BasicGetEmptyBody responseBody = methodRegistry.createBasicGetEmptyBody(null); + + _connection.writeFrame(responseBody.generateFrame(_channelId)); + } + } + catch (AccessControlException e) + { + _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), _channelId); + } + catch (MessageSource.ExistingExclusiveConsumer e) + { + _connection.closeConnection(AMQConstant.NOT_ALLOWED, "Queue has an exclusive consumer", _channelId); + } + catch (MessageSource.ExistingConsumerPreventsExclusive e) + { + _connection.closeConnection(AMQConstant.INTERNAL_ERROR, + "The GET request has been evaluated as an exclusive consumer, " + + "this is likely due to a programming error in the Qpid broker", _channelId); + } + catch (MessageSource.ConsumerAccessRefused consumerAccessRefused) + { + _connection.closeConnection(AMQConstant.NOT_ALLOWED, + "Queue has an incompatible exclusivity policy", _channelId); + } + } + } + + @Override + public void receiveBasicPublish(final AMQShortString exchangeName, + final AMQShortString routingKey, + final boolean mandatory, + final boolean immediate) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] BasicPublish[" +" exchange: " + exchangeName + + " routingKey: " + routingKey + + " mandatory: " + mandatory + + " immediate: " + immediate + " ]"); + } + + VirtualHostImpl vHost = _connection.getVirtualHost(); + + MessageDestination destination; + + if (isDefaultExchange(exchangeName)) + { + destination = vHost.getDefaultDestination(); + } + else + { + destination = vHost.getMessageDestination(exchangeName.toString()); + } + + // if the exchange does not exist we raise a channel exception + if (destination == null) + { + closeChannel(AMQConstant.NOT_FOUND, "Unknown exchange name: " + exchangeName); + } + else + { + + MessagePublishInfo info = new MessagePublishInfo(exchangeName, + immediate, + mandatory, + routingKey); + + try + { + setPublishFrame(info, destination); + } + catch (AccessControlException e) + { + _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId()); + + } + } + } + + @Override + public void receiveBasicQos(final long prefetchSize, final int prefetchCount, final boolean global) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] BasicQos[" +" prefetchSize: " + prefetchSize + " prefetchCount: " + prefetchCount + " global: " + global + " ]"); + } + + sync(); + setCredit(prefetchSize, prefetchCount); + + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createBasicQosOkBody(); + _connection.writeFrame(responseBody.generateFrame(getChannelId())); + + } + + @Override + public void receiveBasicRecover(final boolean requeue, final boolean sync) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] BasicRecover[" + " requeue: " + requeue + " sync: " + sync + " ]"); + } + + resend(); + + if (sync) + { + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + AMQMethodBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody(); + sync(); + _connection.writeFrame(recoverOk.generateFrame(getChannelId())); + + } + + } + + @Override + public void receiveBasicReject(final long deliveryTag, final boolean requeue) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] BasicReject[" +" deliveryTag: " + deliveryTag + " requeue: " + requeue + " ]"); + } + + MessageInstance message = getUnacknowledgedMessageMap().get(deliveryTag); + + if (message == null) + { + _logger.warn("Dropping reject request as message is null for tag:" + deliveryTag); + } + else + { + + if (message.getMessage() == null) + { + _logger.warn("Message has already been purged, unable to Reject."); + } + else + { + + if (_logger.isDebugEnabled()) + { + _logger.debug("Rejecting: DT:" + deliveryTag + + "-" + message.getMessage() + + ": Requeue:" + requeue + + + " on channel:" + debugIdentity()); + } + + if (requeue) + { + //this requeue represents a message rejected from the pre-dispatch queue + //therefore we need to amend the delivery counter. + message.decrementDeliveryCount(); + + requeue(deliveryTag); + } + else + { + // Since the Java client abuses the reject flag for requeing after rollback, we won't set reject here + // as it would prevent redelivery + // message.reject(); + + final boolean maxDeliveryCountEnabled = isMaxDeliveryCountEnabled(deliveryTag); + _logger.debug("maxDeliveryCountEnabled: " + + maxDeliveryCountEnabled + + " deliveryTag " + + deliveryTag); + if (maxDeliveryCountEnabled) + { + final boolean deliveredTooManyTimes = isDeliveredTooManyTimes(deliveryTag); + _logger.debug("deliveredTooManyTimes: " + + deliveredTooManyTimes + + " deliveryTag " + + deliveryTag); + if (deliveredTooManyTimes) + { + deadLetter(deliveryTag); + } + else + { + //this requeue represents a message rejected because of a recover/rollback that we + //are not ready to DLQ. We rely on the reject command to resend from the unacked map + //and therefore need to increment the delivery counter so we cancel out the effect + //of the AMQChannel#resend() decrement. + message.incrementDeliveryCount(); + } + } + else + { + requeue(deliveryTag); + } + } + } + } + } + + @Override + public void receiveChannelClose(final int replyCode, + final AMQShortString replyText, + final int classId, + final int methodId) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] ChannelClose[" +" replyCode: " + replyCode + " replyText: " + replyText + " classId: " + classId + " methodId: " + methodId + " ]"); + } + + + sync(); + _connection.closeChannel(this); + + _connection.writeFrame(new AMQFrame(getChannelId(), + _connection.getMethodRegistry().createChannelCloseOkBody())); + } + + @Override + public void receiveChannelCloseOk() + { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] ChannelCloseOk"); + } + + _connection.closeChannelOk(getChannelId()); + } + + @Override + public void receiveMessageContent(final byte[] data) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] MessageContent[" +" data: " + hex(data,_connection.getBinaryDataLimit()) + " ] "); + } + + if(hasCurrentMessage()) + { + publishContentBody(new ContentBody(data)); + } + else + { + _connection.closeConnection(AMQConstant.COMMAND_INVALID, + "Attempt to send a content header without first sending a publish frame", + _channelId); + } + } + + @Override + public void receiveMessageHeader(final BasicContentHeaderProperties properties, final long bodySize) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] MessageHeader[ properties: {" + properties + "} bodySize: " + bodySize + " ]"); + } + + if(hasCurrentMessage()) + { + publishContentHeader(new ContentHeaderBody(properties, bodySize)); + } + else + { + _connection.closeConnection(AMQConstant.COMMAND_INVALID, + "Attempt to send a content header without first sending a publish frame", + _channelId); + } + } + + @Override + public boolean ignoreAllButCloseOk() + { + return _connection.ignoreAllButCloseOk() || _connection.channelAwaitingClosure(_channelId); + } + + @Override + public void receiveChannelFlow(final boolean active) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] ChannelFlow[" +" active: " + active + " ]"); + } + + + sync(); + setSuspended(!active); + + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createChannelFlowOkBody(active); + _connection.writeFrame(responseBody.generateFrame(getChannelId())); + + } + + @Override + public void receiveChannelFlowOk(final boolean active) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] ChannelFlowOk[" +" active: " + active + " ]"); + } + + // TODO - should we do anything here? + } + + @Override + public void receiveExchangeBound(final AMQShortString exchangeName, + final AMQShortString routingKey, + final AMQShortString queueName) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] ExchangeBound[" +" exchange: " + exchangeName + " routingKey: " + + routingKey + " queue: " + queueName + " ]"); + } + + VirtualHostImpl virtualHost = _connection.getVirtualHost(); + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + + sync(); + + int replyCode; + String replyText; + + if (isDefaultExchange(exchangeName)) + { + if (routingKey == null) + { + if (queueName == null) + { + replyCode = virtualHost.getQueues().isEmpty() + ? ExchangeBoundOkBody.NO_BINDINGS + : ExchangeBoundOkBody.OK; + replyText = null; + + } + else + { + AMQQueue queue = virtualHost.getQueue(queueName.toString()); + if (queue == null) + { + replyCode = ExchangeBoundOkBody.QUEUE_NOT_FOUND; + replyText = "Queue '" + queueName + "' not found"; + } + else + { + replyCode = ExchangeBoundOkBody.OK; + replyText = null; + } + } + } + else + { + if (queueName == null) + { + replyCode = virtualHost.getQueue(routingKey.toString()) == null + ? ExchangeBoundOkBody.NO_QUEUE_BOUND_WITH_RK + : ExchangeBoundOkBody.OK; + replyText = null; + } + else + { + AMQQueue queue = virtualHost.getQueue(queueName.toString()); + if (queue == null) + { + + replyCode = ExchangeBoundOkBody.QUEUE_NOT_FOUND; + replyText = "Queue '" + queueName + "' not found"; + } + else + { + replyCode = queueName.equals(routingKey) + ? ExchangeBoundOkBody.OK + : ExchangeBoundOkBody.SPECIFIC_QUEUE_NOT_BOUND_WITH_RK; + replyText = null; + } + } + } + } + else + { + ExchangeImpl exchange = virtualHost.getExchange(exchangeName.toString()); + if (exchange == null) + { + + replyCode = ExchangeBoundOkBody.EXCHANGE_NOT_FOUND; + replyText = "Exchange '" + exchangeName + "' not found"; + } + else if (routingKey == null) + { + if (queueName == null) + { + if (exchange.hasBindings()) + { + replyCode = ExchangeBoundOkBody.OK; + replyText = null; + } + else + { + replyCode = ExchangeBoundOkBody.NO_BINDINGS; + replyText = null; + } + } + else + { + + AMQQueue queue = virtualHost.getQueue(queueName.toString()); + if (queue == null) + { + replyCode = ExchangeBoundOkBody.QUEUE_NOT_FOUND; + replyText = "Queue '" + queueName + "' not found"; + } + else + { + if (exchange.isBound(queue)) + { + replyCode = ExchangeBoundOkBody.OK; + replyText = null; + } + else + { + replyCode = ExchangeBoundOkBody.QUEUE_NOT_BOUND; + replyText = "Queue '" + + queueName + + "' not bound to exchange '" + + exchangeName + + "'"; + } + } + } + } + else if (queueName != null) + { + AMQQueue queue = virtualHost.getQueue(queueName.toString()); + if (queue == null) + { + replyCode = ExchangeBoundOkBody.QUEUE_NOT_FOUND; + replyText = "Queue '" + queueName + "' not found"; + } + else + { + String bindingKey = routingKey == null ? null : routingKey.asString(); + if (exchange.isBound(bindingKey, queue)) + { + + replyCode = ExchangeBoundOkBody.OK; + replyText = null; + } + else + { + replyCode = ExchangeBoundOkBody.SPECIFIC_QUEUE_NOT_BOUND_WITH_RK; + replyText = "Queue '" + queueName + "' not bound with routing key '" + + routingKey + "' to exchange '" + exchangeName + "'"; + + } + } + } + else + { + if (exchange.isBound(routingKey == null ? "" : routingKey.asString())) + { + + replyCode = ExchangeBoundOkBody.OK; + replyText = null; + } + else + { + replyCode = ExchangeBoundOkBody.NO_QUEUE_BOUND_WITH_RK; + replyText = + "No queue bound with routing key '" + routingKey + "' to exchange '" + exchangeName + "'"; + } + } + } + + ExchangeBoundOkBody exchangeBoundOkBody = + methodRegistry.createExchangeBoundOkBody(replyCode, AMQShortString.validValueOf(replyText)); + + _connection.writeFrame(exchangeBoundOkBody.generateFrame(getChannelId())); + + } + + @Override + public void receiveExchangeDeclare(final AMQShortString exchangeName, + final AMQShortString type, + final boolean passive, + final boolean durable, + final boolean autoDelete, + final boolean internal, + final boolean nowait, + final FieldTable arguments) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] ExchangeDeclare[" +" exchange: " + exchangeName + + " type: " + type + + " passive: " + passive + + " durable: " + durable + + " autoDelete: " + autoDelete + + " internal: " + internal + " nowait: " + nowait + " arguments: " + arguments + " ]"); + } + + ExchangeImpl exchange; + VirtualHostImpl<?, ?, ?> virtualHost = _connection.getVirtualHost(); + if (isDefaultExchange(exchangeName)) + { + if (!new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_CLASS).equals(type)) + { + _connection.closeConnection(AMQConstant.NOT_ALLOWED, "Attempt to redeclare default exchange: " + + " of type " + + ExchangeDefaults.DIRECT_EXCHANGE_CLASS + + " to " + type + ".", getChannelId()); + } + else if (!nowait) + { + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createExchangeDeclareOkBody(); + sync(); + _connection.writeFrame(responseBody.generateFrame(getChannelId())); + } + + } + else + { + if (passive) + { + exchange = virtualHost.getExchange(exchangeName.toString()); + if (exchange == null) + { + closeChannel(AMQConstant.NOT_FOUND, "Unknown exchange: " + exchangeName); + } + else if (!(type == null || type.length() == 0) && !exchange.getType().equals(type.asString())) + { + + _connection.closeConnection(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + + + exchangeName + + " of type " + + exchange.getType() + + " to " + + type + + ".", getChannelId()); + } + else if (!nowait) + { + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createExchangeDeclareOkBody(); + sync(); + _connection.writeFrame(responseBody.generateFrame(getChannelId())); + } + + } + else + { + try + { + String name = exchangeName == null ? null : exchangeName.intern().toString(); + String typeString = type == null ? null : type.intern().toString(); + + Map<String, Object> attributes = new HashMap<String, Object>(); + if (arguments != null) + { + attributes.putAll(FieldTable.convertToMap(arguments)); + } + attributes.put(Exchange.ID, null); + attributes.put(Exchange.NAME, name); + attributes.put(Exchange.TYPE, typeString); + attributes.put(Exchange.DURABLE, durable); + attributes.put(Exchange.LIFETIME_POLICY, + autoDelete ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT); + if (!attributes.containsKey(Exchange.ALTERNATE_EXCHANGE)) + { + attributes.put(Exchange.ALTERNATE_EXCHANGE, null); + } + exchange = virtualHost.createExchange(attributes); + + if (!nowait) + { + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createExchangeDeclareOkBody(); + sync(); + _connection.writeFrame(responseBody.generateFrame( + getChannelId())); + } + + } + catch (ReservedExchangeNameException e) + { + _connection.closeConnection(AMQConstant.NOT_ALLOWED, + "Attempt to declare exchange: " + exchangeName + + " which begins with reserved prefix.", getChannelId()); + + + } + catch (ExchangeExistsException e) + { + exchange = e.getExistingExchange(); + if (!new AMQShortString(exchange.getType()).equals(type)) + { + _connection.closeConnection(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + + exchangeName + " of type " + + exchange.getType() + + " to " + type + ".", getChannelId()); + + } + } + catch (NoFactoryForTypeException e) + { + _connection.closeConnection(AMQConstant.COMMAND_INVALID, "Unknown exchange type '" + + e.getType() + + "' for exchange '" + + exchangeName + + "'", getChannelId()); + + } + catch (AccessControlException e) + { + _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId()); + + } + catch (UnknownConfiguredObjectException e) + { + // note - since 0-8/9/9-1 can't set the alt. exchange this exception should never occur + final String message = "Unknown alternate exchange " + + (e.getName() != null + ? "name: \"" + e.getName() + "\"" + : "id: " + e.getId()); + _connection.closeConnection(AMQConstant.NOT_FOUND, message, getChannelId()); + + } + catch (IllegalArgumentException e) + { + _connection.closeConnection(AMQConstant.COMMAND_INVALID, "Error creating exchange '" + + exchangeName + + "': " + + e.getMessage(), getChannelId()); + + } + } + } + + } + + @Override + public void receiveExchangeDelete(final AMQShortString exchangeStr, final boolean ifUnused, final boolean nowait) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] ExchangeDelete[" +" exchange: " + exchangeStr + " ifUnused: " + ifUnused + " nowait: " + nowait + " ]"); + } + + + VirtualHostImpl virtualHost = _connection.getVirtualHost(); + sync(); + try + { + + if (isDefaultExchange(exchangeStr)) + { + _connection.closeConnection(AMQConstant.NOT_ALLOWED, + "Default Exchange cannot be deleted", getChannelId()); + + } + + else + { + final String exchangeName = exchangeStr.toString(); + + final ExchangeImpl exchange = virtualHost.getExchange(exchangeName); + if (exchange == null) + { + closeChannel(AMQConstant.NOT_FOUND, "No such exchange: " + exchangeStr); + } + else + { + virtualHost.removeExchange(exchange, !ifUnused); + + ExchangeDeleteOkBody responseBody = _connection.getMethodRegistry().createExchangeDeleteOkBody(); + + _connection.writeFrame(responseBody.generateFrame(getChannelId())); + } + } + } + catch (ExchangeIsAlternateException e) + { + closeChannel(AMQConstant.NOT_ALLOWED, "Exchange in use as an alternate exchange"); + } + catch (RequiredExchangeException e) + { + closeChannel(AMQConstant.NOT_ALLOWED, + "Exchange '" + exchangeStr + "' cannot be deleted"); + } + catch (AccessControlException e) + { + _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId()); + } + } + + @Override + public void receiveQueueBind(final AMQShortString queueName, + final AMQShortString exchange, + AMQShortString routingKey, + final boolean nowait, + final FieldTable argumentsTable) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] QueueBind[" +" queue: " + queueName + + " exchange: " + exchange + + " bindingKey: " + routingKey + + " nowait: " + nowait + " arguments: " + argumentsTable + " ]"); + } + + VirtualHostImpl virtualHost = _connection.getVirtualHost(); + AMQQueue<?> queue; + if (queueName == null) + { + + queue = getDefaultQueue(); + + if (queue != null) + { + if (routingKey == null) + { + routingKey = AMQShortString.valueOf(queue.getName()); + } + else + { + routingKey = routingKey.intern(); + } + } + } + else + { + queue = virtualHost.getQueue(queueName.toString()); + routingKey = routingKey == null ? AMQShortString.EMPTY_STRING : routingKey.intern(); + } + + if (queue == null) + { + String message = queueName == null + ? "No default queue defined on channel and queue was null" + : "Queue " + queueName + " does not exist."; + closeChannel(AMQConstant.NOT_FOUND, message); + } + else if (isDefaultExchange(exchange)) + { + _connection.closeConnection(AMQConstant.NOT_ALLOWED, + "Cannot bind the queue " + queueName + " to the default exchange", getChannelId()); + + } + else + { + + final String exchangeName = exchange.toString(); + + final ExchangeImpl exch = virtualHost.getExchange(exchangeName); + if (exch == null) + { + closeChannel(AMQConstant.NOT_FOUND, + "Exchange " + exchangeName + " does not exist."); + } + else + { + + try + { + + Map<String, Object> arguments = FieldTable.convertToMap(argumentsTable); + String bindingKey = String.valueOf(routingKey); + + if (!exch.isBound(bindingKey, arguments, queue)) + { + + if (!exch.addBinding(bindingKey, queue, arguments) + && ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals( + exch.getType())) + { + exch.replaceBinding(bindingKey, queue, arguments); + } + } + + if (_logger.isInfoEnabled()) + { + _logger.info("Binding queue " + + queue + + " to exchange " + + exch + + " with routing key " + + routingKey); + } + if (!nowait) + { + sync(); + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody(); + _connection.writeFrame(responseBody.generateFrame(getChannelId())); + + } + } + catch (AccessControlException e) + { + _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId()); + } + } + } + } + + @Override + public void receiveQueueDeclare(final AMQShortString queueStr, + final boolean passive, + final boolean durable, + final boolean exclusive, + final boolean autoDelete, + final boolean nowait, + final FieldTable arguments) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] QueueDeclare[" +" queue: " + queueStr + + " passive: " + passive + + " durable: " + durable + + " exclusive: " + exclusive + + " autoDelete: " + autoDelete + " nowait: " + nowait + " arguments: " + arguments + " ]"); + } + + VirtualHostImpl virtualHost = _connection.getVirtualHost(); + + final AMQShortString queueName; + + // if we aren't given a queue name, we create one which we return to the client + if ((queueStr == null) || (queueStr.length() == 0)) + { + queueName = new AMQShortString("tmp_" + UUID.randomUUID()); + } + else + { + queueName = queueStr.intern(); + } + + AMQQueue queue; + + //TODO: do we need to check that the queue already exists with exactly the same "configuration"? + + + if (passive) + { + queue = virtualHost.getQueue(queueName.toString()); + if (queue == null) + { + closeChannel(AMQConstant.NOT_FOUND, + "Queue: " + + queueName + + " not found on VirtualHost(" + + virtualHost + + ")."); + } + else + { + if (!queue.verifySessionAccess(this)) + { + _connection.closeConnection(AMQConstant.NOT_ALLOWED, "Queue " + + queue.getName() + + " is exclusive, but not created on this Connection.", getChannelId()); + } + else + { + //set this as the default queue on the channel: + setDefaultQueue(queue); + if (!nowait) + { + sync(); + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + QueueDeclareOkBody responseBody = + methodRegistry.createQueueDeclareOkBody(queueName, + queue.getQueueDepthMessages(), + queue.getConsumerCount()); + _connection.writeFrame(responseBody.generateFrame(getChannelId())); + + _logger.info("Queue " + queueName + " declared successfully"); + } + } + } + } + else + { + + try + { + Map<String, Object> attributes = + QueueArgumentsConverter.convertWireArgsToModel(FieldTable.convertToMap(arguments)); + final String queueNameString = AMQShortString.toString(queueName); + attributes.put(Queue.NAME, queueNameString); + attributes.put(Queue.ID, UUID.randomUUID()); + attributes.put(Queue.DURABLE, durable); + + LifetimePolicy lifetimePolicy; + ExclusivityPolicy exclusivityPolicy; + + if (exclusive) + { + lifetimePolicy = autoDelete + ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS + : durable ? LifetimePolicy.PERMANENT : LifetimePolicy.DELETE_ON_CONNECTION_CLOSE; + exclusivityPolicy = durable ? ExclusivityPolicy.CONTAINER : ExclusivityPolicy.CONNECTION; + } + else + { + lifetimePolicy = autoDelete ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS : LifetimePolicy.PERMANENT; + exclusivityPolicy = ExclusivityPolicy.NONE; + } + + attributes.put(Queue.EXCLUSIVE, exclusivityPolicy); + attributes.put(Queue.LIFETIME_POLICY, lifetimePolicy); + + + queue = virtualHost.createQueue(attributes); + + setDefaultQueue(queue); + + if (!nowait) + { + sync(); + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + QueueDeclareOkBody responseBody = + methodRegistry.createQueueDeclareOkBody(queueName, + queue.getQueueDepthMessages(), + queue.getConsumerCount()); + _connection.writeFrame(responseBody.generateFrame(getChannelId())); + + _logger.info("Queue " + queueName + " declared successfully"); + } + } + catch (QueueExistsException qe) + { + + queue = qe.getExistingQueue(); + + if (!queue.verifySessionAccess(this)) + { + _connection.closeConnection(AMQConstant.NOT_ALLOWED, "Queue " + + queue.getName() + + " is exclusive, but not created on this Connection.", getChannelId()); + + } + else if (queue.isExclusive() != exclusive) + { + + closeChannel(AMQConstant.ALREADY_EXISTS, + "Cannot re-declare queue '" + + queue.getName() + + "' with different exclusivity (was: " + + queue.isExclusive() + + " requested " + + exclusive + + ")"); + } + else if ((autoDelete + && queue.getLifetimePolicy() != LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS) + || (!autoDelete && queue.getLifetimePolicy() != ((exclusive + && !durable) + ? LifetimePolicy.DELETE_ON_CONNECTION_CLOSE + : LifetimePolicy.PERMANENT))) + { + closeChannel(AMQConstant.ALREADY_EXISTS, + "Cannot re-declare queue '" + + queue.getName() + + "' with different lifetime policy (was: " + + queue.getLifetimePolicy() + + " requested autodelete: " + + autoDelete + + ")"); + } + else if (queue.isDurable() != durable) + { + closeChannel(AMQConstant.ALREADY_EXISTS, + "Cannot re-declare queue '" + + queue.getName() + + "' with different durability (was: " + + queue.isDurable() + + " requested " + + durable + + ")"); + } + else + { + setDefaultQueue(queue); + if (!nowait) + { + sync(); + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + QueueDeclareOkBody responseBody = + methodRegistry.createQueueDeclareOkBody(queueName, + queue.getQueueDepthMessages(), + queue.getConsumerCount()); + _connection.writeFrame(responseBody.generateFrame(getChannelId())); + + _logger.info("Queue " + queueName + " declared successfully"); + } + } + } + catch (AccessControlException e) + { + _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId()); + } + + } + } + + @Override + public void receiveQueueDelete(final AMQShortString queueName, + final boolean ifUnused, + final boolean ifEmpty, + final boolean nowait) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] QueueDelete[" +" queue: " + queueName + " ifUnused: " + ifUnused + " ifEmpty: " + ifEmpty + " nowait: " + nowait + " ]"); + } + + VirtualHostImpl virtualHost = _connection.getVirtualHost(); + sync(); + AMQQueue queue; + if (queueName == null) + { + + //get the default queue on the channel: + queue = getDefaultQueue(); + } + else + { + queue = virtualHost.getQueue(queueName.toString()); + } + + if (queue == null) + { + closeChannel(AMQConstant.NOT_FOUND, "Queue " + queueName + " does not exist."); + + } + else + { + if (ifEmpty && !queue.isEmpty()) + { + closeChannel(AMQConstant.IN_USE, "Queue: " + queueName + " is not empty."); + } + else if (ifUnused && !queue.isUnused()) + { + // TODO - Error code + closeChannel(AMQConstant.IN_USE, "Queue: " + queueName + " is still used."); + } + else + { + if (!queue.verifySessionAccess(this)) + { + _connection.closeConnection(AMQConstant.NOT_ALLOWED, "Queue " + + queue.getName() + + " is exclusive, but not created on this Connection.", getChannelId()); + + } + else + { + try + { + int purged = virtualHost.removeQueue(queue); + + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + QueueDeleteOkBody responseBody = methodRegistry.createQueueDeleteOkBody(purged); + _connection.writeFrame(responseBody.generateFrame(getChannelId())); + } + catch (AccessControlException e) + { + _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId()); + + } + } + } + } + } + + @Override + public void receiveQueuePurge(final AMQShortString queueName, final boolean nowait) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] QueuePurge[" +" queue: " + queueName + " nowait: " + nowait + " ]"); + } + + VirtualHostImpl virtualHost = _connection.getVirtualHost(); + AMQQueue queue = null; + if (queueName == null && (queue = getDefaultQueue()) == null) + { + + _connection.closeConnection(AMQConstant.NOT_ALLOWED, "No queue specified.", getChannelId()); + } + else if ((queueName != null) && (queue = virtualHost.getQueue(queueName.toString())) == null) + { + closeChannel(AMQConstant.NOT_FOUND, "Queue " + queueName + " does not exist."); + } + else if (!queue.verifySessionAccess(this)) + { + _connection.closeConnection(AMQConstant.NOT_ALLOWED, + "Queue is exclusive, but not created on this Connection.", getChannelId()); + } + else + { + try + { + long purged = queue.clearQueue(); + if (!nowait) + { + sync(); + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createQueuePurgeOkBody(purged); + _connection.writeFrame(responseBody.generateFrame(getChannelId())); + + } + } + catch (AccessControlException e) + { + _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId()); + + } + + } + } + + @Override + public void receiveQueueUnbind(final AMQShortString queueName, + final AMQShortString exchange, + final AMQShortString routingKey, + final FieldTable arguments) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] QueueUnbind[" +" queue: " + queueName + + " exchange: " + exchange + + " bindingKey: " + routingKey + + " arguments: " + arguments + " ]"); + } + + VirtualHostImpl virtualHost = _connection.getVirtualHost(); + + + final boolean useDefaultQueue = queueName == null; + final AMQQueue queue = useDefaultQueue + ? getDefaultQueue() + : virtualHost.getQueue(queueName.toString()); + + + if (queue == null) + { + String message = useDefaultQueue + ? "No default queue defined on channel and queue was null" + : "Queue " + queueName + " does not exist."; + closeChannel(AMQConstant.NOT_FOUND, message); + } + else if (isDefaultExchange(exchange)) + { + _connection.closeConnection(AMQConstant.NOT_ALLOWED, "Cannot unbind the queue " + + queue.getName() + + " from the default exchange", getChannelId()); + + } + else + { + + final ExchangeImpl exch = virtualHost.getExchange(exchange.toString()); + + if (exch == null) + { + closeChannel(AMQConstant.NOT_FOUND, "Exchange " + exchange + " does not exist."); + } + else if (!exch.hasBinding(String.valueOf(routingKey), queue)) + { + closeChannel(AMQConstant.NOT_FOUND, "No such binding"); + } + else + { + try + { + exch.deleteBinding(String.valueOf(routingKey), queue); + + final AMQMethodBody responseBody = _connection.getMethodRegistry().createQueueUnbindOkBody(); + sync(); + _connection.writeFrame(responseBody.generateFrame(getChannelId())); + } + catch (AccessControlException e) + { + _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId()); + + } + } + + } + } + + @Override + public void receiveTxSelect() + { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] TxSelect"); + } + + setLocalTransactional(); + + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + TxSelectOkBody responseBody = methodRegistry.createTxSelectOkBody(); + _connection.writeFrame(responseBody.generateFrame(_channelId)); + + } + + @Override + public void receiveTxCommit() + { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] TxCommit"); + } + + + if (!isTransactional()) + { + closeChannel(AMQConstant.COMMAND_INVALID, + "Fatal error: commit called on non-transactional channel"); + } + commit(new Runnable() + { + + @Override + public void run() + { + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createTxCommitOkBody(); + _connection.writeFrame(responseBody.generateFrame(_channelId)); + } + }, true); + + } + + @Override + public void receiveTxRollback() + { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] TxRollback"); + } + + if (!isTransactional()) + { + closeChannel(AMQConstant.COMMAND_INVALID, + "Fatal error: rollback called on non-transactional channel"); + } + + final MethodRegistry methodRegistry = _connection.getMethodRegistry(); + final AMQMethodBody responseBody = methodRegistry.createTxRollbackOkBody(); +
[... 31 lines stripped ...] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
