Repository: qpid-broker-j
Updated Branches:
  refs/heads/6.1.x 33a653380 -> a53709ba6
QPID-8202: [Broker-J][AMQP 0-9] Make sure that message content is loaded from disk only once before sending it to the client

Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/a53709ba
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/a53709ba
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/a53709ba

Branch: refs/heads/6.1.x
Commit: a53709ba6d4373b037dbd54e01b3dba28fef6d4c
Parents: 33a6533
Author: Alex Rudyy <[email protected]>
Authored: Tue Jun 5 23:38:32 2018 +0100
Committer: Alex Rudyy <[email protected]>
Committed: Sat Jun 9 22:57:56 2018 +0100

----------------------------------------------------------------------
 .../v0_8/ProtocolOutputConverterImpl.java | 173 +++++++++++--------
 1 file changed, 101 insertions(+), 72 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a53709ba/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
index a66318a..85ae75a 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
@@ -216,27 +216,40 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
         }
         else
         {
-            int maxBodySize = (int) _connection.getMaxFrameSize() - AMQFrame.getFrameOverhead();
-
-
-            int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
-
-            int writtenSize = capacity;
-
-            AMQBody firstContentBody = new MessageContentSourceBody(content, 0, capacity);
-
-            CompositeAMQBodyBlock
-                    compositeBlock =
-                    new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
-            writeFrame(compositeBlock);
-
-            while (writtenSize < bodySize)
+            int maxFrameBodySize = (int) _connection.getMaxFrameSize() - AMQFrame.getFrameOverhead();
+            Collection<QpidByteBuffer> contentByteBuffers = content.getContent(0, bodySize);
+            try
             {
-                capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
-                AMQBody body = new MessageContentSourceBody(content, writtenSize, capacity);
-                writtenSize += capacity;
+                int contentChunkSize = bodySize > maxFrameBodySize ? maxFrameBodySize : bodySize;
+                Collection<QpidByteBuffer> chunk = getContent(contentByteBuffers, 0, contentChunkSize);
+                try
+                {
+                    writeFrame(new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, new MessageContentSourceBody(chunk)));
+                }
+                finally
+                {
+                    dispose(chunk);
+                }
 
-                writeFrame(new AMQFrame(channelId, body));
+                int writtenSize = contentChunkSize;
+                while (writtenSize < bodySize)
+                {
+                    contentChunkSize = bodySize - writtenSize > maxFrameBodySize ? maxFrameBodySize : bodySize - writtenSize;
+                    chunk = getContent(contentByteBuffers, writtenSize, contentChunkSize);
+                    try
+                    {
+                        writeFrame(new AMQFrame(channelId, new MessageContentSourceBody(chunk)));
+                        writtenSize += contentChunkSize;
+                    }
+                    finally
+                    {
+                        dispose(chunk);
+                    }
+                }
+            }
+            finally
+            {
+                dispose(contentByteBuffers);
             }
         }
     }
@@ -246,18 +259,76 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
         return GZIP_ENCODING.equals(contentHeaderBody.getProperties().getEncoding());
     }
 
+    private static void dispose(final Collection<QpidByteBuffer> contentByteBuffers)
+    {
+        for (QpidByteBuffer qbb: contentByteBuffers)
+        {
+            qbb.dispose();
+        }
+    }
+
+    private static Collection<QpidByteBuffer> getContent(final Collection<QpidByteBuffer> messageContent,
+                                                         final int offset,
+                                                         int length)
+    {
+        Collection<QpidByteBuffer> content = new ArrayList<>(messageContent.size());
+        int pos = 0;
+        for (QpidByteBuffer buf : messageContent)
+        {
+            if (length > 0)
+            {
+                int bufRemaining = buf.remaining();
+                if (pos + bufRemaining <= offset)
+                {
+                    pos += bufRemaining;
+                }
+                else if (pos >= offset)
+                {
+                    buf = buf.duplicate();
+                    if (bufRemaining <= length)
+                    {
+                        length -= bufRemaining;
+                    }
+                    else
+                    {
+                        buf.limit(length);
+                        length = 0;
+                    }
+                    content.add(buf);
+                    pos+=buf.remaining();
+
+                }
+                else
+                {
+                    int offsetInBuf = offset - pos;
+                    int limit = length < bufRemaining - offsetInBuf ? length : bufRemaining - offsetInBuf;
+                    final QpidByteBuffer bufView = buf.view(offsetInBuf, limit);
+                    content.add(bufView);
+                    length -= limit;
+                    pos+=limit+offsetInBuf;
+                }
+            }
+
+        }
+        return content;
+
+    }
+
     private class MessageContentSourceBody implements AMQBody
     {
         public static final byte TYPE = 3;
         private final int _length;
-        private final MessageContentSource _content;
-        private final int _offset;
+        private final Collection<QpidByteBuffer> _content;
 
-        public MessageContentSourceBody(MessageContentSource content, int offset, int length)
+        MessageContentSourceBody(Collection<QpidByteBuffer> content)
         {
             _content = content;
-            _offset = offset;
-            _length = length;
+            int size = 0;
+            for (QpidByteBuffer qbb: content)
+            {
+                size += qbb.remaining();
+            }
+            _length = size;
         }
 
         public byte getFrameType()
@@ -273,13 +344,11 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
         @Override
         public long writePayload(final ByteBufferSender sender)
         {
-            long size = 0L;
-            for(QpidByteBuffer buf : _content.getContent(_offset, _length))
+            long size = 0;
+            for (QpidByteBuffer qbb: _content)
             {
-                size += buf.remaining();
-
-                sender.send(buf);
-                buf.dispose();
+                size += qbb.remaining();
+                sender.send(qbb);
             }
             return size;
         }
@@ -292,7 +361,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
         @Override
         public String toString()
         {
-            return "[" + getClass().getSimpleName() + " offset: " + _offset + ", length: " + _length + "]";
+            return "[" + getClass().getSimpleName() + " length: " + _length + "]";
         }
     }
 
@@ -561,47 +630,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
         @Override
         public Collection<QpidByteBuffer> getContent(final int offset, int length)
         {
-            Collection<QpidByteBuffer> content = new ArrayList<>(_buffers.size());
-            int pos = 0;
-            for (QpidByteBuffer buf : _buffers)
-            {
-                if (length > 0)
-                {
-                    int bufRemaining = buf.remaining();
-                    if (pos + bufRemaining <= offset)
-                    {
-                        pos += bufRemaining;
-                    }
-                    else if (pos >= offset)
-                    {
-                        buf = buf.duplicate();
-                        if (bufRemaining <= length)
-                        {
-                            length -= bufRemaining;
-                        }
-                        else
-                        {
-                            buf.limit(length);
-                            length = 0;
-                        }
-                        content.add(buf);
-                        pos+=buf.remaining();
-
-                    }
-                    else
-                    {
-                        int offsetInBuf = offset - pos;
-                        int limit = length < bufRemaining - offsetInBuf ? length : bufRemaining - offsetInBuf;
-                        final QpidByteBuffer bufView = buf.view(offsetInBuf, limit);
-                        content.add(bufView);
-                        length -= limit;
-                        pos+=limit+offsetInBuf;
-                    }
-                }
-
-            }
-            return content;
-
+            return ProtocolOutputConverterImpl.getContent(_buffers, offset, length);
         }
 
         @Override

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
