This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 8fb5b89  ARTEMIS-3449: fix some issues from using position to track an unrelated state, plus simplify some previous changes and more
8fb5b89 is described below

commit 8fb5b8969f98b0994b9563cb7f5441e58253d89f
Author: Robbie Gemmell <[email protected]>
AuthorDate: Thu Sep 9 10:37:36 2021 +0100

    ARTEMIS-3449: fix some issues from using position to track an unrelated state, plus simplify some previous changes and more
---
 .../amqp/proton/ProtonServerSenderContext.java     | 48 +++++++++++-----------
 1 file changed, 25 insertions(+), 23 deletions(-)

diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 32585d7..da09c6c 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -563,26 +563,16 @@ public class ProtonServerSenderContext extends 
ProtonInitializable implements Pr
       final MessageReference reference;
       final AMQPLargeMessage message;
       final Delivery delivery;
+      boolean initialPacketHandled;
 
       void resume() {
          connection.runNow(this::deliver);
       }
 
       void deliver() {
-
          // This is discounting some bytes due to Transfer payload
          final int frameSize = 
protonSession.session.getConnection().getTransport().getOutboundFrameSizeLimit()
 - 50 - (delivery.getTag() != null ? delivery.getTag().length : 0);
 
-         DeliveryAnnotations deliveryAnnotationsToEncode;
-
-         message.checkReference(reference);
-
-         if (reference.getProtocolData() != null && 
reference.getProtocolData() instanceof DeliveryAnnotations) {
-            deliveryAnnotationsToEncode = 
(DeliveryAnnotations)reference.getProtocolData();
-         } else {
-            deliveryAnnotationsToEncode = null;
-         }
-
          try {
             final ByteBuf frameBuffer = 
PooledByteBufAllocator.DEFAULT.directBuffer(frameSize, frameSize);
             final NettyReadable frameView = new NettyReadable(frameBuffer);
@@ -593,10 +583,12 @@ public class ProtonServerSenderContext extends 
ProtonInitializable implements Pr
                // materialize it so we can use its internal NIO buffer
                frameBuffer.ensureWritable(frameSize);
 
-               if (position == 0 && sender.getLocalState() != 
EndpointState.CLOSED && position < bodySize) {
-                  if (!deliverInitialPacket(context, 
deliveryAnnotationsToEncode, frameBuffer)) {
+               if (!initialPacketHandled && sender.getLocalState() != 
EndpointState.CLOSED) {
+                  if (!deliverInitialPacket(context, frameBuffer)) {
                      return;
                   }
+
+                  initialPacketHandled = true;
                }
 
                for (; sender.getLocalState() != EndpointState.CLOSED && 
position < bodySize; ) {
@@ -614,7 +606,6 @@ public class ProtonServerSenderContext extends 
ProtonInitializable implements Pr
                   position += readSize;
 
                   if (readSize > 0) {
-
                      if (position < bodySize) {
                         connection.instantFlush();
                      }
@@ -650,13 +641,24 @@ public class ProtonServerSenderContext extends 
ProtonInitializable implements Pr
       }
 
       private boolean deliverInitialPacket(final LargeBodyReader context,
-                                           final DeliveryAnnotations 
deliveryAnnotationsToEncode,
                                            final ByteBuf frameBuffer) throws 
Exception {
-         assert position == 0 && context.position() == 0;
+         assert position == 0 && context.position() == 0 && 
!initialPacketHandled;
+
          if (!connection.flowControl(this::resume)) {
             return false;
          }
+
          frameBuffer.clear();
+
+         DeliveryAnnotations deliveryAnnotationsToEncode;
+         message.checkReference(reference);
+
+         if (reference.getProtocolData() != null && 
reference.getProtocolData() instanceof DeliveryAnnotations) {
+            deliveryAnnotationsToEncode = 
(DeliveryAnnotations)reference.getProtocolData();
+         } else {
+            deliveryAnnotationsToEncode = null;
+         }
+
          try {
             replaceInitialHeader(deliveryAnnotationsToEncode, context, new 
NettyWritable(frameBuffer));
          } catch (IndexOutOfBoundsException indexOutOfBoundsException) {
@@ -670,15 +672,15 @@ public class ProtonServerSenderContext extends 
ProtonInitializable implements Pr
             sendAndFlushInitialPacket(deliveryAnnotationsToEncode, context);
             return true;
          }
+
+         int readSize = 0;
          final int writableBytes = frameBuffer.writableBytes();
-         if (writableBytes == 0) {
-            sender.send(new NettyReadable(frameBuffer));
-            connection.instantFlush();
-            return true;
+         if (writableBytes != 0) {
+            final int writtenBytes = frameBuffer.writerIndex();
+            readSize = 
context.readInto(frameBuffer.internalNioBuffer(writtenBytes, writableBytes));
+            frameBuffer.writerIndex(writtenBytes + readSize);
          }
-         final int writtenBytes = frameBuffer.writerIndex();
-         final int readSize = 
context.readInto(frameBuffer.internalNioBuffer(writtenBytes, writableBytes));
-         frameBuffer.writerIndex(writtenBytes + readSize);
+
          sender.send(new NettyReadable(frameBuffer));
          position += readSize;
          connection.instantFlush();

Reply via email to