gemmellr commented on code in PR #5950:
URL: https://github.com/apache/activemq-artemis/pull/5950#discussion_r2401423584
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriter.java:
##########
@@ -259,11 +257,10 @@ private boolean trySendHeadersAndProperties(ByteBuf
frameBuffer, NettyReadable f
}
final ByteBuf headerBuffer = getOrCreateMessageHeaderBuffer();
- final int readSize = (int) Math.min(frameBuffer.writableBytes(),
headerBuffer.readableBytes() - position);
-
- position += readSize;
+ final int readSize = Math.min(frameBuffer.writableBytes(),
headerBuffer.readableBytes());
- headerBuffer.readBytes(frameBuffer, readSize);
+
headerBuffer.readBytes(frameBuffer.internalNioBuffer(frameBuffer.writerIndex(),
readSize));
+ frameBuffer.writerIndex(frameBuffer.writerIndex() + readSize);
Review Comment:
Similarly
##########
artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriterTest.java:
##########
@@ -273,6 +276,173 @@ private void
doTestMessageEncodingWrittenToDeliveryWithAnnotations(boolean deliv
verifyNoMoreInteractions(protonDelivery);
}
+ @Test
+ public void
testMessageEncodingWrittenToDeliveryWithDeliveryAnnotationsThatExceedFrameSize()
throws Exception {
+ final byte[] headersBytes = new byte[4];
+
+ headersBytes[0] = 4;
+ headersBytes[1] = 5;
+ headersBytes[2] = 6;
+ headersBytes[3] = 7;
+
+ final byte[] payloadBytes = new byte[4];
+
+ payloadBytes[0] = 1;
+ payloadBytes[1] = 2;
+ payloadBytes[2] = 3;
+ payloadBytes[3] = 4;
+
+ final DeliveryAnnotations annotations = new DeliveryAnnotations(new
HashMap<>());
+
+ annotations.getValue().put(Symbol.valueOf("a"), "a".repeat(1024));
+ annotations.getValue().put(Symbol.valueOf("b"), "b".repeat(1024));
+ annotations.getValue().put(Symbol.valueOf("c"), "c".repeat(1024));
+ annotations.getValue().put(Symbol.valueOf("d"), "d".repeat(1024));
+ annotations.getValue().put(Symbol.valueOf("e"), "e".repeat(1024));
+
+ doTestMessageEncodingForTunneledCoreLargeMessage(annotations,
headersBytes, payloadBytes);
+ }
+
+ @Test
+ public void
testMessageEncodingWrittenToDeliveryWithCoreHeaderEncodingThatExceedsFrameSize()
throws Exception {
+ final byte[] headersBytes =
"A".repeat(4097).getBytes(StandardCharsets.US_ASCII);
+ final byte[] payloadBytes = new byte[4];
+
+ payloadBytes[0] = 1;
+ payloadBytes[1] = 2;
+ payloadBytes[2] = 3;
+ payloadBytes[3] = 4;
+
+ final DeliveryAnnotations annotations = new DeliveryAnnotations(new
HashMap<>());
+
+ annotations.getValue().put(Symbol.valueOf("a"), "a");
+ annotations.getValue().put(Symbol.valueOf("b"), "b");
+ annotations.getValue().put(Symbol.valueOf("c"), "c");
+ annotations.getValue().put(Symbol.valueOf("d"), "d");
+ annotations.getValue().put(Symbol.valueOf("e"), "e");
+
+ doTestMessageEncodingForTunneledCoreLargeMessage(annotations,
headersBytes, payloadBytes);
+ }
+
+ @Test
+ public void
testMessageEncodingWrittenToDeliveryWithBothDAandCoreHeadersExceedingFrameSize()
throws Exception {
+ final byte[] headersBytes =
"A".repeat(4097).getBytes(StandardCharsets.US_ASCII);
+ final byte[] payloadBytes = new byte[4];
+
+ payloadBytes[0] = 1;
+ payloadBytes[1] = 2;
+ payloadBytes[2] = 3;
+ payloadBytes[3] = 4;
+
+ final DeliveryAnnotations annotations = new DeliveryAnnotations(new
HashMap<>());
+
+ annotations.getValue().put(Symbol.valueOf("a"), "a".repeat(1024));
+ annotations.getValue().put(Symbol.valueOf("b"), "b".repeat(1024));
+ annotations.getValue().put(Symbol.valueOf("c"), "c".repeat(1024));
+ annotations.getValue().put(Symbol.valueOf("d"), "d".repeat(1024));
+ annotations.getValue().put(Symbol.valueOf("e"), "e".repeat(1024));
+
+ doTestMessageEncodingForTunneledCoreLargeMessage(annotations,
headersBytes, payloadBytes);
+ }
+
+ @Test
+ public void
testMessageEncodingWrittenToDeliveryWithAllSectionsExceedFrameSize() throws
Exception {
+ final byte[] headersBytes =
"A".repeat(4097).getBytes(StandardCharsets.US_ASCII);
+ final byte[] payloadBytes =
"B".repeat(4097).getBytes(StandardCharsets.US_ASCII);
+ final DeliveryAnnotations annotations = new DeliveryAnnotations(new
HashMap<>());
+
+ annotations.getValue().put(Symbol.valueOf("a"), "a".repeat(1024));
+ annotations.getValue().put(Symbol.valueOf("b"), "b".repeat(1024));
+ annotations.getValue().put(Symbol.valueOf("c"), "c".repeat(1024));
+ annotations.getValue().put(Symbol.valueOf("d"), "d".repeat(1024));
+ annotations.getValue().put(Symbol.valueOf("e"), "e".repeat(1024));
+
+ doTestMessageEncodingForTunneledCoreLargeMessage(annotations,
headersBytes, payloadBytes);
+ }
+
+ private void
doTestMessageEncodingForTunneledCoreLargeMessage(DeliveryAnnotations
annotations, byte[] headersBytes, byte[] payloadBytes) throws Exception {
+ when(protonTransport.getOutboundFrameSizeLimit()).thenReturn(4096);
Review Comment:
Making the 4096 a constant, and having the other tests assert the expected
components exceed it as they are prepared before calling this method, would
make the tests more easily understood later and more robust in case some
numskull happens to change the value later.
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriter.java:
##########
@@ -219,15 +219,14 @@ private boolean trySendDeliveryAnnotations(ByteBuf
frameBuffer, NettyReadable fr
for (; protonSender.getLocalState() != EndpointState.CLOSED && state ==
State.STREAMING_DELIVERY_ANNOTATIONS; ) {
if (annotations != null && annotations.getValue() != null &&
!annotations.getValue().isEmpty()) {
if (!connection.flowControl(this::resume)) {
- break; // Resume will restart writing the headers section from
where we left off.
+ break; // Resume will restart writing the delivery annotations
section from where we left off.
}
final ByteBuf annotationsBuffer =
getOrCreateDeliveryAnnotationsBuffer();
- final int readSize = (int) Math.min(frameBuffer.writableBytes(),
annotationsBuffer.readableBytes() - position);
+ final int readSize = Math.min(frameBuffer.writableBytes(),
annotationsBuffer.readableBytes());
- position += readSize;
-
- annotationsBuffer.readBytes(frameBuffer, readSize);
+
annotationsBuffer.readBytes(frameBuffer.internalNioBuffer(frameBuffer.writerIndex(),
readSize));
+ frameBuffer.writerIndex(frameBuffer.writerIndex() + readSize);
Review Comment:
Why use the internal buffer here? What am I missing hehe? It appears to be
asking for a _readSize_ section of the _frameBuffer_'s internal buffer, from
the point of its current 'external' writer index, then writes into that from
annotationsBuffer, updating that buffers reader index. Since that operation
doesnt adjust frameBuffers 'external' writer index, that is then manually
updated afterwards to bump it by readSize. I'm not seeing how that is
different from the previous _annotationsBuffer.readBytes(frameBuffer,
readSize)_ call that would transfer readSize bytes and update both the
annotationsBuffer reader index and the frameBuffer writer index?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact