Author: rgodfrey
Date: Mon Oct 13 00:58:45 2014
New Revision: 1631275

URL: http://svn.apache.org/r1631275
Log:
Migrate broker to new direct method dispatch mechanism

Added:
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java
   (with props)
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java
   (with props)
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java
   (with props)
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java
   (with props)
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ClientChannelMethodProcessor.java
   (with props)
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ClientMethodProcessor.java
   (with props)
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ServerChannelMethodProcessor.java
   (with props)
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ServerMethodProcessor.java
   (with props)
Removed:
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ChannelMethodProcessor.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConnectionMethodProcessor.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodProcessor.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
Modified:
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java
    
qpid/branches/QPID-6125-ProtocolRefactoring/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java

Modified: 
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1631275&r1=1631274&r2=1631275&view=diff
==============================================================================
--- 
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
 (original)
+++ 
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
 Mon Oct 13 00:58:45 2014
@@ -115,7 +115,7 @@ import org.apache.qpid.transport.Transpo
 public class AMQChannel
         implements AMQSessionModel<AMQChannel, AMQProtocolEngine>,
                    AsyncAutoCommitTransaction.FutureRecorder,
-                   ChannelMethodProcessor
+                   ServerChannelMethodProcessor
 {
     public static final int DEFAULT_PREFETCH = 4096;
 
@@ -376,27 +376,18 @@ public class AMQChannel
     }
 
     public void publishContentHeader(ContentHeaderBody contentHeaderBody)
-            throws AMQException
     {
-        if (_currentMessage == null)
+        if (_logger.isDebugEnabled())
         {
-            throw new AMQException("Received content header without previously receiving a BasicPublish frame");
+            _logger.debug("Content header received on channel " + _channelId);
         }
-        else
-        {
-            if (_logger.isDebugEnabled())
-            {
-                _logger.debug("Content header received on channel " + _channelId);
-            }
 
-            _currentMessage.setContentHeaderBody(contentHeaderBody);
+        _currentMessage.setContentHeaderBody(contentHeaderBody);
 
-            deliverCurrentMessageIfComplete();
-        }
+        deliverCurrentMessageIfComplete();
     }
 
     private void deliverCurrentMessageIfComplete()
-            throws AMQException
     {
         // check and deliver if header says body length is zero
         if (_currentMessage.allContentReceived())
@@ -497,7 +488,7 @@ public class AMQChannel
      * @throws AMQConnectionException if the message is mandatory close-on-no-route
      * @see AMQProtocolEngine#isCloseWhenNoRoute()
      */
-    private void handleUnroutableMessage(AMQMessage message) throws AMQConnectionException
+    private void handleUnroutableMessage(AMQMessage message)
     {
         boolean mandatory = message.isMandatory();
         String description = currentMessageDescription();
@@ -512,26 +503,27 @@ public class AMQChannel
 
         if (mandatory && isTransactional() && _connection.isCloseWhenNoRoute())
         {
-            throw new AMQConnectionException(
-                    AMQConstant.NO_ROUTE,
-                    "No route for message " + currentMessageDescription(),
-                    0, 0, // default class and method ids
-                    getConnection().getMethodRegistry(),
-                    (Throwable) null);
-        }
-
-        if (mandatory || message.isImmediate())
-        {
-            _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE, "No Route for message " + currentMessageDescription(), message));
+            _connection.closeConnection(AMQConstant.NO_ROUTE,
+                    "No route for message " + currentMessageDescription(), _channelId);
         }
         else
         {
-            AMQShortString exchangeName = _currentMessage.getExchangeName();
-            AMQShortString routingKey = 
_currentMessage.getMessagePublishInfo().getRoutingKey();
+            if (mandatory || message.isImmediate())
+            {
+                _transaction.addPostTransactionAction(new 
WriteReturnAction(AMQConstant.NO_ROUTE,
+                                                                            
"No Route for message "
+                                                                            + 
currentMessageDescription(),
+                                                                            
message));
+            }
+            else
+            {
+                AMQShortString exchangeName = 
_currentMessage.getExchangeName();
+                AMQShortString routingKey = 
_currentMessage.getMessagePublishInfo().getRoutingKey();
 
-            getVirtualHost().getEventLogger().message(
-                    ExchangeMessages.DISCARDMSG(exchangeName == null ? null : 
exchangeName.asString(),
-                                                routingKey == null ? null : 
routingKey.asString()));
+                getVirtualHost().getEventLogger().message(
+                        ExchangeMessages.DISCARDMSG(exchangeName == null ? 
null : exchangeName.asString(),
+                                                    routingKey == null ? null 
: routingKey.asString()));
+            }
         }
     }
 
@@ -550,13 +542,8 @@ public class AMQChannel
                         : 
_currentMessage.getMessagePublishInfo().getRoutingKey().toString());
     }
 
