[ 
https://issues.apache.org/jira/browse/ARTEMIS-3449?focusedWorklogId=645914&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-645914
 ]

ASF GitHub Bot logged work on ARTEMIS-3449:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/Sep/21 16:44
            Start Date: 02/Sep/21 16:44
    Worklog Time Spent: 10m 
      Work Description: gemmellr commented on a change in pull request #3711:
URL: https://github.com/apache/activemq-artemis/pull/3711#discussion_r701111216



##########
File path: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
##########
@@ -584,56 +583,45 @@ void deliver() {
             deliveryAnnotationsToEncode = null;
          }
 
-         LargeBodyReader context = message.getLargeBodyReader();
          try {
-            context.open();
-            try {
+            final ByteBuf frameBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(frameSize, frameSize);
+            final NettyReadable frameView = new NettyReadable(frameBuffer);
+            try (LargeBodyReader context = message.getLargeBodyReader()) {
+               context.open();
                context.position(position);
                long bodySize = context.getSize();
+               // materialize it so we can use its internal NIO buffer
+               frameBuffer.ensureWritable(frameSize);
 
-               ByteBuffer buf = ByteBuffer.allocate(frameSize);
+               if (position == 0 && sender.getLocalState() != EndpointState.CLOSED && position < bodySize) {

Review comment:
       "position == 0..." is a pretty awkward way of asking 'have we already 
done the required initial packet work?'. Adding a specific boolean for this 
might be better. Also, it doesn't necessarily matter whether position < 
bodySize, since what is to be sent might not be in that original body.
   
   Thinking about it, checking for position 0 is also potentially wrong as it 
stands. It's not impossible that the original buffer position isn't bumped 
even when something was actually sent for the initial packet (e.g. there are 
no delivery-annotations or header in the original message to skip past, or the 
subsequently generated ones precisely fit in the buffer with room for nothing 
else, or the connection was flow controlled before sending anything else). In 
that case position stays at 0 even though the work was already done, which 
could mean it gets done again on a subsequent run through.
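
   To illustrate the dedicated-boolean idea, here is a minimal standalone 
sketch. Everything in it is hypothetical (the class InitialPacketFlagSketch, 
its fields, and the simplified frame accounting are assumptions for 
illustration, not the real ProtonServerSenderContext code). It models the case 
where the generated header exactly fills the first frame, so position stays at 
0 after the initial packet, and shows that the flag still prevents the initial 
work from being repeated.

```java
// Hypothetical sketch, not the Artemis implementation.
final class InitialPacketFlagSketch {
    private final int bodySize;    // bytes in the original large message body
    private final int frameSize;   // bytes available per outgoing frame
    private final int headerSize;  // bytes of re-encoded header/annotations
    private long position;         // offset into the original body
    private boolean initialPacketDone; // explicit flag instead of "position == 0"
    int initialPacketCount;        // how many times the initial work ran

    InitialPacketFlagSketch(int bodySize, int frameSize, int headerSize) {
        this.bodySize = bodySize;
        this.frameSize = frameSize;
        this.headerSize = headerSize;
    }

    long position() {
        return position;
    }

    /** Sends one frame; returns false once there is nothing left to send. */
    boolean deliverOnce() {
        int room = frameSize;
        if (!initialPacketDone) {
            // Initial packet work: the header takes space in the first frame.
            initialPacketCount++;
            room -= Math.min(headerSize, frameSize);
            // Mark the work as done even if no body bytes fit in this frame.
            initialPacketDone = true;
        } else if (position >= bodySize) {
            return false;
        }
        position += Math.min(room, bodySize - position);
        return true;
    }

    public static void main(String[] args) {
        // Header exactly fills the first frame, so position is still 0 after it.
        InitialPacketFlagSketch sketch = new InitialPacketFlagSketch(10, 8, 8);
        while (sketch.deliverOnce()) {
            // keep sending frames until the body is exhausted
        }
        System.out.println("initial packets sent: " + sketch.initialPacketCount);
    }
}
```

   With a "position == 0" check in place of the flag, the second call in this 
scenario would look like a first call again, which is the repeat described 
above.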

##########
File path: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
##########
@@ -572,7 +571,7 @@ void resume() {
       void deliver() {
 
          // This is discounting some bytes due to Transfer payload
-         int frameSize = protonSession.session.getConnection().getTransport().getOutboundFrameSizeLimit() - 50 - (delivery.getTag() != null ? delivery.getTag().length : 0);
+         final int frameSize = protonSession.session.getConnection().getTransport().getOutboundFrameSizeLimit() - 50 - (delivery.getTag() != null ? delivery.getTag().length : 0);

Review comment:
       More than that, this will be the same every time for a given message, 
so there's no point calculating it here.
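
   One way to do that, as a rough sketch: compute the value once when the 
per-message state is created and keep it in a final field. The class 
FrameSizeOnceSketch and its method names are hypothetical, and the sketch 
assumes the outbound frame size limit and the delivery tag do not change while 
the message is being sent. The 50-byte allowance is the one from the diff.

```java
// Hypothetical sketch, not the Artemis implementation.
final class FrameSizeOnceSketch {
    // Allowance for the Transfer frame overhead, taken from the "- 50" in the diff.
    static final int TRANSFER_OVERHEAD = 50;

    // Computed once per message instead of on every deliver() pass.
    private final int frameSize;

    FrameSizeOnceSketch(int outboundFrameSizeLimit, byte[] deliveryTag) {
        this.frameSize = computeFrameSize(outboundFrameSizeLimit, deliveryTag);
    }

    static int computeFrameSize(int outboundFrameSizeLimit, byte[] deliveryTag) {
        int tagLength = deliveryTag != null ? deliveryTag.length : 0;
        return outboundFrameSizeLimit - TRANSFER_OVERHEAD - tagLength;
    }

    int frameSize() {
        return frameSize;
    }

    public static void main(String[] args) {
        FrameSizeOnceSketch sketch = new FrameSizeOnceSketch(1024, new byte[4]);
        System.out.println("frame size: " + sketch.frameSize());
    }
}
```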

##########
File path: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
##########
@@ -661,43 +649,67 @@ void deliver() {
          }
       }
 
+      private boolean deliverInitialPacket(final LargeBodyReader context,
+                                           final DeliveryAnnotations deliveryAnnotationsToEncode,
+                                           final ByteBuf frameBuffer) throws Exception {
+         assert position == 0 && context.position() == 0;
+         if (!connection.flowControl(this::resume)) {
+            return false;
+         }
+         frameBuffer.clear();
+         try {
+            replaceInitialHeader(deliveryAnnotationsToEncode, context, new NettyWritable(frameBuffer));
+         } catch (IndexOutOfBoundsException indexOutOfBoundsException) {
+            assert position == 0 : "this shouldn't happen unless replaceInitialHeader is updating position before modifying frameBuffer";
+            if (log.isDebugEnabled()) {
+               log.debug("Delivery of message failed with an overFlowException, retrying again with expandable buffer");
+            }
+            // on the very first packet, if the initial header was replaced with a much bigger header (re-encoding)
+            // we could recover the situation with a retry using an expandable buffer.
+            // this is tested on org.apache.activemq.artemis.tests.integration.amqp.AmqpMessageDivertsTest
+            sendAndFlushInitialPacket(deliveryAnnotationsToEncode, context);
+            return true;
+         }
+         final int writableBytes = frameBuffer.writableBytes();
+         if (writableBytes == 0) {
+            sender.send(new NettyReadable(frameBuffer));
+            connection.instantFlush();
+            return true;
+         }
+         final int writtenBytes = frameBuffer.writerIndex();
+         final int readSize = context.readInto(frameBuffer.internalNioBuffer(writtenBytes, writableBytes));
+         frameBuffer.writerIndex(writtenBytes + readSize);
+         sender.send(new NettyReadable(frameBuffer));
+         position += readSize;
+         connection.instantFlush();
+         return true;

Review comment:
       I think perhaps this can be simplified.






Issue Time Tracking
-------------------

    Worklog Id:     (was: 645914)
    Time Spent: 8h 50m  (was: 8h 40m)

> Speedup AMQP large message streaming
> ------------------------------------
>
>                 Key: ARTEMIS-3449
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-3449
>             Project: ActiveMQ Artemis
>          Issue Type: Improvement
>            Reporter: Francesco Nigro
>            Assignee: Francesco Nigro
>            Priority: Major
>          Time Spent: 8h 50m
>  Remaining Estimate: 0h
>
> AMQP uses unpooled heap ByteBuffers to stream AMQP large messages. Given 
> that the underlying NIO sequential file can use either FileChannel or 
> RandomAccessFile (depending on whether the ByteBuffer used is direct or heap 
> based), both approaches would benefit from using Netty pooled direct buffers, 
> which would avoid the additional copies performed by RandomAccessFile and 
> reduce GC too.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
