Author: rgodfrey
Date: Wed Jul 29 10:23:26 2015
New Revision: 1693236

URL: http://svn.apache.org/r1693236
Log:
QPID-6662 : store should allow for holding the original byte buffers for 
message content rather than copying into a single large buffer

Added:
    
qpid/java/trunk/common/src/main/java/org/apache/qpid/util/ByteBufferUtils.java  
 (with props)
Modified:
    
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
    
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContentSource.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
    
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
    
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
    
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
    
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
    
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
    
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
    
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
    
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
    
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
    
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
    
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
    
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
    
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
    
qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java

Modified: 
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
--- 
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
 (original)
+++ 
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
 Wed Jul 29 10:23:26 2015
@@ -27,12 +27,16 @@ import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 import java.util.UUID;
 
 import com.sleepycat.bind.tuple.ByteBinding;
 import com.sleepycat.bind.tuple.LongBinding;
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleOutput;
 import com.sleepycat.je.Cursor;
 import com.sleepycat.je.Database;
 import com.sleepycat.je.DatabaseEntry;
@@ -69,6 +73,7 @@ import org.apache.qpid.server.store.berk
 import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
 import org.apache.qpid.server.store.handler.MessageHandler;
 import org.apache.qpid.server.store.handler.MessageInstanceHandler;
+import org.apache.qpid.util.ByteBufferUtils;
 
 
 public abstract class AbstractBDBMessageStore implements MessageStore
@@ -508,13 +513,25 @@ public abstract class AbstractBDBMessage
      * @throws org.apache.qpid.server.store.StoreException If the operation 