-    public void publishContentBody(ContentBody contentBody) throws AMQException
+    public void publishContentBody(ContentBody contentBody)
     {
-        if (_currentMessage == null)
-        {
-            throw new AMQException("Received content body without previously receiving a Content Header");
-        }
-
         if (_logger.isDebugEnabled())
         {
             _logger.debug(debugIdentity() + " content body received on channel " + _channelId);
@@ -568,13 +555,6 @@ public class AMQChannel
 
             deliverCurrentMessageIfComplete();
         }
-        catch (AMQException e)
-        {
-            // we want to make sure we don't keep a reference to the message 
in the
-            // event of an error
-            _currentMessage = null;
-            throw e;
-        }
         catch (RuntimeException e)
         {
             // we want to make sure we don't keep a reference to the message 
in the
@@ -1277,14 +1257,10 @@ public class AMQChannel
 
 
     private AMQMessage createAMQMessage(IncomingMessage incomingMessage, 
StoredMessage<MessageMetaData> handle)
-            throws AMQException
     {
 
         AMQMessage message = new AMQMessage(handle, 
_connection.getReference());
 
-        final BasicContentHeaderProperties properties =
-                  incomingMessage.getContentHeader().getProperties();
-
         return message;
     }
 
@@ -1340,6 +1316,11 @@ public class AMQChannel
         return _subject;
     }
 
+    public boolean hasCurrentMessage()
+    {
+        return _currentMessage != null;
+    }
+
     private class GetDeliveryMethod implements ClientDeliveryMethod
     {
 
@@ -2242,7 +2223,10 @@ public class AMQChannel
     }
 
     @Override
-    public void receiveChannelClose()
+    public void receiveChannelClose(final int replyCode,
+                                    final AMQShortString replyText,
+                                    final int classId,
+                                    final int methodId)
     {
         sync();
         _connection.closeChannel(this);
@@ -2258,6 +2242,43 @@ public class AMQChannel
     }
 
     @Override
+    public void receiveMessageContent(final byte[] data)
+    {
+
+        if(hasCurrentMessage())
+        {
+            publishContentBody(new ContentBody(data));
+        }
+        else
+        {
+            _connection.closeConnection(AMQConstant.COMMAND_INVALID,
+                                        "Attempt to send a content header without first sending a publish frame",
+                                        _channelId);
+        }
+    }
+
+    @Override
+    public void receiveMessageHeader(final BasicContentHeaderProperties properties, final long bodySize)
+    {
+        if(hasCurrentMessage())
+        {
+            publishContentHeader(new ContentHeaderBody(properties, bodySize));
+        }
+        else
+        {
+            _connection.closeConnection(AMQConstant.COMMAND_INVALID,
+                                        "Attempt to send a content header without first sending a publish frame",
+                                        _channelId);
+        }
+    }
+
+    @Override
+    public boolean ignoreAllButCloseOk()
+    {
+        return _connection.ignoreAllButCloseOk() || _connection.channelAwaitingClosure(_channelId);
+    }
+
+    @Override
     public void receiveChannelFlow(final boolean active)
     {
         sync();
@@ -2270,9 +2291,15 @@ public class AMQChannel
     }
 
     @Override
+    public void receiveChannelFlowOk(final boolean active)
+    {
+        // TODO - should we do anything here?
+    }
+
+    @Override
     public void receiveExchangeBound(final AMQShortString exchangeName,
-                                     final AMQShortString queueName,
-                                     final AMQShortString routingKey)
+                                     final AMQShortString routingKey,
+                                     final AMQShortString queueName)
     {
         VirtualHostImpl virtualHost = _connection.getVirtualHost();
         MethodRegistry methodRegistry = _connection.getMethodRegistry();

Modified: 
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1631275&r1=1631274&r2=1631275&view=diff
==============================================================================
--- 
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
 (original)
+++ 
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
 Mon Oct 13 00:58:45 2014
@@ -21,6 +21,9 @@
 package org.apache.qpid.server.protocol.v0_8;
 
 import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
@@ -28,8 +31,6 @@ import java.security.AccessControlExcept
 import java.security.AccessController;
 import java.security.Principal;
 import java.security.PrivilegedAction;
-import java.security.PrivilegedActionException;
-import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -49,7 +50,6 @@ import javax.security.sasl.SaslServer;
 
 import org.apache.log4j.Logger;
 
-import org.apache.qpid.AMQChannelException;
 import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.codec.AMQDecoder;
@@ -58,8 +58,6 @@ import org.apache.qpid.common.ServerProp
 import org.apache.qpid.framing.*;
 import org.apache.qpid.properties.ConnectionStartProperties;
 import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.server.configuration.BrokerProperties;
 import org.apache.qpid.server.connection.ConnectionPrincipal;
@@ -93,8 +91,7 @@ import org.apache.qpid.util.BytesDataOut
 
 public class AMQProtocolEngine implements ServerProtocolEngine,
                                           
AMQConnectionModel<AMQProtocolEngine, AMQChannel>,
-                                          AMQVersionAwareProtocolSession,
-                                          ConnectionMethodProcessor
+                                          
ServerMethodProcessor<ServerChannelMethodProcessor>
 {
     private static final Logger _logger = 
Logger.getLogger(AMQProtocolEngine.class);
 
@@ -114,9 +111,9 @@ public class AMQProtocolEngine implement
     private VirtualHostImpl<?,?,?> _virtualHost;
 
     private final Map<Integer, AMQChannel> _channelMap =
-            new HashMap<Integer, AMQChannel>();
+            new HashMap<>();
     private final CopyOnWriteArrayList<SessionModelListener> _sessionListeners 
=
-            new CopyOnWriteArrayList<SessionModelListener>();
+            new CopyOnWriteArrayList<>();
 
     private final AMQChannel[] _cachedChannels = new 
AMQChannel[CHANNEL_CACHE_SIZE + 1];
 
@@ -128,7 +125,7 @@ public class AMQProtocolEngine implement
      * Thread-safety: guarded by {@link #_receivedLock}.
      */
     private final Set<AMQChannel> _channelsForCurrentMessage =
-            new HashSet<AMQChannel>();
+            new HashSet<>();
 
     private AMQDecoder _decoder;
 
@@ -142,14 +139,12 @@ public class AMQProtocolEngine implement
     /* AMQP Version for this session */
     private ProtocolVersion _protocolVersion = 
ProtocolVersion.getLatestSupportedVersion();
     private final MethodRegistry _methodRegistry = new 
MethodRegistry(_protocolVersion);
-    private final FrameCreatingMethodProcessor _methodProcessor = new 
FrameCreatingMethodProcessor(_protocolVersion);
     private final List<Action<? super AMQProtocolEngine>> _taskList =
-            new CopyOnWriteArrayList<Action<? super AMQProtocolEngine>>();
+            new CopyOnWriteArrayList<>();
 
-    private Map<Integer, Long> _closingChannelsList = new 
ConcurrentHashMap<Integer, Long>();
+    private Map<Integer, Long> _closingChannelsList = new 
ConcurrentHashMap<>();
     private ProtocolOutputConverter _protocolOutputConverter;
     private final Subject _authorizedSubject = new Subject();
-    private MethodDispatcher _dispatcher;
 
     private final long _connectionID;
     private Object _reference = new Object();
@@ -183,6 +178,8 @@ public class AMQProtocolEngine implement
     private boolean _authenticated;
     private boolean _compressionSupported;
     private int _messageCompressionThreshold;
+    private int _currentClassId;
+    private int _currentMethodId;
 
     public AMQProtocolEngine(Broker broker,
                              final NetworkConnection network,
@@ -195,7 +192,7 @@ public class AMQProtocolEngine implement
         _transport = transport;
         _maxNoOfChannels = broker.getConnection_sessionCountLimit();
         _receivedLock = new ReentrantLock();
-        _decoder = new AMQDecoder(true, _methodProcessor);
+        _decoder = new BrokerDecoder(this);
         _connectionID = connectionId;
         _logSubject = new ConnectionLogSubject(this);
 
@@ -306,32 +303,9 @@ public class AMQProtocolEngine implement
                 _readBytes += msg.remaining();
 
                 _receivedLock.lock();
-                List<AMQDataBlock> processedMethods = 
_methodProcessor.getProcessedMethods();
                 try
                 {
                     _decoder.decodeBuffer(msg);
-                    for (AMQDataBlock dataBlock : processedMethods)
-                    {
-                        try
-                        {
-                            dataBlockReceived(dataBlock);
-                        }
-                        catch(AMQConnectionException e)
-                        {
-                            if(_logger.isDebugEnabled())
-                            {
-                                _logger.debug("Caught AMQConnectionException but will simply stop processing data blocks - the connection should already be closed.", e);
-                            }
-                            break;
-                        }
-                        catch (AMQException e)
-                        {
-                            _logger.error("Unexpected exception when 
processing datablock", e);
-                            closeProtocolSession();
-                            break;
-                        }
-                    }
-                    processedMethods.clear();
                     receivedComplete();
                 }
                 catch (ConnectionScopedRuntimeException e)
@@ -361,7 +335,6 @@ public class AMQProtocolEngine implement
                 }
                 finally
                 {
-                    processedMethods.clear();
                     _receivedLock.unlock();
                 }
                 return null;
@@ -399,112 +372,10 @@ public class AMQProtocolEngine implement
         }
     }
 
-    /**
-     * Process the data block.
-     * If the message is for a channel it is added to {@link 
#_channelsForCurrentMessage}.
-     *
-     * @throws AMQConnectionException if unable to process the data block. In 
this case,
-     * the connection is already closed by the time the exception is thrown. 
If any other
-     * type of exception is thrown, the connection is not already closed.
-     */
-    private void dataBlockReceived(AMQDataBlock message) throws AMQException
-    {
-        if (message instanceof ProtocolInitiation)
-        {
-            protocolInitiationReceived((ProtocolInitiation) message);
-
-        }
-        else if (message instanceof AMQFrame)
-        {
-            AMQFrame frame = (AMQFrame) message;
-            frameReceived(frame);
 
-        }
-        else
-        {
-            throw new AMQException("Unknown message type: " + message.getClass().getName() + ": " + message);
-        }
-    }
-
-    /**
-     * Handle the supplied frame.
-     * Adds this frame's channel to {@link #_channelsForCurrentMessage}.
-     *
-     * @throws AMQConnectionException if unable to process the data block. In 
this case,
-     * the connection is already closed by the time the exception is thrown. 
If any other
-     * type of exception is thrown, the connection is not already closed.
-     */
-    private void frameReceived(AMQFrame frame) throws AMQException
+    void channelRequiresSync(final AMQChannel amqChannel)
     {
-        int channelId = frame.getChannel();
-        AMQChannel amqChannel = _channelMap.get(channelId);
-        if(amqChannel != null)
-        {
-            // The _receivedLock is already acquired in the caller
-            // It is safe to add channel
-            _channelsForCurrentMessage.add(amqChannel);
-        }
-        else
-        {
-            // Not an error. The frame is probably a channel Open for this 
channel id, which
-            // does not require asynchronous work therefore its absence from
-            // _channelsForCurrentMessage is ok.
-        }
-
-        AMQBody body = frame.getBodyFrame();
-
-        long startTime = 0;
-        String frameToString = null;
-        if (_logger.isDebugEnabled())
-        {
-            startTime = System.currentTimeMillis();
-            frameToString = frame.toString();
-            _logger.debug("RECV: " + frame);
-        }
-
-        // Check that this channel is not closing
-        if (channelAwaitingClosure(channelId))
-        {
-            if ((frame.getBodyFrame() instanceof ChannelCloseOkBody))
-            {
-                if (_logger.isInfoEnabled())
-                {
-                    _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok");
-                }
-            }
-            else
-            {
-                // The channel has been told to close, we don't process any 
more frames until
-                // it's closed.
-                return;
-            }
-        }
-
-        try
-        {
-            body.handle(channelId, this);
-        }
-        catch(AMQConnectionException e)
-        {
-            _logger.info(e.getMessage() + " whilst processing frame: " + body);
-            closeConnection(channelId, e);
-            throw e;
-        }
-        catch (AMQException e)
-        {
-            closeChannel(channelId, e.getErrorCode() == null ? 
AMQConstant.INTERNAL_ERROR : e.getErrorCode(), e.getMessage());
-            throw e;
-        }
-        catch (TransportException e)
-        {
-            closeChannel(channelId, AMQConstant.CHANNEL_ERROR, e.getMessage());
-            throw e;
-        }
-
-        if(_logger.isDebugEnabled())
-        {
-            _logger.debug("Frame handled in " + (System.currentTimeMillis() - startTime) + " ms. Frame: " + frameToString);
-        }
+        _channelsForCurrentMessage.add(amqChannel);
     }
 
     private synchronized void protocolInitiationReceived(ProtocolInitiation pi)
@@ -623,148 +494,6 @@ public class AMQProtocolEngine implement
         return buf;
     }
 
-    public void methodFrameReceived(int channelId, AMQMethodBody methodBody)
-    {
-        final AMQMethodEvent<AMQMethodBody> evt = new 
AMQMethodEvent<AMQMethodBody>(channelId, methodBody);
-
-        try
-        {
-            try
-            {
-                boolean wasAnyoneInterested = methodReceived(evt);
-
-                if (!wasAnyoneInterested)
-                {
-                    throw new AMQNoMethodHandlerException(evt);
-                }
-            }
-            catch (AMQChannelException e)
-            {
-                if (getChannel(channelId) != null)
-                {
-                    if (_logger.isInfoEnabled())
-                    {
-                        _logger.info("Closing channel due to: " + e.getMessage());
-                    }
-
-                    AMQConstant errorType = e.getErrorCode();
-                    if(errorType == null)
-                    {
-                        errorType = AMQConstant.INTERNAL_ERROR;
-                    }
-                    writeFrame(new AMQFrame(channelId,
-                                            
getMethodRegistry().createChannelCloseBody(errorType.getCode(),
-                                                                               
        AMQShortString.validValueOf(e.getMessage()),
-                                                                               
        e.getClassId(),
-                                                                               
        e.getMethodId())));
-                    closeChannel(channelId, errorType, e.getMessage());
-                }
-                else
-                {
-                    if (_logger.isDebugEnabled())
-                    {
-                        _logger.debug("ChannelException occurred on non-existent channel:" + e.getMessage());
-                    }
-
-                    if (_logger.isInfoEnabled())
-                    {
-                        _logger.info("Closing connection due to: " + e.getMessage());
-                    }
-
-                    AMQConnectionException ce = new 
AMQConnectionException(AMQConstant.CHANNEL_ERROR,
-                                                                           
AMQConstant.CHANNEL_ERROR.getName().toString(),
-                                                                           
methodBody, getMethodRegistry());
-
-                    _logger.info(e.getMessage() + " whilst processing:" + methodBody);
-                    closeConnection(channelId, ce);
-                }
-            }
-            catch (AMQConnectionException e)
-            {
-                _logger.info(e.getMessage() + " whilst processing:" + methodBody);
-                closeConnection(channelId, e);
-            }
-        }
-        catch (Exception e)
-        {
-            _logger.error("Unexpected exception while processing frame.  Closing connection.", e);
-
-            closeProtocolSession();
-        }
-    }
-    private <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> 
evt) throws AMQException
-    {
-        final MethodDispatcher dispatcher = getMethodDispatcher();
-
-        final int channelId = evt.getChannelId();
-        final B body = evt.getMethod();
-
-        final AMQChannel channel = getChannel(channelId);
-        if(channelId != 0 && channel == null)
-        {
-
-            if(! ((body instanceof ChannelOpenBody)
-                  || (body instanceof ChannelCloseOkBody)
-                  || (body instanceof ChannelCloseBody)))
-            {
-                throw new AMQConnectionException(AMQConstant.CHANNEL_ERROR, "channel is closed won't process:" + body, body, getMethodRegistry());
-            }
-
-        }
-        if(channel == null)
-        {
-            return body.execute(dispatcher, channelId);
-        }
-        else
-        {
-            try
-            {
-                return Subject.doAs(channel.getSubject(), new 
PrivilegedExceptionAction<Boolean>()
-                {
-                    @Override
-                    public Boolean run() throws AMQException
-                    {
-                        return body.execute(dispatcher, channelId);
-                    }
-                });
-            }
-            catch (PrivilegedActionException e)
-            {
-                if(e.getCause() instanceof AMQException)
-                {
-                    throw (AMQException) e.getCause();
-                }
-                else
-                {
-                    throw new ServerScopedRuntimeException(e.getCause());
-                }
-            }
-
-
-        }
-
-    }
-
-    public void contentHeaderReceived(int channelId, ContentHeaderBody body) 
throws AMQException
-    {
-
-        AMQChannel channel = getAndAssertChannel(channelId);
-
-        channel.publishContentHeader(body);
-
-    }
-
-    public void contentBodyReceived(int channelId, ContentBody body) throws 
AMQException
-    {
-        AMQChannel channel = getAndAssertChannel(channelId);
-
-        channel.publishContentBody(body);
-    }
-
-    public void heartbeatBodyReceived(int channelId, HeartbeatBody body)
-    {
-        // NO - OP
-    }
 
     /**
      * Convenience method that writes a frame to the protocol session. Equivalent to calling
@@ -808,19 +537,8 @@ public class AMQProtocolEngine implement
     {
         synchronized (_channelMap)
         {
-            return new ArrayList<AMQChannel>(_channelMap.values());
-        }
-    }
-
-    public AMQChannel getAndAssertChannel(int channelId) throws AMQException
-    {
-        AMQChannel channel = getChannel(channelId);
-        if (channel == null)
-        {
-            throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId);
+            return new ArrayList<>(_channelMap.values());
         }
-
-        return channel;
     }
 
     public AMQChannel getChannel(int channelId)
@@ -899,8 +617,8 @@ public class AMQProtocolEngine implement
         writeFrame(new AMQFrame(channel.getChannelId(),
                                 
getMethodRegistry().createChannelCloseBody(cause.getCode(),
                                                                            
AMQShortString.validValueOf(message),
-                                                                           
_methodProcessor.getClassId(),
-                                                                           
_methodProcessor.getMethodId())));
+                                                                           
_currentClassId,
+                                                                           
_currentMethodId)));
         closeChannel(channel, cause, message, true);
     }
 
@@ -1106,7 +824,7 @@ public class AMQProtocolEngine implement
         {
             _logger.info("Closing connection due to: " + message);
         }
-        closeConnection(channelId, new AMQFrame(0, new ConnectionCloseBody(getProtocolVersion(), errorCode.getCode(), AMQShortString.validValueOf(message), _methodProcessor.getClassId(), _methodProcessor.getMethodId())));
+        closeConnection(channelId, new AMQFrame(0, new ConnectionCloseBody(getProtocolVersion(), errorCode.getCode(), AMQShortString.validValueOf(message), _currentClassId, _currentMethodId)));
     }
 
     private void closeConnection(int channelId, AMQFrame frame)
@@ -1224,9 +942,7 @@ public class AMQProtocolEngine implement
     {
         _protocolVersion = pv;
         _methodRegistry.setProtocolVersion(_protocolVersion);
-        _methodProcessor.setProtocolVersion(_protocolVersion);
         _protocolOutputConverter = new ProtocolOutputConverterImpl(this);
-        _dispatcher = ServerMethodDispatcherImpl.createMethodDispatcher(this);
     }
 
     public byte getProtocolMajorVersion()
@@ -1335,11 +1051,6 @@ public class AMQProtocolEngine implement
         return _methodRegistry;
     }
 
-    public MethodDispatcher getMethodDispatcher()
-    {
-        return _dispatcher;
-    }
-
     public void closed()
     {
         try
@@ -1353,14 +1064,10 @@ public class AMQProtocolEngine implement
                 closeProtocolSession();
             }
         }
-        catch (ConnectionScopedRuntimeException e)
+        catch (ConnectionScopedRuntimeException | TransportException e)
         {
             _logger.error("Could not close protocol engine", e);
         }
-        catch (TransportException e)
-        {
-           _logger.error("Could not close protocol engine", e);
-        }
     }
 
     public void readerIdle()
@@ -1427,11 +1134,6 @@ public class AMQProtocolEngine implement
         }
     }
 
-    public void setSender(Sender<ByteBuffer> sender)
-    {
-        // Do nothing
-    }
-
     public long getReadBytes()
     {
         return _readBytes;
@@ -1572,7 +1274,7 @@ public class AMQProtocolEngine implement
 
     public List<AMQChannel> getSessionModels()
     {
-               return new ArrayList<AMQChannel>(getChannels());
+               return new ArrayList<>(getChannels());
     }
 
     public LogSubject getLogSubject()
@@ -2074,4 +1776,52 @@ public class AMQProtocolEngine implement
             return _broker.getEventLogger();
         }
     }
+
+    @Override
+    public ServerChannelMethodProcessor getChannelMethodProcessor(final int 
channelId)
+    {
+        ServerChannelMethodProcessor channelMethodProcessor = 
getChannel(channelId);
+        if(channelMethodProcessor == null)
+        {
+            channelMethodProcessor = (ServerChannelMethodProcessor) 
Proxy.newProxyInstance(ServerMethodDispatcher.class.getClassLoader(),
+                                                            new Class[] { 
ServerChannelMethodProcessor.class }, new InvocationHandler()
+                    {
+                        @Override
+                        public Object invoke(final Object proxy, final Method 
method, final Object[] args)
+                                throws Throwable
+                        {
+                            closeConnection(AMQConstant.CHANNEL_ERROR, "Unknown channel id: " + channelId, channelId);
+
+                            return null;
+                        }
+                    });
+        }
+        return channelMethodProcessor;
+    }
+
+    @Override
+    public void receiveHeartbeat()
+    {
+        // No op
+    }
+
+    @Override
+    public void receiveProtocolHeader(final ProtocolInitiation 
protocolInitiation)
+    {
+        protocolInitiationReceived(protocolInitiation);
+    }
+
+    @Override
+    public void setCurrentMethod(final int classId, final int methodId)
+    {
+        _currentClassId = classId;
+        _currentMethodId = methodId;
+    }
+
+    @Override
+    public boolean ignoreAllButCloseOk()
+    {
+        return _closing.get();
+    }
+
 }

Added: 
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java?rev=1631275&view=auto
==============================================================================
--- 
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java
 (added)
+++ 
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java
 Mon Oct 13 00:58:45 2014
@@ -0,0 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_8;
+
+import java.io.IOException;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+
+import javax.security.auth.Subject;
+
+import org.apache.qpid.codec.MarkableDataInput;
+import org.apache.qpid.codec.ServerDecoder;
+import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
+
+public class BrokerDecoder extends ServerDecoder
+{
+    private final AMQProtocolEngine _connection;
+    /**
+     * Creates a new AMQP decoder.
+     *
+     * @param connection
+     */
+    public BrokerDecoder(final AMQProtocolEngine connection)
+    {
+        super(connection);
+        _connection = connection;
+    }
+
+    @Override
+    protected void processFrame(final int channelId, final byte type, final 
long bodySize, final MarkableDataInput in)
+            throws AMQFrameDecodingException, IOException
+    {
+        Subject subject;
+        AMQChannel channel = _connection.getChannel(channelId);
+        if(channel == null)
+        {
+            subject = _connection.getSubject();
+        }
+        else
+        {
+            _connection.channelRequiresSync(channel);
+
+            subject = channel.getSubject();
+        }
+        try
+        {
+            Subject.doAs(subject, new PrivilegedExceptionAction<Object>()
+            {
+                @Override
+                public Void run() throws IOException, AMQFrameDecodingException
+                {
+                    doProcessFrame(channelId, type, bodySize, in);
+                    return null;
+                }
+            });
+        }
+        catch (PrivilegedActionException e)
+        {
+            Throwable cause = e.getCause();
+            if(cause instanceof IOException)
+            {
+                throw (IOException) cause;
+            }
+            else if(cause instanceof AMQFrameDecodingException)
+            {
+                throw (AMQFrameDecodingException) cause;
+            }
+            else if(cause instanceof RuntimeException)
+            {
+                throw (RuntimeException) cause;
+            }
+            else throw new ServerScopedRuntimeException(cause);
+        }
+
+    }
+
+
+    private void doProcessFrame(final int channelId, final byte type, final 
long bodySize, final MarkableDataInput in)
+            throws AMQFrameDecodingException, IOException
+    {
+        super.processFrame(channelId, type, bodySize, in);
+
+    }
+
+}

