This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 108d1ce6e2 ARTEMIS-5689 Fix tunneled core large message writer encoding
108d1ce6e2 is described below

commit 108d1ce6e20fab0b0335fe7e1a5a0bf0b550823a
Author: Timothy Bish <[email protected]>
AuthorDate: Fri Oct 3 14:41:01 2025 -0400

    ARTEMIS-5689 Fix tunneled core large message writer encoding
    
    Ensure that the read size taken from the delivery annotations buffer and
    from the core headers and properties encoding buffer is calculated
    correctly, so that no error is thrown from the copy when either section
    spans multiple frames.
---
 .../proton/AMQPTunneledCoreLargeMessageWriter.java |  12 +-
 .../AMQPTunneledCoreLargeMessageWriterTest.java    | 185 +++++++++++++++++++++
 2 files changed, 188 insertions(+), 9 deletions(-)

diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriter.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriter.java
index a9d53eb054..649eb82138 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriter.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriter.java
@@ -219,13 +219,11 @@ public class AMQPTunneledCoreLargeMessageWriter 
implements MessageWriter {
       for (; protonSender.getLocalState() != EndpointState.CLOSED && state == 
State.STREAMING_DELIVERY_ANNOTATIONS; ) {
          if (annotations != null && annotations.getValue() != null && 
!annotations.getValue().isEmpty()) {
             if (!connection.flowControl(this::resume)) {
-               break; // Resume will restart writing the headers section from 
where we left off.
+               break; // Resume will restart writing the delivery annotations 
section from where we left off.
             }
 
             final ByteBuf annotationsBuffer = 
getOrCreateDeliveryAnnotationsBuffer();
-            final int readSize = (int) Math.min(frameBuffer.writableBytes(), 
annotationsBuffer.readableBytes() - position);
-
-            position += readSize;
+            final int readSize = Math.min(frameBuffer.writableBytes(), 
annotationsBuffer.readableBytes());
 
             annotationsBuffer.readBytes(frameBuffer, readSize);
 
@@ -239,7 +237,6 @@ public class AMQPTunneledCoreLargeMessageWriter implements 
MessageWriter {
 
             if (!annotationsBuffer.isReadable()) {
                encodingBuffer = null;
-               position = 0;
                state = State.STREAMING_CORE_HEADERS;
             }
          } else {
@@ -259,9 +256,7 @@ public class AMQPTunneledCoreLargeMessageWriter implements 
MessageWriter {
          }
 
          final ByteBuf headerBuffer = getOrCreateMessageHeaderBuffer();
-         final int readSize = (int) Math.min(frameBuffer.writableBytes(), 
headerBuffer.readableBytes() - position);
-
-         position += readSize;
+         final int readSize = Math.min(frameBuffer.writableBytes(), 
headerBuffer.readableBytes());
 
          headerBuffer.readBytes(frameBuffer, readSize);
 
@@ -275,7 +270,6 @@ public class AMQPTunneledCoreLargeMessageWriter implements 
MessageWriter {
 
          if (!headerBuffer.isReadable()) {
             encodingBuffer = null;
-            position = 0;
             state = State.STREAMING_BODY;
          }
       }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriterTest.java
 
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriterTest.java
index 8ec31f3dba..5e9651aa5c 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriterTest.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriterTest.java
@@ -30,7 +30,10 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.activemq.artemis.core.message.LargeBodyReader;
 import 
org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
 import 
org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
@@ -68,6 +71,7 @@ import io.netty.buffer.Unpooled;
 public class AMQPTunneledCoreLargeMessageWriterTest {
 
    private static final byte DATA_DESCRIPTOR = 0x75;
+   private final int SMALL_OUTGOING_FRAME_SIZE_LIMIT = 2048;
 
    @Mock
    ProtonServerSenderContext serverSender;
@@ -273,6 +277,187 @@ public class AMQPTunneledCoreLargeMessageWriterTest {
       verifyNoMoreInteractions(protonDelivery);
    }
 
+   @Test
+   public void 
testMessageEncodingWrittenToDeliveryWithDeliveryAnnotationsThatExceedFrameSize()
 throws Exception {
+      
when(protonTransport.getOutboundFrameSizeLimit()).thenReturn(SMALL_OUTGOING_FRAME_SIZE_LIMIT);
+
+      final byte[] headersBytes = new byte[4];
+
+      headersBytes[0] = 4;
+      headersBytes[1] = 5;
+      headersBytes[2] = 6;
+      headersBytes[3] = 7;
+
+      final byte[] payloadBytes = new byte[4];
+
+      payloadBytes[0] = 1;
+      payloadBytes[1] = 2;
+      payloadBytes[2] = 3;
+      payloadBytes[3] = 4;
+
+      final DeliveryAnnotations annotations = new DeliveryAnnotations(new 
HashMap<>());
+
+      annotations.getValue().put(Symbol.valueOf("a"), 
"a".repeat(SMALL_OUTGOING_FRAME_SIZE_LIMIT));
+      annotations.getValue().put(Symbol.valueOf("b"), 
"b".repeat(SMALL_OUTGOING_FRAME_SIZE_LIMIT));
+      annotations.getValue().put(Symbol.valueOf("c"), 
"c".repeat(SMALL_OUTGOING_FRAME_SIZE_LIMIT));
+      annotations.getValue().put(Symbol.valueOf("d"), 
"d".repeat(SMALL_OUTGOING_FRAME_SIZE_LIMIT));
+      annotations.getValue().put(Symbol.valueOf("e"), 
"e".repeat(SMALL_OUTGOING_FRAME_SIZE_LIMIT));
+
+      
doTestMessageEncodingForTunneledCoreLargeMessageAcrossFrames(annotations, 
headersBytes, payloadBytes);
+   }
+
+   @Test
+   public void 
testMessageEncodingWrittenToDeliveryWithCoreHeaderEncodingThatExceedsFrameSize()
 throws Exception {
+      
when(protonTransport.getOutboundFrameSizeLimit()).thenReturn(SMALL_OUTGOING_FRAME_SIZE_LIMIT);
+
+      final byte[] headersBytes = "AA".repeat(SMALL_OUTGOING_FRAME_SIZE_LIMIT 
+ 1).getBytes(StandardCharsets.US_ASCII);
+      final byte[] payloadBytes = new byte[4];
+
+      payloadBytes[0] = 1;
+      payloadBytes[1] = 2;
+      payloadBytes[2] = 3;
+      payloadBytes[3] = 4;
+
+      final DeliveryAnnotations annotations = new DeliveryAnnotations(new 
HashMap<>());
+
+      annotations.getValue().put(Symbol.valueOf("a"), "a");
+      annotations.getValue().put(Symbol.valueOf("b"), "b");
+      annotations.getValue().put(Symbol.valueOf("c"), "c");
+      annotations.getValue().put(Symbol.valueOf("d"), "d");
+      annotations.getValue().put(Symbol.valueOf("e"), "e");
+
+      
doTestMessageEncodingForTunneledCoreLargeMessageAcrossFrames(annotations, 
headersBytes, payloadBytes);
+   }
+
+   @Test
+   public void 
testMessageEncodingWrittenToDeliveryWithBothDAandCoreHeadersExceedingFrameSize()
 throws Exception {
+      
when(protonTransport.getOutboundFrameSizeLimit()).thenReturn(SMALL_OUTGOING_FRAME_SIZE_LIMIT);
+
+      final byte[] headersBytes = 
"AA".repeat(SMALL_OUTGOING_FRAME_SIZE_LIMIT).getBytes(StandardCharsets.US_ASCII);
+      final byte[] payloadBytes = new byte[4];
+
+      payloadBytes[0] = 1;
+      payloadBytes[1] = 2;
+      payloadBytes[2] = 3;
+      payloadBytes[3] = 4;
+
+      final DeliveryAnnotations annotations = new DeliveryAnnotations(new 
HashMap<>());
+
+      annotations.getValue().put(Symbol.valueOf("a"), 
"a".repeat(SMALL_OUTGOING_FRAME_SIZE_LIMIT));
+      annotations.getValue().put(Symbol.valueOf("b"), 
"b".repeat(SMALL_OUTGOING_FRAME_SIZE_LIMIT));
+      annotations.getValue().put(Symbol.valueOf("c"), 
"c".repeat(SMALL_OUTGOING_FRAME_SIZE_LIMIT));
+      annotations.getValue().put(Symbol.valueOf("d"), 
"d".repeat(SMALL_OUTGOING_FRAME_SIZE_LIMIT));
+      annotations.getValue().put(Symbol.valueOf("e"), 
"e".repeat(SMALL_OUTGOING_FRAME_SIZE_LIMIT));
+
+      
doTestMessageEncodingForTunneledCoreLargeMessageAcrossFrames(annotations, 
headersBytes, payloadBytes);
+   }
+
+   @Test
+   public void 
testMessageEncodingWrittenToDeliveryWithAllSectionsExceedFrameSize() throws 
Exception {
+      
when(protonTransport.getOutboundFrameSizeLimit()).thenReturn(SMALL_OUTGOING_FRAME_SIZE_LIMIT);
+
+      final byte[] headersBytes = 
"AA".repeat(SMALL_OUTGOING_FRAME_SIZE_LIMIT).getBytes(StandardCharsets.US_ASCII);
+      final byte[] payloadBytes = 
"BB".repeat(SMALL_OUTGOING_FRAME_SIZE_LIMIT).getBytes(StandardCharsets.US_ASCII);
+      final DeliveryAnnotations annotations = new DeliveryAnnotations(new 
HashMap<>());
+
+      annotations.getValue().put(Symbol.valueOf("a"), 
"a".repeat(SMALL_OUTGOING_FRAME_SIZE_LIMIT));
+      annotations.getValue().put(Symbol.valueOf("b"), 
"b".repeat(SMALL_OUTGOING_FRAME_SIZE_LIMIT));
+      annotations.getValue().put(Symbol.valueOf("c"), 
"c".repeat(SMALL_OUTGOING_FRAME_SIZE_LIMIT));
+      annotations.getValue().put(Symbol.valueOf("d"), 
"d".repeat(SMALL_OUTGOING_FRAME_SIZE_LIMIT));
+      annotations.getValue().put(Symbol.valueOf("e"), 
"e".repeat(SMALL_OUTGOING_FRAME_SIZE_LIMIT));
+
+      
doTestMessageEncodingForTunneledCoreLargeMessageAcrossFrames(annotations, 
headersBytes, payloadBytes);
+   }
+
+   private void 
doTestMessageEncodingForTunneledCoreLargeMessageAcrossFrames(DeliveryAnnotations
 annotations, byte[] headersBytes, byte[] payloadBytes) throws Exception {
+      assertEquals(protonTransport.getOutboundFrameSizeLimit(), 
SMALL_OUTGOING_FRAME_SIZE_LIMIT);
+
+      AMQPTunneledCoreLargeMessageWriter writer = new 
AMQPTunneledCoreLargeMessageWriter(serverSender);
+
+      writer.open(Mockito.mock(MessageReference.class));
+
+      final ByteBuf expectedEncoding = Unpooled.buffer();
+      final AtomicInteger payloadReaderPosition = new AtomicInteger();
+
+      writeDeliveryAnnotations(expectedEncoding, annotations);
+
+      when(reference.getProtocolData(any())).thenReturn(annotations);
+
+      writeDataSection(expectedEncoding, headersBytes);
+      writeDataSection(expectedEncoding, payloadBytes);
+
+      when(protonSender.getLocalState()).thenReturn(EndpointState.ACTIVE);
+      when(protonDelivery.isPartial()).thenReturn(true);
+      
when(message.getHeadersAndPropertiesEncodeSize()).thenReturn(headersBytes.length);
+
+      // Provides the simulated encoded core headers and properties
+      doAnswer(invocation -> {
+         final ByteBuf buffer = invocation.getArgument(0);
+
+         buffer.writeBytes(headersBytes);
+
+         return null;
+      }).when(message).encodeHeadersAndProperties(any(ByteBuf.class));
+
+      when(bodyReader.getSize()).thenReturn((long) payloadBytes.length);
+
+      final ByteBuf encodedByteBuf = Unpooled.buffer();
+      final NettyWritable encodedBytes = new NettyWritable(encodedByteBuf);
+
+      // Answer back with the amount of writable bytes
+      doAnswer(invocation -> {
+         final ByteBuffer buffer = invocation.getArgument(0);
+
+         final int readSize = Math.min(buffer.remaining(), payloadBytes.length 
- payloadReaderPosition.get());
+
+         buffer.put(payloadBytes, payloadReaderPosition.get(), readSize);
+
+         payloadReaderPosition.addAndGet(readSize);
+
+         return readSize;
+      }).when(bodyReader).readInto(any());
+
+      final AtomicInteger estimatedFrameCount = new AtomicInteger();
+
+      // Capture the write for comparison; this avoids issues with released 
netty buffers
+      doAnswer(invocation -> {
+         final ReadableBuffer buffer = invocation.getArgument(0);
+
+         encodedBytes.put(buffer);
+
+         // Should call send multiple times to dispatch the split frame writes.
+         estimatedFrameCount.incrementAndGet();
+
+         return null;
+      }).when(protonSender).send(any());
+
+      try {
+         writer.writeBytes(reference);
+      } catch (IllegalStateException e) {
+         fail("Should not throw as the delivery is completed so no data should 
be written.");
+      }
+
+      verify(message).usageUp();
+      verify(message).getLargeBodyReader();
+      verify(message).getHeadersAndPropertiesEncodeSize();
+      verify(message).encodeHeadersAndProperties(any(ByteBuf.class));
+      verify(reference).getMessage();
+      verify(reference).getProtocolData(any());
+      verify(protonSender).getSession();
+      verify(protonDelivery).getTag();
+      verify(protonSender, atLeastOnce()).getLocalState();
+      verify(protonSender, atLeastOnce()).send(any(ReadableBuffer.class));
+
+      assertTrue(estimatedFrameCount.get() > 1);
+      assertTrue(encodedByteBuf.isReadable());
+      assertEquals(expectedEncoding.readableBytes(), 
encodedByteBuf.readableBytes());
+      assertEquals(expectedEncoding, encodedByteBuf);
+
+      verifyNoMoreInteractions(message);
+      verifyNoMoreInteractions(reference);
+      verifyNoMoreInteractions(protonDelivery);
+   }
+
    @Test
    public void testLargeMessageUsageLoweredOnCloseWhenWriteNotCompleted() 
throws Exception {
       AMQPTunneledCoreLargeMessageWriter writer = new 
AMQPTunneledCoreLargeMessageWriter(serverSender);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to