This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 329d963717d5e73e22d181e251fae20fc5c73809
Author: Timothy Bish <tabish...@gmail.com>
AuthorDate: Tue Apr 16 14:02:35 2024 -0400

    ARTEMIS-4725 Fix AMQP outgoing encoding if da encoded before header
    
    Fix the AMQP message scanning to account for the header not being at the
    front of the buffer which also accounts for odd case of broker storing
    message with delivery annotations ahead of the header.
---
 .../artemis/protocol/amqp/broker/AMQPMessage.java  |   4 +-
 .../protocol/amqp/broker/AMQPMessageTest.java      | 103 +++++++++++++++++++++
 2 files changed, 106 insertions(+), 1 deletion(-)

diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index e10ae2b9e2..a4922c2379 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -700,7 +700,7 @@ public abstract class AMQPMessage extends RefCountMessage 
implements org.apache.
             if (Header.class.equals(constructor.getTypeClass())) {
                header = (Header) constructor.readValue();
                headerPosition = constructorPos;
-               encodedHeaderSize = data.position();
+               encodedHeaderSize = data.position() - constructorPos;
                if (header.getTtl() != null) {
                   if (!expirationReload) {
                      expiration = System.currentTimeMillis() + 
header.getTtl().intValue();
@@ -778,6 +778,7 @@ public abstract class AMQPMessage extends RefCountMessage 
implements org.apache.
     * @return a Netty ByteBuf containing the encoded bytes of this Message 
instance.
     */
    public ReadableBuffer getSendBuffer(int deliveryCount, MessageReference 
reference) {
+      ensureMessageDataScanned();
       ensureDataIsValid();
 
       DeliveryAnnotations daToWrite = reference != null ? 
reference.getProtocolData(DeliveryAnnotations.class) : null;
@@ -825,6 +826,7 @@ public abstract class AMQPMessage extends RefCountMessage 
implements org.apache.
       }
 
       writeDeliveryAnnotationsForSendBuffer(result, deliveryAnnotations);
+
       // skip existing delivery annotations of the original message
       duplicate.position(encodedHeaderSize + encodedDeliveryAnnotationsSize);
       result.writeBytes(duplicate.byteBuffer());
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java
 
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java
index 9114ef0bd1..a695141f32 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java
@@ -48,6 +48,7 @@ import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import 
org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants;
+import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageIdHelper;
 import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
 import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
@@ -73,14 +74,18 @@ import org.apache.qpid.proton.amqp.messaging.Header;
 import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
 import org.apache.qpid.proton.amqp.messaging.Properties;
 import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.codec.AMQPDefinedTypes;
+import org.apache.qpid.proton.codec.DecoderImpl;
 import org.apache.qpid.proton.codec.EncoderImpl;
 import org.apache.qpid.proton.codec.EncodingCodes;
 import org.apache.qpid.proton.codec.ReadableBuffer;
+import org.apache.qpid.proton.codec.WritableBuffer;
 import org.apache.qpid.proton.message.Message;
 import org.apache.qpid.proton.message.impl.MessageImpl;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
@@ -113,6 +118,12 @@ public class AMQPMessageTest {
 
    private byte[] encodedProtonMessage;
 
+   private final DecoderImpl decoder = new DecoderImpl();
+   private final EncoderImpl encoder = new EncoderImpl(decoder);
+   {
+      AMQPDefinedTypes.registerAllTypes(decoder, encoder);
+   }
+
    @Before
    public void setUp() {
       encodedProtonMessage = encodeMessage(createProtonMessage());
@@ -2538,6 +2549,98 @@ public class AMQPMessageTest {
       assertEquals(map.get("secondLong"), 1234567L);
    }
 
+   @Test
+   public void testEncodedAMQPMessageHasReversedHeaderAndDA() throws Exception 
{
+      final Header header = new Header();
+      header.setDurable(true);
+
+      final DeliveryAnnotations deliveryAnnotations = new 
DeliveryAnnotations(new HashMap<>());
+      deliveryAnnotations.getValue().put(Symbol.valueOf("test-da"), "test-da");
+
+      final MessageAnnotations messageAnnotations = new MessageAnnotations(new 
HashMap<>());
+      messageAnnotations.getValue().put(Symbol.valueOf("test-ma"), "test-ma");
+
+      final ByteBuf nettyBuffer = Unpooled.buffer(1500);
+      WritableBuffer buffer = new NettyWritable(nettyBuffer);
+
+      final MessageReference reference = Mockito.mock(MessageReference.class);
+
+      try {
+         encoder.setByteBuffer(buffer);
+         encoder.writeObject(deliveryAnnotations);
+         encoder.writeObject(header);
+         encoder.writeObject(messageAnnotations);
+      } finally {
+         encoder.setByteBuffer((WritableBuffer) null);
+      }
+
+      final byte[] bytes = new byte[nettyBuffer.writerIndex()];
+      nettyBuffer.readBytes(bytes);
+
+      final AMQPMessage message = new AMQPStandardMessage(0, bytes, null);
+      final ReadableBuffer encoded = message.getSendBuffer(0, reference);
+
+      final Message protonMessage = Proton.message();
+      protonMessage.decode(encoded);
+
+      final Header readHeader = protonMessage.getHeader();
+      final DeliveryAnnotations readDeliveryAnnotations = 
protonMessage.getDeliveryAnnotations();
+      final MessageAnnotations readMessageAnnotations = 
protonMessage.getMessageAnnotations();
+
+      assertTrue(readHeader.getDurable());
+      assertNull(readDeliveryAnnotations);
+      assertNotNull(readMessageAnnotations);
+      assertEquals("test-ma", 
readMessageAnnotations.getValue().get(Symbol.valueOf("test-ma")));
+   }
+
+   @Test
+   public void 
testEncodedAMQPMessageHasReversedHeaderAndDAWithOutgoingDeliveryAnnotations() 
throws Exception {
+      final Header header = new Header();
+      header.setDurable(true);
+
+      final DeliveryAnnotations deliveryAnnotations = new 
DeliveryAnnotations(new HashMap<>());
+      deliveryAnnotations.getValue().put(Symbol.valueOf("test-da"), "test-da");
+
+      final MessageAnnotations messageAnnotations = new MessageAnnotations(new 
HashMap<>());
+      messageAnnotations.getValue().put(Symbol.valueOf("test-ma"), "test-ma");
+
+      final ByteBuf nettyBuffer = Unpooled.buffer(1500);
+      WritableBuffer buffer = new NettyWritable(nettyBuffer);
+
+      final MessageReference reference = Mockito.mock(MessageReference.class);
+      final DeliveryAnnotations deliveryAnnotationsOut = new 
DeliveryAnnotations(new HashMap<>());
+      deliveryAnnotationsOut.getValue().put(Symbol.valueOf("new-da"), 
"new-da");
+      
Mockito.when(reference.getProtocolData(DeliveryAnnotations.class)).thenReturn(deliveryAnnotationsOut);
+
+      try {
+         encoder.setByteBuffer(buffer);
+         encoder.writeObject(deliveryAnnotations);
+         encoder.writeObject(header);
+         encoder.writeObject(messageAnnotations);
+      } finally {
+         encoder.setByteBuffer((WritableBuffer) null);
+      }
+
+      final byte[] bytes = new byte[nettyBuffer.writerIndex()];
+      nettyBuffer.readBytes(bytes);
+
+      final AMQPMessage message = new AMQPStandardMessage(0, bytes, null);
+      final ReadableBuffer encoded = message.getSendBuffer(0, reference);
+
+      final Message protonMessage = Proton.message();
+      protonMessage.decode(encoded);
+
+      final Header readHeader = protonMessage.getHeader();
+      final DeliveryAnnotations readDeliveryAnnotations = 
protonMessage.getDeliveryAnnotations();
+      final MessageAnnotations readMessageAnnotations = 
protonMessage.getMessageAnnotations();
+
+      assertTrue(readHeader.getDurable());
+      assertNotNull(readDeliveryAnnotations);
+      assertEquals("new-da", 
readDeliveryAnnotations.getValue().get(Symbol.valueOf("new-da")));
+      assertNotNull(readMessageAnnotations);
+      assertEquals("test-ma", 
readMessageAnnotations.getValue().get(Symbol.valueOf("test-ma")));
+   }
+
    //----- Test Support 
------------------------------------------------------//
 
    private MessageImpl createProtonMessage() {

Reply via email to