Propchange: 
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java?rev=1631275&r1=1631274&r2=1631275&view=diff
==============================================================================
--- 
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java
 (original)
+++ 
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java
 Mon Oct 13 00:58:45 2014
@@ -20,16 +20,15 @@
  */
 package org.apache.qpid.server.protocol.v0_8;
 
-import org.apache.qpid.AMQException;
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.MessagePublishInfo;
 import org.apache.qpid.server.message.MessageDestination;
 
-import java.util.ArrayList;
-import java.util.List;
-
 public class IncomingMessage
 {
 
@@ -58,7 +57,7 @@ public class IncomingMessage
         return _messagePublishInfo;
     }
 
-    public void addContentBodyFrame(final ContentBody contentChunk) throws 
AMQException
+    public void addContentBodyFrame(final ContentBody contentChunk)
     {
         _bodyLengthReceived += contentChunk.getSize();
         _contentChunks.add(contentChunk);
@@ -94,7 +93,7 @@ public class IncomingMessage
         _messageDestination = e;
     }
 
-    public int getBodyCount() throws AMQException
+    public int getBodyCount()
     {
         return _contentChunks.size();
     }

Modified: 
qpid/branches/QPID-6125-ProtocolRefactoring/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1631275&r1=1631274&r2=1631275&view=diff
==============================================================================
--- 
qpid/branches/QPID-6125-ProtocolRefactoring/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
 (original)
