Repository: qpid-broker-j
Updated Branches:
  refs/heads/6.1.x 33a653380 -> a53709ba6


QPID-8202: [Broker-J][AMQP 0-9] Make sure that message content is loaded from 
disk only once before sending it to the client


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/a53709ba
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/a53709ba
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/a53709ba

Branch: refs/heads/6.1.x
Commit: a53709ba6d4373b037dbd54e01b3dba28fef6d4c
Parents: 33a6533
Author: Alex Rudyy <[email protected]>
Authored: Tue Jun 5 23:38:32 2018 +0100
Committer: Alex Rudyy <[email protected]>
Committed: Sat Jun 9 22:57:56 2018 +0100

----------------------------------------------------------------------
 .../v0_8/ProtocolOutputConverterImpl.java       | 173 +++++++++++--------
 1 file changed, 101 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a53709ba/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
index a66318a..85ae75a 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
@@ -216,27 +216,40 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
         }
         else
         {
-            int maxBodySize = (int) _connection.getMaxFrameSize() - 
AMQFrame.getFrameOverhead();
-
-
-            int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
-
-            int writtenSize = capacity;
-
-            AMQBody firstContentBody = new MessageContentSourceBody(content, 
0, capacity);
-
-            CompositeAMQBodyBlock
-                    compositeBlock =
-                    new CompositeAMQBodyBlock(channelId, deliverBody, 
contentHeaderBody, firstContentBody);
-            writeFrame(compositeBlock);
-
-            while (writtenSize < bodySize)
+            int maxFrameBodySize = (int) _connection.getMaxFrameSize() - 
AMQFrame.getFrameOverhead();
+            Collection<QpidByteBuffer> contentByteBuffers = 
content.getContent(0, bodySize);
+            try
             {
-                capacity = bodySize - writtenSize > maxBodySize ? maxBodySize 
: bodySize - writtenSize;
-                AMQBody body = new MessageContentSourceBody(content, 
writtenSize, capacity);
-                writtenSize += capacity;
+                int contentChunkSize = bodySize > maxFrameBodySize ? 
maxFrameBodySize : bodySize;
+                Collection<QpidByteBuffer> chunk = 
getContent(contentByteBuffers, 0, contentChunkSize);
+                try
+                {
+                    writeFrame(new CompositeAMQBodyBlock(channelId, 
deliverBody, contentHeaderBody, new MessageContentSourceBody(chunk)));
+                }
+                finally
+                {
+                    dispose(chunk);
+                }
 
-                writeFrame(new AMQFrame(channelId, body));
+                int writtenSize = contentChunkSize;
+                while (writtenSize < bodySize)
+                {
+                    contentChunkSize = bodySize - writtenSize > 
maxFrameBodySize ? maxFrameBodySize : bodySize - writtenSize;
+                    chunk = getContent(contentByteBuffers, writtenSize, 
contentChunkSize);
+                    try
+                    {
+                        writeFrame(new AMQFrame(channelId, new 
MessageContentSourceBody(chunk)));
+                        writtenSize += contentChunkSize;
+                    }
+                    finally
+                    {
+                        dispose(chunk);
+                    }
+                }
+            }
+            finally
+            {
+                dispose(contentByteBuffers);
             }
         }
     }
@@ -246,18 +259,76 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
         return 
GZIP_ENCODING.equals(contentHeaderBody.getProperties().getEncoding());
     }
 
