PROTON-1672 Handle multi-frame transfer payloads more efficiently

Replace reallocation and consolidation of transfer payloads when
multiple framed transfers are inbound.  Creates a
CompositeReadableBuffer that can be used to house the assembled payload
for use in the decoder.  The decoder implementation is refactored to
handle ReadableBuffer as the source of bytes as well as ByteBuffer.  Adds
no-copy method variants to the Sender and Receiver API such that clients
or servers can process inbound and outbound deliveries without copying
the payloads when it is known to be safe not to copy.

Adds tests and jacoco reports to validate test coverage.

Project: http://git-wip-us.apache.org/repos/asf/qpid-proton-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton-j/commit/ec554715
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton-j/tree/ec554715
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton-j/diff/ec554715

Branch: refs/heads/master
Commit: ec5547152fed5a74afbe11bbb33541a2cb149fa4
Parents: c4cd774
Author: Timothy Bish <tabish...@gmail.com>
Authored: Wed Apr 11 16:00:33 2018 -0400
Committer: Timothy Bish <tabish...@gmail.com>
Committed: Wed Apr 11 16:00:33 2018 -0400

----------------------------------------------------------------------
 pom.xml                                         |   33 +
 .../org/apache/qpid/proton/amqp/Binary.java     |   25 +-
 .../org/apache/qpid/proton/codec/ArrayType.java |   14 +-
 .../apache/qpid/proton/codec/BinaryType.java    |    6 +-
 .../proton/codec/CompositeReadableBuffer.java   |  744 ++++++
 .../proton/codec/CompositeWritableBuffer.java   |   21 +
 .../apache/qpid/proton/codec/DecoderImpl.java   |   22 +-
 .../proton/codec/DroppingWritableBuffer.java    |    8 +-
 .../codec/FixedSizePrimitiveTypeEncoding.java   |    2 +-
 .../org/apache/qpid/proton/codec/ListType.java  |    9 +-
 .../org/apache/qpid/proton/codec/MapType.java   |   13 +-
 .../qpid/proton/codec/ReadableBuffer.java       |  395 ++-
 .../apache/qpid/proton/codec/StringType.java    |   21 +-
 .../apache/qpid/proton/codec/SymbolType.java    |   39 +-
 .../qpid/proton/codec/WritableBuffer.java       |   28 +
 .../codec/messaging/FastPathAcceptedType.java   |   10 +-
 .../codec/messaging/FastPathHeaderType.java     |   10 +-
 .../codec/messaging/FastPathPropertiesType.java |   10 +-
 .../transport/FastPathDispositionType.java      |   10 +-
 .../codec/transport/FastPathFlowType.java       |   10 +-
 .../codec/transport/FastPathTransferType.java   |   10 +-
 .../org/apache/qpid/proton/engine/Receiver.java |   12 +
 .../org/apache/qpid/proton/engine/Sender.java   |   16 +
 .../qpid/proton/engine/impl/DeliveryImpl.java   |  198 +-
 .../qpid/proton/engine/impl/FrameWriter.java    |   13 +-
 .../qpid/proton/engine/impl/ReceiverImpl.java   |   18 +
 .../qpid/proton/engine/impl/SenderImpl.java     |   19 +
 .../qpid/proton/engine/impl/TransportImpl.java  |   21 +-
 .../proton/engine/impl/TransportSession.java    |   22 +-
 .../qpid/proton/message/impl/MessageImpl.java   |   11 +-
 .../org/apache/qpid/proton/codec/Benchmark.java |  102 +-
 .../codec/CompositeReadableBufferTest.java      | 2305 ++++++++++++++++++
 .../proton/engine/impl/DeliveryImplTest.java    |  577 ++++-
 .../proton/engine/impl/TransportImplTest.java   |    4 +-
 34 files changed, 4502 insertions(+), 256 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ba6869b..945058f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -42,6 +42,10 @@
 
     <!-- Plugin versions -->
     <maven-bundle-plugin-version>3.2.0</maven-bundle-plugin-version>
+    <jacoco-plugin-version>0.7.9</jacoco-plugin-version>
+
+    <!-- surefire forked jvm arguments -->
+    <argLine>-Xmx2g -enableassertions ${jacoco-config}</argLine>
   </properties>
 
   <dependencyManagement>
@@ -116,6 +120,20 @@
           </excludes>
         </configuration>
       </plugin>
+      <plugin>
+        <groupId>org.jacoco</groupId>
+        <artifactId>jacoco-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>prepare-agent</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <propertyName>jacoco-config</propertyName>
+        </configuration>
+      </plugin>
     </plugins>
     <pluginManagement>
       <plugins>
@@ -154,6 +172,11 @@
             <pushChanges>true</pushChanges>
           </configuration>
         </plugin>
+        <plugin>
+          <groupId>org.jacoco</groupId>
+          <artifactId>jacoco-maven-plugin</artifactId>
+          <version>${jacoco-plugin-version}</version>
+        </plugin>
       </plugins>
     </pluginManagement>
   </build>
@@ -181,6 +204,16 @@
     <url>https://builds.apache.org/view/M-R/view/Qpid/job/Qpid-proton-j/</url>
   </ciManagement>
 
+  <reporting>
+    <plugins>
+      <plugin>
+        <groupId>org.jacoco</groupId>
+        <artifactId>jacoco-maven-plugin</artifactId>
+        <version>${jacoco-plugin-version}</version>
+      </plugin>
+    </plugins>
+  </reporting>
+
   <profiles>
     <!-- Override the apache-release profile from the parent pom -->
     <profile>

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/amqp/Binary.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/amqp/Binary.java 
b/proton-j/src/main/java/org/apache/qpid/proton/amqp/Binary.java
index aac3fc5..ab1bfe5 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/amqp/Binary.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/amqp/Binary.java
@@ -23,6 +23,8 @@ package org.apache.qpid.proton.amqp;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 
+import org.apache.qpid.proton.codec.ReadableBuffer;
+
 public final class Binary
 {
 
@@ -167,7 +169,26 @@ public final class Binary
         return new Binary(_data, _offset+offset, length);
     }
 
-    public static Binary create(ByteBuffer buffer) 
+    /**
+     * Creates a Binary from the remaining bytes of the given buffer.
+     * <p>
+     * An array backed buffer is wrapped without copying.  Otherwise the
+     * remaining bytes are copied out through a duplicate of the buffer rather
+     * than through the buffer itself.
+     *
+     * @param buffer
+     *      The source buffer, may be null.
+     *
+     * @return a new Binary, or null if the given buffer was null.
+     */
+    public static Binary create(ReadableBuffer buffer)
+    {
+        if (buffer == null)
+        {
+            return null;
+        }
+
+        if (buffer.hasArray())
+        {
+            final int start = buffer.arrayOffset() + buffer.position();
+            return new Binary(buffer.array(), start, buffer.remaining());
+        }
+
+        final byte[] copy = new byte[buffer.remaining()];
+        buffer.duplicate().get(copy);
+        return new Binary(copy);
+    }
+
+    public static Binary create(ByteBuffer buffer)
     {
         if( buffer == null )
             return null;
@@ -178,7 +199,7 @@ public final class Binary
             dup.get(data);
             return new Binary(data);
         }
-        else 
+        else
         {
             return new Binary(buffer.array(), 
buffer.arrayOffset()+buffer.position(), buffer.remaining());
         }

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/codec/ArrayType.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/ArrayType.java 
b/proton-j/src/main/java/org/apache/qpid/proton/codec/ArrayType.java
index 32d6f85..f4f0c8a 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/codec/ArrayType.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/ArrayType.java
@@ -21,7 +21,6 @@
 package org.apache.qpid.proton.codec;
 
 import java.lang.reflect.Array;
-import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
 
@@ -48,7 +47,7 @@ public class ArrayType implements PrimitiveType<Object[]>
         void writeValue(double[] a);
         void writeValue(char[] a);
 
-        void setValue(Object[] val, TypeEncoding encoder, int size);
+        void setValue(Object[] val, TypeEncoding<?> encoder, int size);
 
         int getSizeBytes();
 
@@ -92,7 +91,7 @@ public class ArrayType implements PrimitiveType<Object[]>
 
     public ArrayEncoding getEncoding(final Object[] val)
     {
-        TypeEncoding encoder = calculateEncoder(val,_encoder);
+        TypeEncoding<?> encoder = calculateEncoder(val,_encoder);
         int size = calculateSize(val, encoder);
         ArrayEncoding arrayEncoding = (val.length > 255 || size > 254)
                                       ? _arrayEncoding
@@ -101,7 +100,7 @@ public class ArrayType implements PrimitiveType<Object[]>
         return arrayEncoding;
     }
 
-    private static TypeEncoding calculateEncoder(final Object[] val, final 
EncoderImpl encoder)
+    private static TypeEncoding<?> calculateEncoder(final Object[] val, final 
EncoderImpl encoder)
     {
 
         if(val.length == 0)
@@ -156,7 +155,6 @@ public class ArrayType implements PrimitiveType<Object[]>
             }
             else
             {
-
                 if(underlyingType == null)
                 {
                     checkTypes = true;
@@ -173,7 +171,6 @@ public class ArrayType implements PrimitiveType<Object[]>
                                 .getType(val[i]) + " in array");
                     }
 
-
                     TypeEncoding elementEncoding = 
