This is an automated email from the ASF dual-hosted git repository.
robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 108d1ce6e2 ARTEMIS-5689 Fix tunneled core large message writer encoding
108d1ce6e2 is described below
commit 108d1ce6e20fab0b0335fe7e1a5a0bf0b550823a
Author: Timothy Bish <[email protected]>
AuthorDate: Fri Oct 3 14:41:01 2025 -0400
ARTEMIS-5689 Fix tunneled core large message writer encoding
Ensure that the read size for the delivery annotations buffer and for the
core headers and properties encoding buffer is calculated correctly, so that
no error is thrown from the copy when a section spans multiple frames.
---
.../proton/AMQPTunneledCoreLargeMessageWriter.java | 12 +-
.../AMQPTunneledCoreLargeMessageWriterTest.java | 185 +++++++++++++++++++++
2 files changed, 188 insertions(+), 9 deletions(-)
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriter.java
index a9d53eb054..649eb82138 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriter.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriter.java
@@ -219,13 +219,11 @@ public class AMQPTunneledCoreLargeMessageWriter
implements MessageWriter {
for (; protonSender.getLocalState() != EndpointState.CLOSED && state ==
State.STREAMING_DELIVERY_ANNOTATIONS; ) {
if (annotations != null && annotations.getValue() != null &&
!annotations.getValue().isEmpty()) {
if (!connection.flowControl(this::resume)) {
- break; // Resume will restart writing the headers section from
where we left off.
+ break; // Resume will restart writing the delivery annotations
section from where we left off.
}
final ByteBuf annotationsBuffer =
getOrCreateDeliveryAnnotationsBuffer();
- final int readSize = (int) Math.min(frameBuffer.writableBytes(),
annotationsBuffer.readableBytes() - position);
-
- position += readSize;
+ final int readSize = Math.min(frameBuffer.writableBytes(),
annotationsBuffer.readableBytes());
annotationsBuffer.readBytes(frameBuffer, readSize);
@@ -239,7 +237,6 @@ public class AMQPTunneledCoreLargeMessageWriter implements
MessageWriter {
if (!annotationsBuffer.isReadable()) {
encodingBuffer = null;
- position = 0;
state = State.STREAMING_CORE_HEADERS;
}
} else {
@@ -259,9 +256,7 @@ public class AMQPTunneledCoreLargeMessageWriter implements
MessageWriter {
}
final ByteBuf headerBuffer = getOrCreateMessageHeaderBuffer();
- final int readSize = (int) Math.min(frameBuffer.writableBytes(),
headerBuffer.readableBytes() - position);
-
- position += readSize;
+ final int readSize = Math.min(frameBuffer.writableBytes(),
headerBuffer.readableBytes());
headerBuffer.readBytes(frameBuffer, readSize);
@@ -275,7 +270,6 @@ public class AMQPTunneledCoreLargeMessageWriter implements
MessageWriter {
if (!headerBuffer.isReadable()) {
encodingBuffer = null;
- position = 0;
state = State.STREAMING_BODY;
}
}
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriterTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriterTest.java
index 8ec31f3dba..5e9651aa5c 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriterTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriterTest.java
@@ -30,7 +30,10 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.activemq.artemis.core.message.LargeBodyReader;
import
org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
import
org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
@@ -68,6 +71,7 @@ import io.netty.buffer.Unpooled;
public class AMQPTunneledCoreLargeMessageWriterTest {
private static final byte DATA_DESCRIPTOR = 0x75;
+ private final int SMALL_OUTGOING_FRAME_SIZE_LIMIT = 2048;
@Mock
ProtonServerSenderContext serverSender;
@@ -273,6 +277,187 @@ public class AMQPTunneledCoreLargeMessageWriterTest {
verifyNoMoreInteractions(protonDelivery);
}
+ @Test
+ public void
testMessageEncodingWrittenToDeliveryWithDeliveryAnnotationsThatExceedFrameSize()
throws Exception {
+
when(protonTransport.getOutboundFrameSizeLimit()).thenReturn(SMALL_OUTGOING_FRAME_SIZE_LIMIT);
+
+ final byte[] headersBytes = new byte[4];
+
+ headersBytes[0] = 4;
+ headersBytes[1] = 5;
+ headersBytes[2] = 6;
+ headersBytes[3] = 7;
+
+ final byte[] payloadBytes = new byte[4];
+
+ payloadBytes[0] = 1;
+ payloadBytes[1] = 2;
+ payloadBytes[2] = 3;
+ payloadBytes[3] = 4;
+
+ final DeliveryAnnotations annotations = new DeliveryAnnotations(new
HashMap<>());
+
+ annotations.getValue().put(Symbol.valueOf("a"),
"a".repeat(SMALL_OUTGOING_FRAME_SIZE_LIMIT));
+ annotations.getValue().put(Symbol.valueOf("b"),
"b".repeat(SMALL_OUTGOING_FRAME_SIZE_LIMIT));
+ annotations.getValue().put(Symbol.valueOf("c"),
"c".repeat(SMALL_OUTGOING_FRAME_SIZE_LIMIT));
+ annotations.getValue().put(Symbol.valueOf("d"),
"d".repeat(SMALL_OUTGOING_FRAME_SIZE_LIMIT));
+ annotations.getValue().put(Symbol.valueOf("e"),
"e".repeat(SMALL_OUTGOING_FRAME_SIZE_LIMIT));
+
+
doTestMessageEncodingForTunneledCoreLargeMessageAcrossFrames(annotations,
headersBytes, payloadBytes);
+ }
+
+ @Test
+ public void
testMessageEncodingWrittenToDeliveryWithCoreHeaderEncodingThatExceedsFrameSize()
throws Exception {
+
when(protonTransport.getOutboundFrameSizeLimit()).thenReturn(SMALL_OUTGOING_FRAME_SIZE_LIMIT);
+
+ final byte[] headersBytes = "AA".repeat(SMALL_OUTGOING_FRAME_SIZE_LIMIT
+ 1).getBytes(StandardCharsets.US_ASCII);
+ final byte[] payloadBytes = new byte[4];
+
+ payloadBytes[0] = 1;
+ payloadBytes[1] = 2;
+ payloadBytes[2] = 3;
+ payloadBytes[3] = 4;
+
+ final DeliveryAnnotations annotations = new DeliveryAnnotations(new
HashMap<>());
+
+ annotations.getValue().put(Symbol.valueOf("a"), "a");
+ annotations.getValue().put(Symbol.valueOf("b"), "b");
+ annotations.getValue().put(Symbol.valueOf("c"), "c");
+ annotations.getValue().put(Symbol.valueOf("d"), "d");
+ annotations.getValue().put(Symbol.valueOf("e"), "e");
+
+
doTestMessageEncodingForTunneledCoreLargeMessageAcrossFrames(annotations,
headersBytes, payloadBytes);
+ }
+
+ @Test
+ public void
testMessageEncodingWrittenToDeliveryWithBothDAandCoreHeadersExceedingFrameSize()
throws Exception {
+
when(protonTransport.getOutboundFrameSizeLimit()).thenReturn(SMALL_OUTGOING_FRAME_SIZE_LIMIT);
+
+ final byte[] headersBytes =
"AA".repeat(SMALL_OUTGOING_FRAME_SIZE_LIMIT).getBytes(StandardCharsets.US_ASCII);
+ final byte[] payloadBytes = new byte[4];
+
+ payloadBytes[0] = 1;
+ payloadBytes[1] = 2;
+ payloadBytes[2] = 3;
+ payloadBytes[3] = 4;
+
+ final DeliveryAnnotations annotations = new DeliveryAnnotations(new
HashMap<>());
+
+ annotations.getValue().put(Symbol.valueOf("a"),
"a".repeat(SMALL_OUTGOING_FRAME_SIZE_LIMIT));
+ annotations.getValue().put(Symbol.valueOf("b"),
"b".repeat(SMALL_OUTGOING_FRAME_SIZE_LIMIT));
+ annotations.getValue().put(Symbol.valueOf("c"),
"c".repeat(SMALL_OUTGOING_FRAME_SIZE_LIMIT));
+ annotations.getValue().put(Symbol.valueOf("d"),
"d".repeat(SMALL_OUTGOING_FRAME_SIZE_LIMIT));
+ annotations.getValue().put(Symbol.valueOf("e"),
"e".repeat(SMALL_OUTGOING_FRAME_SIZE_LIMIT));
+
+
doTestMessageEncodingForTunneledCoreLargeMessageAcrossFrames(annotations,
headersBytes, payloadBytes);
+ }
+
+ @Test
+ public void
testMessageEncodingWrittenToDeliveryWithAllSectionsExceedFrameSize() throws
Exception {
+
when(protonTransport.getOutboundFrameSizeLimit()).thenReturn(SMALL_OUTGOING_FRAME_SIZE_LIMIT);
+
+ final byte[] headersBytes =
"AA".repeat(SMALL_OUTGOING_FRAME_SIZE_LIMIT).getBytes(StandardCharsets.US_ASCII);
+ final byte[] payloadBytes =
"BB".repeat(SMALL_OUTGOING_FRAME_SIZE_LIMIT).getBytes(StandardCharsets.US_ASCII);
+ final DeliveryAnnotations annotations = new DeliveryAnnotations(new
HashMap<>());
+
+ annotations.getValue().put(Symbol.valueOf("a"),
"a".repeat(SMALL_OUTGOING_FRAME_SIZE_LIMIT));
+ annotations.getValue().put(Symbol.valueOf("b"),
"b".repeat(SMALL_OUTGOING_FRAME_SIZE_LIMIT));
+ annotations.getValue().put(Symbol.valueOf("c"),
"c".repeat(SMALL_OUTGOING_FRAME_SIZE_LIMIT));
+ annotations.getValue().put(Symbol.valueOf("d"),
"d".repeat(SMALL_OUTGOING_FRAME_SIZE_LIMIT));
+ annotations.getValue().put(Symbol.valueOf("e"),
"e".repeat(SMALL_OUTGOING_FRAME_SIZE_LIMIT));
+
+
doTestMessageEncodingForTunneledCoreLargeMessageAcrossFrames(annotations,
headersBytes, payloadBytes);
+ }
+
+ private void
doTestMessageEncodingForTunneledCoreLargeMessageAcrossFrames(DeliveryAnnotations
annotations, byte[] headersBytes, byte[] payloadBytes) throws Exception {
+ assertEquals(protonTransport.getOutboundFrameSizeLimit(),
SMALL_OUTGOING_FRAME_SIZE_LIMIT);
+
+ AMQPTunneledCoreLargeMessageWriter writer = new
AMQPTunneledCoreLargeMessageWriter(serverSender);
+
+ writer.open(Mockito.mock(MessageReference.class));
+
+ final ByteBuf expectedEncoding = Unpooled.buffer();
+ final AtomicInteger payloadReaderPosition = new AtomicInteger();
+
+ writeDeliveryAnnotations(expectedEncoding, annotations);
+
+ when(reference.getProtocolData(any())).thenReturn(annotations);
+
+ writeDataSection(expectedEncoding, headersBytes);
+ writeDataSection(expectedEncoding, payloadBytes);
+
+ when(protonSender.getLocalState()).thenReturn(EndpointState.ACTIVE);
+ when(protonDelivery.isPartial()).thenReturn(true);
+
when(message.getHeadersAndPropertiesEncodeSize()).thenReturn(headersBytes.length);
+
+ // Provides the simulated encoded core headers and properties
+ doAnswer(invocation -> {
+ final ByteBuf buffer = invocation.getArgument(0);
+
+ buffer.writeBytes(headersBytes);
+
+ return null;
+ }).when(message).encodeHeadersAndProperties(any(ByteBuf.class));
+
+ when(bodyReader.getSize()).thenReturn((long) payloadBytes.length);
+
+ final ByteBuf encodedByteBuf = Unpooled.buffer();
+ final NettyWritable encodedBytes = new NettyWritable(encodedByteBuf);
+
+ // Answer back with the amount of writable bytes
+ doAnswer(invocation -> {
+ final ByteBuffer buffer = invocation.getArgument(0);
+
+ final int readSize = Math.min(buffer.remaining(), payloadBytes.length
- payloadReaderPosition.get());
+
+ buffer.put(payloadBytes, payloadReaderPosition.get(), readSize);
+
+ payloadReaderPosition.addAndGet(readSize);
+
+ return readSize;
+ }).when(bodyReader).readInto(any());
+
+ final AtomicInteger estimatedFrameCount = new AtomicInteger();
+
+ // Capture the write for comparison; this avoids issues with released netty buffers
+ doAnswer(invocation -> {
+ final ReadableBuffer buffer = invocation.getArgument(0);
+
+ encodedBytes.put(buffer);
+
+ // Should call send multiple times to dispatch the split frame writes.
+ estimatedFrameCount.incrementAndGet();
+
+ return null;
+ }).when(protonSender).send(any());
+
+ try {
+ writer.writeBytes(reference);
+ } catch (IllegalStateException e) {
+ fail("Should not throw as the delivery is completed so no data should
be written.");
+ }
+
+ verify(message).usageUp();
+ verify(message).getLargeBodyReader();
+ verify(message).getHeadersAndPropertiesEncodeSize();
+ verify(message).encodeHeadersAndProperties(any(ByteBuf.class));
+ verify(reference).getMessage();
+ verify(reference).getProtocolData(any());
+ verify(protonSender).getSession();
+ verify(protonDelivery).getTag();
+ verify(protonSender, atLeastOnce()).getLocalState();
+ verify(protonSender, atLeastOnce()).send(any(ReadableBuffer.class));
+
+ assertTrue(estimatedFrameCount.get() > 1);
+ assertTrue(encodedByteBuf.isReadable());
+ assertEquals(expectedEncoding.readableBytes(),
encodedByteBuf.readableBytes());
+ assertEquals(expectedEncoding, encodedByteBuf);
+
+ verifyNoMoreInteractions(message);
+ verifyNoMoreInteractions(reference);
+ verifyNoMoreInteractions(protonDelivery);
+ }
+
@Test
public void testLargeMessageUsageLoweredOnCloseWhenWriteNotCompleted()
throws Exception {
AMQPTunneledCoreLargeMessageWriter writer = new
AMQPTunneledCoreLargeMessageWriter(serverSender);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact