This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new e1d8f42  ARTEMIS-2798 expiration for AMQP msgs not reloaded
     new 0a2c439  This closes #3176
e1d8f42 is described below

commit e1d8f42cc18e16f13fcc5d1c0ab0f4d3b176ef9c
Author: Justin Bertram <[email protected]>
AuthorDate: Tue Jun 9 12:19:01 2020 -0500

    ARTEMIS-2798 expiration for AMQP msgs not reloaded
---
 .../artemis/protocol/amqp/broker/AMQPMessage.java  |  1 +
 .../integration/client/MessageExpirationTest.java  | 57 +++++++++++++++++++++-
 2 files changed, 57 insertions(+), 1 deletion(-)

diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 073da32..c89ba5c 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -958,6 +958,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
 
    @Override
    public final long getExpiration() {
+      ensureMessageDataScanned();
       return expiration;
    }
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MessageExpirationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MessageExpirationTest.java
index f4b4a35..e15abff 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MessageExpirationTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MessageExpirationTest.java
@@ -16,8 +16,15 @@
  */
 package org.apache.activemq.artemis.tests.integration.client;
 
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
@@ -29,7 +36,9 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.artemis.utils.RandomUtil;
+import org.apache.qpid.jms.JmsConnectionFactory;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -71,6 +80,50 @@ public class MessageExpirationTest extends ActiveMQTestBase {
    }
 
    @Test
+   public void testAmqpJmsReloaded() throws Exception {
+      SimpleString queue = RandomUtil.randomSimpleString();
+      SimpleString expiry = RandomUtil.randomSimpleString();
+
+      server.createQueue(new QueueConfiguration(queue).setRoutingType(RoutingType.ANYCAST));
+      server.createQueue(new QueueConfiguration(expiry));
+      server.getAddressSettingsRepository().addMatch(queue.toString(), new AddressSettings().setExpiryAddress(expiry));
+
+      ConnectionFactory cf = new JmsConnectionFactory("amqp://localhost:61616");
+      Connection connection = cf.createConnection();
+      Session session = connection.createSession();
+      MessageProducer producer = session.createProducer(session.createQueue(queue.toString()));
+      producer.setTimeToLive(EXPIRATION);
+
+      for (int i = 0; i < 20; i++) {
+         javax.jms.Message message = session.createMessage();
+         producer.send(message);
+      }
+      connection.close();
+      Wait.assertEquals(20L, () -> server.locateQueue(queue).getMessageCount(), 2000, 100);
+      Wait.assertEquals(0L, () -> server.locateQueue(expiry).getMessageCount(), 2000, 100);
+
+      server.stop();
+      server.start();
+
+      Thread.sleep(EXPIRATION * 2);
+
+      Wait.assertEquals(0L, () -> server.locateQueue(queue).getMessageCount(), 2000, 100);
+      Wait.assertEquals(20L, () -> server.locateQueue(expiry).getMessageCount(), 2000, 100);
+
+      connection = cf.createConnection();
+      session = connection.createSession();
+      MessageConsumer consumer = session.createConsumer(session.createQueue(queue.toString()));
+      connection.start();
+
+      for (int i = 0; i < 20; i++) {
+         javax.jms.Message message2 = consumer.receiveNoWait();
+         Assert.assertNull(message2);
+      }
+
+      consumer.close();
+   }
+
+   @Test
    public void testMessageExpiredWithoutExpiryAddressWithExpiryDelayOverride() throws Exception {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
@@ -235,7 +288,9 @@ public class MessageExpirationTest extends ActiveMQTestBase {
    public void setUp() throws Exception {
       super.setUp();
 
-      server = createServer(false);
+      server = createServer(true);
+      server.getConfiguration().addAcceptorConfiguration("amqp", "tcp://127.0.0.1:61616");
+      server.getConfiguration().setMessageExpiryScanPeriod(200);
       server.start();
       locator = createInVMNonHALocator();
       sf = createSessionFactory(locator);

Reply via email to