+++ 
qpid/branches/QPID-6125-ProtocolRefactoring/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
 Mon Oct 13 00:58:45 2014
@@ -48,6 +48,7 @@ import org.apache.qpid.client.state.AMQS
 import org.apache.qpid.client.state.StateWaiter;
 import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
 import org.apache.qpid.codec.AMQDecoder;
+import org.apache.qpid.codec.ClientDecoder;
 import org.apache.qpid.configuration.ClientProperties;
 import org.apache.qpid.framing.AMQBody;
 import org.apache.qpid.framing.AMQDataBlock;
@@ -193,7 +194,7 @@ public class AMQProtocolHandler implemen
         _connection = con;
         _protocolSession = new AMQProtocolSession(this, _connection);
         _stateManager = new AMQStateManager(_protocolSession);
-        _decoder = new AMQDecoder(false, 
_protocolSession.getMethodProcessor());
+        _decoder = new ClientDecoder(_protocolSession.getMethodProcessor());
         _failoverHandler = new FailoverHandler(this);
     }
 

Modified: 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java?rev=1631275&r1=1631274&r2=1631275&view=diff
==============================================================================
--- 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
 (original)
+++ 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
 Mon Oct 13 00:58:45 2014
@@ -30,14 +30,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.ListIterator;
 
-import org.apache.qpid.framing.AMQDataBlockDecoder;
-import org.apache.qpid.framing.AMQFrameDecodingException;
-import org.apache.qpid.framing.AMQProtocolVersionException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ByteArrayDataInput;
-import org.apache.qpid.framing.EncodingUtils;
-import org.apache.qpid.framing.MethodProcessor;
-import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.protocol.AMQConstant;
 
 /**
  * AMQDecoder delegates the decoding of AMQP either to a data block decoder, or in the case of new connections, to a
@@ -51,12 +45,9 @@ import org.apache.qpid.framing.ProtocolI
  * TODO If protocol initiation decoder not needed, then don't create it. Probably not a big deal, but it adds to the
  *       per-session overhead.
  */
