Repository: activemq-artemis
Updated Branches:
  refs/heads/master f60d50c8a -> 969983cf1


ARTEMIS-2045 Add support for setting delivery annotations on outgoing message


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/d3233e45
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/d3233e45
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/d3233e45

Branch: refs/heads/master
Commit: d3233e45f6adf258d2d02be70f3f5e297caa3e8f
Parents: f60d50c
Author: Carsten Lohmann <[email protected]>
Authored: Tue Oct 2 16:45:18 2018 +0200
Committer: Justin Bertram <[email protected]>
Committed: Mon Dec 17 10:30:12 2018 -0600

----------------------------------------------------------------------
 .../protocol/amqp/broker/AMQPMessage.java       | 63 ++++++++++++----
 .../protocol/amqp/broker/AMQPMessageTest.java   | 76 ++++++++++++++++++++
 2 files changed, 126 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d3233e45/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 990ec3a..14606a4 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -60,6 +60,7 @@ import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
 import org.apache.qpid.proton.amqp.messaging.Properties;
 import org.apache.qpid.proton.amqp.messaging.Section;
 import org.apache.qpid.proton.codec.DecoderImpl;
+import org.apache.qpid.proton.codec.DroppingWritableBuffer;
 import org.apache.qpid.proton.codec.EncoderImpl;
 import org.apache.qpid.proton.codec.ReadableBuffer;
 import org.apache.qpid.proton.codec.TypeConstructor;