fails for any reason, or if the specified message does not exist.
      */
     private void addContent(final Transaction tx, long messageId, int offset,
-                            ByteBuffer contentBody) throws StoreException
+                            Collection<ByteBuffer> contentBody) throws 
StoreException
     {
         DatabaseEntry key = new DatabaseEntry();
         LongBinding.longToEntry(messageId, key);
         DatabaseEntry value = new DatabaseEntry();
-        ByteBufferBinding messageBinding = ByteBufferBinding.getInstance();
-        messageBinding.objectToEntry(contentBody, value);
+
+        int size = 0;
+
+        for(ByteBuffer buf : contentBody)
+        {
+            size += buf.remaining();
+        }
+        byte[] data = new byte[size];
+        ByteBuffer dst = ByteBuffer.wrap(data);
+        for(ByteBuffer buf : contentBody)
+        {
+            dst.put(buf.duplicate());
+        }
+        value.setData(data);
         try
         {
             OperationStatus status = getMessageContentDb().put(tx, key, value);
@@ -886,15 +903,15 @@ public abstract class AbstractBDBMessage
     static interface MessageDataRef<T extends StorableMessageMetaData>
     {
         T getMetaData();
-        ByteBuffer getData();
-        void setData(ByteBuffer data);
+        Collection<ByteBuffer> getData();
+        void setData(Collection<ByteBuffer> data);
         boolean isHardRef();
     }
 
     private static final class MessageDataHardRef<T extends 
StorableMessageMetaData> implements MessageDataRef<T>
     {
         private final T _metaData;
-        private volatile ByteBuffer _data;
+        private volatile Collection<ByteBuffer> _data;
 
         private MessageDataHardRef(final T metaData)
         {
@@ -908,13 +925,13 @@ public abstract class AbstractBDBMessage
         }
 
         @Override
-        public ByteBuffer getData()
+        public Collection<ByteBuffer> getData()
         {
             return _data;
         }
 
         @Override
-        public void setData(final ByteBuffer data)
+        public void setData(final Collection<ByteBuffer> data)
         {
             _data = data;
         }
@@ -929,9 +946,9 @@ public abstract class AbstractBDBMessage
     private static final class MessageData<T extends StorableMessageMetaData>
     {
         private T _metaData;
-        private SoftReference<ByteBuffer> _data;
+        private SoftReference<Collection<ByteBuffer>> _data;
 
-        private MessageData(final T metaData, final ByteBuffer data)
+        private MessageData(final T metaData, final Collection<ByteBuffer> 
data)
         {
             _metaData = metaData;
 
@@ -946,12 +963,12 @@ public abstract class AbstractBDBMessage
             return _metaData;
         }
 
-        public ByteBuffer getData()
+        public Collection<ByteBuffer> getData()
         {
             return _data == null ? null : _data.get();
         }
 
-        public void setData(final ByteBuffer data)
+        public void setData(final Collection<ByteBuffer> data)
         {
             _data = new SoftReference<>(data);
         }
@@ -961,7 +978,7 @@ public abstract class AbstractBDBMessage
     private static final class MessageDataSoftRef<T extends 
StorableMessageMetaData> extends SoftReference<MessageData<T>> implements 
MessageDataRef<T>
     {
 
-        public MessageDataSoftRef(final T metadata, ByteBuffer data)
+        public MessageDataSoftRef(final T metadata, Collection<ByteBuffer> 
data)
         {
             super(new MessageData<T>(metadata, data));
         }
@@ -974,7 +991,7 @@ public abstract class AbstractBDBMessage
         }
 
         @Override
-        public ByteBuffer getData()
+        public Collection<ByteBuffer> getData()
         {
             MessageData<T> ref = get();
 
@@ -982,7 +999,7 @@ public abstract class AbstractBDBMessage
         }
 
         @Override
-        public void setData(final ByteBuffer data)
+        public void setData(final Collection<ByteBuffer> data)
         {
             MessageData<T> ref = get();
             if(ref != null)
@@ -1048,19 +1065,17 @@ public abstract class AbstractBDBMessage
         public void addContent(ByteBuffer src)
         {
             src = src.slice();
-            ByteBuffer data = _messageDataRef.getData();
+            Collection<ByteBuffer> data = _messageDataRef.getData();
             if(data == null)
             {
-                _messageDataRef.setData(src);
+                _messageDataRef.setData(Collections.singleton(src));
             }
             else
             {
-                int size = data.remaining() + src.remaining();
-                ByteBuffer buf = data.isDirect() ? 
ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
-                buf.put(data.duplicate());
-                buf.put(src.duplicate());
-                buf.flip();
-                _messageDataRef.setData(buf);
+                List<ByteBuffer> newCollection = new 
ArrayList<>(data.size()+1);
+                newCollection.addAll(data);
+                newCollection.add(src);
+                
_messageDataRef.setData(Collections.unmodifiableCollection(newCollection));
             }
 
         }
@@ -1074,7 +1089,7 @@ public abstract class AbstractBDBMessage
         @Override
         public int getContent(int offsetInMessage, ByteBuffer dst)
         {
-            ByteBuffer data = getContentAsByteBuffer();
+            ByteBuffer data = 
ByteBufferUtils.combine(getContentAsByteBuffer());
             data = data.slice();
             int length = Math.min(dst.remaining(), data.remaining());
             data.limit(length);
@@ -1082,15 +1097,15 @@ public abstract class AbstractBDBMessage
             return length;
         }
 
-        private ByteBuffer getContentAsByteBuffer()
+        private Collection<ByteBuffer> getContentAsByteBuffer()
         {
-            ByteBuffer data = _messageDataRef.getData();
+            Collection<ByteBuffer> data = _messageDataRef.getData();
             if(data == null)
             {
                 if(stored())
                 {
                     checkMessageStoreOpen();
-                    data = 
AbstractBDBMessageStore.this.getAllContent(_messageId);
+                    data = 
Collections.singleton(AbstractBDBMessageStore.this.getAllContent(_messageId));
                     T metaData = _messageDataRef.getMetaData();
                     if (metaData == null)
                     {
@@ -1104,20 +1119,57 @@ public abstract class AbstractBDBMessage
                 }
                 else
                 {
-                    data = ByteBuffer.wrap(new byte[0]);
+                    data = Collections.emptyList();
                 }
             } return data;
         }
 
         @Override
-        public ByteBuffer getContent(int offsetInMessage, int size)
+        public Collection<ByteBuffer> getContent(int offsetInMessage, int size)
         {
-            ByteBuffer data = getContentAsByteBuffer();
-            data = data.duplicate();
-            data.position(offsetInMessage);
-            data = data.slice();
-            data.limit(size);
-            return data;
+            int pos = 0;
+            int added = 0;
+
+            Collection<ByteBuffer> bufs = getContentAsByteBuffer();
+            List<ByteBuffer> content = new ArrayList<>(bufs.size());
+            for(ByteBuffer buf : bufs)
+            {
+                if(pos < offsetInMessage)
+                {
+                    final int remaining = buf.remaining();
+                    if(pos+ remaining >=offsetInMessage)
+                    {
+                        buf = buf.slice();
+                        buf.position(offsetInMessage-pos);
+                        buf = buf.slice();
+                        if(buf.remaining()>size)
+                        {
+                            buf.limit(size);
+                        }
+
+                        content.add(buf);
+                        added += buf.remaining();
+                    }
+                    pos+= remaining;
+
+                }
+                else
+                {
+                    buf = buf.slice();
+                    if(buf.remaining() > (size-added))
+                    {
+                        buf.limit(size-added);
+                    }
+                    content.add(buf.slice());
+                    added += buf.remaining();
+                }
+                if(added >= size)
+                {
+                    break;
+                }
+            }
+
+            return content;
         }
 
         synchronized Runnable store(Transaction txn)
@@ -1128,7 +1180,7 @@ public abstract class AbstractBDBMessage
                 AbstractBDBMessageStore.this.storeMetaData(txn, _messageId, 
_messageDataRef.getMetaData());
                 AbstractBDBMessageStore.this.addContent(txn, _messageId, 0,
                                                         
_messageDataRef.getData() == null
-                                                                ? 
EMPTY_BYTE_BUFFER
+                                                                ? 
Collections.<ByteBuffer>emptySet()
                                                                 : 
_messageDataRef.getData());
 
 

Modified: 
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
--- 
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java
 (original)
+++ 
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java
 Wed Jul 29 10:23:26 2015
@@ -266,7 +266,6 @@ public class UpgradeFrom5To6 extends Abs
         NewDataBinding dataBinding = new NewDataBinding();
         DatabaseEntry value = new DatabaseEntry();
         dataBinding.objectToEntry(consolidatedData, value);
-
         put(newDatabase, txn, key, value);
     }
 

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
 Wed Jul 29 10:23:26 2015
@@ -166,7 +166,7 @@ public abstract class AbstractServerMess
     }
 
     @Override
-    final public ByteBuffer getContent(int offset, int size)
+    final public Collection<ByteBuffer> getContent(int offset, int size)
     {
         return getStoredMessage().getContent(offset, size);
     }

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContentSource.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContentSource.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContentSource.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContentSource.java
 Wed Jul 29 10:23:26 2015
@@ -22,11 +22,12 @@
 package org.apache.qpid.server.message;
 
 import java.nio.ByteBuffer;
+import java.util.Collection;
 
 public interface MessageContentSource
 {
     int getContent(ByteBuffer buf, int offset);
-    ByteBuffer getContent(int offset, int size);
+    Collection<ByteBuffer> getContent(int offset, int size);
 
     long getSize();
 }

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
 Wed Jul 29 10:23:26 2015
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.message;
 
 import java.nio.ByteBuffer;
+import java.util.Collection;
 
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.store.StoredMessage;
@@ -54,7 +55,7 @@ public interface ServerMessage<T extends
 
     public int getContent(ByteBuffer buf, int offset);
 
-    ByteBuffer getContent(int offset, int size);
+    Collection<ByteBuffer> getContent(int offset, int size);
 
     Object getConnectionReference();
 }

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
 Wed Jul 29 10:23:26 2015
@@ -27,6 +27,8 @@ import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -38,6 +40,7 @@ import org.apache.qpid.server.store.Mess
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.util.ByteBufferInputStream;
+import org.apache.qpid.util.ByteBufferUtils;
 
 public class InternalMessage extends 
AbstractServerMessageImpl<InternalMessage, InternalMessageMetaData>
 {
@@ -61,9 +64,9 @@ public class InternalMessage extends Abs
     {
         super(msg, null);
         _contentSize = msg.getMetaData().getContentSize();
-        ByteBuffer buf = msg.getContent(0, _contentSize);
+        Collection<ByteBuffer> bufs = msg.getContent(0, _contentSize);
 
-        try(ObjectInputStream is = new ObjectInputStream(new 
ByteBufferInputStream(buf)))
+        try(ObjectInputStream is = new ObjectInputStream(new 
ByteBufferInputStream(ByteBufferUtils.combine(bufs))))
         {
             _messageBody = is.readObject();
 
@@ -235,9 +238,9 @@ public class InternalMessage extends Abs
                     }
 
                     @Override
-                    public ByteBuffer getContent(final int offsetInMessage, 
final int size)
+                    public Collection<ByteBuffer> getContent(final int 
offsetInMessage, final int size)
                     {
-                        return ByteBuffer.wrap(bytes, offsetInMessage, size);
+                        return Collections.singleton(ByteBuffer.wrap(bytes, 
offsetInMessage, size));
                     }
 
                     @Override

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
 Wed Jul 29 10:23:26 2015
@@ -31,6 +31,8 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -1670,14 +1672,14 @@ public abstract class AbstractJDBCMessag
         }
 
         @Override
-        public ByteBuffer getContent(int offsetInMessage, int size)
+        public Collection<ByteBuffer> getContent(int offsetInMessage, int size)
         {
             ByteBuffer data = getContentAsByteBuffer();
             data = data.duplicate();
             data.position(offsetInMessage);
             data = data.slice();
             data.limit(size);
-            return data;
+            return Collections.singleton(data);
         }
 
 

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
 Wed Jul 29 10:23:26 2015
@@ -22,6 +22,8 @@
 package org.apache.qpid.server.store;
 
 import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
 
 public class StoredMemoryMessage<T extends StorableMessageMetaData> implements 
StoredMessage<T>, MessageHandle<T>
 {
@@ -101,7 +103,7 @@ public class StoredMemoryMessage<T exten
     }
 
 
-    public ByteBuffer getContent(int offsetInMessage, int size)
+    public Collection<ByteBuffer> getContent(int offsetInMessage, int size)
     {
         if(_content == null)
         {
@@ -116,7 +118,7 @@ public class StoredMemoryMessage<T exten
         }
 
         buf.limit(size);
-        return buf;
+        return Collections.singleton(buf);
     }
 
     public T getMetaData()

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
 Wed Jul 29 10:23:26 2015
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.store;
 
 import java.nio.ByteBuffer;
+import java.util.Collection;
 
 public interface StoredMessage<M extends StorableMessageMetaData>
 {
@@ -30,7 +31,7 @@ public interface StoredMessage<M extends
 
     int getContent(int offsetInMessage, ByteBuffer dst);
 
-    ByteBuffer getContent(int offsetInMessage, int size);
+    Collection<ByteBuffer> getContent(int offsetInMessage, int size);
 
     void remove();
 

Modified: 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
 Wed Jul 29 10:23:26 2015
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.store;
 
 import java.nio.ByteBuffer;
+import java.util.Collection;
 
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.MessageReference;
@@ -104,7 +105,7 @@ public class TestMessageMetaDataType imp
         }
 
         @Override
-        public ByteBuffer getContent(int offset, int size)
+        public Collection<ByteBuffer> getContent(int offset, int size)
         {
             return null;
         }

Modified: 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
 Wed Jul 29 10:23:26 2015
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.txn;
 
 import java.nio.ByteBuffer;
+import java.util.Collection;
 
 
 import org.apache.qpid.server.message.AMQMessageHeader;
@@ -106,7 +107,7 @@ class MockServerMessage implements Serve
     }
 
 
-    public ByteBuffer getContent(int offset, int size)
+    public Collection<ByteBuffer> getContent(int offset, int size)
     {
         throw new UnsupportedOperationException();
     }

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
 Wed Jul 29 10:23:26 2015
@@ -55,6 +55,7 @@ import org.apache.qpid.transport.Message
 import org.apache.qpid.transport.MessageTransfer;
 import org.apache.qpid.transport.Method;
 import org.apache.qpid.transport.Option;
+import org.apache.qpid.util.ByteBufferUtils;
 import org.apache.qpid.util.GZIPUtils;
 
 public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements 
FlowCreditManager.FlowCreditManagerListener
@@ -265,7 +266,7 @@ public class ConsumerTarget_0_10 extends
         boolean msgCompressed = messageProps != null && 
GZIPUtils.GZIP_CONTENT_ENCODING.equals(messageProps.getContentEncoding());
 
 
-        ByteBuffer body = msg.getBody();
+        ByteBuffer body = ByteBufferUtils.combine(msg.getBody());
 
         boolean compressionSupported = 
_session.getConnection().getConnectionDelegate().isCompressionSupported();
 
@@ -295,7 +296,6 @@ public class ConsumerTarget_0_10 extends
                 body = ByteBuffer.wrap(compressed);
             }
         }
-        long size = body == null ? 0 : body.remaining();
 
         Header header = new Header(deliveryProps, messageProps, 
msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties());
 

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
 Wed Jul 29 10:23:26 2015
@@ -21,6 +21,8 @@
 package org.apache.qpid.server.protocol.v0_10;
 
 import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
 
 import org.apache.qpid.server.message.internal.InternalMessage;
 import org.apache.qpid.server.plugin.MessageConverter;
@@ -89,9 +91,9 @@ public class MessageConverter_Internal_t
                     }
 
                     @Override
