This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 4fe4220ff004e3c279dc9c68a839d7a7b5fd88b6
Author: Clebert Suconic <[email protected]>
AuthorDate: Tue May 5 14:23:11 2020 -0400

    ARTEMIS-2372 / ARTEMIS-2740 Improving Message Annotations support in AMQP
    
    - when sending messages to DLQ or Expiry we now use x-opt legal names
    - we now support filtering thorugh annotations if using m. as a prefix.
    - enabling hyphenated_props: to allow m. as a prefix
---
 .../artemis/utils/collections/TypedProperties.java |   5 +
 .../apache/activemq/artemis/api/core/Message.java  |  27 +++-
 .../artemis/protocol/amqp/broker/AMQPMessage.java  |  38 ++++-
 .../amqp/converter/AMQPMessageSupport.java         |  19 +++
 .../amqp/converter/AnnotationNameConveterTest.java |  36 +++++
 .../artemis/core/server/impl/QueueImpl.java        |   2 +-
 docs/user-manual/en/amqp.md                        |  39 +++++
 .../integration/amqp/AmqpExpiredMessageTest.java   |  66 ++++++++-
 .../integration/amqp/AmqpSendReceiveTest.java      |   6 +-
 .../amqp/DLQAfterExpiredMessageTest.java           | 162 +++++++++++++++++++++
 10 files changed, 377 insertions(+), 23 deletions(-)

diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
index 81755ae..3f1baf1 100644
--- 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
@@ -148,6 +148,11 @@ public class TypedProperties {
       otherProps.forEachInternal(this::doPutValue);
    }
 
