Author: lquack
Date: Wed Oct 14 08:53:42 2015
New Revision: 1708561

URL: http://svn.apache.org/viewvc?rev=1708561&view=rev
Log:
QPID-6735: [Java Broker] Refactor how persisted messages are loaded from disk.

Messages that are loaded from disk are immediately flowed back to disk, 
potentially releasing the underlying QpidByteBuffer.
Add extra code to avoid reloading message content when creating chunks for 
delivery, by doing the chunking at a higher level.
Remove the unused offset parameter from 
MessageContentSource#getContent(ByteBuffer buf, int offset).

Modified:
    
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContentSource.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
    
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
    
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
    
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
    
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
    
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
    
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
    
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
    
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
    
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
    
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
    
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
    
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
    
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
    
qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java
    
qpid/java/trunk/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
    
qpid/java/trunk/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java

Modified: 
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- 
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
 (original)
+++ 
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
 Wed Oct 14 08:53:42 2015
@@ -1040,7 +1040,7 @@ public abstract class AbstractBDBMessage
                 {
                     checkMessageStoreOpen();
                     metaData = (T) getMessageMetaData(_messageId);
-                    _messageDataRef = new MessageDataSoftRef<>(metaData, null);
+                    _messageDataRef = new MessageDataSoftRef<>(metaData, 
_messageDataRef.getData());
                 }
                 return metaData;
             }
@@ -1078,8 +1078,9 @@ public abstract class AbstractBDBMessage
         }
 
         @Override
-        public synchronized int getContent(int offsetInMessage, final 
ByteBuffer dst)
+        public synchronized int getContent(final ByteBuffer dst)
         {
+            // These do not need to be disposed of because 
getContentAsByteBuffer() retains a reference
             Collection<QpidByteBuffer> allContent = getContentAsByteBuffer();
             int length = 0;
             for(QpidByteBuffer contentChunk : allContent)
@@ -1090,6 +1091,9 @@ public abstract class AbstractBDBMessage
             return length;
         }
 
+        /**
+         * returns QBBs containing the content. The caller must not dispose of 
them because we keep a reference in _messageDataRef.
+         */
         private Collection<QpidByteBuffer> getContentAsByteBuffer()
         {
             Collection<QpidByteBuffer> data = _messageDataRef == null ? 
Collections.<QpidByteBuffer>emptyList() : _messageDataRef.getData();
@@ -1099,16 +1103,7 @@ public abstract class AbstractBDBMessage
                 {
                     checkMessageStoreOpen();
                     data = 
AbstractBDBMessageStore.this.getAllContent(_messageId);
-                    T metaData = _messageDataRef.getMetaData();
-                    if (metaData == null)
-                    {
-                        metaData = (T) getMessageMetaData(_messageId);
-                        _messageDataRef = new MessageDataSoftRef<T>(metaData, 
data);
-                    }
-                    else
-                    {
-                        _messageDataRef.setData(data);
-                    }
+                    _messageDataRef.setData(data);
                 }
                 else
                 {
@@ -1119,44 +1114,14 @@ public abstract class AbstractBDBMessage
         }
 
         @Override
-        public synchronized Collection<QpidByteBuffer> getContent(final int 
offsetInMessage, final int size)
+        public synchronized Collection<QpidByteBuffer> getContent()
         {
-            int pos = 0;
-            int added = 0;
-
             Collection<QpidByteBuffer> bufs = getContentAsByteBuffer();
-            List<QpidByteBuffer> content = new ArrayList<>(bufs.size());
-            for(QpidByteBuffer buf : bufs)
+            Collection<QpidByteBuffer> content = new ArrayList<>(bufs.size());
+            for (QpidByteBuffer buf : bufs)
             {
-                if(pos < offsetInMessage)
-                {
-                    final int remaining = buf.remaining();
-                    if(pos+ remaining >=offsetInMessage)
-                    {
-                        buf = buf.view(offsetInMessage-pos,size);
-
-                        content.add(buf);
-                        added += buf.remaining();
-                    }
-                    pos+= remaining;
-
-                }
-                else
-                {
-                    buf = buf.slice();
-                    if(buf.remaining() > (size-added))
-                    {
-                        buf.limit(size-added);
-                    }
-                    content.add(buf);
-                    added += buf.remaining();
-                }
-                if(added >= size)
-                {
-                    break;
-                }
+                content.add(buf.duplicate());
             }
-
             return content;
         }
 

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
 Wed Oct 14 08:53:42 2015
@@ -169,15 +169,39 @@ public abstract class AbstractServerMess
     }
 
     @Override