-                    public ByteBuffer getContent(int offsetInMessage, int size)
+                    public Collection<ByteBuffer> getContent(int 
offsetInMessage, int size)
                     {
-                        return ByteBuffer.wrap(messageContent, 
offsetInMessage, size);
+                        return 
Collections.singleton(ByteBuffer.wrap(messageContent, offsetInMessage, size));
                     }
 
                     @Override

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
 Wed Jul 29 10:23:26 2015
@@ -25,6 +25,8 @@ import java.io.IOException;
 import java.io.ObjectOutputStream;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -64,7 +66,7 @@ public class MessageConverter_v0_10 impl
         return new MessageTransferMessage(convertToStoredMessage(serverMsg), 
null);
     }
 
-    private StoredMessage<MessageMetaData_0_10> convertToStoredMessage(final 
ServerMessage serverMsg)
+    private StoredMessage<MessageMetaData_0_10> convertToStoredMessage(final 
ServerMessage<?> serverMsg)
     {
         final MessageMetaData_0_10 messageMetaData_0_10 = 
convertMetaData(serverMsg);
 
@@ -89,7 +91,7 @@ public class MessageConverter_v0_10 impl
                     }
 
                     @Override
-                    public ByteBuffer getContent(int offsetInMessage, int size)
+                    public Collection<ByteBuffer> getContent(int 
offsetInMessage, int size)
                     {
                         return serverMsg.getContent(offsetInMessage, size);
                     }

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
 Wed Jul 29 10:23:26 2015
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.protocol.v0_10;
 
 import java.nio.ByteBuffer;
