[ 
https://issues.apache.org/jira/browse/ARTEMIS-3449?focusedWorklogId=645179&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-645179
 ]

ASF GitHub Bot logged work on ARTEMIS-3449:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 01/Sep/21 09:47
            Start Date: 01/Sep/21 09:47
    Worklog Time Spent: 10m 
      Work Description: gemmellr commented on a change in pull request #3711:
URL: https://github.com/apache/activemq-artemis/pull/3711#discussion_r699377134



##########
File path: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
##########
@@ -587,52 +586,48 @@ void deliver() {
          LargeBodyReader context = message.getLargeBodyReader();
          try {
             context.open();
+            final ByteBuf tmpFrameBuf = PooledByteBufAllocator.DEFAULT.directBuffer(frameSize);
+            final NettyReadable nettyReadable = new NettyReadable(tmpFrameBuf);
             try {
+
                context.position(position);
                long bodySize = context.getSize();
-
-               ByteBuffer buf = ByteBuffer.allocate(frameSize);
+               // materialize it so we can use its internal NIO buffer
+               tmpFrameBuf.ensureWritable(frameSize);
 
                for (; sender.getLocalState() != EndpointState.CLOSED && position < bodySize; ) {
                   if (!connection.flowControl(this::resume)) {
                      context.close();
                      return;
                   }
-                  buf.clear();
-                  int size = 0;
-
-                  try {
-                     if (position == 0) {
-                        replaceInitialHeader(deliveryAnnotationsToEncode, context, WritableBuffer.ByteBufferWrapper.wrap(buf));
-                     }
-                     size = context.readInto(buf);
-
-                     sender.send(new ReadableBuffer.ByteBufferReader(buf));
-                     position += size;
-                  } catch (java.nio.BufferOverflowException overflowException) {
-                     if (position == 0) {
-                        if (log.isDebugEnabled()) {
-                           log.debug("Delivery of message failed with an overFlowException, retrying again with expandable buffer");
-                        }
-                        // on the very first packet, if the initial header was replaced with a much bigger header (re-encoding)
-                        // we could recover the situation with a retry using an expandable buffer.
-                        // this is tested on org.apache.activemq.artemis.tests.integration.amqp.AmqpMessageDivertsTest
-                        size = retryInitialPacketWithExpandableBuffer(deliveryAnnotationsToEncode, context, buf);
+                  tmpFrameBuf.clear();
+                  int headerSize = 0;
+                  int readLimit = frameSize;
+                  if (position == 0) {
+                     replaceInitialHeader(deliveryAnnotationsToEncode, context, new NettyWritable(tmpFrameBuf));
+                     headerSize = tmpFrameBuf.writerIndex();
+                     if (headerSize <= frameSize) {
+                        readLimit = frameSize - headerSize;

Review comment:
       This still feels unnecessarily complicated. It complicates the whole 
remainder of the method for a corner case that's actually quite unlikely to 
even arise. The variable is also confusingly named, since it is absolutely not 
the 'header size'; it's only non-header stuff that can ever get it into this 
situation.
   
   Same comment as on yesterday's prior version: why not just send the 
generated first payload at this point (as the old version did when the initial 
buffer wasn't big enough, in retryInitialPacketWithExpandableBuffer()), then 
replace the buffer with an appropriately initial-sized one and continue the 
loop (assuming there is more to be sent)? That would consolidate all the 
complexity inside this sub-part of the 'if position == 0' handling, and leave 
the rest of the handling as the most trivial 'read data, send data' it can be.
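
A rough sketch of the control flow suggested above, to make the shape concrete. It is not the Artemis code: it uses plain java.nio.ByteBuffer and byte arrays instead of Netty buffers and the LargeBodyReader, it does not model the replaced original header, and every name in it (FirstFrameSketch, stream, reencodedHeader) is hypothetical. The oversized first payload is sent on its own inside the 'position == 0' branch, so the rest of the loop only reads data and sends data with a normal frame-sized buffer.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class FirstFrameSketch {

    /**
     * Simulates streaming a re-encoded header followed by a body.
     * Returns the size of each payload "sent", in order. Hypothetical helper.
     */
    static List<Integer> stream(byte[] reencodedHeader, byte[] body, int frameSize) {
        List<Integer> sent = new ArrayList<>();
        ByteBuffer frame = ByteBuffer.allocate(frameSize);
        int position = 0;

        // All first-payload complexity lives here.
        if (reencodedHeader.length > frameSize) {
            // Corner case: the re-encoded header does not fit in one frame-sized
            // buffer. Send it by itself from a one-off buffer, then carry on
            // with the normal frame-sized buffer below.
            ByteBuffer oversized = ByteBuffer.wrap(reencodedHeader);
            sent.add(oversized.remaining());
        } else {
            // Common case: header plus as much body as fits in the first frame.
            frame.put(reencodedHeader);
            int n = Math.min(frame.remaining(), body.length);
            frame.put(body, 0, n);
            position = n;
            frame.flip();
            sent.add(frame.remaining());
        }

        // The remainder is the trivial "read data, send data" loop.
        while (position < body.length) {
            frame.clear();
            int n = Math.min(frame.remaining(), body.length - position);
            frame.put(body, position, n);
            position += n;
            frame.flip();
            sent.add(frame.remaining());
        }
        return sent;
    }

    public static void main(String[] args) {
        // 10-byte header with 8-byte frames: the header goes out alone first.
        System.out.println(stream(new byte[10], new byte[20], 8)); // [10, 8, 8, 4]
        // 3-byte header fits, so the first frame carries header plus 5 body bytes.
        System.out.println(stream(new byte[3], new byte[20], 8));  // [8, 8, 7]
    }
}
```

With this shape no 'readLimit' or 'headerSize' bookkeeping leaks into the main loop; whether that holds for the real method depends on details not modelled here.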

##########
File path: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
##########
@@ -146,16 +147,6 @@ private AMQPLargeMessage(final AMQPLargeMessage copy,
       setMessageID(newID);
    }
 
-   public void openLargeMessage() throws Exception {

Review comment:
       The 'getData()' method appears to depend on this method having created 
"parsingData". Though I admit this method does seem unused, getData() seems 
to be used a lot. I thus wonder why this seems unused.

##########
File path: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
##########
@@ -146,16 +147,6 @@ private AMQPLargeMessage(final AMQPLargeMessage copy,
       setMessageID(newID);
    }
 
-   public void openLargeMessage() throws Exception {

Review comment:
       There is a buffer that this opens. That buffer is still being referenced 
in getData(), which throws, but only if it isn't set, which it never will be if 
this isn't used. One of these situations seems like it has to be wrong.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 645179)
    Time Spent: 8h 10m  (was: 8h)

> Speedup AMQP large message streaming
> ------------------------------------
>
>                 Key: ARTEMIS-3449
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-3449
>             Project: ActiveMQ Artemis
>          Issue Type: Improvement
>            Reporter: Francesco Nigro
>            Assignee: Francesco Nigro
>            Priority: Major
>          Time Spent: 8h 10m
>  Remaining Estimate: 0h
>
> AMQP is using unpooled heap ByteBuffer(s) to stream AMQP large messages. 
> Given that the underlying NIO sequential file can use either FileChannel or 
> RandomAccessFile (depending on whether the ByteBuffer used is direct or heap 
> based), both approaches would benefit from using Netty pooled direct buffers, 
> which would avoid the additional copies performed by RandomAccessFile and 
> reduce GC too.
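
For illustration only, a minimal sketch of the direct-buffer read path the description refers to. It assumes a plain JDK direct ByteBuffer as a stand-in for a Netty pooled direct buffer and a temporary file as a stand-in for the large message body; the class and method names (DirectReadSketch, readInFrames, demo) are hypothetical and none of this is taken from the Artemis sources. One frame-sized direct buffer is allocated once and reused for every read, so no per-frame heap buffer is created.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class DirectReadSketch {

    /** Reads the file in frame-sized chunks and returns the total bytes read. */
    static long readInFrames(Path file, int frameSize) throws IOException {
        // Allocated once and reused; a pooled buffer would be returned to its
        // pool afterwards instead of being left to the garbage collector.
        ByteBuffer frame = ByteBuffer.allocateDirect(frameSize);
        long total = 0;
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
            while (true) {
                frame.clear();
                int n = channel.read(frame); // fills the direct buffer from the file
                if (n < 0) {
                    break; // end of file
                }
                frame.flip();
                total += frame.remaining(); // a real sender would hand the frame off here
            }
        }
        return total;
    }

    /** Hypothetical helper: writes a temp file of fileSize bytes and streams it back. */
    static long demo(int fileSize, int frameSize) {
        try {
            Path tmp = Files.createTempFile("large-body", ".bin");
            try {
                Files.write(tmp, new byte[fileSize]);
                return readInFrames(tmp, frameSize);
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(10_000, 4096)); // 10000
    }
}
```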


