This is an automated email from the ASF dual-hosted git repository.
robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 43336b3e57 ARTEMIS-5696 Ensure that staged bytes are not lost during
flow control
43336b3e57 is described below
commit 43336b3e5770e00f21a36dde500a3ccb90cbe8d7
Author: Timothy Bish <[email protected]>
AuthorDate: Tue Oct 7 16:35:00 2025 -0400
ARTEMIS-5696 Ensure that staged bytes are not lost during flow control
If the connection enters flow control and there are pending bytes in the
frame
buffer of the tunneled core large message writer we need to move them into
the
sender before releasing the frame buffer to avoid losing them.
---
.../proton/AMQPTunneledCoreLargeMessageWriter.java | 23 +-
.../AMQPTunneledCoreLargeMessageWriterTest.java | 271 ++++++++++++++++++++-
2 files changed, 288 insertions(+), 6 deletions(-)
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriter.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriter.java
index 649eb82138..b5da502f4c 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriter.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriter.java
@@ -31,6 +31,7 @@ import
org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
import org.apache.qpid.proton.codec.EncoderImpl;
import org.apache.qpid.proton.codec.EncodingCodes;
+import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.codec.WritableBuffer;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
@@ -218,8 +219,8 @@ public class AMQPTunneledCoreLargeMessageWriter implements
MessageWriter {
private boolean trySendDeliveryAnnotations(ByteBuf frameBuffer,
NettyReadable frameView) {
for (; protonSender.getLocalState() != EndpointState.CLOSED && state ==
State.STREAMING_DELIVERY_ANNOTATIONS; ) {
if (annotations != null && annotations.getValue() != null &&
!annotations.getValue().isEmpty()) {
- if (!connection.flowControl(this::resume)) {
- break; // Resume will restart writing the delivery annotations
section from where we left off.
+ if (isFlowControlled(frameBuffer, frameView)) {
+ break;
}
final ByteBuf annotationsBuffer =
getOrCreateDeliveryAnnotationsBuffer();
@@ -251,8 +252,8 @@ public class AMQPTunneledCoreLargeMessageWriter implements
MessageWriter {
// data could be sent due to a flow control event.
private boolean trySendHeadersAndProperties(ByteBuf frameBuffer,
NettyReadable frameView) {
for (; protonSender.getLocalState() != EndpointState.CLOSED && state ==
State.STREAMING_CORE_HEADERS; ) {
- if (!connection.flowControl(this::resume)) {
- break; // Resume will restart writing the headers section from
where we left off.
+ if (isFlowControlled(frameBuffer, frameView)) {
+ break;
}
final ByteBuf headerBuffer = getOrCreateMessageHeaderBuffer();
@@ -287,7 +288,7 @@ public class AMQPTunneledCoreLargeMessageWriter implements
MessageWriter {
final long bodySize = context.getSize();
for (; protonSender.getLocalState() != EndpointState.CLOSED && state
== State.STREAMING_BODY; ) {
- if (!connection.flowControl(this::resume)) {
+ if (isFlowControlled(frameBuffer, frameView)) {
break;
}
@@ -364,6 +365,18 @@ public class AMQPTunneledCoreLargeMessageWriter implements
MessageWriter {
}
}
+ private boolean isFlowControlled(ByteBuf frameBuffer, ReadableBuffer
frameView) {
+ if (!connection.flowControl(this::resume)) {
+ if (frameBuffer.isReadable()) {
+ protonSender.send(frameView); // Store inflight data in the sender
+ }
+
+ return true;
+ } else {
+ return false;
+ }
+ }
+
private void writeDataSectionTypeInfo(ByteBuf buffer, int encodedSize) {
buffer.writeByte(EncodingCodes.DESCRIBED_TYPE_INDICATOR);
buffer.writeByte(EncodingCodes.SMALLULONG);
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriterTest.java
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriterTest.java
index 5e9651aa5c..41b724eefb 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriterTest.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriterTest.java
@@ -18,6 +18,7 @@
package org.apache.activemq.artemis.protocol.amqp.proton;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
@@ -32,7 +33,9 @@ import static org.mockito.Mockito.when;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.artemis.core.message.LargeBodyReader;
import
org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
@@ -41,6 +44,7 @@ import
org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
+import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
import org.apache.qpid.proton.codec.EncoderImpl;
@@ -71,7 +75,8 @@ import io.netty.buffer.Unpooled;
public class AMQPTunneledCoreLargeMessageWriterTest {
private static final byte DATA_DESCRIPTOR = 0x75;
- private final int SMALL_OUTGOING_FRAME_SIZE_LIMIT = 2048;
+ private static final int SMALL_OUTGOING_FRAME_SIZE_LIMIT = 2048;
+ private static final int LARGE_OUTGOING_FRAME_SIZE_LIMIT = 65535;
@Mock
ProtonServerSenderContext serverSender;
@@ -494,6 +499,270 @@ public class AMQPTunneledCoreLargeMessageWriterTest {
verifyNoMoreInteractions(protonDelivery);
}
+ @Test
+ public void
testFlowControlEncounteredDuringCoreHeaderEncodeIntoFrameBufferWithDataPending()
throws Exception {
+
when(protonTransport.getOutboundFrameSizeLimit()).thenReturn(LARGE_OUTGOING_FRAME_SIZE_LIMIT);
+
+ final byte[] headersBytes = new byte[4];
+
+ headersBytes[0] = 4;
+ headersBytes[1] = 5;
+ headersBytes[2] = 6;
+ headersBytes[3] = 7;
+
+ final byte[] payloadBytes = new byte[4];
+
+ payloadBytes[0] = 1;
+ payloadBytes[1] = 2;
+ payloadBytes[2] = 3;
+ payloadBytes[3] = 4;
+
+ final DeliveryAnnotations annotations = new DeliveryAnnotations(new
HashMap<>());
+
+ annotations.getValue().put(Symbol.valueOf("a"), "a");
+ annotations.getValue().put(Symbol.valueOf("b"), "b");
+ annotations.getValue().put(Symbol.valueOf("c"), "c");
+
+ AMQPTunneledCoreLargeMessageWriter writer = new
AMQPTunneledCoreLargeMessageWriter(serverSender);
+
+ writer.open(Mockito.mock(MessageReference.class));
+
+ final ByteBuf expectedEncoding = Unpooled.buffer();
+ final AtomicInteger payloadReaderPosition = new AtomicInteger();
+
+ writeDeliveryAnnotations(expectedEncoding, annotations);
+
+ when(reference.getProtocolData(any())).thenReturn(annotations);
+
+ writeDataSection(expectedEncoding, headersBytes);
+ writeDataSection(expectedEncoding, payloadBytes);
+
+ when(protonSender.getLocalState()).thenReturn(EndpointState.ACTIVE);
+ when(protonDelivery.isPartial()).thenReturn(true);
+
when(message.getHeadersAndPropertiesEncodeSize()).thenReturn(headersBytes.length);
+
+ // Provides the simulated encoded core headers and properties
+ doAnswer(invocation -> {
+ final ByteBuf buffer = invocation.getArgument(0);
+
+ buffer.writeBytes(headersBytes);
+
+ return null;
+ }).when(message).encodeHeadersAndProperties(any(ByteBuf.class));
+
+ when(bodyReader.getSize()).thenReturn((long) payloadBytes.length);
+
+ final ByteBuf encodedByteBuf = Unpooled.buffer();
+ final NettyWritable encodedBytes = new NettyWritable(encodedByteBuf);
+
+ // Answer back with the amount of writable bytes
+ doAnswer(invocation -> {
+ final ByteBuffer buffer = invocation.getArgument(0);
+ final int readSize = Math.min(buffer.remaining(), payloadBytes.length
- payloadReaderPosition.get());
+
+ buffer.put(payloadBytes, payloadReaderPosition.get(), readSize);
+ payloadReaderPosition.addAndGet(readSize);
+
+ return readSize;
+ }).when(bodyReader).readInto(any());
+
+ final AtomicInteger flowControlCalls = new AtomicInteger();
+ final AtomicBoolean hasFlowControlled = new AtomicBoolean();
+
+ // Capture the write for comparison, this avoid issues with released
netty buffers
+ doAnswer(invocation -> {
+ final ReadableBuffer buffer = invocation.getArgument(0);
+
+ encodedBytes.put(buffer);
+
+ return null;
+ }).when(protonSender).send(any());
+
+ final AtomicReference<ReadyListener> flowControlResume = new
AtomicReference<>();
+
+ // To recover from flow control we need to run the callback
+ doAnswer(invocation -> {
+ final Runnable recovery = invocation.getArgument(0);
+ recovery.run();
+ return false;
+ }).when(connectionContext).runNow(any());
+
+ doAnswer(invocation -> {
+
+ flowControlResume.set(invocation.getArgument(0));
+
+ // Flow control on second call which is the write of Core headers and
properties which
+ // has the delivery annotations sitting in wait as they fit into the
frame buffer without
+ // need for a flush yet.
+ if (flowControlCalls.incrementAndGet() == 2 &&
!hasFlowControlled.getAndSet(true)) {
+ return false;
+ } else {
+ return true;
+ }
+ }).when(connectionContext).flowControl(any());
+
+ try {
+ writer.writeBytes(reference);
+ } catch (IllegalStateException e) {
+ fail("Should not throw from flow controlled write.");
+ }
+
+ assertNotNull(flowControlResume.get());
+
+ flowControlResume.get().readyForWriting();
+
+ verify(message).usageUp();
+ verify(message).getLargeBodyReader();
+ verify(message).getHeadersAndPropertiesEncodeSize();
+ verify(message).encodeHeadersAndProperties(any(ByteBuf.class));
+ verify(reference).getMessage();
+ verify(reference).getProtocolData(any());
+ verify(protonSender).getSession();
+ verify(protonDelivery).getTag();
+ verify(protonSender, atLeastOnce()).getLocalState();
+ verify(protonSender, atLeastOnce()).send(any(ReadableBuffer.class));
+
+ assertTrue(encodedByteBuf.isReadable());
+ assertEquals(expectedEncoding.readableBytes(),
encodedByteBuf.readableBytes());
+ assertEquals(expectedEncoding, encodedByteBuf);
+
+ verifyNoMoreInteractions(message);
+ verifyNoMoreInteractions(reference);
+ verifyNoMoreInteractions(protonDelivery);
+ }
+
+ @Test
+ public void
testFlowControlEncounteredDuringCoreBodyEncodeIntoFrameBufferWithDataPending()
throws Exception {
+
when(protonTransport.getOutboundFrameSizeLimit()).thenReturn(LARGE_OUTGOING_FRAME_SIZE_LIMIT);
+
+ final byte[] headersBytes = new byte[4];
+
+ headersBytes[0] = 4;
+ headersBytes[1] = 5;
+ headersBytes[2] = 6;
+ headersBytes[3] = 7;
+
+ final byte[] payloadBytes = new byte[4];
+
+ payloadBytes[0] = 1;
+ payloadBytes[1] = 2;
+ payloadBytes[2] = 3;
+ payloadBytes[3] = 4;
+
+ final DeliveryAnnotations annotations = new DeliveryAnnotations(new
HashMap<>());
+
+ annotations.getValue().put(Symbol.valueOf("a"), "a");
+ annotations.getValue().put(Symbol.valueOf("b"), "b");
+ annotations.getValue().put(Symbol.valueOf("c"), "c");
+
+ AMQPTunneledCoreLargeMessageWriter writer = new
AMQPTunneledCoreLargeMessageWriter(serverSender);
+
+ writer.open(Mockito.mock(MessageReference.class));
+
+ final ByteBuf expectedEncoding = Unpooled.buffer();
+ final AtomicInteger payloadReaderPosition = new AtomicInteger();
+
+ writeDeliveryAnnotations(expectedEncoding, annotations);
+
+ when(reference.getProtocolData(any())).thenReturn(annotations);
+
+ writeDataSection(expectedEncoding, headersBytes);
+ writeDataSection(expectedEncoding, payloadBytes);
+
+ when(protonSender.getLocalState()).thenReturn(EndpointState.ACTIVE);
+ when(protonDelivery.isPartial()).thenReturn(true);
+
when(message.getHeadersAndPropertiesEncodeSize()).thenReturn(headersBytes.length);
+
+ // Provides the simulated encoded core headers and properties
+ doAnswer(invocation -> {
+ final ByteBuf buffer = invocation.getArgument(0);
+
+ buffer.writeBytes(headersBytes);
+
+ return null;
+ }).when(message).encodeHeadersAndProperties(any(ByteBuf.class));
+
+ when(bodyReader.getSize()).thenReturn((long) payloadBytes.length);
+
+ final ByteBuf encodedByteBuf = Unpooled.buffer();
+ final NettyWritable encodedBytes = new NettyWritable(encodedByteBuf);
+
+ // Answer back with the amount of writable bytes
+ doAnswer(invocation -> {
+ final ByteBuffer buffer = invocation.getArgument(0);
+ final int readSize = Math.min(buffer.remaining(), payloadBytes.length
- payloadReaderPosition.get());
+
+ buffer.put(payloadBytes, payloadReaderPosition.get(), readSize);
+ payloadReaderPosition.addAndGet(readSize);
+
+ return readSize;
+ }).when(bodyReader).readInto(any());
+
+ final AtomicInteger flowControlCalls = new AtomicInteger();
+ final AtomicBoolean hasFlowControlled = new AtomicBoolean();
+
+ // Capture the write for comparison, this avoid issues with released
netty buffers
+ doAnswer(invocation -> {
+ final ReadableBuffer buffer = invocation.getArgument(0);
+
+ encodedBytes.put(buffer);
+
+ return null;
+ }).when(protonSender).send(any());
+
+ final AtomicReference<ReadyListener> flowControlResume = new
AtomicReference<>();
+
+ // To recover from flow control we need to run the callback
+ doAnswer(invocation -> {
+ final Runnable recovery = invocation.getArgument(0);
+ recovery.run();
+ return false;
+ }).when(connectionContext).runNow(any());
+
+ doAnswer(invocation -> {
+
+ flowControlResume.set(invocation.getArgument(0));
+
+ // Flow control on third call which is the first step into the body
write from file which
+ // has the delivery annotations and core header encoding sitting in
wait as they fit into
+ // the frame buffer without need for a flush yet.
+ if (flowControlCalls.incrementAndGet() == 3 &&
!hasFlowControlled.getAndSet(true)) {
+ return false;
+ } else {
+ return true;
+ }
+ }).when(connectionContext).flowControl(any());
+
+ try {
+ writer.writeBytes(reference);
+ } catch (IllegalStateException e) {
+ fail("Should not throw from flow controlled write.");
+ }
+
+ assertNotNull(flowControlResume.get());
+
+ flowControlResume.get().readyForWriting();
+
+ verify(message).usageUp();
+ verify(message, atLeastOnce()).getLargeBodyReader();
+ verify(message).getHeadersAndPropertiesEncodeSize();
+ verify(message).encodeHeadersAndProperties(any(ByteBuf.class));
+ verify(reference).getMessage();
+ verify(reference).getProtocolData(any());
+ verify(protonSender).getSession();
+ verify(protonDelivery).getTag();
+ verify(protonSender, atLeastOnce()).getLocalState();
+ verify(protonSender, atLeastOnce()).send(any(ReadableBuffer.class));
+
+ assertTrue(encodedByteBuf.isReadable());
+ assertEquals(expectedEncoding.readableBytes(),
encodedByteBuf.readableBytes());
+ assertEquals(expectedEncoding, encodedByteBuf);
+
+ verifyNoMoreInteractions(message);
+ verifyNoMoreInteractions(reference);
+ verifyNoMoreInteractions(protonDelivery);
+ }
+
private void writeDeliveryAnnotations(ByteBuf buffer, DeliveryAnnotations
annotations) {
final EncoderImpl encoder = TLSEncode.getEncoder();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact