Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java (original) +++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java Fri Oct 17 14:23:19 2014 @@ -25,7 +25,7 @@ import java.nio.ByteBuffer; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.framing.MessagePublishInfo; import org.apache.qpid.server.store.StoredMessage; public class MockStoredMessage implements StoredMessage<MessageMetaData> @@ -41,7 +41,7 @@ public class MockStoredMessage implement public MockStoredMessage(long messageId, String headerName, Object headerValue) { - this(messageId, new MockMessagePublishInfo(), new ContentHeaderBody(new BasicContentHeaderProperties(), 60), headerName, headerValue); + this(messageId, new MessagePublishInfo(null, false, false, null), new ContentHeaderBody(new BasicContentHeaderProperties()), headerName, headerValue); } public MockStoredMessage(long messageId, MessagePublishInfo info, ContentHeaderBody chb)
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java (original) +++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java Fri Oct 17 14:23:19 2014 @@ -27,6 +27,7 @@ import org.apache.qpid.common.AMQPFilter import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.MessageCounter; import org.apache.qpid.server.store.MessageStore; @@ -77,7 +78,7 @@ public class QueueBrowserUsesNoAckTest e private InternalTestProtocolSession getSession() { - return (InternalTestProtocolSession)_channel.getProtocolSession(); + return (InternalTestProtocolSession)_channel.getConnection(); } private AMQQueue getQueue() @@ -144,6 +145,6 @@ public class QueueBrowserUsesNoAckTest e FieldTable filters = new FieldTable(); filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true); - return channel.consumeFromSource(null, Collections.singleton(queue), true, filters, true, false); + return channel.consumeFromSource(null, Collections.<MessageSource>singleton(queue), true, filters, true, false); } } Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java (original) +++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java Fri Oct 17 14:23:19 2014 @@ -23,10 +23,9 @@ package org.apache.qpid.server.protocol. import java.util.UUID; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.framing.MessagePublishInfo; import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.store.MessageCounter; @@ -57,36 +56,7 @@ public class ReferenceCountingTest exten { ContentHeaderBody chb = createPersistentContentHeader(); - MessagePublishInfo info = new MessagePublishInfo() - { - - public AMQShortString getExchange() - { - return null; - } - - public void setExchange(AMQShortString exchange) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean isImmediate() - { - return false; - } - - public boolean isMandatory() - { - return false; - } - - public AMQShortString getRoutingKey() - { - return null; - } - }; - - + MessagePublishInfo info = new MessagePublishInfo(null, false, false, null); final MessageMetaData mmd = new MessageMetaData(info, chb); @@ -114,44 +84,16 @@ public class ReferenceCountingTest exten private ContentHeaderBody createPersistentContentHeader() { - ContentHeaderBody chb = new ContentHeaderBody(); BasicContentHeaderProperties bchp = new BasicContentHeaderProperties(); bchp.setDeliveryMode((byte)2); - chb.setProperties(bchp); + ContentHeaderBody chb = new ContentHeaderBody(bchp); return chb; } public void testMessageRemains() throws AMQException { - MessagePublishInfo info = new MessagePublishInfo() - { - - public AMQShortString getExchange() - { - return null; - } - - public void setExchange(AMQShortString exchange) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean isImmediate() - { - return false; - } - - public boolean isMandatory() - { - return false; - } - - public AMQShortString getRoutingKey() - { - return null; - } - }; + MessagePublishInfo info = new MessagePublishInfo(null, false, false, null); final ContentHeaderBody chb = createPersistentContentHeader(); Modified: qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java (original) +++ qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java Fri Oct 17 14:23:19 2014 @@ -30,7 +30,7 @@ import org.apache.qpid.framing.AMQShortS import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.framing.MessagePublishInfo; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.plugin.PluggableService; @@ -239,7 +239,7 @@ public class MessageConverter_0_10_to_0_ private ContentHeaderBody convertContentHeaderBody(MessageTransferMessage message, VirtualHostImpl vhost) { BasicContentHeaderProperties props = convertContentHeaderProperties(message, vhost); - ContentHeaderBody chb = new ContentHeaderBody(props, BASIC_CLASS_ID); + ContentHeaderBody chb = new ContentHeaderBody(props); chb.setBodySize(message.getSize()); return chb; } @@ -256,38 +256,7 @@ public class MessageConverter_0_10_to_0_ final boolean immediate = delvProps != null && delvProps.getImmediate(); final boolean mandatory = delvProps != null && !delvProps.getDiscardUnroutable(); - return new MessagePublishInfo() - { - @Override - public AMQShortString getExchange() - { - return exchangeName; - } - - @Override - public void setExchange(AMQShortString exchange) - { - throw new UnsupportedOperationException(); - } - - @Override - public boolean isImmediate() - { - return immediate; - } - - @Override - public boolean isMandatory() - { - return mandatory; - } - - @Override - public AMQShortString getRoutingKey() - { - return routingKey; - } - }; + return new MessagePublishInfo(exchangeName, immediate, mandatory, routingKey); } @Override Modified: qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java (original) +++ qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java Fri Oct 17 14:23:19 2014 @@ -28,7 +28,7 @@ import org.apache.qpid.framing.AMQShortS import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.framing.MessagePublishInfo; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.plugin.PluggableService; import org.apache.qpid.server.protocol.v0_8.AMQMessage; @@ -137,45 +137,13 @@ public class MessageConverter_1_0_to_v0_ { final MessageMetaData_1_0.MessageHeader_1_0 header = serverMsg.getMessageHeader(); + String key = header.getTo(); + if(key == null) + { + key = header.getSubject(); + } - MessagePublishInfo publishInfo = new MessagePublishInfo() - { - @Override - public AMQShortString getExchange() - { - return null; - } - - @Override - public void setExchange(final AMQShortString amqShortString) - { - throw new UnsupportedOperationException(); - } - - @Override - public boolean isImmediate() - { - return false; - } - - @Override - public boolean isMandatory() - { - return false; - } - - @Override - public AMQShortString getRoutingKey() - { - String key = header.getTo(); - if(key == null) - { - key = header.getSubject(); - } - - return AMQShortString.valueOf(key); - } - }; + MessagePublishInfo publishInfo = new MessagePublishInfo(null, false, false, AMQShortString.valueOf(key)); final BasicContentHeaderProperties props = new BasicContentHeaderProperties(); @@ -204,7 +172,7 @@ public class MessageConverter_1_0_to_v0_ props.setHeaders(FieldTable.convertToFieldTable(headerProps)); - final ContentHeaderBody chb = new ContentHeaderBody(props, BASIC_CLASS_ID); + final ContentHeaderBody chb = new ContentHeaderBody(props); chb.setBodySize(size); return new MessageMetaData(publishInfo, chb, serverMsg.getArrivalTime()); Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Fri Oct 17 14:23:19 2014 @@ -62,8 +62,6 @@ import org.apache.qpid.client.state.list import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.*; -import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; -import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; @@ -316,21 +314,12 @@ public class AMQSession_0_8 extends AMQS if(getProtocolHandler().getProtocolVersion().equals(ProtocolVersion.v8_0)) { BasicRecoverBody body = getMethodRegistry().createBasicRecoverBody(false); - getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), BasicRecoverOkBody.class); - } - else if(getProtocolVersion().equals(ProtocolVersion.v0_9)) - { - BasicRecoverSyncBody body = ((MethodRegistry_0_9)getMethodRegistry()).createBasicRecoverSyncBody(false); - getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), BasicRecoverSyncOkBody.class); - } - else if(getProtocolVersion().equals(ProtocolVersion.v0_91)) - { - BasicRecoverSyncBody body = ((MethodRegistry_0_91)getMethodRegistry()).createBasicRecoverSyncBody(false); getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), BasicRecoverSyncOkBody.class); } else { - throw new RuntimeException("Unsupported version of the AMQP Protocol: " + getProtocolVersion()); + BasicRecoverSyncBody body = getMethodRegistry().createBasicRecoverSyncBody(false); + getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), BasicRecoverSyncOkBody.class); } } } @@ -1148,33 +1137,22 @@ public class AMQSession_0_8 extends AMQS if (isBound(null, AMQShortString.valueOf(queue), null)) { - MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry(); - AMQMethodBody body; - if (methodRegistry instanceof MethodRegistry_0_9) + + if(ProtocolVersion.v8_0.equals(getProtocolVersion())) { - String bindingKey = binding.getBindingKey() == null ? queue : binding.getBindingKey(); + throw new AMQException(AMQConstant.NOT_IMPLEMENTED, "Cannot unbind a queue in AMQP 0-8"); + } - MethodRegistry_0_9 methodRegistry_0_9 = (MethodRegistry_0_9) methodRegistry; - body = methodRegistry_0_9.createQueueUnbindBody(getTicket(), + MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry(); + + String bindingKey = binding.getBindingKey() == null ? queue : binding.getBindingKey(); + + AMQMethodBody body = methodRegistry.createQueueUnbindBody(getTicket(), AMQShortString.valueOf(queue), AMQShortString.valueOf(exchange), AMQShortString.valueOf(bindingKey), null); - } - else if (methodRegistry instanceof MethodRegistry_0_91) - { - MethodRegistry_0_91 methodRegistry_0_91 = (MethodRegistry_0_91) methodRegistry; - body = methodRegistry_0_91.createQueueUnbindBody(getTicket(), - AMQShortString.valueOf(queue), - AMQShortString.valueOf(exchange), - AMQShortString.valueOf(binding.getBindingKey()), - null); - } - else - { - throw new AMQException(AMQConstant.NOT_IMPLEMENTED, "Cannot unbind a queue in AMQP 0-8"); - } getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), QueueUnbindOkBody.class); return null; } Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java Fri Oct 17 14:23:19 2014 @@ -216,7 +216,7 @@ public class BasicMessageProducer_0_8 ex AMQFrame contentHeaderFrame = ContentHeaderBody.createAMQFrame(getChannelId(), - classIfForBasic, 0, contentHeaderProperties, size); + contentHeaderProperties, size); if(contentHeaderFrame.getSize() > getSession().getAMQConnection().getMaximumFrameSize()) { throw new JMSException("Unable to send message as the headers are too large (" Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java Fri Oct 17 14:23:19 2014 @@ -125,6 +125,28 @@ public class ClientMethodDispatcherImpl return false; } + @Override + public boolean dispatchQueueUnbindOk(final QueueUnbindOkBody body, final int channelId) + throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + @Override + public boolean dispatchBasicRecoverSyncOk(final BasicRecoverSyncOkBody basicRecoverSyncOkBody, + final int channelId) + throws AMQException + { + return false; + } + + @Override + public boolean dispatchChannelAlert(final ChannelAlertBody channelAlertBody, final int channelId) + throws AMQException + { + return false; + } + public boolean dispatchBasicCancelOk(BasicCancelOkBody body, int channelId) throws AMQException { _basicCancelOkMethodHandler.methodReceived(_session, body, channelId); @@ -244,16 +266,6 @@ public class ClientMethodDispatcherImpl return false; } - public boolean dispatchStreamCancelOk(StreamCancelOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchStreamConsumeOk(StreamConsumeOkBody body, int channelId) throws AMQException - { - return false; - } - public boolean dispatchAccessRequest(AccessRequestBody body, int channelId) throws AMQException { throw new AMQMethodNotImplementedException(body); @@ -324,16 +336,6 @@ public class ClientMethodDispatcherImpl throw new AMQMethodNotImplementedException(body); } - public boolean dispatchDtxSelect(DtxSelectBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchDtxStart(DtxStartBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - public boolean dispatchExchangeBound(ExchangeBoundBody body, int channelId) throws AMQException { throw new AMQMethodNotImplementedException(body); @@ -349,36 +351,6 @@ public class ClientMethodDispatcherImpl throw new AMQMethodNotImplementedException(body); } - public boolean dispatchFileAck(FileAckBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFileCancel(FileCancelBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFileConsume(FileConsumeBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFilePublish(FilePublishBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFileQos(FileQosBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFileReject(FileRejectBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - public boolean dispatchQueueBind(QueueBindBody body, int channelId) throws AMQException { throw new AMQMethodNotImplementedException(body); @@ -399,30 +371,6 @@ public class ClientMethodDispatcherImpl throw new AMQMethodNotImplementedException(body); } - public boolean dispatchStreamCancel(StreamCancelBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchStreamConsume(StreamConsumeBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchStreamPublish(StreamPublishBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchStreamQos(StreamQosBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchTunnelRequest(TunnelRequestBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } public boolean dispatchTxCommit(TxCommitBody body, int channelId) throws AMQException { @@ -439,14 +387,17 @@ public class ClientMethodDispatcherImpl throw new AMQMethodNotImplementedException(body); } - public boolean dispatchDtxSelectOk(DtxSelectOkBody body, int channelId) throws AMQException + @Override + public boolean dispatchQueueUnbind(final QueueUnbindBody queueUnbindBody, final int channelId) throws AMQException { - throw new AMQMethodNotImplementedException(body); + return false; } - public boolean dispatchDtxStartOk(DtxStartOkBody body, int channelId) throws AMQException + @Override + public boolean dispatchBasicRecoverSync(final BasicRecoverSyncBody basicRecoverSyncBody, final int channelId) + throws AMQException { - throw new AMQMethodNotImplementedException(body); + return false; } public boolean dispatchExchangeBoundOk(ExchangeBoundOkBody body, int channelId) throws AMQException @@ -465,46 +416,6 @@ public class ClientMethodDispatcherImpl return false; } - public boolean dispatchFileCancelOk(FileCancelOkBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFileConsumeOk(FileConsumeOkBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFileDeliver(FileDeliverBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFileOpen(FileOpenBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFileOpenOk(FileOpenOkBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFileQosOk(FileQosOkBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFileReturn(FileReturnBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFileStage(FileStageBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - public boolean dispatchQueueBindOk(QueueBindOkBody body, int channelId) throws AMQException { return false; @@ -515,21 +426,6 @@ public class ClientMethodDispatcherImpl return false; } - public boolean dispatchStreamDeliver(StreamDeliverBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchStreamQosOk(StreamQosOkBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchStreamReturn(StreamReturnBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - public boolean dispatchTxCommitOk(TxCommitOkBody body, int channelId) throws AMQException { return false; Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java Fri Oct 17 14:23:19 2014 @@ -23,10 +23,14 @@ package org.apache.qpid.client.handler; import org.apache.qpid.AMQException; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQMethodNotImplementedException; -import org.apache.qpid.framing.*; -import org.apache.qpid.framing.amqp_0_9.MethodDispatcher_0_9; +import org.apache.qpid.framing.BasicRecoverSyncBody; +import org.apache.qpid.framing.BasicRecoverSyncOkBody; +import org.apache.qpid.framing.ChannelAlertBody; +import org.apache.qpid.framing.MethodDispatcher; +import org.apache.qpid.framing.QueueUnbindBody; +import org.apache.qpid.framing.QueueUnbindOkBody; -public class ClientMethodDispatcherImpl_0_9 extends ClientMethodDispatcherImpl implements MethodDispatcher_0_9 +public class ClientMethodDispatcherImpl_0_9 extends ClientMethodDispatcherImpl implements MethodDispatcher { public ClientMethodDispatcherImpl_0_9(AMQProtocolSession session) { @@ -38,106 +42,18 @@ public class ClientMethodDispatcherImpl_ return false; } - public boolean dispatchBasicRecoverSync(BasicRecoverSyncBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchChannelOk(ChannelOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchChannelPing(ChannelPingBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchChannelPong(ChannelPongBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchChannelResume(ChannelResumeBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchMessageAppend(MessageAppendBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageCancel(MessageCancelBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchMessageCheckpoint(MessageCheckpointBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageClose(MessageCloseBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageConsume(MessageConsumeBody body, int channelId) throws AMQException + @Override + public boolean dispatchChannelAlert(final ChannelAlertBody body, final int channelId) + throws AMQException { throw new AMQMethodNotImplementedException(body); } - public boolean dispatchMessageEmpty(MessageEmptyBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageGet(MessageGetBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchMessageOffset(MessageOffsetBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageOk(MessageOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageOpen(MessageOpenBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageQos(MessageQosBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchMessageRecover(MessageRecoverBody body, int channelId) throws AMQException + public boolean dispatchBasicRecoverSync(BasicRecoverSyncBody body, int channelId) throws AMQException { throw new AMQMethodNotImplementedException(body); } - public boolean dispatchMessageReject(MessageRejectBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageResume(MessageResumeBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageTransfer(MessageTransferBody body, int channelId) throws AMQException - { - return false; - } - public boolean dispatchQueueUnbind(QueueUnbindBody body, int channelId) throws AMQException { throw new AMQMethodNotImplementedException(body); Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_91.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_91.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_91.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_91.java Fri Oct 17 14:23:19 2014 @@ -23,10 +23,14 @@ package org.apache.qpid.client.handler; import org.apache.qpid.AMQException; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQMethodNotImplementedException; -import org.apache.qpid.framing.*; -import org.apache.qpid.framing.amqp_0_91.MethodDispatcher_0_91; +import org.apache.qpid.framing.BasicRecoverSyncBody; +import org.apache.qpid.framing.BasicRecoverSyncOkBody; +import org.apache.qpid.framing.ChannelAlertBody; +import org.apache.qpid.framing.MethodDispatcher; +import org.apache.qpid.framing.QueueUnbindBody; +import org.apache.qpid.framing.QueueUnbindOkBody; -public class ClientMethodDispatcherImpl_0_91 extends ClientMethodDispatcherImpl implements MethodDispatcher_0_91 +public class ClientMethodDispatcherImpl_0_91 extends ClientMethodDispatcherImpl implements MethodDispatcher { public ClientMethodDispatcherImpl_0_91(AMQProtocolSession session) { @@ -38,119 +42,26 @@ public class ClientMethodDispatcherImpl_ return false; } - public boolean dispatchBasicRecoverSync(BasicRecoverSyncBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchChannelOk(ChannelOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchChannelPing(ChannelPingBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchChannelPong(ChannelPongBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchChannelResume(ChannelResumeBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchMessageAppend(MessageAppendBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageCancel(MessageCancelBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchMessageCheckpoint(MessageCheckpointBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageClose(MessageCloseBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageConsume(MessageConsumeBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchMessageEmpty(MessageEmptyBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageGet(MessageGetBody body, int channelId) throws AMQException + @Override + public boolean dispatchChannelAlert(final ChannelAlertBody body, final int channelId) + throws AMQException { throw new AMQMethodNotImplementedException(body); } - public boolean dispatchMessageOffset(MessageOffsetBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageOk(MessageOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageOpen(MessageOpenBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageQos(MessageQosBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchMessageRecover(MessageRecoverBody body, int channelId) throws AMQException + public boolean dispatchBasicRecoverSync(BasicRecoverSyncBody body, int channelId) throws AMQException { throw new AMQMethodNotImplementedException(body); } - public boolean dispatchMessageReject(MessageRejectBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageResume(MessageResumeBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageTransfer(MessageTransferBody body, int channelId) throws AMQException - { - return false; - } - public boolean dispatchQueueUnbind(QueueUnbindBody body, int channelId) throws AMQException { throw new AMQMethodNotImplementedException(body); } - public boolean dispatchBasicRecoverOk(BasicRecoverOkBody body, int channelId) throws AMQException - { - return false; - } - public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException { return false; } -} \ No newline at end of file +} Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java Fri Oct 17 14:23:19 2014 @@ -22,72 +22,49 @@ import org.apache.qpid.AMQException; import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.framing.BasicRecoverOkBody; +import org.apache.qpid.client.state.AMQMethodNotImplementedException; +import org.apache.qpid.framing.BasicRecoverSyncBody; +import org.apache.qpid.framing.BasicRecoverSyncOkBody; import org.apache.qpid.framing.ChannelAlertBody; -import org.apache.qpid.framing.TestContentBody; -import org.apache.qpid.framing.TestContentOkBody; -import org.apache.qpid.framing.TestIntegerBody; -import org.apache.qpid.framing.TestIntegerOkBody; -import org.apache.qpid.framing.TestStringBody; -import org.apache.qpid.framing.TestStringOkBody; -import org.apache.qpid.framing.TestTableBody; -import org.apache.qpid.framing.TestTableOkBody; -import org.apache.qpid.framing.amqp_8_0.MethodDispatcher_8_0; +import org.apache.qpid.framing.MethodDispatcher; +import org.apache.qpid.framing.QueueUnbindBody; +import org.apache.qpid.framing.QueueUnbindOkBody; -public class ClientMethodDispatcherImpl_8_0 extends ClientMethodDispatcherImpl implements MethodDispatcher_8_0 +public class ClientMethodDispatcherImpl_8_0 extends ClientMethodDispatcherImpl implements MethodDispatcher { public ClientMethodDispatcherImpl_8_0(AMQProtocolSession session) { super(session); } - public boolean dispatchBasicRecoverOk(BasicRecoverOkBody body, int channelId) throws AMQException - { - return false; - } - public boolean dispatchChannelAlert(ChannelAlertBody body, int channelId) throws AMQException { return false; } - public boolean dispatchTestContent(TestContentBody body, int channelId) throws AMQException + @Override + public boolean dispatchQueueUnbindOk(final QueueUnbindOkBody queueUnbindOkBody, final int channelId) { return false; } - public boolean dispatchTestContentOk(TestContentOkBody body, int channelId) throws AMQException + @Override + public boolean dispatchBasicRecoverSyncOk(final BasicRecoverSyncOkBody basicRecoverSyncOkBody, + final int channelId) { return false; } - public boolean dispatchTestInteger(TestIntegerBody body, int channelId) throws AMQException + @Override + public boolean dispatchQueueUnbind(final QueueUnbindBody body, final int channelId) throws AMQException { - return false; - } - - public boolean dispatchTestIntegerOk(TestIntegerOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchTestString(TestStringBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchTestStringOk(TestStringOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchTestTable(TestTableBody body, int channelId) throws AMQException - { - return false; + throw new AMQMethodNotImplementedException(body); } - public boolean dispatchTestTableOk(TestTableOkBody body, int channelId) throws AMQException + @Override + public boolean dispatchBasicRecoverSync(final BasicRecoverSyncBody body, final int channelId) + throws AMQException { - return false; + throw new AMQMethodNotImplementedException(body); } } Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Fri Oct 17 14:23:19 2014 @@ -23,8 +23,8 @@ package org.apache.qpid.client.protocol; import java.io.IOException; import java.net.SocketAddress; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Iterator; +import java.util.List; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CountDownLatch; @@ -48,6 +48,7 @@ import org.apache.qpid.client.state.AMQS import org.apache.qpid.client.state.StateWaiter; import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; import org.apache.qpid.codec.AMQDecoder; +import org.apache.qpid.codec.ClientDecoder; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.AMQBody; import org.apache.qpid.framing.AMQDataBlock; @@ -193,7 +194,7 @@ public class AMQProtocolHandler implemen _connection = con; _protocolSession = new AMQProtocolSession(this, _connection); _stateManager = new AMQStateManager(_protocolSession); - _decoder = new AMQDecoder(false, _protocolSession); + _decoder = new ClientDecoder(_protocolSession.getMethodProcessor()); _failoverHandler = new FailoverHandler(this); } @@ -459,9 +460,10 @@ public class AMQProtocolHandler implemen { _readBytes += msg.remaining(); _lastReadTime = System.currentTimeMillis(); + final List<AMQDataBlock> dataBlocks = _protocolSession.getMethodProcessor().getProcessedMethods(); try { - final ArrayList<AMQDataBlock> dataBlocks = _decoder.decodeBuffer(msg); + _decoder.decodeBuffer(msg); // Decode buffer int size = dataBlocks.size(); @@ -511,6 +513,10 @@ public class AMQProtocolHandler implemen propagateExceptionToFrameListeners(e); exception(e); } + finally + { + dataBlocks.clear(); + } } @@ -753,8 +759,12 @@ public class AMQProtocolHandler implemen // Connection is already closed then don't do a syncWrite try { - final ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode - new AMQShortString("JMS client is closing the connection."), 0, 0); + final ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody( + AMQConstant.REPLY_SUCCESS.getCode(), + // replyCode + new AMQShortString("JMS client is closing the connection."), + 0, + 0); final AMQFrame frame = body.generateFrame(0); syncWrite(frame, ConnectionCloseOkBody.class, timeout); Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Fri Oct 17 14:23:19 2014 @@ -44,6 +44,7 @@ import org.apache.qpid.framing.AMQShortS import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.FrameCreatingMethodProcessor; import org.apache.qpid.framing.HeartbeatBody; import org.apache.qpid.framing.MethodDispatcher; import org.apache.qpid.framing.MethodRegistry; @@ -88,8 +89,11 @@ public class AMQProtocolSession implemen private ProtocolVersion _protocolVersion; - private MethodRegistry _methodRegistry = - MethodRegistry.getMethodRegistry(ProtocolVersion.getLatestSupportedVersion()); + private final MethodRegistry _methodRegistry = + new MethodRegistry(ProtocolVersion.getLatestSupportedVersion()); + + private final FrameCreatingMethodProcessor _methodProcessor = + new FrameCreatingMethodProcessor(ProtocolVersion.getLatestSupportedVersion()); private MethodDispatcher _methodDispatcher; @@ -416,7 +420,8 @@ public class AMQProtocolSession implemen _logger.debug("Setting ProtocolVersion to :" + pv); } _protocolVersion = pv; - _methodRegistry = MethodRegistry.getMethodRegistry(pv); + _methodRegistry.setProtocolVersion(pv); + _methodProcessor.setProtocolVersion(pv); _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(pv, this); } @@ -549,4 +554,9 @@ public class AMQProtocolSession implemen { _protocolHandler.setMaxFrameSize(frameMax); } + + public FrameCreatingMethodProcessor getMethodProcessor() + { + return _methodProcessor; + } } Modified: qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_8Test.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_8Test.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_8Test.java (original) +++ qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_8Test.java Fri Oct 17 14:23:19 2014 @@ -23,7 +23,7 @@ package org.apache.qpid.client; import org.apache.qpid.AMQException; import org.apache.qpid.client.transport.TestNetworkConnection; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.amqp_0_91.QueueDeclareOkBodyImpl; +import org.apache.qpid.framing.QueueDeclareOkBody; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.url.AMQBindingURL; @@ -50,7 +50,7 @@ public class AMQSession_0_8Test extends { try { - _connection.getProtocolHandler().methodBodyReceived(1, new QueueDeclareOkBodyImpl(testQueueName, 0, 0)); + _connection.getProtocolHandler().methodBodyReceived(1, new QueueDeclareOkBody(testQueueName, 0, 0)); } catch (AMQException e) { Modified: qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java (original) +++ qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java Fri Oct 17 14:23:19 2014 @@ -20,6 +20,9 @@ */ package org.apache.qpid.client.protocol; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + import junit.framework.TestCase; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,12 +35,10 @@ import org.apache.qpid.client.transport. import org.apache.qpid.framing.AMQBody; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.amqp_8_0.BasicRecoverOkBodyImpl; +import org.apache.qpid.framing.BasicRecoverSyncOkBody; +import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.protocol.AMQConstant; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - /** * This is a test address QPID-1431 where frame listeners would fail to be notified of an incomming exception. * @@ -75,7 +76,7 @@ public class AMQProtocolHandlerTest exte //Create a new ProtocolHandler with a fake connection. _handler = new AMQProtocolHandler(new MockAMQConnection("amqp://guest:guest@client/test?brokerlist='tcp://localhost:1'")); _handler.setNetworkConnection(new TestNetworkConnection()); - AMQBody body = BasicRecoverOkBodyImpl.getFactory().newInstance(null, 1); + AMQBody body = new BasicRecoverSyncOkBody(ProtocolVersion.v8_0); _blockFrame = new AMQFrame(0, body); _handleCountDown = new CountDownLatch(1); Modified: qpid/trunk/qpid/java/common/build-generate-sources.xml URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/build-generate-sources.xml?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/build-generate-sources.xml (original) +++ qpid/trunk/qpid/java/common/build-generate-sources.xml Fri Oct 17 14:23:19 2014 @@ -23,11 +23,7 @@ <fail message="A required property was not set"> <condition> <not> - <and> - <isset property="generated-amqp-0-8-dir"/> - <isset property="generated-amqp-0-10-dir"/> - <isset property="gentools.classes"/> - </and> + <isset property="generated-amqp-0-10-dir"/> </not> </condition> </fail> @@ -39,14 +35,8 @@ <property name="mllib.dir" value="${basedir}/../../python"/> <property name="xml.spec.dir" value="${basedir}/../../specs"/> - <property name="xml.spec.0-8" value="amqp0-8-qpid.stripped.xml"/> - <property name="xml.spec.0-9" value="amqp0-9-qpid.stripped.xml"/> - <property name="xml.spec.0-9-1" value="amqp0-9-1.stripped.xml"/> <property name="xml.spec.0-10" value="amqp.0-10-qpid-errata.stripped.xml"/> - <property name="xml.spec.deps.0-8" value="${xml.spec.0-8} ${xml.spec.0-9} ${xml.spec.0-9-1}"/> - <property name="xml.spec.list.0-8" value='"${xml.spec.dir}/${xml.spec.0-8}" "${xml.spec.dir}/${xml.spec.0-9}" "${xml.spec.dir}/${xml.spec.0-9-1}"'/> - <property name="gentools.src" value="${basedir}/gentools/src"/> <property name="generated.package" value="org/apache/qpid/framing" /> @@ -65,27 +55,6 @@ </java> </target> - <target name="compile-gentools"> - <mkdir dir="${gentools.classes}"/> - <javac srcdir="${gentools.src}" destdir="${gentools.classes}" source="${java.source}" target="${java.target}" fork="true" includeantruntime="false"> - <classpath> - <path refid="source.generation.classpathref"/> - </classpath> - </javac> - </target> - - <target name="amqp-0-8-generation" depends="compile-gentools" if="generate.protocol.sources"> - <mkdir dir="${generated-amqp-0-8-dir}/${generated.package}"/> - <echo message="Generating AMQP 0-8/0-9/0-9-1 protocol classes..."/> - <java classname="org.apache.qpid.gentools.Main" fork="true" dir="${gentools.classes}" failonerror="true"> - <arg line='-j -o "${generated-amqp-0-8-dir}/${generated.package}" -t "${basedir}/templates" ${xml.spec.list.0-8}'/> - <classpath> - <pathelement path="${gentools.classes}" /> - <path refid="source.generation.classpathref"/> - </classpath> - </java> - </target> - - <target name="generate-sources" depends="amqp-0-8-generation, amqp-0-10-generation"/> + <target name="generate-sources" depends="amqp-0-10-generation"/> </project> Modified: qpid/trunk/qpid/java/common/pom.xml URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/pom.xml?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/pom.xml (original) +++ qpid/trunk/qpid/java/common/pom.xml Fri Oct 17 14:23:19 2014 @@ -111,7 +111,6 @@ <target> <ant antfile="build-generate-sources.xml"> <reference torefid="source.generation.classpathref" refid="maven.plugin.classpath" /> - <property name="gentools.classes" value="${project.build.directory}/gentools-classes" /> <property name="build.compiler" value="extJavac" /> </ant> </target> @@ -120,16 +119,6 @@ </executions> <dependencies> <dependency> - <groupId>velocity</groupId> - <artifactId>velocity</artifactId> - <version>${velocity-version}</version> - </dependency> - <dependency> - <groupId>velocity</groupId> - <artifactId>velocity-dep</artifactId> - <version>${velocity-version}</version> - </dependency> - <dependency> <groupId>org.python</groupId> <artifactId>jython-standalone</artifactId> <version>${jython-version}</version> Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQChannelException.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQChannelException.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQChannelException.java (original) +++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQChannelException.java Fri Oct 17 14:23:19 2014 @@ -20,10 +20,7 @@ */ package org.apache.qpid; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.protocol.AMQConstant; /** @@ -34,24 +31,35 @@ public class AMQChannelException extends private final int _classId; private final int _methodId; /* AMQP version for which exception ocurred */ - private final byte major; - private final byte minor; + private final MethodRegistry _methodRegistry; - public AMQChannelException(AMQConstant errorCode, String msg, int classId, int methodId, byte major, byte minor, - Throwable cause) + + public AMQChannelException(AMQConstant errorCode, + String msg, + int classId, + int methodId, + MethodRegistry methodRegistry) { - super(errorCode, msg, cause); + super(errorCode, msg); _classId = classId; _methodId = methodId; - this.major = major; - this.minor = minor; + _methodRegistry = methodRegistry; + + } + + public int getClassId() + { + return _classId; + } + + public int getMethodId() + { + return _methodId; } - public AMQFrame getCloseFrame(int channel) + public MethodRegistry getMethodRegistry() { - MethodRegistry reg = MethodRegistry.getMethodRegistry(new ProtocolVersion(major,minor)); - return new AMQFrame(channel, reg.createChannelCloseBody(getErrorCode() == null ? AMQConstant.INTERNAL_ERROR.getCode() : getErrorCode().getCode(), - AMQShortString.validValueOf(getMessage()),_classId,_methodId)); + return _methodRegistry; } } Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java (original) +++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java Fri Oct 17 14:23:19 2014 @@ -22,9 +22,9 @@ package org.apache.qpid; import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.protocol.AMQConstant; /** @@ -35,32 +35,30 @@ public class AMQConnectionException exte private final int _classId; private final int _methodId; - /** AMQP version for which exception ocurred, major code. */ - private final byte major; + private final MethodRegistry _methodRegistry; - /** AMQP version for which exception ocurred, minor code. */ - private final byte minor; - - private boolean _closeConnetion; + public AMQConnectionException(AMQConstant errorCode, String msg, AMQMethodBody body, MethodRegistry methodRegistry) + { + this(errorCode, msg, body.getClazz(), body.getMethod(), methodRegistry, null); + } - public AMQConnectionException(AMQConstant errorCode, String msg, int classId, int methodId, byte major, byte minor, - Throwable cause) + public AMQConnectionException(AMQConstant errorCode, String msg, int classId, int methodId, MethodRegistry methodRegistry, + Throwable cause) { super(errorCode, msg, cause); _classId = classId; _methodId = methodId; - this.major = major; - this.minor = minor; + _methodRegistry = methodRegistry; + } - public AMQFrame getCloseFrame(int channel) + public AMQFrame getCloseFrame() { - MethodRegistry reg = MethodRegistry.getMethodRegistry(new ProtocolVersion(major,minor)); return new AMQFrame(0, - reg.createConnectionCloseBody(getErrorCode().getCode(), - AMQShortString.validValueOf(getMessage()), - _classId, - _methodId)); + _methodRegistry.createConnectionCloseBody(getErrorCode().getCode(), + AMQShortString.validValueOf(getMessage()), + _classId, + _methodId)); } Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQException.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQException.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQException.java (original) +++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQException.java Fri Oct 17 14:23:19 2014 @@ -20,7 +20,6 @@ */ package org.apache.qpid; -import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.protocol.AMQConstant; /** @@ -72,7 +71,6 @@ public class AMQException extends Except this(null, (msg == null) ? "" : msg); } - @Deprecated public AMQException(AMQConstant errorCode, String msg) { this(errorCode, (msg == null) ? "" : msg, null); Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java (original) +++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java Fri Oct 17 14:23:19 2014 @@ -30,16 +30,8 @@ import java.util.ArrayList; import java.util.List; import java.util.ListIterator; -import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.framing.AMQDataBlockDecoder; -import org.apache.qpid.framing.AMQFrameDecodingException; -import org.apache.qpid.framing.AMQMethodBodyFactory; -import org.apache.qpid.framing.AMQProtocolVersionException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ByteArrayDataInput; -import org.apache.qpid.framing.EncodingUtils; -import org.apache.qpid.framing.ProtocolInitiation; -import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.framing.*; +import org.apache.qpid.protocol.AMQConstant; /** * AMQDecoder delegates the decoding of AMQP either to a data block decoder, or in the case of new connections, to a @@ -53,10 +45,9 @@ import org.apache.qpid.protocol.AMQVersi * TODO If protocol initiation decoder not needed, then don't create it. Probably not a big deal, but it adds to the * per-session overhead. */ -public class AMQDecoder +public abstract class AMQDecoder<T extends MethodProcessor> { - /** Holds the 'normal' AMQP data decoder. */ - private AMQDataBlockDecoder _dataBlockDecoder = new AMQDataBlockDecoder(); + private final T _methodProcessor; /** Holds the protocol initiation decoder. */ private ProtocolInitiation.Decoder _piDecoder = new ProtocolInitiation.Decoder(); @@ -64,26 +55,26 @@ public class AMQDecoder /** Flag to indicate whether this decoder needs to handle protocol initiation. */ private boolean _expectProtocolInitiation; - private AMQMethodBodyFactory _bodyFactory; private boolean _firstRead = true; + private int _maxFrameSize = AMQConstant.FRAME_MIN_SIZE.getCode(); + private List<ByteArrayInputStream> _remainingBufs = new ArrayList<ByteArrayInputStream>(); /** * Creates a new AMQP decoder. * * @param expectProtocolInitiation <tt>true</tt> if this decoder needs to handle protocol initiation. - * @param session protocol session (connection) + * @param methodProcessor method processor */ - public AMQDecoder(boolean expectProtocolInitiation, AMQVersionAwareProtocolSession session) + protected AMQDecoder(boolean expectProtocolInitiation, T methodProcessor) { _expectProtocolInitiation = expectProtocolInitiation; - _bodyFactory = new AMQMethodBodyFactory(session); + _methodProcessor = methodProcessor; } - /** * Sets the protocol initation flag, that determines whether decoding is handled by the data decoder of the protocol * initation decoder. This method is expected to be called with <tt>false</tt> once protocol initation completes. @@ -98,7 +89,12 @@ public class AMQDecoder public void setMaxFrameSize(final int frameMax) { - _dataBlockDecoder.setMaxFrameSize(frameMax); + _maxFrameSize = frameMax; + } + + public T getMethodProcessor() + { + return _methodProcessor; } private class RemainingByteArrayInputStream extends InputStream @@ -219,14 +215,13 @@ public class AMQDecoder } - public ArrayList<AMQDataBlock> decodeBuffer(ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException + public void decodeBuffer(ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException { - // get prior remaining data from accumulator - ArrayList<AMQDataBlock> dataBlocks = new ArrayList<AMQDataBlock>(); MarkableDataInput msg; + // get prior remaining data from accumulator ByteArrayInputStream bais; DataInput di; if(!_remainingBufs.isEmpty()) @@ -257,10 +252,10 @@ public class AMQDecoder { if(!_expectProtocolInitiation) { - enoughData = _dataBlockDecoder.decodable(msg); + enoughData = decodable(msg); if (enoughData) { - dataBlocks.add(_dataBlockDecoder.createAndPopulateFrame(_bodyFactory, msg)); + processInput(msg); } } else @@ -268,7 +263,7 @@ public class AMQDecoder enoughData = _piDecoder.decodable(msg); if (enoughData) { - dataBlocks.add(new ProtocolInitiation(msg)); + _methodProcessor.receiveProtocolHeader(new ProtocolInitiation(msg)); } } @@ -305,6 +300,106 @@ public class AMQDecoder } } } - return dataBlocks; } + + private boolean decodable(final MarkableDataInput in) throws AMQFrameDecodingException, IOException + { + final int remainingAfterAttributes = in.available() - (1 + 2 + 4 + 1); + // type, channel, body length and end byte + if (remainingAfterAttributes < 0) + { + return false; + } + + in.mark(8); + in.skip(1 + 2); + + + // Get an unsigned int, lifted from MINA ByteBuffer getUnsignedInt() + final long bodySize = in.readInt() & 0xffffffffL; + if (bodySize > _maxFrameSize) + { + throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, + "Incoming frame size of " + + bodySize + + " is larger than negotiated maximum of " + + _maxFrameSize); + } + in.reset(); + + return (remainingAfterAttributes >= bodySize); + + } + + private void processInput(final MarkableDataInput in) + throws AMQFrameDecodingException, AMQProtocolVersionException, IOException + { + final byte type = in.readByte(); + + final int channel = in.readUnsignedShort(); + final long bodySize = EncodingUtils.readUnsignedInteger(in); + + // bodySize can be zero + if ((channel < 0) || (bodySize < 0)) + { + throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, + "Undecodable frame: type = " + type + " channel = " + channel + + " bodySize = " + bodySize); + } + + processFrame(channel, type, bodySize, in); + + byte marker = in.readByte(); + if ((marker & 0xFF) != 0xCE) + { + throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, + "End of frame marker not found. Read " + marker + " length=" + bodySize + + " type=" + type); + } + + } + + protected void processFrame(final int channel, final byte type, final long bodySize, final MarkableDataInput in) + throws AMQFrameDecodingException, IOException + { + switch (type) + { + case 1: + processMethod(channel, in); + break; + case 2: + ContentHeaderBody.process(in, _methodProcessor.getChannelMethodProcessor(channel), bodySize); + break; + case 3: + ContentBody.process(in, _methodProcessor.getChannelMethodProcessor(channel), bodySize); + break; + case 8: + HeartbeatBody.process(channel, in, _methodProcessor, bodySize); + break; + default: + throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "Unsupported frame type: " + type); + } + } + + + abstract void processMethod(int channelId, + MarkableDataInput in) + throws AMQFrameDecodingException, IOException; + + AMQFrameDecodingException newUnknownMethodException(final int classId, + final int methodId, + ProtocolVersion protocolVersion) + { + return new AMQFrameDecodingException(AMQConstant.COMMAND_INVALID, + "Method " + + methodId + + " unknown in AMQP version " + + protocolVersion + + " (while trying to decode class " + + classId + + " method " + + methodId + + "."); + } + } Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java (original) +++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java Fri Oct 17 14:23:19 2014 @@ -20,8 +20,6 @@ */ package org.apache.qpid.framing; -import org.apache.qpid.codec.MarkableDataInput; - import java.io.DataOutput; import java.io.IOException; @@ -39,12 +37,6 @@ public class AMQFrame extends AMQDataBlo _bodyFrame = bodyFrame; } - public AMQFrame(final MarkableDataInput in, final int channel, final long bodySize, final BodyFactory bodyFactory) throws AMQFrameDecodingException, IOException - { - this._channel = channel; - this._bodyFrame = bodyFactory.createBody(in,bodySize); - } - public long getSize() { return 1 + 2 + 4 + _bodyFrame.getSize() + 1; Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java (original) +++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java Fri Oct 17 14:23:19 2014 @@ -20,28 +20,18 @@ */ package org.apache.qpid.framing; +import java.io.DataOutput; +import java.io.IOException; + import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; -import java.io.DataOutput; -import java.io.IOException; - public interface AMQMethodBody extends AMQBody { public static final byte TYPE = 1; - /** AMQP major version - * @return the major version*/ - public byte getMajor(); - - /** AMQP minor version - * @return the minor version*/ - public byte getMinor(); - - - /** @return unsigned short */ public int getClazz(); @@ -66,18 +56,18 @@ public interface AMQMethodBody extends A * * @param channelId The channel id that is not found * + * @param methodRegistry * @return new AMQChannelException */ - public AMQChannelException getChannelNotFoundException(int channelId); - - public AMQChannelException getChannelException(AMQConstant code, String message); - - public AMQChannelException getChannelException(AMQConstant code, String message, Throwable cause); - - public AMQConnectionException getConnectionException(AMQConstant code, String message); - + public AMQChannelException getChannelNotFoundException(int channelId, final MethodRegistry methodRegistry); - public AMQConnectionException getConnectionException(AMQConstant code, String message, Throwable cause); + public AMQChannelException getChannelException(AMQConstant code, + String message, + final MethodRegistry methodRegistry); + + public AMQConnectionException getConnectionException(AMQConstant code, + String message, + final MethodRegistry methodRegistry); public boolean execute(MethodDispatcher methodDispatcher, int channelId) throws AMQException; Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java (original) +++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java Fri Oct 17 14:23:19 2014 @@ -21,17 +21,16 @@ package org.apache.qpid.framing; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; -import org.apache.qpid.codec.MarkableDataInput; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - public abstract class AMQMethodBodyImpl implements AMQMethodBody { public static final byte TYPE = 1; @@ -67,31 +66,26 @@ public abstract class AMQMethodBodyImpl * * @param channelId The channel id that is not found * + * @param methodRegistry * @return new AMQChannelException */ - public AMQChannelException getChannelNotFoundException(int channelId) - { - return getChannelException(AMQConstant.NOT_FOUND, "Channel not found for id:" + channelId); - } - - public AMQChannelException getChannelException(AMQConstant code, String message) + public AMQChannelException getChannelNotFoundException(int channelId, final MethodRegistry methodRegistry) { - return new AMQChannelException(code, message, getClazz(), getMethod(), getMajor(), getMinor(), null); + return getChannelException(AMQConstant.NOT_FOUND, "Channel not found for id:" + channelId, methodRegistry); } - public AMQChannelException getChannelException(AMQConstant code, String message, Throwable cause) + public AMQChannelException getChannelException(AMQConstant code, + String message, + final MethodRegistry methodRegistry) { - return new AMQChannelException(code, message, getClazz(), getMethod(), getMajor(), getMinor(), cause); + return new AMQChannelException(code, message, getClazz(), getMethod(), methodRegistry); } - public AMQConnectionException getConnectionException(AMQConstant code, String message) + public AMQConnectionException getConnectionException(AMQConstant code, + String message, + final MethodRegistry methodRegistry) { - return new AMQConnectionException(code, message, getClazz(), getMethod(), getMajor(), getMinor(), null); - } - - public AMQConnectionException getConnectionException(AMQConstant code, String message, Throwable cause) - { - return new AMQConnectionException(code, message, getClazz(), getMethod(), getMajor(), getMinor(), cause); + return new AMQConnectionException(code, message, this, methodRegistry); } public void handle(final int channelId, final AMQVersionAwareProtocolSession session) throws AMQException @@ -112,17 +106,6 @@ public abstract class AMQMethodBodyImpl } - protected byte readByte(DataInput buffer) throws IOException - { - return buffer.readByte(); - } - - protected AMQShortString readAMQShortString(MarkableDataInput buffer) throws IOException - { - AMQShortString str = buffer.readAMQShortString(); - return str == null ? null : str.intern(false); - } - protected int getSizeOf(AMQShortString string) { return EncodingUtils.encodedShortStringLength(string); @@ -148,11 +131,6 @@ public abstract class AMQMethodBodyImpl buffer.writeInt(i); } - protected FieldTable readFieldTable(DataInput buffer) throws AMQFrameDecodingException, IOException - { - return EncodingUtils.readFieldTable(buffer); - } - protected int getSizeOf(FieldTable table) { return EncodingUtils.encodedFieldTableLength(table); //To change body of created methods use File | Settings | File Templates. @@ -163,11 +141,6 @@ public abstract class AMQMethodBodyImpl EncodingUtils.writeFieldTableBytes(buffer, table); } - protected long readLong(DataInput buffer) throws IOException - { - return buffer.readLong(); - } - protected void writeLong(DataOutput buffer, long l) throws IOException { buffer.writeLong(l); @@ -183,11 +156,6 @@ public abstract class AMQMethodBodyImpl EncodingUtils.writeBytes(buffer,data); } - protected byte[] readBytes(DataInput buffer) throws IOException - { - return EncodingUtils.readBytes(buffer); - } - protected short readShort(DataInput buffer) throws IOException { return EncodingUtils.readShort(buffer); @@ -198,30 +166,6 @@ public abstract class AMQMethodBodyImpl EncodingUtils.writeShort(buffer, s); } - protected Content readContent(DataInput buffer) - { - return null; - } - - protected int getSizeOf(Content body) - { - return 0; - } - - protected void writeContent(DataOutput buffer, Content body) - { - } - - protected byte readBitfield(DataInput buffer) throws IOException - { - return readByte(buffer); - } - - protected int readUnsignedShort(DataInput buffer) throws IOException - { - return buffer.readUnsignedShort(); - } - protected void writeBitfield(DataOutput buffer, byte bitfield0) throws IOException { buffer.writeByte(bitfield0); @@ -232,21 +176,12 @@ public abstract class AMQMethodBodyImpl EncodingUtils.writeUnsignedShort(buffer, s); } - protected long readUnsignedInteger(DataInput buffer) throws IOException - { - return EncodingUtils.readUnsignedInteger(buffer); - } protected void writeUnsignedInteger(DataOutput buffer, long i) throws IOException { EncodingUtils.writeUnsignedInteger(buffer, i); } - protected short readUnsignedByte(DataInput buffer) throws IOException - { - return (short) buffer.readUnsignedByte(); - } - protected void writeUnsignedByte(DataOutput buffer, short unsignedByte) throws IOException { EncodingUtils.writeUnsignedByte(buffer, unsignedByte); Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java (original) +++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java Fri Oct 17 14:23:19 2014 @@ -389,6 +389,17 @@ public final class AMQShortString implem { return new CharSubSequence(start + _sequenceOffset, end + _sequenceOffset); } + + @Override + public String toString() + { + char[] chars = new char[length()]; + for(int i = 0; i < length(); i++) + { + chars[i] = charAt(i); + } + return new String(chars); + } } public char[] asChars() Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ClientMethodDispatcher.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ClientMethodDispatcher.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ClientMethodDispatcher.java (original) +++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ClientMethodDispatcher.java Fri Oct 17 14:23:19 2014 @@ -63,4 +63,13 @@ public interface ClientMethodDispatcher public boolean dispatchTxRollbackOk(TxRollbackOkBody body, int channelId) throws AMQException; public boolean dispatchTxSelectOk(TxSelectOkBody body, int channelId) throws AMQException; -} \ No newline at end of file + boolean dispatchConnectionRedirect(ConnectionRedirectBody connectionRedirectBody, int channelId) throws AMQException; + boolean dispatchAccessRequestOk(AccessRequestOkBody accessRequestOkBody, int channelId) throws AMQException; + + boolean dispatchQueueUnbindOk(QueueUnbindOkBody queueUnbindOkBody, int channelId) throws AMQException; + + boolean dispatchBasicRecoverSyncOk(BasicRecoverSyncOkBody basicRecoverSyncOkBody, int channelId) + throws AMQException; + + boolean dispatchChannelAlert(ChannelAlertBody channelAlertBody, int channelId) throws AMQException; +} Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java (original) +++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java Fri Oct 17 14:23:19 2014 @@ -20,15 +20,15 @@ */ package org.apache.qpid.framing; -import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; - import java.io.DataInput; -import java.io.DataInputStream; import java.io.DataOutput; import java.io.IOException; import java.nio.ByteBuffer; +import org.apache.qpid.AMQException; +import org.apache.qpid.codec.MarkableDataInput; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; + public class ContentBody implements AMQBody { public static final byte TYPE = 3; @@ -72,23 +72,20 @@ public class ContentBody implements AMQB session.contentBodyReceived(channelId, this); } - protected void populateFromBuffer(DataInputStream buffer, long size) throws AMQFrameDecodingException, IOException + public byte[] getPayload() { - if (size > 0) - { - _payload = new byte[(int)size]; - buffer.read(getPayload()); - } - + return _payload; } - public void reduceBufferToFit() + public static void process(final MarkableDataInput in, + final ChannelMethodProcessor methodProcessor, final long bodySize) + throws IOException { - } - public byte[] getPayload() - { - return _payload; + byte[] payload = new byte[(int)bodySize]; + in.readFully(payload); + + methodProcessor.receiveMessageContent(payload); } private static class BufferContentBody implements AMQBody --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