+   public TypedProperties putProperty(final SimpleString key, final Object 
value) {
+      setObjectProperty(key, value, this);
+      return this;
+   }
+
    public Object getProperty(final SimpleString key) {
       return doGetProperty(key);
    }
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index 568cdda..b8f6303 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -463,24 +463,24 @@ public interface Message {
    }
 
    default void referenceOriginalMessage(final Message original, String 
originalQueue) {
-      String queueOnMessage = 
original.getAnnotationString(Message.HDR_ORIGINAL_QUEUE);
+      Object queueOnMessage = 
original.getBrokerProperty(Message.HDR_ORIGINAL_QUEUE);
 
       if (queueOnMessage != null) {
-         setAnnotation(Message.HDR_ORIGINAL_QUEUE, queueOnMessage);
+         setBrokerProperty(Message.HDR_ORIGINAL_QUEUE, queueOnMessage);
       } else if (originalQueue != null) {
-         setAnnotation(Message.HDR_ORIGINAL_QUEUE, originalQueue);
+         setBrokerProperty(Message.HDR_ORIGINAL_QUEUE, originalQueue);
       }
 
-      Object originalID = original.getAnnotation(Message.HDR_ORIG_MESSAGE_ID);
+      Object originalID = 
original.getBrokerProperty(Message.HDR_ORIG_MESSAGE_ID);
 
       if (originalID != null) {
-         setAnnotation(Message.HDR_ORIGINAL_ADDRESS, 
original.getAnnotationString(Message.HDR_ORIGINAL_ADDRESS));
+         setBrokerProperty(Message.HDR_ORIGINAL_ADDRESS, 
original.getBrokerProperty(Message.HDR_ORIGINAL_ADDRESS));
 
-         setAnnotation(Message.HDR_ORIG_MESSAGE_ID, originalID);
+         setBrokerProperty(Message.HDR_ORIG_MESSAGE_ID, originalID);
       } else {
-         setAnnotation(Message.HDR_ORIGINAL_ADDRESS, original.getAddress());
+         setBrokerProperty(Message.HDR_ORIGINAL_ADDRESS, 
original.getAddress());
 
-         setAnnotation(Message.HDR_ORIG_MESSAGE_ID, original.getMessageID());
+         setBrokerProperty(Message.HDR_ORIG_MESSAGE_ID, 
original.getMessageID());
       }
 
       // reset expiry
@@ -641,6 +641,17 @@ public interface Message {
       return this;
    }
 
+   /** To be called by the broker on ocasions such as DLQ and expiry.
+    * When the broker is adding additional properties. */
+   default Message setBrokerProperty(SimpleString key, Object value) {
+      putObjectProperty(key, value);
+      return this;
+   }
+
+   default Object getBrokerProperty(SimpleString key) {
+      return getObjectProperty(key);
+   }
+
    Short getShortProperty(SimpleString key) throws 
ActiveMQPropertyConversionException;
 
    Float getFloatProperty(SimpleString key) throws 
ActiveMQPropertyConversionException;
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index e0cd94b..6e903a3 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -104,6 +104,8 @@ import org.jboss.logging.Logger;
  */
 public abstract class AMQPMessage extends RefCountMessage implements 
org.apache.activemq.artemis.api.core.Message {
 
+   private static final SimpleString ANNOTATION_AREA_PREFIX = 
SimpleString.toSimpleString("m.");
+
    protected static final Logger logger = Logger.getLogger(AMQPMessage.class);
 
    public static final SimpleString ADDRESS_PROPERTY = 
SimpleString.toSimpleString("_AMQ_AD");
@@ -275,14 +277,18 @@ public abstract class AMQPMessage extends RefCountMessage 
implements org.apache.
 
    @Override
    public Object getObjectPropertyForFilter(SimpleString key) {
-      Object value = getObjectProperty(key);
-      if (value == null) {
-         value = getMessageAnnotation(key.toString());
+      if (key.startsWith(ANNOTATION_AREA_PREFIX)) {
+         key = key.subSeq(ANNOTATION_AREA_PREFIX.length(), key.length());
+         return getAnnotation(key);
       }
+
+      Object value = getObjectProperty(key);
       if (value == null) {
-         value = getExtraBytesProperty(key);
+         TypedProperties extra = getExtraProperties();
+         if (extra != null) {
+            value = extra.getProperty(key);
+         }
       }
-
       return value;
    }
 
@@ -498,6 +504,9 @@ public abstract class AMQPMessage extends RefCountMessage 
implements org.apache.
    }
 
    protected void setMessageAnnotation(Symbol annotation, Object value) {
+      if (value instanceof SimpleString) {
+         value = value.toString();
+      }
       getMessageAnnotationsMap(true).put(annotation, value);
    }
 
@@ -1278,6 +1287,25 @@ public abstract class AMQPMessage extends 
RefCountMessage implements org.apache.
       return this;
    }
 
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message 
setBrokerProperty(SimpleString key, Object value) {
+      // Annotation names have to start with x-opt
+      
setMessageAnnotation(AMQPMessageSupport.toAnnotationName(key.toString()), 
value);
+      createExtraProperties().putProperty(key, value);
+      return this;
+   }
+
+   @Override
+   public Object getBrokerProperty(SimpleString key) {
+      TypedProperties extra = getExtraProperties();
+      if (extra == null) {
+         return null;
+      }
+      return extra.getProperty(key);
+   }
+
+
    // JMS Style property access methods.  These can result in additional 
decode of AMQP message
    // data from Application properties.  Updates to application properties 
puts the message in a
    // dirty state and requires a re-encode of the data to update all buffer 
state data otherwise
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
index dd81c4c..abc6392 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
@@ -35,6 +35,7 @@ import javax.jms.TemporaryQueue;
 import javax.jms.TemporaryTopic;
 import javax.jms.Topic;
 
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
 import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
@@ -63,6 +64,9 @@ public final class AMQPMessageSupport {
 
    private static final Logger logger = 
Logger.getLogger(AMQPMessageSupport.class);
 
+
+   public static SimpleString HDR_ORIGINAL_ADDRESS_ANNOTATION = 
SimpleString.toSimpleString("x-opt-ORIG-ADDRESS");
+
    public static final String JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME = 
"x-opt-jms-reply-to";
 
    // Message Properties used to map AMQP to JMS and back
@@ -178,6 +182,9 @@ public final class AMQPMessageSupport {
    public static final Binary EMPTY_BINARY = new Binary(new byte[0]);
    public static final Data EMPTY_BODY = new Data(EMPTY_BINARY);
 
+   public static final String X_OPT_PREFIX = "x-opt-";
+   public static final String AMQ_PROPERTY_PREFIX = "_AMQ_";
+
    public static final short AMQP_UNKNOWN = 0;
    public static final short AMQP_NULL = 1;
    public static final short AMQP_DATA = 2;
@@ -285,6 +292,18 @@ public final class AMQPMessageSupport {
       }
    }
 
+   public static String toAnnotationName(String key) {
+      if (!key.startsWith(X_OPT_PREFIX.toString())) {
+         if (key.startsWith(AMQ_PROPERTY_PREFIX)) {
+            return 
X_OPT_PREFIX.concat(key.substring(AMQ_PROPERTY_PREFIX.length()).replace('_', 
'-'));
+         }
+
+         return key;
+      }
+      return  key;
+   }
+
+
    public static String toAddress(Destination destination) {
       try {
          if (destination instanceof ActiveMQDestination) {
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/AnnotationNameConveterTest.java
 
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/AnnotationNameConveterTest.java
new file mode 100644
index 0000000..2de0ed3
--- /dev/null
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/AnnotationNameConveterTest.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.converter;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class AnnotationNameConveterTest {
+
+   @Test
+   public void testAnnotationName() {
+      try {
+         Assert.assertEquals("x-opt-ORIG-QUEUE", 
AMQPMessageSupport.toAnnotationName(Message.HDR_ORIGINAL_QUEUE.toString()));
+         Assert.assertEquals("x-opt-ORIG-MESSAGE-ID", 
AMQPMessageSupport.toAnnotationName(Message.HDR_ORIG_MESSAGE_ID.toString()));
+      } catch (Exception e) {
+         e.printStackTrace();
+      }
+   }
+
+}
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 82e8870..0caf7ad 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -3384,7 +3384,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
       copy.setExpiration(0);
 
       if (expiry) {
-         copy.setAnnotation(Message.HDR_ACTUAL_EXPIRY_TIME, 
System.currentTimeMillis());
+         copy.setBrokerProperty(Message.HDR_ACTUAL_EXPIRY_TIME, 
System.currentTimeMillis());
       }
 
       copy.reencode();
diff --git a/docs/user-manual/en/amqp.md b/docs/user-manual/en/amqp.md
index 5cff656..f88594c 100644
--- a/docs/user-manual/en/amqp.md
+++ b/docs/user-manual/en/amqp.md
@@ -128,6 +128,45 @@ message for later delivery:
 If both annotations are present in the same message then the broker will prefer
 the more specific `x-opt-delivery-time` value.
 
+## DLQ and Expiry transfer
+
+AMQP Messages will be copied before transferred to a DLQ or ExpiryQueue and 
will receive properties and annotations during this process.
+
+The broker also keeps an internal only property (called extra property) that 
is not exposed to the clients, and those will also be filled during this 
process.
+
+Here is a list of Annotations and Property names AMQP Messages will receive 
when transferred:
+
+|Annotation name| Internal Property Name|Description|
+|---------------|-----------------------|-----------|
+|x-opt-ORIG-MESSAGE-ID|_AMQ_ORIG_MESSAGE_ID|The original message ID before the 
transfer|
+|x-opt-ACTUAL-EXPIRY|_AMQ_ACTUAL_EXPIRY|When the expiry took place. 
Milliseconds since epoch times|
+|x-opt-ORIG-QUEUE|_AMQ_ORIG_QUEUE|The original queue name before the transfer|
+|x-opt-ORIG-ADDRESS|_AMQ_ORIG_ADDRESS|The original address name before the 
transfer|
+
+## Filtering on Message Annotations
+
+It is possible to filter on messaging annotations if you use the prefix "m." 
before the annotation name.
+
+For example if you want to filter messages sent to a specific destination, you 
could create your filter accordingly to this:
+
+```java
+ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:5672");
+Connection connection = factory.createConnection();
+Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+connection.start();
+javax.jms.Queue queue = session.createQueue("my-DLQ");
+MessageConsumer consumer = session.createConsumer(queue, 
"\"m.x-opt-ORIG-ADDRESS\"='ORIGINAL_PLACE'");
+Message message = consumer.receive();
+```
+
+The broker will set internal properties. If you intend to filter after DLQ or 
Expiry you may choose the internal property names:
+
+```java
+// Replace the consumer creation on the previous example:
+MessageConsumer consumer = session.createConsumer(queue, 
"_AMQ_ORIG_ADDRESS='ORIGINAL_PLACE'");
+```
+
+
 ## Configuring AMQP Idle Timeout
 
 It is possible to configure the AMQP Server's IDLE Timeout by setting the 
property amqpIdleTimeout in milliseconds on the acceptor.
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java
index 121137f..30b3f7a 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java
@@ -16,10 +16,18 @@
  */
 package org.apache.activemq.artemis.tests.integration.amqp;
 
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.tests.util.CFUtil;
 import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
@@ -168,10 +176,20 @@ public class AmqpExpiredMessageTest extends 
AmqpClientTestSupport {
       connection = addConnection(client.connect());
       session = connection.createSession();
 
-      AmqpReceiver receiverDLQ = 
session.createReceiver(getDeadLetterAddress(), "_AMQ_ORIG_ADDRESS='" + 
getQueueName() + "'");
+      AmqpReceiver receiverDLQ = 
session.createReceiver(getDeadLetterAddress(), "\"m.x-opt-ORIG-ADDRESS\"='" + 
getQueueName() + "'");
       receiverDLQ.flow(1);
       received = receiverDLQ.receive(5, TimeUnit.SECONDS);
       Assert.assertNotNull(received);
+      Assert.assertEquals(getQueueName(), 
received.getMessageAnnotation("x-opt-ORIG-ADDRESS"));
+      // close without accepting on purpose, it will issue a redelivery on the 
second filter
+      receiverDLQ.close();
+
+      // Redo the selection, however now using the extra-properties, since the 
broker will store these as extra properties on AMQP Messages
+      receiverDLQ = session.createReceiver(getDeadLetterAddress(), 
"_AMQ_ORIG_ADDRESS='" + getQueueName() + "'");
+      receiverDLQ.flow(1);
+      received = receiverDLQ.receive(5, TimeUnit.SECONDS);
+      Assert.assertEquals(getQueueName(), 
received.getMessageAnnotation("x-opt-ORIG-ADDRESS"));
+      Assert.assertNotNull(received);
       received.accept();
 
       assertNotNull("Should have read message from DLQ", received);
@@ -182,6 +200,44 @@ public class AmqpExpiredMessageTest extends 
AmqpClientTestSupport {
       connection.close();
    }
 
+   /** This test is validating a broker feature where the message copy through 
the DLQ will receive an annotation.
+    *  It is also testing filter on that annotation. */
+   @Test(timeout = 60000)
+   public void testExpiryQpidJMS() throws Exception {
+      ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", 
getBrokerAmqpConnectionURI().toString());
+      Connection connection = factory.createConnection();
+      try {
+         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+         javax.jms.Queue queue = session.createQueue(getQueueName());
+         MessageProducer sender = session.createProducer(queue);
+
+         // Get the Queue View early to avoid racing the delivery.
+         final Queue queueView = getProxyToQueue(getQueueName());
+         assertNotNull(queueView);
+
+         sender.setTimeToLive(1);
+         TextMessage message = session.createTextMessage("Test-Message");
+         message.setStringProperty("key1", "Value1");
+         sender.send(message);
+         sender.close();
+
+         Wait.assertEquals(1, queueView::getMessagesExpired);
+         final Queue dlqView = getProxyToQueue(getDeadLetterAddress());
+         assertNotNull(dlqView);
+         Wait.assertEquals(1, dlqView::getMessageCount);
+
+         connection.start();
+         javax.jms.Queue queueDLQ = 
session.createQueue(getDeadLetterAddress());
+         MessageConsumer receiverDLQ = session.createConsumer(queueDLQ, 
"\"m.x-opt-ORIG-ADDRESS\"='" + getQueueName() + "'");
+         Message received = receiverDLQ.receive(5000);
+         Assert.assertNotNull(received);
+         receiverDLQ.close();
+      } finally {
+         connection.close();
+      }
+
+   }
+
    @Test(timeout = 60000)
    public void testSendMessageThatIsNotExpiredUsingAbsoluteTime() throws 
Exception {
       AmqpClient client = createAmqpClient();
@@ -261,14 +317,12 @@ public class AmqpExpiredMessageTest extends 
AmqpClientTestSupport {
       AmqpMessage message = new AmqpMessage();
       message.setAbsoluteExpiryTime(0);
       // AET should override any TTL set
-      message.setTimeToLive(1000);
+      message.setTimeToLive(100);
       message.setText("Test-Message");
       sender.send(message);
       sender.close();
 
-      Wait.assertEquals(1, queueView::getMessageCount);
-
-      Thread.sleep(1000);
+      Wait.assertEquals(1L, queueView::getMessagesExpired, 10000, 10);
 
       // Now try and get the message
       AmqpReceiver receiver = session.createReceiver(getQueueName());
@@ -426,7 +480,7 @@ public class AmqpExpiredMessageTest extends 
AmqpClientTestSupport {
 
          message = receiver.receive(5, TimeUnit.SECONDS);
          assertNotNull(message);
-         assertEquals(getQueueName(), 
message.getMessageAnnotation(org.apache.activemq.artemis.api.core.Message.HDR_ORIGINAL_ADDRESS.toString()));
+         assertEquals(getQueueName(), 
message.getMessageAnnotation("x-opt-ORIG-QUEUE"));
          assertNull(message.getDeliveryAnnotation("shouldDisappear"));
          assertNull(receiver.receiveNoWait());
       } finally {
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
index 85c304c..b75a293 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
@@ -396,19 +396,19 @@ public class AmqpSendReceiveTest extends 
AmqpClientTestSupport {
       AmqpMessage message = new AmqpMessage();
 
       message.setMessageId("msg" + 1);
-      message.setMessageAnnotation("serialNo", 1);
+      message.setMessageAnnotation("x-opt-serialNo", 1);
       message.setText("Test-Message");
       sender.send(message);
 
       message = new AmqpMessage();
       message.setMessageId("msg" + 2);
-      message.setMessageAnnotation("serialNo", 2);
+      message.setMessageAnnotation("x-opt-serialNo", 2);
       message.setText("Test-Message 2");
       sender.send(message);
       sender.close();
 
       LOG.debug("Attempting to read message with receiver");
-      AmqpReceiver receiver = session.createReceiver(getQueueName(), 
"serialNo=2");
+      AmqpReceiver receiver = session.createReceiver(getQueueName(), 
"\"m.x-opt-serialNo\"=2");
       receiver.flow(2);
       AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
       assertNotNull("Should have read message", received);
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/DLQAfterExpiredMessageTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/DLQAfterExpiredMessageTest.java
new file mode 100644
index 0000000..ce8fd56
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/DLQAfterExpiredMessageTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.jboss.logging.Logger;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * This is testing a double transfer (copy).
+ * First messages will expire, then DLQ.
+ * This will validate the data added to the queues.
+ */
+public class DLQAfterExpiredMessageTest extends AmqpClientTestSupport {
+   private static final Logger log = 
Logger.getLogger(DLQAfterExpiredMessageTest.class);
+
+   protected String getExpiryQueue() {
+      return "ActiveMQ.Expiry";
+   }
+
+   @Override
+   protected void createAddressAndQueues(ActiveMQServer server) throws 
Exception {
+      // Default Queue
+      server.addAddressInfo(new 
AddressInfo(SimpleString.toSimpleString(getQueueName()), RoutingType.ANYCAST));
+      server.createQueue(new 
QueueConfiguration(getQueueName()).setRoutingType(RoutingType.ANYCAST));
+
+      // Default DLQ
+      server.addAddressInfo(new 
AddressInfo(SimpleString.toSimpleString(getDeadLetterAddress()), 
RoutingType.ANYCAST));
+      server.createQueue(new 
QueueConfiguration(getDeadLetterAddress()).setRoutingType(RoutingType.ANYCAST));
+
+      // Expiry
+      server.addAddressInfo(new 
AddressInfo(SimpleString.toSimpleString(getExpiryQueue()), 
RoutingType.ANYCAST));
+      server.createQueue(new 
QueueConfiguration(getExpiryQueue()).setRoutingType(RoutingType.ANYCAST));
+
+      // Default Topic
+      server.addAddressInfo(new 
AddressInfo(SimpleString.toSimpleString(getTopicName()), 
RoutingType.MULTICAST));
+      server.createQueue(new QueueConfiguration(getTopicName()));
+
+      // Additional Test Queues
+      for (int i = 0; i < getPrecreatedQueueSize(); ++i) {
+         server.addAddressInfo(new 
AddressInfo(SimpleString.toSimpleString(getQueueName(i)), RoutingType.ANYCAST));
+         server.createQueue(new 
QueueConfiguration(getQueueName(i)).setRoutingType(RoutingType.ANYCAST));
+      }
+   }
+
+   @Override
+   protected void configureAddressPolicy(ActiveMQServer server) {
+      // Address configuration
+      AddressSettings addressSettings = new AddressSettings();
+
+      
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+      addressSettings.setAutoCreateQueues(isAutoCreateQueues());
+      addressSettings.setAutoCreateAddresses(isAutoCreateAddresses());
+      
addressSettings.setDeadLetterAddress(SimpleString.toSimpleString(getDeadLetterAddress()));
+      
addressSettings.setExpiryAddress(SimpleString.toSimpleString(getExpiryQueue()));
+      addressSettings.setMaxDeliveryAttempts(1);
+      server.getConfiguration().getAddressesSettings().put("#", 
addressSettings);
+      server.getConfiguration().getAddressesSettings().put(getExpiryQueue(), 
addressSettings);
+   }
+
+   @Test
+   public void testDoubleTransfer() throws Throwable {
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      try {
+         AmqpSession session = connection.createSession();
+
+         AmqpSender sender = session.createSender(getQueueName());
+
+         // Get the Queue View early to avoid racing the delivery.
+         final Queue queueView = getProxyToQueue(getQueueName());
+         assertNotNull(queueView);
+
+         AmqpMessage message = new AmqpMessage();
+         message.setTimeToLive(1);
+         message.setText("Test-Message");
+         message.setDurable(true);
+         message.setApplicationProperty("key1", "Value1");
+         sender.send(message);
+         sender.close();
+
+         Wait.assertEquals(1, queueView::getMessagesExpired);
+         Wait.assertEquals(0, queueView::getConsumerCount);
+
+         final Queue expiryView = getProxyToQueue(getExpiryQueue());
+         assertNotNull(expiryView);
+         Wait.assertEquals(1, expiryView::getMessageCount);
+
+         HashMap<String, Object> annotations = new HashMap<>();
+
+         AmqpReceiver receiverDLQ = session.createReceiver(getExpiryQueue(), 
"\"m." + AMQPMessageSupport.HDR_ORIGINAL_ADDRESS_ANNOTATION + "\"='" + 
getQueueName() + "'");
+         receiverDLQ.flow(1);
+         AmqpMessage received = receiverDLQ.receive(5, TimeUnit.SECONDS);
+         Assert.assertNotNull(received);
+         Map<Symbol, Object> avAnnotations = 
received.getWrappedMessage().getMessageAnnotations().getValue();
+         avAnnotations.forEach((key, value) -> {
+            annotations.put(key.toString(), value);
+         });
+         received.reject();
+         receiverDLQ.close();
+
+
+         // Redo the selection
+         receiverDLQ = session.createReceiver(getDeadLetterAddress(), "\"m." + 
AMQPMessageSupport.HDR_ORIGINAL_ADDRESS_ANNOTATION + "\"='" + getQueueName() + 
"'");
+         receiverDLQ.flow(1);
+         received = receiverDLQ.receive(5, TimeUnit.SECONDS);
+         Assert.assertNotNull(received);
+         received.accept();
+
+         /** When moving to DLQ, the original headers shoudln't be touched. */
+         for (Map.Entry<String, Object> entry : annotations.entrySet()) {
+            log.debug("Checking " + entry.getKey() + " = " + entry.getValue());
+            Assert.assertEquals(entry.getKey() + " should be = " + 
entry.getValue(), entry.getValue(), 
received.getMessageAnnotation(entry.getKey()));
+         }
+
+         assertEquals(0, received.getTimeToLive());
+         assertNotNull(received);
+         assertEquals("Value1", received.getApplicationProperty("key1"));
+      } catch (Throwable e) {
+         e.printStackTrace();
+         throw e;
+      } finally {
+         connection.close();
+      }
+   }
+}

Reply via email to