-public class AMQDecoder
+public abstract class AMQDecoder<T extends MethodProcessor>
 {
-    private final MethodProcessor _methodProcessor;
-
-    /** Holds the 'normal' AMQP data decoder. */
-    private AMQDataBlockDecoder _dataBlockDecoder = new AMQDataBlockDecoder();
+    private final T _methodProcessor;
 
     /** Holds the protocol initiation decoder. */
     private ProtocolInitiation.Decoder _piDecoder = new 
ProtocolInitiation.Decoder();
@@ -67,6 +58,8 @@ public class AMQDecoder
 
     private boolean _firstRead = true;
 
+    private int _maxFrameSize = AMQConstant.FRAME_MIN_SIZE.getCode();
+
     private List<ByteArrayInputStream> _remainingBufs = new 
ArrayList<ByteArrayInputStream>();
 
     /**
@@ -75,7 +68,7 @@ public class AMQDecoder
      * @param expectProtocolInitiation <tt>true</tt> if this decoder needs to handle protocol initiation.
      * @param methodProcessor method processor
      */
-    public AMQDecoder(boolean expectProtocolInitiation, MethodProcessor 
methodProcessor)
+    protected AMQDecoder(boolean expectProtocolInitiation, T methodProcessor)
     {
         _expectProtocolInitiation = expectProtocolInitiation;
         _methodProcessor = methodProcessor;
@@ -96,7 +89,12 @@ public class AMQDecoder
 
     public void setMaxFrameSize(final int frameMax)
     {
-        _dataBlockDecoder.setMaxFrameSize(frameMax);
+        _maxFrameSize = frameMax;
+    }
+
+    public T getMethodProcessor()
+    {
+        return _methodProcessor;
     }
 
     private class RemainingByteArrayInputStream extends InputStream
@@ -254,10 +252,10 @@ public class AMQDecoder
         {
             if(!_expectProtocolInitiation)
             {
-                enoughData = _dataBlockDecoder.decodable(msg);
+                enoughData = decodable(msg);
                 if (enoughData)
                 {
-                    _dataBlockDecoder.processInput(_methodProcessor, msg);
+                    processInput(msg);
                 }
             }
             else
@@ -303,4 +301,105 @@ public class AMQDecoder
             }
         }
     }
+
+    private boolean decodable(final MarkableDataInput in) throws 
AMQFrameDecodingException, IOException
+    {
+        final int remainingAfterAttributes = in.available() - (1 + 2 + 4 + 1);
+        // type, channel, body length and end byte
+        if (remainingAfterAttributes < 0)
+        {
+            return false;
+        }
+
+        in.mark(8);
+        in.skip(1 + 2);
+
+
+        // Get an unsigned int, lifted from MINA ByteBuffer getUnsignedInt()
+        final long bodySize = in.readInt() & 0xffffffffL;
+        if (bodySize > _maxFrameSize)
+        {
+            throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR,
+                                                "Incoming frame size of "
+                                                + bodySize
+                                                + " is larger than negotiated maximum of  "
+                                                + _maxFrameSize);
+        }
+        in.reset();
+
+        return (remainingAfterAttributes >= bodySize);
+
+    }
+
+    private void processInput(final MarkableDataInput in)
+            throws AMQFrameDecodingException, AMQProtocolVersionException, 
IOException
+    {
+        final byte type = in.readByte();
+
+        final int channel = in.readUnsignedShort();
+        final long bodySize = EncodingUtils.readUnsignedInteger(in);
+
+        // bodySize can be zero
+        if ((channel < 0) || (bodySize < 0))
+        {
+            throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR,
+                                                "Undecodable frame: type = " + type + " channel = " + channel
+                                                + " bodySize = " + bodySize);
+        }
+
+        processFrame(channel, type, bodySize, in);
+
+        byte marker = in.readByte();
+        if ((marker & 0xFF) != 0xCE)
+        {
+            throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR,
+                                                "End of frame marker not found. Read " + marker + " length=" + bodySize
+                                                + " type=" + type);
+        }
+
+    }
+
+    protected void processFrame(final int channel, final byte type, final long 
bodySize, final MarkableDataInput in)
+            throws AMQFrameDecodingException, IOException
+    {
+        switch (type)
+        {
+            case 1:
+                processMethod(channel, in);
+                break;
+            case 2:
+                ContentHeaderBody.process(in, 
_methodProcessor.getChannelMethodProcessor(channel), bodySize);
+                break;
+            case 3:
+                ContentBody.process(in, 
_methodProcessor.getChannelMethodProcessor(channel), bodySize);
+                break;
+            case 8:
+                HeartbeatBody.process(channel, in, _methodProcessor, bodySize);
+                break;
+            default:
+                throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "Unsupported frame type: " + type);
+        }
+    }
+
+
+    abstract void processMethod(int channelId,
+                               MarkableDataInput in)
+            throws AMQFrameDecodingException, IOException;
+
+    AMQFrameDecodingException newUnknownMethodException(final int classId,
+                                                        final int methodId,
+                                                        ProtocolVersion 
protocolVersion)
+    {
+        return new AMQFrameDecodingException(AMQConstant.COMMAND_INVALID,
+                                             "Method "
+                                             + methodId
+                                             + " unknown in AMQP version "
+                                             + protocolVersion
+                                             + " (while trying to decode class "
+                                             + classId
+                                             + " method "
+                                             + methodId
+                                             + ".");
+    }
+
 }

Added: 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java?rev=1631275&view=auto
==============================================================================
--- 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java
 (added)
+++ 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java
 Mon Oct 13 00:58:45 2014