-    final public int getContent(ByteBuffer buf, int offset)
+    final public int getContent(ByteBuffer buf)
     {
-        return getStoredMessage().getContent(offset, buf);
+        StoredMessage<T> storedMessage = getStoredMessage();
+        boolean wasInMemory = storedMessage.isInMemory();
+        try
+        {
+            return storedMessage.getContent(buf);
+        }
+        finally
+        {
+            if (!wasInMemory)
+            {
+                storedMessage.flowToDisk();
+            }
+        }
     }
 
     @Override
-    final public Collection<QpidByteBuffer> getContent(int offset, int size)
+    final public Collection<QpidByteBuffer> getContent()
     {
-        return getStoredMessage().getContent(offset, size);
+        StoredMessage<T> storedMessage = getStoredMessage();
+        boolean wasInMemory = storedMessage.isInMemory();
+        try
+        {
+            return storedMessage.getContent();
+        }
+        finally
+        {
+            if (!wasInMemory)
+            {
+                storedMessage.flowToDisk();
+            }
+        }
     }
 
     final public Object getConnectionReference()

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContentSource.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContentSource.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContentSource.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContentSource.java
 Wed Oct 14 08:53:42 2015
@@ -28,8 +28,8 @@ import org.apache.qpid.bytebuffer.QpidBy
 
 public interface MessageContentSource
 {
-    int getContent(ByteBuffer buf, int offset);
-    Collection<QpidByteBuffer> getContent(int offset, int size);
+    int getContent(ByteBuffer buf);
+    Collection<QpidByteBuffer> getContent();
 
     long getSize();
 }

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
 Wed Oct 14 08:53:42 2015
