This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 43336b3e57 ARTEMIS-5696 Ensure that staged bytes are not lost during flow control
43336b3e57 is described below

commit 43336b3e5770e00f21a36dde500a3ccb90cbe8d7
Author: Timothy Bish <[email protected]>
AuthorDate: Tue Oct 7 16:35:00 2025 -0400

    ARTEMIS-5696 Ensure that staged bytes are not lost during flow control
    
    If the connection enters flow control and there are pending bytes in the frame
    buffer of the tunneled core large message writer we need to move them into the
    sender before releasing the frame buffer to avoid losing them.
---
 .../proton/AMQPTunneledCoreLargeMessageWriter.java |  23 +-
 .../AMQPTunneledCoreLargeMessageWriterTest.java    | 271 ++++++++++++++++++++-
 2 files changed, 288 insertions(+), 6 deletions(-)

diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriter.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriter.java
index 649eb82138..b5da502f4c 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriter.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriter.java
@@ -31,6 +31,7 @@ import 
org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
 import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
 import org.apache.qpid.proton.codec.EncoderImpl;
 import org.apache.qpid.proton.codec.EncodingCodes;
+import org.apache.qpid.proton.codec.ReadableBuffer;
 import org.apache.qpid.proton.codec.WritableBuffer;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.EndpointState;