+import java.util.Collection;
 
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.AbstractServerMessageImpl;
@@ -76,7 +77,7 @@ public class MessageTransferMessage exte
         return getMetaData().getHeader();
     }
 
-    public ByteBuffer getBody()
+    public Collection<ByteBuffer> getBody()
     {
         return  getContent(0, (int)getSize());
     }

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
 Wed Jul 29 10:23:26 2015
@@ -25,6 +25,8 @@ import java.io.IOException;
 import java.io.ObjectOutputStream;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -101,9 +103,9 @@ public class MessageConverter_Internal_t
             }
 
             @Override
-            public ByteBuffer getContent(int offsetInMessage, int size)
+            public Collection<ByteBuffer> getContent(int offsetInMessage, int 
size)
             {
-                return ByteBuffer.wrap(messageContent, offsetInMessage, size);
+                return Collections.singleton(ByteBuffer.wrap(messageContent, 
offsetInMessage, size));
             }
 
             @Override

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
 Wed Jul 29 10:23:26 2015
@@ -23,6 +23,11 @@ package org.apache.qpid.server.protocol.
 import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.QpidException;
 import org.apache.qpid.framing.AMQBody;
@@ -43,6 +48,7 @@ import org.apache.qpid.server.message.Se
 import org.apache.qpid.server.plugin.MessageConverter;
 import org.apache.qpid.server.protocol.MessageConverterRegistry;
 import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.util.ByteBufferUtils;
 import org.apache.qpid.util.GZIPUtils;
 
 public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