@@ -65,7 +65,7 @@ public class InternalMessage extends Abs
     {
         super(msg, null);
         _contentSize = msg.getMetaData().getContentSize();
-        Collection<QpidByteBuffer> bufs = msg.getContent(0, _contentSize);
+        Collection<QpidByteBuffer> bufs = msg.getContent();
 
         try(ObjectInputStream is = new ObjectInputStream(new 
ByteBufferInputStream(ByteBufferUtils.combine(bufs))))
         {
@@ -224,24 +224,21 @@ public class InternalMessage extends Abs
                     }
 
                     @Override
-                    public int getContent(final int offsetInMessage, final 
ByteBuffer dst)
+                    public int getContent(final ByteBuffer dst)
                     {
-                        ByteBuffer buffer = ByteBuffer.wrap(bytes);
-                        buffer.position(offsetInMessage);
-                        buffer = buffer.slice();
-                        if (dst.remaining() < buffer.remaining())
+                        int size = bytes.length;
+                        if (dst.remaining() < size)
                         {
-                            buffer.limit(dst.remaining());
+                            size = dst.remaining();
                         }
-                        int pos = dst.position();
-                        dst.put(buffer);
-                        return dst.position() - pos;
+                        dst.put(bytes, 0 ,size);
+                        return size;
                     }
 
                     @Override
-                    public Collection<QpidByteBuffer> getContent(final int 
offsetInMessage, final int size)
+                    public Collection<QpidByteBuffer> getContent()
                     {
-                        return 
Collections.singleton(QpidByteBuffer.wrap(bytes, offsetInMessage, size));
+                        return 
Collections.singleton(QpidByteBuffer.wrap(bytes));
                     }
 
                     @Override

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
 Wed Oct 14 08:53:42 2015
@@ -3463,7 +3463,7 @@ public abstract class AbstractQueue<X ex
                             _size = message.getSize();
                             _content = new byte[(int) _size];
                             _found = true;
-                            message.getContent(ByteBuffer.wrap(_content), 0);
+                            message.getContent(ByteBuffer.wrap(_content));
                         }
                         finally
                         {

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
 Wed Oct 14 08:53:42 2015
@@ -1441,7 +1441,7 @@ public abstract class AbstractJDBCMessag
                     try
                     {
                         metaData = (T) 
AbstractJDBCMessageStore.this.getMetaData(_messageId);
-                        _messageDataRef = new 
MessageDataSoftRef<>(metaData,null);
+                        _messageDataRef = new MessageDataSoftRef<>(metaData, 
_messageDataRef.getData());
                     }
                     catch (SQLException e)
                     {
@@ -1484,8 +1484,9 @@ public abstract class AbstractJDBCMessag
         }
 
         @Override
-        public synchronized int getContent(int offsetInMessage, final 
ByteBuffer dst)
+        public synchronized int getContent(final ByteBuffer dst)
         {
+            // These do not need to be disposed of because 
getContentAsByteBuffer() retains a reference
             Collection<QpidByteBuffer> allContent = getContentAsByteBuffer();
             int length = 0;
             for(QpidByteBuffer contentChunk : allContent)
@@ -1496,6 +1497,9 @@ public abstract class AbstractJDBCMessag
             return length;
         }
 
+        /**
+         * returns QBBs containing the content. The caller must not dispose of 
them because we keep a reference in _messageDataRef.
+         */
         private Collection<QpidByteBuffer> getContentAsByteBuffer()
         {
             Collection<QpidByteBuffer> data = _messageDataRef == null ? 
Collections.<QpidByteBuffer>emptyList() : _messageDataRef.getData();
@@ -1505,23 +1509,7 @@ public abstract class AbstractJDBCMessag
                 {
                     checkMessageStoreOpen();
                     data = 
AbstractJDBCMessageStore.this.getAllContent(_messageId);
-                    T metaData = _messageDataRef.getMetaData();
-                    if (metaData == null)
-                    {
-                        try
-                        {
-                            metaData = (T) 
AbstractJDBCMessageStore.this.getMetaData(_messageId);
-                            _messageDataRef = new 
MessageDataSoftRef<T>(metaData, data);
-                        }
-                        catch (SQLException e)
-                        {
-                            throw new StoreException("Failed to get content 
for message id " + _messageId, e);
-                        }
-                    }
-                    else
-                    {
-                        _messageDataRef.setData(data);
-                    }
+                    _messageDataRef.setData(data);
                 }
                 else
                 {
@@ -1532,44 +1520,14 @@ public abstract class AbstractJDBCMessag
         }
 
         @Override
-        public synchronized Collection<QpidByteBuffer> getContent(final int 
offsetInMessage, final int size)
+        public synchronized Collection<QpidByteBuffer> getContent()
         {
-            int pos = 0;
-            int added = 0;
-
             Collection<QpidByteBuffer> bufs = getContentAsByteBuffer();
-            List<QpidByteBuffer> content = new ArrayList<>(bufs.size());
-            for(QpidByteBuffer buf : bufs)
+            Collection<QpidByteBuffer> content = new ArrayList<>(bufs.size());
+            for (QpidByteBuffer buf : bufs)
             {
-                if(pos < offsetInMessage)
-                {
-                    final int remaining = buf.remaining();
-                    if(pos+ remaining >=offsetInMessage)
-                    {
-                        buf = buf.view(offsetInMessage-pos,size);
-
-                        content.add(buf);
-                        added += buf.remaining();
-                    }
-                    pos+= remaining;
-
-                }
-                else
-                {
-                    buf = buf.slice();
-                    if(buf.remaining() > (size-added))
-                    {
-                        buf.limit(size-added);
-                    }
-                    content.add(buf);
-                    added += buf.remaining();
-                }
-                if(added >= size)
-                {
-                    break;
-                }
+                content.add(buf.duplicate());
             }
-
             return content;
         }
 

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
 Wed Oct 14 08:53:42 2015
@@ -84,7 +84,7 @@ public class StoredMemoryMessage<T exten
         return this;
     }
 
-    public int getContent(int offset, ByteBuffer dst)
+    public int getContent(ByteBuffer dst)
     {
         if(_content == null)
         {
@@ -92,12 +92,10 @@ public class StoredMemoryMessage<T exten
         }
         QpidByteBuffer src = _content.duplicate();
 
-        int oldPosition = src.position();
-
-        src.position(oldPosition + offset);
+        src.position(0);
 
         int length = dst.remaining() < src.remaining() ? dst.remaining() : 
src.remaining();
-        src.limit(oldPosition + length);
+        src.limit(length);
 
         src.get(dst);
 
@@ -105,23 +103,14 @@ public class StoredMemoryMessage<T exten
         return length;
     }
 
-
-    public Collection<QpidByteBuffer> getContent(int offsetInMessage, int size)
+    @Override
+    public Collection<QpidByteBuffer> getContent()
     {
         if(_content == null)
         {
             return null;
         }
-        QpidByteBuffer buf = _content.duplicate();
-
-        if(offsetInMessage != 0)
-        {
-            buf.position(offsetInMessage);
-            buf = buf.slice();
-        }
-
-        buf.limit(Math.min(size,buf.remaining()));
-        return Collections.singleton(buf);
+        return Collections.singleton(_content.duplicate());
     }
 
     public T getMetaData()

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
 Wed Oct 14 08:53:42 2015
@@ -31,9 +31,9 @@ public interface StoredMessage<M extends
 
     long getMessageNumber();
 
-    int getContent(int offsetInMessage, ByteBuffer dst);
+    int getContent(ByteBuffer dst);
 
-    Collection<QpidByteBuffer> getContent(int offsetInMessage, int size);
+    Collection<QpidByteBuffer> getContent();
 
     void remove();
 

Modified: 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
 Wed Oct 14 08:53:42 2015
@@ -100,13 +100,13 @@ public class TestMessageMetaDataType imp
         }
 
         @Override
-        public int getContent(ByteBuffer buf, int offset)
+        public int getContent(ByteBuffer buf)
         {
             return 0;
         }
 
         @Override
-        public Collection<QpidByteBuffer> getContent(int offset, int size)
+        public Collection<QpidByteBuffer> getContent()
         {
             return null;
         }

Modified: 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
 Wed Oct 14 08:53:42 2015
@@ -49,11 +49,13 @@ class MockServerMessage implements Serve
         this.persistent = persistent;
     }
 