@@ -218,8 +219,8 @@ public class AMQPTunneledCoreLargeMessageWriter implements 
MessageWriter {
    private boolean trySendDeliveryAnnotations(ByteBuf frameBuffer, 
NettyReadable frameView) {
       for (; protonSender.getLocalState() != EndpointState.CLOSED && state == 
State.STREAMING_DELIVERY_ANNOTATIONS; ) {
          if (annotations != null && annotations.getValue() != null && 
!annotations.getValue().isEmpty()) {
-            if (!connection.flowControl(this::resume)) {
-               break; // Resume will restart writing the delivery annotations 
section from where we left off.
+            if (isFlowControlled(frameBuffer, frameView)) {
+               break;
             }
 
             final ByteBuf annotationsBuffer = 
getOrCreateDeliveryAnnotationsBuffer();
@@ -251,8 +252,8 @@ public class AMQPTunneledCoreLargeMessageWriter implements 
MessageWriter {
    // data could be sent due to a flow control event.
    private boolean trySendHeadersAndProperties(ByteBuf frameBuffer, 
NettyReadable frameView) {
       for (; protonSender.getLocalState() != EndpointState.CLOSED && state == 
State.STREAMING_CORE_HEADERS; ) {
-         if (!connection.flowControl(this::resume)) {
-            break; // Resume will restart writing the headers section from 
where we left off.
+         if (isFlowControlled(frameBuffer, frameView)) {
+            break;
          }
 
          final ByteBuf headerBuffer = getOrCreateMessageHeaderBuffer();
@@ -287,7 +288,7 @@ public class AMQPTunneledCoreLargeMessageWriter implements 
MessageWriter {
          final long bodySize = context.getSize();
 
          for (; protonSender.getLocalState() != EndpointState.CLOSED && state 
== State.STREAMING_BODY; ) {
-            if (!connection.flowControl(this::resume)) {
+            if (isFlowControlled(frameBuffer, frameView)) {
                break;
             }
 
@@ -364,6 +365,18 @@ public class AMQPTunneledCoreLargeMessageWriter implements 
MessageWriter {
       }
    }
 
+   private boolean isFlowControlled(ByteBuf frameBuffer, ReadableBuffer 
frameView) {
+      if (!connection.flowControl(this::resume)) {
+         if (frameBuffer.isReadable()) {
+            protonSender.send(frameView); // Store inflight data in the sender
+         }
+
+         return true;
+      } else {
+         return false;
+      }
+   }
+
    private void writeDataSectionTypeInfo(ByteBuf buffer, int encodedSize) {
       buffer.writeByte(EncodingCodes.DESCRIBED_TYPE_INDICATOR);
       buffer.writeByte(EncodingCodes.SMALLULONG);
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriterTest.java
 
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriterTest.java
index 5e9651aa5c..41b724eefb 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriterTest.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriterTest.java
@@ -18,6 +18,7 @@
 package org.apache.activemq.artemis.protocol.amqp.proton;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
@@ -32,7 +33,9 @@ import static org.mockito.Mockito.when;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.activemq.artemis.core.message.LargeBodyReader;
 import 
org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
@@ -41,6 +44,7 @@ import 
org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
 import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
 import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
+import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
 import org.apache.qpid.proton.codec.EncoderImpl;
@@ -71,7 +75,8 @@ import io.netty.buffer.Unpooled;
 public class AMQPTunneledCoreLargeMessageWriterTest {
 
    private static final byte DATA_DESCRIPTOR = 0x75;
-   private final int SMALL_OUTGOING_FRAME_SIZE_LIMIT = 2048;
+   private static final int SMALL_OUTGOING_FRAME_SIZE_LIMIT = 2048;
+   private static final int LARGE_OUTGOING_FRAME_SIZE_LIMIT = 65535;
 
    @Mock
    ProtonServerSenderContext serverSender;
@@ -494,6 +499,270 @@ public class AMQPTunneledCoreLargeMessageWriterTest {
       verifyNoMoreInteractions(protonDelivery);
    }
 
+   @Test
+   public void 
testFlowControlEncounteredDuringCoreHeaderEncodeIntoFrameBufferWithDataPending()
 throws Exception {
+      
when(protonTransport.getOutboundFrameSizeLimit()).thenReturn(LARGE_OUTGOING_FRAME_SIZE_LIMIT);
+
+      final byte[] headersBytes = new byte[4];
+
+      headersBytes[0] = 4;
+      headersBytes[1] = 5;
+      headersBytes[2] = 6;
+      headersBytes[3] = 7;
+
+      final byte[] payloadBytes = new byte[4];
+
+      payloadBytes[0] = 1;
+      payloadBytes[1] = 2;
+      payloadBytes[2] = 3;
+      payloadBytes[3] = 4;
+
+      final DeliveryAnnotations annotations = new DeliveryAnnotations(new 
HashMap<>());
+
+      annotations.getValue().put(Symbol.valueOf("a"), "a");
+      annotations.getValue().put(Symbol.valueOf("b"), "b");
+      annotations.getValue().put(Symbol.valueOf("c"), "c");
+
+      AMQPTunneledCoreLargeMessageWriter writer = new 
AMQPTunneledCoreLargeMessageWriter(serverSender);
+
+      writer.open(Mockito.mock(MessageReference.class));
+
+      final ByteBuf expectedEncoding = Unpooled.buffer();
+      final AtomicInteger payloadReaderPosition = new AtomicInteger();
+
+      writeDeliveryAnnotations(expectedEncoding, annotations);
+
+      when(reference.getProtocolData(any())).thenReturn(annotations);
+
+      writeDataSection(expectedEncoding, headersBytes);
+      writeDataSection(expectedEncoding, payloadBytes);
+
+      when(protonSender.getLocalState()).thenReturn(EndpointState.ACTIVE);
+      when(protonDelivery.isPartial()).thenReturn(true);
+      
when(message.getHeadersAndPropertiesEncodeSize()).thenReturn(headersBytes.length);
+
+      // Provides the simulated encoded core headers and properties
+      doAnswer(invocation -> {
+         final ByteBuf buffer = invocation.getArgument(0);
+
+         buffer.writeBytes(headersBytes);
+
+         return null;
+      }).when(message).encodeHeadersAndProperties(any(ByteBuf.class));
+
+      when(bodyReader.getSize()).thenReturn((long) payloadBytes.length);
+
+      final ByteBuf encodedByteBuf = Unpooled.buffer();
+      final NettyWritable encodedBytes = new NettyWritable(encodedByteBuf);
+
+      // Answer back with the amount of writable bytes
+      doAnswer(invocation -> {
+         final ByteBuffer buffer = invocation.getArgument(0);
+         final int readSize = Math.min(buffer.remaining(), payloadBytes.length 
- payloadReaderPosition.get());
+
+         buffer.put(payloadBytes, payloadReaderPosition.get(), readSize);
+         payloadReaderPosition.addAndGet(readSize);
+
+         return readSize;
+      }).when(bodyReader).readInto(any());
+
+      final AtomicInteger flowControlCalls = new AtomicInteger();
+      final AtomicBoolean hasFlowControlled = new AtomicBoolean();
+
+      // Capture the write for comparison, this avoid issues with released 
netty buffers
+      doAnswer(invocation -> {
+         final ReadableBuffer buffer = invocation.getArgument(0);
+
+         encodedBytes.put(buffer);
+
+         return null;
+      }).when(protonSender).send(any());
+
+      final AtomicReference<ReadyListener> flowControlResume = new 
AtomicReference<>();
+
+      // To recover from flow control we need to run the callback
+      doAnswer(invocation -> {
+         final Runnable recovery = invocation.getArgument(0);
+         recovery.run();
+         return false;
+      }).when(connectionContext).runNow(any());
+
+      doAnswer(invocation -> {
+
+         flowControlResume.set(invocation.getArgument(0));
+
+         // Flow control on second call which is the write of Core headers and 
properties which
+         // has the delivery annotations sitting in wait as they fit into the 
frame buffer without
+         // need for a flush yet.
+         if (flowControlCalls.incrementAndGet() == 2 && 
!hasFlowControlled.getAndSet(true)) {
+            return false;
+         } else {
+            return true;
+         }
+      }).when(connectionContext).flowControl(any());
+
+      try {
+         writer.writeBytes(reference);
+      } catch (IllegalStateException e) {
+         fail("Should not throw from flow controlled write.");
+      }
+
+      assertNotNull(flowControlResume.get());
+
+      flowControlResume.get().readyForWriting();
+
+      verify(message).usageUp();
+      verify(message).getLargeBodyReader();
+      verify(message).getHeadersAndPropertiesEncodeSize();
+      verify(message).encodeHeadersAndProperties(any(ByteBuf.class));
+      verify(reference).getMessage();
+      verify(reference).getProtocolData(any());
+      verify(protonSender).getSession();
+      verify(protonDelivery).getTag();
+      verify(protonSender, atLeastOnce()).getLocalState();
+      verify(protonSender, atLeastOnce()).send(any(ReadableBuffer.class));
+
+      assertTrue(encodedByteBuf.isReadable());
+      assertEquals(expectedEncoding.readableBytes(), 
encodedByteBuf.readableBytes());
+      assertEquals(expectedEncoding, encodedByteBuf);
+
+      verifyNoMoreInteractions(message);
+      verifyNoMoreInteractions(reference);
+      verifyNoMoreInteractions(protonDelivery);
+   }
+
+   @Test
+   public void 
testFlowControlEncounteredDuringCoreBodyEncodeIntoFrameBufferWithDataPending() 
throws Exception {
+      
when(protonTransport.getOutboundFrameSizeLimit()).thenReturn(LARGE_OUTGOING_FRAME_SIZE_LIMIT);
+
+      final byte[] headersBytes = new byte[4];
+
+      headersBytes[0] = 4;
+      headersBytes[1] = 5;
+      headersBytes[2] = 6;
+      headersBytes[3] = 7;
+
+      final byte[] payloadBytes = new byte[4];
+
+      payloadBytes[0] = 1;
+      payloadBytes[1] = 2;
+      payloadBytes[2] = 3;
+      payloadBytes[3] = 4;
+
+      final DeliveryAnnotations annotations = new DeliveryAnnotations(new 
HashMap<>());
+
+      annotations.getValue().put(Symbol.valueOf("a"), "a");
+      annotations.getValue().put(Symbol.valueOf("b"), "b");
+      annotations.getValue().put(Symbol.valueOf("c"), "c");
+
+      AMQPTunneledCoreLargeMessageWriter writer = new 
AMQPTunneledCoreLargeMessageWriter(serverSender);
+
+      writer.open(Mockito.mock(MessageReference.class));
+
+      final ByteBuf expectedEncoding = Unpooled.buffer();
+      final AtomicInteger payloadReaderPosition = new AtomicInteger();
+
+      writeDeliveryAnnotations(expectedEncoding, annotations);
+
+      when(reference.getProtocolData(any())).thenReturn(annotations);
+
+      writeDataSection(expectedEncoding, headersBytes);
+      writeDataSection(expectedEncoding, payloadBytes);
+
+      when(protonSender.getLocalState()).thenReturn(EndpointState.ACTIVE);
+      when(protonDelivery.isPartial()).thenReturn(true);
+      
when(message.getHeadersAndPropertiesEncodeSize()).thenReturn(headersBytes.length);
+
+      // Provides the simulated encoded core headers and properties
+      doAnswer(invocation -> {
+         final ByteBuf buffer = invocation.getArgument(0);
+
+         buffer.writeBytes(headersBytes);
+
+         return null;
+      }).when(message).encodeHeadersAndProperties(any(ByteBuf.class));
+
+      when(bodyReader.getSize()).thenReturn((long) payloadBytes.length);
+
+      final ByteBuf encodedByteBuf = Unpooled.buffer();
+      final NettyWritable encodedBytes = new NettyWritable(encodedByteBuf);
+
+      // Answer back with the amount of writable bytes
+      doAnswer(invocation -> {
+         final ByteBuffer buffer = invocation.getArgument(0);
+         final int readSize = Math.min(buffer.remaining(), payloadBytes.length 
- payloadReaderPosition.get());
+
+         buffer.put(payloadBytes, payloadReaderPosition.get(), readSize);
+         payloadReaderPosition.addAndGet(readSize);
+
+         return readSize;
+      }).when(bodyReader).readInto(any());
+
+      final AtomicInteger flowControlCalls = new AtomicInteger();
+      final AtomicBoolean hasFlowControlled = new AtomicBoolean();
+
+      // Capture the write for comparison, this avoid issues with released 
netty buffers
+      doAnswer(invocation -> {
+         final ReadableBuffer buffer = invocation.getArgument(0);
+
+         encodedBytes.put(buffer);
+
+         return null;
+      }).when(protonSender).send(any());
+
+      final AtomicReference<ReadyListener> flowControlResume = new 
AtomicReference<>();
+
+      // To recover from flow control we need to run the callback
+      doAnswer(invocation -> {
+         final Runnable recovery = invocation.getArgument(0);
+         recovery.run();
+         return false;
+      }).when(connectionContext).runNow(any());
+
+      doAnswer(invocation -> {
+
+         flowControlResume.set(invocation.getArgument(0));
+
+         // Flow control on third call which is the first step into the body 
write from file which
+         // has the delivery annotations and core header encoding sitting in 
wait as they fit into
+         // the frame buffer without need for a flush yet.
+         if (flowControlCalls.incrementAndGet() == 3 && 
!hasFlowControlled.getAndSet(true)) {
+            return false;
+         } else {
+            return true;
+         }
+      }).when(connectionContext).flowControl(any());
+
+      try {
+         writer.writeBytes(reference);
+      } catch (IllegalStateException e) {
+         fail("Should not throw from flow controlled write.");
+      }
+
+      assertNotNull(flowControlResume.get());
+
+      flowControlResume.get().readyForWriting();
+
+      verify(message).usageUp();
+      verify(message, atLeastOnce()).getLargeBodyReader();
+      verify(message).getHeadersAndPropertiesEncodeSize();
+      verify(message).encodeHeadersAndProperties(any(ByteBuf.class));
+      verify(reference).getMessage();
+      verify(reference).getProtocolData(any());
+      verify(protonSender).getSession();
+      verify(protonDelivery).getTag();
+      verify(protonSender, atLeastOnce()).getLocalState();
+      verify(protonSender, atLeastOnce()).send(any(ReadableBuffer.class));
+
+      assertTrue(encodedByteBuf.isReadable());
+      assertEquals(expectedEncoding.readableBytes(), 
encodedByteBuf.readableBytes());
+      assertEquals(expectedEncoding, encodedByteBuf);
+
+      verifyNoMoreInteractions(message);
+      verifyNoMoreInteractions(reference);
+      verifyNoMoreInteractions(protonDelivery);
+   }
+
    private void writeDeliveryAnnotations(ByteBuf buffer, DeliveryAnnotations 
annotations) {
       final EncoderImpl encoder = TLSEncode.getEncoder();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to