underlyingType.getEncoding(val[i]);
                     if(elementEncoding != underlyingEncoding && 
!underlyingEncoding.encodesSuperset(elementEncoding))
                     {
@@ -438,7 +435,6 @@ public class ArrayType implements PrimitiveType<Object[]>
             extends LargeFloatingSizePrimitiveTypeEncoding<Object[]>
             implements ArrayEncoding
     {
-
         private Object[] _val;
         private TypeEncoding _underlyingEncoder;
         private int _size;
@@ -635,7 +631,7 @@ public class ArrayType implements PrimitiveType<Object[]>
         public void skipValue()
         {
             DecoderImpl decoder = getDecoder();
-            ByteBuffer buffer = decoder.getByteBuffer();
+            ReadableBuffer buffer = decoder.getBuffer();
             int size = decoder.readRawInt();
             buffer.position(buffer.position() + size);
         }
@@ -900,7 +896,7 @@ public class ArrayType implements PrimitiveType<Object[]>
         public void skipValue()
         {
             DecoderImpl decoder = getDecoder();
-            ByteBuffer buffer = decoder.getByteBuffer();
+            ReadableBuffer buffer = decoder.getBuffer();
             int size = ((int)decoder.readRawByte()) & 0xFF;
             buffer.position(buffer.position() + size);
         }

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/codec/BinaryType.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/codec/BinaryType.java 
b/proton-j/src/main/java/org/apache/qpid/proton/codec/BinaryType.java
index 1d739b8..be0a8f5 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/codec/BinaryType.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/BinaryType.java
@@ -22,7 +22,6 @@ package org.apache.qpid.proton.codec;
 
 import org.apache.qpid.proton.amqp.Binary;
 
-import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
 
@@ -135,7 +134,7 @@ public class BinaryType extends 
AbstractPrimitiveType<Binary>
         public void skipValue()
         {
             DecoderImpl decoder = getDecoder();
-            ByteBuffer buffer = decoder.getByteBuffer();
+            ReadableBuffer buffer = decoder.getBuffer();
             int size = decoder.readRawInt();
             buffer.position(buffer.position() + size);
         }
@@ -190,7 +189,8 @@ public class BinaryType extends 
AbstractPrimitiveType<Binary>
 
         public void skipValue()
         {
-            ByteBuffer buffer = getDecoder().getByteBuffer();
+            DecoderImpl decoder = getDecoder();
+            ReadableBuffer buffer = decoder.getBuffer();
             int size = ((int)getDecoder().readRawByte()) & 0xff;
             buffer.position(buffer.position() + size);
         }

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeReadableBuffer.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeReadableBuffer.java
 
b/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeReadableBuffer.java
new file mode 100644
index 0000000..e6652e4
--- /dev/null
+++ 
b/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeReadableBuffer.java
@@ -0,0 +1,744 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.codec;
+
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.InvalidMarkException;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CoderResult;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * ReadableBuffer implementation whose content is made up of one or more
+ * byte arrays.
+ */
+public class CompositeReadableBuffer implements ReadableBuffer {
+
+    private static final List<byte[]> EMPTY_LIST = 
Collections.unmodifiableList(new ArrayList<byte[]>());
+    private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.wrap(new 
byte[0]);
+    private static final CompositeReadableBuffer EMPTY_SLICE = new 
CompositeReadableBuffer(true);
+    private static int UNSET_MARK = -1;
+
+    private ArrayList<byte[]> contents;
+
+    // Track active array and our offset into it.
+    private int currentArrayIndex = -1;
+    private byte[] currentArray;
+    private int currentOffset;
+
+    // State global to the buffer.
+    private int position;
+    private int limit;
+    private int capacity;
+    private int mark = -1;
+    private boolean compactable = true;
+
+    /**
+     * Creates a default empty composite buffer
+     */
+    public CompositeReadableBuffer() {
+    }
+
+    /**
+     * Creates a buffer view over a single backing array.
+     * <p>
+     * Capacity and limit are initialized from the array length; callers such as
+     * {@link #slice()} and {@link #duplicate()} overwrite them afterwards.  A
+     * null array is accepted so that duplicating an empty buffer, which has no
+     * current array, produces another empty buffer instead of failing with a
+     * {@link NullPointerException}.
+     *
+     * @param array
+     *      The array currently being read, or null if the buffer is empty.
+     * @param offset
+     *      The read offset into the given array.
+     */
+    private CompositeReadableBuffer(byte[] array, int offset) {
+        this.currentArray = array;
+        this.currentOffset = offset;
+        this.capacity = array == null ? 0 : array.length;
+        this.limit = capacity;
+    }
+
+    private CompositeReadableBuffer(boolean compactable) {
+        this.compactable = compactable;
+    }
+
+    public List<byte[]> getArrays() {
+        return contents == null ? EMPTY_LIST : 
Collections.unmodifiableList(contents);
+    }
+
+    public int getCurrentIndex() {
+        return currentArrayIndex;
+    }
+
+    @Override
+    public boolean hasArray() {
+        return currentArray != null && (contents == null || contents.size() == 
1);
+    }
+
+    public int capacity() {
+        return capacity;
+    }
+
+    @Override
+    public byte[] array() {
+        if (hasArray()) {
+            return currentArray;
+        }
+
+        throw new UnsupportedOperationException("Buffer not backed by a single 
array");
+    }
+
+    /**
+     * Returns the offset within the backing array of the first byte of this
+     * buffer, i.e. the array index that corresponds to buffer index zero.
+     * <p>
+     * The internal array offset advances together with the position on every
+     * read, so the position is subtracted here.  This keeps the usual
+     * {@code arrayOffset() + position()} arithmetic pointing at the current
+     * read index instead of counting the bytes already read twice.
+     *
+     * @return the offset into the backing array of buffer index zero.
+     *
+     * @throws UnsupportedOperationException if the buffer is not backed by a single array.
+     */
+    @Override
+    public int arrayOffset() {
+        if (hasArray()) {
+            return currentOffset - position;
+        }
+
+        throw new UnsupportedOperationException("Buffer not backed by a single array");
+    }
+
+    @Override
+    public byte get() {
+        if (position == limit) {
+            throw new BufferUnderflowException();
+        }
+
+        final byte result = currentArray[currentOffset++];
+        position++;
+        maybeMoveToNextArray();
+
+        return result;
+    }
+
+    @Override
+    public byte get(int index) {
+        if (index < 0 || index >= limit) {
+            throw new IndexOutOfBoundsException("The given index is not valid: 
" + index);
+        }
+
+        byte result = 0;
+
+        if (index == position) {
+            result = currentArray[currentOffset];
+        } else if (index < position) {
+            result = getBackwards(index);
+        } else {
+            result = getForward(index);
+        }
+
+        return result;
+    }
+
+    private byte getForward(int index) {
+        byte result = 0;
+
+        int currentArrayIndex = this.currentArrayIndex;
+        int currentOffset = this.currentOffset;
+        byte[] currentArray = this.currentArray;
+
+        for (int amount = index - position; amount >= 0;) {
+            if (amount < currentArray.length - currentOffset) {
+                result = currentArray[currentOffset + amount];
+                break;
+            } else {
+                amount -= currentArray.length - currentOffset;
+                currentArray = contents.get(++currentArrayIndex);
+                currentOffset = 0;
+            }
+        }
+
+        return result;
+    }
+
+    private byte getBackwards(int index) {
+        byte result = 0;
+
+        int currentArrayIndex = this.currentArrayIndex;
+        int currentOffset = this.currentOffset;
+        byte[] currentArray = this.currentArray;
+
+        for (int amount = position - index; amount >= 0;) {
+            if ((currentOffset - amount) >= 0) {
+                result = currentArray[currentOffset - amount];
+                break;
+            } else {
+                amount -= currentOffset;
+                currentArray = contents.get(--currentArrayIndex);
+                currentOffset = currentArray.length;
+            }
+        }
+
+        return result;
+    }
+
+    @Override
+    public int getInt() {
+        if (remaining() < Integer.BYTES) {
+            throw new BufferUnderflowException();
+        }
+
+        int result = 0;
+
+        if (currentArray.length - currentOffset >= 4) {
+            result = (int)(currentArray[currentOffset++] & 0xFF) << 24 |
+                     (int)(currentArray[currentOffset++] & 0xFF) << 16 |
+                     (int)(currentArray[currentOffset++] & 0xFF) << 8 |
+                     (int)(currentArray[currentOffset++] & 0xFF) << 0;
+        } else {
+            for (int i = Integer.BYTES - 1; i >= 0; --i) {
+                result |= (int)(currentArray[currentOffset++] & 0xFF) << (i * 
Byte.SIZE);
+                maybeMoveToNextArray();
+            }
+        }
+
+        position += 4;
+
+        return result;
+    }
+
+    @Override
+    public long getLong() {
+        if (remaining() < Long.BYTES) {
+            throw new BufferUnderflowException();
+        }
+
+        long result = 0;
+
+        if (currentArray.length - currentOffset >= 8) {
+            result = (long)(currentArray[currentOffset++] & 0xFF) << 56 |
+                     (long)(currentArray[currentOffset++] & 0xFF) << 48 |
+                     (long)(currentArray[currentOffset++] & 0xFF) << 40 |
+                     (long)(currentArray[currentOffset++] & 0xFF) << 32 |
+                     (long)(currentArray[currentOffset++] & 0xFF) << 24 |
+                     (long)(currentArray[currentOffset++] & 0xFF) << 16 |
+                     (long)(currentArray[currentOffset++] & 0xFF) << 8 |
+                     (long)(currentArray[currentOffset++] & 0xFF) << 0;
+        } else {
+            for (int i = Long.BYTES - 1; i >= 0; --i) {
+                result |= (long)(currentArray[currentOffset++] & 0xFF) << (i * 
Byte.SIZE);
+                maybeMoveToNextArray();
+            }
+        }
+
+        position += 8;
+
+        return result;
+    }
+
+    @Override
+    public short getShort() {
+        if (remaining() < Short.BYTES) {
+            throw new BufferUnderflowException();
+        }
+
+        short result = 0;
+
+        for (int i = Short.BYTES - 1; i >= 0; --i) {
+            result |= (currentArray[currentOffset++] & 0xFF) << (i * 
Byte.SIZE);
+            maybeMoveToNextArray();
+        }
+
+        position += 2;
+
+        return result;
+    }
+
+    @Override
+    public float getFloat() {
+        return Float.intBitsToFloat(getInt());
+    }
+
+    @Override
+    public double getDouble() {
+        return Double.longBitsToDouble(getLong());
+    }
+
+    @Override
+    public CompositeReadableBuffer get(byte[] data) {
+        return get(data, 0, data.length);
+    }
+
+    @Override
+    public CompositeReadableBuffer get(byte[] data, int offset, int length) {
+        validateReadTarget(data.length, offset, length);
+
+        if (length > remaining()) {
+            throw new BufferUnderflowException();
+        }
+
+        int copied = 0;
+        while (length > 0) {
+            final int chunk = Math.min((currentArray.length - currentOffset), 
length);
+            System.arraycopy(currentArray, currentOffset, data, offset + 
copied, chunk);
+
+            currentOffset += chunk;
+            length -= chunk;
+            copied += chunk;
+
+            maybeMoveToNextArray();
+        }
+
+        position += copied;
+
+        return this;
+    }
+
+    @Override
+    public CompositeReadableBuffer get(WritableBuffer target) {
+        int length = Math.min(target.remaining(), remaining());
+
+        do {
+            final int chunk = Math.min((currentArray.length - currentOffset), 
length);
+
+            if (chunk == 0) {
+                break;  // This buffer is out of data
+            }
+
+            target.put(currentArray, currentOffset, chunk);
+
+            currentOffset += chunk;
+            position += chunk;
+            length -= chunk;
+
+            maybeMoveToNextArray();
+        } while (length > 0);
+
+        return this;
+    }
+
+    @Override
+    public CompositeReadableBuffer position(int position) {
+        if (position < 0 || position > limit) {
+            throw new IllegalArgumentException("position must be non-negative 
and no greater than the limit");
+        }
+
+        int moveBy = position - this.position;
+        if (moveBy >= 0) {
+            moveForward(moveBy);
+        } else {
+            moveBackwards(Math.abs(moveBy));
+        }
+
+        this.position = position;
+
+        if (mark > position) {
+            mark = UNSET_MARK;
+        }
+
+        return this;
+    }
+
+    private void moveForward(int moveBy) {
+        while (moveBy > 0) {
+            if (moveBy < currentArray.length - currentOffset) {
+                currentOffset += moveBy;
+                break;
+            } else {
+                moveBy -= currentArray.length - currentOffset;
+                if (currentArrayIndex != -1 && currentArrayIndex < 
contents.size() - 1) {
+                    currentArray = contents.get(++currentArrayIndex);
+                    currentOffset = 0;
+                } else {
+                    currentOffset = currentArray.length;
+                }
+            }
+        }
+    }
+
+    private void moveBackwards(int moveBy) {
+        while (moveBy > 0) {
+            if ((currentOffset - moveBy) >= 0) {
+                currentOffset -= moveBy;
+                break;
+            } else {
+                moveBy -= currentOffset;
+                currentArray = contents.get(--currentArrayIndex);
+                currentOffset = currentArray.length;
+            }
+        }
+    }
+
+    @Override
+    public int position() {
+        return position;
+    }
+
+    @Override
+    public CompositeReadableBuffer slice() {
+        int newCapacity = limit() - position();
+
+        final CompositeReadableBuffer result;
+
+        if (newCapacity == 0) {
+            result = EMPTY_SLICE;
+        } else {
+            result = new CompositeReadableBuffer(currentArray, currentOffset);
+            result.contents = contents;
+            result.currentArrayIndex = currentArrayIndex;
+            result.capacity = newCapacity;
+            result.limit = newCapacity;
+            result.position = 0;
+            result.compactable = false;
+        }
+
+        return result;
+    }
+
+    @Override
+    public CompositeReadableBuffer flip() {
+        limit = position;
+        position(0); // Move by index to avoid corrupting a slice.
+        mark = UNSET_MARK;
+
+        return this;
+    }
+
+    @Override
+    public CompositeReadableBuffer limit(int limit) {
+        if (limit < 0 || limit > capacity) {
+            throw new IllegalArgumentException("limit must be non-negative and 
no greater than the capacity");
+        }
+
+        if (mark > limit) {
+            mark = UNSET_MARK;
+        }
+
+        if (position > limit) {
+            position(limit);
+        }
+
+        this.limit = limit;
+
+        return this;
+    }
+
+    @Override
+    public int limit() {
+        return limit;
+    }
+
+    @Override
+    public CompositeReadableBuffer mark() {
+        this.mark = position;
+        return this;
+    }
+
+    @Override
+    public CompositeReadableBuffer reset() {
+        if (mark < 0) {
+            throw new InvalidMarkException();
+        }
+
+        position(mark);
+
+        return this;
+    }
+
+    @Override
+    public CompositeReadableBuffer rewind() {
+        return position(0);
+    }
+
+    @Override
+    public CompositeReadableBuffer clear() {
+        mark = UNSET_MARK;
+        limit = capacity;
+
+        return position(0);
+    }
+
+    @Override
+    public int remaining() {
+        return limit - position;
+    }
+
+    @Override
+    public boolean hasRemaining() {
+        return remaining() > 0;
+    }
+
+    @Override
+    public CompositeReadableBuffer duplicate() {
+        CompositeReadableBuffer duplicated =
+            new CompositeReadableBuffer(currentArray, currentOffset);
+
+        if (contents != null) {
+            duplicated.contents = new ArrayList<>(contents);
+        }
+
+        duplicated.capacity = capacity;
+        duplicated.currentArrayIndex = currentArrayIndex;
+        duplicated.limit = limit;
+        duplicated.position = position;
+        duplicated.mark = mark;
+        duplicated.compactable = compactable;   // A slice duplicated should 
not allow compaction.
+
+        return duplicated;
+    }
+
+    @Override
+    public ByteBuffer byteBuffer() {
+        int viewSpan = limit() - position();
+
+        final ByteBuffer result;
+
+        if (viewSpan == 0) {
+            result = EMPTY_BUFFER;
+        } else if (viewSpan <= currentArray.length - currentOffset) {
+            result = ByteBuffer.wrap(currentArray, currentOffset, viewSpan);
+        } else {
+            result = buildByteBuffer(viewSpan);
+        }
+
+        return result.asReadOnlyBuffer();
+    }
+
+    private ByteBuffer buildByteBuffer(int span) {
+        byte[] compactedView = new byte[span];
+        int arrayIndex = currentArrayIndex;
+
+        // Take whatever is left from the current array;
+        System.arraycopy(currentArray, currentOffset, compactedView, 0, 
currentArray.length - currentOffset);
+        int copied = currentArray.length - currentOffset;
+
+        while (copied < span) {
+            byte[] next = contents.get(++arrayIndex);
+            final int length = Math.min(span - copied, next.length);
+            System.arraycopy(next, 0, compactedView, copied, length);
+            copied += length;
+        }
+
+        return ByteBuffer.wrap(compactedView);
+    }
+
+    @Override
+    public String readUTF8() throws CharacterCodingException {
+        return readString(StandardCharsets.UTF_8.newDecoder());
+    }
+
+    @Override
+    public String readString(CharsetDecoder decoder) throws 
CharacterCodingException {
+        if (!hasRemaining()) {
+            return null;
+        }
+
+        CharBuffer decoded = null;
+
+        if (hasArray()) {
+            decoded = decoder.decode(ByteBuffer.wrap(currentArray, 
currentOffset, remaining()));
+        } else {
+            decoded = readStringFromComponents(decoder);
+        }
+
+        return decoded.toString();
+    }
+
+    private CharBuffer readStringFromComponents(CharsetDecoder decoder) throws 
CharacterCodingException {
+        int size = (int)(remaining() * decoder.averageCharsPerByte());
+        CharBuffer decoded = CharBuffer.allocate(size);
+
+        int arrayIndex = currentArrayIndex;
+        final int viewSpan = limit() - position();
+        int processed = Math.min(currentArray.length - currentOffset, 
viewSpan);
+        ByteBuffer wrapper = ByteBuffer.wrap(currentArray, currentOffset, 
processed);
+
+        CoderResult step = CoderResult.OVERFLOW;
+
+        do {
+            boolean endOfInput = processed == viewSpan;
+            step = decoder.decode(wrapper, decoded, endOfInput);
+            if (step.isUnderflow() && endOfInput) {
+                step = decoder.flush(decoded);
+                break;
+            }
+
+            if (step.isOverflow()) {
+                size = 2 * size + 1;
+                CharBuffer upsized = CharBuffer.allocate(size);
+                decoded.flip();
+                upsized.put(decoded);
+                decoded = upsized;
+                continue;
+            }
+
+            byte[] next = contents.get(++arrayIndex);
+            int wrapSize = Math.min(next.length, viewSpan - processed);
+            wrapper = ByteBuffer.wrap(next, 0, wrapSize);
+            processed += wrapSize;
+        } while (!step.isError());
+
+        if (step.isError()) {
+            step.throwException();
+        }
+
+        return (CharBuffer) decoded.flip();
+    }
+
+    /**
+     * Compacts the buffer by dropping arrays that have been fully consumed by
+     * previous reads from this composite buffer.  The position, capacity and
+     * any set mark are reduced by the number of bytes dropped, and the limit is
+     * reset to the new capacity.
+     * <p>
+     * Buffers that are not compactable (for example slices) and buffers that
+     * hold no data are returned unchanged.
+     *
+     * @return a reference to this {@link CompositeReadableBuffer}.
+     */
+    @Override
+    public CompositeReadableBuffer reclaimRead() {
+        if (!compactable || (currentArray == null && contents == null)) {
+            return this;
+        }
+
+        int totalCompaction = 0;
+        int totalRemovals = 0;
+
+        // Drop every array that precedes the one currently being read and
+        // count the bytes released.
+        for (; totalRemovals < currentArrayIndex; ++totalRemovals) {
+            byte[] element = contents.remove(0);
+            totalCompaction += element.length;
+        }
+
+        // The current array is now at the head of the list; when no list is
+        // in use nothing was removed and the index stays at -1.
+        currentArrayIndex -= totalRemovals;
+
+        if (currentArray.length == currentOffset) {
+            totalCompaction += currentArray.length;
+
+            // If we are sitting on the end of the data (length == offset) then
+            // we are also at the last element in the ArrayList if one is currently
+            // in use, so remove the data and release the list.
+            if (currentArrayIndex == 0) {
+                contents.clear();
+                contents = null;
+            }
+
+            // Nothing left to read, return to the empty state.
+            currentArray = null;
+            currentArrayIndex = -1;
+            currentOffset = 0;
+        }
+
+        // Shift the buffer state down by the number of bytes released.
+        position -= totalCompaction;
+        limit = capacity -= totalCompaction;
+
+        if (mark != UNSET_MARK) {
+            mark -= totalCompaction;
+        }
+
+        return this;
+    }
+
+    /**
+     * Adds the given array into the composite buffer at the end.
+     * <p>
+     * The appended array is not copied so changes to the source array are visible in this
+     * buffer and vice versa.  If this composite was empty then it will return true for the
+     * {@link #hasArray()} method until another array is appended.
+     * <p>
+     * Calling this method resets the limit to the new capacity.
+     *
+     * @param array
+     *      The array to add to this composite buffer.
+     *
+     * @throws IllegalArgumentException if the array is null or zero size.
+     * @throws IllegalStateException if the buffer does not allow appends.
+     *
+     * @return a reference to this {@link CompositeReadableBuffer}.
+     */
+    public CompositeReadableBuffer append(byte[] array) {
+        if (!compactable) {
+            throw new IllegalStateException();
+        }
+
+        if (array == null || array.length == 0) {
+            throw new IllegalArgumentException("Array must not be empty or 
null");
+        }
+
+        if (currentArray == null) {
+            currentArray = array;
+            currentOffset = 0;
+        } else if (contents == null) {
+            contents = new ArrayList<>();
+            contents.add(currentArray);
+            contents.add(array);
+            currentArrayIndex = 0;
+        } else {
+            contents.add(array);
+        }
+
+        capacity += array.length;
+        limit = capacity;
+
+        return this;
+    }
+
+    @Override
+    public int hashCode() {
+        int hash = 1;
+
+        if (currentArrayIndex < 0) {
+            int span = limit() - position();
+            while (span > 0) {
+                hash = 31 * hash + currentArray[currentOffset + --span];
+            }
+        } else {
+            final int currentPos = position();
+            for (int i = limit() - 1; i >= currentPos; i--) {
+                hash = 31 * hash + (int)get(i);
+            }
+        }
+
+        return hash;
+    }
+
+    /**
+     * Compares the remaining bytes of this buffer with the remaining bytes of
+     * another {@link ReadableBuffer}.
+     * <p>
+     * The comparison reads through this buffer, so the starting position is
+     * restored in a finally block.  Without that, a mismatch (or an exception
+     * raised while reading the other buffer) would leave this buffer partially
+     * consumed.
+     *
+     * @param other
+     *      The object to compare against.
+     *
+     * @return true if other is a ReadableBuffer whose remaining bytes match.
+     */
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+
+        if (!(other instanceof ReadableBuffer)) {
+            return false;
+        }
+
+        ReadableBuffer buffer = (ReadableBuffer) other;
+        if (this.remaining() != buffer.remaining()) {
+            return false;
+        }
+
+        final int origPos = position();
+        try {
+            // The other buffer is read by absolute index so only this buffer
+            // needs its position restored.
+            for (int i = buffer.position(); hasRemaining(); i++) {
+                if (!equals(this.get(), buffer.get(i))) {
+                    return false;
+                }
+            }
+        } finally {
+            position(origPos);
+        }
+
+        return true;
+    }
+
+    private static boolean equals(byte x, byte y) {
+        return x == y;
+    }
+
+    private void maybeMoveToNextArray() {
+        if (currentArray.length == currentOffset) {
+            if (currentArrayIndex >= 0 && currentArrayIndex < (contents.size() 
- 1)) {
+                currentArray = contents.get(++currentArrayIndex);
+                currentOffset = 0;
+            }
+        }
+    }
+
+    private static void validateReadTarget(int destSize, int offset, int 
length) {
+        if ((offset | length) < 0) {
+            throw new IndexOutOfBoundsException("offset and legnth must be 
non-negative");
+        }
+
+        if (((long) offset + (long) length) > destSize) {
+            throw new IndexOutOfBoundsException("target is to small for 
specified read size");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeWritableBuffer.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeWritableBuffer.java
 
b/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeWritableBuffer.java
index 5786924..5b2c71c 100644
--- 
a/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeWritableBuffer.java
+++ 
b/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeWritableBuffer.java
@@ -188,4 +188,25 @@ public class CompositeWritableBuffer implements 
WritableBuffer
     {
         return _first.toString() + " + "+_second.toString();
     }
+
+    @Override
+    public void put(ReadableBuffer payload) {
+        int firstRemaining = _first.remaining();
+        if(firstRemaining > 0)
+        {
+            if(firstRemaining >= payload.remaining())
+            {
+                _first.put(payload);
+                return;
+            }
+            else
+            {
+                int limit = payload.limit();
+                payload.limit(payload.position()+firstRemaining);
+                _first.put(payload);
+                payload.limit(limit);
+            }
+        }
+        _second.put(payload);
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/codec/DecoderImpl.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/codec/DecoderImpl.java 
b/proton-j/src/main/java/org/apache/qpid/proton/codec/DecoderImpl.java
index c305916..2d4989b 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/codec/DecoderImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/DecoderImpl.java
@@ -32,7 +32,6 @@ import org.apache.qpid.proton.amqp.UnsignedInteger;
 import org.apache.qpid.proton.amqp.UnsignedLong;
 import org.apache.qpid.proton.amqp.UnsignedShort;
 
-import java.io.IOException;
 import java.lang.reflect.Array;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharsetDecoder;
@@ -41,7 +40,7 @@ import java.util.*;
 
 public class DecoderImpl implements ByteBufferDecoder
 {
-    private ByteBuffer _buffer;
+    private ReadableBuffer _buffer;
 
     private final CharsetDecoder _charsetDecoder = 
StandardCharsets.UTF_8.newDecoder();
 
@@ -58,7 +57,7 @@ public class DecoderImpl implements ByteBufferDecoder
 
     DecoderImpl(final ByteBuffer buffer)
     {
-        _buffer = buffer;
+        _buffer = new ReadableBuffer.ByteBufferReader(buffer);
     }
 
     public TypeConstructor<?> peekConstructor()
@@ -998,21 +997,30 @@ public class DecoderImpl implements ByteBufferDecoder
         _buffer.get(data, offset, length);
     }
 
-
     <V> V readRaw(TypeDecoder<V> decoder, int size)
     {
-        V decode = decoder.decode(this, (ByteBuffer) 
_buffer.slice().limit(size));
+        V decode = decoder.decode(this, _buffer.slice().limit(size));
         _buffer.position(_buffer.position()+size);
         return decode;
     }
 
     public void setByteBuffer(final ByteBuffer buffer)
     {
-        _buffer = buffer;
+        _buffer = new ReadableBuffer.ByteBufferReader(buffer);
     }
 
     public ByteBuffer getByteBuffer()
     {
+        return _buffer.byteBuffer();
+    }
+
+    public void setBuffer(final ReadableBuffer buffer)
+    {
+        _buffer = buffer;
+    }
+
+    public ReadableBuffer getBuffer()
+    {
         return _buffer;
     }
 
@@ -1023,7 +1031,7 @@ public class DecoderImpl implements ByteBufferDecoder
 
     interface TypeDecoder<V>
     {
-        V decode(DecoderImpl decoder, ByteBuffer buf);
+        V decode(DecoderImpl decoder, ReadableBuffer buf);
     }
 
     private static class UnknownDescribedType implements DescribedType

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/codec/DroppingWritableBuffer.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/codec/DroppingWritableBuffer.java
 
b/proton-j/src/main/java/org/apache/qpid/proton/codec/DroppingWritableBuffer.java
index a6949b5..ade5a08 100644
--- 
a/proton-j/src/main/java/org/apache/qpid/proton/codec/DroppingWritableBuffer.java
+++ 
b/proton-j/src/main/java/org/apache/qpid/proton/codec/DroppingWritableBuffer.java
@@ -27,7 +27,7 @@ public class DroppingWritableBuffer implements WritableBuffer
     private int _pos = 0;
 
     @Override
-    public boolean hasRemaining() 
+    public boolean hasRemaining()
     {
         return true;
     }
@@ -104,4 +104,10 @@ public class DroppingWritableBuffer implements 
WritableBuffer
     {
         return Integer.MAX_VALUE;
     }
+
+    @Override
+    public void put(ReadableBuffer payload) {
+        _pos += payload.remaining();
+        payload.position(payload.limit());
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/codec/FixedSizePrimitiveTypeEncoding.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/codec/FixedSizePrimitiveTypeEncoding.java
 
b/proton-j/src/main/java/org/apache/qpid/proton/codec/FixedSizePrimitiveTypeEncoding.java
index 7c055ae..872414a 100644
--- 
a/proton-j/src/main/java/org/apache/qpid/proton/codec/FixedSizePrimitiveTypeEncoding.java
+++ 
b/proton-j/src/main/java/org/apache/qpid/proton/codec/FixedSizePrimitiveTypeEncoding.java
@@ -40,7 +40,7 @@ abstract class FixedSizePrimitiveTypeEncoding<T> extends 
AbstractPrimitiveTypeEn
 
     public final void skipValue()
     {
-        
getDecoder().getByteBuffer().position(getDecoder().getByteBuffer().position() + 
getFixedSize());
+        getDecoder().getBuffer().position(getDecoder().getBuffer().position() 
+ getFixedSize());
     }
 
     protected abstract int getFixedSize();

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/codec/ListType.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/ListType.java 
b/proton-j/src/main/java/org/apache/qpid/proton/codec/ListType.java
index ba84141..a3ff4af 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/codec/ListType.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/ListType.java
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.proton.codec;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -151,7 +150,7 @@ public class ListType extends AbstractPrimitiveType<List>
         public List readValue()
         {
             DecoderImpl decoder = getDecoder();
-            ByteBuffer buffer = decoder.getByteBuffer();
+            ReadableBuffer buffer = decoder.getBuffer();
 
             int size = decoder.readRawInt();
             // todo - limit the decoder with size
@@ -228,7 +227,7 @@ public class ListType extends AbstractPrimitiveType<List>
         public void skipValue()
         {
             DecoderImpl decoder = getDecoder();
-            ByteBuffer buffer = decoder.getByteBuffer();
+            ReadableBuffer buffer = decoder.getBuffer();
             int size = decoder.readRawInt();
             buffer.position(buffer.position() + size);
         }
@@ -295,7 +294,7 @@ public class ListType extends AbstractPrimitiveType<List>
         public List readValue()
         {
             DecoderImpl decoder = getDecoder();
-            ByteBuffer buffer = decoder.getByteBuffer();
+            ReadableBuffer buffer = decoder.getBuffer();
 
             int size = ((int)decoder.readRawByte()) & 0xff;
             // todo - limit the decoder with size
@@ -367,7 +366,7 @@ public class ListType extends AbstractPrimitiveType<List>
         public void skipValue()
         {
             DecoderImpl decoder = getDecoder();
-            ByteBuffer buffer = decoder.getByteBuffer();
+            ReadableBuffer buffer = decoder.getBuffer();
             int size = ((int)decoder.readRawByte()) & 0xff;
             buffer.position(buffer.position() + size);
         }

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/codec/MapType.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/MapType.java 
b/proton-j/src/main/java/org/apache/qpid/proton/codec/MapType.java
index 72d1bbd..b0e2330 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/codec/MapType.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/MapType.java
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.proton.codec;
 
-import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
@@ -33,7 +32,7 @@ public class MapType extends AbstractPrimitiveType<Map>
     private final MapEncoding _shortMapEncoding;
     private EncoderImpl _encoder;
 
-    private AMQPType fixedKeyType;
+    private AMQPType<?> fixedKeyType;
 
     private static interface MapEncoding extends PrimitiveTypeEncoding<Map>
     {
@@ -95,7 +94,7 @@ public class MapType extends AbstractPrimitiveType<Map>
         return len;
     }
 
-    private static TypeConstructor<?> findNextDecoder(DecoderImpl decoder, 
ByteBuffer buffer, TypeConstructor<?> previousConstructor)
+    private static TypeConstructor<?> findNextDecoder(DecoderImpl decoder, 
ReadableBuffer buffer, TypeConstructor<?> previousConstructor)
     {
         if (previousConstructor == null)
         {
@@ -205,7 +204,7 @@ public class MapType extends AbstractPrimitiveType<Map>
         public Map readValue()
         {
             DecoderImpl decoder = getDecoder();
-            ByteBuffer buffer = decoder.getByteBuffer();
+            ReadableBuffer buffer = decoder.getBuffer();
 
             int size = decoder.readRawInt();
             // todo - limit the decoder with size
@@ -264,7 +263,7 @@ public class MapType extends AbstractPrimitiveType<Map>
         public void skipValue()
         {
             DecoderImpl decoder = getDecoder();
-            ByteBuffer buffer = decoder.getByteBuffer();
+            ReadableBuffer buffer = decoder.getBuffer();
             int size = decoder.readRawInt();
             buffer.position(buffer.position() + size);
         }
@@ -343,7 +342,7 @@ public class MapType extends AbstractPrimitiveType<Map>
         public Map readValue()
         {
             DecoderImpl decoder = getDecoder();
-            ByteBuffer buffer = decoder.getByteBuffer();
+            ReadableBuffer buffer = decoder.getBuffer();
 
             int size = (decoder.readRawByte()) & 0xff;
             // todo - limit the decoder with size
@@ -398,7 +397,7 @@ public class MapType extends AbstractPrimitiveType<Map>
         public void skipValue()
         {
             DecoderImpl decoder = getDecoder();
-            ByteBuffer buffer = decoder.getByteBuffer();
+            ReadableBuffer buffer = decoder.getBuffer();
             int size = ((int)decoder.readRawByte()) & 0xff;
             buffer.position(buffer.position() + size);
         }

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/codec/ReadableBuffer.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/codec/ReadableBuffer.java 
b/proton-j/src/main/java/org/apache/qpid/proton/codec/ReadableBuffer.java
index 1360d76..ea3ef17 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/codec/ReadableBuffer.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/ReadableBuffer.java
@@ -18,59 +18,318 @@
  */
 package org.apache.qpid.proton.codec;
 
+import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.nio.charset.Charset;
+import java.nio.InvalidMarkException;
+import java.nio.ReadOnlyBufferException;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.StandardCharsets;
 
 /**
  * Interface to abstract a buffer, similar to {@link WritableBuffer}
  */
 public interface ReadableBuffer {
 
-    void put(ReadableBuffer other);
-
+    /**
+     * Returns the capacity of the backing buffer of this ReadableBuffer
+     * @return the capacity of the backing buffer of this ReadableBuffer
+     */
+    int capacity();
+
+    /**
+     * Returns true if this ReadableBuffer is backed by an array which can be
+     * accessed by the {@link #array()} and {@link #arrayOffset()} methods.
+     *
+     * @return true if the buffer is backed by a primitive array.
+     */
+    boolean hasArray();
+
+    /**
+     * Returns the primitive array that backs this buffer if one exists and the
+     * buffer is not read-only.  The caller should have checked the {@link 
#hasArray()}
+     * method before calling this method.
+     *
+     * @return the array that backs this buffer if available.
+     *
+     * @throws UnsupportedOperationException if this {@link ReadableBuffer} 
doesn't support array access.
+     * @throws ReadOnlyBufferException if the ReadableBuffer is read-only.
+     */
+    byte[] array();
+
+    /**
+     * Returns the offset into the backing array where data should be read 
from.  The caller
+     * should have checked the {@link #hasArray()} method before calling this 
method.
+     *
+     * @return the offset into the backing array to start reading from.
+     *
+     * @throws UnsupportedOperationException if this {@link ReadableBuffer} 
doesn't support array access.
+     * @throws ReadOnlyBufferException if the ReadableBuffer is read-only.
+     */
+    int arrayOffset();
+
+    /**
+     * Compact the backing storage of this ReadableBuffer, possibly freeing 
previously-read
+     * portions of pooled data or reducing the number of backing arrays if 
present.
+     * <p>
+     * This is an optional operation and care should be taken in its 
implementation.
+     *
+     * @return a reference to this buffer
+     */
+    ReadableBuffer reclaimRead();
+
+    /**
+     * Reads the byte at the current position and advances the position by 1.
+     *
+     * @return the byte at the current position.
+     *
+     * @throws BufferUnderflowException if the buffer position has reached the 
limit.
+     */
     byte get();
 
+    /**
+     * Reads the byte at the given index, the buffer position is not affected.
+     *
+     * @param index
+     *      The index in the buffer from which to read the byte.
+     *
+     * @return the byte value stored at the target index.
+     *
+     * @throws IndexOutOfBoundsException if the index is not in range for this 
buffer.
+     */
+    byte get(int index);
+
+    /**
+     * Reads four bytes from the buffer and returns them as an integer value.  
The
+     * buffer position is advanced by four bytes.
+     *
+     * @return an integer value composed of bytes read from the buffer.
+     *
+     * @throws BufferUnderflowException if the buffer position has reached the 
limit.
+     */
     int getInt();
 
+    /**
+     * Reads eight bytes from the buffer and returns them as a long value.  
The
+     * buffer position is advanced by eight bytes.
+     *
+     * @return a long value composed of bytes read from the buffer.
+     *
+     * @throws BufferUnderflowException if the buffer position has reached the 
limit.
+     */
     long getLong();
 
+    /**
+     * Reads two bytes from the buffer and returns them as a short value.  The
+     * buffer position is advanced by two bytes.
+     *
+     * @return a short value composed of bytes read from the buffer.
+     *
+     * @throws BufferUnderflowException if the buffer position has reached the 
limit.
+     */
     short getShort();
 
+    /**
+     * Reads four bytes from the buffer and returns them as a float value.  
The
+     * buffer position is advanced by four bytes.
+     *
+     * @return a float value composed of bytes read from the buffer.
+     *
+     * @throws BufferUnderflowException if the buffer position has reached the 
limit.
+     */
     float getFloat();
 
+    /**
+     * Reads eight bytes from the buffer and returns them as a double value.  
The
+     * buffer position is advanced by eight bytes.
+     *
+     * @return a double value composed of bytes read from the buffer.
+     *
+     * @throws BufferUnderflowException if the buffer position has reached the 
limit.
+     */
     double getDouble();
 
-    ReadableBuffer get(final byte[] data, final int offset, final int length);
-
-    ReadableBuffer get(final byte[] data);
-
-    ReadableBuffer position(int position);
-
+    /**
+     * A bulk read method that copies bytes from this buffer into the target 
byte array.
+     *
+     * @param target
+     *      The byte array to copy bytes read from this buffer.
+     * @param offset
+     *      The offset into the given array where the copy starts.
+     * @param length
+     *      The number of bytes to copy into the target array.
+     *
+     * @return a reference to this ReadableBuffer instance.
+     *
+     * @throws BufferUnderflowException if there are fewer readable bytes than 
the array length.
+     * @throws IndexOutOfBoundsException if the offset or length values are 
invalid.
+     */
+    ReadableBuffer get(final byte[] target, final int offset, final int 
length);
+
+    /**
+     * A bulk read method that copies bytes from this buffer into the target 
byte array.
+     *
+     * @param target
+     *      The byte array to copy bytes read from this buffer.
+     *
+     * @return a reference to this ReadableBuffer instance.
+     *
+     * @throws BufferUnderflowException if there are fewer readable bytes than 
the array length.
+     */
+    ReadableBuffer get(final byte[] target);
+
+    /**
+     * Copy data from this buffer to the target buffer starting from the 
current
+     * position and continuing until either this buffer's remaining bytes are
+     * consumed or the target is full.
+     *
+     * @param target
+     *      The WritableBuffer to transfer this buffer's data to.
+     *
+     * @return a reference to this ReadableBuffer instance.
+     */
+    ReadableBuffer get(WritableBuffer target);
+
+    /**
+     * Creates a new ReadableBuffer instance that is a view of the readable 
portion of
+     * this buffer.  The position will be set to zero and the limit and the 
reported capacity
+     * will match the value returned by this buffer's {@link #remaining()} 
method, the mark
+     * will be undefined.
+     *
+     * @return a new ReadableBuffer that is a view of the readable portion of 
this buffer.
+     */
     ReadableBuffer slice();
 
+    /**
+     * Sets the buffer limit to the current position and the position is set 
to zero, the
+     * mark value reset to undefined.
+     *
+     * @return a reference to this {@link ReadableBuffer}.
+     */
     ReadableBuffer flip();
 
+    /**
+     * Sets the current read limit of this buffer to the given value.  If the 
buffer mark
+     * value is defined and is larger than the limit the mark will be 
discarded.  If the
+     * position is larger than the new limit it will be reset to the new limit.
+     *
+     * @param limit
+     *      The new read limit to set for this buffer.
+     *
+     * @return a reference to this {@link ReadableBuffer}.
+     *
+     * @throws IllegalArgumentException if the limit value is negative or 
greater than the capacity.
+     */
     ReadableBuffer limit(int limit);
 
+    /**
+     * @return the current value of this buffer's limit.
+     */
     int limit();
 
-    int remaining();
+    /**
+     * Sets the current position of this buffer to the given value.  If the 
buffer mark
+     * value is defined and is larger than the newly set position it must be 
discarded.
+     *
+     * @param position
+     *      The new position to set for this buffer.
+     *
+     * @return a reference to this {@link ReadableBuffer}.
+     *
+     * @throws IllegalArgumentException if the position value is negative or 
greater than the limit.
+     */
+    ReadableBuffer position(int position);
 
+    /**
+     * @return the current position from which the next read operation will 
start.
+     */
     int position();
 
+    /**
+     * Mark the current position of this buffer which can be returned to after 
a
+     * read operation by calling {@link #reset()}.
+     *
+     * @return a reference to this {@link ReadableBuffer}.
+     */
+    ReadableBuffer mark();
+
+    /**
+     * Reset the buffer's position to a previously marked value, the mark 
should remain
+     * set after calling this method.
+     *
+     * @return a reference to this {@link ReadableBuffer}.
+     *
+     * @throws InvalidMarkException if the mark value is undefined.
+     */
+    ReadableBuffer reset();
+
+    /**
+     * Resets the buffer position to zero and clears any previously set mark.
+     *
+     * @return a reference to this {@link ReadableBuffer}.
+     */
+    ReadableBuffer rewind();
+
+    /**
+     * Resets the buffer position to zero and sets the limit to the buffer 
capacity,
+     * the mark value is discarded if set.
+     *
+     * @return a reference to this {@link ReadableBuffer}.
+     */
+    ReadableBuffer clear();
+
+    /**
+     * @return the remaining number of readable bytes in this buffer.
+     */
+    int remaining();
+
+    /**
+     * @return true if there are readable bytes still remaining in this buffer.
+     */
     boolean hasRemaining();
 
+    /**
+     * Creates a duplicate {@link ReadableBuffer} to this instance.
+     * <p>
+     * The duplicated buffer will have the same position, limit and mark as 
this
+     * buffer.  The two buffers share the same backing data.
+     *
+     * @return a duplicate of this {@link ReadableBuffer}.
+     */
     ReadableBuffer duplicate();
 
+    /**
+     * @return a ByteBuffer view of the current readable portion of this 
buffer.
+     */
     ByteBuffer byteBuffer();
 
-    String readUTF8();
+    /**
+     * Reads a UTF-8 encoded String from the buffer starting the decode at the
+     * current position and reading until the limit is reached.  The position
+     * is advanced to the limit once this method returns.  If there are no bytes
+     * remaining in the buffer when this method is called a null is returned.
+     *
+     * @return a string decoded from the remaining bytes in this buffer.
+     *
+     * @throws CharacterCodingException if the encoding is invalid for any 
reason.
+     */
+    String readUTF8() throws CharacterCodingException;
+
+    /**
+     * Decodes a String from the buffer using the provided {@link 
CharsetDecoder}
+     * starting the decode at the current position and reading until the limit 
is
+     * reached.  The position is advanced to the limit once this method 
returns.
+     * If there are no bytes remaining in the buffer when this method is called 
a
+     * null is returned.
+     *
+     * @return a string decoded from the remaining bytes in this buffer.
+     *
+     * @throws CharacterCodingException if the encoding is invalid for any 
reason.
+     */
+    String readString(CharsetDecoder decoder) throws CharacterCodingException;
 
     final class ByteBufferReader implements ReadableBuffer {
 
-        private static final Charset Charset_UTF8 = Charset.forName("UTF-8");
-
         private ByteBuffer buffer;
 
         public static ByteBufferReader allocate(int size) {
@@ -78,16 +337,34 @@ public interface ReadableBuffer {
             return new ByteBufferReader(allocated);
         }
 
+        public static ByteBufferReader wrap(ByteBuffer buffer) {
+            return new ByteBufferReader(buffer);
+        }
+
+        public static ByteBufferReader wrap(byte[] array) {
+            return new ByteBufferReader(ByteBuffer.wrap(array));
+        }
+
         public ByteBufferReader(ByteBuffer buffer) {
             this.buffer = buffer;
         }
 
         @Override
+        public int capacity() {
+            return buffer.capacity();
+        }
+
+        @Override
         public byte get() {
             return buffer.get();
         }
 
         @Override
+        public byte get(int index) {
+            return buffer.get(index);
+        }
+
+        @Override
         public int getInt() {
             return buffer.getInt();
         }
@@ -179,13 +456,95 @@ public interface ReadableBuffer {
 
         @Override
         public String readUTF8() {
-            CharBuffer charBuf = Charset_UTF8.decode(buffer);
-            return charBuf.toString();
+            return StandardCharsets.UTF_8.decode(buffer).toString();
+        }
+
+        @Override
+        public String readString(CharsetDecoder decoder) throws 
CharacterCodingException {
+            return decoder.decode(buffer).toString();
+        }
+
+        @Override
+        public boolean hasArray() {
+            return buffer.hasArray();
+        }
+
+        @Override
+        public byte[] array() {
+            return buffer.array();
+        }
+
+        @Override
+        public int arrayOffset() {
+            return buffer.arrayOffset();
+        }
+
+        @Override
+        public ReadableBuffer reclaimRead() {
+            // Don't compact ByteBuffer due to the expense of the copy
+            return this;
         }
 
         @Override
-        public void put(ReadableBuffer other) {
-            this.buffer.put(other.byteBuffer());
+        public ReadableBuffer mark() {
+            buffer.mark();
+            return this;
+        }
+
+        @Override
+        public ReadableBuffer reset() {
+            buffer.reset();
+            return this;
+        }
+
+        @Override
+        public ReadableBuffer rewind() {
+            buffer.rewind();
+            return this;
+        }
+
+        @Override
+        public ReadableBuffer clear() {
+            buffer.clear();
+            return this;
+        }
+
+        @Override
+        public ReadableBuffer get(WritableBuffer target) {
+            target.put(buffer);
+            return this;
+        }
+
+        @Override
+        public String toString() {
+            return buffer.toString();
+        }
+
+        @Override
+        public int hashCode() {
+            return buffer.hashCode();
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            if (this == other) {
+                return true;
+            }
+
+            if (!(other instanceof ReadableBuffer)) {
+                return false;
+            }
+
+            ReadableBuffer readable = (ReadableBuffer) other;
+            if (this.remaining() != readable.remaining()) {
+                return false;
+            }
+
+            if (other instanceof CompositeReadableBuffer) {
+                return other.equals(this);
+            }
+
+            return buffer.equals(readable.byteBuffer());
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/codec/StringType.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/codec/StringType.java 
b/proton-j/src/main/java/org/apache/qpid/proton/codec/StringType.java
index 91476bc..dfc449c 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/codec/StringType.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/StringType.java
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.proton.codec;
 
-import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.nio.charset.CharsetDecoder;
 import java.util.Arrays;
@@ -31,13 +30,12 @@ public class StringType extends 
AbstractPrimitiveType<String>
     private static final DecoderImpl.TypeDecoder<String> _stringCreator =
         new DecoderImpl.TypeDecoder<String>()
         {
-
-            public String decode(DecoderImpl decoder, final ByteBuffer buf)
+            public String decode(DecoderImpl decoder, final ReadableBuffer 
buffer)
             {
                 CharsetDecoder charsetDecoder = decoder.getCharsetDecoder();
                 try
                 {
-                    return decoder.getCharsetDecoder().decode(buf).toString();
+                    return buffer.readString(charsetDecoder);
                 }
                 catch (CharacterCodingException e)
                 {
@@ -50,7 +48,6 @@ public class StringType extends AbstractPrimitiveType<String>
             }
         };
 
-
     public static interface StringEncoding extends 
PrimitiveTypeEncoding<String>
     {
         void setValue(String val, int length);
@@ -106,7 +103,6 @@ public class StringType extends 
AbstractPrimitiveType<String>
         return len;
     }
 
-
     public StringEncoding getCanonicalEncoding()
     {
         return _stringEncoding;
@@ -121,11 +117,9 @@ public class StringType extends 
AbstractPrimitiveType<String>
             extends LargeFloatingSizePrimitiveTypeEncoding<String>
             implements StringEncoding
     {
-
         private String _value;
         private int _length;
 
-
         public AllStringEncoding(final EncoderImpl encoder, final DecoderImpl 
decoder)
         {
             super(encoder, decoder);
@@ -143,7 +137,6 @@ public class StringType extends 
AbstractPrimitiveType<String>
             return (val == _value) ? _length : calculateUTF8Length(val);
         }
 
-
         @Override
         public byte getEncodingCode()
         {
@@ -162,7 +155,6 @@ public class StringType extends 
AbstractPrimitiveType<String>
 
         public String readValue()
         {
-
             DecoderImpl decoder = getDecoder();
             int size = decoder.readRawInt();
             return decoder.readRaw(_stringCreator, size);
@@ -177,7 +169,7 @@ public class StringType extends 
AbstractPrimitiveType<String>
         public void skipValue()
         {
             DecoderImpl decoder = getDecoder();
-            ByteBuffer buffer = decoder.getByteBuffer();
+            ReadableBuffer buffer = decoder.getBuffer();
             int size = decoder.readRawInt();
             buffer.position(buffer.position() + size);
         }
@@ -187,7 +179,6 @@ public class StringType extends 
AbstractPrimitiveType<String>
             extends SmallFloatingSizePrimitiveTypeEncoding<String>
             implements StringEncoding
     {
-
         private String _value;
         private int _length;
 
@@ -196,7 +187,6 @@ public class StringType extends 
AbstractPrimitiveType<String>
             super(encoder, decoder);
         }
 
-
         @Override
         protected void writeEncodedValue(final String val)
         {
@@ -209,7 +199,6 @@ public class StringType extends 
AbstractPrimitiveType<String>
             return (val == _value) ? _length : calculateUTF8Length(val);
         }
 
-
         @Override
         public byte getEncodingCode()
         {
@@ -228,7 +217,6 @@ public class StringType extends 
AbstractPrimitiveType<String>
 
         public String readValue()
         {
-
             DecoderImpl decoder = getDecoder();
             int size = ((int)decoder.readRawByte()) & 0xff;
             return decoder.readRaw(_stringCreator, size);
@@ -243,10 +231,9 @@ public class StringType extends 
AbstractPrimitiveType<String>
         public void skipValue()
         {
             DecoderImpl decoder = getDecoder();
-            ByteBuffer buffer = decoder.getByteBuffer();
+            ReadableBuffer buffer = decoder.getBuffer();
             int size = ((int)decoder.readRawByte()) & 0xff;
             buffer.position(buffer.position() + size);
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/codec/SymbolType.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/codec/SymbolType.java 
b/proton-j/src/main/java/org/apache/qpid/proton/codec/SymbolType.java
index e333e6a..00051ac 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/codec/SymbolType.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/SymbolType.java
@@ -22,7 +22,6 @@ package org.apache.qpid.proton.codec;
 
 import org.apache.qpid.proton.amqp.Symbol;
 
-import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.util.Arrays;
 import java.util.Collection;
@@ -35,27 +34,27 @@ public class SymbolType extends AbstractPrimitiveType<Symbol>
     private final SymbolEncoding _symbolEncoding;
     private final SymbolEncoding _shortSymbolEncoding;
 
-    private final Map<ByteBuffer, Symbol> _symbolCache = new HashMap<ByteBuffer, Symbol>();
+    private final Map<ReadableBuffer, Symbol> _symbolCache = new HashMap<ReadableBuffer, Symbol>();
     private DecoderImpl.TypeDecoder<Symbol> _symbolCreator =
-            new DecoderImpl.TypeDecoder<Symbol>()
+        new DecoderImpl.TypeDecoder<Symbol>()
+        {
+            @Override
+            public Symbol decode(DecoderImpl decoder, ReadableBuffer buffer)
             {
-                @Override
-                public Symbol decode(DecoderImpl decoder, ByteBuffer buf)
+                Symbol symbol = _symbolCache.get(buffer);
+                if (symbol == null)
                 {
-                    Symbol symbol = _symbolCache.get(buf);
-                    if(symbol == null)
-                    {
-                        byte[] bytes = new byte[buf.limit()];
-                        buf.get(bytes);
-
-                        String str = new String(bytes, ASCII_CHARSET);
-                        symbol = Symbol.getSymbol(str);
-
-                        _symbolCache.put(ByteBuffer.wrap(bytes), symbol);
-                    }
-                    return symbol;
+                    byte[] bytes = new byte[buffer.limit()];
+                    buffer.get(bytes);
+
+                    String str = new String(bytes, ASCII_CHARSET);
+                    symbol = Symbol.getSymbol(str);
+
+                    _symbolCache.put(ReadableBuffer.ByteBufferReader.wrap(bytes), symbol);
                 }
-            };
+                return symbol;
+            }
+        };
 
     public static interface SymbolEncoding extends PrimitiveTypeEncoding<Symbol>
     {
@@ -155,7 +154,7 @@ public class SymbolType extends AbstractPrimitiveType<Symbol>
         public void skipValue()
         {
             DecoderImpl decoder = getDecoder();
-            ByteBuffer buffer = decoder.getByteBuffer();
+            ReadableBuffer buffer = decoder.getBuffer();
             int size = decoder.readRawInt();
             buffer.position(buffer.position() + size);
         }
@@ -210,7 +209,7 @@ public class SymbolType extends AbstractPrimitiveType<Symbol>
         public void skipValue()
         {
             DecoderImpl decoder = getDecoder();
-            ByteBuffer buffer = decoder.getByteBuffer();
+            ReadableBuffer buffer = decoder.getBuffer();
             int size = ((int)decoder.readRawByte()) & 0xff;
             buffer.position(buffer.position() + size);
         }

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/codec/WritableBuffer.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/WritableBuffer.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/WritableBuffer.java
index 79676b3..67c8292 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/codec/WritableBuffer.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/WritableBuffer.java
@@ -49,6 +49,8 @@ public interface WritableBuffer
 
     void put(ByteBuffer payload);
 
+    void put(ReadableBuffer payload);
+
     int limit();
 
     class ByteBufferWrapper implements WritableBuffer
@@ -133,11 +135,27 @@ public interface WritableBuffer
         }
 
         @Override
+        public void put(ReadableBuffer src)
+        {
+            src.get(this);
+        }
+
+        @Override
         public int limit()
         {
             return _buf.limit();
         }
 
+        public ByteBuffer byteBuffer()
+        {
+            return _buf;
+        }
+
+        public ReadableBuffer toReadableBuffer()
+        {
+            return ReadableBuffer.ByteBufferReader.wrap((ByteBuffer) _buf.duplicate().flip());
+        }
+
         @Override
         public String toString()
         {
@@ -149,5 +167,15 @@ public interface WritableBuffer
             ByteBuffer allocated = ByteBuffer.allocate(size);
             return new ByteBufferWrapper(allocated);
         }
+
+        public static ByteBufferWrapper wrap(ByteBuffer buffer)
+        {
+            return new ByteBufferWrapper(buffer);
+        }
+
+        public static ByteBufferWrapper wrap(byte[] bytes)
+        {
+            return new ByteBufferWrapper(ByteBuffer.wrap(bytes));
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/codec/messaging/FastPathAcceptedType.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/messaging/FastPathAcceptedType.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/messaging/FastPathAcceptedType.java
index 3624836..d9eb991 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/codec/messaging/FastPathAcceptedType.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/messaging/FastPathAcceptedType.java
@@ -80,18 +80,18 @@ public class FastPathAcceptedType implements AMQPType<Accepted>, FastPathDescrib
     @Override
     public Accepted readValue() {
         DecoderImpl decoder = getDecoder();
-        byte typeCode = decoder.getByteBuffer().get();
+        byte typeCode = decoder.getBuffer().get();
 
         switch (typeCode) {
             case EncodingCodes.LIST0:
                 break;
             case EncodingCodes.LIST8:
-                decoder.getByteBuffer().get();
-                decoder.getByteBuffer().get();
+                decoder.getBuffer().get();
+                decoder.getBuffer().get();
                 break;
             case EncodingCodes.LIST32:
-                decoder.getByteBuffer().getInt();
-                decoder.getByteBuffer().getInt();
+                decoder.getBuffer().getInt();
+                decoder.getBuffer().getInt();
                 break;
             default:
                 throw new DecodeException("Incorrect type found in Accepted type encoding: " + typeCode);

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/codec/messaging/FastPathHeaderType.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/messaging/FastPathHeaderType.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/messaging/FastPathHeaderType.java
index 189b360..06d8026 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/codec/messaging/FastPathHeaderType.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/messaging/FastPathHeaderType.java
@@ -55,7 +55,7 @@ public class FastPathHeaderType implements AMQPType<Header>, FastPathDescribedTy
     @Override
     public Header readValue() {
         DecoderImpl decoder = getDecoder();
-        byte typeCode = decoder.getByteBuffer().get();
+        byte typeCode = decoder.getBuffer().get();
 
         @SuppressWarnings("unused")
         int size = 0;
@@ -65,12 +65,12 @@ public class FastPathHeaderType implements AMQPType<Header>, FastPathDescribedTy
             case EncodingCodes.LIST0:
                 break;
             case EncodingCodes.LIST8:
-                size = ((int)decoder.getByteBuffer().get()) & 0xff;
-                count = ((int)decoder.getByteBuffer().get()) & 0xff;
+                size = ((int)decoder.getBuffer().get()) & 0xff;
+                count = ((int)decoder.getBuffer().get()) & 0xff;
                 break;
             case EncodingCodes.LIST32:
-                size = decoder.getByteBuffer().getInt();
-                count = decoder.getByteBuffer().getInt();
+                size = decoder.getBuffer().getInt();
+                count = decoder.getBuffer().getInt();
                 break;
             default:
                 throw new DecodeException("Incorrect type found in Header encoding: " + typeCode);

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/codec/messaging/FastPathPropertiesType.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/messaging/FastPathPropertiesType.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/messaging/FastPathPropertiesType.java
index e3caca5..e071ea9 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/codec/messaging/FastPathPropertiesType.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/messaging/FastPathPropertiesType.java
@@ -55,7 +55,7 @@ public class FastPathPropertiesType implements AMQPType<Properties>, FastPathDes
     @Override
     public Properties readValue() {
         DecoderImpl decoder = getDecoder();
-        byte typeCode = decoder.getByteBuffer().get();
+        byte typeCode = decoder.getBuffer().get();
 
         @SuppressWarnings("unused")
         int size = 0;
@@ -65,12 +65,12 @@ public class FastPathPropertiesType implements AMQPType<Properties>, FastPathDes
             case EncodingCodes.LIST0:
                 break;
             case EncodingCodes.LIST8:
-                size = ((int)decoder.getByteBuffer().get()) & 0xff;
-                count = ((int)decoder.getByteBuffer().get()) & 0xff;
+                size = ((int)decoder.getBuffer().get()) & 0xff;
+                count = ((int)decoder.getBuffer().get()) & 0xff;
                 break;
             case EncodingCodes.LIST32:
-                size = decoder.getByteBuffer().getInt();
-                count = decoder.getByteBuffer().getInt();
+                size = decoder.getBuffer().getInt();
+                count = decoder.getBuffer().getInt();
                 break;
             default:
                 throw new DecodeException("Incorrect type found in Properties encoding: " + typeCode);

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathDispositionType.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathDispositionType.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathDispositionType.java
index 01e18e7..c329aa7 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathDispositionType.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathDispositionType.java
@@ -84,7 +84,7 @@ public class FastPathDispositionType implements AMQPType<Disposition>, FastPathD
     @Override
     public Disposition readValue() {
         DecoderImpl decoder = getDecoder();
-        byte typeCode = decoder.getByteBuffer().get();
+        byte typeCode = decoder.getBuffer().get();
 
         @SuppressWarnings("unused")
         int size = 0;
@@ -95,12 +95,12 @@ public class FastPathDispositionType implements AMQPType<Disposition>, FastPathD
                 // TODO - Technically invalid however old decoder also allowed this.
                 break;
             case EncodingCodes.LIST8:
-                size = ((int)decoder.getByteBuffer().get()) & 0xff;
-                count = ((int)decoder.getByteBuffer().get()) & 0xff;
+                size = ((int)decoder.getBuffer().get()) & 0xff;
+                count = ((int)decoder.getBuffer().get()) & 0xff;
                 break;
             case EncodingCodes.LIST32:
-                size = decoder.getByteBuffer().getInt();
-                count = decoder.getByteBuffer().getInt();
+                size = decoder.getBuffer().getInt();
+                count = decoder.getBuffer().getInt();
                 break;
             default:
                 throw new DecodeException("Incorrect type found in Disposition encoding: " + typeCode);

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathFlowType.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathFlowType.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathFlowType.java
index 78abc5c..6f500be 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathFlowType.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathFlowType.java
@@ -80,7 +80,7 @@ public class FastPathFlowType implements AMQPType<Flow>, FastPathDescribedTypeCo
     @Override
     public Flow readValue() {
         DecoderImpl decoder = getDecoder();
-        byte typeCode = decoder.getByteBuffer().get();
+        byte typeCode = decoder.getBuffer().get();
 
         @SuppressWarnings("unused")
         int size = 0;
@@ -91,12 +91,12 @@ public class FastPathFlowType implements AMQPType<Flow>, FastPathDescribedTypeCo
                 // TODO - Technically invalid however old decoder also allowed this.
                 break;
             case EncodingCodes.LIST8:
-                size = ((int)decoder.getByteBuffer().get()) & 0xff;
-                count = ((int)decoder.getByteBuffer().get()) & 0xff;
+                size = ((int)decoder.getBuffer().get()) & 0xff;
+                count = ((int)decoder.getBuffer().get()) & 0xff;
                 break;
             case EncodingCodes.LIST32:
-                size = decoder.getByteBuffer().getInt();
-                count = decoder.getByteBuffer().getInt();
+                size = decoder.getBuffer().getInt();
+                count = decoder.getBuffer().getInt();
                 break;
             default:
                 throw new DecodeException("Incorrect type found in Flow encoding: " + typeCode);

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathTransferType.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathTransferType.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathTransferType.java
index 685890a..79842db 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathTransferType.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathTransferType.java
@@ -61,7 +61,7 @@ public class FastPathTransferType implements AMQPType<Transfer>, FastPathDescrib
     @Override
     public Transfer readValue() {
         DecoderImpl decoder = getDecoder();
-        byte typeCode = decoder.getByteBuffer().get();
+        byte typeCode = decoder.getBuffer().get();
 
         @SuppressWarnings("unused")
         int size = 0;
@@ -72,12 +72,12 @@ public class FastPathTransferType implements AMQPType<Transfer>, FastPathDescrib
                 // TODO - Technically invalid however old decoder also allowed this.
                 break;
             case EncodingCodes.LIST8:
-                size = ((int)decoder.getByteBuffer().get()) & 0xff;
-                count = ((int)decoder.getByteBuffer().get()) & 0xff;
+                size = ((int)decoder.getBuffer().get()) & 0xff;
+                count = ((int)decoder.getBuffer().get()) & 0xff;
                 break;
             case EncodingCodes.LIST32:
-                size = decoder.getByteBuffer().getInt();
-                count = decoder.getByteBuffer().getInt();
+                size = decoder.getBuffer().getInt();
+                count = decoder.getBuffer().getInt();
                 break;
             default:
                 throw new DecodeException("Incorrect type found in Transfer encoding: " + typeCode);

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/engine/Receiver.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Receiver.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Receiver.java
index f9d718f..fea4361 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Receiver.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Receiver.java
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.proton.engine;
 
+import org.apache.qpid.proton.codec.ReadableBuffer;
 import org.apache.qpid.proton.codec.WritableBuffer;
 
 /**
@@ -69,6 +70,17 @@ public interface Receiver extends Link
      */
     public int recv(WritableBuffer buffer);
 
+    /**
+     * Receive message data for the current delivery returning the data in a Readable buffer.
+     *
+     * The delivery will return an empty buffer if there is no pending data to be read or if all
+     * data has been read either by a previous call to this method or by a call to one of the other
+     * receive methods.
+     *
+     * @return a ReadableBuffer that contains the currently available data for the current delivery.
+     */
+    public ReadableBuffer recv();
+
     public void drain(int credit);
 
     /**

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/engine/Sender.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Sender.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Sender.java
index 159d5c3..fdb2552 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Sender.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Sender.java
@@ -64,6 +64,22 @@ public interface Sender extends Link
     public int send(ReadableBuffer buffer);
 
     /**
+     * Sends data to the current delivery attempting not to copy the data unless a previous
+     * send has already added data to the Delivery in which case a copy may occur depending on
+     * the implementation.
+     * <p>
+     * Care should be taken when passing ReadableBuffer instances that wrapped pooled bytes
+     * as the send does not mean the data will be sent immediately when the transport is
+     * flushed so the pooled bytes could be held for longer than expected.
+     *
+     * @param buffer
+     *      An immutable ReadableBuffer that can be held until the next transport flush.
+     *
+     * @return the number of bytes read from the provided buffer.
+     */
+    public int sendNoCopy(ReadableBuffer buffer);
+
+    /**
      * Abort the current delivery.
      *
      * Note "pn_link_abort" is commented out in the .h


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to