+    @Override
     public boolean isPersistent()
     {
         return persistent;
     }
 
+    @Override
     public MessageReference newReference()
     {
         throw new UnsupportedOperationException();
@@ -77,38 +79,44 @@ class MockServerMessage implements Serve
         return false;
     }
 
+    @Override
     public long getSize()
     {
         throw new UnsupportedOperationException();
     }
 
+    @Override
     public String getInitialRoutingAddress()
     {
         throw new UnsupportedOperationException();
     }
 
+    @Override
     public AMQMessageHeader getMessageHeader()
     {
         throw new UnsupportedOperationException();
     }
 
+    @Override
     public StoredMessage getStoredMessage()
     {
         throw new UnsupportedOperationException();
     }
 
+    @Override
     public long getExpiration()
     {
         throw new UnsupportedOperationException();
     }
 
-    public int getContent(ByteBuffer buf, int offset)
+    @Override
+    public int getContent(ByteBuffer buf)
     {
         throw new UnsupportedOperationException();
     }
 
-
-    public Collection<QpidByteBuffer> getContent(int offset, int size)
+    @Override
+    public Collection<QpidByteBuffer> getContent()
     {
         throw new UnsupportedOperationException();
     }
@@ -119,11 +127,13 @@ class MockServerMessage implements Serve
         return null;
     }
 
+    @Override
     public long getArrivalTime()
     {
         throw new UnsupportedOperationException();
     }
 
+    @Override
     public long getMessageNumber()
     {
         return 0L;

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
 Wed Oct 14 08:53:42 2015
@@ -79,22 +79,21 @@ public class MessageConverter_Internal_t
                     }
 
                     @Override
-                    public int getContent(int offsetInMessage, ByteBuffer dst)
+                    public int getContent(ByteBuffer dst)
                     {
-                        int size = messageContent.length - offsetInMessage;
+                        int size = messageContent.length;
                         if(dst.remaining() < size)
                         {
                             size = dst.remaining();
                         }
-                        ByteBuffer buf = ByteBuffer.wrap(messageContent, 
offsetInMessage, size);
-                        dst.put(buf);
+                        dst.put(messageContent, 0, size);
                         return size;
                     }
 
                     @Override
-                    public Collection<QpidByteBuffer> getContent(int 
offsetInMessage, int size)
+                    public Collection<QpidByteBuffer> getContent()
                     {
-                        return 
Collections.singleton(QpidByteBuffer.wrap(messageContent, offsetInMessage, 
size));
+                        return 
Collections.singleton(QpidByteBuffer.wrap(messageContent));
                     }
 
                     @Override

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
 Wed Oct 14 08:53:42 2015
@@ -26,7 +26,6 @@ import java.io.ObjectOutputStream;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -86,15 +85,15 @@ public class MessageConverter_v0_10 impl
                     }
 
                     @Override
-                    public int getContent(int offsetInMessage, ByteBuffer dst)
+                    public int getContent(ByteBuffer dst)
                     {
-                        return serverMsg.getContent(dst, offsetInMessage);
+                        return serverMsg.getContent(dst);
                     }
 
                     @Override
-                    public Collection<QpidByteBuffer> getContent(int 
offsetInMessage, int size)
+                    public Collection<QpidByteBuffer> getContent()
                     {
-                        return serverMsg.getContent(offsetInMessage, size);
+                        return serverMsg.getContent();
                     }
 
                     @Override

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
 Wed Oct 14 08:53:42 2015