@@ -121,6 +122,7 @@ public class AMQPMessage extends RefCountMessage {
    private String connectionID;
    private final CoreMessageObjectPools coreMessageObjectPools;
    private Set<Object> rejectedConsumers;
+   private DeliveryAnnotations deliveryAnnotationsForSendBuffer;
 
    // These are properties set at the broker level and carried only internally by broker storage.
    private volatile TypedProperties extraProperties;
@@ -240,6 +242,21 @@ public class AMQPMessage extends RefCountMessage {
    }
 
    /**
+    * Sets the delivery annotations to be included when encoding the message for sending it on the wire.
+    *
+    * The broker can add additional delivery annotations as long as the annotations being added follow the
+    * rules from the spec. If the user adds something that the remote doesn't understand and it is not
+    * prefixed with "x-opt" the remote can just kill the link. See:
+    *
+    *     http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-annotations
+    *
+    * @param deliveryAnnotations delivery annotations used in the getSendBuffer() method
+    */
+   public void setDeliveryAnnotationsForSendBuffer(DeliveryAnnotations 
deliveryAnnotations) {
+      this.deliveryAnnotationsForSendBuffer = deliveryAnnotations;
+   }
+
+   /**
     * Returns a copy of the DeliveryAnnotations in the message if present or null.  Changes to the
     * returned MessageAnnotations instance do not affect the original Message.
     *
@@ -545,26 +562,27 @@ public class AMQPMessage extends RefCountMessage {
 
       if (deliveryCount > 1) {
          return createCopyWithNewDeliveryCount(deliveryCount);
-      } else if (deliveryAnnotationsPosition != VALUE_NOT_PRESENT) {
-         return createCopyWithoutDeliveryAnnotations();
+      } else if (deliveryAnnotationsPosition != VALUE_NOT_PRESENT
+         || (deliveryAnnotationsForSendBuffer != null && 
!deliveryAnnotationsForSendBuffer.getValue().isEmpty())) {
+         return createCopyWithSkippedOrExplicitlySetDeliveryAnnotations();
       } else {
-         // Common case message has no delivery annotations and this is the first delivery
-         // so no re-encoding or section skipping needed.
+         // Common case message has no delivery annotations, no delivery annotations for the send buffer were set
+         // and this is the first delivery so no re-encoding or section skipping needed.
          return data.duplicate();
       }
    }
 
-   private ReadableBuffer createCopyWithoutDeliveryAnnotations() {
-      assert deliveryAnnotationsPosition != VALUE_NOT_PRESENT;
-
-      // The original message had delivery annotations and so we must copy into a new
-      // buffer skipping the delivery annotations section as that is not meant to survive
-      // beyond this hop.
+   private ReadableBuffer 
createCopyWithSkippedOrExplicitlySetDeliveryAnnotations() {
+      // The original message had delivery annotations, or delivery annotations for the send buffer are set.
+      // That means we must copy into a new buffer skipping the original delivery annotations section
+      // (not meant to survive beyond this hop) and including the delivery annotations for the send buffer if set.
       ReadableBuffer duplicate = data.duplicate();
 
       final ByteBuf result = 
PooledByteBufAllocator.DEFAULT.heapBuffer(getEncodeSize());
       result.writeBytes(duplicate.limit(encodedHeaderSize).byteBuffer());
+      writeDeliveryAnnotationsForSendBuffer(result);
       duplicate.clear();
+      // skip existing delivery annotations of the original message
       duplicate.position(encodedHeaderSize + encodedDeliveryAnnotationsSize);
       result.writeBytes(duplicate.byteBuffer());
 
@@ -594,8 +612,8 @@ public class AMQPMessage extends RefCountMessage {
       TLSEncode.getEncoder().writeObject(header);
       TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
 
-      // This will skip any existing delivery annotations that might have been present
-      // in the original message.
+      writeDeliveryAnnotationsForSendBuffer(result);
+      // skip existing delivery annotations of the original message
       data.position(encodedHeaderSize + encodedDeliveryAnnotationsSize);
       result.writeBytes(data.byteBuffer());
       data.position(0);
@@ -603,6 +621,25 @@ public class AMQPMessage extends RefCountMessage {
       return new NettyReadable(result);
    }
 
+   private void writeDeliveryAnnotationsForSendBuffer(ByteBuf result) {
+      if (deliveryAnnotationsForSendBuffer != null && 
!deliveryAnnotationsForSendBuffer.getValue().isEmpty()) {
+         TLSEncode.getEncoder().setByteBuffer(new NettyWritable(result));
+         TLSEncode.getEncoder().writeObject(deliveryAnnotationsForSendBuffer);
+         TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
+      }
+   }
+
+   private int getDeliveryAnnotationsForSendBufferSize() {
+      if (deliveryAnnotationsForSendBuffer == null || 
deliveryAnnotationsForSendBuffer.getValue().isEmpty()) {
+         return 0;
+      }
+      DroppingWritableBuffer droppingWritableBuffer = new 
DroppingWritableBuffer();
+      TLSEncode.getEncoder().setByteBuffer(droppingWritableBuffer);
+      TLSEncode.getEncoder().writeObject(deliveryAnnotationsForSendBuffer);
+      TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
+      return droppingWritableBuffer.position() + 1;
+   }
+
    @Override
    public void messageChanged() {
       modified = true;
@@ -632,7 +669,7 @@ public class AMQPMessage extends RefCountMessage {
    public int getEncodeSize() {
       ensureDataIsValid();
       // The encoded size will exclude any delivery annotations that are present as we will clip them.
-      return data.remaining() - encodedDeliveryAnnotationsSize;
+      return data.remaining() - encodedDeliveryAnnotationsSize + 
getDeliveryAnnotationsForSendBufferSize();
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d3233e45/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java
index 12b91ea..88414b4 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java
@@ -1920,6 +1920,82 @@ public class AMQPMessageTest {
       }
    }
 
+   @Test
+   public void testGetSendBufferWithoutDeliveryAnnotations() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      Header header = new Header();
+      header.setDeliveryCount(new UnsignedInteger(1));
+      protonMessage.setHeader(header);
+      Properties properties = new Properties();
+      properties.setTo("someNiceLocal");
+      protonMessage.setProperties(properties);
+      protonMessage.setBody(new AmqpValue("Sample payload"));
+
+      DeliveryAnnotations deliveryAnnotations = new DeliveryAnnotations(new 
HashMap<>());
+      final String annotationKey = "annotationKey";
+      final String annotationValue = "annotationValue";
+      deliveryAnnotations.getValue().put(Symbol.getSymbol(annotationKey), 
annotationValue);
+      protonMessage.setDeliveryAnnotations(deliveryAnnotations);
+
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      ReadableBuffer sendBuffer = decoded.getSendBuffer(1);
+      assertEquals(decoded.getEncodeSize(), sendBuffer.capacity());
+      AMQPMessage msgFromSendBuffer = new AMQPMessage(0, sendBuffer, null, 
null);
+      assertEquals("someNiceLocal", msgFromSendBuffer.getAddress());
+      assertNull(msgFromSendBuffer.getDeliveryAnnotations());
+
+      // again with higher deliveryCount
+      ReadableBuffer sendBuffer2 = decoded.getSendBuffer(5);
+      assertEquals(decoded.getEncodeSize(), sendBuffer2.capacity());
+      AMQPMessage msgFromSendBuffer2 = new AMQPMessage(0, sendBuffer2, null, 
null);
+      assertEquals("someNiceLocal", msgFromSendBuffer2.getAddress());
+      assertNull(msgFromSendBuffer2.getDeliveryAnnotations());
+   }
+
+   @Test
+   public void testGetSendBufferWithDeliveryAnnotations() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      Header header = new Header();
+      header.setDeliveryCount(new UnsignedInteger(1));
+      protonMessage.setHeader(header);
+      Properties properties = new Properties();
+      properties.setTo("someNiceLocal");
+      protonMessage.setProperties(properties);
+      protonMessage.setBody(new AmqpValue("Sample payload"));
+
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      DeliveryAnnotations newDeliveryAnnotations = new DeliveryAnnotations(new 
HashMap<>());
+      final String annotationKey = "annotationKey";
+      final String annotationValue = "annotationValue";
+      newDeliveryAnnotations.getValue().put(Symbol.getSymbol(annotationKey), 
annotationValue);
+      decoded.setDeliveryAnnotationsForSendBuffer(newDeliveryAnnotations);
+
+      ReadableBuffer sendBuffer = decoded.getSendBuffer(1);
+      assertEquals(decoded.getEncodeSize(), sendBuffer.capacity());
+      AMQPMessage msgFromSendBuffer = new AMQPMessage(0, sendBuffer, null, 
null);
+      assertEquals("someNiceLocal", msgFromSendBuffer.getAddress());
+      assertNotNull(msgFromSendBuffer.getDeliveryAnnotations());
+      assertEquals(1, 
msgFromSendBuffer.getDeliveryAnnotations().getValue().size());
+      assertEquals(annotationValue, 
msgFromSendBuffer.getDeliveryAnnotations().getValue().get(Symbol.getSymbol(annotationKey)));
+
+      // again with higher deliveryCount
+      DeliveryAnnotations newDeliveryAnnotations2 = new 
DeliveryAnnotations(new HashMap<>());
+      final String annotationKey2 = "annotationKey2";
+      final String annotationValue2 = "annotationValue2";
+      newDeliveryAnnotations2.getValue().put(Symbol.getSymbol(annotationKey2), 
annotationValue2);
+      decoded.setDeliveryAnnotationsForSendBuffer(newDeliveryAnnotations2);
+
+      ReadableBuffer sendBuffer2 = decoded.getSendBuffer(5);
+      assertEquals(decoded.getEncodeSize(), sendBuffer2.capacity());
+      AMQPMessage msgFromSendBuffer2 = new AMQPMessage(0, sendBuffer2, null, 
null);
+      assertEquals("someNiceLocal", msgFromSendBuffer2.getAddress());
+      assertNotNull(msgFromSendBuffer2.getDeliveryAnnotations());
+      assertEquals(1, 
msgFromSendBuffer2.getDeliveryAnnotations().getValue().size());
+      assertEquals(annotationValue2, 
msgFromSendBuffer2.getDeliveryAnnotations().getValue().get(Symbol.getSymbol(annotationKey2)));
+   }
+
    //----- Test Support ------------------------------------------------------//
 
    private MessageImpl createProtonMessage() {

Reply via email to