@@ -51,6 +57,8 @@ public class ProtocolOutputConverterImpl
     private final AMQPConnection_0_8 _connection;
     private static final AMQShortString GZIP_ENCODING = 
AMQShortString.valueOf(GZIPUtils.GZIP_CONTENT_ENCODING);
 
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ProtocolOutputConverterImpl.class);
+
     public ProtocolOutputConverterImpl(AMQPConnection_0_8 connection)
     {
         _connection = connection;
@@ -102,7 +110,8 @@ public class ProtocolOutputConverterImpl
         boolean compressionSupported = _connection.isCompressionSupported();
 
         if(msgCompressed && !compressionSupported &&
-                (modifiedContent = 
GZIPUtils.uncompressBufferToArray(message.getContent(0,bodySize))) != null)
+                (modifiedContent = GZIPUtils.uncompressBufferToArray(
+                        ByteBufferUtils.combine(message.getContent(0, 
bodySize)))) != null)
         {
             BasicContentHeaderProperties modifiedProps =
                     new 
BasicContentHeaderProperties(contentHeaderBody.getProperties());
@@ -116,7 +125,7 @@ public class ProtocolOutputConverterImpl
                 && compressionSupported
                 && contentHeaderBody.getProperties().getEncoding()==null
                 && bodySize > _connection.getMessageCompressionThreshold()
-                && (modifiedContent = 
GZIPUtils.compressBufferToArray(message.getContent(0, bodySize))) != null)
+                && (modifiedContent = 
GZIPUtils.compressBufferToArray(ByteBufferUtils.combine(message.getContent(0, 
bodySize)))) != null)
         {
             BasicContentHeaderProperties modifiedProps =
                     new 
BasicContentHeaderProperties(contentHeaderBody.getProperties());
@@ -154,9 +163,9 @@ public class ProtocolOutputConverterImpl
             }
 
             @Override