@@ -61,7 +61,7 @@ public class MessageConverter_v0_10_to_I
     {
         final String mimeType = serverMessage.getMessageHeader().getMimeType();
         byte[] data = new byte[(int) serverMessage.getSize()];
-        serverMessage.getContent(ByteBuffer.wrap(data), 0);
+        serverMessage.getContent(ByteBuffer.wrap(data));
 
         Object body = convertMessageBody(mimeType, data);
         MessageProperties messageProps = 
serverMessage.getHeader().getMessageProperties();

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
 Wed Oct 14 08:53:42 2015
@@ -83,6 +83,6 @@ public class MessageTransferMessage exte
 
     public Collection<QpidByteBuffer> getBody()
     {
-        return  getContent(0, (int)getSize());
+        return  getContent();
     }
 }

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
 Wed Oct 14 08:53:42 2015
@@ -91,22 +91,21 @@ public class MessageConverter_Internal_t
             }
 
             @Override
-            public int getContent(int offsetInMessage, ByteBuffer dst)
+            public int getContent(ByteBuffer dst)
             {
-                int size = messageContent.length - offsetInMessage;
+                int size = messageContent.length;
                 if(dst.remaining() < size)
                 {
                     size = dst.remaining();
                 }
-                ByteBuffer buf = ByteBuffer.wrap(messageContent, 
offsetInMessage, size);
-                dst.put(buf);
+                dst.put(messageContent, 0, size);
                 return size;
             }
 
             @Override
-            public Collection<QpidByteBuffer> getContent(int offsetInMessage, 
int size)
+            public Collection<QpidByteBuffer> getContent()
             {
-                return 
Collections.singleton(QpidByteBuffer.wrap(messageContent, offsetInMessage, 
size));
+                return 
Collections.singleton(QpidByteBuffer.wrap(messageContent));
             }
 
             @Override

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
 Wed Oct 14 08:53:42 2015
@@ -61,7 +61,7 @@ public class MessageConverter_v0_8_to_In
     {
         final String mimeType = serverMessage.getMessageHeader().getMimeType();
         byte[] data = new byte[(int) serverMessage.getSize()];
-        serverMessage.getContent(ByteBuffer.wrap(data), 0);
+        serverMessage.getContent(ByteBuffer.wrap(data));
 
         Object body = convertMessageBody(mimeType, data);
 

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
 Wed Oct 14 08:53:42 2015
@@ -22,10 +22,10 @@ package org.apache.qpid.server.protocol.
 
 import java.io.DataOutput;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -111,14 +111,14 @@ public class ProtocolOutputConverterImpl
         // straight through case
         boolean compressionSupported = _connection.isCompressionSupported();
 
-        Collection<QpidByteBuffer> buffers = null;
+        Collection<QpidByteBuffer> buffers = message.getContent();
 
         long length;
         if(msgCompressed
            && !compressionSupported
-           && ((buffers = message.getContent(0, bodySize)) != null)
+           && (buffers != null)
            && (modifiedContent = GZIPUtils.uncompressBufferToArray(
-                        ByteBufferUtils.combine(buffers))) != null)
+                ByteBufferUtils.combine(buffers))) != null)
         {
             BasicContentHeaderProperties modifiedProps =
                     new 
BasicContentHeaderProperties(contentHeaderBody.getProperties());
@@ -132,7 +132,7 @@ public class ProtocolOutputConverterImpl
                 && compressionSupported
                 && contentHeaderBody.getProperties().getEncoding()==null
                 && bodySize > _connection.getMessageCompressionThreshold()
-                && ((buffers = message.getContent(0, bodySize)) != null)
+                && (buffers != null)
                 && (modifiedContent = 
GZIPUtils.compressBufferToArray(ByteBufferUtils.combine(buffers))) != null)
         {
             BasicContentHeaderProperties modifiedProps =
@@ -145,7 +145,7 @@ public class ProtocolOutputConverterImpl
         }
         else
         {
-            writeMessageDeliveryUnchanged(message, contentHeaderBody, 
channelId, deliverBody, bodySize);
+            writeMessageDeliveryUnchanged(buffers, contentHeaderBody, 
channelId, deliverBody, bodySize);
 
             length = bodySize;
         }
@@ -170,35 +170,17 @@ public class ProtocolOutputConverterImpl
         bodySize = content.length;
         ContentHeaderBody modifiedHeaderBody =
                 new ContentHeaderBody(modifiedProps, bodySize);