@@ -0,0 +1,258 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.codec;
+
+import java.io.IOException;
+
+import org.apache.qpid.framing.*;
+
+/**
+ * {@link AMQDecoder} that dispatches decoded methods to a
+ * {@link ClientMethodProcessor} (connection-class methods) or to the
+ * {@link ClientChannelMethodProcessor} obtained from it for the frame's channel.
+ */
+public class ClientDecoder extends AMQDecoder<ClientMethodProcessor<? extends ClientChannelMethodProcessor>>
+{
+
+    /**
+     * Creates a new AMQP decoder.
+     *
+     * @param methodProcessor          method processor
+     */
+    public ClientDecoder(final ClientMethodProcessor<? extends ClientChannelMethodProcessor> methodProcessor)
+    {
+        // NOTE(review): the first argument is false here and true in ServerDecoder;
+        // presumably it selects server-side behaviour - confirm against AMQDecoder.
+        super(false, methodProcessor);
+    }
+
+
+    /**
+     * Reads the 32-bit class/method identifier from {@code in} (class id in the
+     * upper 16 bits, method id in the lower 16 bits) and dispatches on it.
+     * The current class/method is recorded on the method processor before the
+     * dispatch and reset to {@code (0, 0)} afterwards, even when decoding fails.
+     * Several connection-class identifiers are interpreted differently when the
+     * protocol version equals {@code ProtocolVersion.v8_0}.
+     *
+     * @param channelId channel id used to look up the channel method processor
+     * @param in        input holding the class/method identifier followed by
+     *                  the method arguments
+     * @throws AMQFrameDecodingException if the class/method pair is not handled
+     *                                   for the current protocol version
+     * @throws IOException               presumably when reading from {@code in}
+     *                                   fails (TODO confirm)
+     */
+    @Override
+    void processMethod(int channelId,
+                       MarkableDataInput in)
+            throws AMQFrameDecodingException, IOException
+    {
+        ClientMethodProcessor<? extends ClientChannelMethodProcessor> methodProcessor = getMethodProcessor();
+        ClientChannelMethodProcessor channelMethodProcessor = methodProcessor.getChannelMethodProcessor(channelId);
+        final int classAndMethod = in.readInt();
+        // Unsigned shift: the class id is a 16-bit unsigned value, so it must not
+        // be sign-extended when the top bit of the identifier is set.
+        int classId = classAndMethod >>> 16;
+        int methodId = classAndMethod & 0xFFFF;
+        methodProcessor.setCurrentMethod(classId, methodId);
+        try
+        {
+            switch (classAndMethod)
+            {
+                //CONNECTION_CLASS:
+                case 0x000a000a:
+                    ConnectionStartBody.process(in, methodProcessor);
+                    break;
+                case 0x000a0014:
+                    ConnectionSecureBody.process(in, methodProcessor);
+                    break;
+                case 0x000a001e:
+                    ConnectionTuneBody.process(in, methodProcessor);
+                    break;
+                case 0x000a0029:
+                    ConnectionOpenOkBody.process(in, methodProcessor);
+                    break;
+                case 0x000a002a:
+                    ConnectionRedirectBody.process(in, methodProcessor);
+                    break;
+                case 0x000a0032:
+                    if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v8_0))
+                    {
+                        ConnectionRedirectBody.process(in, methodProcessor);
+                    }
+                    else
+                    {
+                        ConnectionCloseBody.process(in, methodProcessor);
+                    }
+                    break;
+                case 0x000a0033:
+                    if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v8_0))
+                    {
+                        throw newUnknownMethodException(classId, methodId,
+                                                        methodProcessor.getProtocolVersion());
+                    }
+                    else
+                    {
+                        methodProcessor.receiveConnectionCloseOk();
+                    }
+                    break;
+                case 0x000a003c:
+                    if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v8_0))
+                    {
+                        ConnectionCloseBody.process(in, methodProcessor);
+                    }
+                    else
+                    {
+                        throw newUnknownMethodException(classId, methodId,
+                                                        methodProcessor.getProtocolVersion());
+                    }
+                    break;
+                case 0x000a003d:
+                    if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v8_0))
+                    {
+                        methodProcessor.receiveConnectionCloseOk();
+                    }
+                    else
+                    {
+                        throw newUnknownMethodException(classId, methodId,
+                                                        methodProcessor.getProtocolVersion());
+                    }
+                    break;
+
+                // CHANNEL_CLASS:
+
+                case 0x0014000b:
+                    ChannelOpenOkBody.process(in, methodProcessor.getProtocolVersion(), channelMethodProcessor);
+                    break;
+                case 0x00140014:
+                    ChannelFlowBody.process(in, channelMethodProcessor);
+                    break;
+                case 0x00140015:
+                    ChannelFlowOkBody.process(in, channelMethodProcessor);
+                    break;
+                case 0x0014001e:
+                    ChannelAlertBody.process(in, channelMethodProcessor);
+                    break;
+                case 0x00140028:
+                    ChannelCloseBody.process(in, channelMethodProcessor);
+                    break;
+                case 0x00140029:
+                    // Delivered without consulting ignoreAllButCloseOk().
+                    channelMethodProcessor.receiveChannelCloseOk();
+                    break;
+
+                // ACCESS_CLASS:
+
+                case 0x001e000b:
+                    AccessRequestOkBody.process(in, channelMethodProcessor);
+                    break;
+
+                // EXCHANGE_CLASS:
+
+                case 0x0028000b:
+                    if(!channelMethodProcessor.ignoreAllButCloseOk())
+                    {
+                        channelMethodProcessor.receiveExchangeDeclareOk();
+                    }
+                    break;
+                case 0x00280015:
+                    if(!channelMethodProcessor.ignoreAllButCloseOk())
+                    {
+                        channelMethodProcessor.receiveExchangeDeleteOk();
+                    }
+                    break;
+                case 0x00280017:
+                    ExchangeBoundOkBody.process(in, channelMethodProcessor);
+                    break;
+
+
+                // QUEUE_CLASS:
+
+                case 0x0032000b:
+                    QueueDeclareOkBody.process(in, channelMethodProcessor);
+                    break;
+                case 0x00320015:
+                    if(!channelMethodProcessor.ignoreAllButCloseOk())
+                    {
+                        channelMethodProcessor.receiveQueueBindOk();
+                    }
+                    break;
+                case 0x0032001f:
+                    QueuePurgeOkBody.process(in, channelMethodProcessor);
+                    break;
+                case 0x00320029:
+                    QueueDeleteOkBody.process(in, channelMethodProcessor);
+                    break;
+                case 0x00320033:
+                    if(!channelMethodProcessor.ignoreAllButCloseOk())
+                    {
+                        channelMethodProcessor.receiveQueueUnbindOk();
+                    }
+                    break;
+
+
+                // BASIC_CLASS:
+
+                case 0x003c000b:
+                    if(!channelMethodProcessor.ignoreAllButCloseOk())
+                    {
+                        channelMethodProcessor.receiveBasicQosOk();
+                    }
+                    break;
+                case 0x003c0015:
+                    BasicConsumeOkBody.process(in, channelMethodProcessor);
+                    break;
+                case 0x003c001f:
+                    BasicCancelOkBody.process(in, channelMethodProcessor);
+                    break;
+                case 0x003c0032:
+                    BasicReturnBody.process(in, channelMethodProcessor);
+                    break;
+                case 0x003c003c:
+                    BasicDeliverBody.process(in, channelMethodProcessor);
+                    break;
+                case 0x003c0047:
+                    BasicGetOkBody.process(in, channelMethodProcessor);
+                    break;
+                case 0x003c0048:
+                    BasicGetEmptyBody.process(in, channelMethodProcessor);
+                    break;
+                // The next two identifiers both map to receiveBasicRecoverSyncOk().
+                case 0x003c0065:
+                    if(!channelMethodProcessor.ignoreAllButCloseOk())
+                    {
+                        channelMethodProcessor.receiveBasicRecoverSyncOk();
+                    }
+                    break;
+                case 0x003c006f:
+                    if(!channelMethodProcessor.ignoreAllButCloseOk())
+                    {
+                        channelMethodProcessor.receiveBasicRecoverSyncOk();
+                    }
+                    break;
+
+                // TX_CLASS:
+
+                case 0x005a000b:
+                    if(!channelMethodProcessor.ignoreAllButCloseOk())
+                    {
+                        channelMethodProcessor.receiveTxSelectOk();
+                    }
+                    break;
+                case 0x005a0015:
+                    if(!channelMethodProcessor.ignoreAllButCloseOk())
+                    {
+                        channelMethodProcessor.receiveTxCommitOk();
+                    }
+                    break;
+                case 0x005a001f:
+                    if(!channelMethodProcessor.ignoreAllButCloseOk())
+                    {
+                        channelMethodProcessor.receiveTxRollbackOk();
+                    }
+                    break;
+
+                default:
+                    throw newUnknownMethodException(classId, methodId,
+                                                    methodProcessor.getProtocolVersion());
+
+            }
+        }
+        finally
+        {
+            // Always clear the current-method marker, including on failure.
+            methodProcessor.setCurrentMethod(0, 0);
+        }
+    }
+
+}

Propchange: 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java?rev=1631275&view=auto
==============================================================================
--- 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java
 (added)
+++ 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java
 Mon Oct 13 00:58:45 2014