-            public ByteBuffer getContent(final int offset, final int size)
+            public Collection<ByteBuffer> getContent(final int offset, final 
int size)
             {
-                return ByteBuffer.wrap(content, offset, size);
+                return Collections.singleton(ByteBuffer.wrap(content, offset, 
size));
             }
 
             @Override
@@ -238,29 +247,37 @@ public class ProtocolOutputConverterImpl
 
         public void writePayload(DataOutput buffer) throws IOException
         {
-            ByteBuffer buf = _message.getContent(_offset, _length);
+            Collection<ByteBuffer> bufs = _message.getContent(_offset, 
_length);
 
-            if(buf.hasArray())
-            {
-                buffer.write(buf.array(), buf.arrayOffset()+buf.position(), 
buf.remaining());
-            }
-            else
+            for(ByteBuffer buf : bufs)
             {
+                if (buf.hasArray())
+                {
+                    buffer.write(buf.array(), buf.arrayOffset() + 
buf.position(), buf.remaining());
+                }
+                else
+                {
 
-                byte[] data = new byte[_length];
+                    byte[] data = new byte[_length];
 
-                buf.get(data);
+                    buf.get(data);
 
-                buffer.write(data);
+                    buffer.write(data);
+                }
             }
         }
 
         @Override
         public long writePayload(final ByteBufferSender sender) throws 
