Author: rgodfrey
Date: Wed Jul 29 10:23:26 2015
New Revision: 1693236
URL: http://svn.apache.org/r1693236
Log:
QPID-6662 : store should allow for holding the original byte buffers for
message content rather than copying into a single large buffer
Added:
qpid/java/trunk/common/src/main/java/org/apache/qpid/util/ByteBufferUtils.java
(with props)
Modified:
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContentSource.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java
Modified:
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
---
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
(original)
+++
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
Wed Jul 29 10:23:26 2015
@@ -27,12 +27,16 @@ import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import com.sleepycat.bind.tuple.ByteBinding;
import com.sleepycat.bind.tuple.LongBinding;
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleOutput;
import com.sleepycat.je.Cursor;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseEntry;
@@ -69,6 +73,7 @@ import org.apache.qpid.server.store.berk
import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
import org.apache.qpid.server.store.handler.MessageHandler;
import org.apache.qpid.server.store.handler.MessageInstanceHandler;
+import org.apache.qpid.util.ByteBufferUtils;
public abstract class AbstractBDBMessageStore implements MessageStore
@@ -508,13 +513,25 @@ public abstract class AbstractBDBMessage
* @throws org.apache.qpid.server.store.StoreException If the operation
fails for any reason, or if the specified message does not exist.
*/
private void addContent(final Transaction tx, long messageId, int offset,
- ByteBuffer contentBody) throws StoreException
+ Collection<ByteBuffer> contentBody) throws
StoreException
{
DatabaseEntry key = new DatabaseEntry();
LongBinding.longToEntry(messageId, key);
DatabaseEntry value = new DatabaseEntry();
- ByteBufferBinding messageBinding = ByteBufferBinding.getInstance();
- messageBinding.objectToEntry(contentBody, value);
+
+ int size = 0;
+
+ for(ByteBuffer buf : contentBody)
+ {
+ size += buf.remaining();
+ }
+ byte[] data = new byte[size];
+ ByteBuffer dst = ByteBuffer.wrap(data);
+ for(ByteBuffer buf : contentBody)
+ {
+ dst.put(buf.duplicate());
+ }
+ value.setData(data);
try
{
OperationStatus status = getMessageContentDb().put(tx, key, value);
@@ -886,15 +903,15 @@ public abstract class AbstractBDBMessage
static interface MessageDataRef<T extends StorableMessageMetaData>
{
T getMetaData();
- ByteBuffer getData();
- void setData(ByteBuffer data);
+ Collection<ByteBuffer> getData();
+ void setData(Collection<ByteBuffer> data);
boolean isHardRef();
}
private static final class MessageDataHardRef<T extends
StorableMessageMetaData> implements MessageDataRef<T>
{
private final T _metaData;
- private volatile ByteBuffer _data;
+ private volatile Collection<ByteBuffer> _data;
private MessageDataHardRef(final T metaData)
{
@@ -908,13 +925,13 @@ public abstract class AbstractBDBMessage
}
@Override
- public ByteBuffer getData()
+ public Collection<ByteBuffer> getData()
{
return _data;
}
@Override
- public void setData(final ByteBuffer data)
+ public void setData(final Collection<ByteBuffer> data)
{
_data = data;
}
@@ -929,9 +946,9 @@ public abstract class AbstractBDBMessage
private static final class MessageData<T extends StorableMessageMetaData>
{
private T _metaData;
- private SoftReference<ByteBuffer> _data;
+ private SoftReference<Collection<ByteBuffer>> _data;
- private MessageData(final T metaData, final ByteBuffer data)
+ private MessageData(final T metaData, final Collection<ByteBuffer>
data)
{
_metaData = metaData;
@@ -946,12 +963,12 @@ public abstract class AbstractBDBMessage
return _metaData;
}
- public ByteBuffer getData()
+ public Collection<ByteBuffer> getData()
{
return _data == null ? null : _data.get();
}
- public void setData(final ByteBuffer data)
+ public void setData(final Collection<ByteBuffer> data)
{
_data = new SoftReference<>(data);
}
@@ -961,7 +978,7 @@ public abstract class AbstractBDBMessage
private static final class MessageDataSoftRef<T extends
StorableMessageMetaData> extends SoftReference<MessageData<T>> implements
MessageDataRef<T>
{
- public MessageDataSoftRef(final T metadata, ByteBuffer data)
+ public MessageDataSoftRef(final T metadata, Collection<ByteBuffer>
data)
{
super(new MessageData<T>(metadata, data));
}
@@ -974,7 +991,7 @@ public abstract class AbstractBDBMessage
}
@Override
- public ByteBuffer getData()
+ public Collection<ByteBuffer> getData()
{
MessageData<T> ref = get();
@@ -982,7 +999,7 @@ public abstract class AbstractBDBMessage
}
@Override
- public void setData(final ByteBuffer data)
+ public void setData(final Collection<ByteBuffer> data)
{
MessageData<T> ref = get();
if(ref != null)
@@ -1048,19 +1065,17 @@ public abstract class AbstractBDBMessage
public void addContent(ByteBuffer src)
{
src = src.slice();
- ByteBuffer data = _messageDataRef.getData();
+ Collection<ByteBuffer> data = _messageDataRef.getData();
if(data == null)
{
- _messageDataRef.setData(src);
+ _messageDataRef.setData(Collections.singleton(src));
}
else
{
- int size = data.remaining() + src.remaining();
- ByteBuffer buf = data.isDirect() ?
ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
- buf.put(data.duplicate());
- buf.put(src.duplicate());
- buf.flip();
- _messageDataRef.setData(buf);
+ List<ByteBuffer> newCollection = new
ArrayList<>(data.size()+1);
+ newCollection.addAll(data);
+ newCollection.add(src);
+
_messageDataRef.setData(Collections.unmodifiableCollection(newCollection));
}
}
@@ -1074,7 +1089,7 @@ public abstract class AbstractBDBMessage
@Override
public int getContent(int offsetInMessage, ByteBuffer dst)
{
- ByteBuffer data = getContentAsByteBuffer();
+ ByteBuffer data =
ByteBufferUtils.combine(getContentAsByteBuffer());
data = data.slice();
int length = Math.min(dst.remaining(), data.remaining());
data.limit(length);
@@ -1082,15 +1097,15 @@ public abstract class AbstractBDBMessage
return length;
}
- private ByteBuffer getContentAsByteBuffer()
+ private Collection<ByteBuffer> getContentAsByteBuffer()
{
- ByteBuffer data = _messageDataRef.getData();
+ Collection<ByteBuffer> data = _messageDataRef.getData();
if(data == null)
{
if(stored())
{
checkMessageStoreOpen();
- data =
AbstractBDBMessageStore.this.getAllContent(_messageId);
+ data =
Collections.singleton(AbstractBDBMessageStore.this.getAllContent(_messageId));
T metaData = _messageDataRef.getMetaData();
if (metaData == null)
{
@@ -1104,20 +1119,57 @@ public abstract class AbstractBDBMessage
}
else
{
- data = ByteBuffer.wrap(new byte[0]);
+ data = Collections.emptyList();
}
} return data;
}
@Override
- public ByteBuffer getContent(int offsetInMessage, int size)
+ public Collection<ByteBuffer> getContent(int offsetInMessage, int size)
{
- ByteBuffer data = getContentAsByteBuffer();
- data = data.duplicate();
- data.position(offsetInMessage);
- data = data.slice();
- data.limit(size);
- return data;
+ int pos = 0;
+ int added = 0;
+
+ Collection<ByteBuffer> bufs = getContentAsByteBuffer();
+ List<ByteBuffer> content = new ArrayList<>(bufs.size());
+ for(ByteBuffer buf : bufs)
+ {
+ if(pos < offsetInMessage)
+ {
+ final int remaining = buf.remaining();
+ if(pos+ remaining >=offsetInMessage)
+ {
+ buf = buf.slice();
+ buf.position(offsetInMessage-pos);
+ buf = buf.slice();
+ if(buf.remaining()>size)
+ {
+ buf.limit(size);
+ }
+
+ content.add(buf);
+ added += buf.remaining();
+ }
+ pos+= remaining;
+
+ }
+ else
+ {
+ buf = buf.slice();
+ if(buf.remaining() > (size-added))
+ {
+ buf.limit(size-added);
+ }
+ content.add(buf.slice());
+ added += buf.remaining();
+ }
+ if(added >= size)
+ {
+ break;
+ }
+ }
+
+ return content;
}
synchronized Runnable store(Transaction txn)
@@ -1128,7 +1180,7 @@ public abstract class AbstractBDBMessage
AbstractBDBMessageStore.this.storeMetaData(txn, _messageId,
_messageDataRef.getMetaData());
AbstractBDBMessageStore.this.addContent(txn, _messageId, 0,
_messageDataRef.getData() == null
- ?
EMPTY_BYTE_BUFFER
+ ?
Collections.<ByteBuffer>emptySet()
:
_messageDataRef.getData());
Modified:
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
---
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java
(original)
+++
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java
Wed Jul 29 10:23:26 2015
@@ -266,7 +266,6 @@ public class UpgradeFrom5To6 extends Abs
NewDataBinding dataBinding = new NewDataBinding();
DatabaseEntry value = new DatabaseEntry();
dataBinding.objectToEntry(consolidatedData, value);
-
put(newDatabase, txn, key, value);
}
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
Wed Jul 29 10:23:26 2015
@@ -166,7 +166,7 @@ public abstract class AbstractServerMess
}
@Override
- final public ByteBuffer getContent(int offset, int size)
+ final public Collection<ByteBuffer> getContent(int offset, int size)
{
return getStoredMessage().getContent(offset, size);
}
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContentSource.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContentSource.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContentSource.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContentSource.java
Wed Jul 29 10:23:26 2015
@@ -22,11 +22,12 @@
package org.apache.qpid.server.message;
import java.nio.ByteBuffer;
+import java.util.Collection;
public interface MessageContentSource
{
int getContent(ByteBuffer buf, int offset);
- ByteBuffer getContent(int offset, int size);
+ Collection<ByteBuffer> getContent(int offset, int size);
long getSize();
}
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
Wed Jul 29 10:23:26 2015
@@ -21,6 +21,7 @@
package org.apache.qpid.server.message;
import java.nio.ByteBuffer;
+import java.util.Collection;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoredMessage;
@@ -54,7 +55,7 @@ public interface ServerMessage<T extends
public int getContent(ByteBuffer buf, int offset);
- ByteBuffer getContent(int offset, int size);
+ Collection<ByteBuffer> getContent(int offset, int size);
Object getConnectionReference();
}
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
Wed Jul 29 10:23:26 2015
@@ -27,6 +27,8 @@ import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -38,6 +40,7 @@ import org.apache.qpid.server.store.Mess
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.util.ByteBufferInputStream;
+import org.apache.qpid.util.ByteBufferUtils;
public class InternalMessage extends
AbstractServerMessageImpl<InternalMessage, InternalMessageMetaData>
{
@@ -61,9 +64,9 @@ public class InternalMessage extends Abs
{
super(msg, null);
_contentSize = msg.getMetaData().getContentSize();
- ByteBuffer buf = msg.getContent(0, _contentSize);
+ Collection<ByteBuffer> bufs = msg.getContent(0, _contentSize);
- try(ObjectInputStream is = new ObjectInputStream(new
ByteBufferInputStream(buf)))
+ try(ObjectInputStream is = new ObjectInputStream(new
ByteBufferInputStream(ByteBufferUtils.combine(bufs))))
{
_messageBody = is.readObject();
@@ -235,9 +238,9 @@ public class InternalMessage extends Abs
}
@Override
- public ByteBuffer getContent(final int offsetInMessage,
final int size)
+ public Collection<ByteBuffer> getContent(final int
offsetInMessage, final int size)
{
- return ByteBuffer.wrap(bytes, offsetInMessage, size);
+ return Collections.singleton(ByteBuffer.wrap(bytes,
offsetInMessage, size));
}
@Override
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
Wed Jul 29 10:23:26 2015
@@ -31,6 +31,8 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -1670,14 +1672,14 @@ public abstract class AbstractJDBCMessag
}
@Override
- public ByteBuffer getContent(int offsetInMessage, int size)
+ public Collection<ByteBuffer> getContent(int offsetInMessage, int size)
{
ByteBuffer data = getContentAsByteBuffer();
data = data.duplicate();
data.position(offsetInMessage);
data = data.slice();
data.limit(size);
- return data;
+ return Collections.singleton(data);
}
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
Wed Jul 29 10:23:26 2015
@@ -22,6 +22,8 @@
package org.apache.qpid.server.store;
import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
public class StoredMemoryMessage<T extends StorableMessageMetaData> implements
StoredMessage<T>, MessageHandle<T>
{
@@ -101,7 +103,7 @@ public class StoredMemoryMessage<T exten
}
- public ByteBuffer getContent(int offsetInMessage, int size)
+ public Collection<ByteBuffer> getContent(int offsetInMessage, int size)
{
if(_content == null)
{
@@ -116,7 +118,7 @@ public class StoredMemoryMessage<T exten
}
buf.limit(size);
- return buf;
+ return Collections.singleton(buf);
}
public T getMetaData()
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
Wed Jul 29 10:23:26 2015
@@ -21,6 +21,7 @@
package org.apache.qpid.server.store;
import java.nio.ByteBuffer;
+import java.util.Collection;
public interface StoredMessage<M extends StorableMessageMetaData>
{
@@ -30,7 +31,7 @@ public interface StoredMessage<M extends
int getContent(int offsetInMessage, ByteBuffer dst);
- ByteBuffer getContent(int offsetInMessage, int size);
+ Collection<ByteBuffer> getContent(int offsetInMessage, int size);
void remove();
Modified:
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
(original)
+++
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
Wed Jul 29 10:23:26 2015
@@ -21,6 +21,7 @@
package org.apache.qpid.server.store;
import java.nio.ByteBuffer;
+import java.util.Collection;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageReference;
@@ -104,7 +105,7 @@ public class TestMessageMetaDataType imp
}
@Override
- public ByteBuffer getContent(int offset, int size)
+ public Collection<ByteBuffer> getContent(int offset, int size)
{
return null;
}
Modified:
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
(original)
+++
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
Wed Jul 29 10:23:26 2015
@@ -21,6 +21,7 @@
package org.apache.qpid.server.txn;
import java.nio.ByteBuffer;
+import java.util.Collection;
import org.apache.qpid.server.message.AMQMessageHeader;
@@ -106,7 +107,7 @@ class MockServerMessage implements Serve
}
- public ByteBuffer getContent(int offset, int size)
+ public Collection<ByteBuffer> getContent(int offset, int size)
{
throw new UnsupportedOperationException();
}
Modified:
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
Wed Jul 29 10:23:26 2015
@@ -55,6 +55,7 @@ import org.apache.qpid.transport.Message
import org.apache.qpid.transport.MessageTransfer;
import org.apache.qpid.transport.Method;
import org.apache.qpid.transport.Option;
+import org.apache.qpid.util.ByteBufferUtils;
import org.apache.qpid.util.GZIPUtils;
public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements
FlowCreditManager.FlowCreditManagerListener
@@ -265,7 +266,7 @@ public class ConsumerTarget_0_10 extends
boolean msgCompressed = messageProps != null &&
GZIPUtils.GZIP_CONTENT_ENCODING.equals(messageProps.getContentEncoding());
- ByteBuffer body = msg.getBody();
+ ByteBuffer body = ByteBufferUtils.combine(msg.getBody());
boolean compressionSupported =
_session.getConnection().getConnectionDelegate().isCompressionSupported();
@@ -295,7 +296,6 @@ public class ConsumerTarget_0_10 extends
body = ByteBuffer.wrap(compressed);
}
}
- long size = body == null ? 0 : body.remaining();
Header header = new Header(deliveryProps, messageProps,
msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties());
Modified:
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
Wed Jul 29 10:23:26 2015
@@ -21,6 +21,8 @@
package org.apache.qpid.server.protocol.v0_10;
import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.plugin.MessageConverter;
@@ -89,9 +91,9 @@ public class MessageConverter_Internal_t
}
@Override
- public ByteBuffer getContent(int offsetInMessage, int size)
+ public Collection<ByteBuffer> getContent(int
offsetInMessage, int size)
{
- return ByteBuffer.wrap(messageContent,
offsetInMessage, size);
+ return
Collections.singleton(ByteBuffer.wrap(messageContent, offsetInMessage, size));
}
@Override
Modified:
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
Wed Jul 29 10:23:26 2015
@@ -25,6 +25,8 @@ import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
+import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -64,7 +66,7 @@ public class MessageConverter_v0_10 impl
return new MessageTransferMessage(convertToStoredMessage(serverMsg),
null);
}
- private StoredMessage<MessageMetaData_0_10> convertToStoredMessage(final
ServerMessage serverMsg)
+ private StoredMessage<MessageMetaData_0_10> convertToStoredMessage(final
ServerMessage<?> serverMsg)
{
final MessageMetaData_0_10 messageMetaData_0_10 =
convertMetaData(serverMsg);
@@ -89,7 +91,7 @@ public class MessageConverter_v0_10 impl
}
@Override
- public ByteBuffer getContent(int offsetInMessage, int size)
+ public Collection<ByteBuffer> getContent(int
offsetInMessage, int size)
{
return serverMsg.getContent(offsetInMessage, size);
}
Modified:
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
Wed Jul 29 10:23:26 2015
@@ -21,6 +21,7 @@
package org.apache.qpid.server.protocol.v0_10;
import java.nio.ByteBuffer;
+import java.util.Collection;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.AbstractServerMessageImpl;
@@ -76,7 +77,7 @@ public class MessageTransferMessage exte
return getMetaData().getHeader();
}
- public ByteBuffer getBody()
+ public Collection<ByteBuffer> getBody()
{
return getContent(0, (int)getSize());
}
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
Wed Jul 29 10:23:26 2015
@@ -25,6 +25,8 @@ import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
+import java.util.Collection;
+import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -101,9 +103,9 @@ public class MessageConverter_Internal_t
}
@Override
- public ByteBuffer getContent(int offsetInMessage, int size)
+ public Collection<ByteBuffer> getContent(int offsetInMessage, int
size)
{
- return ByteBuffer.wrap(messageContent, offsetInMessage, size);
+ return Collections.singleton(ByteBuffer.wrap(messageContent,
offsetInMessage, size));
}
@Override
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
Wed Jul 29 10:23:26 2015
@@ -23,6 +23,11 @@ package org.apache.qpid.server.protocol.
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.qpid.QpidException;
import org.apache.qpid.framing.AMQBody;
@@ -43,6 +48,7 @@ import org.apache.qpid.server.message.Se
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.util.ByteBufferUtils;
import org.apache.qpid.util.GZIPUtils;
public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
@@ -51,6 +57,8 @@ public class ProtocolOutputConverterImpl
private final AMQPConnection_0_8 _connection;
private static final AMQShortString GZIP_ENCODING =
AMQShortString.valueOf(GZIPUtils.GZIP_CONTENT_ENCODING);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ProtocolOutputConverterImpl.class);
+
public ProtocolOutputConverterImpl(AMQPConnection_0_8 connection)
{
_connection = connection;
@@ -102,7 +110,8 @@ public class ProtocolOutputConverterImpl
boolean compressionSupported = _connection.isCompressionSupported();
if(msgCompressed && !compressionSupported &&
- (modifiedContent =
GZIPUtils.uncompressBufferToArray(message.getContent(0,bodySize))) != null)
+ (modifiedContent = GZIPUtils.uncompressBufferToArray(
+ ByteBufferUtils.combine(message.getContent(0,
bodySize)))) != null)
{
BasicContentHeaderProperties modifiedProps =
new
BasicContentHeaderProperties(contentHeaderBody.getProperties());
@@ -116,7 +125,7 @@ public class ProtocolOutputConverterImpl
&& compressionSupported
&& contentHeaderBody.getProperties().getEncoding()==null
&& bodySize > _connection.getMessageCompressionThreshold()
- && (modifiedContent =
GZIPUtils.compressBufferToArray(message.getContent(0, bodySize))) != null)
+ && (modifiedContent =
GZIPUtils.compressBufferToArray(ByteBufferUtils.combine(message.getContent(0,
bodySize)))) != null)
{
BasicContentHeaderProperties modifiedProps =
new
BasicContentHeaderProperties(contentHeaderBody.getProperties());
@@ -154,9 +163,9 @@ public class ProtocolOutputConverterImpl
}
@Override
- public ByteBuffer getContent(final int offset, final int size)
+ public Collection<ByteBuffer> getContent(final int offset, final
int size)
{
- return ByteBuffer.wrap(content, offset, size);
+ return Collections.singleton(ByteBuffer.wrap(content, offset,
size));
}
@Override
@@ -238,29 +247,37 @@ public class ProtocolOutputConverterImpl
public void writePayload(DataOutput buffer) throws IOException
{
- ByteBuffer buf = _message.getContent(_offset, _length);
+ Collection<ByteBuffer> bufs = _message.getContent(_offset,
_length);
- if(buf.hasArray())
- {
- buffer.write(buf.array(), buf.arrayOffset()+buf.position(),
buf.remaining());
- }
- else
+ for(ByteBuffer buf : bufs)
{
+ if (buf.hasArray())
+ {
+ buffer.write(buf.array(), buf.arrayOffset() +
buf.position(), buf.remaining());
+ }
+ else
+ {
- byte[] data = new byte[_length];
+ byte[] data = new byte[_length];
- buf.get(data);
+ buf.get(data);
- buffer.write(data);
+ buffer.write(data);
+ }
}
}
@Override
public long writePayload(final ByteBufferSender sender) throws
IOException
{
- ByteBuffer buf = _message.getContent(_offset, _length);
- long size = buf.remaining();
- sender.send(buf.duplicate());
+
+ Collection<ByteBuffer> bufs = _message.getContent(_offset,
_length);
+ long size = 0l;
+ for(ByteBuffer buf : bufs)
+ {
+ size += buf.remaining();
+ sender.send(buf.duplicate());
+ }
return size;
}
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
Wed Jul 29 10:23:26 2015
@@ -21,6 +21,8 @@
package org.apache.qpid.server.protocol.v0_8;
import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
@@ -101,12 +103,12 @@ public class MockStoredMessage implement
- public ByteBuffer getContent(int offsetInMessage, int size)
+ public Collection<ByteBuffer> getContent(int offsetInMessage, int size)
{
ByteBuffer buf = ByteBuffer.allocate(size);
getContent(offsetInMessage, buf);
buf.position(0);
- return buf;
+ return Collections.singleton(buf);
}
public void remove()
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
Wed Jul 29 10:23:26 2015
@@ -23,6 +23,8 @@ package org.apache.qpid.server.protocol.
import java.io.EOFException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.ListIterator;
@@ -255,7 +257,7 @@ public abstract class MessageConverter_t
}
@Override
- public ByteBuffer getContent(int offsetInMessage, int
size)
+ public Collection<ByteBuffer> getContent(int
offsetInMessage, int size)
{
ByteBuffer buf = allData.duplicate();
buf.position(offsetInMessage);
@@ -264,7 +266,7 @@ public abstract class MessageConverter_t
{
buf.limit(size);
}
- return buf;
+ return Collections.singleton(buf);
}
@Override
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
Wed Jul 29 10:23:26 2015
@@ -28,6 +28,7 @@ import java.util.List;
import org.apache.qpid.server.message.AbstractServerMessageImpl;
import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.util.ByteBufferUtils;
public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0,
MessageMetaData_1_0>
{
@@ -68,7 +69,7 @@ public class Message_1_0 extends Abstrac
do
{
- b = storedMessage.getContent(offset,FRAGMENT_SIZE);
+ b =
ByteBufferUtils.combine(storedMessage.getContent(offset,FRAGMENT_SIZE));
if(b.hasRemaining())
{
fragments.add(b);
Modified:
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
Wed Jul 29 10:23:26 2015
@@ -21,6 +21,8 @@
package org.apache.qpid.server.protocol.converter.v0_10_v1_0;
import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -100,9 +102,9 @@ public class MessageConverter_1_0_to_v0_
}
@Override
- public ByteBuffer getContent(int offsetInMessage, int size)
+ public Collection<ByteBuffer> getContent(int offsetInMessage, int
size)
{
- return ByteBuffer.wrap(messageContent, offsetInMessage, size);
+ return Collections.singleton(ByteBuffer.wrap(messageContent,
offsetInMessage, size));
}
@Override
Modified:
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
Wed Jul 29 10:23:26 2015
@@ -21,6 +21,7 @@
package org.apache.qpid.server.protocol.converter.v0_8_v0_10;
import java.nio.ByteBuffer;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
@@ -198,7 +199,7 @@ public class MessageConverter_0_10_to_0_
}
@Override
- public ByteBuffer getContent(int offsetInMessage, int size)
+ public Collection<ByteBuffer> getContent(int offsetInMessage, int
size)
{
return message.getContent(offsetInMessage, size);
}
Modified:
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
Wed Jul 29 10:23:26 2015
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
+import java.util.Collection;
import java.util.Map;
import java.util.UUID;
@@ -87,7 +88,7 @@ public class MessageConverter_0_8_to_0_1
}
@Override
- public ByteBuffer getContent(int offsetInMessage, int size)
+ public Collection<ByteBuffer> getContent(int offsetInMessage, int
size)
{
return message_0_8.getContent(offsetInMessage, size);
}
Modified:
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
Wed Jul 29 10:23:26 2015
@@ -21,6 +21,8 @@
package org.apache.qpid.server.protocol.converter.v0_8_v1_0;
import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -102,9 +104,9 @@ public class MessageConverter_1_0_to_v0_
}
@Override
- public ByteBuffer getContent(int offsetInMessage, int size)
+ public Collection<ByteBuffer> getContent(int offsetInMessage, int
size)
{
- return ByteBuffer.wrap(messageContent, offsetInMessage, size);
+ return Collections.singleton(ByteBuffer.wrap(messageContent,
offsetInMessage, size));
}
@Override
Modified:
qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java?rev=1693236&r1=1693235&r2=1693236&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java
(original)
+++
qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java
Wed Jul 29 10:23:26 2015
@@ -44,6 +44,7 @@ import org.apache.qpid.server.message.Se
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.QueueEntryVisitor;
+import org.apache.qpid.util.ByteBufferUtils;
public class ReportRunner<T>
{
@@ -211,7 +212,7 @@ public class ReportRunner<T>
}
- private static ReportableMessage convertMessage(final ServerMessage
message)
+ private static ReportableMessage convertMessage(final ServerMessage<?>
message)
{
return new ReportableMessage()
{
@@ -230,7 +231,7 @@ public class ReportRunner<T>
@Override
public ByteBuffer getContent()
{
- ByteBuffer content = message.getContent(0, (int) getSize());
+ ByteBuffer content =
ByteBufferUtils.combine(message.getContent(0, (int) getSize()));
return content.asReadOnlyBuffer();
}
Added:
qpid/java/trunk/common/src/main/java/org/apache/qpid/util/ByteBufferUtils.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/util/ByteBufferUtils.java?rev=1693236&view=auto
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/util/ByteBufferUtils.java
(added)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/util/ByteBufferUtils.java
Wed Jul 29 10:23:26 2015
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.util;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+
+public class ByteBufferUtils
+{
+ private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);
+
+ public static ByteBuffer combine(Collection<ByteBuffer> bufs)
+ {
+ if(bufs == null || bufs.isEmpty())
+ {
+ return EMPTY_BYTE_BUFFER;
+ }
+ else if(bufs.size() == 1)
+ {
+ return bufs.iterator().next();
+ }
+ else
+ {
+ int size = 0;
+ boolean isDirect = false;
+ for(ByteBuffer buf : bufs)
+ {
+ size += buf.remaining();
+ isDirect = isDirect || buf.isDirect();
+ }
+ ByteBuffer combined = isDirect ? ByteBuffer.allocateDirect(size) :
ByteBuffer.allocate(size);
+
+ for(ByteBuffer buf : bufs)
+ {
+ combined.put(buf.duplicate());
+ }
+ combined.flip();
+ return combined;
+ }
+ }
+}
Propchange:
qpid/java/trunk/common/src/main/java/org/apache/qpid/util/ByteBufferUtils.java
------------------------------------------------------------------------------
svn:eol-style = native
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]