@@ -0,0 +1,234 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.codec;
+
+import java.io.IOException;
+
+import org.apache.qpid.framing.*;
+
+/**
+ * {@link AMQDecoder} that dispatches decoded methods to a
+ * {@link ServerMethodProcessor} (connection-class methods and channel open) or
+ * to the {@link ServerChannelMethodProcessor} obtained from it for the frame's
+ * channel.
+ */
+public class ServerDecoder extends AMQDecoder<ServerMethodProcessor<? extends ServerChannelMethodProcessor>>
+{
+
+    /**
+     * Creates a new AMQP decoder.
+     *
+     * @param methodProcessor          method processor
+     */
+    public ServerDecoder(final ServerMethodProcessor<? extends ServerChannelMethodProcessor> methodProcessor)
+    {
+        // NOTE(review): the first argument is true here and false in ClientDecoder;
+        // presumably it selects server-side behaviour - confirm against AMQDecoder.
+        super(true, methodProcessor);
+    }
+
+    /**
+     * Reads the 32-bit class/method identifier from {@code in} (class id in the
+     * upper 16 bits, method id in the lower 16 bits) and dispatches on it.
+     * The current class/method is recorded on the method processor before the
+     * dispatch and reset to {@code (0, 0)} afterwards, even when decoding fails.
+     * Several connection-class identifiers are interpreted differently when the
+     * protocol version equals {@code ProtocolVersion.v8_0}.
+     *
+     * @param channelId channel id used to look up the channel method processor
+     *                  and passed on when a channel is opened
+     * @param in        input holding the class/method identifier followed by
+     *                  the method arguments
+     * @throws AMQFrameDecodingException if the class/method pair is not handled
+     *                                   for the current protocol version
+     * @throws IOException               presumably when reading from {@code in}
+     *                                   fails (TODO confirm)
+     */
+    @Override
+    void processMethod(int channelId,
+                       MarkableDataInput in)
+            throws AMQFrameDecodingException, IOException
+    {
+        ServerMethodProcessor<? extends ServerChannelMethodProcessor> methodProcessor = getMethodProcessor();
+        ServerChannelMethodProcessor channelMethodProcessor = methodProcessor.getChannelMethodProcessor(channelId);
+        final int classAndMethod = in.readInt();
+        // Unsigned shift: the class id is a 16-bit unsigned value, so it must not
+        // be sign-extended when the top bit of the identifier is set.
+        int classId = classAndMethod >>> 16;
+        int methodId = classAndMethod & 0xFFFF;
+        methodProcessor.setCurrentMethod(classId, methodId);
+        try
+        {
+            switch (classAndMethod)
+            {
+                //CONNECTION_CLASS:
+                case 0x000a000b:
+                    ConnectionStartOkBody.process(in, methodProcessor);
+                    break;
+                case 0x000a0015:
+                    ConnectionSecureOkBody.process(in, methodProcessor);
+                    break;
+                case 0x000a001f:
+                    ConnectionTuneOkBody.process(in, methodProcessor);
+                    break;
+                case 0x000a0028:
+                    ConnectionOpenBody.process(in, methodProcessor);
+                    break;
+                case 0x000a0032:
+                    if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v8_0))
+                    {
+                        throw newUnknownMethodException(classId, methodId,
+                                                        methodProcessor.getProtocolVersion());
+                    }
+                    else
+                    {
+                        ConnectionCloseBody.process(in, methodProcessor);
+                    }
+                    break;
+                case 0x000a0033:
+                    if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v8_0))
+                    {
+                        throw newUnknownMethodException(classId, methodId,
+                                                        methodProcessor.getProtocolVersion());
+                    }
+                    else
+                    {
+                        methodProcessor.receiveConnectionCloseOk();
+                    }
+                    break;
+                case 0x000a003c:
+                    if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v8_0))
+                    {
+                        ConnectionCloseBody.process(in, methodProcessor);
+                    }
+                    else
+                    {
+                        throw newUnknownMethodException(classId, methodId,
+                                                        methodProcessor.getProtocolVersion());
+                    }
+                    break;
+                case 0x000a003d:
+                    if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v8_0))
+                    {
+                        methodProcessor.receiveConnectionCloseOk();
+                    }
+                    else
+                    {
+                        throw newUnknownMethodException(classId, methodId,
+                                                        methodProcessor.getProtocolVersion());
+                    }
+                    break;
+
+                // CHANNEL_CLASS:
+
+                case 0x0014000a:
+                    // Channel open goes to the connection-level processor together
+                    // with the channel id, not to the per-channel processor.
+                    ChannelOpenBody.process(channelId, in, methodProcessor);
+                    break;
+                case 0x00140014:
+                    ChannelFlowBody.process(in, channelMethodProcessor);
+                    break;
+                case 0x00140015:
+                    ChannelFlowOkBody.process(in, channelMethodProcessor);
+                    break;
+                case 0x00140028:
+                    ChannelCloseBody.process(in, channelMethodProcessor);
+                    break;
+                case 0x00140029:
+                    // Delivered without consulting ignoreAllButCloseOk().
+                    channelMethodProcessor.receiveChannelCloseOk();
+                    break;
+
+                // ACCESS_CLASS:
+
+                case 0x001e000a:
+                    AccessRequestBody.process(in, channelMethodProcessor);
+                    break;
+
+                // EXCHANGE_CLASS:
+
+                case 0x0028000a:
+                    ExchangeDeclareBody.process(in, channelMethodProcessor);
+                    break;
+                case 0x00280014:
+                    ExchangeDeleteBody.process(in, channelMethodProcessor);
+                    break;
+                case 0x00280016:
+                    ExchangeBoundBody.process(in, channelMethodProcessor);
+                    break;
+
+
+                // QUEUE_CLASS:
+
+                case 0x0032000a:
+                    QueueDeclareBody.process(in, channelMethodProcessor);
+                    break;
+                case 0x00320014:
+                    QueueBindBody.process(in, channelMethodProcessor);
+                    break;
+                case 0x0032001e:
+                    QueuePurgeBody.process(in, channelMethodProcessor);
+                    break;
+                case 0x00320028:
+                    QueueDeleteBody.process(in, channelMethodProcessor);
+                    break;
+                case 0x00320032:
+                    QueueUnbindBody.process(in, channelMethodProcessor);
+                    break;
+
+
+                // BASIC_CLASS:
+
+                case 0x003c000a:
+                    BasicQosBody.process(in, channelMethodProcessor);
+                    break;
+                case 0x003c0014:
+                    BasicConsumeBody.process(in, channelMethodProcessor);
+                    break;
+                case 0x003c001e:
+                    BasicCancelBody.process(in, channelMethodProcessor);
+                    break;
+                case 0x003c0028:
+                    BasicPublishBody.process(in, channelMethodProcessor);
+                    break;
+                case 0x003c0046:
+                    BasicGetBody.process(in, channelMethodProcessor);
+                    break;
+                case 0x003c0050:
+                    BasicAckBody.process(in, channelMethodProcessor);
+                    break;
+                case 0x003c005a:
+                    BasicRejectBody.process(in, channelMethodProcessor);
+                    break;
+                case 0x003c0064:
+                    BasicRecoverBody.process(in, methodProcessor.getProtocolVersion(), channelMethodProcessor);
+                    break;
+                // The next two identifiers are both decoded as BasicRecoverSyncBody.
+                case 0x003c0066:
+                    BasicRecoverSyncBody.process(in, channelMethodProcessor);
+                    break;
+                case 0x003c006e:
+                    BasicRecoverSyncBody.process(in, channelMethodProcessor);
+                    break;
+
+                // TX_CLASS:
+
+                case 0x005a000a:
+                    if(!channelMethodProcessor.ignoreAllButCloseOk())
+                    {
+                        channelMethodProcessor.receiveTxSelect();
+                    }
+                    break;
+                case 0x005a0014:
+                    if(!channelMethodProcessor.ignoreAllButCloseOk())
+                    {
+                        channelMethodProcessor.receiveTxCommit();
+                    }
+                    break;
+                case 0x005a001e:
+                    if(!channelMethodProcessor.ignoreAllButCloseOk())
+                    {
+                        channelMethodProcessor.receiveTxRollback();
+                    }
+                    break;
+
+                default:
+                    throw newUnknownMethodException(classId, methodId,
+                                                    methodProcessor.getProtocolVersion());
+
+            }
+        }
+        finally
+        {
+            // Always clear the current-method marker, including on failure.
+            methodProcessor.setCurrentMethod(0, 0);
+        }
+    }
+
+}

Propchange: 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java?rev=1631275&r1=1631274&r2=1631275&view=diff
==============================================================================
--- 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java
 (original)
+++ 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java
 Mon Oct 13 00:58:45 2014
