Modified: 
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1632583&r1=1632582&r2=1632583&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
 (original)
+++ 
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
 Fri Oct 17 14:23:19 2014
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.server.protocol.v0_8;
 
+import static org.apache.qpid.transport.util.Functions.hex;
+
 import java.nio.ByteBuffer;
 import java.security.AccessControlException;
 import java.security.PrivilegedAction;
@@ -47,14 +49,8 @@ import org.apache.log4j.Logger;
 import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.TransactionTimeoutHelper;
 import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
@@ -62,6 +58,7 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.connection.SessionPrincipal;
 import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.exchange.ExchangeImpl;
 import org.apache.qpid.server.filter.AMQInvalidArgumentException;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.FilterManagerFactory;
@@ -69,6 +66,7 @@ import org.apache.qpid.server.filter.Fil
 import org.apache.qpid.server.filter.MessageFilter;
 import org.apache.qpid.server.filter.SimpleFilterManager;
 import org.apache.qpid.server.flow.FlowCreditManager;
+import org.apache.qpid.server.flow.MessageOnlyCreditManager;
 import org.apache.qpid.server.flow.Pre0_10CreditManager;
 import org.apache.qpid.server.logging.LogMessage;
 import org.apache.qpid.server.logging.LogSubject;
@@ -85,13 +83,18 @@ import org.apache.qpid.server.model.Conf
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.Consumer;
 import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.ExclusivityPolicy;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.NoFactoryForTypeException;
+import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.Session;
 import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.UnknownConfiguredObjectException;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.CapacityChecker;
 import org.apache.qpid.server.protocol.ConsumerListener;
-import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueArgumentsConverter;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreFuture;
@@ -103,12 +106,18 @@ import org.apache.qpid.server.txn.LocalT
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.virtualhost.ExchangeExistsException;
+import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
+import org.apache.qpid.server.virtualhost.QueueExistsException;
+import org.apache.qpid.server.virtualhost.RequiredExchangeException;
+import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 import org.apache.qpid.transport.TransportException;
 
-public class AMQChannel<T extends AMQProtocolSession<T>>
-        implements AMQSessionModel<AMQChannel<T>,T>,
-                   AsyncAutoCommitTransaction.FutureRecorder
+public class AMQChannel
+        implements AMQSessionModel<AMQChannel, AMQProtocolEngine>,
+                   AsyncAutoCommitTransaction.FutureRecorder,
+                   ServerChannelMethodProcessor
 {
     public static final int DEFAULT_PREFETCH = 4096;
 
@@ -159,7 +168,7 @@ public class AMQChannel<T extends AMQPro
     private final AtomicLong _txnRejects = new AtomicLong(0);
     private final AtomicLong _txnCount = new AtomicLong(0);
 
-    private final T _session;
+    private final AMQProtocolEngine _connection;
     private AtomicBoolean _closing = new AtomicBoolean(false);
 
     private final Set<Object> _blockingEntities = 
Collections.synchronizedSet(new HashSet<Object>());
@@ -181,8 +190,8 @@ public class AMQChannel<T extends AMQPro
     private final TransactionTimeoutHelper _transactionTimeoutHelper;
     private final UUID _id = UUID.randomUUID();
 
-    private final List<Action<? super AMQChannel<T>>> _taskList =
-            new CopyOnWriteArrayList<Action<? super AMQChannel<T>>>();
+    private final List<Action<? super AMQChannel>> _taskList =
+            new CopyOnWriteArrayList<Action<? super AMQChannel>>();
 
 
     private final CapacityCheckAction _capacityCheckAction = new 
CapacityCheckAction();
@@ -194,15 +203,14 @@ public class AMQChannel<T extends AMQPro
     private Session<?> _modelObject;
 
 
-    public AMQChannel(T session, int channelId, final MessageStore 
messageStore)
-            throws AMQException
+    public AMQChannel(AMQProtocolEngine connection, int channelId, final 
MessageStore messageStore)
     {
-        _session = session;
+        _connection = connection;
         _channelId = channelId;
 
-        _subject = new Subject(false, 
session.getAuthorizedSubject().getPrincipals(),
-                               
session.getAuthorizedSubject().getPublicCredentials(),
-                               
session.getAuthorizedSubject().getPrivateCredentials());
+        _subject = new Subject(false, 
connection.getAuthorizedSubject().getPrincipals(),
+                               
connection.getAuthorizedSubject().getPublicCredentials(),
+                               
connection.getAuthorizedSubject().getPrivateCredentials());
         _subject.getPrincipals().add(new SessionPrincipal(this));
         _logSubject = new ChannelLogSubject(this);
 
@@ -211,7 +219,7 @@ public class AMQChannel<T extends AMQPro
         // by default the session is non-transactional
         _transaction = new AsyncAutoCommitTransaction(_messageStore, this);
 
-        _clientDeliveryMethod = session.createDeliveryMethod(_channelId);
+        _clientDeliveryMethod = connection.createDeliveryMethod(_channelId);
 
         _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject, 
new CloseAction()
         {
@@ -242,6 +250,52 @@ public class AMQChannel<T extends AMQPro
 
     }
 
+    private boolean performGet(final AMQQueue queue,
+                               final boolean acks)
+            throws MessageSource.ExistingConsumerPreventsExclusive,
+                   MessageSource.ExistingExclusiveConsumer, 
MessageSource.ConsumerAccessRefused
+    {
+        // One-shot get: attach a temporary consumer to the queue, flush it, close it, and report whether a message was written.
+        final FlowCreditManager singleMessageCredit = new 
MessageOnlyCreditManager(1L);
+        // GetDeliveryMethod (nested class of this channel) writes the message as a get-ok and remembers that it did so.
+        final GetDeliveryMethod getDeliveryMethod =
+                new GetDeliveryMethod(singleMessageCredit, queue);
+        final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod()
+        {
+            // Records each delivery as unacknowledged; the consumer argument is unused and null is passed as the third argument.
+            public void recordMessageDelivery(final ConsumerImpl sub,
+                                              final MessageInstance entry,
+                                              final long deliveryTag)
+            {
+                addUnacknowledgedMessage(entry, deliveryTag, null);
+            }
+        };
+        // The temporary consumer is registered with the TRANSIENT, ACQUIRES and SEES_REQUEUES options.
+        ConsumerTarget_0_8 target;
+        EnumSet<ConsumerImpl.Option> options = 
EnumSet.of(ConsumerImpl.Option.TRANSIENT, ConsumerImpl.Option.ACQUIRES,
+                                                          
ConsumerImpl.Option.SEES_REQUEUES);
+        if (acks)
+        {
+            // acks == true: target comes from createAckTarget; otherwise from createGetNoAckTarget (same arguments).
+            target = ConsumerTarget_0_8.createAckTarget(this,
+                                                        
AMQShortString.EMPTY_STRING, null,
+                                                        singleMessageCredit, 
getDeliveryMethod, getRecordMethod);
+        }
+        else
+        {
+            target = ConsumerTarget_0_8.createGetNoAckTarget(this,
+                                                             
AMQShortString.EMPTY_STRING, null,
+                                                             
singleMessageCredit, getDeliveryMethod, getRecordMethod);
+        }
+        // NOTE(review): close() is not in a finally block, so an exception thrown by flush() would skip it — confirm this is acceptable.
+        ConsumerImpl sub = queue.addConsumer(target, null, AMQMessage.class, 
"", options);
+        sub.flush();
+        sub.close();
+        return getDeliveryMethod.hasDeliveredMessage();
+
+
+    }
+
     /** Sets this channel to be part of a local transaction */
     public void setLocalTransactional()
     {
@@ -250,7 +304,7 @@ public class AMQChannel<T extends AMQPro
             @Override
             public long getActivityTime()
             {
-                return _session.getLastReceivedTime();
+                return _connection.getLastReceivedTime();
             }
         });
         _txnStarts.incrementAndGet();
@@ -324,27 +378,18 @@ public class AMQChannel<T extends AMQPro
     }
 
     public void publishContentHeader(ContentHeaderBody contentHeaderBody)
-            throws AMQException
     {
-        if (_currentMessage == null)
+        if (_logger.isDebugEnabled())
         {
-            throw new AMQException("Received content header without previously 
receiving a BasicPublish frame");
+            _logger.debug("Content header received on channel " + _channelId);
         }
-        else
-        {
-            if (_logger.isDebugEnabled())
-            {
-                _logger.debug("Content header received on channel " + 
_channelId);
-            }
 
-            _currentMessage.setContentHeaderBody(contentHeaderBody);
+        _currentMessage.setContentHeaderBody(contentHeaderBody);
 
-            deliverCurrentMessageIfComplete();
-        }
+        deliverCurrentMessageIfComplete();
     }
 
     private void deliverCurrentMessageIfComplete()
-            throws AMQException
     {
         // check and deliver if header says body length is zero
         if (_currentMessage.allContentReceived())
@@ -355,7 +400,7 @@ public class AMQChannel<T extends AMQPro
                 final MessageMetaData messageMetaData =
                         new 
MessageMetaData(_currentMessage.getMessagePublishInfo(),
                                             _currentMessage.getContentHeader(),
-                                            
getProtocolSession().getLastReceivedTime());
+                                            
getConnection().getLastReceivedTime());
 
                 final StoredMessage<MessageMetaData> handle = 
_messageStore.addMessage(messageMetaData);
                 final AMQMessage amqMessage = 
createAMQMessage(_currentMessage, handle);
@@ -430,7 +475,7 @@ public class AMQChannel<T extends AMQPro
             {
                 long bodySize = _currentMessage.getSize();
                 long timestamp = 
_currentMessage.getContentHeader().getProperties().getTimestamp();
-                _session.registerMessageReceived(bodySize, timestamp);
+                _connection.registerMessageReceived(bodySize, timestamp);
                 _currentMessage = null;
             }
         }
@@ -443,13 +488,13 @@ public class AMQChannel<T extends AMQPro
      * Pre-requisite: the current message is judged to have no destination 
queues.
      *
      * @throws AMQConnectionException if the message is mandatory 
close-on-no-route
-     * @see AMQProtocolSession#isCloseWhenNoRoute()
+     * @see AMQProtocolEngine#isCloseWhenNoRoute()
      */
-    private void handleUnroutableMessage(AMQMessage message) throws 
AMQConnectionException
+    private void handleUnroutableMessage(AMQMessage message)
     {
         boolean mandatory = message.isMandatory();
         String description = currentMessageDescription();
-        boolean closeOnNoRoute = _session.isCloseWhenNoRoute();
+        boolean closeOnNoRoute = _connection.isCloseWhenNoRoute();
 
         if(_logger.isDebugEnabled())
         {
@@ -458,29 +503,29 @@ public class AMQChannel<T extends AMQPro
                     description, mandatory, isTransactional(), 
closeOnNoRoute));
         }
 
