Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodProcessor.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodProcessor.java?rev=1631137&r1=1631136&r2=1631137&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodProcessor.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodProcessor.java Sat Oct 11 23:46:39 2014 @@ -37,6 +37,8 @@ import org.apache.qpid.server.security.a public class ServerMethodProcessor implements MethodProcessor { private static final Logger LOGGER = Logger.getLogger(ServerMethodProcessor.class); + private int _classId; + private int _methodId; private static interface ChannelAction @@ -92,8 +94,8 @@ public class ServerMethodProcessor imple mechanisms, locales)); } - _connection.closeConnection(AMQConstant.COMMAND_INVALID, "Unexpected method received: ConnectionStart", 0, - ConnectionStartBody.CLASS_ID, ConnectionStartBody.METHOD_ID); + _connection.closeConnection(AMQConstant.COMMAND_INVALID, "Unexpected method received: ConnectionStart", 0 + ); } @@ -121,9 +123,8 @@ public class ServerMethodProcessor imple if (ss == null) { _connection.closeConnection(AMQConstant.RESOURCE_ERROR, - "Unable to create SASL Server:" + mechanism, 0, - ConnectionStartOkBody.CLASS_ID, - ConnectionStartOkBody.METHOD_ID); + "Unable to create SASL Server:" + mechanism, 0 + ); } else { @@ -143,9 +144,8 @@ public class ServerMethodProcessor imple LOGGER.info("Authentication failed:" + (cause == null ? "" : cause.getMessage())); _connection.closeConnection(AMQConstant.NOT_ALLOWED, - AMQConstant.NOT_ALLOWED.getName().toString(), 0, - ConnectionStartOkBody.CLASS_ID, - ConnectionStartOkBody.METHOD_ID); + AMQConstant.NOT_ALLOWED.getName().toString(), 0 + ); disposeSaslServer(); break; @@ -182,8 +182,8 @@ public class ServerMethodProcessor imple { disposeSaslServer(); - _connection.closeConnection(AMQConstant.RESOURCE_ERROR, "SASL error: " + e.getMessage(), 0, - ConnectionStartOkBody.CLASS_ID, ConnectionStartOkBody.METHOD_ID); + _connection.closeConnection(AMQConstant.RESOURCE_ERROR, "SASL error: " + e.getMessage(), 0 + ); } } @@ -938,6 +938,13 @@ public class ServerMethodProcessor imple } + @Override + public void setCurrentMethod(final int classId, final int methodId) + { + _classId = classId; + _methodId = methodId; + } + private void disposeSaslServer() { SaslServer ss = _connection.getSaslServer();
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java?rev=1631137&r1=1631136&r2=1631137&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java Sat Oct 11 23:46:39 2014 @@ -20,13 +20,12 @@ */ package org.apache.qpid.server.protocol.v0_8; -import org.apache.qpid.AMQException; -import org.apache.qpid.server.message.MessageInstance; -import org.apache.qpid.server.queue.QueueEntry; - import java.util.Collection; import java.util.Set; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.message.MessageInstance; + public interface UnacknowledgedMessageMap { @@ -37,12 +36,12 @@ public interface UnacknowledgedMessageMa *@param message the message being iterated over @return true to stop iteration, false to continue * @throws AMQException */ - boolean callback(final long deliveryTag, MessageInstance message) throws AMQException; + boolean callback(final long deliveryTag, MessageInstance message); void visitComplete(); } - void visit(Visitor visitor) throws AMQException; + void visit(Visitor visitor); void add(long deliveryTag, MessageInstance message); Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java?rev=1631137&r1=1631136&r2=1631137&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java Sat Oct 11 23:46:39 2014 @@ -27,7 +27,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.qpid.AMQException; import org.apache.qpid.server.message.MessageInstance; public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap @@ -82,7 +81,7 @@ public class UnacknowledgedMessageMapImp } } - public void visit(Visitor visitor) throws AMQException + public void visit(Visitor visitor) { synchronized (_lock) { Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java?rev=1631137&r1=1631136&r2=1631137&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java Sat Oct 11 23:46:39 2014 @@ -41,7 +41,7 @@ import org.apache.qpid.test.utils.QpidTe public class AMQChannelTest extends QpidTestCase { private VirtualHostImpl _virtualHost; - private AMQProtocolSession _protocolSession; + private AMQProtocolEngine _protocolSession; private Map<Integer,String> _replies; private Broker _broker; @@ -108,7 +108,7 @@ public class AMQChannelTest extends Qpid channel.setPublishFrame(info, e); channel.publishContentHeader(contentHeaderBody); - channel.commit(); + channel.commit(null, false); assertEquals("Unexpected number of replies", 1, _replies.size()); assertEquals("Message authorization passed", "Access Refused", _replies.get(403)); @@ -130,7 +130,7 @@ public class AMQChannelTest extends Qpid channel.setPublishFrame(info, e); channel.publishContentHeader(contentHeaderBody); - channel.commit(); + channel.commit(null, false); assertEquals("Unexpected number of replies", 0, _replies.size()); } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java?rev=1631137&r1=1631136&r2=1631137&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java Sat Oct 11 23:46:39 2014 @@ -51,7 +51,7 @@ public class AckTest extends QpidTestCas private ConsumerTarget_0_8 _subscriptionTarget; private ConsumerImpl _consumer; - private AMQProtocolSession _protocolSession; + private AMQProtocolEngine _protocolEngine; private TestMemoryMessageStore _messageStore; @@ -68,8 +68,8 @@ public class AckTest extends QpidTestCas super.setUp(); BrokerTestHelper.setUp(); _channel = BrokerTestHelper_0_8.createChannel(5); - _protocolSession = _channel.getProtocolSession(); - _virtualHost = _protocolSession.getVirtualHost(); + _protocolEngine = _channel.getConnection(); + _virtualHost = _protocolEngine.getVirtualHost(); _queue = BrokerTestHelper.createQueue(getTestName(), _virtualHost); _messageStore = (TestMemoryMessageStore)_virtualHost.getMessageStore(); } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java?rev=1631137&r1=1631136&r2=1631137&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java Sat Oct 11 23:46:39 2014 @@ -25,6 +25,7 @@ import java.util.List; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.MessageCounter; import org.apache.qpid.server.store.MessageStore; @@ -75,7 +76,7 @@ public class AcknowledgeTest extends Qpi private InternalTestProtocolSession getSession() { - return (InternalTestProtocolSession)_channel.getProtocolSession(); + return (InternalTestProtocolSession)_channel.getConnection(); } private AMQQueue getQueue() @@ -129,7 +130,7 @@ public class AcknowledgeTest extends Qpi if (getChannel().isTransactional()) { - getChannel().commit(); + getChannel().commit(null, false); } //Ensure they are stored @@ -140,7 +141,7 @@ public class AcknowledgeTest extends Qpi //Subscribe to the queue AMQShortString subscriber = _channel.consumeFromSource(null, - Collections.singleton(_queue), + Collections.<MessageSource>singleton(_queue), true, null, true, false); getQueue().deliverAsync(); @@ -164,7 +165,7 @@ public class AcknowledgeTest extends Qpi if (getChannel().isTransactional()) { - getChannel().commit(); + getChannel().commit(null, false); } // Check Remaining Acknowledgements Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java?rev=1631137&r1=1631136&r2=1631137&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java Sat Oct 11 23:46:39 2014 @@ -32,7 +32,7 @@ import org.apache.qpid.server.virtualhos public class BrokerTestHelper_0_8 extends BrokerTestHelper { - public static AMQChannel createChannel(int channelId, AMQProtocolSession session) throws AMQException + public static AMQChannel createChannel(int channelId, AMQProtocolEngine session) throws AMQException { AMQChannel channel = new AMQChannel(session, channelId, session.getVirtualHost().getMessageStore()); session.addChannel(channel); Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java?rev=1631137&r1=1631136&r2=1631137&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java Sat Oct 11 23:46:39 2014 @@ -230,7 +230,7 @@ public class InternalTestProtocolSession //Simulate the Client responding with a CloseOK // should really update the StateManger but we don't have access here // changeState(AMQState.CONNECTION_CLOSED); - ((AMQChannel)session).getProtocolSession().closeSession(); + ((AMQChannel)session).getConnection().closeSession(); } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java?rev=1631137&r1=1631136&r2=1631137&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java Sat Oct 11 23:46:39 2014 @@ -27,6 +27,7 @@ import org.apache.qpid.common.AMQPFilter import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.MessageCounter; import org.apache.qpid.server.store.MessageStore; @@ -77,7 +78,7 @@ public class QueueBrowserUsesNoAckTest e private InternalTestProtocolSession getSession() { - return (InternalTestProtocolSession)_channel.getProtocolSession(); + return (InternalTestProtocolSession)_channel.getConnection(); } private AMQQueue getQueue() @@ -144,6 +145,6 @@ public class QueueBrowserUsesNoAckTest e FieldTable filters = new FieldTable(); filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true); - return channel.consumeFromSource(null, Collections.singleton(queue), true, filters, true, false); + return channel.consumeFromSource(null, Collections.<MessageSource>singleton(queue), true, filters, true, false); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/AMQChannelException.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/AMQChannelException.java?rev=1631137&r1=1631136&r2=1631137&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/AMQChannelException.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/AMQChannelException.java Sat Oct 11 23:46:39 2014 @@ -20,8 +20,6 @@ */ package org.apache.qpid; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; @@ -49,10 +47,19 @@ public class AMQChannelException extends } - public AMQFrame getCloseFrame(int channel) + public int getClassId() { - return new AMQFrame(channel, _methodRegistry.createChannelCloseBody(getErrorCode() == null ? AMQConstant.INTERNAL_ERROR.getCode() : getErrorCode().getCode(), - AMQShortString.validValueOf(getMessage()),_classId,_methodId)); + return _classId; + } + + public int getMethodId() + { + return _methodId; + } + + public MethodRegistry getMethodRegistry() + { + return _methodRegistry; } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java?rev=1631137&r1=1631136&r2=1631137&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java Sat Oct 11 23:46:39 2014 @@ -123,254 +123,264 @@ public class AMQDataBlockDecoder throws AMQFrameDecodingException, IOException { final int classAndMethod = in.readInt(); - switch (classAndMethod) + int classId = classAndMethod >> 16; + int methodId = classAndMethod & 0xFFFF; + dispatcher.setCurrentMethod(classId, methodId); + try { - //CONNECTION_CLASS: - case 0x000a000a: - ConnectionStartBody.process(in, dispatcher); - break; - case 0x000a000b: - ConnectionStartOkBody.process(in, dispatcher); - break; - case 0x000a0014: - ConnectionSecureBody.process(in, dispatcher); - break; - case 0x000a0015: - ConnectionSecureOkBody.process(in, dispatcher); - break; - case 0x000a001e: - ConnectionTuneBody.process(in, dispatcher); - break; - case 0x000a001f: - ConnectionTuneOkBody.process(in, dispatcher); - break; - case 0x000a0028: - ConnectionOpenBody.process(in, dispatcher); - break; - case 0x000a0029: - ConnectionOpenOkBody.process(in, dispatcher); - break; - case 0x000a002a: - ConnectionRedirectBody.process(in, dispatcher); - break; - case 0x000a0032: - if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0)) - { + switch (classAndMethod) + { + //CONNECTION_CLASS: + case 0x000a000a: + ConnectionStartBody.process(in, dispatcher); + break; + case 0x000a000b: + ConnectionStartOkBody.process(in, dispatcher); + break; + case 0x000a0014: + ConnectionSecureBody.process(in, dispatcher); + break; + case 0x000a0015: + ConnectionSecureOkBody.process(in, dispatcher); + break; + case 0x000a001e: + ConnectionTuneBody.process(in, dispatcher); + break; + case 0x000a001f: + ConnectionTuneOkBody.process(in, dispatcher); + break; + case 0x000a0028: + ConnectionOpenBody.process(in, dispatcher); + break; + case 0x000a0029: + ConnectionOpenOkBody.process(in, dispatcher); + break; + case 0x000a002a: ConnectionRedirectBody.process(in, dispatcher); - } - else - { - ConnectionCloseBody.process(in, dispatcher); - } - break; - case 0x000a0033: - if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0)) - { - throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF), - dispatcher.getProtocolVersion()); - } - else - { - dispatcher.receiveConnectionCloseOk(); - } - break; - case 0x000a003c: - if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0)) - { - ConnectionCloseBody.process(in, dispatcher); - } - else - { - throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF), - dispatcher.getProtocolVersion()); - } - break; - case 0x000a003d: - if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0)) - { - dispatcher.receiveConnectionCloseOk(); - } - else - { - throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF), - dispatcher.getProtocolVersion()); - } - break; + break; + case 0x000a0032: + if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0)) + { + ConnectionRedirectBody.process(in, dispatcher); + } + else + { + ConnectionCloseBody.process(in, dispatcher); + } + break; + case 0x000a0033: + if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0)) + { + throw newUnknownMethodException(classId, methodId, + dispatcher.getProtocolVersion()); + } + else + { + dispatcher.receiveConnectionCloseOk(); + } + break; + case 0x000a003c: + if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0)) + { + ConnectionCloseBody.process(in, dispatcher); + } + else + { + throw newUnknownMethodException(classId, methodId, + dispatcher.getProtocolVersion()); + } + break; + case 0x000a003d: + if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0)) + { + dispatcher.receiveConnectionCloseOk(); + } + else + { + throw newUnknownMethodException(classId, methodId, + dispatcher.getProtocolVersion()); + } + break; // CHANNEL_CLASS: - case 0x0014000a: - ChannelOpenBody.process(channelId, in, dispatcher); - break; - case 0x0014000b: - ChannelOpenOkBody.process(channelId, in, dispatcher.getProtocolVersion(), dispatcher); - break; - case 0x00140014: - ChannelFlowBody.process(channelId, in, dispatcher); - break; - case 0x00140015: - ChannelFlowOkBody.process(channelId, in, dispatcher); - break; - case 0x0014001e: - ChannelAlertBody.process(channelId, in, dispatcher); - break; - case 0x00140028: - ChannelCloseBody.process(channelId, in, dispatcher); - break; - case 0x00140029: - dispatcher.receiveChannelCloseOk(channelId); - break; - - // ACCESS_CLASS: - - case 0x001e000a: - AccessRequestBody.process(channelId, in, dispatcher); - break; - case 0x001e000b: - AccessRequestOkBody.process(channelId, in, dispatcher); - break; - - // EXCHANGE_CLASS: - - case 0x0028000a: - ExchangeDeclareBody.process(channelId, in, dispatcher); - break; - case 0x0028000b: - dispatcher.receiveExchangeDeclareOk(channelId); - break; - case 0x00280014: - ExchangeDeleteBody.process(channelId, in, dispatcher); - break; - case 0x00280015: - dispatcher.receiveExchangeDeleteOk(channelId); - break; - case 0x00280016: - ExchangeBoundBody.process(channelId, in, dispatcher); - break; - case 0x00280017: - ExchangeBoundOkBody.process(channelId, in, dispatcher); - break; - - - // QUEUE_CLASS: - - case 0x0032000a: - QueueDeclareBody.process(channelId, in, dispatcher); - break; - case 0x0032000b: - QueueDeclareOkBody.process(channelId, in, dispatcher); - break; - case 0x00320014: - QueueBindBody.process(channelId, in, dispatcher); - break; - case 0x00320015: - dispatcher.receiveQueueBindOk(channelId); - break; - case 0x0032001e: - QueuePurgeBody.process(channelId, in, dispatcher); - break; - case 0x0032001f: - QueuePurgeOkBody.process(channelId, in, dispatcher); - break; - case 0x00320028: - QueueDeleteBody.process(channelId, in, dispatcher); - break; - case 0x00320029: - QueueDeleteOkBody.process(channelId, in, dispatcher); - break; - case 0x00320032: - QueueUnbindBody.process(channelId, in, dispatcher); - break; - case 0x00320033: - dispatcher.receiveQueueUnbindOk(channelId); - break; - - - // BASIC_CLASS: - - case 0x003c000a: - BasicQosBody.process(channelId, in, dispatcher); - break; - case 0x003c000b: - dispatcher.receiveBasicQosOk(channelId); - break; - case 0x003c0014: - BasicConsumeBody.process(channelId, in, dispatcher); - break; - case 0x003c0015: - BasicConsumeOkBody.process(channelId, in, dispatcher); - break; - case 0x003c001e: - BasicCancelBody.process(channelId, in, dispatcher); - break; - case 0x003c001f: - BasicCancelOkBody.process(channelId, in, dispatcher); - break; - case 0x003c0028: - BasicPublishBody.process(channelId, in, dispatcher); - break; - case 0x003c0032: - BasicReturnBody.process(channelId, in, dispatcher); - break; - case 0x003c003c: - BasicDeliverBody.process(channelId, in, dispatcher); - break; - case 0x003c0046: - BasicGetBody.process(channelId, in, dispatcher); - break; - case 0x003c0047: - BasicGetOkBody.process(channelId, in, dispatcher); - break; - case 0x003c0048: - BasicGetEmptyBody.process(channelId, in, dispatcher); - break; - case 0x003c0050: - BasicAckBody.process(channelId, in, dispatcher); - break; - case 0x003c005a: - BasicRejectBody.process(channelId, in, dispatcher); - break; - case 0x003c0064: - BasicRecoverBody.process(channelId, in, dispatcher.getProtocolVersion(), dispatcher); - break; - case 0x003c0065: - dispatcher.receiveBasicRecoverSyncOk(channelId); - break; - case 0x003c0066: - BasicRecoverSyncBody.process(channelId, in, dispatcher); - break; - case 0x003c006e: - BasicRecoverSyncBody.process(channelId, in, dispatcher); - break; - case 0x003c006f: - dispatcher.receiveBasicRecoverSyncOk(channelId); - break; - - // TX_CLASS: - - case 0x005a000a: - dispatcher.receiveTxSelect(channelId); - break; - case 0x005a000b: - dispatcher.receiveTxSelectOk(channelId); - break; - case 0x005a0014: - dispatcher.receiveTxCommit(channelId); - break; - case 0x005a0015: - dispatcher.receiveTxCommitOk(channelId); - break; - case 0x005a001e: - dispatcher.receiveTxRollback(channelId); - break; - case 0x005a001f: - dispatcher.receiveTxRollbackOk(channelId); - break; - - default: - throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF), - dispatcher.getProtocolVersion()); + case 0x0014000a: + ChannelOpenBody.process(channelId, in, dispatcher); + break; + case 0x0014000b: + ChannelOpenOkBody.process(channelId, in, dispatcher.getProtocolVersion(), dispatcher); + break; + case 0x00140014: + ChannelFlowBody.process(channelId, in, dispatcher); + break; + case 0x00140015: + ChannelFlowOkBody.process(channelId, in, dispatcher); + break; + case 0x0014001e: + ChannelAlertBody.process(channelId, in, dispatcher); + break; + case 0x00140028: + ChannelCloseBody.process(channelId, in, dispatcher); + break; + case 0x00140029: + dispatcher.receiveChannelCloseOk(channelId); + break; + + // ACCESS_CLASS: + + case 0x001e000a: + AccessRequestBody.process(channelId, in, dispatcher); + break; + case 0x001e000b: + AccessRequestOkBody.process(channelId, in, dispatcher); + break; + + // EXCHANGE_CLASS: + + case 0x0028000a: + ExchangeDeclareBody.process(channelId, in, dispatcher); + break; + case 0x0028000b: + dispatcher.receiveExchangeDeclareOk(channelId); + break; + case 0x00280014: + ExchangeDeleteBody.process(channelId, in, dispatcher); + break; + case 0x00280015: + dispatcher.receiveExchangeDeleteOk(channelId); + break; + case 0x00280016: + ExchangeBoundBody.process(channelId, in, dispatcher); + break; + case 0x00280017: + ExchangeBoundOkBody.process(channelId, in, dispatcher); + break; + + + // QUEUE_CLASS: + + case 0x0032000a: + QueueDeclareBody.process(channelId, in, dispatcher); + break; + case 0x0032000b: + QueueDeclareOkBody.process(channelId, in, dispatcher); + break; + case 0x00320014: + QueueBindBody.process(channelId, in, dispatcher); + break; + case 0x00320015: + dispatcher.receiveQueueBindOk(channelId); + break; + case 0x0032001e: + QueuePurgeBody.process(channelId, in, dispatcher); + break; + case 0x0032001f: + QueuePurgeOkBody.process(channelId, in, dispatcher); + break; + case 0x00320028: + QueueDeleteBody.process(channelId, in, dispatcher); + break; + case 0x00320029: + QueueDeleteOkBody.process(channelId, in, dispatcher); + break; + case 0x00320032: + QueueUnbindBody.process(channelId, in, dispatcher); + break; + case 0x00320033: + dispatcher.receiveQueueUnbindOk(channelId); + break; + + + // BASIC_CLASS: + + case 0x003c000a: + BasicQosBody.process(channelId, in, dispatcher); + break; + case 0x003c000b: + dispatcher.receiveBasicQosOk(channelId); + break; + case 0x003c0014: + BasicConsumeBody.process(channelId, in, dispatcher); + break; + case 0x003c0015: + BasicConsumeOkBody.process(channelId, in, dispatcher); + break; + case 0x003c001e: + BasicCancelBody.process(channelId, in, dispatcher); + break; + case 0x003c001f: + BasicCancelOkBody.process(channelId, in, dispatcher); + break; + case 0x003c0028: + BasicPublishBody.process(channelId, in, dispatcher); + break; + case 0x003c0032: + BasicReturnBody.process(channelId, in, dispatcher); + break; + case 0x003c003c: + BasicDeliverBody.process(channelId, in, dispatcher); + break; + case 0x003c0046: + BasicGetBody.process(channelId, in, dispatcher); + break; + case 0x003c0047: + BasicGetOkBody.process(channelId, in, dispatcher); + break; + case 0x003c0048: + BasicGetEmptyBody.process(channelId, in, dispatcher); + break; + case 0x003c0050: + BasicAckBody.process(channelId, in, dispatcher); + break; + case 0x003c005a: + BasicRejectBody.process(channelId, in, dispatcher); + break; + case 0x003c0064: + BasicRecoverBody.process(channelId, in, dispatcher.getProtocolVersion(), dispatcher); + break; + case 0x003c0065: + dispatcher.receiveBasicRecoverSyncOk(channelId); + break; + case 0x003c0066: + BasicRecoverSyncBody.process(channelId, in, dispatcher); + break; + case 0x003c006e: + BasicRecoverSyncBody.process(channelId, in, dispatcher); + break; + case 0x003c006f: + dispatcher.receiveBasicRecoverSyncOk(channelId); + break; + + // TX_CLASS: + + case 0x005a000a: + dispatcher.receiveTxSelect(channelId); + break; + case 0x005a000b: + dispatcher.receiveTxSelectOk(channelId); + break; + case 0x005a0014: + dispatcher.receiveTxCommit(channelId); + break; + case 0x005a0015: + dispatcher.receiveTxCommitOk(channelId); + break; + case 0x005a001e: + dispatcher.receiveTxRollback(channelId); + break; + case 0x005a001f: + dispatcher.receiveTxRollbackOk(channelId); + break; + default: + throw newUnknownMethodException(classId, methodId, + dispatcher.getProtocolVersion()); + + } + } + finally + { + dispatcher.setCurrentMethod(0,0); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java?rev=1631137&r1=1631136&r2=1631137&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java Sat Oct 11 23:46:39 2014 @@ -38,6 +38,13 @@ public class ExchangeBoundOkBody extends public static final int CLASS_ID = 40; public static final int METHOD_ID = 23; + public static final int OK = 0; + public static final int EXCHANGE_NOT_FOUND = 1; + public static final int QUEUE_NOT_FOUND = 2; + public static final int NO_BINDINGS = 3; + public static final int QUEUE_NOT_BOUND = 4; + public static final int NO_QUEUE_BOUND_WITH_RK = 5; + public static final int SPECIFIC_QUEUE_NOT_BOUND_WITH_RK = 6; // Fields declared in specification private final int _replyCode; // [replyCode] Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java?rev=1631137&r1=1631136&r2=1631137&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java Sat Oct 11 23:46:39 2014 @@ -28,6 +28,8 @@ public class FrameCreatingMethodProcesso private ProtocolVersion _protocolVersion; private final List<AMQDataBlock> _processedMethods = new ArrayList<>(); + private int _classId; + private int _methodId; public FrameCreatingMethodProcessor(final ProtocolVersion protocolVersion) { @@ -522,4 +524,21 @@ public class FrameCreatingMethodProcesso { _processedMethods.add(protocolInitiation); } + + @Override + public void setCurrentMethod(final int classId, final int methodId) + { + _classId = classId; + _methodId = methodId; + } + + public int getClassId() + { + return _classId; + } + + public int getMethodId() + { + return _methodId; + } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java?rev=1631137&r1=1631136&r2=1631137&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java Sat Oct 11 23:46:39 2014 @@ -198,4 +198,6 @@ public interface MethodProcessor void receiveMessageHeader(int channelId, BasicContentHeaderProperties properties, long bodySize); void receiveProtocolHeader(ProtocolInitiation protocolInitiation); + + void setCurrentMethod(int classId, int methodId); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
