PROTON-1672: Handle multi-frame transfer payloads more efficiently. Replaces reallocation and consolidation of transfer payloads when multiple framed transfers are inbound. Creates a CompositeReadableBuffer that can be used to house the assembled payload for use in the decoder. The decoder implementation is refactored to handle ReadableBuffer as the source of bytes as well as ByteBuffer. Adds no-copy method variants to the Sender and Receiver API such that clients or servers can process inbound and outbound deliveries without copying the payloads when it is known to be safe not to copy.
Adds tests and jacoco reports to validate test coverage. Project: http://git-wip-us.apache.org/repos/asf/qpid-proton-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton-j/commit/ec554715 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton-j/tree/ec554715 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton-j/diff/ec554715 Branch: refs/heads/master Commit: ec5547152fed5a74afbe11bbb33541a2cb149fa4 Parents: c4cd774 Author: Timothy Bish <tabish...@gmail.com> Authored: Wed Apr 11 16:00:33 2018 -0400 Committer: Timothy Bish <tabish...@gmail.com> Committed: Wed Apr 11 16:00:33 2018 -0400 ---------------------------------------------------------------------- pom.xml | 33 + .../org/apache/qpid/proton/amqp/Binary.java | 25 +- .../org/apache/qpid/proton/codec/ArrayType.java | 14 +- .../apache/qpid/proton/codec/BinaryType.java | 6 +- .../proton/codec/CompositeReadableBuffer.java | 744 ++++++ .../proton/codec/CompositeWritableBuffer.java | 21 + .../apache/qpid/proton/codec/DecoderImpl.java | 22 +- .../proton/codec/DroppingWritableBuffer.java | 8 +- .../codec/FixedSizePrimitiveTypeEncoding.java | 2 +- .../org/apache/qpid/proton/codec/ListType.java | 9 +- .../org/apache/qpid/proton/codec/MapType.java | 13 +- .../qpid/proton/codec/ReadableBuffer.java | 395 ++- .../apache/qpid/proton/codec/StringType.java | 21 +- .../apache/qpid/proton/codec/SymbolType.java | 39 +- .../qpid/proton/codec/WritableBuffer.java | 28 + .../codec/messaging/FastPathAcceptedType.java | 10 +- .../codec/messaging/FastPathHeaderType.java | 10 +- .../codec/messaging/FastPathPropertiesType.java | 10 +- .../transport/FastPathDispositionType.java | 10 +- .../codec/transport/FastPathFlowType.java | 10 +- .../codec/transport/FastPathTransferType.java | 10 +- .../org/apache/qpid/proton/engine/Receiver.java | 12 + .../org/apache/qpid/proton/engine/Sender.java | 16 + .../qpid/proton/engine/impl/DeliveryImpl.java | 198 +- .../qpid/proton/engine/impl/FrameWriter.java | 13 +- 
.../qpid/proton/engine/impl/ReceiverImpl.java | 18 + .../qpid/proton/engine/impl/SenderImpl.java | 19 + .../qpid/proton/engine/impl/TransportImpl.java | 21 +- .../proton/engine/impl/TransportSession.java | 22 +- .../qpid/proton/message/impl/MessageImpl.java | 11 +- .../org/apache/qpid/proton/codec/Benchmark.java | 102 +- .../codec/CompositeReadableBufferTest.java | 2305 ++++++++++++++++++ .../proton/engine/impl/DeliveryImplTest.java | 577 ++++- .../proton/engine/impl/TransportImplTest.java | 4 +- 34 files changed, 4502 insertions(+), 256 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index ba6869b..945058f 100644 --- a/pom.xml +++ b/pom.xml @@ -42,6 +42,10 @@ <!-- Plugin versions --> <maven-bundle-plugin-version>3.2.0</maven-bundle-plugin-version> + <jacoco-plugin-version>0.7.9</jacoco-plugin-version> + + <!-- surefire forked jvm arguments --> + <argLine>-Xmx2g -enableassertions ${jacoco-config}</argLine> </properties> <dependencyManagement> @@ -116,6 +120,20 @@ </excludes> </configuration> </plugin> + <plugin> + <groupId>org.jacoco</groupId> + <artifactId>jacoco-maven-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>prepare-agent</goal> + </goals> + </execution> + </executions> + <configuration> + <propertyName>jacoco-config</propertyName> + </configuration> + </plugin> </plugins> <pluginManagement> <plugins> @@ -154,6 +172,11 @@ <pushChanges>true</pushChanges> </configuration> </plugin> + <plugin> + <groupId>org.jacoco</groupId> + <artifactId>jacoco-maven-plugin</artifactId> + <version>${jacoco-plugin-version}</version> + </plugin> </plugins> </pluginManagement> </build> @@ -181,6 +204,16 @@ <url>https://builds.apache.org/view/M-R/view/Qpid/job/Qpid-proton-j/</url> </ciManagement> + <reporting> + <plugins> + <plugin> + 
<groupId>org.jacoco</groupId> + <artifactId>jacoco-maven-plugin</artifactId> + <version>${jacoco-plugin-version}</version> + </plugin> + </plugins> + </reporting> + <profiles> <!-- Override the apache-release profile from the parent pom --> <profile> http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/amqp/Binary.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/amqp/Binary.java b/proton-j/src/main/java/org/apache/qpid/proton/amqp/Binary.java index aac3fc5..ab1bfe5 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/amqp/Binary.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/amqp/Binary.java @@ -23,6 +23,8 @@ package org.apache.qpid.proton.amqp; import java.nio.ByteBuffer; import java.util.Collection; +import org.apache.qpid.proton.codec.ReadableBuffer; + public final class Binary { @@ -167,7 +169,26 @@ public final class Binary return new Binary(_data, _offset+offset, length); } - public static Binary create(ByteBuffer buffer) + public static Binary create(ReadableBuffer buffer) + { + if (buffer == null) + { + return null; + } + else if (!buffer.hasArray()) + { + byte data[] = new byte [buffer.remaining()]; + ReadableBuffer dup = buffer.duplicate(); + dup.get(data); + return new Binary(data); + } + else + { + return new Binary(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining()); + } + } + + public static Binary create(ByteBuffer buffer) { if( buffer == null ) return null; @@ -178,7 +199,7 @@ public final class Binary dup.get(data); return new Binary(data); } - else + else { return new Binary(buffer.array(), buffer.arrayOffset()+buffer.position(), buffer.remaining()); } http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/codec/ArrayType.java ---------------------------------------------------------------------- diff 
--git a/proton-j/src/main/java/org/apache/qpid/proton/codec/ArrayType.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/ArrayType.java index 32d6f85..f4f0c8a 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/codec/ArrayType.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/ArrayType.java @@ -21,7 +21,6 @@ package org.apache.qpid.proton.codec; import java.lang.reflect.Array; -import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collection; @@ -48,7 +47,7 @@ public class ArrayType implements PrimitiveType<Object[]> void writeValue(double[] a); void writeValue(char[] a); - void setValue(Object[] val, TypeEncoding encoder, int size); + void setValue(Object[] val, TypeEncoding<?> encoder, int size); int getSizeBytes(); @@ -92,7 +91,7 @@ public class ArrayType implements PrimitiveType<Object[]> public ArrayEncoding getEncoding(final Object[] val) { - TypeEncoding encoder = calculateEncoder(val,_encoder); + TypeEncoding<?> encoder = calculateEncoder(val,_encoder); int size = calculateSize(val, encoder); ArrayEncoding arrayEncoding = (val.length > 255 || size > 254) ? 
_arrayEncoding @@ -101,7 +100,7 @@ public class ArrayType implements PrimitiveType<Object[]> return arrayEncoding; } - private static TypeEncoding calculateEncoder(final Object[] val, final EncoderImpl encoder) + private static TypeEncoding<?> calculateEncoder(final Object[] val, final EncoderImpl encoder) { if(val.length == 0) @@ -156,7 +155,6 @@ public class ArrayType implements PrimitiveType<Object[]> } else { - if(underlyingType == null) { checkTypes = true; @@ -173,7 +171,6 @@ public class ArrayType implements PrimitiveType<Object[]> .getType(val[i]) + " in array"); } - TypeEncoding elementEncoding = underlyingType.getEncoding(val[i]); if(elementEncoding != underlyingEncoding && !underlyingEncoding.encodesSuperset(elementEncoding)) { @@ -438,7 +435,6 @@ public class ArrayType implements PrimitiveType<Object[]> extends LargeFloatingSizePrimitiveTypeEncoding<Object[]> implements ArrayEncoding { - private Object[] _val; private TypeEncoding _underlyingEncoder; private int _size; @@ -635,7 +631,7 @@ public class ArrayType implements PrimitiveType<Object[]> public void skipValue() { DecoderImpl decoder = getDecoder(); - ByteBuffer buffer = decoder.getByteBuffer(); + ReadableBuffer buffer = decoder.getBuffer(); int size = decoder.readRawInt(); buffer.position(buffer.position() + size); } @@ -900,7 +896,7 @@ public class ArrayType implements PrimitiveType<Object[]> public void skipValue() { DecoderImpl decoder = getDecoder(); - ByteBuffer buffer = decoder.getByteBuffer(); + ReadableBuffer buffer = decoder.getBuffer(); int size = ((int)decoder.readRawByte()) & 0xFF; buffer.position(buffer.position() + size); } http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/codec/BinaryType.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/BinaryType.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/BinaryType.java index 
1d739b8..be0a8f5 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/codec/BinaryType.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/BinaryType.java @@ -22,7 +22,6 @@ package org.apache.qpid.proton.codec; import org.apache.qpid.proton.amqp.Binary; -import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collection; @@ -135,7 +134,7 @@ public class BinaryType extends AbstractPrimitiveType<Binary> public void skipValue() { DecoderImpl decoder = getDecoder(); - ByteBuffer buffer = decoder.getByteBuffer(); + ReadableBuffer buffer = decoder.getBuffer(); int size = decoder.readRawInt(); buffer.position(buffer.position() + size); } @@ -190,7 +189,8 @@ public class BinaryType extends AbstractPrimitiveType<Binary> public void skipValue() { - ByteBuffer buffer = getDecoder().getByteBuffer(); + DecoderImpl decoder = getDecoder(); + ReadableBuffer buffer = decoder.getBuffer(); int size = ((int)getDecoder().readRawByte()) & 0xff; buffer.position(buffer.position() + size); } http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeReadableBuffer.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeReadableBuffer.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeReadableBuffer.java new file mode 100644 index 0000000..e6652e4 --- /dev/null +++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeReadableBuffer.java @@ -0,0 +1,744 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.proton.codec; + +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.InvalidMarkException; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.CoderResult; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * ReadableBuffer implementation whose content is made up of one or more + * byte arrays. + */ +public class CompositeReadableBuffer implements ReadableBuffer { + + private static final List<byte[]> EMPTY_LIST = Collections.unmodifiableList(new ArrayList<byte[]>()); + private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.wrap(new byte[0]); + private static final CompositeReadableBuffer EMPTY_SLICE = new CompositeReadableBuffer(true); + private static int UNSET_MARK = -1; + + private ArrayList<byte[]> contents; + + // Track active array and our offset into it. + private int currentArrayIndex = -1; + private byte[] currentArray; + private int currentOffset; + + // State global to the buffer. 
+ private int position; + private int limit; + private int capacity; + private int mark = -1; + private boolean compactable = true; + + /** + * Creates a default empty composite buffer + */ + public CompositeReadableBuffer() { + } + + private CompositeReadableBuffer(byte[] array, int offset) { + this.currentArray = array; + this.currentOffset = offset; + this.capacity = array.length; + this.limit = capacity; + } + + private CompositeReadableBuffer(boolean compactable) { + this.compactable = compactable; + } + + public List<byte[]> getArrays() { + return contents == null ? EMPTY_LIST : Collections.unmodifiableList(contents); + } + + public int getCurrentIndex() { + return currentArrayIndex; + } + + @Override + public boolean hasArray() { + return currentArray != null && (contents == null || contents.size() == 1); + } + + public int capacity() { + return capacity; + } + + @Override + public byte[] array() { + if (hasArray()) { + return currentArray; + } + + throw new UnsupportedOperationException("Buffer not backed by a single array"); + } + + @Override + public int arrayOffset() { + if (hasArray()) { + return currentOffset; + } + + throw new UnsupportedOperationException("Buffer not backed by a single array"); + } + + @Override + public byte get() { + if (position == limit) { + throw new BufferUnderflowException(); + } + + final byte result = currentArray[currentOffset++]; + position++; + maybeMoveToNextArray(); + + return result; + } + + @Override + public byte get(int index) { + if (index < 0 || index >= limit) { + throw new IndexOutOfBoundsException("The given index is not valid: " + index); + } + + byte result = 0; + + if (index == position) { + result = currentArray[currentOffset]; + } else if (index < position) { + result = getBackwards(index); + } else { + result = getForward(index); + } + + return result; + } + + private byte getForward(int index) { + byte result = 0; + + int currentArrayIndex = this.currentArrayIndex; + int currentOffset = 
this.currentOffset; + byte[] currentArray = this.currentArray; + + for (int amount = index - position; amount >= 0;) { + if (amount < currentArray.length - currentOffset) { + result = currentArray[currentOffset + amount]; + break; + } else { + amount -= currentArray.length - currentOffset; + currentArray = contents.get(++currentArrayIndex); + currentOffset = 0; + } + } + + return result; + } + + private byte getBackwards(int index) { + byte result = 0; + + int currentArrayIndex = this.currentArrayIndex; + int currentOffset = this.currentOffset; + byte[] currentArray = this.currentArray; + + for (int amount = position - index; amount >= 0;) { + if ((currentOffset - amount) >= 0) { + result = currentArray[currentOffset - amount]; + break; + } else { + amount -= currentOffset; + currentArray = contents.get(--currentArrayIndex); + currentOffset = currentArray.length; + } + } + + return result; + } + + @Override + public int getInt() { + if (remaining() < Integer.BYTES) { + throw new BufferUnderflowException(); + } + + int result = 0; + + if (currentArray.length - currentOffset >= 4) { + result = (int)(currentArray[currentOffset++] & 0xFF) << 24 | + (int)(currentArray[currentOffset++] & 0xFF) << 16 | + (int)(currentArray[currentOffset++] & 0xFF) << 8 | + (int)(currentArray[currentOffset++] & 0xFF) << 0; + } else { + for (int i = Integer.BYTES - 1; i >= 0; --i) { + result |= (int)(currentArray[currentOffset++] & 0xFF) << (i * Byte.SIZE); + maybeMoveToNextArray(); + } + } + + position += 4; + + return result; + } + + @Override + public long getLong() { + if (remaining() < Long.BYTES) { + throw new BufferUnderflowException(); + } + + long result = 0; + + if (currentArray.length - currentOffset >= 8) { + result = (long)(currentArray[currentOffset++] & 0xFF) << 56 | + (long)(currentArray[currentOffset++] & 0xFF) << 48 | + (long)(currentArray[currentOffset++] & 0xFF) << 40 | + (long)(currentArray[currentOffset++] & 0xFF) << 32 | + (long)(currentArray[currentOffset++] & 0xFF) 
<< 24 | + (long)(currentArray[currentOffset++] & 0xFF) << 16 | + (long)(currentArray[currentOffset++] & 0xFF) << 8 | + (long)(currentArray[currentOffset++] & 0xFF) << 0; + } else { + for (int i = Long.BYTES - 1; i >= 0; --i) { + result |= (long)(currentArray[currentOffset++] & 0xFF) << (i * Byte.SIZE); + maybeMoveToNextArray(); + } + } + + position += 8; + + return result; + } + + @Override + public short getShort() { + if (remaining() < Short.BYTES) { + throw new BufferUnderflowException(); + } + + short result = 0; + + for (int i = Short.BYTES - 1; i >= 0; --i) { + result |= (currentArray[currentOffset++] & 0xFF) << (i * Byte.SIZE); + maybeMoveToNextArray(); + } + + position += 2; + + return result; + } + + @Override + public float getFloat() { + return Float.intBitsToFloat(getInt()); + } + + @Override + public double getDouble() { + return Double.longBitsToDouble(getLong()); + } + + @Override + public CompositeReadableBuffer get(byte[] data) { + return get(data, 0, data.length); + } + + @Override + public CompositeReadableBuffer get(byte[] data, int offset, int length) { + validateReadTarget(data.length, offset, length); + + if (length > remaining()) { + throw new BufferUnderflowException(); + } + + int copied = 0; + while (length > 0) { + final int chunk = Math.min((currentArray.length - currentOffset), length); + System.arraycopy(currentArray, currentOffset, data, offset + copied, chunk); + + currentOffset += chunk; + length -= chunk; + copied += chunk; + + maybeMoveToNextArray(); + } + + position += copied; + + return this; + } + + @Override + public CompositeReadableBuffer get(WritableBuffer target) { + int length = Math.min(target.remaining(), remaining()); + + do { + final int chunk = Math.min((currentArray.length - currentOffset), length); + + if (chunk == 0) { + break; // This buffer is out of data + } + + target.put(currentArray, currentOffset, chunk); + + currentOffset += chunk; + position += chunk; + length -= chunk; + + maybeMoveToNextArray(); + } 
while (length > 0); + + return this; + } + + @Override + public CompositeReadableBuffer position(int position) { + if (position < 0 || position > limit) { + throw new IllegalArgumentException("position must be non-negative and no greater than the limit"); + } + + int moveBy = position - this.position; + if (moveBy >= 0) { + moveForward(moveBy); + } else { + moveBackwards(Math.abs(moveBy)); + } + + this.position = position; + + if (mark > position) { + mark = UNSET_MARK; + } + + return this; + } + + private void moveForward(int moveBy) { + while (moveBy > 0) { + if (moveBy < currentArray.length - currentOffset) { + currentOffset += moveBy; + break; + } else { + moveBy -= currentArray.length - currentOffset; + if (currentArrayIndex != -1 && currentArrayIndex < contents.size() - 1) { + currentArray = contents.get(++currentArrayIndex); + currentOffset = 0; + } else { + currentOffset = currentArray.length; + } + } + } + } + + private void moveBackwards(int moveBy) { + while (moveBy > 0) { + if ((currentOffset - moveBy) >= 0) { + currentOffset -= moveBy; + break; + } else { + moveBy -= currentOffset; + currentArray = contents.get(--currentArrayIndex); + currentOffset = currentArray.length; + } + } + } + + @Override + public int position() { + return position; + } + + @Override + public CompositeReadableBuffer slice() { + int newCapacity = limit() - position(); + + final CompositeReadableBuffer result; + + if (newCapacity == 0) { + result = EMPTY_SLICE; + } else { + result = new CompositeReadableBuffer(currentArray, currentOffset); + result.contents = contents; + result.currentArrayIndex = currentArrayIndex; + result.capacity = newCapacity; + result.limit = newCapacity; + result.position = 0; + result.compactable = false; + } + + return result; + } + + @Override + public CompositeReadableBuffer flip() { + limit = position; + position(0); // Move by index to avoid corrupting a slice. 
+ mark = UNSET_MARK; + + return this; + } + + @Override + public CompositeReadableBuffer limit(int limit) { + if (limit < 0 || limit > capacity) { + throw new IllegalArgumentException("limit must be non-negative and no greater than the capacity"); + } + + if (mark > limit) { + mark = UNSET_MARK; + } + + if (position > limit) { + position(limit); + } + + this.limit = limit; + + return this; + } + + @Override + public int limit() { + return limit; + } + + @Override + public CompositeReadableBuffer mark() { + this.mark = position; + return this; + } + + @Override + public CompositeReadableBuffer reset() { + if (mark < 0) { + throw new InvalidMarkException(); + } + + position(mark); + + return this; + } + + @Override + public CompositeReadableBuffer rewind() { + return position(0); + } + + @Override + public CompositeReadableBuffer clear() { + mark = UNSET_MARK; + limit = capacity; + + return position(0); + } + + @Override + public int remaining() { + return limit - position; + } + + @Override + public boolean hasRemaining() { + return remaining() > 0; + } + + @Override + public CompositeReadableBuffer duplicate() { + CompositeReadableBuffer duplicated = + new CompositeReadableBuffer(currentArray, currentOffset); + + if (contents != null) { + duplicated.contents = new ArrayList<>(contents); + } + + duplicated.capacity = capacity; + duplicated.currentArrayIndex = currentArrayIndex; + duplicated.limit = limit; + duplicated.position = position; + duplicated.mark = mark; + duplicated.compactable = compactable; // A slice duplicated should not allow compaction. 
+ + return duplicated; + } + + @Override + public ByteBuffer byteBuffer() { + int viewSpan = limit() - position(); + + final ByteBuffer result; + + if (viewSpan == 0) { + result = EMPTY_BUFFER; + } else if (viewSpan <= currentArray.length - currentOffset) { + result = ByteBuffer.wrap(currentArray, currentOffset, viewSpan); + } else { + result = buildByteBuffer(viewSpan); + } + + return result.asReadOnlyBuffer(); + } + + private ByteBuffer buildByteBuffer(int span) { + byte[] compactedView = new byte[span]; + int arrayIndex = currentArrayIndex; + + // Take whatever is left from the current array; + System.arraycopy(currentArray, currentOffset, compactedView, 0, currentArray.length - currentOffset); + int copied = currentArray.length - currentOffset; + + while (copied < span) { + byte[] next = contents.get(++arrayIndex); + final int length = Math.min(span - copied, next.length); + System.arraycopy(next, 0, compactedView, copied, length); + copied += length; + } + + return ByteBuffer.wrap(compactedView); + } + + @Override + public String readUTF8() throws CharacterCodingException { + return readString(StandardCharsets.UTF_8.newDecoder()); + } + + @Override + public String readString(CharsetDecoder decoder) throws CharacterCodingException { + if (!hasRemaining()) { + return null; + } + + CharBuffer decoded = null; + + if (hasArray()) { + decoded = decoder.decode(ByteBuffer.wrap(currentArray, currentOffset, remaining())); + } else { + decoded = readStringFromComponents(decoder); + } + + return decoded.toString(); + } + + private CharBuffer readStringFromComponents(CharsetDecoder decoder) throws CharacterCodingException { + int size = (int)(remaining() * decoder.averageCharsPerByte()); + CharBuffer decoded = CharBuffer.allocate(size); + + int arrayIndex = currentArrayIndex; + final int viewSpan = limit() - position(); + int processed = Math.min(currentArray.length - currentOffset, viewSpan); + ByteBuffer wrapper = ByteBuffer.wrap(currentArray, currentOffset, processed); 
+ + CoderResult step = CoderResult.OVERFLOW; + + do { + boolean endOfInput = processed == viewSpan; + step = decoder.decode(wrapper, decoded, endOfInput); + if (step.isUnderflow() && endOfInput) { + step = decoder.flush(decoded); + break; + } + + if (step.isOverflow()) { + size = 2 * size + 1; + CharBuffer upsized = CharBuffer.allocate(size); + decoded.flip(); + upsized.put(decoded); + decoded = upsized; + continue; + } + + byte[] next = contents.get(++arrayIndex); + int wrapSize = Math.min(next.length, viewSpan - processed); + wrapper = ByteBuffer.wrap(next, 0, wrapSize); + processed += wrapSize; + } while (!step.isError()); + + if (step.isError()) { + step.throwException(); + } + + return (CharBuffer) decoded.flip(); + } + + /** + * Compact the buffer dropping arrays that have been consumed by previous + * reads from this Composite buffer. The limit is reset to the new capacity + */ + @Override + public CompositeReadableBuffer reclaimRead() { + if (!compactable || (currentArray == null && contents == null)) { + return this; + } + + int totalCompaction = 0; + int totalRemovals = 0; + + for (; totalRemovals < currentArrayIndex; ++totalRemovals) { + byte[] element = contents.remove(0); + totalCompaction += element.length; + } + + currentArrayIndex -= totalRemovals; + + if (currentArray.length == currentOffset) { + totalCompaction += currentArray.length; + + // If we are sitting on the end of the data (length == offest) then + // we are also at the last element in the ArrayList if one is currently + // in use, so remove the data and release the list. + if (currentArrayIndex == 0) { + contents.clear(); + contents = null; + } + + currentArray = null; + currentArrayIndex = -1; + currentOffset = 0; + } + + position -= totalCompaction; + limit = capacity -= totalCompaction; + + if (mark != UNSET_MARK) { + mark -= totalCompaction; + } + + return this; + } + + /** + * Adds the given array into the composite buffer at the end. 
+ * <p> + * The appended array is not copied so changes to the source array are visible in this + * buffer and vice versa. If this composite was empty than it would return true for the + * {@link #hasArray()} method until another array is appended. + * <p> + * Calling this method resets the limit to the new capacity. + * + * @param array + * The array to add to this composite buffer. + * + * @throws IllegalArgumentException if the array is null or zero size. + * @throws IllegalStateException if the buffer does not allow appends. + * + * @return a reference to this {@link CompositeReadableBuffer}. + */ + public CompositeReadableBuffer append(byte[] array) { + if (!compactable) { + throw new IllegalStateException(); + } + + if (array == null || array.length == 0) { + throw new IllegalArgumentException("Array must not be empty or null"); + } + + if (currentArray == null) { + currentArray = array; + currentOffset = 0; + } else if (contents == null) { + contents = new ArrayList<>(); + contents.add(currentArray); + contents.add(array); + currentArrayIndex = 0; + } else { + contents.add(array); + } + + capacity += array.length; + limit = capacity; + + return this; + } + + @Override + public int hashCode() { + int hash = 1; + + if (currentArrayIndex < 0) { + int span = limit() - position(); + while (span > 0) { + hash = 31 * hash + currentArray[currentOffset + --span]; + } + } else { + final int currentPos = position(); + for (int i = limit() - 1; i >= currentPos; i--) { + hash = 31 * hash + (int)get(i); + } + } + + return hash; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (!(other instanceof ReadableBuffer)) { + return false; + } + + ReadableBuffer buffer = (ReadableBuffer)other; + if (this.remaining() != buffer.remaining()) { + return false; + } + + final int currentPos = position(); + + for (int i = buffer.position(); hasRemaining(); i++) { + if (!equals(this.get(), buffer.get(i))) { + return false; + } + } + 
+ position(currentPos); + + return true; + } + + private static boolean equals(byte x, byte y) { + return x == y; + } + + private void maybeMoveToNextArray() { + if (currentArray.length == currentOffset) { + if (currentArrayIndex >= 0 && currentArrayIndex < (contents.size() - 1)) { + currentArray = contents.get(++currentArrayIndex); + currentOffset = 0; + } + } + } + + private static void validateReadTarget(int destSize, int offset, int length) { + if ((offset | length) < 0) { + throw new IndexOutOfBoundsException("offset and legnth must be non-negative"); + } + + if (((long) offset + (long) length) > destSize) { + throw new IndexOutOfBoundsException("target is to small for specified read size"); + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeWritableBuffer.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeWritableBuffer.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeWritableBuffer.java index 5786924..5b2c71c 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeWritableBuffer.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeWritableBuffer.java @@ -188,4 +188,25 @@ public class CompositeWritableBuffer implements WritableBuffer { return _first.toString() + " + "+_second.toString(); } + + @Override + public void put(ReadableBuffer payload) { + int firstRemaining = _first.remaining(); + if(firstRemaining > 0) + { + if(firstRemaining >= payload.remaining()) + { + _first.put(payload); + return; + } + else + { + int limit = payload.limit(); + payload.limit(payload.position()+firstRemaining); + _first.put(payload); + payload.limit(limit); + } + } + _second.put(payload); + } } http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/codec/DecoderImpl.java 
---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/DecoderImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/DecoderImpl.java index c305916..2d4989b 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/codec/DecoderImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/DecoderImpl.java @@ -32,7 +32,6 @@ import org.apache.qpid.proton.amqp.UnsignedInteger; import org.apache.qpid.proton.amqp.UnsignedLong; import org.apache.qpid.proton.amqp.UnsignedShort; -import java.io.IOException; import java.lang.reflect.Array; import java.nio.ByteBuffer; import java.nio.charset.CharsetDecoder; @@ -41,7 +40,7 @@ import java.util.*; public class DecoderImpl implements ByteBufferDecoder { - private ByteBuffer _buffer; + private ReadableBuffer _buffer; private final CharsetDecoder _charsetDecoder = StandardCharsets.UTF_8.newDecoder(); @@ -58,7 +57,7 @@ public class DecoderImpl implements ByteBufferDecoder DecoderImpl(final ByteBuffer buffer) { - _buffer = buffer; + _buffer = new ReadableBuffer.ByteBufferReader(buffer); } public TypeConstructor<?> peekConstructor() @@ -998,21 +997,30 @@ public class DecoderImpl implements ByteBufferDecoder _buffer.get(data, offset, length); } - <V> V readRaw(TypeDecoder<V> decoder, int size) { - V decode = decoder.decode(this, (ByteBuffer) _buffer.slice().limit(size)); + V decode = decoder.decode(this, _buffer.slice().limit(size)); _buffer.position(_buffer.position()+size); return decode; } public void setByteBuffer(final ByteBuffer buffer) { - _buffer = buffer; + _buffer = new ReadableBuffer.ByteBufferReader(buffer); } public ByteBuffer getByteBuffer() { + return _buffer.byteBuffer(); + } + + public void setBuffer(final ReadableBuffer buffer) + { + _buffer = buffer; + } + + public ReadableBuffer getBuffer() + { return _buffer; } @@ -1023,7 +1031,7 @@ public class DecoderImpl implements ByteBufferDecoder interface TypeDecoder<V> { - V 
decode(DecoderImpl decoder, ByteBuffer buf); + V decode(DecoderImpl decoder, ReadableBuffer buf); } private static class UnknownDescribedType implements DescribedType http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/codec/DroppingWritableBuffer.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/DroppingWritableBuffer.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/DroppingWritableBuffer.java index a6949b5..ade5a08 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/codec/DroppingWritableBuffer.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/DroppingWritableBuffer.java @@ -27,7 +27,7 @@ public class DroppingWritableBuffer implements WritableBuffer private int _pos = 0; @Override - public boolean hasRemaining() + public boolean hasRemaining() { return true; } @@ -104,4 +104,10 @@ public class DroppingWritableBuffer implements WritableBuffer { return Integer.MAX_VALUE; } + + @Override + public void put(ReadableBuffer payload) { + _pos += payload.remaining(); + payload.position(payload.limit()); + } } http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/codec/FixedSizePrimitiveTypeEncoding.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/FixedSizePrimitiveTypeEncoding.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/FixedSizePrimitiveTypeEncoding.java index 7c055ae..872414a 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/codec/FixedSizePrimitiveTypeEncoding.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/FixedSizePrimitiveTypeEncoding.java @@ -40,7 +40,7 @@ abstract class FixedSizePrimitiveTypeEncoding<T> extends AbstractPrimitiveTypeEn public final void skipValue() { - 
getDecoder().getByteBuffer().position(getDecoder().getByteBuffer().position() + getFixedSize()); + getDecoder().getBuffer().position(getDecoder().getBuffer().position() + getFixedSize()); } protected abstract int getFixedSize(); http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/codec/ListType.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/ListType.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/ListType.java index ba84141..a3ff4af 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/codec/ListType.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/ListType.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.proton.codec; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -151,7 +150,7 @@ public class ListType extends AbstractPrimitiveType<List> public List readValue() { DecoderImpl decoder = getDecoder(); - ByteBuffer buffer = decoder.getByteBuffer(); + ReadableBuffer buffer = decoder.getBuffer(); int size = decoder.readRawInt(); // todo - limit the decoder with size @@ -228,7 +227,7 @@ public class ListType extends AbstractPrimitiveType<List> public void skipValue() { DecoderImpl decoder = getDecoder(); - ByteBuffer buffer = decoder.getByteBuffer(); + ReadableBuffer buffer = decoder.getBuffer(); int size = decoder.readRawInt(); buffer.position(buffer.position() + size); } @@ -295,7 +294,7 @@ public class ListType extends AbstractPrimitiveType<List> public List readValue() { DecoderImpl decoder = getDecoder(); - ByteBuffer buffer = decoder.getByteBuffer(); + ReadableBuffer buffer = decoder.getBuffer(); int size = ((int)decoder.readRawByte()) & 0xff; // todo - limit the decoder with size @@ -367,7 +366,7 @@ public class ListType extends AbstractPrimitiveType<List> public void skipValue() { DecoderImpl decoder = 
getDecoder(); - ByteBuffer buffer = decoder.getByteBuffer(); + ReadableBuffer buffer = decoder.getBuffer(); int size = ((int)decoder.readRawByte()) & 0xff; buffer.position(buffer.position() + size); } http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/codec/MapType.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/MapType.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/MapType.java index 72d1bbd..b0e2330 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/codec/MapType.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/MapType.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.proton.codec; -import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collection; import java.util.Iterator; @@ -33,7 +32,7 @@ public class MapType extends AbstractPrimitiveType<Map> private final MapEncoding _shortMapEncoding; private EncoderImpl _encoder; - private AMQPType fixedKeyType; + private AMQPType<?> fixedKeyType; private static interface MapEncoding extends PrimitiveTypeEncoding<Map> { @@ -95,7 +94,7 @@ public class MapType extends AbstractPrimitiveType<Map> return len; } - private static TypeConstructor<?> findNextDecoder(DecoderImpl decoder, ByteBuffer buffer, TypeConstructor<?> previousConstructor) + private static TypeConstructor<?> findNextDecoder(DecoderImpl decoder, ReadableBuffer buffer, TypeConstructor<?> previousConstructor) { if (previousConstructor == null) { @@ -205,7 +204,7 @@ public class MapType extends AbstractPrimitiveType<Map> public Map readValue() { DecoderImpl decoder = getDecoder(); - ByteBuffer buffer = decoder.getByteBuffer(); + ReadableBuffer buffer = decoder.getBuffer(); int size = decoder.readRawInt(); // todo - limit the decoder with size @@ -264,7 +263,7 @@ public class MapType extends AbstractPrimitiveType<Map> public void skipValue() { DecoderImpl decoder 
= getDecoder(); - ByteBuffer buffer = decoder.getByteBuffer(); + ReadableBuffer buffer = decoder.getBuffer(); int size = decoder.readRawInt(); buffer.position(buffer.position() + size); } @@ -343,7 +342,7 @@ public class MapType extends AbstractPrimitiveType<Map> public Map readValue() { DecoderImpl decoder = getDecoder(); - ByteBuffer buffer = decoder.getByteBuffer(); + ReadableBuffer buffer = decoder.getBuffer(); int size = (decoder.readRawByte()) & 0xff; // todo - limit the decoder with size @@ -398,7 +397,7 @@ public class MapType extends AbstractPrimitiveType<Map> public void skipValue() { DecoderImpl decoder = getDecoder(); - ByteBuffer buffer = decoder.getByteBuffer(); + ReadableBuffer buffer = decoder.getBuffer(); int size = ((int)decoder.readRawByte()) & 0xff; buffer.position(buffer.position() + size); } http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/codec/ReadableBuffer.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/ReadableBuffer.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/ReadableBuffer.java index 1360d76..ea3ef17 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/codec/ReadableBuffer.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/ReadableBuffer.java @@ -18,59 +18,318 @@ */ package org.apache.qpid.proton.codec; +import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; -import java.nio.CharBuffer; -import java.nio.charset.Charset; +import java.nio.InvalidMarkException; +import java.nio.ReadOnlyBufferException; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.StandardCharsets; /** * Interface to abstract a buffer, similar to {@link WritableBuffer} */ public interface ReadableBuffer { - void put(ReadableBuffer other); - + /** + * Returns the capacity of the backing buffer of this 
ReadableBuffer + * @return the capacity of the backing buffer of this ReadableBuffer + */ + int capacity(); + + /** + * Returns true if this ReadableBuffer is backed by an array which can be + * accessed by the {@link #array()} and {@link #arrayOffset()} methods. + * + * @return true if the buffer is backed by a primitive array. + */ + boolean hasArray(); + + /** + * Returns the primitive array that backs this buffer if one exists and the + * buffer is not read-only. The caller should have checked the {@link #hasArray()} + * method before calling this method. + * + * @return the array that backs this buffer, if available. + * + * @throws UnsupportedOperationException if this {@link ReadableBuffer} doesn't support array access. + * @throws ReadOnlyBufferException if the ReadableBuffer is read-only. + */ + byte[] array(); + + /** + * Returns the offset into the backing array where data should be read from. The caller + * should have checked the {@link #hasArray()} method before calling this method. + * + * @return the offset into the backing array to start reading from. + * + * @throws UnsupportedOperationException if this {@link ReadableBuffer} doesn't support array access. + * @throws ReadOnlyBufferException if the ReadableBuffer is read-only. + */ + int arrayOffset(); + + /** + * Compact the backing storage of this ReadableBuffer, possibly freeing previously-read + * portions of pooled data or reducing the number of backing arrays if present. + * <p> + * This is an optional operation and care should be taken in its implementation. + * + * @return a reference to this buffer + */ + ReadableBuffer reclaimRead(); + + /** + * Reads the byte at the current position and advances the position by 1. + * + * @return the byte at the current position. + * + * @throws BufferUnderflowException if the buffer position has reached the limit. + */ byte get(); + /** + * Reads the byte at the given index, the buffer position is not affected.
+ * + * @param index + * The index in the buffer from which to read the byte. + * + * @return the byte value stored at the target index. + * + * @throws IndexOutOfBoundsException if the index is not in range for this buffer. + */ + byte get(int index); + + /** + * Reads four bytes from the buffer and returns them as an integer value. The + * buffer position is advanced by four bytes. + * + * @return an integer value composed of bytes read from the buffer. + * + * @throws BufferUnderflowException if the buffer position has reached the limit. + */ int getInt(); + /** + * Reads eight bytes from the buffer and returns them as a long value. The + * buffer position is advanced by eight bytes. + * + * @return a long value composed of bytes read from the buffer. + * + * @throws BufferUnderflowException if the buffer position has reached the limit. + */ long getLong(); + /** + * Reads two bytes from the buffer and returns them as a short value. The + * buffer position is advanced by two bytes. + * + * @return a short value composed of bytes read from the buffer. + * + * @throws BufferUnderflowException if the buffer position has reached the limit. + */ short getShort(); + /** + * Reads four bytes from the buffer and returns them as a float value. The + * buffer position is advanced by four bytes. + * + * @return a float value composed of bytes read from the buffer. + * + * @throws BufferUnderflowException if the buffer position has reached the limit. + */ float getFloat(); + /** + * Reads eight bytes from the buffer and returns them as a double value. The + * buffer position is advanced by eight bytes. + * + * @return a double value composed of bytes read from the buffer. + * + * @throws BufferUnderflowException if the buffer position has reached the limit.
+ */ double getDouble(); - ReadableBuffer get(final byte[] data, final int offset, final int length); - - ReadableBuffer get(final byte[] data); - - ReadableBuffer position(int position); - + /** + * A bulk read method that copies bytes from this buffer into the target byte array. + * + * @param target + * The byte array to copy bytes read from this buffer. + * @param offset + * The offset into the given array where the copy starts. + * @param length + * The number of bytes to copy into the target array. + * + * @return a reference to this ReadableBuffer instance. + * + * @throws BufferUnderflowException if there are fewer readable bytes than the array length. + * @throws IndexOutOfBoundsException if the offset or length values are invalid. + */ + ReadableBuffer get(final byte[] target, final int offset, final int length); + + /** + * A bulk read method that copies bytes from this buffer into the target byte array. + * + * @param target + * The byte array to copy bytes read from this buffer. + * + * @return a reference to this ReadableBuffer instance. + * + * @throws BufferUnderflowException if there are fewer readable bytes than the array length. + */ + ReadableBuffer get(final byte[] target); + + /** + * Copy data from this buffer to the target buffer starting from the current + * position and continuing until either this buffer's remaining bytes are + * consumed or the target is full. + * + * @param target + * The WritableBuffer to transfer this buffer's data to. + * + * @return a reference to this ReadableBuffer instance. + */ + ReadableBuffer get(WritableBuffer target); + + /** + * Creates a new ReadableBuffer instance that is a view of the readable portion of + * this buffer. The position will be set to zero and the limit and the reported capacity + * will match the value returned by this buffer's {@link #remaining()} method, the mark + * will be undefined. + * + * @return a new ReadableBuffer that is a view of the readable portion of this buffer.
+ */ ReadableBuffer slice(); + /** + * Sets the buffer limit to the current position and the position is set to zero, the + * mark value is reset to undefined. + * + * @return a reference to this {@link ReadableBuffer}. + */ ReadableBuffer flip(); + /** + * Sets the current read limit of this buffer to the given value. If the buffer mark + * value is defined and is larger than the limit the mark will be discarded. If the + * position is larger than the new limit it will be reset to the new limit. + * + * @param limit + * The new read limit to set for this buffer. + * + * @return a reference to this {@link ReadableBuffer}. + * + * @throws IllegalArgumentException if the limit value is negative or greater than the capacity. + */ ReadableBuffer limit(int limit); + /** + * @return the current value of this buffer's limit. + */ int limit(); - int remaining(); + /** + * Sets the current position of this buffer to the given value. If the buffer mark + * value is defined and is larger than the newly set position it must be discarded. + * + * @param position + * The new position to set for this buffer. + * + * @return a reference to this {@link ReadableBuffer}. + * + * @throws IllegalArgumentException if the position value is negative or greater than the limit. + */ + ReadableBuffer position(int position); + /** + * @return the current position from which the next read operation will start. + */ int position(); + /** + * Mark the current position of this buffer which can be returned to after a + * read operation by calling {@link #reset()}. + * + * @return a reference to this {@link ReadableBuffer}. + */ + ReadableBuffer mark(); + + /** + * Reset the buffer's position to a previously marked value, the mark should remain + * set after calling this method. + * + * @return a reference to this {@link ReadableBuffer}. + * + * @throws InvalidMarkException if the mark value is undefined.
+ */ + ReadableBuffer reset(); + + /** + * Resets the buffer position to zero and clears any previously set mark. + * + * @return a reference to this {@link ReadableBuffer}. + */ + ReadableBuffer rewind(); + + /** + * Resets the buffer position to zero and sets the limit to the buffer capacity, + * the mark value is discarded if set. + * + * @return a reference to this {@link ReadableBuffer}. + */ + ReadableBuffer clear(); + + /** + * @return the remaining number of readable bytes in this buffer. + */ + int remaining(); + + /** + * @return true if there are readable bytes still remaining in this buffer. + */ boolean hasRemaining(); + /** + * Creates a duplicate {@link ReadableBuffer} of this instance. + * <p> + * The duplicated buffer will have the same position, limit and mark as this + * buffer. The two buffers share the same backing data. + * + * @return a duplicate of this {@link ReadableBuffer}. + */ ReadableBuffer duplicate(); + /** + * @return a ByteBuffer view of the current readable portion of this buffer. + */ ByteBuffer byteBuffer(); - String readUTF8(); + /** + * Reads a UTF-8 encoded String from the buffer starting the decode at the + * current position and reading until the limit is reached. The position + * is advanced to the limit once this method returns. If there are no bytes + * remaining in the buffer when this method is called a null is returned. + * + * @return a string decoded from the remaining bytes in this buffer. + * + * @throws CharacterCodingException if the encoding is invalid for any reason. + */ + String readUTF8() throws CharacterCodingException; + + /** + * Decodes a String from the buffer using the provided {@link CharsetDecoder} + * starting the decode at the current position and reading until the limit is + * reached. The position is advanced to the limit once this method returns. + * If there are no bytes remaining in the buffer when this method is called a + * null is returned.
+ * + * @return a string decoded from the remaining bytes in this buffer. + * + * @throws CharacterCodingException if the encoding is invalid for any reason. + */ + String readString(CharsetDecoder decoder) throws CharacterCodingException; final class ByteBufferReader implements ReadableBuffer { - private static final Charset Charset_UTF8 = Charset.forName("UTF-8"); - private ByteBuffer buffer; public static ByteBufferReader allocate(int size) { @@ -78,16 +337,34 @@ public interface ReadableBuffer { return new ByteBufferReader(allocated); } + public static ByteBufferReader wrap(ByteBuffer buffer) { + return new ByteBufferReader(buffer); + } + + public static ByteBufferReader wrap(byte[] array) { + return new ByteBufferReader(ByteBuffer.wrap(array)); + } + public ByteBufferReader(ByteBuffer buffer) { this.buffer = buffer; } @Override + public int capacity() { + return buffer.capacity(); + } + + @Override public byte get() { return buffer.get(); } @Override + public byte get(int index) { + return buffer.get(index); + } + + @Override public int getInt() { return buffer.getInt(); } @@ -179,13 +456,95 @@ public interface ReadableBuffer { @Override public String readUTF8() { - CharBuffer charBuf = Charset_UTF8.decode(buffer); - return charBuf.toString(); + return StandardCharsets.UTF_8.decode(buffer).toString(); + } + + @Override + public String readString(CharsetDecoder decoder) throws CharacterCodingException { + return decoder.decode(buffer).toString(); + } + + @Override + public boolean hasArray() { + return buffer.hasArray(); + } + + @Override + public byte[] array() { + return buffer.array(); + } + + @Override + public int arrayOffset() { + return buffer.arrayOffset(); + } + + @Override + public ReadableBuffer reclaimRead() { + // Don't compact ByteBuffer due to the expense of the copy + return this; } @Override - public void put(ReadableBuffer other) { - this.buffer.put(other.byteBuffer()); + public ReadableBuffer mark() { + buffer.mark(); + return this; + } + + 
@Override + public ReadableBuffer reset() { + buffer.reset(); + return this; + } + + @Override + public ReadableBuffer rewind() { + buffer.rewind(); + return this; + } + + @Override + public ReadableBuffer clear() { + buffer.clear(); + return this; + } + + @Override + public ReadableBuffer get(WritableBuffer target) { + target.put(buffer); + return this; + } + + @Override + public String toString() { + return buffer.toString(); + } + + @Override + public int hashCode() { + return buffer.hashCode(); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (!(other instanceof ReadableBuffer)) { + return false; + } + + ReadableBuffer readable = (ReadableBuffer) other; + if (this.remaining() != readable.remaining()) { + return false; + } + + if (other instanceof CompositeReadableBuffer) { + return other.equals(this); + } + + return buffer.equals(readable.byteBuffer()); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/codec/StringType.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/StringType.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/StringType.java index 91476bc..dfc449c 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/codec/StringType.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/StringType.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.proton.codec; -import java.nio.ByteBuffer; import java.nio.charset.CharacterCodingException; import java.nio.charset.CharsetDecoder; import java.util.Arrays; @@ -31,13 +30,12 @@ public class StringType extends AbstractPrimitiveType<String> private static final DecoderImpl.TypeDecoder<String> _stringCreator = new DecoderImpl.TypeDecoder<String>() { - - public String decode(DecoderImpl decoder, final ByteBuffer buf) + public String decode(DecoderImpl 
decoder, final ReadableBuffer buffer) { CharsetDecoder charsetDecoder = decoder.getCharsetDecoder(); try { - return decoder.getCharsetDecoder().decode(buf).toString(); + return buffer.readString(charsetDecoder); } catch (CharacterCodingException e) { @@ -50,7 +48,6 @@ public class StringType extends AbstractPrimitiveType<String> } }; - public static interface StringEncoding extends PrimitiveTypeEncoding<String> { void setValue(String val, int length); @@ -106,7 +103,6 @@ public class StringType extends AbstractPrimitiveType<String> return len; } - public StringEncoding getCanonicalEncoding() { return _stringEncoding; @@ -121,11 +117,9 @@ public class StringType extends AbstractPrimitiveType<String> extends LargeFloatingSizePrimitiveTypeEncoding<String> implements StringEncoding { - private String _value; private int _length; - public AllStringEncoding(final EncoderImpl encoder, final DecoderImpl decoder) { super(encoder, decoder); @@ -143,7 +137,6 @@ public class StringType extends AbstractPrimitiveType<String> return (val == _value) ? 
_length : calculateUTF8Length(val); } - @Override public byte getEncodingCode() { @@ -162,7 +155,6 @@ public class StringType extends AbstractPrimitiveType<String> public String readValue() { - DecoderImpl decoder = getDecoder(); int size = decoder.readRawInt(); return decoder.readRaw(_stringCreator, size); @@ -177,7 +169,7 @@ public class StringType extends AbstractPrimitiveType<String> public void skipValue() { DecoderImpl decoder = getDecoder(); - ByteBuffer buffer = decoder.getByteBuffer(); + ReadableBuffer buffer = decoder.getBuffer(); int size = decoder.readRawInt(); buffer.position(buffer.position() + size); } @@ -187,7 +179,6 @@ public class StringType extends AbstractPrimitiveType<String> extends SmallFloatingSizePrimitiveTypeEncoding<String> implements StringEncoding { - private String _value; private int _length; @@ -196,7 +187,6 @@ public class StringType extends AbstractPrimitiveType<String> super(encoder, decoder); } - @Override protected void writeEncodedValue(final String val) { @@ -209,7 +199,6 @@ public class StringType extends AbstractPrimitiveType<String> return (val == _value) ? 
_length : calculateUTF8Length(val); } - @Override public byte getEncodingCode() { @@ -228,7 +217,6 @@ public class StringType extends AbstractPrimitiveType<String> public String readValue() { - DecoderImpl decoder = getDecoder(); int size = ((int)decoder.readRawByte()) & 0xff; return decoder.readRaw(_stringCreator, size); @@ -243,10 +231,9 @@ public class StringType extends AbstractPrimitiveType<String> public void skipValue() { DecoderImpl decoder = getDecoder(); - ByteBuffer buffer = decoder.getByteBuffer(); + ReadableBuffer buffer = decoder.getBuffer(); int size = ((int)decoder.readRawByte()) & 0xff; buffer.position(buffer.position() + size); } } - } http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/codec/SymbolType.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/SymbolType.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/SymbolType.java index e333e6a..00051ac 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/codec/SymbolType.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/SymbolType.java @@ -22,7 +22,6 @@ package org.apache.qpid.proton.codec; import org.apache.qpid.proton.amqp.Symbol; -import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.Arrays; import java.util.Collection; @@ -35,27 +34,27 @@ public class SymbolType extends AbstractPrimitiveType<Symbol> private final SymbolEncoding _symbolEncoding; private final SymbolEncoding _shortSymbolEncoding; - private final Map<ByteBuffer, Symbol> _symbolCache = new HashMap<ByteBuffer, Symbol>(); + private final Map<ReadableBuffer, Symbol> _symbolCache = new HashMap<ReadableBuffer, Symbol>(); private DecoderImpl.TypeDecoder<Symbol> _symbolCreator = - new DecoderImpl.TypeDecoder<Symbol>() + new DecoderImpl.TypeDecoder<Symbol>() + { + @Override + public Symbol decode(DecoderImpl decoder, ReadableBuffer buffer) 
{ - @Override - public Symbol decode(DecoderImpl decoder, ByteBuffer buf) + Symbol symbol = _symbolCache.get(buffer); + if (symbol == null) { - Symbol symbol = _symbolCache.get(buf); - if(symbol == null) - { - byte[] bytes = new byte[buf.limit()]; - buf.get(bytes); - - String str = new String(bytes, ASCII_CHARSET); - symbol = Symbol.getSymbol(str); - - _symbolCache.put(ByteBuffer.wrap(bytes), symbol); - } - return symbol; + byte[] bytes = new byte[buffer.limit()]; + buffer.get(bytes); + + String str = new String(bytes, ASCII_CHARSET); + symbol = Symbol.getSymbol(str); + + _symbolCache.put(ReadableBuffer.ByteBufferReader.wrap(bytes), symbol); } - }; + return symbol; + } + }; public static interface SymbolEncoding extends PrimitiveTypeEncoding<Symbol> { @@ -155,7 +154,7 @@ public class SymbolType extends AbstractPrimitiveType<Symbol> public void skipValue() { DecoderImpl decoder = getDecoder(); - ByteBuffer buffer = decoder.getByteBuffer(); + ReadableBuffer buffer = decoder.getBuffer(); int size = decoder.readRawInt(); buffer.position(buffer.position() + size); } @@ -210,7 +209,7 @@ public class SymbolType extends AbstractPrimitiveType<Symbol> public void skipValue() { DecoderImpl decoder = getDecoder(); - ByteBuffer buffer = decoder.getByteBuffer(); + ReadableBuffer buffer = decoder.getBuffer(); int size = ((int)decoder.readRawByte()) & 0xff; buffer.position(buffer.position() + size); } http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/codec/WritableBuffer.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/WritableBuffer.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/WritableBuffer.java index 79676b3..67c8292 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/codec/WritableBuffer.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/WritableBuffer.java @@ -49,6 +49,8 @@ public interface 
WritableBuffer void put(ByteBuffer payload); + void put(ReadableBuffer payload); + int limit(); class ByteBufferWrapper implements WritableBuffer @@ -133,11 +135,27 @@ public interface WritableBuffer } @Override + public void put(ReadableBuffer src) + { + src.get(this); + } + + @Override public int limit() { return _buf.limit(); } + public ByteBuffer byteBuffer() + { + return _buf; + } + + public ReadableBuffer toReadableBuffer() + { + return ReadableBuffer.ByteBufferReader.wrap((ByteBuffer) _buf.duplicate().flip()); + } + @Override public String toString() { @@ -149,5 +167,15 @@ public interface WritableBuffer ByteBuffer allocated = ByteBuffer.allocate(size); return new ByteBufferWrapper(allocated); } + + public static ByteBufferWrapper wrap(ByteBuffer buffer) + { + return new ByteBufferWrapper(buffer); + } + + public static ByteBufferWrapper wrap(byte[] bytes) + { + return new ByteBufferWrapper(ByteBuffer.wrap(bytes)); + } } } http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/codec/messaging/FastPathAcceptedType.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/messaging/FastPathAcceptedType.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/messaging/FastPathAcceptedType.java index 3624836..d9eb991 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/codec/messaging/FastPathAcceptedType.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/messaging/FastPathAcceptedType.java @@ -80,18 +80,18 @@ public class FastPathAcceptedType implements AMQPType<Accepted>, FastPathDescrib @Override public Accepted readValue() { DecoderImpl decoder = getDecoder(); - byte typeCode = decoder.getByteBuffer().get(); + byte typeCode = decoder.getBuffer().get(); switch (typeCode) { case EncodingCodes.LIST0: break; case EncodingCodes.LIST8: - decoder.getByteBuffer().get(); - decoder.getByteBuffer().get(); + 
decoder.getBuffer().get(); + decoder.getBuffer().get(); break; case EncodingCodes.LIST32: - decoder.getByteBuffer().getInt(); - decoder.getByteBuffer().getInt(); + decoder.getBuffer().getInt(); + decoder.getBuffer().getInt(); break; default: throw new DecodeException("Incorrect type found in Accepted type encoding: " + typeCode); http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/codec/messaging/FastPathHeaderType.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/messaging/FastPathHeaderType.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/messaging/FastPathHeaderType.java index 189b360..06d8026 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/codec/messaging/FastPathHeaderType.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/messaging/FastPathHeaderType.java @@ -55,7 +55,7 @@ public class FastPathHeaderType implements AMQPType<Header>, FastPathDescribedTy @Override public Header readValue() { DecoderImpl decoder = getDecoder(); - byte typeCode = decoder.getByteBuffer().get(); + byte typeCode = decoder.getBuffer().get(); @SuppressWarnings("unused") int size = 0; @@ -65,12 +65,12 @@ public class FastPathHeaderType implements AMQPType<Header>, FastPathDescribedTy case EncodingCodes.LIST0: break; case EncodingCodes.LIST8: - size = ((int)decoder.getByteBuffer().get()) & 0xff; - count = ((int)decoder.getByteBuffer().get()) & 0xff; + size = ((int)decoder.getBuffer().get()) & 0xff; + count = ((int)decoder.getBuffer().get()) & 0xff; break; case EncodingCodes.LIST32: - size = decoder.getByteBuffer().getInt(); - count = decoder.getByteBuffer().getInt(); + size = decoder.getBuffer().getInt(); + count = decoder.getBuffer().getInt(); break; default: throw new DecodeException("Incorrect type found in Header encoding: " + typeCode); 
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/codec/messaging/FastPathPropertiesType.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/messaging/FastPathPropertiesType.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/messaging/FastPathPropertiesType.java index e3caca5..e071ea9 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/codec/messaging/FastPathPropertiesType.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/messaging/FastPathPropertiesType.java @@ -55,7 +55,7 @@ public class FastPathPropertiesType implements AMQPType<Properties>, FastPathDes @Override public Properties readValue() { DecoderImpl decoder = getDecoder(); - byte typeCode = decoder.getByteBuffer().get(); + byte typeCode = decoder.getBuffer().get(); @SuppressWarnings("unused") int size = 0; @@ -65,12 +65,12 @@ public class FastPathPropertiesType implements AMQPType<Properties>, FastPathDes case EncodingCodes.LIST0: break; case EncodingCodes.LIST8: - size = ((int)decoder.getByteBuffer().get()) & 0xff; - count = ((int)decoder.getByteBuffer().get()) & 0xff; + size = ((int)decoder.getBuffer().get()) & 0xff; + count = ((int)decoder.getBuffer().get()) & 0xff; break; case EncodingCodes.LIST32: - size = decoder.getByteBuffer().getInt(); - count = decoder.getByteBuffer().getInt(); + size = decoder.getBuffer().getInt(); + count = decoder.getBuffer().getInt(); break; default: throw new DecodeException("Incorrect type found in Properties encoding: " + typeCode); http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathDispositionType.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathDispositionType.java 
b/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathDispositionType.java index 01e18e7..c329aa7 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathDispositionType.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathDispositionType.java @@ -84,7 +84,7 @@ public class FastPathDispositionType implements AMQPType<Disposition>, FastPathD @Override public Disposition readValue() { DecoderImpl decoder = getDecoder(); - byte typeCode = decoder.getByteBuffer().get(); + byte typeCode = decoder.getBuffer().get(); @SuppressWarnings("unused") int size = 0; @@ -95,12 +95,12 @@ public class FastPathDispositionType implements AMQPType<Disposition>, FastPathD // TODO - Technically invalid however old decoder also allowed this. break; case EncodingCodes.LIST8: - size = ((int)decoder.getByteBuffer().get()) & 0xff; - count = ((int)decoder.getByteBuffer().get()) & 0xff; + size = ((int)decoder.getBuffer().get()) & 0xff; + count = ((int)decoder.getBuffer().get()) & 0xff; break; case EncodingCodes.LIST32: - size = decoder.getByteBuffer().getInt(); - count = decoder.getByteBuffer().getInt(); + size = decoder.getBuffer().getInt(); + count = decoder.getBuffer().getInt(); break; default: throw new DecodeException("Incorrect type found in Disposition encoding: " + typeCode); http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathFlowType.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathFlowType.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathFlowType.java index 78abc5c..6f500be 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathFlowType.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathFlowType.java @@ -80,7 +80,7 @@ public class 
FastPathFlowType implements AMQPType<Flow>, FastPathDescribedTypeCo @Override public Flow readValue() { DecoderImpl decoder = getDecoder(); - byte typeCode = decoder.getByteBuffer().get(); + byte typeCode = decoder.getBuffer().get(); @SuppressWarnings("unused") int size = 0; @@ -91,12 +91,12 @@ public class FastPathFlowType implements AMQPType<Flow>, FastPathDescribedTypeCo // TODO - Technically invalid however old decoder also allowed this. break; case EncodingCodes.LIST8: - size = ((int)decoder.getByteBuffer().get()) & 0xff; - count = ((int)decoder.getByteBuffer().get()) & 0xff; + size = ((int)decoder.getBuffer().get()) & 0xff; + count = ((int)decoder.getBuffer().get()) & 0xff; break; case EncodingCodes.LIST32: - size = decoder.getByteBuffer().getInt(); - count = decoder.getByteBuffer().getInt(); + size = decoder.getBuffer().getInt(); + count = decoder.getBuffer().getInt(); break; default: throw new DecodeException("Incorrect type found in Flow encoding: " + typeCode); http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathTransferType.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathTransferType.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathTransferType.java index 685890a..79842db 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathTransferType.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathTransferType.java @@ -61,7 +61,7 @@ public class FastPathTransferType implements AMQPType<Transfer>, FastPathDescrib @Override public Transfer readValue() { DecoderImpl decoder = getDecoder(); - byte typeCode = decoder.getByteBuffer().get(); + byte typeCode = decoder.getBuffer().get(); @SuppressWarnings("unused") int size = 0; @@ -72,12 +72,12 @@ public class FastPathTransferType implements 
AMQPType<Transfer>, FastPathDescrib // TODO - Technically invalid however old decoder also allowed this. break; case EncodingCodes.LIST8: - size = ((int)decoder.getByteBuffer().get()) & 0xff; - count = ((int)decoder.getByteBuffer().get()) & 0xff; + size = ((int)decoder.getBuffer().get()) & 0xff; + count = ((int)decoder.getBuffer().get()) & 0xff; break; case EncodingCodes.LIST32: - size = decoder.getByteBuffer().getInt(); - count = decoder.getByteBuffer().getInt(); + size = decoder.getBuffer().getInt(); + count = decoder.getBuffer().getInt(); break; default: throw new DecodeException("Incorrect type found in Transfer encoding: " + typeCode); http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/engine/Receiver.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Receiver.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Receiver.java index f9d718f..fea4361 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Receiver.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Receiver.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.proton.engine; +import org.apache.qpid.proton.codec.ReadableBuffer; import org.apache.qpid.proton.codec.WritableBuffer; /** @@ -69,6 +70,17 @@ public interface Receiver extends Link */ public int recv(WritableBuffer buffer); + /** + * Receive message data for the current delivery returning the data in a Readable buffer. + * + * The delivery will return an empty buffer if there is no pending data to be read or if all + * data has been read either by a previous call to this method or by a call to one of the other + * receive methods. + * + * @return a ReadableBuffer that contains the currently available data for the current delivery. 
+ */ + public ReadableBuffer recv(); + public void drain(int credit); /** http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/engine/Sender.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Sender.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Sender.java index 159d5c3..fdb2552 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Sender.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Sender.java @@ -64,6 +64,22 @@ public interface Sender extends Link public int send(ReadableBuffer buffer); /** + * Sends data to the current delivery attempting not to copy the data unless a previous + * send has already added data to the Delivery in which case a copy may occur depending on + * the implementation. + * <p> + * Care should be taken when passing ReadableBuffer instances that wrap pooled bytes, + * as the send does not mean the data will be sent immediately when the transport is + * flushed, so the pooled bytes could be held for longer than expected. + * + * @param buffer + * An immutable ReadableBuffer that can be held until the next transport flush. + * + * @return the number of bytes read from the provided buffer. + */ + public int sendNoCopy(ReadableBuffer buffer); + + /** * Abort the current delivery. * * Note "pn_link_abort" is commented out in the .h --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org