http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
index 61f8ec8..e0b82b2 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
@@ -22,7 +22,9 @@ package org.apache.qpid.proton.engine.impl;
 
 import java.util.Arrays;
 
+import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.codec.CompositeReadableBuffer;
 import org.apache.qpid.proton.codec.ReadableBuffer;
 import org.apache.qpid.proton.codec.WritableBuffer;
 import org.apache.qpid.proton.engine.Delivery;
@@ -33,6 +35,8 @@ public class DeliveryImpl implements Delivery
 {
     public static final int DEFAULT_MESSAGE_FORMAT = 0;
 
+    private static final ReadableBuffer EMPTY_BUFFER = ReadableBuffer.ByteBufferReader.allocate(0);
+
     private DeliveryImpl _linkPrevious;
     private DeliveryImpl _linkNext;
 
@@ -63,12 +67,12 @@ public class DeliveryImpl implements Delivery
     private int _flags = (byte) 0;
 
     private TransportDelivery _transportDelivery;
-    private byte[] _data;
-    private int _dataSize;
     private boolean _complete;
     private boolean _updated;
     private boolean _done;
-    private int _offset;
+
+    private CompositeReadableBuffer _dataBuffer;
+    private ReadableBuffer _dataView;
 
     DeliveryImpl(final byte[] tag, final LinkImpl link, DeliveryImpl previous)
     {
@@ -76,7 +80,7 @@ public class DeliveryImpl implements Delivery
         _link = link;
         _link.incrementUnsettled();
         _linkPrevious = previous;
-        if(previous != null)
+        if (previous != null)
         {
             previous._linkNext = this;
         }
@@ -212,7 +216,6 @@ public class DeliveryImpl implements Delivery
         return _workPrev;
     }
 
-
     void setWorkNext(DeliveryImpl workNext)
     {
         _workNext = workNext;
@@ -226,41 +229,53 @@ public class DeliveryImpl implements Delivery
     int recv(final byte[] bytes, int offset, int size)
     {
         final int consumed;
-        if (_data != null)
+        if (_dataBuffer != null && _dataBuffer.hasRemaining())
         {
-            //TODO - should only be if no bytes left
-            consumed = Math.min(size, _dataSize);
+            consumed = Math.min(size, _dataBuffer.remaining());
 
-            System.arraycopy(_data, _offset, bytes, offset, consumed);
-            _offset += consumed;
-            _dataSize -= consumed;
+            _dataBuffer.get(bytes, offset, consumed);
+            _dataBuffer.reclaimRead();
         }
         else
         {
-            _dataSize = consumed = 0;
+            consumed = 0;
         }
 
         return (_complete && consumed == 0) ? Transport.END_OF_STREAM : consumed;  //TODO - Implement
     }
 
-    int recv(final WritableBuffer buffer) {
+    int recv(final WritableBuffer buffer)
+    {
         final int consumed;
-        if (_data != null)
+        if (_dataBuffer != null && _dataBuffer.hasRemaining())
         {
-            consumed = Math.min(buffer.remaining(), _dataSize);
-
-            buffer.put(_data, _offset, consumed);
-            _offset += consumed;
-            _dataSize -= consumed;
+            consumed = Math.min(buffer.remaining(), _dataBuffer.remaining());
+            buffer.put(_dataBuffer);
+            _dataBuffer.reclaimRead();
         }
         else
         {
-            _dataSize = consumed = 0;
+            consumed = 0;
         }
 
         return (_complete && consumed == 0) ? Transport.END_OF_STREAM : consumed;
     }
 
+    ReadableBuffer recv()
+    {
+        ReadableBuffer result = _dataView;
+        if (_dataView != null)
+        {
+            _dataView = _dataBuffer = null;
+        }
+        else
+        {
+            result = EMPTY_BUFFER;
+        }
+
+        return result;
+    }
+
     void updateWork()
     {
         getLink().getConnectionImpl().workUpdate(this);
@@ -278,13 +293,11 @@ public class DeliveryImpl implements Delivery
         getLink().getConnectionImpl().addTransportWork(this);
     }
 
-
     DeliveryImpl getTransportWorkNext()
     {
         return _transportWorkNext;
     }
 
-
     DeliveryImpl getTransportWorkPrev()
     {
         return _transportWorkPrev;
@@ -318,78 +331,132 @@ public class DeliveryImpl implements Delivery
 
     int send(byte[] bytes, int offset, int length)
     {
-        if(_data == null)
-        {
-            _data = new byte[length];
-        }
-        else if(_data.length - _dataSize < length)
-        {
-            byte[] oldData = _data;
-            _data = new byte[oldData.length + _dataSize];
-            System.arraycopy(oldData, _offset, _data, 0, _dataSize);
-            _offset = 0;
-        }
-        System.arraycopy(bytes, offset, _data, _dataSize + _offset, length);
-        _dataSize += length;
+        byte[] copy = new byte[length];
+        System.arraycopy(bytes, offset, copy, 0, length);
+        getOrCreateDataBuffer().append(copy);
         addToTransportWorkList();
-        return length;  //TODO - Implement.
+        return length;
     }
 
     int send(final ReadableBuffer buffer)
     {
         int length = buffer.remaining();
+        getOrCreateDataBuffer().append(copyContents(buffer));
+        addToTransportWorkList();
+        return length;
+    }
+
+    int sendNoCopy(ReadableBuffer buffer)
+    {
+        int length = buffer.remaining();
 
-        if(_data == null)
+        if (_dataView == null || !_dataView.hasRemaining())
         {
-            _data = new byte[length];
+            _dataView = buffer;
         }
-        else if(_data.length - _dataSize < length)
+        else
         {
-            byte[] oldData = _data;
-            _data = new byte[oldData.length + _dataSize];
-            System.arraycopy(oldData, _offset, _data, 0, _dataSize);
-            _offset = 0;
+            consolidateSendBuffers(buffer);
         }
-        buffer.get(_data, _offset, length);
-        _dataSize+=length;
+
         addToTransportWorkList();
         return length;
     }
 
-    byte[] getData()
+    private byte[] copyContents(ReadableBuffer buffer)
     {
-        return _data;
+        byte[] copy = new byte[buffer.remaining()];
+
+        if (buffer.hasArray())
+        {
+            System.arraycopy(buffer.array(), buffer.arrayOffset(), copy, 0, buffer.remaining());
+            buffer.position(buffer.limit());
+        }
+        else
+        {
+            buffer.get(copy, 0, buffer.remaining());
+        }
+
+        return copy;
     }
 
-    int getDataOffset()
+    private void consolidateSendBuffers(ReadableBuffer buffer)
     {
-        return _offset;
+        if (_dataView == _dataBuffer)
+        {
+            getOrCreateDataBuffer().append(copyContents(buffer));
+        }
+        else
+        {
+            ReadableBuffer oldView = _dataView;
+
+            CompositeReadableBuffer dataBuffer = getOrCreateDataBuffer();
+            dataBuffer.append(copyContents(oldView));
+            dataBuffer.append(copyContents(buffer));
+
+            oldView.reclaimRead();
+        }
+
+        buffer.reclaimRead();  // A pooled buffer could release now.
     }
 
-    int getDataLength()
+    void append(Binary payload)
+    {
+        byte[] data = payload.getArray();
+
+        // The Composite buffer cannot handle composites where the array
+        // is a view of a larger array so we must copy the payload into
+        // an array of the exact size
+        if (payload.getArrayOffset() > 0 || payload.getLength() < data.length)
+        {
+            data = new byte[payload.getLength()];
+            System.arraycopy(payload.getArray(), payload.getArrayOffset(), data, 0, payload.getLength());
+        }
+
+        getOrCreateDataBuffer().append(data);
+    }
+
+    private CompositeReadableBuffer getOrCreateDataBuffer()
+    {
+        if (_dataBuffer == null)
+        {
+            _dataView = _dataBuffer = new CompositeReadableBuffer();
+        }
+
+        return _dataBuffer;
+    }
+
+    void append(byte[] data)
     {
-        return _dataSize;  //TODO - Implement.
+        getOrCreateDataBuffer().append(data);
     }
 
-    void setData(byte[] data)
+    void afterSend()
     {
-        _data = data;
+        if (_dataView != null)
+        {
+            _dataView.reclaimRead();
+            if (!_dataView.hasRemaining())
+            {
+                _dataView = _dataBuffer;
+            }
+        }
     }
 
-    void setDataLength(int length)
+    ReadableBuffer getData()
     {
-        _dataSize = length;
+        return _dataView == null ? EMPTY_BUFFER : _dataView;
     }
 
-    public void setDataOffset(int arrayOffset)
+    int getDataLength()
     {
-        _offset = arrayOffset;
+        return _dataView == null ? 0 : _dataView.remaining();
     }
 
     @Override
     public int available()
     {
-        return _dataSize;
+        return _dataView == null ? 0 : _dataView.remaining();
     }
 
     @Override
@@ -437,7 +504,6 @@ public class DeliveryImpl implements Delivery
         getLink().getConnectionImpl().workUpdate(this);
     }
 
-
     void setDone()
     {
         _done = true;
@@ -462,7 +528,12 @@ public class DeliveryImpl implements Delivery
             if (isDone()) {
                 return false;
             } else {
-                return _complete || _dataSize > 0;
+                boolean hasRemaining = false;
+                if (_dataView != null) {
+                    hasRemaining = _dataView.hasRemaining();
+                }
+
+                return _complete || hasRemaining;
             }
         } else {
             return false;
@@ -505,18 +576,18 @@ public class DeliveryImpl implements Delivery
             .append(", _flags=").append(_flags)
             .append(", _defaultDeliveryState=").append(_defaultDeliveryState)
             .append(", _transportDelivery=").append(_transportDelivery)
-            .append(", _dataSize=").append(_dataSize)
+            .append(", _data Size=").append(getDataLength())
             .append(", _complete=").append(_complete)
             .append(", _updated=").append(_updated)
             .append(", _done=").append(_done)
-            .append(", _offset=").append(_offset).append("]");
+            .append("]");
         return builder.toString();
     }
 
     @Override
     public int pending()
     {
-        return _dataSize;
+        return _dataView == null ? 0 : _dataView.remaining();
     }
 
     @Override
@@ -530,5 +601,4 @@ public class DeliveryImpl implements Delivery
     {
         return _defaultDeliveryState;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java
index eb16624..abf4ba9 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java
@@ -24,6 +24,7 @@ import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.transport.EmptyFrame;
 import org.apache.qpid.proton.amqp.transport.FrameBody;
 import org.apache.qpid.proton.codec.EncoderImpl;
+import org.apache.qpid.proton.codec.ReadableBuffer;
 import org.apache.qpid.proton.codec.WritableBuffer;
 import org.apache.qpid.proton.framing.TransportFrame;
 
@@ -96,7 +97,7 @@ class FrameWriter
         _frameStart = _buffer.position();
     }
 
-    private void writePerformative(Object frameBody, ByteBuffer payload, Runnable onPayloadTooLarge)
+    private void writePerformative(Object frameBody, ReadableBuffer payload, Runnable onPayloadTooLarge)
     {
         while (_buffer.remaining() < 8) {
             grow();
@@ -146,7 +147,7 @@ class FrameWriter
         _buffer.position(limit);
     }
 
-    void writeFrame(int channel, Object frameBody, ByteBuffer payload,
+    void writeFrame(int channel, Object frameBody, ReadableBuffer payload,
                     Runnable onPayloadTooLarge)
     {
         startFrame();
@@ -162,7 +163,7 @@ class FrameWriter
         int payloadSize = Math.min(payload == null ? 0 : payload.remaining(), capacity);
 
         ProtocolTracer tracer = _protocolTracer == null ? null : _protocolTracer.get();
-        if(tracer != null || _transport.isTraceFramesEnabled())
+        if (tracer != null || _transport.isTraceFramesEnabled())
         {
             logFrame(tracer, channel, frameBody, payload, payloadSize);
         }
@@ -185,13 +186,11 @@ class FrameWriter
         _framesOutput += 1;
     }
 
-    private void logFrame(ProtocolTracer tracer, int channel, Object frameBody, ByteBuffer payload, int payloadSize)
+    private void logFrame(ProtocolTracer tracer, int channel, Object frameBody, ReadableBuffer payload, int payloadSize)
     {
-        // XXX: this is a bit of a hack but it eliminates duplicate
-        // code, further refactor will fix this
         if (_frameType == AMQP_FRAME_TYPE)
         {
-            ByteBuffer originalPayload = null;
+            ReadableBuffer originalPayload = null;
             if (payload!=null)
             {
                 originalPayload = payload.duplicate();

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
index 6f86700..337f847 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
@@ -21,6 +21,7 @@
 package org.apache.qpid.proton.engine.impl;
 
 import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.apache.qpid.proton.codec.ReadableBuffer;
 import org.apache.qpid.proton.codec.WritableBuffer;
 import org.apache.qpid.proton.engine.Receiver;
 
@@ -114,6 +115,23 @@ public class ReceiverImpl extends LinkImpl implements Receiver
     }
 
     @Override
+    public ReadableBuffer recv()
+    {
+        if (_current == null) {
+            throw new IllegalStateException("no current delivery");
+        }
+
+        ReadableBuffer consumed = _current.recv();
+        if (consumed.remaining() > 0) {
+            getSession().incrementIncomingBytes(-consumed.remaining());
+            if (getSession().getTransportSession().getIncomingWindowSize().equals(UnsignedInteger.ZERO)) {
+                modified();
+            }
+        }
+        return consumed;
+    }
+
+    @Override
     void doFree()
     {
         getSession().freeReceiver(this);

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
index f418655..afb1b2e 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
@@ -79,6 +79,25 @@ public class SenderImpl  extends LinkImpl implements Sender
     }
 
     @Override
+    public int sendNoCopy(final ReadableBuffer buffer)
+    {
+        if (getLocalState() == EndpointState.CLOSED)
+        {
+            throw new IllegalStateException("send not allowed after the sender is closed.");
+        }
+        DeliveryImpl current = current();
+        if (current == null || current.getLink() != this)
+        {
+            throw new IllegalArgumentException();
+        }
+        int sent = current.sendNoCopy(buffer);
+        if (sent > 0) {
+            getSession().incrementOutgoingBytes(sent);
+        }
+        return sent;
+    }
+
+    @Override
     public void abort()
     {
         //TODO.

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
index 0f969c8..1d0103e 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
@@ -47,6 +47,7 @@ import org.apache.qpid.proton.amqp.transport.Transfer;
 import org.apache.qpid.proton.codec.AMQPDefinedTypes;
 import org.apache.qpid.proton.codec.DecoderImpl;
 import org.apache.qpid.proton.codec.EncoderImpl;
+import org.apache.qpid.proton.codec.ReadableBuffer;
 import org.apache.qpid.proton.engine.Connection;
 import org.apache.qpid.proton.engine.EndpointState;
 import org.apache.qpid.proton.engine.Event;
@@ -589,24 +590,23 @@ public class TransportImpl extends EndpointImpl
                 
transfer.setMessageFormat(UnsignedInteger.valueOf(messageFormat));
             }
 
-            ByteBuffer payload = delivery.getData() ==  null ? null :
-                ByteBuffer.wrap(delivery.getData(), delivery.getDataOffset(),
-                                delivery.getDataLength());
+            ReadableBuffer payload = delivery.getData();
+
+            int pending = payload.remaining();
 
             try {
                 writeFrame(tpSession.getLocalChannel(), transfer, payload, partialTransferHandler.setTransfer(transfer));
             } finally {
                 partialTransferHandler.setTransfer(null);
+                delivery.afterSend();  // Allow for freeing resources after write of buffered data
             }
 
             tpSession.incrementOutgoingId();
             tpSession.decrementRemoteIncomingWindow();
 
-            if(payload == null || !payload.hasRemaining())
+            if (payload == null || !payload.hasRemaining())
             {
-                session.incrementOutgoingBytes(-delivery.pending());
-                delivery.setData(null);
-                delivery.setDataLength(0);
+                session.incrementOutgoingBytes(-pending);
 
                 if (!transfer.getMore()) {
                     // Clear the in-progress delivery marker
@@ -622,10 +622,7 @@ public class TransportImpl extends EndpointImpl
             }
             else
             {
-                int delta = delivery.getDataLength() - payload.remaining();
-                delivery.setDataOffset(delivery.getDataOffset() + delta);
-                delivery.setDataLength(payload.remaining());
-                session.incrementOutgoingBytes(-delta);
+                session.incrementOutgoingBytes(-(pending - payload.remaining()));
 
                 // Remember the delivery we are still processing
                 // the body transfer frames for
@@ -1072,7 +1069,7 @@ public class TransportImpl extends EndpointImpl
     }
 
     protected void writeFrame(int channel, FrameBody frameBody,
-                            ByteBuffer payload, Runnable onPayloadTooLarge)
+                              ReadableBuffer payload, Runnable onPayloadTooLarge)
     {
         _frameWriter.writeFrame(channel, frameBody, payload, onPayloadTooLarge);
     }

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
index a09889e..bbacd30 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
@@ -298,28 +298,14 @@ class TransportSession
             delivery.setRemoteDeliveryState(transfer.getState());
         }
         _unsettledIncomingSize++;
-        // TODO - should this be a copy?
-        if(payload != null)
+
+        if (payload != null)
         {
-            if(delivery.getDataLength() == 0)
-            {
-                delivery.setData(payload.getArray());
-                delivery.setDataLength(payload.getLength());
-                delivery.setDataOffset(payload.getArrayOffset());
-            }
-            else
-            {
-                byte[] data = new byte[delivery.getDataLength() + payload.getLength()];
-                System.arraycopy(delivery.getData(), delivery.getDataOffset(), data, 0, delivery.getDataLength());
-                System.arraycopy(payload.getArray(), payload.getArrayOffset(), data, delivery.getDataLength(), payload.getLength());
-                delivery.setData(data);
-                delivery.setDataOffset(0);
-                delivery.setDataLength(data.length);
-            }
+            delivery.append(payload);
             getSession().incrementIncomingBytes(payload.getLength());
         }
-        delivery.updateWork();
 
+        delivery.updateWork();
 
         if(!(transfer.getMore() || transfer.getAborted()))
         {

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/main/java/org/apache/qpid/proton/message/impl/MessageImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/message/impl/MessageImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/message/impl/MessageImpl.java
index cd90789..a31c169 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/message/impl/MessageImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/message/impl/MessageImpl.java
@@ -40,7 +40,7 @@ public class MessageImpl implements ProtonJMessage
     private ApplicationProperties _applicationProperties;
     private Section _body;
     private Footer _footer;
-    
+
     private static class EncoderDecoderPair {
       DecoderImpl decoder = new DecoderImpl();
       EncoderImpl encoder = new EncoderImpl(decoder);
@@ -576,8 +576,13 @@ public class MessageImpl implements ProtonJMessage
 
     public void decode(ByteBuffer buffer)
     {
+        decode(ReadableBuffer.ByteBufferReader.wrap(buffer));
+    }
+
+    public void decode(ReadableBuffer buffer)
+    {
         DecoderImpl decoder = tlsCodec.get().decoder;
-        decoder.setByteBuffer(buffer);
+        decoder.setBuffer(buffer);
 
         _header = null;
         _deliveryAnnotations = null;
@@ -681,7 +686,7 @@ public class MessageImpl implements ProtonJMessage
 
         }
 
-        decoder.setByteBuffer(null);
+        decoder.setBuffer(null);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/ec554715/proton-j/src/test/java/org/apache/qpid/proton/codec/Benchmark.java
----------------------------------------------------------------------
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/codec/Benchmark.java b/proton-j/src/test/java/org/apache/qpid/proton/codec/Benchmark.java
index 529a808..5dd4077 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/codec/Benchmark.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/codec/Benchmark.java
@@ -22,7 +22,6 @@ package org.apache.qpid.proton.codec;
 
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
@@ -44,12 +43,13 @@ import org.apache.qpid.proton.amqp.transport.Disposition;
 import org.apache.qpid.proton.amqp.transport.Flow;
 import org.apache.qpid.proton.amqp.transport.Role;
 import org.apache.qpid.proton.amqp.transport.Transfer;
+import org.apache.qpid.proton.codec.WritableBuffer.ByteBufferWrapper;
 
 public class Benchmark implements Runnable {
 
     private static final int ITERATIONS = 10 * 1024 * 1024;
 
-    private ByteBuffer byteBuf = ByteBuffer.allocate(8192);
+    private ByteBufferWrapper outputBuf = WritableBuffer.ByteBufferWrapper.allocate(8192);
     private BenchmarkResult resultSet = new BenchmarkResult();
     private boolean warming = true;
 
@@ -66,8 +66,7 @@ public class Benchmark implements Runnable {
     public void run() {
         AMQPDefinedTypes.registerAllTypes(decoder, encoder);
 
-        encoder.setByteBuffer(byteBuf);
-        decoder.setByteBuffer(byteBuf);
+        encoder.setByteBuffer(outputBuf);
 
         try {
             doBenchmarks();
@@ -102,6 +101,16 @@ public class Benchmark implements Runnable {
         warming = false;
     }
 
+    private CompositeReadableBuffer convertToComposite(WritableBuffer buffer) {
+        CompositeReadableBuffer composite = new CompositeReadableBuffer();
+        ReadableBuffer readableView = outputBuf.toReadableBuffer();
+
+        byte[] copy = new byte[readableView.remaining()];
+        readableView.get(copy);
+
+        return composite.append(copy);
+    }
+
     private void benchmarkListOfInts() throws IOException {
         ArrayList<Object> list = new ArrayList<>(10);
         for (int j = 0; j < 10; j++) {
@@ -110,15 +119,18 @@ public class Benchmark implements Runnable {
 
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.clear();
+            outputBuf.byteBuffer().clear();
             encoder.writeList(list);
         }
         resultSet.encodesComplete();
 
+        CompositeReadableBuffer inputBuf = convertToComposite(outputBuf);
+        decoder.setBuffer(inputBuf);
+
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.flip();
             decoder.readList();
+            inputBuf.flip();
         }
         resultSet.decodesComplete();
 
@@ -130,15 +142,18 @@ public class Benchmark implements Runnable {
 
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.clear();
+            outputBuf.byteBuffer().clear();
             encoder.writeUUID(uuid);
         }
         resultSet.encodesComplete();
 
+        CompositeReadableBuffer inputBuf = convertToComposite(outputBuf);
+        decoder.setBuffer(inputBuf);
+
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.flip();
             decoder.readUUID();
+            inputBuf.flip();
         }
         resultSet.decodesComplete();
 
@@ -152,15 +167,18 @@ public class Benchmark implements Runnable {
 
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.clear();
+            outputBuf.byteBuffer().clear();
             encoder.writeObject(header);
         }
         resultSet.encodesComplete();
 
+        CompositeReadableBuffer inputBuf = convertToComposite(outputBuf);
+        decoder.setBuffer(inputBuf);
+
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.flip();
             decoder.readObject();
+            inputBuf.flip();
         }
         resultSet.decodesComplete();
 
@@ -175,15 +193,18 @@ public class Benchmark implements Runnable {
 
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.clear();
+            outputBuf.byteBuffer().clear();
             encoder.writeObject(transfer);
         }
         resultSet.encodesComplete();
 
+        CompositeReadableBuffer inputBuf = convertToComposite(outputBuf);
+        decoder.setBuffer(inputBuf);
+
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.flip();
             decoder.readObject();
+            inputBuf.flip();
         }
         resultSet.decodesComplete();
 
@@ -202,15 +223,18 @@ public class Benchmark implements Runnable {
 
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.clear();
+            outputBuf.byteBuffer().clear();
             encoder.writeObject(flow);
         }
         resultSet.encodesComplete();
 
+        CompositeReadableBuffer inputBuf = convertToComposite(outputBuf);
+        decoder.setBuffer(inputBuf);
+
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.flip();
             decoder.readObject();
+            inputBuf.flip();
         }
         resultSet.decodesComplete();
 
@@ -225,15 +249,18 @@ public class Benchmark implements Runnable {
 
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.clear();
+            outputBuf.byteBuffer().clear();
             encoder.writeObject(properties);
         }
         resultSet.encodesComplete();
 
+        CompositeReadableBuffer inputBuf = convertToComposite(outputBuf);
+        decoder.setBuffer(inputBuf);
+
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.flip();
             decoder.readObject();
+            inputBuf.flip();
         }
         resultSet.decodesComplete();
 
@@ -248,22 +275,24 @@ public class Benchmark implements Runnable {
 
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.clear();
+            outputBuf.byteBuffer().clear();
             encoder.writeObject(annotations);
         }
         resultSet.encodesComplete();
 
+        CompositeReadableBuffer inputBuf = convertToComposite(outputBuf);
+        decoder.setBuffer(inputBuf);
+
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.flip();
             decoder.readObject();
+            inputBuf.flip();
         }
         resultSet.decodesComplete();
 
         time("MessageAnnotations", resultSet);
     }
 
-    @SuppressWarnings("unchecked")
     private void benchmarkApplicationProperties() throws IOException {
         ApplicationProperties properties = new ApplicationProperties(new HashMap<String, Object>());
         properties.getValue().put("test1", UnsignedByte.valueOf((byte) 128));
@@ -272,15 +301,18 @@ public class Benchmark implements Runnable {
 
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.clear();
+            outputBuf.byteBuffer().clear();
             encoder.writeObject(properties);
         }
         resultSet.encodesComplete();
 
+        CompositeReadableBuffer inputBuf = convertToComposite(outputBuf);
+        decoder.setBuffer(inputBuf);
+
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.flip();
             decoder.readObject();
+            inputBuf.flip();
         }
         resultSet.decodesComplete();
 
@@ -294,19 +326,22 @@ public class Benchmark implements Runnable {
 
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.clear();
+            outputBuf.byteBuffer().clear();
             encoder.writeSymbol(symbol1);
             encoder.writeSymbol(symbol2);
             encoder.writeSymbol(symbol3);
         }
         resultSet.encodesComplete();
 
+        CompositeReadableBuffer inputBuf = convertToComposite(outputBuf);
+        decoder.setBuffer(inputBuf);
+
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.flip();
             decoder.readSymbol();
             decoder.readSymbol();
             decoder.readSymbol();
+            inputBuf.flip();
         }
         resultSet.decodesComplete();
 
@@ -323,15 +358,18 @@ public class Benchmark implements Runnable {
 
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.clear();
+            outputBuf.byteBuffer().clear();
             encoder.writeObject(disposition);
         }
         resultSet.encodesComplete();
 
+        CompositeReadableBuffer inputBuf = convertToComposite(outputBuf);
+        decoder.setBuffer(inputBuf);
+
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.flip();
             decoder.readObject();
+            inputBuf.flip();
         }
         resultSet.decodesComplete();
 
@@ -345,19 +383,22 @@ public class Benchmark implements Runnable {
 
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.clear();
+            outputBuf.byteBuffer().clear();
             encoder.writeString(string1);
             encoder.writeString(string2);
             encoder.writeString(string3);
         }
         resultSet.encodesComplete();
 
+        CompositeReadableBuffer inputBuf = convertToComposite(outputBuf);
+        decoder.setBuffer(inputBuf);
+
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.flip();
             decoder.readString();
             decoder.readString();
             decoder.readString();
+            inputBuf.flip();
         }
         resultSet.decodesComplete();
 
@@ -371,19 +412,22 @@ public class Benchmark implements Runnable {
 
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.clear();
+            outputBuf.byteBuffer().clear();
             encoder.writeObject(data1);
             encoder.writeObject(data2);
             encoder.writeObject(data3);
         }
         resultSet.encodesComplete();
 
+        CompositeReadableBuffer inputBuf = convertToComposite(outputBuf);
+        decoder.setBuffer(inputBuf);
+
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.flip();
             decoder.readObject();
             decoder.readObject();
             decoder.readObject();
+            inputBuf.flip();
         }
         resultSet.decodesComplete();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to