-        final MessageContentSource wrappedSource = new MessageContentSource()
-        {
-            @Override
-            public int getContent(final ByteBuffer buf, final int offset)
-            {
-                int size = Math.min(buf.remaining(), content.length - offset);
-                buf.put(content, offset, size);
-                return size;
-            }
-
-            @Override
-            public Collection<QpidByteBuffer> getContent(final int offset, 
final int size)
-            {
-                return Collections.singleton(QpidByteBuffer.wrap(content, 
offset, size));
-            }
-
-            @Override
-            public long getSize()
-            {
-                return content.length;
-            }
-        };
-        writeMessageDeliveryUnchanged(wrappedSource, modifiedHeaderBody, 
channelId, deliverBody, bodySize);
+        
writeMessageDeliveryUnchanged(Collections.singleton(QpidByteBuffer.wrap(content)),
+                                      modifiedHeaderBody, channelId, 
deliverBody, bodySize);
         return bodySize;
     }
 
-    private void writeMessageDeliveryUnchanged(final MessageContentSource 
message,
-                                               final ContentHeaderBody 
contentHeaderBody,
-                                               final int channelId, final 
AMQBody deliverBody, final int bodySize)
+
+    private void writeMessageDeliveryUnchanged(Collection<QpidByteBuffer> 
messageBuffers,
+                                               ContentHeaderBody 
contentHeaderBody,
+                                               int channelId,
+                                               AMQBody deliverBody,
+                                               int bodySize)
     {
         if (bodySize == 0)
         {
@@ -216,7 +198,7 @@ public class ProtocolOutputConverterImpl
 
             int writtenSize = capacity;
 
-            AMQBody firstContentBody = new MessageContentSourceBody(message, 
0, capacity);
+            AMQBody firstContentBody = new 
MessageContentSourceBody(messageBuffers, 0, capacity);
 
             CompositeAMQBodyBlock
                     compositeBlock =
@@ -226,7 +208,7 @@ public class ProtocolOutputConverterImpl
             while (writtenSize < bodySize)
             {
                 capacity = bodySize - writtenSize > maxBodySize ? maxBodySize 
: bodySize - writtenSize;
-                MessageContentSourceBody body = new 
MessageContentSourceBody(message, writtenSize, capacity);
+                AMQBody body = new MessageContentSourceBody(messageBuffers, 
writtenSize, capacity);
                 writtenSize += capacity;
 
                 writeFrame(new AMQFrame(channelId, body));
@@ -243,12 +225,47 @@ public class ProtocolOutputConverterImpl
     {
         public static final byte TYPE = 3;
         private final int _length;
-        private final MessageContentSource _message;
+        private final Collection<QpidByteBuffer> _contentBuffers;
         private final int _offset;
 
-        public MessageContentSourceBody(MessageContentSource message, int 
offset, int length)
+        public MessageContentSourceBody(Collection<QpidByteBuffer> bufs, int 
offset, int length)
         {
-            _message = message;
+            int pos = 0;
+            int added = 0;
+
+            List<QpidByteBuffer> content = new ArrayList<>(bufs.size());
+            for(QpidByteBuffer buf : bufs)
+            {
+                if(pos < offset)
+                {
+                    final int remaining = buf.remaining();
+                    if(pos + remaining > offset)
+                    {
+                        buf = buf.view(offset-pos,length);
+
+                        content.add(buf);
+                        added += buf.remaining();
+                    }
+                    pos += remaining;
+
+                }
+                else
+                {
+                    buf = buf.slice();
+                    if(buf.remaining() > (length-added))
+                    {
+                        buf.limit(length-added);
+                    }
+                    content.add(buf);
+                    added += buf.remaining();
+                }
+                if(added >= length)
+                {
+                    break;
+                }
+            }
+
+            _contentBuffers = content;
             _offset = offset;
             _length = length;
         }
@@ -265,9 +282,7 @@ public class ProtocolOutputConverterImpl
 
         public void writePayload(DataOutput buffer) throws IOException
         {
-            Collection<QpidByteBuffer> bufs = _message.getContent(_offset, 
_length);
-
-            for(QpidByteBuffer buf : bufs)
+            for(QpidByteBuffer buf : _contentBuffers)
             {
                 if (buf.hasArray())
                 {
@@ -282,16 +297,15 @@ public class ProtocolOutputConverterImpl
 
                     buffer.write(data);
                 }
+                buf.dispose();
             }
         }
 
         @Override
         public long writePayload(final ByteBufferSender sender) throws 
IOException
         {
-
-            Collection<QpidByteBuffer> bufs = _message.getContent(_offset, 
_length);
             long size = 0l;
-            for(QpidByteBuffer buf : bufs)
+            for(QpidByteBuffer buf : _contentBuffers)
             {
                 size += buf.remaining();
 

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java
 Wed Oct 14 08:53:42 2015
@@ -62,7 +62,7 @@ public class MessageConverter_from_1_0
     public static Object convertBodyToObject(final Message_1_0 serverMessage)
     {
         byte[] data = new byte[(int) serverMessage.getSize()];
-        serverMessage.getStoredMessage().getContent(0, ByteBuffer.wrap(data));
+        serverMessage.getStoredMessage().getContent(ByteBuffer.wrap(data));
 
         SectionDecoderImpl sectionDecoder = new 
SectionDecoderImpl(MessageConverter_v1_0_to_Internal.TYPE_REGISTRY);
 

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
 Wed Oct 14 08:53:42 2015
@@ -208,7 +208,7 @@ public abstract class MessageConverter_t
     {
         final String mimeType = serverMessage.getMessageHeader().getMimeType();
         byte[] data = new byte[(int) serverMessage.getSize()];
-        serverMessage.getContent(ByteBuffer.wrap(data), 0);
+        serverMessage.getContent(ByteBuffer.wrap(data));
         byte[] uncompressed;
 
         
if(Symbol.valueOf(GZIPUtils.GZIP_CONTENT_ENCODING).equals(metaData.getPropertiesSection().getContentEncoding())
@@ -238,13 +238,12 @@ public abstract class MessageConverter_t
                         }
 
                         @Override
-                        public int getContent(int offsetInMessage, ByteBuffer 
dst)
+                        public int getContent(ByteBuffer dst)
                         {
                             QpidByteBuffer buf = allData.duplicate();
-                            buf.position(offsetInMessage);
-                            buf = buf.slice();
+                            buf.position(0);
                             int size;
-                            if(dst.remaining()<buf.remaining())
+                            if (dst.remaining() < buf.remaining())
                             {
                                 buf.limit(dst.remaining());
                                 size = dst.remaining();
@@ -253,16 +252,15 @@ public abstract class MessageConverter_t
                             {
                                 size = buf.remaining();
                             }
-                            buf.copyTo(dst);
+                            buf.get(dst);
                             buf.dispose();
                             return size;
                         }
 
                         @Override
-                        public Collection<QpidByteBuffer> getContent(int 
offsetInMessage, int size)
+                        public Collection<QpidByteBuffer> getContent()
                         {
-                            QpidByteBuffer buf = allData.view(offsetInMessage, 
Math.min(size,allData.remaining()-offsetInMessage));
-                            return Collections.singleton(buf);
+                            return Collections.singleton(allData.duplicate());
                         }
 
                         @Override

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
 Wed Oct 14 08:53:42 2015
@@ -71,8 +71,7 @@ public class Message_1_0 extends Abstrac
 
     private static Collection<QpidByteBuffer> 
restoreFragments(StoredMessage<MessageMetaData_1_0> storedMessage)
     {
-        return storedMessage.getContent(0, Integer.MAX_VALUE);
-
+        return storedMessage.getContent();
     }
 
     public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage,

Modified: 
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
 Wed Oct 14 08:53:42 2015
@@ -90,22 +90,21 @@ public class MessageConverter_1_0_to_v0_
             }
 
             @Override
-            public int getContent(int offsetInMessage, ByteBuffer dst)
+            public int getContent(ByteBuffer dst)
             {
-                int size = messageContent.length - offsetInMessage;
+                int size = messageContent.length;
                 if(dst.remaining() < size)
                 {
                     size = dst.remaining();
                 }
-                ByteBuffer buf = ByteBuffer.wrap(messageContent, 
offsetInMessage, size);
-                dst.put(buf);
+                dst.put(messageContent, 0, size);
                 return size;
             }
 
             @Override
-            public Collection<QpidByteBuffer> getContent(int offsetInMessage, 
int size)
+            public Collection<QpidByteBuffer> getContent()
             {
-                return 
Collections.singleton(QpidByteBuffer.wrap(messageContent, offsetInMessage, 
size));
+                return 
Collections.singleton(QpidByteBuffer.wrap(messageContent));
             }
 
             @Override

Modified: 
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
 Wed Oct 14 08:53:42 2015
@@ -194,15 +194,15 @@ public class MessageConverter_0_10_to_0_
             }
 
             @Override
-            public int getContent(int offsetInMessage, ByteBuffer dst)
+            public int getContent(ByteBuffer dst)
             {
-                return message.getContent(dst, offsetInMessage);
+                return message.getContent(dst);
             }
 
             @Override
-            public Collection<QpidByteBuffer> getContent(int offsetInMessage, 
int size)
+            public Collection<QpidByteBuffer> getContent()
             {
-                return message.getContent(offsetInMessage, size);
+                return message.getContent();
             }
 
             @Override

Modified: 
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
 Wed Oct 14 08:53:42 2015
@@ -83,15 +83,15 @@ public class MessageConverter_0_8_to_0_1
             }
 
             @Override
-            public int getContent(int offsetInMessage, ByteBuffer dst)
+            public int getContent(ByteBuffer dst)
             {
-                return message_0_8.getContent(dst, offsetInMessage);
+                return message_0_8.getContent(dst);
             }
 
             @Override
-            public Collection<QpidByteBuffer> getContent(int offsetInMessage, 
int size)
+            public Collection<QpidByteBuffer> getContent()
             {
-                return message_0_8.getContent(offsetInMessage, size);
+                return message_0_8.getContent();
             }
 
             @Override

Modified: 
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
 Wed Oct 14 08:53:42 2015
@@ -92,22 +92,21 @@ public class MessageConverter_1_0_to_v0_
             }
 
             @Override
-            public int getContent(int offsetInMessage, ByteBuffer dst)
+            public int getContent(ByteBuffer dst)
             {
-                int size = messageContent.length - offsetInMessage;
+                int size = messageContent.length;
                 if(dst.remaining() < size)
                 {
                     size = dst.remaining();
                 }
-                ByteBuffer buf = ByteBuffer.wrap(messageContent, 
offsetInMessage, size);
-                dst.put(buf);
+                dst.put(messageContent, 0, size);
                 return size;
             }
 
             @Override
-            public Collection<QpidByteBuffer> getContent(int offsetInMessage, 
int size)
+            public Collection<QpidByteBuffer> getContent()
             {
-                return 
Collections.singleton(QpidByteBuffer.wrap(messageContent, offsetInMessage, 
size));
+                return 
Collections.singleton(QpidByteBuffer.wrap(messageContent));
             }
 
             @Override

Modified: 
qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java
 Wed Oct 14 08:53:42 2015
@@ -218,7 +218,7 @@ public class ReportRunner<T>
     private static ReportableMessage convertMessage(QueueEntry entry)
     {
         final MessageInfoImpl messageInfo = new MessageInfoImpl(entry, true);
-        final Collection<QpidByteBuffer> contentBuffers = 
entry.getMessage().getContent(0, (int) entry.getSize());
+        final Collection<QpidByteBuffer> contentBuffers = 
entry.getMessage().getContent();
         final ByteBuffer content = ByteBufferUtils.combine(contentBuffers);
         for(QpidByteBuffer buf : contentBuffers)
         {

Modified: 
qpid/java/trunk/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
 Wed Oct 14 08:53:42 2015
@@ -403,7 +403,7 @@ public class QueueMBean extends AMQManag
         byte[] msgContent = new byte[bodySize];
 
         ByteBuffer buf = ByteBuffer.wrap(msgContent);
-        int stored = serverMsg.getContent(buf, 0);
+        int stored = serverMsg.getContent(buf);
 
         if(bodySize != stored)
         {

Modified: 
qpid/java/trunk/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
 Wed Oct 14 08:53:42 2015
@@ -445,18 +445,15 @@ public class QueueMBeanTest extends Qpid
                 Object[] args = invocation.getArguments();
 
                 //verify the arg types / expected values
-                assertEquals(2, args.length);
+                assertEquals(1, args.length);
                 assertTrue(args[0] instanceof ByteBuffer);
-                assertTrue(args[1] instanceof Integer);
 
                 ByteBuffer dest = (ByteBuffer) args[0];
-                int offset = (Integer) args[1];
-                assertEquals(0, offset);
 
                 dest.put(content);
                 return messageContentSize;
             }
-        }).when(serverMessage).getContent(Matchers.any(ByteBuffer.class), 
Matchers.anyInt());
+        }).when(serverMessage).getContent(Matchers.any(ByteBuffer.class));
 
         final QueueEntry entry = mock(QueueEntry.class);
         when(entry.getMessage()).thenReturn(serverMessage);



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org

Reply via email to