IOException
         {
-            ByteBuffer buf = _message.getContent(_offset, _length);
-            long size = buf.remaining();
-            sender.send(buf.duplicate());
+
+            Collection<ByteBuffer> bufs = _message.getContent(_offset, 
_length);
+            long size = 0l;
+            for(ByteBuffer buf : bufs)
+            {
+                size += buf.remaining();
+                sender.send(buf.duplicate());
+            }
             return size;
         }
 

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
 Wed Jul 29 10:23:26 2015
@@ -21,6 +21,8 @@
 package org.apache.qpid.server.protocol.v0_8;
 
 import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
 
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.ContentHeaderBody;
@@ -101,12 +103,12 @@ public class MockStoredMessage implement
 
 
 
-    public ByteBuffer getContent(int offsetInMessage, int size)
+    public Collection<ByteBuffer> getContent(int offsetInMessage, int size)
     {
         ByteBuffer buf = ByteBuffer.allocate(size);
         getContent(offsetInMessage, buf);
         buf.position(0);
-        return  buf;
+        return Collections.singleton(buf);
     }
 
     public void remove()

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
 Wed Jul 29 10:23:26 2015
@@ -23,6 +23,8 @@ package org.apache.qpid.server.protocol.
 import java.io.EOFException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.ListIterator;
@@ -255,7 +257,7 @@ public abstract class MessageConverter_t
                         }
 
                         @Override
-                        public ByteBuffer getContent(int offsetInMessage, int 
size)
+                        public Collection<ByteBuffer> getContent(int 
offsetInMessage, int size)
                         {
                             ByteBuffer buf = allData.duplicate();
                             buf.position(offsetInMessage);
@@ -264,7 +266,7 @@ public abstract class MessageConverter_t
                             {
                                 buf.limit(size);
                             }
-                            return buf;
+                            return Collections.singleton(buf);
                         }
 
                         @Override

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
 Wed Jul 29 10:23:26 2015
@@ -28,6 +28,7 @@ import java.util.List;
 
 import org.apache.qpid.server.message.AbstractServerMessageImpl;
 import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.util.ByteBufferUtils;
 
 public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, 
MessageMetaData_1_0>
 {
@@ -68,7 +69,7 @@ public class Message_1_0 extends Abstrac
         do
         {
 
-            b = storedMessage.getContent(offset,FRAGMENT_SIZE);
+            b = 
ByteBufferUtils.combine(storedMessage.getContent(offset,FRAGMENT_SIZE));
             if(b.hasRemaining())
             {
                 fragments.add(b);

Modified: 
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
 Wed Jul 29 10:23:26 2015
@@ -21,6 +21,8 @@
 package org.apache.qpid.server.protocol.converter.v0_10_v1_0;
 
 import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
@@ -100,9 +102,9 @@ public class MessageConverter_1_0_to_v0_
             }
 
             @Override
-            public ByteBuffer getContent(int offsetInMessage, int size)
+            public Collection<ByteBuffer> getContent(int offsetInMessage, int 
size)
             {
-                return ByteBuffer.wrap(messageContent, offsetInMessage, size);
+                return Collections.singleton(ByteBuffer.wrap(messageContent, 
offsetInMessage, size));
             }
 
             @Override

Modified: 
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
 Wed Jul 29 10:23:26 2015
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.protocol.converter.v0_8_v0_10;
 
 import java.nio.ByteBuffer;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -198,7 +199,7 @@ public class MessageConverter_0_10_to_0_
             }
 
             @Override
-            public ByteBuffer getContent(int offsetInMessage, int size)
+            public Collection<ByteBuffer> getContent(int offsetInMessage, int 
size)
             {
                 return message.getContent(offsetInMessage, size);
             }

Modified: 
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
 Wed Jul 29 10:23:26 2015
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.
 
 import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
+import java.util.Collection;
 import java.util.Map;
 import java.util.UUID;
 
@@ -87,7 +88,7 @@ public class MessageConverter_0_8_to_0_1
             }
 
             @Override
