Repository: qpid-jms Updated Branches: refs/heads/master 8f876f926 -> 436d406ff
QPIDJMS-383 Use new APIs to reduce buffer copying Use the new send and receive methods in proton-j 0.27.0 to reduce buffer copies when sending and receiving. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/436d406f Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/436d406f Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/436d406f Branch: refs/heads/master Commit: 436d406ffce8d8ee0252bcfbf4aa35d1e36668a4 Parents: 8f876f9 Author: Timothy Bish <[email protected]> Authored: Thu Mar 29 18:44:08 2018 -0400 Committer: Timothy Bish <[email protected]> Committed: Mon Apr 23 15:08:30 2018 -0400 ---------------------------------------------------------------------- .../qpid/jms/provider/amqp/AmqpConsumer.java | 28 +- .../jms/provider/amqp/AmqpFixedProducer.java | 3 +- .../jms/provider/amqp/message/AmqpCodec.java | 22 +- .../amqp/message/AmqpMessageSupport.java | 7 +- .../amqp/message/AmqpReadableBuffer.java | 219 +++++++++ .../amqp/message/AmqpReadableBufferTest.java | 444 +++++++++++++++++++ 6 files changed, 680 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/436d406f/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java index be2438e..c39b55d 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java @@ -47,9 +47,6 @@ import org.apache.qpid.proton.engine.Receiver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; - /** * AMQP Consumer object that is used to manage JMS MessageConsumer semantics. */ @@ -57,12 +54,9 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver private static final Logger LOG = LoggerFactory.getLogger(AmqpConsumer.class); - private static final int INITIAL_BUFFER_CAPACITY = 1024 * 128; - protected final AmqpSession session; protected AsyncResult stopRequest; protected AsyncResult pullRequest; - protected final ByteBuf incomingBuffer = Unpooled.buffer(INITIAL_BUFFER_CAPACITY); protected long incomingSequence; protected long deliveredCount; protected boolean deferredClose; @@ -485,7 +479,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver JmsMessage message = null; try { - message = AmqpCodec.decodeMessage(this, unwrapIncomingMessage(incoming)).asJmsMessage(); + message = AmqpCodec.decodeMessage(this, getEndpoint().recv()).asJmsMessage(); } catch (Exception e) { LOG.warn("Error on transform: {}", e.getMessage()); // TODO - We could signal provider error but not sure we want to fail @@ -495,8 +489,6 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver // a bytes messages as a fall back. settleDelivery(incoming, MODIFIED_FAILED_UNDELIVERABLE); return false; - } finally { - incomingBuffer.clear(); } try { @@ -585,24 +577,6 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver } } - protected ByteBuf unwrapIncomingMessage(Delivery incoming) { - int count; - - // Attempt to preemptively size the buffer for the incoming delivery. - if (incomingBuffer.capacity() < incoming.available()) { - incomingBuffer.capacity(incoming.available()); - } - - while ((count = getEndpoint().recv(incomingBuffer.array(), incomingBuffer.writerIndex(), incomingBuffer.writableBytes())) > 0) { - incomingBuffer.writerIndex(incomingBuffer.writerIndex() + count); - if (!incomingBuffer.isWritable()) { - incomingBuffer.capacity((int) (incomingBuffer.capacity() * 1.5)); - } - } - - return incomingBuffer; - } - public void preCommit() { } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/436d406f/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java index e93d74e..b2036fe 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java @@ -32,6 +32,7 @@ import org.apache.qpid.jms.message.JmsOutboundMessageDispatch; import org.apache.qpid.jms.meta.JmsConnectionInfo; import org.apache.qpid.jms.meta.JmsProducerInfo; import org.apache.qpid.jms.provider.AsyncResult; +import org.apache.qpid.jms.provider.amqp.message.AmqpReadableBuffer; import org.apache.qpid.jms.util.IOExceptionSupport; import org.apache.qpid.proton.amqp.messaging.Accepted; import org.apache.qpid.proton.amqp.messaging.Modified; @@ -138,7 +139,7 @@ public class AmqpFixedProducer extends AmqpProducer { // Write the already encoded AMQP message into the Sender ByteBuf encoded = (ByteBuf) envelope.getPayload(); - getEndpoint().send(encoded.array(), encoded.arrayOffset() + encoded.readerIndex(), encoded.readableBytes()); + getEndpoint().sendNoCopy(new AmqpReadableBuffer(encoded.duplicate())); AmqpProvider provider = getParent().getProvider(); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/436d406f/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodec.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodec.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodec.java index 85d8d06..733294f 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodec.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodec.java @@ -28,7 +28,6 @@ import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.SERIA import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.isContentType; import java.io.IOException; -import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; @@ -50,6 +49,7 @@ import org.apache.qpid.proton.amqp.messaging.Section; import org.apache.qpid.proton.codec.AMQPDefinedTypes; import org.apache.qpid.proton.codec.DecoderImpl; import org.apache.qpid.proton.codec.EncoderImpl; +import org.apache.qpid.proton.codec.ReadableBuffer; import org.apache.qpid.proton.codec.WritableBuffer; import io.netty.buffer.ByteBuf; @@ -196,11 +196,10 @@ public final class AmqpCodec { * * @throws IOException if an error occurs while creating the message objects. */ - public static AmqpJmsMessageFacade decodeMessage(AmqpConsumer consumer, ByteBuf messageBytes) throws IOException { + public static AmqpJmsMessageFacade decodeMessage(AmqpConsumer consumer, ReadableBuffer messageBytes) throws IOException { DecoderImpl decoder = getDecoder(); - ByteBuffer buffer = messageBytes.nioBuffer(); - decoder.setByteBuffer(buffer); + decoder.setBuffer(messageBytes); Header header = null; DeliveryAnnotations deliveryAnnotations = null; @@ -211,13 +210,13 @@ public final class AmqpCodec { Footer footer = null; Section section = null; - if (buffer.hasRemaining()) { + if (messageBytes.hasRemaining()) { section = (Section) decoder.readObject(); } if (section instanceof Header) { header = (Header) section; - if (buffer.hasRemaining()) { + if (messageBytes.hasRemaining()) { section = (Section) decoder.readObject(); } else { section = null; @@ -227,7 +226,7 @@ public final class AmqpCodec { if (section instanceof DeliveryAnnotations) { deliveryAnnotations = (DeliveryAnnotations) section; - if (buffer.hasRemaining()) { + if (messageBytes.hasRemaining()) { section = (Section) decoder.readObject(); } else { section = null; @@ -237,7 +236,7 @@ public final class AmqpCodec { if (section instanceof MessageAnnotations) { messageAnnotations = (MessageAnnotations) section; - if (buffer.hasRemaining()) { + if (messageBytes.hasRemaining()) { section = (Section) decoder.readObject(); } else { section = null; @@ -247,7 +246,7 @@ public final class AmqpCodec { if (section instanceof Properties) { properties = (Properties) section; - if (buffer.hasRemaining()) { + if (messageBytes.hasRemaining()) { section = (Section) decoder.readObject(); } else { section = null; @@ -257,7 +256,7 @@ public final class AmqpCodec { if (section instanceof ApplicationProperties) { applicationProperties = (ApplicationProperties) section; - if (buffer.hasRemaining()) { + if (messageBytes.hasRemaining()) { section = (Section) decoder.readObject(); } else { section = null; @@ -267,7 +266,7 @@ public final class AmqpCodec { if (section != null && !(section instanceof Footer)) { body = section; - if (buffer.hasRemaining()) { + if (messageBytes.hasRemaining()) { section = (Section) decoder.readObject(); } else { section = null; @@ -279,7 +278,6 @@ public final class AmqpCodec { } decoder.setByteBuffer(null); - messageBytes.resetReaderIndex(); // First we try the easy way, if the annotation is there we don't have to work hard. AmqpJmsMessageFacade result = createFromMsgAnnotation(messageAnnotations); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/436d406f/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java index 702870b..3303628 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java @@ -16,14 +16,15 @@ */ package org.apache.qpid.jms.provider.amqp.message; +import java.nio.ByteBuffer; import java.util.Map; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; +import org.apache.qpid.proton.codec.ReadableBuffer; import org.apache.qpid.proton.message.Message; import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; /** * Support class containing constant values and static methods that are @@ -179,7 +180,7 @@ public final class AmqpMessageSupport { * * @return a buffer containing the wire level representation of the input Message. */ - public static ByteBuf encodeMessage(Message message) { + public static ReadableBuffer encodeMessage(Message message) { final int BUFFER_SIZE = 4096; byte[] encodedMessage = new byte[BUFFER_SIZE]; int encodedSize = 0; @@ -192,6 +193,6 @@ public final class AmqpMessageSupport { } } - return Unpooled.wrappedBuffer(encodedMessage, 0, encodedSize); + return ReadableBuffer.ByteBufferReader.wrap(ByteBuffer.wrap(encodedMessage, 0, encodedSize)); } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/436d406f/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpReadableBuffer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpReadableBuffer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpReadableBuffer.java new file mode 100644 index 0000000..7f14bf4 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpReadableBuffer.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider.amqp.message; + +import java.nio.ByteBuffer; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.StandardCharsets; + +import org.apache.qpid.proton.codec.ReadableBuffer; +import org.apache.qpid.proton.codec.WritableBuffer; + +import io.netty.buffer.ByteBuf; + +/** + * ReadableBuffer implementation that wraps a Netty ByteBuf + */ +public class AmqpReadableBuffer implements ReadableBuffer { + + private ByteBuf buffer; + + public AmqpReadableBuffer(ByteBuf buffer) { + this.buffer = buffer; + } + + public ByteBuf getBuffer() { + return buffer; + } + + @Override + public int capacity() { + return buffer.capacity(); + } + + @Override + public boolean hasArray() { + return buffer.hasArray(); + } + + @Override + public byte[] array() { + return buffer.array(); + } + + @Override + public int arrayOffset() { + return buffer.arrayOffset() + buffer.readerIndex(); + } + + @Override + public ReadableBuffer reclaimRead() { + return this; + } + + @Override + public byte get() { + return buffer.readByte(); + } + + @Override + public byte get(int index) { + return buffer.getByte(index); + } + + @Override + public int getInt() { + return buffer.readInt(); + } + + @Override + public long getLong() { + return buffer.readLong(); + } + + @Override + public short getShort() { + return buffer.readShort(); + } + + @Override + public float getFloat() { + return buffer.readFloat(); + } + + @Override + public double getDouble() { + return buffer.readDouble(); + } + + @Override + public ReadableBuffer get(byte[] target, int offset, int length) { + buffer.readBytes(target, offset, length); + return this; + } + + @Override + public ReadableBuffer get(byte[] target) { + buffer.readBytes(target); + return this; + } + + @Override + public ReadableBuffer get(WritableBuffer target) { + int start = target.position(); + + if (buffer.hasArray()) { + target.put(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), buffer.readableBytes()); + } else { + target.put(buffer.nioBuffer()); + } + + int written = target.position() - start; + + buffer.readerIndex(buffer.readerIndex() + written); + + return this; + } + + @Override + public ReadableBuffer slice() { + return new AmqpReadableBuffer(buffer.slice()); + } + + @Override + public ReadableBuffer flip() { + buffer.setIndex(0, buffer.readerIndex()); + return this; + } + + @Override + public ReadableBuffer limit(int limit) { + buffer.writerIndex(limit); + return this; + } + + @Override + public int limit() { + return buffer.writerIndex(); + } + + @Override + public ReadableBuffer position(int position) { + buffer.readerIndex(position); + return this; + } + + @Override + public int position() { + return buffer.readerIndex(); + } + + @Override + public ReadableBuffer mark() { + buffer.markReaderIndex(); + return this; + } + + @Override + public ReadableBuffer reset() { + buffer.resetReaderIndex(); + return this; + } + + @Override + public ReadableBuffer rewind() { + buffer.readerIndex(0); + return this; + } + + @Override + public ReadableBuffer clear() { + buffer.setIndex(0, buffer.capacity()); + return this; + } + + @Override + public int remaining() { + return buffer.readableBytes(); + } + + @Override + public boolean hasRemaining() { + return buffer.isReadable(); + } + + @Override + public ReadableBuffer duplicate() { + return new AmqpReadableBuffer(buffer.duplicate()); + } + + @Override + public ByteBuffer byteBuffer() { + return buffer.nioBuffer(); + } + + @Override + public String readUTF8() throws CharacterCodingException { + return buffer.toString(StandardCharsets.UTF_8); + } + + @Override + public String readString(CharsetDecoder decoder) throws CharacterCodingException { + return buffer.toString(StandardCharsets.UTF_8); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/436d406f/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpReadableBufferTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpReadableBufferTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpReadableBufferTest.java new file mode 100644 index 0000000..59bd371 --- /dev/null +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpReadableBufferTest.java @@ -0,0 +1,444 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider.amqp.message; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.nio.ByteBuffer; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.StandardCharsets; + +import org.apache.qpid.proton.codec.ReadableBuffer; +import org.junit.Test; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + +/** + * Tests for the ReadableBuffer wrapper that uses Netty ByteBuf underneath + */ +public class AmqpReadableBufferTest { + + @Test + public void testWrapBuffer() { + ByteBuf byteBuffer = Unpooled.buffer(100, 100); + + AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer); + + assertEquals(100, buffer.capacity()); + assertSame(byteBuffer, buffer.getBuffer()); + assertSame(buffer, buffer.reclaimRead()); + } + + @Test + public void testArrayAccess() { + ByteBuf byteBuffer = Unpooled.buffer(100, 100); + AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer); + + assertTrue(buffer.hasArray()); + assertSame(buffer.array(), byteBuffer.array()); + assertEquals(buffer.arrayOffset(), byteBuffer.arrayOffset()); + } + + @Test + public void testArrayAccessWhenNoArray() { + ByteBuf byteBuffer = Unpooled.directBuffer(); + AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer); + + assertFalse(buffer.hasArray()); + } + + @Test + public void testByteBuffer() { + byte[] data = new byte[] { 0, 1, 2, 3, 4 }; + ByteBuf byteBuffer = Unpooled.wrappedBuffer(data); + AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer); + + ByteBuffer nioBuffer = buffer.byteBuffer(); + assertEquals(data.length, nioBuffer.remaining()); + + for (int i = 0; i < data.length; i++) { + assertEquals(data[i], nioBuffer.get()); + } + } + + @Test + public void testGet() { + byte[] data = new byte[] { 0, 1, 2, 3, 4 }; + ByteBuf byteBuffer = Unpooled.wrappedBuffer(data); + AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer); + + for (int i = 0; i < data.length; i++) { + assertEquals(data[i], buffer.get()); + } + + assertFalse(buffer.hasRemaining()); + + try { + buffer.get(); + fail("Should throw an IndexOutOfBoundsException"); + } catch (IndexOutOfBoundsException ioe) {} + } + + @Test + public void testGetIndex() { + byte[] data = new byte[] { 0, 1, 2, 3, 4 }; + ByteBuf byteBuffer = Unpooled.wrappedBuffer(data); + AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer); + + for (int i = 0; i < data.length; i++) { + assertEquals(data[i], buffer.get(i)); + } + + assertTrue(buffer.hasRemaining()); + } + + @Test + public void testGetShort() { + byte[] data = new byte[] { 0, 1 }; + ByteBuf byteBuffer = Unpooled.wrappedBuffer(data); + AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer); + + assertEquals(1, buffer.getShort()); + assertFalse(buffer.hasRemaining()); + + try { + buffer.getShort(); + fail("Should throw an IndexOutOfBoundsException"); + } catch (IndexOutOfBoundsException ioe) {} + } + + @Test + public void testGetInt() { + byte[] data = new byte[] { 0, 0, 0, 1 }; + ByteBuf byteBuffer = Unpooled.wrappedBuffer(data); + AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer); + + assertEquals(1, buffer.getInt()); + assertFalse(buffer.hasRemaining()); + + try { + buffer.getInt(); + fail("Should throw an IndexOutOfBoundsException"); + } catch (IndexOutOfBoundsException ioe) {} + } + + @Test + public void testGetLong() { + byte[] data = new byte[] { 0, 0, 0, 0, 0, 0, 0, 1 }; + ByteBuf byteBuffer = Unpooled.wrappedBuffer(data); + AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer); + + assertEquals(1, buffer.getLong()); + assertFalse(buffer.hasRemaining()); + + try { + buffer.getLong(); + fail("Should throw an IndexOutOfBoundsException"); + } catch (IndexOutOfBoundsException ioe) {} + } + + @Test + public void testGetFloat() { + byte[] data = new byte[] { 0, 0, 0, 0 }; + ByteBuf byteBuffer = Unpooled.wrappedBuffer(data); + AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer); + + assertEquals(0, buffer.getFloat(), 0.0); + assertFalse(buffer.hasRemaining()); + + try { + buffer.getFloat(); + fail("Should throw an IndexOutOfBoundsException"); + } catch (IndexOutOfBoundsException ioe) {} + } + + @Test + public void testGetDouble() { + byte[] data = new byte[] { 0, 0, 0, 0, 0, 0, 0, 0 }; + ByteBuf byteBuffer = Unpooled.wrappedBuffer(data); + AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer); + + assertEquals(0, buffer.getDouble(), 0.0); + assertFalse(buffer.hasRemaining()); + + try { + buffer.getDouble(); + fail("Should throw an IndexOutOfBoundsException"); + } catch (IndexOutOfBoundsException ioe) {} + } + + @Test + public void testGetBytes() { + byte[] data = new byte[] { 0, 1, 2, 3, 4}; + ByteBuf byteBuffer = Unpooled.wrappedBuffer(data); + AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer); + + byte[] target = new byte[data.length]; + + buffer.get(target); + assertFalse(buffer.hasRemaining()); + assertArrayEquals(data, target); + + try { + buffer.get(target); + fail("Should throw an IndexOutOfBoundsException"); + } catch (IndexOutOfBoundsException ioe) {} + } + + @Test + public void testGetBytesIntInt() { + byte[] data = new byte[] { 0, 1, 2, 3, 4}; + ByteBuf byteBuffer = Unpooled.wrappedBuffer(data); + AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer); + + byte[] target = new byte[data.length]; + + buffer.get(target, 0, target.length); + assertFalse(buffer.hasRemaining()); + assertArrayEquals(data, target); + + try { + buffer.get(target, 0, target.length); + fail("Should throw an IndexOutOfBoundsException"); + } catch (IndexOutOfBoundsException ioe) {} + } + + @Test + public void testGetBytesToWritableBuffer() { + byte[] data = new byte[] { 0, 1, 2, 3, 4}; + ByteBuf byteBuffer = Unpooled.wrappedBuffer(data); + AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer); + ByteBuf targetBuffer = Unpooled.buffer(data.length, data.length); + AmqpWritableBuffer target = new AmqpWritableBuffer(targetBuffer); + + buffer.get(target); + assertFalse(buffer.hasRemaining()); + assertArrayEquals(targetBuffer.array(), data); + } + + @Test + public void testGetBytesToWritableBufferThatIsDirect() { + byte[] data = new byte[] { 0, 1, 2, 3, 4}; + ByteBuf byteBuffer = Unpooled.directBuffer(data.length, data.length); + byteBuffer.writeBytes(data); + AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer); + ByteBuf targetBuffer = Unpooled.buffer(data.length, data.length); + AmqpWritableBuffer target = new AmqpWritableBuffer(targetBuffer); + + buffer.get(target); + assertFalse(buffer.hasRemaining()); + + for (int i = 0; i < data.length; i++) { + assertEquals(data[i], target.getBuffer().readByte()); + } + } + + @Test + public void testDuplicate() { + byte[] data = new byte[] { 0, 1, 2, 3, 4}; + ByteBuf byteBuffer = Unpooled.wrappedBuffer(data); + AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer); + + ReadableBuffer duplicate = buffer.duplicate(); + + for (int i = 0; i < data.length; i++) { + assertEquals(data[i], duplicate.get()); + } + + assertFalse(duplicate.hasRemaining()); + } + + @Test + public void testSlice() { + byte[] data = new byte[] { 0, 1, 2, 3, 4}; + ByteBuf byteBuffer = Unpooled.wrappedBuffer(data); + AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer); + + ReadableBuffer slice = buffer.slice(); + + for (int i = 0; i < data.length; i++) { + assertEquals(data[i], slice.get()); + } + + assertFalse(slice.hasRemaining()); + } + + @Test + public void testLimit() { + byte[] data = new byte[] { 1, 2 }; + ByteBuf byteBuffer = Unpooled.wrappedBuffer(data); + AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer); + + assertEquals(data.length, buffer.limit()); + buffer.limit(1); + assertEquals(1, buffer.limit()); + assertEquals(1, buffer.get()); + assertFalse(buffer.hasRemaining()); + + try { + buffer.get(); + fail("Should throw an IndexOutOfBoundsException"); + } catch (IndexOutOfBoundsException ioe) {} + } + + @Test + public void testClear() { + byte[] data = new byte[] { 0, 1, 2, 3, 4}; + ByteBuf byteBuffer = Unpooled.wrappedBuffer(data); + AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer); + + byte[] target = new byte[data.length]; + + buffer.get(target); + assertFalse(buffer.hasRemaining()); + assertArrayEquals(data, target); + + try { + buffer.get(target); + fail("Should throw an IndexOutOfBoundsException"); + } catch (IndexOutOfBoundsException ioe) {} + + buffer.clear(); + assertTrue(buffer.hasRemaining()); + assertEquals(data.length, buffer.remaining()); + buffer.get(target); + assertFalse(buffer.hasRemaining()); + assertArrayEquals(data, target); + } + + @Test + public void testRewind() { + byte[] data = new byte[] { 0, 1, 2, 3, 4 }; + ByteBuf byteBuffer = Unpooled.wrappedBuffer(data); + AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer); + + for (int i = 0; i < data.length; i++) { + assertEquals(data[i], buffer.get()); + } + + assertFalse(buffer.hasRemaining()); + buffer.rewind(); + assertTrue(buffer.hasRemaining()); + + for (int i = 0; i < data.length; i++) { + assertEquals(data[i], buffer.get()); + } + } + + @Test + public void testReset() { + byte[] data = new byte[] { 0, 1, 2, 3, 4 }; + ByteBuf byteBuffer = Unpooled.wrappedBuffer(data); + AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer); + + buffer.mark(); + + for (int i = 0; i < data.length; i++) { + assertEquals(data[i], buffer.get()); + } + + assertFalse(buffer.hasRemaining()); + buffer.reset(); + assertTrue(buffer.hasRemaining()); + + for (int i = 0; i < data.length; i++) { + assertEquals(data[i], buffer.get()); + } + } + + @Test + public void testGetPosition() { + byte[] data = new byte[] { 0, 1, 2, 3, 4 }; + ByteBuf byteBuffer = Unpooled.wrappedBuffer(data); + AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer); + + assertEquals(buffer.position(), 0); + for (int i = 0; i < data.length; i++) { + assertEquals(buffer.position(), i); + assertEquals(data[i], buffer.get()); + assertEquals(buffer.position(), i + 1); + } + } + + @Test + public void testSetPosition() { + byte[] data = new byte[] { 0, 1, 2, 3, 4 }; + ByteBuf byteBuffer = Unpooled.wrappedBuffer(data); + AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer); + + for (int i = 0; i < data.length; i++) { + assertEquals(data[i], buffer.get()); + } + + assertFalse(buffer.hasRemaining()); + buffer.position(0); + assertTrue(buffer.hasRemaining()); + + for (int i = 0; i < data.length; i++) { + assertEquals(data[i], buffer.get()); + } + } + + @Test + public void testFlip() { + byte[] data = new byte[] { 0, 1, 2, 3, 4 }; + ByteBuf byteBuffer = Unpooled.wrappedBuffer(data); + AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer); + + buffer.mark(); + + for (int i = 0; i < data.length; i++) { + assertEquals(data[i], buffer.get()); + } + + assertFalse(buffer.hasRemaining()); + buffer.flip(); + assertTrue(buffer.hasRemaining()); + + for (int i = 0; i < data.length; i++) { + assertEquals(data[i], buffer.get()); + } + } + + @Test + public void testReadUTF8() throws CharacterCodingException { + String testString = "test-string-1"; + byte[] asUtf8bytes = testString.getBytes(StandardCharsets.UTF_8); + ByteBuf byteBuffer = Unpooled.wrappedBuffer(asUtf8bytes); + + AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer); + + assertEquals(testString, buffer.readUTF8()); + } + + @Test + public void testReadString() throws CharacterCodingException { + String testString = "test-string-1"; + byte[] asUtf8bytes = testString.getBytes(StandardCharsets.UTF_8); + ByteBuf byteBuffer = Unpooled.wrappedBuffer(asUtf8bytes); + + AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer); + + assertEquals(testString, buffer.readString(StandardCharsets.UTF_8.newDecoder())); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
