This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch 2.19.x
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit b4188b7dada4ac8709bac016574b3bc3b6e9a3e4
Author: Justin Bertram <[email protected]>
AuthorDate: Wed Nov 3 14:47:35 2021 -0500

    ARTEMIS-3552 NPE on message expiration
    
    (cherry picked from commit 5fe42dd0c478f7f7433ef37fb77ea5f6b2332a71)
---
 .../artemis/core/server/ActiveMQServerLogger.java  |  2 +-
 .../artemis/core/server/impl/QueueImpl.java        |  5 +-
 .../integration/amqp/AmqpExpiredMessageTest.java   | 67 ++++++++++++++++++++++
 3 files changed, 71 insertions(+), 3 deletions(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 27d3d3d..657cb66 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1050,7 +1050,7 @@ public interface ActiveMQServerLogger extends BasicLogger 
{
    void errorFlushingExecutorsOnQueue();
 
    @LogMessage(level = Logger.Level.WARN)
-   @Message(id = 222145, value = "Error expiring reference {0} 0n queue", 
format = Message.Format.MESSAGE_FORMAT)
+   @Message(id = 222145, value = "Error expiring reference {0} on queue", 
format = Message.Format.MESSAGE_FORMAT)
    void errorExpiringReferencesOnQueue(@Cause Exception e, MessageReference 
ref);
 
    @LogMessage(level = Logger.Level.WARN)
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 6eb4e3f..e572c90 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -3391,8 +3391,9 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
 
       copyMessage.setAddress(toAddress);
 
-      if (ref.getMessage().getAnnotationString(Message.HDR_ORIG_ROUTING_TYPE) 
!= null) {
-         
copyMessage.setRoutingType(RoutingType.getType(ref.getMessage().getByteProperty(Message.HDR_ORIG_ROUTING_TYPE)));
+      Object originalRoutingType = 
ref.getMessage().getBrokerProperty(Message.HDR_ORIG_ROUTING_TYPE);
+      if (originalRoutingType != null && originalRoutingType instanceof Byte) {
+         copyMessage.setRoutingType(RoutingType.getType((Byte) 
originalRoutingType));
       }
 
       if (queueIDs != null && queueIDs.length > 0) {
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java
index a9c5d5c..f98ea80 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java
@@ -27,12 +27,18 @@ import java.util.HashMap;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.DivertConfiguration;
+import org.apache.activemq.artemis.core.config.TransformerConfiguration;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.transformer.Transformer;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
 import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
@@ -539,6 +545,67 @@ public class AmqpExpiredMessageTest extends 
AmqpClientTestSupport {
    }
 
    @Test(timeout = 60000)
+   public void testExpirationAfterDivert() throws Throwable {
+      final String FORWARDING_ADDRESS = RandomUtil.randomString();
+      server.createQueue(new 
QueueConfiguration(FORWARDING_ADDRESS).setRoutingType(RoutingType.ANYCAST));
+      server.deployDivert(new DivertConfiguration()
+                             .setName(RandomUtil.randomString())
+                             .setAddress(getQueueName())
+                             .setForwardingAddress(FORWARDING_ADDRESS)
+                             .setTransformerConfiguration(new 
TransformerConfiguration(MyTransformer.class.getName()))
+                             .setExclusive(true));
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = client.connect();
+
+      try {
+
+         // Normal Session which won't create an TXN itself
+         AmqpSession session = connection.createSession();
+         AmqpSender sender = session.createSender(getQueueName());
+
+         AmqpMessage message = new AmqpMessage();
+         message.setDurable(true);
+         message.setText("Test-Message");
+         message.setDeliveryAnnotation("shouldDisappear", 1);
+         message.setMessageAnnotation("x-opt-routing-type", (byte) 1);
+         sender.send(message);
+
+         Queue forward = getProxyToQueue(FORWARDING_ADDRESS);
+         assertTrue("Message not diverted", Wait.waitFor(() -> 
forward.getMessageCount() > 0, 7000, 500));
+
+         Queue dlq = getProxyToQueue(getDeadLetterAddress());
+         assertTrue("Message not moved to DLQ", Wait.waitFor(() -> 
dlq.getMessageCount() > 0, 7000, 500));
+
+         connection.close();
+
+         connection = client.connect();
+         session = connection.createSession();
+
+         // Read all messages from the Queue
+         AmqpReceiver receiver = 
session.createReceiver(getDeadLetterAddress());
+         receiver.flow(20);
+
+         message = receiver.receive(5, TimeUnit.SECONDS);
+         assertNotNull(message);
+         assertEquals(FORWARDING_ADDRESS, 
message.getMessageAnnotation("x-opt-ORIG-QUEUE"));
+         assertNull(message.getDeliveryAnnotation("shouldDisappear"));
+         assertNull(receiver.receiveNoWait());
+      } finally {
+         connection.close();
+      }
+   }
+
+   public static class MyTransformer implements Transformer {
+      public MyTransformer() {
+      }
+
+      @Override
+      public org.apache.activemq.artemis.api.core.Message 
transform(org.apache.activemq.artemis.api.core.Message message) {
+         return message.setExpiration(System.currentTimeMillis() + 250);
+      }
+   }
+
+   @Test(timeout = 60000)
    public void testDLQdMessageCanBeRedeliveredMultipleTimes() throws Throwable 
{
       AmqpClient client = createAmqpClient();
       AmqpConnection connection = client.connect();

Reply via email to