gemmellr commented on code in PR #5950:
URL: https://github.com/apache/activemq-artemis/pull/5950#discussion_r2401423584


##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriter.java:
##########
@@ -259,11 +257,10 @@ private boolean trySendHeadersAndProperties(ByteBuf 
frameBuffer, NettyReadable f
          }
 
          final ByteBuf headerBuffer = getOrCreateMessageHeaderBuffer();
-         final int readSize = (int) Math.min(frameBuffer.writableBytes(), 
headerBuffer.readableBytes() - position);
-
-         position += readSize;
+         final int readSize = Math.min(frameBuffer.writableBytes(), 
headerBuffer.readableBytes());
 
-         headerBuffer.readBytes(frameBuffer, readSize);
+         
headerBuffer.readBytes(frameBuffer.internalNioBuffer(frameBuffer.writerIndex(), 
readSize));
+         frameBuffer.writerIndex(frameBuffer.writerIndex() + readSize);

Review Comment:
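   Similarly here: as with the delivery annotations change, I'm not seeing 
what writing via _frameBuffer.internalNioBuffer(...)_ and then manually 
bumping the writer index gains over the previous 
_headerBuffer.readBytes(frameBuffer, readSize)_ call.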



##########
artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriterTest.java:
##########
@@ -273,6 +276,173 @@ private void 
doTestMessageEncodingWrittenToDeliveryWithAnnotations(boolean deliv
       verifyNoMoreInteractions(protonDelivery);
    }
 
+   @Test
+   public void 
testMessageEncodingWrittenToDeliveryWithDeliveryAnnotationsThatExceedFrameSize()
 throws Exception {
+      final byte[] headersBytes = new byte[4];
+
+      headersBytes[0] = 4;
+      headersBytes[1] = 5;
+      headersBytes[2] = 6;
+      headersBytes[3] = 7;
+
+      final byte[] payloadBytes = new byte[4];
+
+      payloadBytes[0] = 1;
+      payloadBytes[1] = 2;
+      payloadBytes[2] = 3;
+      payloadBytes[3] = 4;
+
+      final DeliveryAnnotations annotations = new DeliveryAnnotations(new 
HashMap<>());
+
+      annotations.getValue().put(Symbol.valueOf("a"), "a".repeat(1024));
+      annotations.getValue().put(Symbol.valueOf("b"), "b".repeat(1024));
+      annotations.getValue().put(Symbol.valueOf("c"), "c".repeat(1024));
+      annotations.getValue().put(Symbol.valueOf("d"), "d".repeat(1024));
+      annotations.getValue().put(Symbol.valueOf("e"), "e".repeat(1024));
+
+      doTestMessageEncodingForTunneledCoreLargeMessage(annotations, 
headersBytes, payloadBytes);
+   }
+
+   @Test
+   public void 
testMessageEncodingWrittenToDeliveryWithCoreHeaderEncodingThatExceedsFrameSize()
 throws Exception {
+      final byte[] headersBytes = 
"A".repeat(4097).getBytes(StandardCharsets.US_ASCII);
+      final byte[] payloadBytes = new byte[4];
+
+      payloadBytes[0] = 1;
+      payloadBytes[1] = 2;
+      payloadBytes[2] = 3;
+      payloadBytes[3] = 4;
+
+      final DeliveryAnnotations annotations = new DeliveryAnnotations(new 
HashMap<>());
+
+      annotations.getValue().put(Symbol.valueOf("a"), "a");
+      annotations.getValue().put(Symbol.valueOf("b"), "b");
+      annotations.getValue().put(Symbol.valueOf("c"), "c");
+      annotations.getValue().put(Symbol.valueOf("d"), "d");
+      annotations.getValue().put(Symbol.valueOf("e"), "e");
+
+      doTestMessageEncodingForTunneledCoreLargeMessage(annotations, 
headersBytes, payloadBytes);
+   }
+
+   @Test
+   public void 
testMessageEncodingWrittenToDeliveryWithBothDAandCoreHeadersExceedingFrameSize()
 throws Exception {
+      final byte[] headersBytes = 
"A".repeat(4097).getBytes(StandardCharsets.US_ASCII);
+      final byte[] payloadBytes = new byte[4];
+
+      payloadBytes[0] = 1;
+      payloadBytes[1] = 2;
+      payloadBytes[2] = 3;
+      payloadBytes[3] = 4;
+
+      final DeliveryAnnotations annotations = new DeliveryAnnotations(new 
HashMap<>());
+
+      annotations.getValue().put(Symbol.valueOf("a"), "a".repeat(1024));
+      annotations.getValue().put(Symbol.valueOf("b"), "b".repeat(1024));
+      annotations.getValue().put(Symbol.valueOf("c"), "c".repeat(1024));
+      annotations.getValue().put(Symbol.valueOf("d"), "d".repeat(1024));
+      annotations.getValue().put(Symbol.valueOf("e"), "e".repeat(1024));
+
+      doTestMessageEncodingForTunneledCoreLargeMessage(annotations, 
headersBytes, payloadBytes);
+   }
+
+   @Test
+   public void 
testMessageEncodingWrittenToDeliveryWithAllSectionsExceedFrameSize() throws 
Exception {
+      final byte[] headersBytes = 
"A".repeat(4097).getBytes(StandardCharsets.US_ASCII);
+      final byte[] payloadBytes = 
"B".repeat(4097).getBytes(StandardCharsets.US_ASCII);
+      final DeliveryAnnotations annotations = new DeliveryAnnotations(new 
HashMap<>());
+
+      annotations.getValue().put(Symbol.valueOf("a"), "a".repeat(1024));
+      annotations.getValue().put(Symbol.valueOf("b"), "b".repeat(1024));
+      annotations.getValue().put(Symbol.valueOf("c"), "c".repeat(1024));
+      annotations.getValue().put(Symbol.valueOf("d"), "d".repeat(1024));
+      annotations.getValue().put(Symbol.valueOf("e"), "e".repeat(1024));
+
+      doTestMessageEncodingForTunneledCoreLargeMessage(annotations, 
headersBytes, payloadBytes);
+   }
+
+   private void 
doTestMessageEncodingForTunneledCoreLargeMessage(DeliveryAnnotations 
annotations, byte[] headersBytes, byte[] payloadBytes) throws Exception {
+      when(protonTransport.getOutboundFrameSizeLimit()).thenReturn(4096);

Review Comment:
   Making the 4096 a constant, and having the other tests assert that the 
components they prepare actually exceed it before calling this method, would 
make the tests easier to understand and more robust in case some numskull 
happens to change the value later.



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriter.java:
##########
@@ -219,15 +219,14 @@ private boolean trySendDeliveryAnnotations(ByteBuf 
frameBuffer, NettyReadable fr
       for (; protonSender.getLocalState() != EndpointState.CLOSED && state == 
State.STREAMING_DELIVERY_ANNOTATIONS; ) {
          if (annotations != null && annotations.getValue() != null && 
!annotations.getValue().isEmpty()) {
             if (!connection.flowControl(this::resume)) {
-               break; // Resume will restart writing the headers section from 
where we left off.
+               break; // Resume will restart writing the delivery annotations 
section from where we left off.
             }
 
             final ByteBuf annotationsBuffer = 
getOrCreateDeliveryAnnotationsBuffer();
-            final int readSize = (int) Math.min(frameBuffer.writableBytes(), 
annotationsBuffer.readableBytes() - position);
+            final int readSize = Math.min(frameBuffer.writableBytes(), 
annotationsBuffer.readableBytes());
 
-            position += readSize;
-
-            annotationsBuffer.readBytes(frameBuffer, readSize);
+            
annotationsBuffer.readBytes(frameBuffer.internalNioBuffer(frameBuffer.writerIndex(),
 readSize));
+            frameBuffer.writerIndex(frameBuffer.writerIndex() + readSize);

Review Comment:
   Why use the internal buffer here? What am I missing, hehe? It appears to be 
asking for a _readSize_ section of the _frameBuffer_'s internal buffer, 
starting at its current 'external' writer index, and then writing into that 
from annotationsBuffer, updating that buffer's reader index. Since that 
operation doesn't adjust frameBuffer's 'external' writer index, that index is 
then manually bumped by readSize afterwards. I'm not seeing how that is 
different from the previous _annotationsBuffer.readBytes(frameBuffer, 
readSize)_ call, which would transfer readSize bytes and update both the 
annotationsBuffer reader index and the frameBuffer writer index?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to