-        if (mandatory && isTransactional() && _session.isCloseWhenNoRoute())
-        {
-            throw new AMQConnectionException(
-                    AMQConstant.NO_ROUTE,
-                    "No route for message " + currentMessageDescription(),
-                    0, 0, // default class and method ids
-                    
getProtocolSession().getProtocolVersion().getMajorVersion(),
-                    
getProtocolSession().getProtocolVersion().getMinorVersion(),
-                    (Throwable) null);
-        }
-
-        if (mandatory || message.isImmediate())
+        if (mandatory && isTransactional() && _connection.isCloseWhenNoRoute())
         {
-            _transaction.addPostTransactionAction(new 
WriteReturnAction(AMQConstant.NO_ROUTE, "No Route for message " + 
currentMessageDescription(), message));
+            _connection.closeConnection(AMQConstant.NO_ROUTE,
+                    "No route for message " + currentMessageDescription(), 
_channelId);
         }
         else
         {
-            AMQShortString exchangeName = _currentMessage.getExchangeName();
-            AMQShortString routingKey = 
_currentMessage.getMessagePublishInfo().getRoutingKey();
+            if (mandatory || message.isImmediate())
+            {
+                _transaction.addPostTransactionAction(new 
WriteReturnAction(AMQConstant.NO_ROUTE,
+                                                                            
"No Route for message "
+                                                                            + 
currentMessageDescription(),
+                                                                            
message));
+            }
+            else
+            {
+                AMQShortString exchangeName = 
_currentMessage.getExchangeName();
+                AMQShortString routingKey = 
_currentMessage.getMessagePublishInfo().getRoutingKey();
 
-            getVirtualHost().getEventLogger().message(
-                    ExchangeMessages.DISCARDMSG(exchangeName == null ? null : 
exchangeName.asString(),
-                                                routingKey == null ? null : 
routingKey.asString()));
+                getVirtualHost().getEventLogger().message(
+                        ExchangeMessages.DISCARDMSG(exchangeName == null ? 
null : exchangeName.asString(),
+                                                    routingKey == null ? null 
: routingKey.asString()));
+            }
         }
     }
 
@@ -499,13 +544,8 @@ public class AMQChannel<T extends AMQPro
                         : 
_currentMessage.getMessagePublishInfo().getRoutingKey().toString());
     }
 
-    public void publishContentBody(ContentBody contentBody) throws AMQException
+    public void publishContentBody(ContentBody contentBody)
     {
-        if (_currentMessage == null)
-        {
-            throw new AMQException("Received content body without previously 
receiving a Content Header");
-        }
-
         if (_logger.isDebugEnabled())
         {
             _logger.debug(debugIdentity() + " content body received on channel 
" + _channelId);
@@ -517,13 +557,6 @@ public class AMQChannel<T extends AMQPro
 
             deliverCurrentMessageIfComplete();
         }
-        catch (AMQException e)
-        {
-            // we want to make sure we don't keep a reference to the message 
in the
-            // event of an error
-            _currentMessage = null;
-            throw e;
-        }
         catch (RuntimeException e)
         {
             // we want to make sure we don't keep a reference to the message 
in the
@@ -566,9 +599,10 @@ public class AMQChannel<T extends AMQPro
      */
     public AMQShortString consumeFromSource(AMQShortString tag, 
Collection<MessageSource> sources, boolean acks,
                                             FieldTable filters, boolean 
exclusive, boolean noLocal)
-            throws AMQException, 
MessageSource.ExistingConsumerPreventsExclusive,
-                   MessageSource.ExistingExclusiveConsumer, 
AMQInvalidArgumentException,
-                   MessageSource.ConsumerAccessRefused
+            throws MessageSource.ExistingConsumerPreventsExclusive,
+                   MessageSource.ExistingExclusiveConsumer,
+                   AMQInvalidArgumentException,
+                   MessageSource.ConsumerAccessRefused, 
ConsumerTagInUseException
     {
         if (tag == null)
         {
@@ -577,7 +611,7 @@ public class AMQChannel<T extends AMQPro
 
         if (_tag2SubscriptionTargetMap.containsKey(tag))
         {
-            throw new AMQException("Consumer already exists with same tag: " + 
tag);
+            throw new ConsumerTagInUseException("Consumer already exists with 
same tag: " + tag);
         }
 
         ConsumerTarget_0_8 target;
@@ -649,27 +683,11 @@ public class AMQChannel<T extends AMQPro
                 }
             }
         }
-        catch (AccessControlException e)
-        {
-            _tag2SubscriptionTargetMap.remove(tag);
-            throw e;
-        }
-        catch (MessageSource.ExistingExclusiveConsumer e)
-        {
-            _tag2SubscriptionTargetMap.remove(tag);
-            throw e;
-        }
-        catch (MessageSource.ExistingConsumerPreventsExclusive e)
-        {
-            _tag2SubscriptionTargetMap.remove(tag);
-            throw e;
-        }
-        catch (AMQInvalidArgumentException e)
-        {
-            _tag2SubscriptionTargetMap.remove(tag);
-            throw e;
-        }
-        catch (MessageSource.ConsumerAccessRefused e)
+        catch (AccessControlException
+                | MessageSource.ExistingExclusiveConsumer
+                | MessageSource.ExistingConsumerPreventsExclusive
+                | AMQInvalidArgumentException
+                | MessageSource.ConsumerAccessRefused e)
         {
             _tag2SubscriptionTargetMap.remove(tag);
             throw e;
@@ -730,7 +748,7 @@ public class AMQChannel<T extends AMQPro
 
         unsubscribeAllConsumers();
 
-        for (Action<? super AMQChannel<T>> task : _taskList)
+        for (Action<? super AMQChannel> task : _taskList)
         {
             task.performAction(this);
         }
@@ -897,9 +915,8 @@ public class AMQChannel<T extends AMQPro
     /**
      * Called to resend all outstanding unacknowledged messages to this same 
channel.
      *
-     * @throws AMQException When something goes wrong.
      */
-    public void resend() throws AMQException
+    public void resend()
     {
 
 
@@ -985,9 +1002,8 @@ public class AMQChannel<T extends AMQPro
      * @param multiple    if true will acknowledge all messages up to an 
including the delivery tag. if false only
      *                    acknowledges the single message specified by the 
delivery tag
      *
-     * @throws AMQException if the delivery tag is unknown (e.g. not 
outstanding) on this channel
      */
-    public void acknowledgeMessage(long deliveryTag, boolean multiple) throws 
AMQException
+    public void acknowledgeMessage(long deliveryTag, boolean multiple)
     {
         Collection<MessageInstance> ackedMessages = 
getAckedMessages(deliveryTag, multiple);
         _transaction.dequeue(ackedMessages, new 
MessageAcknowledgeAction(ackedMessages));
@@ -1084,22 +1100,13 @@ public class AMQChannel<T extends AMQPro
 
     public boolean isSuspended()
     {
-        return _suspended.get()  || _closing.get() || _session.isClosing();
+        return _suspended.get()  || _closing.get() || _connection.isClosing();
     }
 
-    public void commit() throws AMQException
-    {
-        commit(null, false);
-    }
 
-
-    public void commit(final Runnable immediateAction, boolean async) throws 
AMQException
+    public void commit(final Runnable immediateAction, boolean async)
     {
 
-        if (!isTransactional())
-        {
-            throw new AMQException("Fatal error: commit called on 
non-transactional channel");
-        }
 
         if(async && _transaction instanceof LocalTransaction)
         {
@@ -1132,17 +1139,8 @@ public class AMQChannel<T extends AMQPro
         }
     }
 
-    public void rollback() throws AMQException
-    {
-        rollback(NULL_TASK);
-    }
-
-    public void rollback(Runnable postRollbackTask) throws AMQException
+    public void rollback(Runnable postRollbackTask)
     {
-        if (!isTransactional())
-        {
-            throw new AMQException("Fatal error: commit called on 
non-transactional channel");
-        }
 
         // stop all subscriptions
         _rollingBack = true;
@@ -1200,7 +1198,7 @@ public class AMQChannel<T extends AMQPro
 
     public String toString()
     {
-        return "("+ _suspended.get() + ", " + _closing.get() + ", " + 
_session.isClosing() + ") "+"["+_session.toString()+":"+_channelId+"]";
+        return "("+ _suspended.get() + ", " + _closing.get() + ", " + 
_connection.isClosing() + ") "+"["+ _connection.toString()+":"+_channelId+"]";
     }
 
     public void setDefaultQueue(AMQQueue queue)
@@ -1219,9 +1217,9 @@ public class AMQChannel<T extends AMQPro
         return _closing.get();
     }
 
-    public AMQProtocolSession getProtocolSession()
+    public AMQProtocolEngine getConnection()
     {
-        return _session;
+        return _connection;
     }
 
     public FlowCreditManager getCreditManager()
@@ -1261,13 +1259,9 @@ public class AMQChannel<T extends AMQPro
 
 
     private AMQMessage createAMQMessage(IncomingMessage incomingMessage, 
StoredMessage<MessageMetaData> handle)
-            throws AMQException
     {
 
-        AMQMessage message = new AMQMessage(handle, _session.getReference());
-
-        final BasicContentHeaderProperties properties =
-                  incomingMessage.getContentHeader().getProperties();
+        AMQMessage message = new AMQMessage(handle, 
_connection.getReference());
 
         return message;
     }
@@ -1275,7 +1269,7 @@ public class AMQChannel<T extends AMQPro
     private boolean checkMessageUserId(ContentHeaderBody header)
     {
         AMQShortString userID = header.getProperties().getUserId();
-        return (!_messageAuthorizationRequired || 
_session.getAuthorizedPrincipal().getName().equals(userID == null? "" : 
userID.toString()));
+        return (!_messageAuthorizationRequired || 
_connection.getAuthorizedPrincipal().getName().equals(userID == null? "" : 
userID.toString()));
 
     }
 
@@ -1286,14 +1280,14 @@ public class AMQChannel<T extends AMQPro
     }
 
     @Override
-    public T getConnectionModel()
+    public AMQProtocolEngine getConnectionModel()
     {
-        return _session;
+        return _connection;
     }
 
     public String getClientID()
     {
-        return String.valueOf(_session.getContextKey());
+        return String.valueOf(_connection.getContextKey());
     }
 
     public LogSubject getLogSubject()
@@ -1308,13 +1302,13 @@ public class AMQChannel<T extends AMQPro
     }
 
     @Override
-    public void addDeleteTask(final Action<? super AMQChannel<T>> task)
+    public void addDeleteTask(final Action<? super AMQChannel> task)
     {
         _taskList.add(task);
     }
 
     @Override
-    public void removeDeleteTask(final Action<? super AMQChannel<T>> task)
+    public void removeDeleteTask(final Action<? super AMQChannel> task)
     {
         _taskList.remove(task);
     }
@@ -1324,6 +1318,46 @@ public class AMQChannel<T extends AMQPro
         return _subject;
     }
 
+    public boolean hasCurrentMessage()
+    {
+        return _currentMessage != null;  // true while this channel still holds a message in _currentMessage
+    }
+
+    private class GetDeliveryMethod implements ClientDeliveryMethod
+    {
+        // Delivery method used by performGet: writes a message to the connection as a get-ok and records that it did.
+        private final FlowCreditManager _singleMessageCredit;
+        private final AMQQueue _queue;
+        private boolean _deliveredMessage;
+        // singleMessageCredit is debited on each delivery; queue supplies the getQueueDepthMessages() value passed to writeGetOk.
+        public GetDeliveryMethod(final FlowCreditManager singleMessageCredit,
+                                 final AMQQueue queue)
+        {
+            _singleMessageCredit = singleMessageCredit;
+            _queue = queue;
+        }
+        // Uses credit for the message size, writes the get-ok through the connection's output converter, returns writeGetOk's result.
+        @Override
+        public long deliverToClient(final ConsumerImpl sub, final 
ServerMessage message,
+                                    final InstanceProperties props, final long 
deliveryTag)
+        {
+            _singleMessageCredit.useCreditForMessage(message.getSize());
+            long size = 
_connection.getProtocolOutputConverter().writeGetOk(message,
+                                                                            
props,
+                                                                            
AMQChannel.this.getChannelId(),
+                                                                            
deliveryTag,
+                                                                            
_queue.getQueueDepthMessages());
+
+            _deliveredMessage = true;
+            return size;
+        }
+        // True once deliverToClient has completed at least once on this instance.
+        public boolean hasDeliveredMessage()
+        {
+            return _deliveredMessage;
+        }
+    }
+
 
     private class ImmediateAction implements Action<MessageInstance>
     {
@@ -1352,7 +1386,7 @@ public class AMQChannel<T extends AMQPro
                                     public void postCommit()
                                     {
                                         final ProtocolOutputConverter 
outputConverter =
-                                                    
_session.getProtocolOutputConverter();
+                                                    
_connection.getProtocolOutputConverter();
 
                                         
outputConverter.writeReturn(message.getMessagePublishInfo(),
                                                                     
message.getContentHeaderBody(),
@@ -1475,7 +1509,7 @@ public class AMQChannel<T extends AMQPro
         public void postCommit()
         {
             AMQMessage message = _reference.getMessage();
-            
_session.getProtocolOutputConverter().writeReturn(message.getMessagePublishInfo(),
+            
_connection.getProtocolOutputConverter().writeReturn(message.getMessagePublishInfo(),
                                                           
message.getContentHeaderBody(),
                                                           message,
                                                           _channelId,
@@ -1544,7 +1578,7 @@ public class AMQChannel<T extends AMQPro
     @Override
     public Object getConnectionReference()
     {
-        return getProtocolSession().getReference();
+        return getConnection().getReference();
     }
 
     public int getUnacknowledgedMessageCount()
@@ -1554,9 +1588,9 @@ public class AMQChannel<T extends AMQPro
 
     private void flow(boolean flow)
     {
-        MethodRegistry methodRegistry = _session.getMethodRegistry();
+        MethodRegistry methodRegistry = _connection.getMethodRegistry();
         AMQMethodBody responseBody = 
methodRegistry.createChannelFlowBody(flow);
-        _session.writeFrame(responseBody.generateFrame(_channelId));
+        _connection.writeFrame(responseBody.generateFrame(_channelId));
     }
 
     @Override
@@ -1567,7 +1601,7 @@ public class AMQChannel<T extends AMQPro
 
     public VirtualHostImpl getVirtualHost()
     {
-        return getProtocolSession().getVirtualHost();
+        return getConnection().getVirtualHost();
     }
 
     public void checkTransactionStatus(long openWarn, long openClose, long 
idleWarn, long idleClose)
@@ -1581,11 +1615,11 @@ public class AMQChannel<T extends AMQPro
      */
     private void closeConnection(String reason) throws AMQException
     {
-        Lock receivedLock = _session.getReceivedLock();
+        Lock receivedLock = _connection.getReceivedLock();
         receivedLock.lock();
         try
         {
-            _session.close(AMQConstant.RESOURCE_ERROR, reason);
+            _connection.close(AMQConstant.RESOURCE_ERROR, reason);
         }
         finally
         {
@@ -1593,7 +1627,7 @@ public class AMQChannel<T extends AMQPro
         }
     }
 
-    public void deadLetter(long deliveryTag) throws AMQException
+    public void deadLetter(long deliveryTag)
     {
         final UnacknowledgedMessageMap unackedMap = 
getUnacknowledgedMessageMap();
         final MessageInstance rejectedQueueEntry = 
unackedMap.remove(deliveryTag);
@@ -1816,4 +1850,1521 @@ public class AMQChannel<T extends AMQPro
             return 0L;
         }
     }
+
+    /**
+     * Handles an incoming Access.Request frame.
+     * <p>
+     * The frame is logged at debug level. If the connection's protocol version is 0-91 the
+     * connection is closed with COMMAND_INVALID; otherwise an AccessRequestOk carrying
+     * ticket 0 is written back on this channel.
+     */
+    @Override
+    public void receiveAccessRequest(final AMQShortString realm,
+                                     final boolean exclusive,
+                                     final boolean passive,
+                                     final boolean active, final boolean write, final boolean read)
+    {
+        if (_logger.isDebugEnabled())
+        {
+            final String frameText = "AccessRequest[ realm: " + realm
+                                     + " exclusive: " + exclusive
+                                     + " passive: " + passive
+                                     + " active: " + active
+                                     + " write: " + write
+                                     + " read: " + read + " ]";
+            _logger.debug("RECV[" + _channelId + "] " + frameText);
+        }
+
+        final MethodRegistry registry = _connection.getMethodRegistry();
+        final boolean isVersion091 = ProtocolVersion.v0_91.equals(_connection.getProtocolVersion());
+
+        if (isVersion091)
+        {
+            _connection.closeConnection(AMQConstant.COMMAND_INVALID,
+                                        "AccessRequest not present in AMQP versions other than 0-8, 0-9",
+                                        _channelId);
+            return;
+        }
+
+        // The access control class is not implemented; clients that expect it are
+        // always handed the "0" ticket.
+        final AccessRequestOkBody okBody = registry.createAccessRequestOkBody(0);
+        sync();
+        _connection.writeFrame(okBody.generateFrame(_channelId));
+    }
+
+    /**
+     * Handles an incoming Basic.Ack frame: logs it at debug level and passes the
+     * delivery tag and the "multiple" flag on to acknowledgeMessage.
+     */
+    @Override
+    public void receiveBasicAck(final long deliveryTag, final boolean multiple)
+    {
+        if (_logger.isDebugEnabled())
+        {
+            final String frameText = "BasicAck[ deliveryTag: " + deliveryTag + " multiple: " + multiple + " ]";
+            _logger.debug("RECV[" + _channelId + "] " + frameText);
+        }
+
+        acknowledgeMessage(deliveryTag, multiple);
+    }
+
+    /**
+     * Handles an incoming Basic.Cancel frame.
+     * <p>
+     * The consumer identified by the tag is unsubscribed; unless the client set nowait,
+     * a BasicCancelOk for the same tag is then written back on this channel.
+     */
+    @Override
+    public void receiveBasicCancel(final AMQShortString consumerTag, final boolean nowait)
+    {
+        if (_logger.isDebugEnabled())
+        {
+            final String frameText = "BasicCancel[ consumerTag: " + consumerTag + " noWait: " + nowait + " ]";
+            _logger.debug("RECV[" + _channelId + "] " + frameText);
+        }
+
+        unsubscribeConsumer(consumerTag);
+
+        if (nowait)
+        {
+            // the client asked for no confirmation
+            return;
+        }
+
+        final MethodRegistry registry = _connection.getMethodRegistry();
+        final BasicCancelOkBody okBody = registry.createBasicCancelOkBody(consumerTag);
+        sync();
+        _connection.writeFrame(okBody.generateFrame(_channelId));
+    }
+
+    /**
+     * Handles an incoming Basic.Consume frame.
+     * <p>
+     * The source is the named queue, or the channel's default queue when no name is given.
+     * If that lookup yields nothing, the virtual host context value
+     * "qpid.enableMultiQueueConsumers" is true, and the arguments carry a collection under
+     * "x-multiqueue", every non-blank entry of that collection is resolved as a message
+     * source; a single unresolvable entry discards the whole set.
+     * <p>
+     * With no sources the channel is closed (NOT_FOUND) when a name was requested, or the
+     * connection is closed (NOT_ALLOWED) when none was. Otherwise the consumer is registered
+     * and, unless nowait is set, a BasicConsumeOk is written back. Registration failures
+     * close the connection with the reply code chosen in the matching catch block.
+     *
+     * @param queue       queue name, may be null
+     * @param consumerTag consumer tag requested by the client
+     * @param noLocal     passed through to consumeFromSource
+     * @param noAck       when true the consumer is registered without acknowledgements
+     * @param exclusive   passed through to consumeFromSource
+     * @param nowait      when true no BasicConsumeOk is sent
+     * @param arguments   consume arguments, may be null
+     */
+    @Override
+    public void receiveBasicConsume(final AMQShortString queue,
+                                    final AMQShortString consumerTag,
+                                    final boolean noLocal,
+                                    final boolean noAck,
+                                    final boolean exclusive, final boolean nowait, final FieldTable arguments)
+    {
+
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV[" + _channelId + "] BasicConsume[" +" queue: " + queue +
+                          " consumerTag: " + consumerTag +
+                          " noLocal: " + noLocal +
+                          " noAck: " + noAck +
+                          " exclusive: " + exclusive + " nowait: " + nowait +
+                          " arguments: " + arguments + " ]");
+        }
+
+        AMQShortString consumerTag1 = consumerTag;
+        VirtualHostImpl<?, ?, ?> vHost = _connection.getVirtualHost();
+        sync();
+        String queueName = queue == null ? null : queue.asString();
+
+        MessageSource queue1 = queueName == null ? getDefaultQueue() : vHost.getQueue(queueName);
+        final Collection<MessageSource> sources = new HashSet<>();
+        if (queue1 != null)
+        {
+            sources.add(queue1);
+        }
+        // Boolean.TRUE.equals avoids auto-unboxing a null context value
+        else if (Boolean.TRUE.equals(vHost.getContextValue(Boolean.class, "qpid.enableMultiQueueConsumers"))
+                 && arguments != null)
+        {
+            final Object multiQueue = arguments.get("x-multiqueue");
+            if (multiQueue instanceof Collection)
+            {
+                for (Object object : (Collection<?>) multiQueue)
+                {
+                    String sourceName = String.valueOf(object);
+                    sourceName = sourceName.trim();
+                    if (sourceName.length() != 0)
+                    {
+                        MessageSource source = vHost.getMessageSource(sourceName);
+                        if (source == null)
+                        {
+                            // one unknown source invalidates the whole request
+                            sources.clear();
+                            break;
+                        }
+                        else
+                        {
+                            sources.add(source);
+                        }
+                    }
+                }
+                // report the requested collection as the "queue name" in any error below
+                queueName = multiQueue.toString();
+            }
+        }
+
+        if (sources.isEmpty())
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("No queue for '" + queueName + "'");
+            }
+            if (queueName != null)
+            {
+                closeChannel(AMQConstant.NOT_FOUND, "No such queue, '" + queueName + "'");
+            }
+            else
+            {
+                _connection.closeConnection(AMQConstant.NOT_ALLOWED,
+                                            "No queue name provided, no default queue defined.", _channelId);
+            }
+        }
+        else
+        {
+            try
+            {
+                consumerTag1 = consumeFromSource(consumerTag1,
+                                                 sources,
+                                                 !noAck,
+                                                 arguments,
+                                                 exclusive,
+                                                 noLocal);
+                if (!nowait)
+                {
+                    MethodRegistry methodRegistry = _connection.getMethodRegistry();
+                    AMQMethodBody responseBody = methodRegistry.createBasicConsumeOkBody(consumerTag1);
+                    _connection.writeFrame(responseBody.generateFrame(_channelId));
+                }
+            }
+            catch (ConsumerTagInUseException cte)
+            {
+                _connection.closeConnection(AMQConstant.NOT_ALLOWED,
+                                            "Non-unique consumer tag, '" + consumerTag1
+                                            + "'", _channelId);
+            }
+            catch (AMQInvalidArgumentException ise)
+            {
+                _connection.closeConnection(AMQConstant.ARGUMENT_INVALID, ise.getMessage(), _channelId);
+            }
+            catch (AMQQueue.ExistingExclusiveConsumer e)
+            {
+                _connection.closeConnection(AMQConstant.ACCESS_REFUSED,
+                                            "Cannot subscribe to queue "
+                                            + consumeSourceName(queue1, queueName)
+                                            + " as it already has an existing exclusive consumer", _channelId);
+            }
+            catch (AMQQueue.ExistingConsumerPreventsExclusive e)
+            {
+                _connection.closeConnection(AMQConstant.ACCESS_REFUSED,
+                                            "Cannot subscribe to queue "
+                                            + consumeSourceName(queue1, queueName)
+                                            + " exclusively as it already has a consumer", _channelId);
+            }
+            catch (AccessControlException e)
+            {
+                _connection.closeConnection(AMQConstant.ACCESS_REFUSED,
+                                            "Cannot subscribe to queue "
+                                            + consumeSourceName(queue1, queueName)
+                                            + " permission denied", _channelId);
+            }
+            catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
+            {
+                _connection.closeConnection(AMQConstant.ACCESS_REFUSED,
+                                            "Cannot subscribe to queue "
+                                            + consumeSourceName(queue1, queueName)
+                                            + " as it already has an incompatible exclusivity policy", _channelId);
+            }
+
+        }
+    }
+
+    /**
+     * Name to put in a subscription failure message: the resolved queue's own name, or the
+     * requested name(s) when no single queue was resolved (the "x-multiqueue" case, where
+     * dereferencing the queue would throw a NullPointerException).
+     */
+    private static String consumeSourceName(final MessageSource resolvedQueue, final String requestedName)
+    {
+        return resolvedQueue != null ? resolvedQueue.getName() : requestedName;
+    }
+
+    /**
+     * Handles an incoming Basic.Get frame.
+     * <p>
+     * The queue is looked up by name, or the channel's default queue is used when no name
+     * is given. A missing queue closes the connection (NOT_FOUND when a name was given,
+     * NOT_ALLOWED otherwise). When performGet reports that it delivered nothing, a
+     * BasicGetEmpty is written back. Failures raised by performGet close the connection
+     * with the reply code chosen in the matching catch block.
+     */
+    @Override
+    public void receiveBasicGet(final AMQShortString queueName, final boolean noAck)
+    {
+        if (_logger.isDebugEnabled())
+        {
+            final String frameText = "BasicGet[ queue: " + queueName + " noAck: " + noAck + " ]";
+            _logger.debug("RECV[" + _channelId + "] " + frameText);
+        }
+
+        VirtualHostImpl vHost = _connection.getVirtualHost();
+        sync();
+
+        final AMQQueue source;
+        if (queueName == null)
+        {
+            source = getDefaultQueue();
+        }
+        else
+        {
+            source = vHost.getQueue(queueName.toString());
+        }
+
+        if (source == null)
+        {
+            _logger.info("No queue for '" + queueName + "'");
+            if (queueName == null)
+            {
+                _connection.closeConnection(AMQConstant.NOT_ALLOWED,
+                                            "No queue name provided, no default queue defined.", _channelId);
+            }
+            else
+            {
+                _connection.closeConnection(AMQConstant.NOT_FOUND,
+                                            "No such queue, '" + queueName + "'", _channelId);
+            }
+            return;
+        }
+
+        try
+        {
+            final boolean delivered = performGet(source, !noAck);
+            if (!delivered)
+            {
+                final MethodRegistry registry = _connection.getMethodRegistry();
+                final BasicGetEmptyBody emptyBody = registry.createBasicGetEmptyBody(null);
+                _connection.writeFrame(emptyBody.generateFrame(_channelId));
+            }
+        }
+        catch (AccessControlException e)
+        {
+            _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), _channelId);
+        }
+        catch (MessageSource.ExistingExclusiveConsumer e)
+        {
+            _connection.closeConnection(AMQConstant.NOT_ALLOWED, "Queue has an exclusive consumer", _channelId);
+        }
+        catch (MessageSource.ExistingConsumerPreventsExclusive e)
+        {
+            final String reason = "The GET request has been evaluated as an exclusive consumer, "
+                                  + "this is likely due to a programming error in the Qpid broker";
+            _connection.closeConnection(AMQConstant.INTERNAL_ERROR, reason, _channelId);
+        }
+        catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
+        {
+            _connection.closeConnection(AMQConstant.NOT_ALLOWED,
+                                        "Queue has an incompatible exclusivity policy", _channelId);
+        }
+    }
+
+    /**
+     * Handles an incoming Basic.Publish frame.
+     * <p>
+     * The destination is the virtual host's default destination for the default exchange,
+     * otherwise the message destination registered under the exchange name. An unknown
+     * destination closes the channel with NOT_FOUND. Otherwise the publish info is recorded
+     * through setPublishFrame; an AccessControlException raised there closes the connection
+     * with ACCESS_REFUSED.
+     */
+    @Override
+    public void receiveBasicPublish(final AMQShortString exchangeName,
+                                    final AMQShortString routingKey,
+                                    final boolean mandatory,
+                                    final boolean immediate)
+    {
+        if (_logger.isDebugEnabled())
+        {
+            final String frameText = "BasicPublish[ exchange: " + exchangeName
+                                     + " routingKey: " + routingKey
+                                     + " mandatory: " + mandatory
+                                     + " immediate: " + immediate + " ]";
+            _logger.debug("RECV[" + _channelId + "] " + frameText);
+        }
+
+        VirtualHostImpl vHost = _connection.getVirtualHost();
+
+        final MessageDestination target = isDefaultExchange(exchangeName)
+                ? vHost.getDefaultDestination()
+                : vHost.getMessageDestination(exchangeName.toString());
+
+        if (target == null)
+        {
+            // an unknown exchange is a channel-level error
+            closeChannel(AMQConstant.NOT_FOUND, "Unknown exchange name: " + exchangeName);
+            return;
+        }
+
+        final MessagePublishInfo publishInfo =
+                new MessagePublishInfo(exchangeName, immediate, mandatory, routingKey);
+        try
+        {
+            setPublishFrame(publishInfo, target);
+        }
+        catch (AccessControlException e)
+        {
+            _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId());
+        }
+    }
+
+    /**
+     * Handles an incoming Basic.Qos frame: applies the prefetch size and count through
+     * setCredit and writes a BasicQosOk back on this channel. The "global" flag is only
+     * logged.
+     */
+    @Override
+    public void receiveBasicQos(final long prefetchSize, final int prefetchCount, final boolean global)
+    {
+        if (_logger.isDebugEnabled())
+        {
+            final String frameText = "BasicQos[ prefetchSize: " + prefetchSize
+                                     + " prefetchCount: " + prefetchCount
+                                     + " global: " + global + " ]";
+            _logger.debug("RECV[" + _channelId + "] " + frameText);
+        }
+
+        sync();
+        setCredit(prefetchSize, prefetchCount);
+
+        final AMQMethodBody qosOk = _connection.getMethodRegistry().createBasicQosOkBody();
+        _connection.writeFrame(qosOk.generateFrame(getChannelId()));
+    }
+
+    /**
+     * Handles an incoming Basic.Recover frame: calls resend() and, for the synchronous
+     * variant, writes a BasicRecoverSyncOk back on this channel. The "requeue" flag is
+     * only logged.
+     */
+    @Override
+    public void receiveBasicRecover(final boolean requeue, final boolean sync)
+    {
+        if (_logger.isDebugEnabled())
+        {
+            final String frameText = "BasicRecover[ requeue: " + requeue + " sync: " + sync + " ]";
+            _logger.debug("RECV[" + _channelId + "] " + frameText);
+        }
+
+        resend();
+
+        if (!sync)
+        {
+            // asynchronous recover: no reply expected
+            return;
+        }
+
+        final MethodRegistry registry = _connection.getMethodRegistry();
+        final AMQMethodBody syncOk = registry.createBasicRecoverSyncOkBody();
+        sync();
+        _connection.writeFrame(syncOk.generateFrame(getChannelId()));
+    }
+
+    /**
+     * Handles an incoming Basic.Reject frame.
+     * <p>
+     * The request is dropped with a warning when the delivery tag is not in the
+     * unacknowledged map or the entry no longer has a message. Otherwise:
+     * <ul>
+     * <li>requeue set: the delivery count is decremented and the message requeued;</li>
+     * <li>requeue not set, max delivery count disabled: the message is requeued;</li>
+     * <li>requeue not set, max delivery count enabled: the message is dead-lettered when it
+     * has been delivered too many times, otherwise its delivery count is incremented.</li>
+     * </ul>
+     *
+     * @param deliveryTag tag of the delivery being rejected
+     * @param requeue     requeue flag from the frame
+     */
+    @Override
+    public void receiveBasicReject(final long deliveryTag, final boolean requeue)
+    {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV[" + _channelId + "] BasicReject[" +" deliveryTag: " + deliveryTag + " requeue: " + requeue + " ]");
+        }
+
+        MessageInstance message = getUnacknowledgedMessageMap().get(deliveryTag);
+
+        if (message == null)
+        {
+            _logger.warn("Dropping reject request as message is null for tag:" + deliveryTag);
+            return;
+        }
+
+        if (message.getMessage() == null)
+        {
+            _logger.warn("Message has already been purged, unable to Reject.");
+            return;
+        }
+
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Rejecting: DT:" + deliveryTag
+                          + "-" + message.getMessage() +
+                          ": Requeue:" + requeue
+                          +
+                          " on channel:" + debugIdentity());
+        }
+
+        if (requeue)
+        {
+            //this requeue represents a message rejected from the pre-dispatch queue
+            //therefore we need to amend the delivery counter.
+            message.decrementDeliveryCount();
+
+            requeue(deliveryTag);
+            return;
+        }
+
+        // Since the Java client abuses the reject flag for requeing after rollback, we won't set reject here
+        // as it would prevent redelivery
+
+        final boolean maxDeliveryCountEnabled = isMaxDeliveryCountEnabled(deliveryTag);
+        // guarded like every other debug statement in this class, so the message is
+        // only assembled when it will actually be logged
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("maxDeliveryCountEnabled: "
+                          + maxDeliveryCountEnabled
+                          + " deliveryTag "
+                          + deliveryTag);
+        }
+
+        if (!maxDeliveryCountEnabled)
+        {
+            requeue(deliveryTag);
+            return;
+        }
+
+        final boolean deliveredTooManyTimes = isDeliveredTooManyTimes(deliveryTag);
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("deliveredTooManyTimes: "
+                          + deliveredTooManyTimes
+                          + " deliveryTag "
+                          + deliveryTag);
+        }
+
+        if (deliveredTooManyTimes)
+        {
+            deadLetter(deliveryTag);
+        }
+        else
+        {
+            //this requeue represents a message rejected because of a recover/rollback that we
+            //are not ready to DLQ. We rely on the reject command to resend from the unacked map
+            //and therefore need to increment the delivery counter so we cancel out the effect
+            //of the AMQChannel#resend() decrement.
+            message.incrementDeliveryCount();
+        }
+    }
+
+    /**
+     * Handles an incoming Channel.Close frame: asks the connection to close this channel
+     * and writes a ChannelCloseOk back. The reply code, reply text, class id and method id
+     * are only logged.
+     */
+    @Override
+    public void receiveChannelClose(final int replyCode,
+                                    final AMQShortString replyText,
+                                    final int classId,
+                                    final int methodId)
+    {
+        if (_logger.isDebugEnabled())
+        {
+            final String frameText = "ChannelClose[ replyCode: " + replyCode
+                                     + " replyText: " + replyText
+                                     + " classId: " + classId
+                                     + " methodId: " + methodId + " ]";
+            _logger.debug("RECV[" + _channelId + "] " + frameText);
+        }
+
+        sync();
+        _connection.closeChannel(this);
+
+        final AMQFrame closeOkFrame =
+                new AMQFrame(getChannelId(), _connection.getMethodRegistry().createChannelCloseOkBody());
+        _connection.writeFrame(closeOkFrame);
+    }
+
+    /**
+     * Handles an incoming Channel.CloseOk frame by notifying the connection that the
+     * close of this channel has been confirmed.
+     */
+    @Override
+    public void receiveChannelCloseOk()
+    {
+        if (_logger.isDebugEnabled())
+        {
+            final String frameText = "ChannelCloseOk";
+            _logger.debug("RECV[" + _channelId + "] " + frameText);
+        }
+
+        _connection.closeChannelOk(getChannelId());
+    }
+
+    /**
+     * Handles an incoming content body frame.
+     * <p>
+     * When a publish is in progress on this channel the data is wrapped in a ContentBody
+     * and handed to publishContentBody; otherwise the connection is closed with
+     * COMMAND_INVALID.
+     *
+     * @param data raw content bytes of the frame
+     */
+    @Override
+    public void receiveMessageContent(final byte[] data)
+    {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV[" + _channelId + "] MessageContent[" +" data: " + hex(data,_connection.getBinaryDataLimit()) + " ] ");
+        }
+
+        if(hasCurrentMessage())
+        {
+            publishContentBody(new ContentBody(data));
+        }
+        else
+        {
+            // This handler receives content bodies; the previous text said "content header",
+            // copied from receiveMessageHeader, which misreported the offending frame.
+            _connection.closeConnection(AMQConstant.COMMAND_INVALID,
+                                        "Attempt to send a content body without first sending a publish frame",
+                                        _channelId);
+        }
+    }
+
+    /**
+     * Handles an incoming content header frame.
+     * <p>
+     * Without a publish in progress on this channel the connection is closed with
+     * COMMAND_INVALID; otherwise the properties and body size are wrapped in a
+     * ContentHeaderBody and handed to publishContentHeader.
+     */
+    @Override
+    public void receiveMessageHeader(final BasicContentHeaderProperties properties, final long bodySize)
+    {
+        if (_logger.isDebugEnabled())
+        {
+            final String frameText = "MessageHeader[ properties: {" + properties + "} bodySize: " + bodySize + " ]";
+            _logger.debug("RECV[" + _channelId + "] " + frameText);
+        }
+
+        if (!hasCurrentMessage())
+        {
+            _connection.closeConnection(AMQConstant.COMMAND_INVALID,
+                                        "Attempt to send a content header without first sending a publish frame",
+                                        _channelId);
+            return;
+        }
+
+        final ContentHeaderBody headerBody = new ContentHeaderBody(properties, bodySize);
+        publishContentHeader(headerBody);
+    }
+
+    /**
+     * Reports whether either the connection as a whole or this particular channel is
+     * waiting for a close confirmation.
+     */
+    @Override
+    public boolean ignoreAllButCloseOk()
+    {
+        if (_connection.ignoreAllButCloseOk())
+        {
+            return true;
+        }
+        return _connection.channelAwaitingClosure(_channelId);
+    }
+
+    /**
+     * Handles an incoming Channel.Flow frame: suspends the channel when the flag is false,
+     * resumes it when true, and writes a ChannelFlowOk echoing the flag.
+     */
+    @Override
+    public void receiveChannelFlow(final boolean active)
+    {
+        if (_logger.isDebugEnabled())
+        {
+            final String frameText = "ChannelFlow[ active: " + active + " ]";
+            _logger.debug("RECV[" + _channelId + "] " + frameText);
+        }
+
+        sync();
+        final boolean suspend = !active;
+        setSuspended(suspend);
+
+        final MethodRegistry registry = _connection.getMethodRegistry();
+        final AMQMethodBody flowOk = registry.createChannelFlowOkBody(active);
+        _connection.writeFrame(flowOk.generateFrame(getChannelId()));
+    }
+
+    /**
+     * Handles an incoming Channel.FlowOk frame. The frame is only logged at debug level;
+     * no further action is taken.
+     */
+    @Override
+    public void receiveChannelFlowOk(final boolean active)
+    {
+        if (_logger.isDebugEnabled())
+        {
+            final String frameText = "ChannelFlowOk[ active: " + active + " ]";
+            _logger.debug("RECV[" + _channelId + "] " + frameText);
+        }
+
+        // TODO - should we do anything here?
+    }
+
+    /**
+     * Handles an incoming Exchange.Bound query and writes an ExchangeBoundOk with the
+     * outcome back on this channel.
+     * <p>
+     * For the default exchange no binding objects are consulted: a queue counts as bound
+     * when it exists and, if a routing key is given, when the key equals the queue name.
+     * For any other exchange the exchange is looked up first (EXCHANGE_NOT_FOUND if
+     * absent) and its isBound / hasBindings methods decide the answer. In both cases a
+     * named queue that does not exist yields QUEUE_NOT_FOUND.
+     *
+     * @param exchangeName exchange to query; the default exchange is detected via isDefaultExchange
+     * @param routingKey   routing key to test, may be null
+     * @param queueName    queue to test, may be null
+     */
+    @Override
+    public void receiveExchangeBound(final AMQShortString exchangeName,
+                                     final AMQShortString routingKey,
+                                     final AMQShortString queueName)
+    {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV[" + _channelId + "] ExchangeBound[" +" exchange: " + exchangeName + " routingKey: " +
+                          routingKey + " queue: " + queueName + " ]");
+        }
+
+        VirtualHostImpl virtualHost = _connection.getVirtualHost();
+        MethodRegistry methodRegistry = _connection.getMethodRegistry();
+
+        sync();
+
+        final int replyCode;
+        final String replyText;
+
+        if (isDefaultExchange(exchangeName))
+        {
+            if (queueName != null)
+            {
+                if (virtualHost.getQueue(queueName.toString()) == null)
+                {
+                    replyCode = ExchangeBoundOkBody.QUEUE_NOT_FOUND;
+                    replyText = "Queue '" + queueName + "' not found";
+                }
+                else
+                {
+                    // an existing queue is bound under its own name only
+                    replyCode = (routingKey == null || queueName.equals(routingKey))
+                            ? ExchangeBoundOkBody.OK
+                            : ExchangeBoundOkBody.SPECIFIC_QUEUE_NOT_BOUND_WITH_RK;
+                    replyText = null;
+                }
+            }
+            else if (routingKey == null)
+            {
+                replyCode = virtualHost.getQueues().isEmpty()
+                        ? ExchangeBoundOkBody.NO_BINDINGS
+                        : ExchangeBoundOkBody.OK;
+                replyText = null;
+            }
+            else
+            {
+                // the routing key is treated as a queue name
+                replyCode = virtualHost.getQueue(routingKey.toString()) == null
+                        ? ExchangeBoundOkBody.NO_QUEUE_BOUND_WITH_RK
+                        : ExchangeBoundOkBody.OK;
+                replyText = null;
+            }
+        }
+        else
+        {
+            ExchangeImpl exchange = virtualHost.getExchange(exchangeName.toString());
+            if (exchange == null)
+            {
+                replyCode = ExchangeBoundOkBody.EXCHANGE_NOT_FOUND;
+                replyText = "Exchange '" + exchangeName + "' not found";
+            }
+            else if (queueName != null)
+            {
+                AMQQueue queue = virtualHost.getQueue(queueName.toString());
+                if (queue == null)
+                {
+                    replyCode = ExchangeBoundOkBody.QUEUE_NOT_FOUND;
+                    replyText = "Queue '" + queueName + "' not found";
+                }
+                else if (routingKey == null)
+                {
+                    if (exchange.isBound(queue))
+                    {
+                        replyCode = ExchangeBoundOkBody.OK;
+                        replyText = null;
+                    }
+                    else
+                    {
+                        replyCode = ExchangeBoundOkBody.QUEUE_NOT_BOUND;
+                        replyText = "Queue '"
+                                    + queueName
+                                    + "' not bound to exchange '"
+                                    + exchangeName
+                                    + "'";
+                    }
+                }
+                else
+                {
+                    if (exchange.isBound(routingKey.asString(), queue))
+                    {
+                        replyCode = ExchangeBoundOkBody.OK;
+                        replyText = null;
+                    }
+                    else
+                    {
+                        replyCode = ExchangeBoundOkBody.SPECIFIC_QUEUE_NOT_BOUND_WITH_RK;
+                        replyText = "Queue '" + queueName + "' not bound with routing key '" +
+                                    routingKey + "' to exchange '" + exchangeName + "'";
+                    }
+                }
+            }
+            else if (routingKey == null)
+            {
+                replyCode = exchange.hasBindings()
+                        ? ExchangeBoundOkBody.OK
+                        : ExchangeBoundOkBody.NO_BINDINGS;
+                replyText = null;
+            }
+            else
+            {
+                if (exchange.isBound(routingKey.asString()))
+                {
+                    replyCode = ExchangeBoundOkBody.OK;
+                    replyText = null;
+                }
+                else
+                {
+                    replyCode = ExchangeBoundOkBody.NO_QUEUE_BOUND_WITH_RK;
+                    replyText =
+                            "No queue bound with routing key '" + routingKey + "' to exchange '" + exchangeName + "'";
+                }
+            }
+        }
+
+        ExchangeBoundOkBody exchangeBoundOkBody =
+                methodRegistry.createExchangeBoundOkBody(replyCode, AMQShortString.validValueOf(replyText));
+
+        _connection.writeFrame(exchangeBoundOkBody.generateFrame(getChannelId()));
+
+    }
+
+    @Override
+    public void receiveExchangeDeclare(final AMQShortString exchangeName,
+                                       final AMQShortString type,
+                                       final boolean passive,
+                                       final boolean durable,
+                                       final boolean autoDelete,
+                                       final boolean internal,
+                                       final boolean nowait,
+                                       final FieldTable arguments)
+    {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV[" + _channelId + "] ExchangeDeclare[" +" 
exchange: " + exchangeName +
+                          " type: " + type +
+                          " passive: " + passive +
+                          " durable: " + durable +
+                          " autoDelete: " + autoDelete +
+                          " internal: " + internal + " nowait: " + nowait + " 
arguments: " + arguments + " ]");
+        }
+
+        ExchangeImpl exchange;
+        VirtualHostImpl<?, ?, ?> virtualHost = _connection.getVirtualHost();
+        if (isDefaultExchange(exchangeName))
+        {
+            if (!new 
AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_CLASS).equals(type))
+            {
+                _connection.closeConnection(AMQConstant.NOT_ALLOWED, "Attempt 
to redeclare default exchange: "
+                                                                 + " of type "
+                                                                 + 
ExchangeDefaults.DIRECT_EXCHANGE_CLASS
+                                                                 + " to " + 
type + ".", getChannelId());
+            }
+            else if (!nowait)
+            {
+                MethodRegistry methodRegistry = 
_connection.getMethodRegistry();
+                AMQMethodBody responseBody = 
methodRegistry.createExchangeDeclareOkBody();
+                sync();
+                
_connection.writeFrame(responseBody.generateFrame(getChannelId()));
+            }
+
+        }
+        else
+        {
+            if (passive)
+            {
+                exchange = virtualHost.getExchange(exchangeName.toString());
+                if (exchange == null)
+                {
+                    closeChannel(AMQConstant.NOT_FOUND, "Unknown exchange: " + 
exchangeName);
+                }
+                else if (!(type == null || type.length() == 0) && 
!exchange.getType().equals(type.asString()))
+                {
+
+                    _connection.closeConnection(AMQConstant.NOT_ALLOWED, 
"Attempt to redeclare exchange: "
+                                                                         +
+                                                                         
exchangeName
+                                                                         + " 
of type "
+                                                                         + 
exchange.getType()
+                                                                         + " 
to "
+                                                                         + type
+                                                                         + 
".", getChannelId());
+                }
+                else if (!nowait)
+                {
+                    MethodRegistry methodRegistry = 
_connection.getMethodRegistry();
+                    AMQMethodBody responseBody = 
methodRegistry.createExchangeDeclareOkBody();
+                    sync();
+                    
_connection.writeFrame(responseBody.generateFrame(getChannelId()));
+                }
+
+            }
+            else
+            {
+                try
+                {
+                    String name = exchangeName == null ? null : 
exchangeName.intern().toString();
+                    String typeString = type == null ? null : 
type.intern().toString();
+
+                    Map<String, Object> attributes = new HashMap<String, 
Object>();
+                    if (arguments != null)
+                    {
+                        attributes.putAll(FieldTable.convertToMap(arguments));
+                    }
+                    attributes.put(Exchange.ID, null);
+                    attributes.put(Exchange.NAME, name);
+                    attributes.put(Exchange.TYPE, typeString);
+                    attributes.put(Exchange.DURABLE, durable);
+                    attributes.put(Exchange.LIFETIME_POLICY,
+                                   autoDelete ? 
LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
+                    if (!attributes.containsKey(Exchange.ALTERNATE_EXCHANGE))
+                    {
+                        attributes.put(Exchange.ALTERNATE_EXCHANGE, null);
+                    }
+                    exchange = virtualHost.createExchange(attributes);
+
+                    if (!nowait)
+                    {
+                        MethodRegistry methodRegistry = 
_connection.getMethodRegistry();
+                        AMQMethodBody responseBody = 
methodRegistry.createExchangeDeclareOkBody();
+                        sync();
+                        _connection.writeFrame(responseBody.generateFrame(
+                                getChannelId()));
+                    }
+
+                }
+                catch (ReservedExchangeNameException e)
+                {
+                    _connection.closeConnection(AMQConstant.NOT_ALLOWED,
+                                                "Attempt to declare exchange: 
" + exchangeName +
+                                                                         " 
which begins with reserved prefix.", getChannelId());
+
+
+                }
+                catch (ExchangeExistsException e)
+                {
+                    exchange = e.getExistingExchange();
+                    if (!new AMQShortString(exchange.getType()).equals(type))
+                    {
+                        _connection.closeConnection(AMQConstant.NOT_ALLOWED, 
"Attempt to redeclare exchange: "
+                                                                               
  + exchangeName + " of type "
+                                                                               
  + exchange.getType()
+                                                                               
  + " to " + type + ".", getChannelId());
+
+                    }
+                }
+                catch (NoFactoryForTypeException e)
+                {
+                    _connection.closeConnection(AMQConstant.COMMAND_INVALID, 
"Unknown exchange type '"
+                                                                             + 
e.getType()
+                                                                             + 
"' for exchange '"
+                                                                             + 
exchangeName
+                                                                             + 
"'", getChannelId());
+
+                }
+                catch (AccessControlException e)
+                {
+                    _connection.closeConnection(AMQConstant.ACCESS_REFUSED, 
e.getMessage(), getChannelId());
+
+                }
+                catch (UnknownConfiguredObjectException e)
+                {
+                    // note - since 0-8/9/9-1 can't set the alt. exchange this 
exception should never occur
+                    final String message = "Unknown alternate exchange "
+                                           + (e.getName() != null
+                            ? "name: \"" + e.getName() + "\""
+                            : "id: " + e.getId());
+                    _connection.closeConnection(AMQConstant.NOT_FOUND, 
message, getChannelId());
+
+                }
+                catch (IllegalArgumentException e)
+                {
+                    _connection.closeConnection(AMQConstant.COMMAND_INVALID, 
"Error creating exchange '"
+                                                                             + 
exchangeName
+                                                                             + 
"': "
+                                                                             + 
e.getMessage(), getChannelId());
+
+                }
+            }
+        }
+
+    }
+
+    /**
+     * Handles an Exchange.Delete received on this channel.
+     * <p>
+     * Deleting the default exchange closes the connection with NOT_ALLOWED. An unknown exchange,
+     * an exchange in use as an alternate exchange, or a required exchange closes the channel.
+     * Otherwise the exchange is removed from the virtual host and an ExchangeDeleteOk frame is
+     * written.
+     *
+     * @param exchangeStr name of the exchange to delete
+     * @param ifUnused    when true the removal is requested without force
+     * @param nowait      only written to the debug log
+     */
+    @Override
+    public void receiveExchangeDelete(final AMQShortString exchangeStr, final boolean ifUnused, final boolean nowait)
+    {
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV[" + _channelId + "] ExchangeDelete[" +" exchange: " + exchangeStr
+                          + " ifUnused: " + ifUnused
+                          + " nowait: " + nowait + " ]");
+        }
+
+        final VirtualHostImpl vhost = _connection.getVirtualHost();
+        sync();
+        try
+        {
+            if (isDefaultExchange(exchangeStr))
+            {
+                _connection.closeConnection(AMQConstant.NOT_ALLOWED,
+                                            "Default Exchange cannot be deleted",
+                                            getChannelId());
+                return;
+            }
+
+            final ExchangeImpl target = vhost.getExchange(exchangeStr.toString());
+            if (target == null)
+            {
+                closeChannel(AMQConstant.NOT_FOUND, "No such exchange: " + exchangeStr);
+                return;
+            }
+
+            // The second argument is the "force" flag: force only when the client did not
+            // ask for if-unused semantics.
+            vhost.removeExchange(target, !ifUnused);
+
+            // NOTE(review): the reply is sent even when nowait is set - confirm this is intended.
+            final ExchangeDeleteOkBody deleteOk = _connection.getMethodRegistry().createExchangeDeleteOkBody();
+            _connection.writeFrame(deleteOk.generateFrame(getChannelId()));
+        }
+        catch (ExchangeIsAlternateException e)
+        {
+            closeChannel(AMQConstant.NOT_ALLOWED, "Exchange in use as an alternate exchange");
+        }
+        catch (RequiredExchangeException e)
+        {
+            closeChannel(AMQConstant.NOT_ALLOWED, "Exchange '" + exchangeStr + "' cannot be deleted");
+        }
+        catch (AccessControlException e)
+        {
+            _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId());
+        }
+    }
+
+    /**
+     * Handles a Queue.Bind received on this channel.
+     * <p>
+     * When {@code queueName} is null the channel's default queue is used and a null routing key
+     * defaults to that queue's name; when a queue is named, a null routing key defaults to the
+     * empty string. A missing queue or exchange closes the channel with NOT_FOUND; binding to
+     * the default exchange or an access-control failure closes the connection.
+     * <p>
+     * If the binding is not already present it is added; for topic exchanges a failed add is
+     * followed by a replace of the existing binding. A QueueBindOk frame is written unless
+     * {@code nowait} is set.
+     *
+     * @param queueName      queue to bind, or null for the channel's default queue
+     * @param exchange       exchange to bind to
+     * @param routingKey     binding key; may be null (see above for defaults)
+     * @param nowait         when true no QueueBindOk is sent
+     * @param argumentsTable binding arguments
+     */
+    @Override
+    public void receiveQueueBind(final AMQShortString queueName,
+                                 final AMQShortString exchange,
+                                 AMQShortString routingKey,
+                                 final boolean nowait,
+                                 final FieldTable argumentsTable)
+    {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV[" + _channelId + "] QueueBind[" +" queue: " + queueName +
+                          " exchange: " + exchange +
+                          " bindingKey: " + routingKey +
+                          " nowait: " + nowait + " arguments: " + argumentsTable + " ]");
+        }
+
+        VirtualHostImpl virtualHost = _connection.getVirtualHost();
+        AMQQueue<?> queue;
+        if (queueName == null)
+        {
+
+            queue = getDefaultQueue();
+
+            if (queue != null)
+            {
+                if (routingKey == null)
+                {
+                    routingKey = AMQShortString.valueOf(queue.getName());
+                }
+                else
+                {
+                    routingKey = routingKey.intern();
+                }
+            }
+        }
+        else
+        {
+            queue = virtualHost.getQueue(queueName.toString());
+            routingKey = routingKey == null ? AMQShortString.EMPTY_STRING : routingKey.intern();
+        }
+
+        if (queue == null)
+        {
+            String message = queueName == null
+                    ? "No default queue defined on channel and queue was null"
+                    : "Queue " + queueName + " does not exist.";
+            closeChannel(AMQConstant.NOT_FOUND, message);
+        }
+        else if (isDefaultExchange(exchange))
+        {
+            // Report the resolved queue's name: queueName is null whenever the channel's
+            // default queue was used, which previously produced "the queue null".
+            _connection.closeConnection(AMQConstant.NOT_ALLOWED,
+                                        "Cannot bind the queue " + queue.getName() + " to the default exchange",
+                                        getChannelId());
+
+        }
+        else
+        {
+
+            final String exchangeName = exchange.toString();
+
+            final ExchangeImpl exch = virtualHost.getExchange(exchangeName);
+            if (exch == null)
+            {
+                closeChannel(AMQConstant.NOT_FOUND,
+                             "Exchange " + exchangeName + " does not exist.");
+            }
+            else
+            {
+
+                try
+                {
+
+                    Map<String, Object> arguments = FieldTable.convertToMap(argumentsTable);
+                    String bindingKey = String.valueOf(routingKey);
+
+                    if (!exch.isBound(bindingKey, arguments, queue))
+                    {
+
+                        // A failed add on a topic exchange is retried as a replace of the
+                        // existing binding.
+                        if (!exch.addBinding(bindingKey, queue, arguments)
+                            && ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(
+                                exch.getType()))
+                        {
+                            exch.replaceBinding(bindingKey, queue, arguments);
+                        }
+                    }
+
+                    if (_logger.isInfoEnabled())
+                    {
+                        _logger.info("Binding queue "
+                                     + queue
+                                     + " to exchange "
+                                     + exch
+                                     + " with routing key "
+                                     + routingKey);
+                    }
+                    if (!nowait)
+                    {
+                        sync();
+                        MethodRegistry methodRegistry = _connection.getMethodRegistry();
+                        AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody();
+                        _connection.writeFrame(responseBody.generateFrame(getChannelId()));
+
+                    }
+                }
+                catch (AccessControlException e)
+                {
+                    _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId());
+                }
+            }
+        }
+    }
+
+    @Override
+    public void receiveQueueDeclare(final AMQShortString queueStr,
+                                    final boolean passive,
+                                    final boolean durable,
+                                    final boolean exclusive,
+                                    final boolean autoDelete,
+                                    final boolean nowait,
+                                    final FieldTable arguments)
+    {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV[" + _channelId + "] QueueDeclare[" +" queue: " 
+ queueStr +
+                          " passive: " + passive +
+                          " durable: " + durable +
+                          " exclusive: " + exclusive +
+                          " autoDelete: " + autoDelete + " nowait: " + nowait 
+ " arguments: " + arguments + " ]");
+        }
+
+        VirtualHostImpl virtualHost = _connection.getVirtualHost();
+
+        final AMQShortString queueName;
+
+        // if we aren't given a queue name, we create one which we return to 
the client
+        if ((queueStr == null) || (queueStr.length() == 0))
+        {
+            queueName = new AMQShortString("tmp_" + UUID.randomUUID());
+        }
+        else
+        {
+            queueName = queueStr.intern();
+        }
+
+        AMQQueue queue;
+
+        //TODO: do we need to check that the queue already exists with exactly 
the same "configuration"?
+
+
+        if (passive)
+        {
+            queue = virtualHost.getQueue(queueName.toString());
+            if (queue == null)
+            {
+                closeChannel(AMQConstant.NOT_FOUND,
+                                                     "Queue: "
+                                                     + queueName
+                                                     + " not found on 
VirtualHost("
+                                                     + virtualHost
+                                                     + ").");
+            }
+            else
+            {
+                if (!queue.verifySessionAccess(this))
+                {
+                    _connection.closeConnection(AMQConstant.NOT_ALLOWED, 
"Queue "
+                                                + queue.getName()
+                                                + " is exclusive, but not 
created on this Connection.", getChannelId());
+                }
+                else
+                {
+                    //set this as the default queue on the channel:
+                    setDefaultQueue(queue);
+                    if (!nowait)
+                    {
+                        sync();
+                        MethodRegistry methodRegistry = 
_connection.getMethodRegistry();
+                        QueueDeclareOkBody responseBody =
+                                
methodRegistry.createQueueDeclareOkBody(queueName,
+                                                                        
queue.getQueueDepthMessages(),
+                                                                        
queue.getConsumerCount());
+                        
_connection.writeFrame(responseBody.generateFrame(getChannelId()));
+
+                        _logger.info("Queue " + queueName + " declared 
successfully");
+                    }
+                }
+            }
+        }
+        else
+        {
+
+            try
+            {
+                Map<String, Object> attributes =
+                        
QueueArgumentsConverter.convertWireArgsToModel(FieldTable.convertToMap(arguments));
+                final String queueNameString = 
AMQShortString.toString(queueName);
+                attributes.put(Queue.NAME, queueNameString);
+                attributes.put(Queue.ID, UUID.randomUUID());
+                attributes.put(Queue.DURABLE, durable);
+
+                LifetimePolicy lifetimePolicy;
+                ExclusivityPolicy exclusivityPolicy;
+
+                if (exclusive)
+                {
+                    lifetimePolicy = autoDelete
+                            ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS
+                            : durable ? LifetimePolicy.PERMANENT : 
LifetimePolicy.DELETE_ON_CONNECTION_CLOSE;
+                    exclusivityPolicy = durable ? ExclusivityPolicy.CONTAINER 
: ExclusivityPolicy.CONNECTION;
+                }
+                else
+                {
+                    lifetimePolicy = autoDelete ? 
LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS : LifetimePolicy.PERMANENT;
+                    exclusivityPolicy = ExclusivityPolicy.NONE;
+                }
+
+                attributes.put(Queue.EXCLUSIVE, exclusivityPolicy);
+                attributes.put(Queue.LIFETIME_POLICY, lifetimePolicy);
+
+
+                queue = virtualHost.createQueue(attributes);
+
+                setDefaultQueue(queue);
+
+                if (!nowait)
+                {
+                    sync();
+                    MethodRegistry methodRegistry = 
_connection.getMethodRegistry();
+                    QueueDeclareOkBody responseBody =
+                            methodRegistry.createQueueDeclareOkBody(queueName,
+                                                                    
queue.getQueueDepthMessages(),
+                                                                    
queue.getConsumerCount());
+                    
_connection.writeFrame(responseBody.generateFrame(getChannelId()));
+
+                    _logger.info("Queue " + queueName + " declared 
successfully");
+                }
+            }
+            catch (QueueExistsException qe)
+            {
+
+                queue = qe.getExistingQueue();
+
+                if (!queue.verifySessionAccess(this))
+                {
+                    _connection.closeConnection(AMQConstant.NOT_ALLOWED, 
"Queue "
+                                                + queue.getName()
+                                                + " is exclusive, but not 
created on this Connection.", getChannelId());
+
+                }
+                else if (queue.isExclusive() != exclusive)
+                {
+
+                    closeChannel(AMQConstant.ALREADY_EXISTS,
+                                                         "Cannot re-declare 
queue '"
+                                                         + queue.getName()
+                                                         + "' with different 
exclusivity (was: "
+                                                         + queue.isExclusive()
+                                                         + " requested "
+                                                         + exclusive
+                                                         + ")");
+                }
+                else if ((autoDelete
+                          && queue.getLifetimePolicy() != 
LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS)
+                         || (!autoDelete && queue.getLifetimePolicy() != 
((exclusive
+                                                                           && 
!durable)
+                        ? LifetimePolicy.DELETE_ON_CONNECTION_CLOSE
+                        : LifetimePolicy.PERMANENT)))
+                {
+                    closeChannel(AMQConstant.ALREADY_EXISTS,
+                                                         "Cannot re-declare 
queue '"
+                                                         + queue.getName()
+                                                         + "' with different 
lifetime policy (was: "
+                                                         + 
queue.getLifetimePolicy()
+                                                         + " requested 
autodelete: "
+                                                         + autoDelete
+                                                         + ")");
+                }
+                else if (queue.isDurable() != durable)
+                {
+                    closeChannel(AMQConstant.ALREADY_EXISTS,
+                                                         "Cannot re-declare 
queue '"
+                                                         + queue.getName()
+                                                         + "' with different 
durability (was: "
+                                                         + queue.isDurable()
+                                                         + " requested "
+                                                         + durable
+                                                         + ")");
+                }
+                else
+                {
+                    setDefaultQueue(queue);
+                    if (!nowait)
+                    {
+                        sync();
+                        MethodRegistry methodRegistry = 
_connection.getMethodRegistry();
+                        QueueDeclareOkBody responseBody =
+                                
methodRegistry.createQueueDeclareOkBody(queueName,
+                                                                        
queue.getQueueDepthMessages(),
+                                                                        
queue.getConsumerCount());
+                        
_connection.writeFrame(responseBody.generateFrame(getChannelId()));
+
+                        _logger.info("Queue " + queueName + " declared 
successfully");
+                    }
+                }
+            }
+            catch (AccessControlException e)
+            {
+                _connection.closeConnection(AMQConstant.ACCESS_REFUSED, 
e.getMessage(), getChannelId());
+            }
+
+        }
+    }
+
+    @Override
+    public void receiveQueueDelete(final AMQShortString queueName,
+                                   final boolean ifUnused,
+                                   final boolean ifEmpty,
+                                   final boolean nowait)
+    {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV[" + _channelId + "] QueueDelete[" +" queue: " 
+ queueName + " ifUnused: " + ifUnused + " ifEmpty: " + ifEmpty + " nowait: " + 
nowait + " ]");
+        }
+
+        VirtualHostImpl virtualHost = _connection.getVirtualHost();
+        sync();
+        AMQQueue queue;
+        if (queueName == null)
+        {
+
+            //get the default queue on the channel:
+            queue = getDefaultQueue();
+        }
+        else
+        {
+            queue = virtualHost.getQueue(queueName.toString());
+        }
+
+        if (queue == null)
+        {
+            closeChannel(AMQConstant.NOT_FOUND, "Queue " + queueName + " does 
not exist.");
+
+        }
+        else
+        {
+            if (ifEmpty && !queue.isEmpty())
+            {
+                closeChannel(AMQConstant.IN_USE, "Queue: " + queueName + " is 
not empty.");
+            }
+            else if (ifUnused && !queue.isUnused())
+            {
+                // TODO - Error code
+                closeChannel(AMQConstant.IN_USE, "Queue: " + queueName + " is 
still used.");
+            }
+            else
+            {
+                if (!queue.verifySessionAccess(this))
+                {
+                    _connection.closeConnection(AMQConstant.NOT_ALLOWED, 
"Queue "
+                                                + queue.getName()
+                                                + " is exclusive, but not 
created on this Connection.", getChannelId());
+
+                }
+                else
+                {
+                    try
+                    {
+                        int purged = virtualHost.removeQueue(queue);
+
+                        MethodRegistry methodRegistry = 
_connection.getMethodRegistry();
+                        QueueDeleteOkBody responseBody = 
methodRegistry.createQueueDeleteOkBody(purged);
+                        
_connection.writeFrame(responseBody.generateFrame(getChannelId()));
+                    }
+                    catch (AccessControlException e)
+                    {
+                        
_connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), 
getChannelId());
+
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Handles a Queue.Purge received on this channel.
+     * <p>
+     * A null {@code queueName} selects the channel's default queue; if there is none the
+     * connection is closed with NOT_ALLOWED. A named queue that does not exist closes the channel
+     * with NOT_FOUND. After the session-access check passes the queue is cleared and, unless
+     * {@code nowait} is set, a QueuePurgeOk frame carrying the value returned by the clear is
+     * written.
+     *
+     * @param queueName queue to purge, or null for the channel's default queue
+     * @param nowait    when true no QueuePurgeOk is sent
+     */
+    @Override
+    public void receiveQueuePurge(final AMQShortString queueName, final boolean nowait)
+    {
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV[" + _channelId + "] QueuePurge[" +" queue: " + queueName
+                          + " nowait: " + nowait + " ]");
+        }
+
+        VirtualHostImpl virtualHost = _connection.getVirtualHost();
+
+        final boolean useDefaultQueue = (queueName == null);
+        final AMQQueue target = useDefaultQueue
+                ? getDefaultQueue()
+                : virtualHost.getQueue(queueName.toString());
+
+        if (target == null)
+        {
+            if (useDefaultQueue)
+            {
+                _connection.closeConnection(AMQConstant.NOT_ALLOWED, "No queue specified.", getChannelId());
+            }
+            else
+            {
+                closeChannel(AMQConstant.NOT_FOUND, "Queue " + queueName + " does not exist.");
+            }
+            return;
+        }
+
+        if (!target.verifySessionAccess(this))
+        {
+            _connection.closeConnection(AMQConstant.NOT_ALLOWED,
+                                        "Queue is exclusive, but not created on this Connection.",
+                                        getChannelId());
+            return;
+        }
+
+        try
+        {
+            long purged = target.clearQueue();
+            if (!nowait)
+            {
+                sync();
+                final AMQMethodBody purgeOk = _connection.getMethodRegistry().createQueuePurgeOkBody(purged);
+                _connection.writeFrame(purgeOk.generateFrame(getChannelId()));
+            }
+        }
+        catch (AccessControlException e)
+        {
+            _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId());
+        }
+    }
+
+    /**
+     * Handles a Queue.Unbind received on this channel.
+     * <p>
+     * A null {@code queueName} selects the channel's default queue. A missing queue, a missing
+     * exchange or a missing binding closes the channel with NOT_FOUND; unbinding from the default
+     * exchange or an access-control failure closes the connection. Otherwise the binding is
+     * deleted and a QueueUnbindOk frame is written.
+     *
+     * @param queueName  queue to unbind, or null for the channel's default queue
+     * @param exchange   exchange the queue is bound to
+     * @param routingKey key of the binding to remove
+     * @param arguments  only written to the debug log
+     */
+    @Override
+    public void receiveQueueUnbind(final AMQShortString queueName,
+                                   final AMQShortString exchange,
+                                   final AMQShortString routingKey,
+                                   final FieldTable arguments)
+    {
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV[" + _channelId + "] QueueUnbind[" +" queue: " + queueName
+                          + " exchange: " + exchange
+                          + " bindingKey: " + routingKey
+                          + " arguments: " + arguments + " ]");
+        }
+
+        VirtualHostImpl virtualHost = _connection.getVirtualHost();
+
+        final AMQQueue queue;
+        if (queueName == null)
+        {
+            queue = getDefaultQueue();
+        }
+        else
+        {
+            queue = virtualHost.getQueue(queueName.toString());
+        }
+
+        if (queue == null)
+        {
+            final String reason;
+            if (queueName == null)
+            {
+                reason = "No default queue defined on channel and queue was null";
+            }
+            else
+            {
+                reason = "Queue " + queueName + " does not exist.";
+            }
+            closeChannel(AMQConstant.NOT_FOUND, reason);
+            return;
+        }
+
+        if (isDefaultExchange(exchange))
+        {
+            _connection.closeConnection(AMQConstant.NOT_ALLOWED,
+                                        "Cannot unbind the queue " + queue.getName() + " from the default exchange",
+                                        getChannelId());
+            return;
+        }
+
+        final ExchangeImpl exch = virtualHost.getExchange(exchange.toString());
+        if (exch == null)
+        {
+            closeChannel(AMQConstant.NOT_FOUND, "Exchange " + exchange + " does not exist.");
+            return;
+        }
+
+        final String bindingKey = String.valueOf(routingKey);
+        if (!exch.hasBinding(bindingKey, queue))
+        {
+            closeChannel(AMQConstant.NOT_FOUND, "No such binding");
+            return;
+        }
+
+        try
+        {
+            exch.deleteBinding(bindingKey, queue);
+
+            final AMQMethodBody unbindOk = _connection.getMethodRegistry().createQueueUnbindOkBody();
+            sync();
+            _connection.writeFrame(unbindOk.generateFrame(getChannelId()));
+        }
+        catch (AccessControlException e)
+        {
+            _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId());
+        }
+    }
+
+    /**
+     * Handles a Tx.Select received on this channel: switches the channel to local-transactional
+     * mode and writes a TxSelectOk frame.
+     */
+    @Override
+    public void receiveTxSelect()
+    {
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV[" + _channelId + "] TxSelect");
+        }
+
+        setLocalTransactional();
+
+        final TxSelectOkBody selectOk = _connection.getMethodRegistry().createTxSelectOkBody();
+        _connection.writeFrame(selectOk.generateFrame(_channelId));
+    }
+
+    /**
+     * Handles a Tx.Commit received on this channel.
+     * <p>
+     * If the channel is not transactional it is closed with COMMAND_INVALID and nothing is
+     * committed. Otherwise the transaction is committed and a TxCommitOk frame is written from
+     * the callback passed to {@code commit}.
+     */
+    @Override
+    public void receiveTxCommit()
+    {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV[" + _channelId + "] TxCommit");
+        }
+
+
+        if (!isTransactional())
+        {
+            closeChannel(AMQConstant.COMMAND_INVALID,
+                         "Fatal error: commit called on non-transactional channel");
+        }
+        else
+        {
+            // Only commit when the channel really is transactional; previously execution fell
+            // through after closeChannel() and still ran commit() and sent TxCommitOk.
+            commit(new Runnable()
+            {
+
+                @Override
+                public void run()
+                {
+                    MethodRegistry methodRegistry = _connection.getMethodRegistry();
+                    AMQMethodBody responseBody = methodRegistry.createTxCommitOkBody();
+                    _connection.writeFrame(responseBody.generateFrame(_channelId));
+                }
+            }, true);
+        }
+
+    }
+
+    @Override
+    public void receiveTxRollback()
+    {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV[" + _channelId + "] TxRollback");
+        }
+
+        if (!isTransactional())
+        {
+            closeChannel(AMQConstant.COMMAND_INVALID,
+                         "Fatal error: rollback called on non-transactional 
channel");
+        }
+
+        final MethodRegistry methodRegistry = _connection.getMethodRegistry();
+        final AMQMethodBody responseBody = 
methodRegistry.createTxRollbackOkBody();
+

[... 31 lines stripped ...]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to