@@ -165,9 +165,8 @@ public class AccessRequestBody extends A
         return buf.toString();
     }
 
-    public static void process(final int channelId,
-                                final MarkableDataInput buffer,
-                                final MethodProcessor dispatcher) throws 
IOException
+    public static void process(final MarkableDataInput buffer,
+                               final ServerChannelMethodProcessor dispatcher) 
throws IOException
     {
         AMQShortString realm = buffer.readAMQShortString();
         byte bitfield = buffer.readByte();
@@ -176,6 +175,9 @@ public class AccessRequestBody extends A
         boolean active = (bitfield & 0x04) == 0x4 ;
         boolean write = (bitfield & 0x08) == 0x8 ;
         boolean read = (bitfield & 0x10) == 0x10 ;
-        dispatcher.receiveAccessRequest(channelId, realm, exclusive, passive, 
active, write, read);
+        if(!dispatcher.ignoreAllButCloseOk())
+        {
+            dispatcher.receiveAccessRequest(realm, exclusive, passive, active, 
write, read);
+        }
     }
 }

Modified: 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java?rev=1631275&r1=1631274&r2=1631275&view=diff
==============================================================================
--- 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java
 (original)
+++ 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java
 Mon Oct 13 00:58:45 2014
@@ -95,10 +95,14 @@ public class AccessRequestOkBody extends
         return buf.toString();
     }
 
-    public static void process(final int channelId, final MarkableDataInput 
buffer, final MethodProcessor dispatcher)
+    public static void process(final MarkableDataInput buffer,
+                               final ClientChannelMethodProcessor dispatcher)
             throws IOException
     {
         int ticket = buffer.readUnsignedShort();
-        dispatcher.receiveAccessRequestOk(channelId, ticket);
+        if(!dispatcher.ignoreAllButCloseOk())
+        {
+            dispatcher.receiveAccessRequestOk(ticket);
+        }
     }
 }

Modified: 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java?rev=1631275&r1=1631274&r2=1631275&view=diff
==============================================================================
--- 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java
 (original)
+++ 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java
 Mon Oct 13 00:58:45 2014
@@ -112,13 +112,15 @@ public class BasicAckBody extends AMQMet
         return buf.toString();
     }
 
-    public static void process(final int channelId,
-                                final MarkableDataInput buffer,
-                                final MethodProcessor dispatcher) throws 
IOException
+    public static void process(final MarkableDataInput buffer,
+                               final ServerChannelMethodProcessor dispatcher) 
throws IOException
     {
 
         long deliveryTag = buffer.readLong();
         boolean multiple = (buffer.readByte() & 0x01) != 0;
-        dispatcher.receiveBasicAck(channelId, deliveryTag, multiple);
+        if(!dispatcher.ignoreAllButCloseOk())
+        {
+            dispatcher.receiveBasicAck(deliveryTag, multiple);
+        }
     }
 }

Modified: 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java?rev=1631275&r1=1631274&r2=1631275&view=diff
==============================================================================
--- 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java
 (original)
+++ 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java
 Mon Oct 13 00:58:45 2014
@@ -113,13 +113,15 @@ public class BasicCancelBody extends AMQ
         return buf.toString();
     }
 
-    public static void process(final int channelId,
-                                final MarkableDataInput buffer,
-                                final MethodProcessor dispatcher) throws 
IOException
+    public static void process(final MarkableDataInput buffer,
+                               final ServerChannelMethodProcessor dispatcher) 
throws IOException
     {
 
         AMQShortString consumerTag = buffer.readAMQShortString();
         boolean noWait = (buffer.readByte() & 0x01) == 0x01;
-        dispatcher.receiveBasicCancel(channelId, consumerTag, noWait);
+        if(!dispatcher.ignoreAllButCloseOk())
+        {
+            dispatcher.receiveBasicCancel(consumerTag, noWait);
+        }
     }
 }

Modified: 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java?rev=1631275&r1=1631274&r2=1631275&view=diff
==============================================================================
--- 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java
 (original)
+++ 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java
 Mon Oct 13 00:58:45 2014
@@ -96,10 +96,14 @@ public class BasicCancelOkBody extends A
         return buf.toString();
     }
 
-    public static void process(final int channelId, final MarkableDataInput 
in, final MethodProcessor dispatcher)
+    public static void process(final MarkableDataInput in,
+                               final ClientChannelMethodProcessor dispatcher)
             throws IOException
     {
         AMQShortString consumerTag = in.readAMQShortString();
-        dispatcher.receiveBasicCancelOk(channelId, consumerTag);
+        if(!dispatcher.ignoreAllButCloseOk())
+        {
+            dispatcher.receiveBasicCancelOk(consumerTag);
+        }
     }
 }

Modified: 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java?rev=1631275&r1=1631274&r2=1631275&view=diff
==============================================================================
--- 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java
 (original)
+++ 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java
 Mon Oct 13 00:58:45 2014
@@ -191,7 +191,8 @@ public class BasicConsumeBody extends AM
         return buf.toString();
     }
 
-    public static void process(final int channelId, final MarkableDataInput 
buffer, final MethodProcessor dispatcher)
+    public static void process(final MarkableDataInput buffer,
+                               final ServerChannelMethodProcessor dispatcher)
             throws IOException, AMQFrameDecodingException
     {
 
@@ -205,6 +206,9 @@ public class BasicConsumeBody extends AM
         boolean exclusive = (bitfield & 0x04) == 0x04;
         boolean nowait = (bitfield & 0x08) == 0x08;
         FieldTable arguments = EncodingUtils.readFieldTable(buffer);
-        dispatcher.receiveBasicConsume(channelId, queue, consumerTag, noLocal, 
noAck, exclusive, nowait, arguments);
+        if(!dispatcher.ignoreAllButCloseOk())
+        {
+            dispatcher.receiveBasicConsume(queue, consumerTag, noLocal, noAck, 
exclusive, nowait, arguments);
+        }
     }
 }

Modified: 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java?rev=1631275&r1=1631274&r2=1631275&view=diff
==============================================================================
--- 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java
 (original)
+++ 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java
 Mon Oct 13 00:58:45 2014
@@ -96,10 +96,14 @@ public class BasicConsumeOkBody extends 
         return buf.toString();
     }
 
-    public static void process(final int channelId, final MarkableDataInput 
buffer, final MethodProcessor dispatcher)
+    public static void process(final MarkableDataInput buffer,
+                               final ClientChannelMethodProcessor dispatcher)
             throws IOException
     {
         AMQShortString consumerTag = buffer.readAMQShortString();
-        dispatcher.receiveBasicConsumeOk(channelId, consumerTag);
+        if(!dispatcher.ignoreAllButCloseOk())
+        {
+            dispatcher.receiveBasicConsumeOk(consumerTag);
+        }
     }
 }

Modified: 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java?rev=1631275&r1=1631274&r2=1631275&view=diff
==============================================================================
--- 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java
 (original)
+++ 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java
 Mon Oct 13 00:58:45 2014
@@ -152,9 +152,8 @@ public class BasicDeliverBody extends AM
         return buf.toString();
     }
 
-    public static void process(final int channelId,
-                                final MarkableDataInput buffer,
-                                final MethodProcessor dispatcher) throws 
IOException
+    public static void process(final MarkableDataInput buffer,
+                               final ClientChannelMethodProcessor dispatcher) 
throws IOException
     {
 
         AMQShortString consumerTag = buffer.readAMQShortString();
@@ -162,6 +161,9 @@ public class BasicDeliverBody extends AM
         boolean redelivered = (buffer.readByte() & 0x01) != 0;
         AMQShortString exchange = buffer.readAMQShortString();
         AMQShortString routingKey = buffer.readAMQShortString();
-        dispatcher.receiveBasicDeliver(channelId, consumerTag, deliveryTag, 
redelivered, exchange, routingKey);
+        if(!dispatcher.ignoreAllButCloseOk())
+        {
+            dispatcher.receiveBasicDeliver(consumerTag, deliveryTag, 
redelivered, exchange, routingKey);
+        }
     }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org

Reply via email to