-            public ByteBuffer getContent(int offsetInMessage, int size)
+            public Collection<ByteBuffer> getContent(int offsetInMessage, int 
size)
             {
                 return message_0_8.getContent(offsetInMessage, size);
             }

Modified: 
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
 Wed Jul 29 10:23:26 2015
@@ -21,6 +21,8 @@
 package org.apache.qpid.server.protocol.converter.v0_8_v1_0;
 
 import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
@@ -102,9 +104,9 @@ public class MessageConverter_1_0_to_v0_
             }
 
             @Override
-            public ByteBuffer getContent(int offsetInMessage, int size)
+            public Collection<ByteBuffer> getContent(int offsetInMessage, int 
size)
             {
-                return ByteBuffer.wrap(messageContent, offsetInMessage, size);
+                return Collections.singleton(ByteBuffer.wrap(messageContent, 
offsetInMessage, size));
             }
 
             @Override

Modified: 
qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java
 Wed Jul 29 10:23:26 2015
@@ -44,6 +44,7 @@ import org.apache.qpid.server.message.Se
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.queue.QueueEntryVisitor;
+import org.apache.qpid.util.ByteBufferUtils;
 
 public class ReportRunner<T>
 {
@@ -211,7 +212,7 @@ public class ReportRunner<T>
     }
 
 
-    private static ReportableMessage convertMessage(final ServerMessage 
message)
+    private static ReportableMessage convertMessage(final ServerMessage<?> 
message)
     {
         return new ReportableMessage()
         {
@@ -230,7 +231,7 @@ public class ReportRunner<T>
             @Override
             public ByteBuffer getContent()
             {
-                ByteBuffer content = message.getContent(0, (int) getSize());
+                ByteBuffer content = 
ByteBufferUtils.combine(message.getContent(0, (int) getSize()));
 
                 return content.asReadOnlyBuffer();
             }

Added: 
qpid/java/trunk/common/src/main/java/org/apache/qpid/util/ByteBufferUtils.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/util/ByteBufferUtils.java?rev=1693236&view=auto
==============================================================================
--- 
qpid/java/trunk/common/src/main/java/org/apache/qpid/util/ByteBufferUtils.java 
(added)
+++ 
qpid/java/trunk/common/src/main/java/org/apache/qpid/util/ByteBufferUtils.java 
Wed Jul 29 10:23:26 2015
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.util;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+
+public class ByteBufferUtils
+{
+    private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);
+
+    public static ByteBuffer combine(Collection<ByteBuffer> bufs)
+    {
+        if(bufs == null || bufs.isEmpty())
+        {
+            return EMPTY_BYTE_BUFFER;
+        }
+        else if(bufs.size() == 1)
+        {
+            return bufs.iterator().next();
+        }
+        else
+        {
+            int size = 0;
+            boolean isDirect = false;
+            for(ByteBuffer buf : bufs)
+            {
+                size += buf.remaining();
+                isDirect = isDirect || buf.isDirect();
+            }
+            ByteBuffer combined = isDirect ? ByteBuffer.allocateDirect(size) : 
ByteBuffer.allocate(size);
+
+            for(ByteBuffer buf : bufs)
+            {
+                combined.put(buf.duplicate());
+            }
+            combined.flip();
+            return combined;
+        }
+    }
+}

Propchange: 
qpid/java/trunk/common/src/main/java/org/apache/qpid/util/ByteBufferUtils.java
------------------------------------------------------------------------------
    svn:eol-style = native



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to