Author: rgodfrey
Date: Wed Jan 6 16:16:11 2016
New Revision: 1723359
URL: http://svn.apache.org/viewvc?rev=1723359&view=rev
Log:
QPID-6971 : Remove MarkableDataInput and use QpidByteBuffer for input processing
Removed:
qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferListDataInput.java
Modified:
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataFactory.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/Encrypted091MessageFactory.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQType.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicNackBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicQosOkBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelCloseOkBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConfirmSelectBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionCloseBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareOkBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteOkBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldArray.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/QueueBindOkBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/QueueUnbindOkBody.java
qpid/java/trunk/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java
qpid/java/trunk/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java
qpid/java/trunk/common/src/test/java/org/apache/qpid/framing/FieldTableTest.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java
Modified:
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java?rev=1723359&r1=1723358&r2=1723359&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java Wed Jan 6 16:16:11 2016
@@ -20,9 +20,6 @@
*/
package org.apache.qpid.server.store.berkeleydb.upgrade;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.text.MessageFormat;
import java.util.ArrayList;
@@ -49,11 +46,9 @@ import org.apache.qpid.exchange.Exchange
import org.apache.qpid.framing.AMQFrameDecodingException;
import org.apache.qpid.framing.AMQProtocolVersionException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ByteArrayDataInput;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.MessagePublishInfo;
-import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.protocol.v0_8.MessageMetaData;
import org.apache.qpid.server.store.StorableMessageMetaData;
@@ -677,15 +672,9 @@ public class UpgradeFrom4To5 extends Abs
byte[] underlying = new byte[bodySize];
tupleInput.readFast(underlying);
- try
- {
- return ContentHeaderBody.createFromBuffer(new ByteArrayDataInput(underlying),
- bodySize);
- }
- catch (IOException e)
- {
- throw new AMQFrameDecodingException(AMQConstant.INTERNAL_ERROR, e.getMessage(), e);
- }
+ return ContentHeaderBody.createFromBuffer(QpidByteBuffer.wrap(underlying),
+ bodySize);
+
}
@Override
Modified:
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataFactory.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataFactory.java?rev=1723359&r1=1723358&r2=1723359&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataFactory.java
(original)
+++
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataFactory.java
Wed Jan 6 16:16:11 2016
@@ -20,33 +20,17 @@
*/
package org.apache.qpid.server.store;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.framing.EncodingUtils;
import org.apache.qpid.server.plugin.MessageMetaDataType;
-import org.apache.qpid.util.ByteBufferInputStream;
public class TestMessageMetaDataFactory implements
MessageMetaDataType.Factory<TestMessageMetaData>
{
public TestMessageMetaData createMetaData(QpidByteBuffer buf)
{
- try
- {
- InputStream bbis = buf.asInputStream();
- DataInputStream dais = new DataInputStream(bbis);
-
- long id = EncodingUtils.readLong(dais);
- int size = EncodingUtils.readInteger(dais);
+ long id = buf.getLong();
+ int size = buf.getInt();
- return new TestMessageMetaData(id, size);
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
+ return new TestMessageMetaData(id, size);
}
}
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java?rev=1723359&r1=1723358&r2=1723359&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java
Wed Jan 6 16:16:11 2016
@@ -28,7 +28,7 @@ import java.security.PrivilegedException
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.codec.MarkableDataInput;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.codec.ServerDecoder;
import org.apache.qpid.framing.AMQFrameDecodingException;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
@@ -49,8 +49,8 @@ public class BrokerDecoder extends Serve
}
@Override
- protected void processFrame(final int channelId, final byte type, final
long bodySize, final MarkableDataInput in)
- throws AMQFrameDecodingException, IOException
+ protected void processFrame(final int channelId, final byte type, final
long bodySize, final QpidByteBuffer in)
+ throws AMQFrameDecodingException
{
long startTime = 0;
try
@@ -83,11 +83,7 @@ public class BrokerDecoder extends Serve
catch (PrivilegedActionException e)
{
Throwable cause = e.getCause();
- if(cause instanceof IOException)
- {
- throw (IOException) cause;
- }
- else if(cause instanceof AMQFrameDecodingException)
+ if(cause instanceof AMQFrameDecodingException)
{
throw (AMQFrameDecodingException) cause;
}
@@ -112,8 +108,8 @@ public class BrokerDecoder extends Serve
}
- private void doProcessFrame(final int channelId, final byte type, final
long bodySize, final MarkableDataInput in)
- throws AMQFrameDecodingException, IOException
+ private void doProcessFrame(final int channelId, final byte type, final
long bodySize, final QpidByteBuffer in)
+ throws AMQFrameDecodingException
{
super.processFrame(channelId, type, bodySize, in);
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java?rev=1723359&r1=1723358&r2=1723359&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
Wed Jan 6 16:16:11 2016
@@ -20,14 +20,12 @@
*/
package org.apache.qpid.server.protocol.v0_8;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.codec.MarkableDataInput;
import org.apache.qpid.framing.AMQFrameDecodingException;
import org.apache.qpid.framing.AMQProtocolVersionException;
import org.apache.qpid.framing.AMQShortString;
@@ -207,14 +205,13 @@ public class MessageMetaData implements
{
try
{
- MarkableDataInput dataInput = buf.asDataInput();
- int size = EncodingUtils.readInteger(dataInput);
- ContentHeaderBody chb =
ContentHeaderBody.createFromBuffer(dataInput, size);
- final AMQShortString exchange =
EncodingUtils.readAMQShortString(dataInput);
- final AMQShortString routingKey =
EncodingUtils.readAMQShortString(dataInput);
+ int size = buf.getInt();
+ ContentHeaderBody chb =
ContentHeaderBody.createFromBuffer(buf, size);
+ final AMQShortString exchange =
AMQShortString.readAMQShortString(buf);
+ final AMQShortString routingKey =
AMQShortString.readAMQShortString(buf);
- final byte flags = EncodingUtils.readByte(dataInput);
- long arrivalTime = EncodingUtils.readLong(dataInput);
+ final byte flags = buf.get();
+ long arrivalTime = buf.getLong();
MessagePublishInfo publishBody =
new MessagePublishInfo(exchange,
@@ -224,7 +221,7 @@ public class MessageMetaData implements
return new MessageMetaData(publishBody, chb, arrivalTime);
}
- catch (IOException | AMQFrameDecodingException |
AMQProtocolVersionException e)
+ catch (AMQFrameDecodingException | AMQProtocolVersionException e)
{
throw new ConnectionScopedRuntimeException(e);
}
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/Encrypted091MessageFactory.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/Encrypted091MessageFactory.java?rev=1723359&r1=1723358&r2=1723359&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/Encrypted091MessageFactory.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/Encrypted091MessageFactory.java
Wed Jan 6 16:16:11 2016
@@ -21,7 +21,6 @@
package org.apache.qpid.client.message;
import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
import java.io.IOException;
import java.math.BigInteger;
import java.nio.ByteBuffer;
@@ -40,9 +39,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.QpidException;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ByteArrayDataInput;
public class Encrypted091MessageFactory extends AbstractJMSMessageFactory
{
@@ -144,7 +143,7 @@ public class Encrypted091MessageFactory
BasicContentHeaderProperties properties = new
BasicContentHeaderProperties();
int payloadOffset;
- ByteArrayDataInput dataInput = new
ByteArrayDataInput(unencryptedBytes);
+ QpidByteBuffer dataInput =
QpidByteBuffer.wrap(unencryptedBytes);
payloadOffset = properties.read(dataInput);
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java?rev=1723359&r1=1723358&r2=1723359&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
Wed Jan 6 16:16:11 2016
@@ -29,7 +29,6 @@ import java.nio.channels.GatheringByteCh
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.Charset;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
@@ -45,8 +44,6 @@ import javax.net.ssl.SSLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.codec.MarkableDataInput;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.streams.CompositeInputStream;
public final class QpidByteBuffer
@@ -394,6 +391,38 @@ public final class QpidByteBuffer
return _buffer.getFloat(index);
}
+ public int getUnsignedByte()
+ {
+ return ((int)get()) & 0xFF;
+ }
+
+ public int getUnsignedShort()
+ {
+ return ((int) getShort()) & 0xffff;
+ }
+
+ public long getUnsignedInt()
+ {
+ return ((long) getInt()) & 0xffffffffL;
+ }
+
+ public void putUnsignedByte(final short s)
+ {
+ put((byte)s);
+ }
+
+
+ public void putUnsignedShort(final int i)
+ {
+ putShort((short)i);
+ }
+
+ public void putUnsignedInt(final long l)
+ {
+ putInt((int)l);
+ }
+
+
public QpidByteBuffer slice()
{
return new QpidByteBuffer(_buffer.slice(), _ref);
@@ -434,11 +463,6 @@ public final class QpidByteBuffer
return new BufferInputStream();
}
- public MarkableDataInput asDataInput()
- {
- return new BufferDataInput();
- }
-
public static QpidByteBuffer allocate(int size)
{
@@ -762,147 +786,4 @@ public final class QpidByteBuffer
}
}
- private final class BufferDataInput implements MarkableDataInput
- {
- private int _mark;
- private final int _offset;
-
- public BufferDataInput()
- {
- _offset = _buffer.position();
- }
-
- public void readFully(byte[] b)
- {
- _buffer.get(b);
- }
-
- public void readFully(byte[] b, int off, int len)
- {
- _buffer.get(b, 0, len);
- }
-
- public QpidByteBuffer readAsByteBuffer(int len)
- {
- final QpidByteBuffer view = view(0, len);
- skipBytes(len);
- return view;
- }
-
- public int skipBytes(int n)
- {
- _buffer.position(_buffer.position()+n);
- return _buffer.position()-_offset;
- }
-
- public boolean readBoolean()
- {
- return _buffer.get() != 0;
- }
-
- public byte readByte()
- {
- return _buffer.get();
- }
-
- public int readUnsignedByte()
- {
- return ((int) _buffer.get()) & 0xFF;
- }
-
- public short readShort()
- {
- return _buffer.getShort();
- }
-
- public int readUnsignedShort()
- {
- return ((int) _buffer.getShort()) & 0xffff;
- }
-
- public char readChar()
- {
- return (char) _buffer.getChar();
- }
-
- public int readInt()
- {
- return _buffer.getInt();
- }
-
- public long readLong()
- {
- return _buffer.getLong();
- }
-
- public float readFloat()
- {
- return _buffer.getFloat();
- }
-
- public double readDouble()
- {
- return _buffer.getDouble();
- }
-
- public AMQShortString readAMQShortString()
- {
- return AMQShortString.readAMQShortString(_buffer);
- }
-
- public String readLine()
- {
- throw new UnsupportedOperationException();
- }
-
- public String readUTF()
- {
- throw new UnsupportedOperationException();
- }
-
- public int available()
- {
- return _buffer.remaining();
- }
-
-
- public long skip(long i)
- {
- _buffer.position(_buffer.position()+(int)i);
- return i;
- }
-
- public int read(byte[] b)
- {
- readFully(b);
- return b.length;
- }
-
- public int position()
- {
- return _buffer.position()-_offset;
- }
-
- public void position(int position)
- {
- _buffer.position(position + _offset);
- }
-
- public int length()
- {
- return _buffer.limit();
- }
-
-
- public void mark(int readAhead)
- {
- _mark = position();
- }
-
- public void reset()
- {
- position(_mark);
- }
- }
-
}
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java?rev=1723359&r1=1723358&r2=1723359&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
(original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
Wed Jan 6 16:16:11 2016
@@ -20,19 +20,10 @@
*/
package org.apache.qpid.codec;
-import java.io.ByteArrayInputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.ListIterator;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQConstant;
@@ -51,7 +42,7 @@ import org.apache.qpid.protocol.AMQConst
public abstract class AMQDecoder<T extends MethodProcessor>
{
private static final Logger LOGGER =
LoggerFactory.getLogger(AMQDecoder.class);
- private static final int MAX_BUFFERS_LIMIT = 10;
+ public static final int FRAME_HEADER_SIZE = 7;
private final T _methodProcessor;
/** Holds the protocol initiation decoder. */
@@ -99,60 +90,56 @@ public abstract class AMQDecoder<T exten
return _methodProcessor;
}
- protected void decode(final MarkableDataInput msg) throws IOException,
AMQFrameDecodingException
+ protected final int decode(final QpidByteBuffer buf) throws
AMQFrameDecodingException
{
// If this is the first read then we may be getting a protocol initiation back if we tried to negotiate
// an unsupported version
- if(_firstRead && msg.available()>0)
+ if(_firstRead && buf.hasRemaining())
{
- msg.mark(1);
_firstRead = false;
- if(!_expectProtocolInitiation && (((int)msg.readByte()) &0xff) > 8)
+ if(!_expectProtocolInitiation && (((int)buf.get(buf.position())) &0xff) > 8)
{
_expectProtocolInitiation = true;
}
- msg.reset();
}
- boolean enoughData = true;
- while (enoughData)
+ int required = 0;
+ while (required == 0)
{
if(!_expectProtocolInitiation)
{
- enoughData = decodable(msg);
- if (enoughData)
+ required = decodable(buf);
+ if (required == 0)
{
- processInput(msg);
+ processInput(buf);
}
}
else
{
- enoughData = _piDecoder.decodable(msg);
- if (enoughData)
+ required = _piDecoder.decodable(buf);
+ if (required == 0)
{
- _methodProcessor.receiveProtocolHeader(new
ProtocolInitiation(msg));
+ _methodProcessor.receiveProtocolHeader(new
ProtocolInitiation(buf));
}
}
}
+ return required;
}
- private boolean decodable(final MarkableDataInput in) throws
AMQFrameDecodingException, IOException
+ private int decodable(final QpidByteBuffer in) throws
AMQFrameDecodingException
{
- final int remainingAfterAttributes = in.available() - (1 + 2 + 4 + 1);
+ final int remainingAfterAttributes = in.remaining() -
FRAME_HEADER_SIZE;
// type, channel, body length and end byte
if (remainingAfterAttributes < 0)
{
- return false;
+ return -remainingAfterAttributes;
}
- in.mark(8);
- in.skip(1 + 2);
-
// Get an unsigned int, lifted from MINA ByteBuffer getUnsignedInt()
- final long bodySize = in.readInt() & 0xffffffffL;
+ final long bodySize = ((long)in.getInt(in.position()+3)) & 0xffffffffL;
if (bodySize > _maxFrameSize)
{
throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR,
@@ -161,18 +148,19 @@ public abstract class AMQDecoder<T exten
+ " is larger than negotiated maximum of "
+ _maxFrameSize);
}
- in.reset();
- return (remainingAfterAttributes >= bodySize);
+
+ long required = (1L+bodySize)-remainingAfterAttributes;
+ return required > 0 ? (int) required : 0;
}
- private void processInput(final MarkableDataInput in)
- throws AMQFrameDecodingException, AMQProtocolVersionException,
IOException
+ private void processInput(final QpidByteBuffer in)
+ throws AMQFrameDecodingException, AMQProtocolVersionException
{
- final byte type = in.readByte();
+ final byte type = in.get();
- final int channel = in.readUnsignedShort();
- final long bodySize = EncodingUtils.readUnsignedInteger(in);
+ final int channel = in.getUnsignedShort();
+ final long bodySize = in.getUnsignedInt();
// bodySize can be zero
if ((channel < 0) || (bodySize < 0))
@@ -184,7 +172,7 @@ public abstract class AMQDecoder<T exten
processFrame(channel, type, bodySize, in);
- byte marker = in.readByte();
+ byte marker = in.get();
if ((marker & 0xFF) != 0xCE)
{
throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR,
@@ -194,8 +182,8 @@ public abstract class AMQDecoder<T exten
}
- protected void processFrame(final int channel, final byte type, final long
bodySize, final MarkableDataInput in)
- throws AMQFrameDecodingException, IOException
+ protected void processFrame(final int channel, final byte type, final long
bodySize, final QpidByteBuffer in)
+ throws AMQFrameDecodingException
{
switch (type)
{
@@ -218,8 +206,8 @@ public abstract class AMQDecoder<T exten
abstract void processMethod(int channelId,
- MarkableDataInput in)
- throws AMQFrameDecodingException, IOException;
+ QpidByteBuffer in)
+ throws AMQFrameDecodingException;
AMQFrameDecodingException newUnknownMethodException(final int classId,
final int methodId,
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java?rev=1723359&r1=1723358&r2=1723359&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java
Wed Jan 6 16:16:11 2016
@@ -26,11 +26,12 @@ import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.framing.*;
public class ClientDecoder extends AMQDecoder<ClientMethodProcessor<? extends
ClientChannelMethodProcessor>>
{
- private List<ByteBuffer> _incompleteBuffers = new ArrayList<ByteBuffer>();
+ private QpidByteBuffer _incompleteBuffer;
/**
* Creates a new AMQP decoder.
@@ -43,42 +44,68 @@ public class ClientDecoder extends AMQDe
}
public void decodeBuffer(ByteBuffer buf)
- throws AMQFrameDecodingException, AMQProtocolVersionException,
IOException
+ throws AMQFrameDecodingException, AMQProtocolVersionException
{
- buf = buf.slice();
- _incompleteBuffers.add(buf);
- MarkableDataInput msg = new
ByteBufferListDataInput(_incompleteBuffers);
-
- decode(msg);
+ buf = buf.slice();
- ListIterator<ByteBuffer> iter = _incompleteBuffers.listIterator();
- while (iter.hasNext())
+ if(_incompleteBuffer != null)
{
- ByteBuffer next = iter.next();
- if (next.hasRemaining())
+ if(buf.remaining() < _incompleteBuffer.remaining())
{
- if (next.position() != 0)
- {
- iter.set(next.slice());
- }
- break;
+ _incompleteBuffer.put(buf);
+ return;
}
else
{
- iter.remove();
+ final ByteBuffer start = buf.duplicate();
+ start.limit(_incompleteBuffer.remaining());
+ buf.position(buf.position()+_incompleteBuffer.remaining());
+ _incompleteBuffer.put(start);
+ _incompleteBuffer.flip();
+ final int required = decode(_incompleteBuffer);
+
+ if(required != 0)
+ {
+ QpidByteBuffer newBuffer =
QpidByteBuffer.allocate(required + _incompleteBuffer.remaining());
+ newBuffer.put(_incompleteBuffer);
+ _incompleteBuffer.dispose();
+ _incompleteBuffer = newBuffer;
+ if(buf.hasRemaining())
+ {
+ decodeBuffer(buf);
+ }
+ return;
+ }
+ else
+ {
+ _incompleteBuffer.dispose();
+ _incompleteBuffer = null;
+ if(!buf.hasRemaining())
+ {
+ return;
+ }
+ }
}
}
+ QpidByteBuffer qpidByteBuffer = QpidByteBuffer.wrap(buf);
+ final int required = decode(qpidByteBuffer);
+ if(required != 0)
+ {
+ _incompleteBuffer =
QpidByteBuffer.allocate(qpidByteBuffer.remaining()+required);
+ _incompleteBuffer.put(qpidByteBuffer);
+ }
+
}
void processMethod(int channelId,
- MarkableDataInput in)
- throws AMQFrameDecodingException, IOException
+ QpidByteBuffer in)
+ throws AMQFrameDecodingException
{
ClientMethodProcessor<? extends ClientChannelMethodProcessor>
methodProcessor = getMethodProcessor();
ClientChannelMethodProcessor channelMethodProcessor =
methodProcessor.getChannelMethodProcessor(channelId);
- final int classAndMethod = in.readInt();
+ final int classAndMethod = in.getInt();
int classId = classAndMethod >> 16;
int methodId = classAndMethod & 0xFFFF;
methodProcessor.setCurrentMethod(classId, methodId);
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java?rev=1723359&r1=1723358&r2=1723359&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java
Wed Jan 6 16:16:11 2016
@@ -40,16 +40,16 @@ public class ServerDecoder extends AMQDe
public void decodeBuffer(QpidByteBuffer buf) throws
AMQFrameDecodingException, AMQProtocolVersionException, IOException
{
- decode(buf.asDataInput());
+ decode(buf);
}
void processMethod(int channelId,
- MarkableDataInput in)
- throws AMQFrameDecodingException, IOException
+ QpidByteBuffer in)
+ throws AMQFrameDecodingException
{
ServerMethodProcessor<? extends ServerChannelMethodProcessor>
methodProcessor = getMethodProcessor();
- final int classAndMethod = in.readInt();
+ final int classAndMethod = in.getInt();
int classId = classAndMethod >> 16;
int methodId = classAndMethod & 0xFFFF;
methodProcessor.setCurrentMethod(classId, methodId);
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java?rev=1723359&r1=1723358&r2=1723359&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
(original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
Wed Jan 6 16:16:11 2016
@@ -61,8 +61,8 @@ public class AMQFrame extends AMQDataBlo
QpidByteBuffer frameHeader =
QpidByteBuffer.allocateDirect(HEADER_SIZE);
frameHeader.put(_bodyFrame.getFrameType());
- EncodingUtils.writeUnsignedShort(frameHeader, _channel);
- EncodingUtils.writeUnsignedInteger(frameHeader, _bodyFrame.getSize());
+ frameHeader.putUnsignedShort(_channel);
+ frameHeader.putUnsignedInt((long) _bodyFrame.getSize());
frameHeader.flip();
sender.send(frameHeader);
frameHeader.dispose();
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java?rev=1723359&r1=1723358&r2=1723359&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
Wed Jan 6 16:16:11 2016
@@ -20,10 +20,6 @@
*/
package org.apache.qpid.framing;
-
-import java.io.DataInput;
-import java.io.IOException;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,8 +75,8 @@ public abstract class AMQMethodBodyImpl
final int size = getSize();
QpidByteBuffer buf = QpidByteBuffer.allocateDirect(size);
- EncodingUtils.writeUnsignedShort(buf, getClazz());
- EncodingUtils.writeUnsignedShort(buf, getMethod());
+ buf.putUnsignedShort(getClazz());
+ buf.putUnsignedShort(getMethod());
writeMethodPayload(buf);
buf.flip();
sender.send(buf);
@@ -113,11 +109,6 @@ public abstract class AMQMethodBodyImpl
}
- protected int readInt(DataInput buffer) throws IOException
- {
- return buffer.readInt();
- }
-
protected int getSizeOf(FieldTable table)
{
return EncodingUtils.encodedFieldTableLength(table); //To change body of created methods use File | Settings | File Templates.
@@ -144,11 +135,6 @@ public abstract class AMQMethodBodyImpl
EncodingUtils.writeBytes(buffer,data);
}
- protected short readShort(DataInput buffer) throws IOException
- {
- return EncodingUtils.readShort(buffer);
- }
-
protected void writeShort(QpidByteBuffer buffer, short s)
{
buffer.putShort(s);
@@ -161,17 +147,17 @@ public abstract class AMQMethodBodyImpl
protected void writeUnsignedShort(QpidByteBuffer buffer, int s)
{
- EncodingUtils.writeUnsignedShort(buffer, s);
+ buffer.putUnsignedShort(s);
}
protected void writeUnsignedInteger(QpidByteBuffer buffer, long i)
{
- EncodingUtils.writeUnsignedInteger(buffer, i);
+ buffer.putUnsignedInt(i);
}
protected void writeUnsignedByte(QpidByteBuffer buffer, short unsignedByte)
{
- EncodingUtils.writeUnsignedByte(buffer, unsignedByte);
+ buffer.putUnsignedByte(unsignedByte);
}
}
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQShortString.java?rev=1723359&r1=1723358&r2=1723359&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
Wed Jan 6 16:16:11 2016
@@ -21,8 +21,6 @@
package org.apache.qpid.framing;
-import java.io.DataInput;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
@@ -98,21 +96,34 @@ public final class AMQShortString implem
_asString = string == null ? "" : string;
}
- private AMQShortString(DataInput data, final int length) throws IOException
+ public static AMQShortString readAMQShortString(ByteBuffer buffer)
{
- if (length > MAX_LENGTH)
+ int length = ((int) buffer.get()) & 0xff;
+ if(length == 0)
{
- throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!");
+ return null;
}
- byte[] dataBytes = new byte[length];
- data.readFully(dataBytes);
- _data = dataBytes;
- _offset = 0;
- _length = length;
+ else
+ {
+ if (length > MAX_LENGTH)
+ {
+ throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!");
+ }
+ if(length > buffer.remaining())
+ {
+ throw new IllegalArgumentException("Cannot create AMQShortString with length "
+ + length + " from a ByteBuffer with only "
+ + buffer.remaining()
+ + " bytes.");
+ }
+ byte[] data = new byte[length];
+ buffer.get(data);
+ return new AMQShortString(data, 0, length);
+ }
}
- public static AMQShortString readAMQShortString(ByteBuffer buffer)
+ public static AMQShortString readAMQShortString(QpidByteBuffer buffer)
{
int length = ((int) buffer.get()) & 0xff;
if(length == 0)
@@ -139,6 +150,7 @@ public final class AMQShortString implem
}
}
+
public AMQShortString(byte[] data, final int offset, final int length)
{
if (length > MAX_LENGTH)
@@ -171,20 +183,6 @@ public final class AMQShortString implem
}
- public static AMQShortString readFromBuffer(DataInput buffer) throws
IOException
- {
- final int length = buffer.readUnsignedByte();
- if (length == 0)
- {
- return null;
- }
- else
- {
-
- return new AMQShortString(buffer, length);
- }
- }
-
public byte[] getBytes()
{
if(_offset == 0 && _length == _data.length)
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQType.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQType.java?rev=1723359&r1=1723358&r2=1723359&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQType.java
(original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQType.java
Wed Jan 6 16:16:11 2016
@@ -20,12 +20,10 @@
*/
package org.apache.qpid.framing;
-import java.io.IOException;
import java.math.BigDecimal;
import java.util.Collection;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.codec.MarkableDataInput;
/**
* AMQType is a type that represents the different possible AMQP field table types. It provides operations for each
@@ -58,7 +56,7 @@ public enum AMQType
EncodingUtils.writeLongStringBytes(buffer, (String) value);
}
- public Object readValueFromBuffer(MarkableDataInput buffer) throws
IOException
+ public Object readValueFromBuffer(QpidByteBuffer buffer)
{
return EncodingUtils.readLongString(buffer);
}
@@ -105,9 +103,9 @@ public enum AMQType
buffer.putLong( (Long) value);
}
- public Object readValueFromBuffer(MarkableDataInput buffer) throws
IOException
+ public Object readValueFromBuffer(QpidByteBuffer buffer)
{
- return EncodingUtils.readUnsignedInteger(buffer);
+ return buffer.getUnsignedInt();
}
},
@@ -145,11 +143,11 @@ public enum AMQType
buffer.putInt(unscaled);
}
- public Object readValueFromBuffer(MarkableDataInput buffer) throws
IOException
+ public Object readValueFromBuffer(QpidByteBuffer buffer)
{
- byte places = EncodingUtils.readByte(buffer);
+ byte places = buffer.get();
- int unscaled = EncodingUtils.readInteger(buffer);
+ int unscaled = buffer.getInt();
BigDecimal bd = new BigDecimal(unscaled);
@@ -182,9 +180,9 @@ public enum AMQType
buffer.putLong ((Long) value);
}
- public Object readValueFromBuffer(MarkableDataInput buffer) throws
IOException
+ public Object readValueFromBuffer(QpidByteBuffer buffer)
{
- return EncodingUtils.readLong(buffer);
+ return buffer.getLong();
}
},
@@ -255,7 +253,7 @@ public enum AMQType
*
* @return An instance of the type.
*/
- public Object readValueFromBuffer(MarkableDataInput buffer) throws
IOException
+ public Object readValueFromBuffer(QpidByteBuffer buffer)
{
try
{
@@ -318,7 +316,7 @@ public enum AMQType
*
* @return An instance of the type.
*/
- public Object readValueFromBuffer(MarkableDataInput buffer)
throws IOException
+ public Object readValueFromBuffer(QpidByteBuffer buffer)
{
// Read size of field table then all name/value pairs.
return FieldArray.readFromBuffer(buffer);
@@ -348,7 +346,7 @@ public enum AMQType
public void writeValueImpl(Object value, QpidByteBuffer buffer)
{ }
- public Object readValueFromBuffer(MarkableDataInput buffer)
+ public Object readValueFromBuffer(QpidByteBuffer buffer)
{
return null;
}
@@ -379,7 +377,7 @@ public enum AMQType
EncodingUtils.writeLongstr(buffer, (byte[]) value);
}
- public Object readValueFromBuffer(MarkableDataInput buffer) throws
IOException
+ public Object readValueFromBuffer(QpidByteBuffer buffer)
{
return EncodingUtils.readLongstr(buffer);
}
@@ -409,7 +407,7 @@ public enum AMQType
EncodingUtils.writeLongStringBytes(buffer, (String) value);
}
- public Object readValueFromBuffer(MarkableDataInput buffer) throws
IOException
+ public Object readValueFromBuffer(QpidByteBuffer buffer)
{
return EncodingUtils.readLongString(buffer);
}
@@ -440,7 +438,7 @@ public enum AMQType
EncodingUtils.writeLongStringBytes(buffer, (String) value);
}
- public Object readValueFromBuffer(MarkableDataInput buffer) throws
IOException
+ public Object readValueFromBuffer(QpidByteBuffer buffer)
{
return EncodingUtils.readLongString(buffer);
}
@@ -475,9 +473,9 @@ public enum AMQType
EncodingUtils.writeBoolean(buffer, (Boolean) value);
}
- public Object readValueFromBuffer(MarkableDataInput buffer) throws
IOException
+ public Object readValueFromBuffer(QpidByteBuffer buffer)
{
- return EncodingUtils.readBoolean(buffer);
+ return buffer.get() == 1;
}
},
@@ -511,9 +509,9 @@ public enum AMQType
buffer.put((byte) charVal);
}
- public Object readValueFromBuffer(MarkableDataInput buffer) throws
IOException
+ public Object readValueFromBuffer(QpidByteBuffer buffer)
{
- return EncodingUtils.readChar(buffer);
+ return (char) buffer.get();
}
},
@@ -546,9 +544,9 @@ public enum AMQType
buffer.put((Byte) value);
}
- public Object readValueFromBuffer(MarkableDataInput buffer) throws
IOException
+ public Object readValueFromBuffer(QpidByteBuffer buffer)
{
- return EncodingUtils.readByte(buffer);
+ return buffer.get();
}
},
@@ -585,9 +583,9 @@ public enum AMQType
buffer.putShort((Short) value);
}
- public Object readValueFromBuffer(MarkableDataInput buffer) throws
IOException
+ public Object readValueFromBuffer(QpidByteBuffer buffer)
{
- return EncodingUtils.readShort(buffer);
+ return buffer.getShort();
}
},
@@ -626,9 +624,9 @@ public enum AMQType
{
buffer.putInt((Integer) value);
}
- public Object readValueFromBuffer(MarkableDataInput buffer) throws
IOException
+ public Object readValueFromBuffer(QpidByteBuffer buffer)
{
- return EncodingUtils.readInteger(buffer);
+ return buffer.getInt();
}
},
@@ -673,9 +671,9 @@ public enum AMQType
buffer.putLong ((Long) value);
}
- public Object readValueFromBuffer(MarkableDataInput buffer) throws
IOException
+ public Object readValueFromBuffer(QpidByteBuffer buffer)
{
- return EncodingUtils.readLong(buffer);
+ return buffer.getLong();
}
},
@@ -708,9 +706,9 @@ public enum AMQType
buffer.putFloat ((Float) value);
}
- public Object readValueFromBuffer(MarkableDataInput buffer) throws
IOException
+ public Object readValueFromBuffer(QpidByteBuffer buffer)
{
- return EncodingUtils.readFloat(buffer);
+ return buffer.getFloat();
}
},
@@ -747,9 +745,9 @@ public enum AMQType
buffer.putDouble((Double) value);
}
- public Object readValueFromBuffer(MarkableDataInput buffer) throws
IOException
+ public Object readValueFromBuffer(QpidByteBuffer buffer)
{
- return EncodingUtils.readDouble(buffer);
+ return buffer.getDouble();
}
};
@@ -822,5 +820,5 @@ public enum AMQType
*
* @return An instance of the type.
*/
- abstract Object readValueFromBuffer(MarkableDataInput buffer) throws
IOException;
+ abstract Object readValueFromBuffer(QpidByteBuffer buffer);
}
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java?rev=1723359&r1=1723358&r2=1723359&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
Wed Jan 6 16:16:11 2016
@@ -20,15 +20,12 @@
*/
package org.apache.qpid.framing;
-import java.io.DataInput;
-import java.io.IOException;
import java.math.BigDecimal;
import java.util.Collection;
import java.util.Date;
import java.util.Map;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.codec.MarkableDataInput;
/**
* AMQTypedValue combines together a native Java Object value, and an {@link AMQType}, as a fully typed AMQP parameter
@@ -67,7 +64,7 @@ public abstract class AMQTypedValue
_value = type.toNativeValue(value);
}
- private GenericTypedValue(AMQType type, MarkableDataInput buffer)
throws IOException
+ private GenericTypedValue(AMQType type, QpidByteBuffer buffer)
{
_type = type;
_value = type.readValueFromBuffer(buffer);
@@ -131,9 +128,9 @@ public abstract class AMQTypedValue
_value = value;
}
- public LongTypedValue(DataInput buffer) throws IOException
+ public LongTypedValue(QpidByteBuffer buffer)
{
- _value = EncodingUtils.readLong(buffer);
+ _value = buffer.getLong();
}
public AMQType getType()
@@ -199,9 +196,9 @@ public abstract class AMQTypedValue
}
- public static AMQTypedValue readFromBuffer(MarkableDataInput buffer)
throws IOException
+ public static AMQTypedValue readFromBuffer(QpidByteBuffer buffer)
{
- AMQType type = AMQTypeMap.getType(buffer.readByte());
+ AMQType type = AMQTypeMap.getType(buffer.get());
switch(type)
{
@@ -209,7 +206,7 @@ public abstract class AMQTypedValue
return new LongTypedValue(buffer);
case INT:
- int value = EncodingUtils.readInteger(buffer);
+ int value = buffer.getInt();
return createAMQTypedValue(value);
default:
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java?rev=1723359&r1=1723358&r2=1723359&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java
Wed Jan 6 16:16:11 2016
@@ -27,11 +27,8 @@
package org.apache.qpid.framing;
-import java.io.IOException;
-
import org.apache.qpid.QpidException;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.codec.MarkableDataInput;
public class AccessRequestBody extends AMQMethodBodyImpl implements
EncodableAMQDataBlock, AMQMethodBody
{
@@ -43,13 +40,6 @@ public class AccessRequestBody extends A
private final AMQShortString _realm; // [realm]
private final byte _bitfield0; // [exclusive, passive, active, write, read]
- // Constructor
- public AccessRequestBody(MarkableDataInput buffer) throws
AMQFrameDecodingException, IOException
- {
- _realm = buffer.readAMQShortString();
- _bitfield0 = buffer.readByte();
- }
-
public AccessRequestBody(
AMQShortString realm,
boolean exclusive,
@@ -165,11 +155,11 @@ public class AccessRequestBody extends A
return buf.toString();
}
- public static void process(final MarkableDataInput buffer,
- final ServerChannelMethodProcessor dispatcher)
throws IOException
+ public static void process(final QpidByteBuffer buffer,
+ final ServerChannelMethodProcessor dispatcher)
{
- AMQShortString realm = buffer.readAMQShortString();
- byte bitfield = buffer.readByte();
+ AMQShortString realm = AMQShortString.readAMQShortString(buffer);
+ byte bitfield = buffer.get();
boolean exclusive = (bitfield & 0x01) == 0x1 ;
boolean passive = (bitfield & 0x02) == 0x2 ;
boolean active = (bitfield & 0x04) == 0x4 ;
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java?rev=1723359&r1=1723358&r2=1723359&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java
Wed Jan 6 16:16:11 2016
@@ -27,11 +27,8 @@
package org.apache.qpid.framing;
-import java.io.IOException;
-
import org.apache.qpid.QpidException;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.codec.MarkableDataInput;
public class AccessRequestOkBody extends AMQMethodBodyImpl implements
EncodableAMQDataBlock, AMQMethodBody
{
@@ -42,12 +39,6 @@ public class AccessRequestOkBody extends
// Fields declared in specification
private final int _ticket; // [ticket]
- // Constructor
- public AccessRequestOkBody(MarkableDataInput buffer) throws
AMQFrameDecodingException, IOException
- {
- _ticket = buffer.readUnsignedShort();
- }
-
public AccessRequestOkBody(
int ticket
)
@@ -95,11 +86,10 @@ public class AccessRequestOkBody extends
return buf.toString();
}
- public static void process(final MarkableDataInput buffer,
+ public static void process(final QpidByteBuffer buffer,
final ClientChannelMethodProcessor dispatcher)
- throws IOException
{
- int ticket = buffer.readUnsignedShort();
+ int ticket = buffer.getUnsignedShort();
if(!dispatcher.ignoreAllButCloseOk())
{
dispatcher.receiveAccessRequestOk(ticket);
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java?rev=1723359&r1=1723358&r2=1723359&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java
Wed Jan 6 16:16:11 2016
@@ -27,11 +27,8 @@
package org.apache.qpid.framing;
-import java.io.IOException;
-
import org.apache.qpid.QpidException;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.codec.MarkableDataInput;
public class BasicAckBody extends AMQMethodBodyImpl implements
EncodableAMQDataBlock, AMQMethodBody
{
@@ -43,13 +40,6 @@ public class BasicAckBody extends AMQMet
private final long _deliveryTag; // [deliveryTag]
private final byte _bitfield0; // [multiple]
- // Constructor
- public BasicAckBody(MarkableDataInput buffer) throws
AMQFrameDecodingException, IOException
- {
- _deliveryTag = buffer.readLong();
- _bitfield0 = buffer.readByte();
- }
-
public BasicAckBody(
long deliveryTag,
boolean multiple
@@ -112,12 +102,12 @@ public class BasicAckBody extends AMQMet
return buf.toString();
}
- public static void process(final MarkableDataInput buffer,
- final ChannelMethodProcessor dispatcher) throws
IOException
+ public static void process(final QpidByteBuffer buffer,
+ final ChannelMethodProcessor dispatcher)
{
- long deliveryTag = buffer.readLong();
- boolean multiple = (buffer.readByte() & 0x01) != 0;
+ long deliveryTag = buffer.getLong();
+ boolean multiple = (buffer.get() & 0x01) != 0;
if(!dispatcher.ignoreAllButCloseOk())
{
dispatcher.receiveBasicAck(deliveryTag, multiple);
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java?rev=1723359&r1=1723358&r2=1723359&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java
Wed Jan 6 16:16:11 2016
@@ -27,11 +27,8 @@
package org.apache.qpid.framing;
-import java.io.IOException;
-
import org.apache.qpid.QpidException;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.codec.MarkableDataInput;
public class BasicCancelBody extends AMQMethodBodyImpl implements
EncodableAMQDataBlock, AMQMethodBody
{
@@ -43,13 +40,6 @@ public class BasicCancelBody extends AMQ
private final AMQShortString _consumerTag; // [consumerTag]
private final byte _bitfield0; // [nowait]
- // Constructor
- public BasicCancelBody(MarkableDataInput buffer) throws
AMQFrameDecodingException, IOException
- {
- _consumerTag = buffer.readAMQShortString();
- _bitfield0 = buffer.readByte();
- }
-
public BasicCancelBody(
AMQShortString consumerTag,
boolean nowait
@@ -113,12 +103,12 @@ public class BasicCancelBody extends AMQ
return buf.toString();
}
- public static void process(final MarkableDataInput buffer,
- final ServerChannelMethodProcessor dispatcher)
throws IOException
+ public static void process(final QpidByteBuffer buffer,
+ final ServerChannelMethodProcessor dispatcher)
{
- AMQShortString consumerTag = buffer.readAMQShortString();
- boolean noWait = (buffer.readByte() & 0x01) == 0x01;
+ AMQShortString consumerTag = AMQShortString.readAMQShortString(buffer);
+ boolean noWait = (buffer.get() & 0x01) == 0x01;
if(!dispatcher.ignoreAllButCloseOk())
{
dispatcher.receiveBasicCancel(consumerTag, noWait);
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java?rev=1723359&r1=1723358&r2=1723359&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java
Wed Jan 6 16:16:11 2016
@@ -27,11 +27,8 @@
package org.apache.qpid.framing;
-import java.io.IOException;
-
import org.apache.qpid.QpidException;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.codec.MarkableDataInput;
public class BasicCancelOkBody extends AMQMethodBodyImpl implements
EncodableAMQDataBlock, AMQMethodBody
{
@@ -42,12 +39,6 @@ public class BasicCancelOkBody extends A
// Fields declared in specification
private final AMQShortString _consumerTag; // [consumerTag]
- // Constructor
- public BasicCancelOkBody(MarkableDataInput buffer) throws
AMQFrameDecodingException, IOException
- {
- _consumerTag = buffer.readAMQShortString();
- }
-
public BasicCancelOkBody(
AMQShortString consumerTag
)
@@ -96,11 +87,10 @@ public class BasicCancelOkBody extends A
return buf.toString();
}
- public static void process(final MarkableDataInput in,
+ public static void process(QpidByteBuffer in,
final ClientChannelMethodProcessor dispatcher)
- throws IOException
{
- AMQShortString consumerTag = in.readAMQShortString();
+ AMQShortString consumerTag = AMQShortString.readAMQShortString(in);
if(!dispatcher.ignoreAllButCloseOk())
{
dispatcher.receiveBasicCancelOk(consumerTag);
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java?rev=1723359&r1=1723358&r2=1723359&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java
Wed Jan 6 16:16:11 2016
@@ -27,11 +27,8 @@
package org.apache.qpid.framing;
-import java.io.IOException;
-
import org.apache.qpid.QpidException;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.codec.MarkableDataInput;
public class BasicConsumeBody extends AMQMethodBodyImpl implements
EncodableAMQDataBlock, AMQMethodBody
{
@@ -46,16 +43,6 @@ public class BasicConsumeBody extends AM
private final byte _bitfield0; // [noLocal, noAck, exclusive, nowait]
private final FieldTable _arguments; // [arguments]
- // Constructor
- public BasicConsumeBody(MarkableDataInput buffer) throws
AMQFrameDecodingException, IOException
- {
- _ticket = buffer.readUnsignedShort();
- _queue = buffer.readAMQShortString();
- _consumerTag = buffer.readAMQShortString();
- _bitfield0 = buffer.readByte();
- _arguments = EncodingUtils.readFieldTable(buffer);
- }
-
public BasicConsumeBody(
int ticket,
AMQShortString queue,
@@ -191,15 +178,15 @@ public class BasicConsumeBody extends AM
return buf.toString();
}
- public static void process(final MarkableDataInput buffer,
+ public static void process(final QpidByteBuffer buffer,
final ServerChannelMethodProcessor dispatcher)
- throws IOException, AMQFrameDecodingException
+ throws AMQFrameDecodingException
{
- int ticket = buffer.readUnsignedShort();
- AMQShortString queue = buffer.readAMQShortString();
- AMQShortString consumerTag = buffer.readAMQShortString();
- byte bitfield = buffer.readByte();
+ int ticket = buffer.getUnsignedShort();
+ AMQShortString queue = AMQShortString.readAMQShortString(buffer);
+ AMQShortString consumerTag = AMQShortString.readAMQShortString(buffer);
+ byte bitfield = buffer.get();
boolean noLocal = (bitfield & 0x01) == 0x01;
boolean noAck = (bitfield & 0x02) == 0x02;
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java?rev=1723359&r1=1723358&r2=1723359&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java
Wed Jan 6 16:16:11 2016
@@ -27,11 +27,8 @@
package org.apache.qpid.framing;
-import java.io.IOException;
-
import org.apache.qpid.QpidException;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.codec.MarkableDataInput;
public class BasicConsumeOkBody extends AMQMethodBodyImpl implements
EncodableAMQDataBlock, AMQMethodBody
{
@@ -42,12 +39,6 @@ public class BasicConsumeOkBody extends
// Fields declared in specification
private final AMQShortString _consumerTag; // [consumerTag]
- // Constructor
- public BasicConsumeOkBody(MarkableDataInput buffer) throws
AMQFrameDecodingException, IOException
- {
- _consumerTag = buffer.readAMQShortString();
- }
-
public BasicConsumeOkBody(
AMQShortString consumerTag
)
@@ -96,11 +87,10 @@ public class BasicConsumeOkBody extends
return buf.toString();
}
- public static void process(final MarkableDataInput buffer,
+ public static void process(final QpidByteBuffer buffer,
final ClientChannelMethodProcessor dispatcher)
- throws IOException
{
- AMQShortString consumerTag = buffer.readAMQShortString();
+ AMQShortString consumerTag = AMQShortString.readAMQShortString(buffer);
if(!dispatcher.ignoreAllButCloseOk())
{
dispatcher.receiveBasicConsumeOk(consumerTag);
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java?rev=1723359&r1=1723358&r2=1723359&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
Wed Jan 6 16:16:11 2016
@@ -20,13 +20,10 @@
*/
package org.apache.qpid.framing;
-import java.io.IOException;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.codec.MarkableDataInput;
import org.apache.qpid.transport.ByteBufferSender;
public class BasicContentHeaderProperties
@@ -320,15 +317,15 @@ public class BasicContentHeaderPropertie
}
}
- public int read(MarkableDataInput input) throws IOException
+ public int read(QpidByteBuffer input)
{
- _propertyFlags = input.readUnsignedShort();
+ _propertyFlags = input.getUnsignedShort();
int length = 2;
if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0)
{
length++;
- _contentType = EncodingUtils.readAMQShortString(input);
+ _contentType = AMQShortString.readAMQShortString(input);
if(_contentType != null)
{
length += _contentType.length();
@@ -338,7 +335,7 @@ public class BasicContentHeaderPropertie
if ((_propertyFlags & ENCODING_MASK) != 0)
{
length++;
- _encoding = EncodingUtils.readAMQShortString(input);
+ _encoding = AMQShortString.readAMQShortString(input);
if(_encoding != null)
{
length += _encoding.length();
@@ -347,7 +344,7 @@ public class BasicContentHeaderPropertie
if ((_propertyFlags & HEADERS_MASK) != 0)
{
- int fieldTableLength = input.readInt();
+ int fieldTableLength = input.getInt();
_headers = new FieldTable(input, fieldTableLength);
@@ -357,20 +354,20 @@ public class BasicContentHeaderPropertie
if ((_propertyFlags & DELIVERY_MODE_MASK) != 0)
{
- _deliveryMode = input.readByte();
+ _deliveryMode = input.get();
length++;
}
if ((_propertyFlags & PRIORITY_MASK) != 0)
{
- _priority = input.readByte();
+ _priority = input.get();
length++;
}
if ((_propertyFlags & CORRELATION_ID_MASK) != 0)
{
length++;
- _correlationId = EncodingUtils.readAMQShortString(input);
+ _correlationId = AMQShortString.readAMQShortString(input);
if(_correlationId != null)
{
length += _correlationId.length();
@@ -380,7 +377,7 @@ public class BasicContentHeaderPropertie
if ((_propertyFlags & REPLY_TO_MASK) != 0)
{
length++;
- _replyTo = EncodingUtils.readAMQShortString(input);
+ _replyTo = AMQShortString.readAMQShortString(input);
if(_replyTo != null)
{
length += _replyTo.length();
@@ -390,7 +387,7 @@ public class BasicContentHeaderPropertie
if ((_propertyFlags & EXPIRATION_MASK) != 0)
{
length++;
- AMQShortString expiration =
EncodingUtils.readAMQShortString(input);
+ AMQShortString expiration =
AMQShortString.readAMQShortString(input);
if(expiration != null)
{
length += expiration.length();
@@ -401,7 +398,7 @@ public class BasicContentHeaderPropertie
if ((_propertyFlags & MESSAGE_ID_MASK) != 0)
{
length++;
- _messageId = EncodingUtils.readAMQShortString(input);
+ _messageId = AMQShortString.readAMQShortString(input);
if(_messageId != null)
{
length += _messageId.length();
@@ -410,14 +407,14 @@ public class BasicContentHeaderPropertie
if ((_propertyFlags & TIMESTAMP_MASK) != 0)
{
- _timestamp = input.readLong();
+ _timestamp = input.getLong();
length += 8;
}
if ((_propertyFlags & TYPE_MASK) != 0)
{
length++;
- _type = EncodingUtils.readAMQShortString(input);
+ _type = AMQShortString.readAMQShortString(input);
if(_type != null)
{
length += _type.length();
@@ -427,7 +424,7 @@ public class BasicContentHeaderPropertie
if ((_propertyFlags & USER_ID_MASK) != 0)
{
length++;
- _userId = EncodingUtils.readAMQShortString(input);
+ _userId = AMQShortString.readAMQShortString(input);
if(_userId != null)
{
length += _userId.length();
@@ -437,7 +434,7 @@ public class BasicContentHeaderPropertie
if ((_propertyFlags & APPLICATION_ID_MASK) != 0)
{
length++;
- _appId = EncodingUtils.readAMQShortString(input);
+ _appId = AMQShortString.readAMQShortString(input);
if(_appId != null)
{
length += _appId.length();
@@ -447,7 +444,7 @@ public class BasicContentHeaderPropertie
if ((_propertyFlags & CLUSTER_ID_MASK) != 0)
{
length++;
- _clusterId = EncodingUtils.readAMQShortString(input);
+ _clusterId = AMQShortString.readAMQShortString(input);
if(_clusterId != null)
{
length += _clusterId.length();
@@ -480,7 +477,7 @@ public class BasicContentHeaderPropertie
}
- public synchronized void populatePropertiesFromBuffer(MarkableDataInput buffer, int propertyFlags, int size) throws AMQFrameDecodingException, IOException
+ public synchronized void populatePropertiesFromBuffer(QpidByteBuffer buffer, int propertyFlags, int size) throws AMQFrameDecodingException
{
_propertyFlags = propertyFlags;
@@ -492,58 +489,55 @@ public class BasicContentHeaderPropertie
{
_encodedForm.dispose();
}
- _encodedForm = buffer.readAsByteBuffer(size);
+ _encodedForm = buffer.view(0,size);
final QpidByteBuffer byteBuffer = _encodedForm.slice();
- decode(byteBuffer.asDataInput());
+ decode(byteBuffer);
byteBuffer.dispose();
+ buffer.position(buffer.position()+size);
}
- private void decode(MarkableDataInput buffer) throws IOException,
AMQFrameDecodingException
+ private void decode(QpidByteBuffer buffer) throws AMQFrameDecodingException
{
- int headersOffset = 0;
-
if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0)
{
- _contentType = buffer.readAMQShortString();
- headersOffset +=
EncodingUtils.encodedShortStringLength(_contentType);
+ _contentType = AMQShortString.readAMQShortString(buffer);
}
if ((_propertyFlags & ENCODING_MASK) != 0)
{
- _encoding = buffer.readAMQShortString();
- headersOffset += EncodingUtils.encodedShortStringLength(_encoding);
+ _encoding = AMQShortString.readAMQShortString(buffer);
}
if ((_propertyFlags & HEADERS_MASK) != 0)
{
- long length = EncodingUtils.readUnsignedInteger(buffer);
+ long length = buffer.getUnsignedInt();
- QpidByteBuffer buf = _encodedForm.view(headersOffset+4,
(int)length);
+ QpidByteBuffer buf = buffer.view(0, (int)length);
_headers = new FieldTable(buf);
buf.dispose();
- buffer.skipBytes((int)length);
+ buffer.position(buffer.position()+(int)length);
}
if ((_propertyFlags & DELIVERY_MODE_MASK) != 0)
{
- _deliveryMode = buffer.readByte();
+ _deliveryMode = buffer.get();
}
if ((_propertyFlags & PRIORITY_MASK) != 0)
{
- _priority = buffer.readByte();
+ _priority = buffer.get();
}
if ((_propertyFlags & CORRELATION_ID_MASK) != 0)
{
- _correlationId = buffer.readAMQShortString();
+ _correlationId = AMQShortString.readAMQShortString(buffer);
}
if ((_propertyFlags & REPLY_TO_MASK) != 0)
{
- _replyTo = buffer.readAMQShortString();
+ _replyTo = AMQShortString.readAMQShortString(buffer);
}
if ((_propertyFlags & EXPIRATION_MASK) != 0)
@@ -553,35 +547,34 @@ public class BasicContentHeaderPropertie
if ((_propertyFlags & MESSAGE_ID_MASK) != 0)
{
- _messageId = buffer.readAMQShortString();
+ _messageId = AMQShortString.readAMQShortString(buffer);
}
if ((_propertyFlags & TIMESTAMP_MASK) != 0)
{
- _timestamp = EncodingUtils.readTimestamp(buffer);
+ _timestamp = buffer.getLong();
}
if ((_propertyFlags & TYPE_MASK) != 0)
{
- _type = buffer.readAMQShortString();
+ _type = AMQShortString.readAMQShortString(buffer);
}
if ((_propertyFlags & USER_ID_MASK) != 0)
{
- _userId = buffer.readAMQShortString();
+ _userId = AMQShortString.readAMQShortString(buffer);
}
if ((_propertyFlags & APPLICATION_ID_MASK) != 0)
{
- _appId = buffer.readAMQShortString();
+ _appId = AMQShortString.readAMQShortString(buffer);
}
if ((_propertyFlags & CLUSTER_ID_MASK) != 0)
{
- _clusterId = buffer.readAMQShortString();
+ _clusterId = AMQShortString.readAMQShortString(buffer);
}
-
}
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java?rev=1723359&r1=1723358&r2=1723359&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java
Wed Jan 6 16:16:11 2016
@@ -27,11 +27,8 @@
package org.apache.qpid.framing;
-import java.io.IOException;
-
import org.apache.qpid.QpidException;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.codec.MarkableDataInput;
public class BasicDeliverBody extends AMQMethodBodyImpl implements
EncodableAMQDataBlock, AMQMethodBody
{
@@ -46,16 +43,6 @@ public class BasicDeliverBody extends AM
private final AMQShortString _exchange; // [exchange]
private final AMQShortString _routingKey; // [routingKey]
- // Constructor
- public BasicDeliverBody(MarkableDataInput buffer) throws
AMQFrameDecodingException, IOException
- {
- _consumerTag = buffer.readAMQShortString();
- _deliveryTag = buffer.readLong();
- _bitfield0 = buffer.readByte();
- _exchange = buffer.readAMQShortString();
- _routingKey = buffer.readAMQShortString();
- }
-
public BasicDeliverBody(
AMQShortString consumerTag,
long deliveryTag,
@@ -152,15 +139,15 @@ public class BasicDeliverBody extends AM
return buf.toString();
}
- public static void process(final MarkableDataInput buffer,
- final ClientChannelMethodProcessor dispatcher)
throws IOException
+ public static void process(final QpidByteBuffer buffer,
+ final ClientChannelMethodProcessor dispatcher)
{
- AMQShortString consumerTag = buffer.readAMQShortString();
- long deliveryTag = buffer.readLong();
- boolean redelivered = (buffer.readByte() & 0x01) != 0;
- AMQShortString exchange = buffer.readAMQShortString();
- AMQShortString routingKey = buffer.readAMQShortString();
+ AMQShortString consumerTag = AMQShortString.readAMQShortString(buffer);
+ long deliveryTag = buffer.getLong();
+ boolean redelivered = (buffer.get() & 0x01) != 0;
+ AMQShortString exchange = AMQShortString.readAMQShortString(buffer);
+ AMQShortString routingKey = AMQShortString.readAMQShortString(buffer);
if(!dispatcher.ignoreAllButCloseOk())
{
dispatcher.receiveBasicDeliver(consumerTag, deliveryTag,
redelivered, exchange, routingKey);
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java?rev=1723359&r1=1723358&r2=1723359&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java
Wed Jan 6 16:16:11 2016
@@ -27,11 +27,8 @@
package org.apache.qpid.framing;
-import java.io.IOException;
-
import org.apache.qpid.QpidException;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.codec.MarkableDataInput;
public class BasicGetBody extends AMQMethodBodyImpl implements
EncodableAMQDataBlock, AMQMethodBody
{
@@ -44,14 +41,6 @@ public class BasicGetBody extends AMQMet
private final AMQShortString _queue; // [queue]
private final byte _bitfield0; // [noAck]
- // Constructor
- public BasicGetBody(MarkableDataInput buffer) throws
AMQFrameDecodingException, IOException
- {
- _ticket = buffer.readUnsignedShort();
- _queue = buffer.readAMQShortString();
- _bitfield0 = buffer.readByte();
- }
-
public BasicGetBody(
int ticket,
AMQShortString queue,
@@ -125,14 +114,13 @@ public class BasicGetBody extends AMQMet
return buf.toString();
}
- public static void process(final MarkableDataInput buffer,
+ public static void process(final QpidByteBuffer buffer,
final ServerChannelMethodProcessor dispatcher)
- throws IOException
{
- int ticket = buffer.readUnsignedShort();
- AMQShortString queue = buffer.readAMQShortString();
- boolean noAck = (buffer.readByte() & 0x01) != 0;
+ int ticket = buffer.getUnsignedShort();
+ AMQShortString queue = AMQShortString.readAMQShortString(buffer);
+ boolean noAck = (buffer.get() & 0x01) != 0;
if(!dispatcher.ignoreAllButCloseOk())
{
dispatcher.receiveBasicGet(queue, noAck);
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java?rev=1723359&r1=1723358&r2=1723359&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java
Wed Jan 6 16:16:11 2016
@@ -27,11 +27,8 @@
package org.apache.qpid.framing;
-import java.io.IOException;
-
import org.apache.qpid.QpidException;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.codec.MarkableDataInput;
public class BasicGetEmptyBody extends AMQMethodBodyImpl implements
EncodableAMQDataBlock, AMQMethodBody
{
@@ -42,12 +39,6 @@ public class BasicGetEmptyBody extends A
// Fields declared in specification
private final AMQShortString _clusterId; // [clusterId]
- // Constructor
- public BasicGetEmptyBody(MarkableDataInput buffer) throws
AMQFrameDecodingException, IOException
- {
- _clusterId = buffer.readAMQShortString();
- }
-
public BasicGetEmptyBody(
AMQShortString clusterId
)
@@ -96,10 +87,10 @@ public class BasicGetEmptyBody extends A
return buf.toString();
}
- public static void process(final MarkableDataInput buffer,
- final ClientChannelMethodProcessor dispatcher)
throws IOException
+ public static void process(final QpidByteBuffer buffer,
+ final ClientChannelMethodProcessor dispatcher)
{
- AMQShortString clusterId = buffer.readAMQShortString();
+ AMQShortString clusterId = AMQShortString.readAMQShortString(buffer);
if(!dispatcher.ignoreAllButCloseOk())
{
dispatcher.receiveBasicGetEmpty();
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java?rev=1723359&r1=1723358&r2=1723359&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java
Wed Jan 6 16:16:11 2016
@@ -27,11 +27,8 @@
package org.apache.qpid.framing;
-import java.io.IOException;
-
import org.apache.qpid.QpidException;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.codec.MarkableDataInput;
public class BasicGetOkBody extends AMQMethodBodyImpl implements
EncodableAMQDataBlock, AMQMethodBody
{
@@ -46,16 +43,6 @@ public class BasicGetOkBody extends AMQM
private final AMQShortString _routingKey; // [routingKey]
private final long _messageCount; // [messageCount]
- // Constructor
- public BasicGetOkBody(MarkableDataInput buffer) throws
AMQFrameDecodingException, IOException
- {
- _deliveryTag = buffer.readLong();
- _bitfield0 = buffer.readByte();
- _exchange = buffer.readAMQShortString();
- _routingKey = buffer.readAMQShortString();
- _messageCount = EncodingUtils.readUnsignedInteger(buffer);
- }
-
public BasicGetOkBody(
long deliveryTag,
boolean redelivered,
@@ -151,14 +138,14 @@ public class BasicGetOkBody extends AMQM
return buf.toString();
}
- public static void process(final MarkableDataInput buffer,
- final ClientChannelMethodProcessor dispatcher)
throws IOException
+ public static void process(final QpidByteBuffer buffer,
+ final ClientChannelMethodProcessor dispatcher)
{
- long deliveryTag = buffer.readLong();
- boolean redelivered = (buffer.readByte() & 0x01) != 0;
- AMQShortString exchange = buffer.readAMQShortString();
- AMQShortString routingKey = buffer.readAMQShortString();
- long messageCount = EncodingUtils.readUnsignedInteger(buffer);
+ long deliveryTag = buffer.getLong();
+ boolean redelivered = (buffer.get() & 0x01) != 0;
+ AMQShortString exchange = AMQShortString.readAMQShortString(buffer);
+ AMQShortString routingKey = AMQShortString.readAMQShortString(buffer);
+ long messageCount = buffer.getUnsignedInt();
if(!dispatcher.ignoreAllButCloseOk())
{
dispatcher.receiveBasicGetOk(deliveryTag, redelivered, exchange,
routingKey, messageCount);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org