+    private static void dispose(final Collection<QpidByteBuffer> 
contentByteBuffers)
+    {
+        for (QpidByteBuffer qbb: contentByteBuffers)
+        {
+            qbb.dispose();
+        }
+    }
+
+    private static Collection<QpidByteBuffer> getContent(final 
Collection<QpidByteBuffer> messageContent,
+                                                         final int offset,
+                                                         int length)
+    {
+        Collection<QpidByteBuffer> content = new 
ArrayList<>(messageContent.size());
+        int pos = 0;
+        for (QpidByteBuffer buf : messageContent)
+        {
+            if (length > 0)
+            {
+                int bufRemaining = buf.remaining();
+                if (pos + bufRemaining <= offset)
+                {
+                    pos += bufRemaining;
+                }
+                else if (pos >= offset)
+                {
+                    buf = buf.duplicate();
+                    if (bufRemaining <= length)
+                    {
+                        length -= bufRemaining;
+                    }
+                    else
+                    {
+                        buf.limit(length);
+                        length = 0;
+                    }
+                    content.add(buf);
+                    pos+=buf.remaining();
+
+                }
+                else
+                {
+                    int offsetInBuf = offset - pos;
+                    int limit = length < bufRemaining - offsetInBuf ? length : 
bufRemaining - offsetInBuf;
+                    final QpidByteBuffer bufView = buf.view(offsetInBuf, 
limit);
+                    content.add(bufView);
+                    length -= limit;
+                    pos+=limit+offsetInBuf;
+                }
+            }
+
+        }
+        return content;
+
+    }
+
     private class MessageContentSourceBody implements AMQBody
     {
         public static final byte TYPE = 3;
         private final int _length;
-        private final MessageContentSource _content;
-        private final int _offset;
+        private final Collection<QpidByteBuffer> _content;
 
-        public MessageContentSourceBody(MessageContentSource content, int 
offset, int length)
+        MessageContentSourceBody(Collection<QpidByteBuffer> content)
         {
             _content = content;
-            _offset = offset;
-            _length = length;
+             int size = 0;
+            for (QpidByteBuffer qbb: content)
+            {
+                size += qbb.remaining();
+            }
+            _length = size;
         }
 
         public byte getFrameType()
@@ -273,13 +344,11 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
         @Override
         public long writePayload(final ByteBufferSender sender)
         {
-            long size = 0L;
-            for(QpidByteBuffer buf : _content.getContent(_offset, _length))
+            long size = 0;
+            for (QpidByteBuffer qbb: _content)
             {
-                size += buf.remaining();
-
-                sender.send(buf);
-                buf.dispose();
+                size += qbb.remaining();
+                sender.send(qbb);
             }
             return size;
         }
@@ -292,7 +361,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
         @Override
         public String toString()
         {
-            return "[" + getClass().getSimpleName() + " offset: " + _offset + 
", length: " + _length + "]";
+            return "[" + getClass().getSimpleName() + " length: " + _length + 
"]";
         }
 
     }
@@ -561,47 +630,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
         @Override
         public Collection<QpidByteBuffer> getContent(final int offset, int 
length)
         {
-            Collection<QpidByteBuffer> content = new 
ArrayList<>(_buffers.size());
-            int pos = 0;
-            for (QpidByteBuffer buf : _buffers)
-            {
-                if (length > 0)
-                {
-                    int bufRemaining = buf.remaining();
-                    if (pos + bufRemaining <= offset)
-                    {
-                        pos += bufRemaining;
-                    }
-                    else if (pos >= offset)
-                    {
-                        buf = buf.duplicate();
-                        if (bufRemaining <= length)
-                        {
-                            length -= bufRemaining;
-                        }
-                        else
-                        {
-                            buf.limit(length);
-                            length = 0;
-                        }
-                        content.add(buf);
-                        pos+=buf.remaining();
-
-                    }
-                    else
-                    {
-                        int offsetInBuf = offset - pos;
-                        int limit = length < bufRemaining - offsetInBuf ? 
length : bufRemaining - offsetInBuf;
-                        final QpidByteBuffer bufView = buf.view(offsetInBuf, 
limit);
-                        content.add(bufView);
-                        length -= limit;
-                        pos+=limit+offsetInBuf;
-                    }
-                }
-
-            }
-            return content;
-
+            return ProtocolOutputConverterImpl.getContent(_buffers, offset, 
length);
         }
 
         @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to