This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new e1d8f42 ARTEMIS-2798 expiration for AMQP msgs not reloaded
new 0a2c439 This closes #3176
e1d8f42 is described below
commit e1d8f42cc18e16f13fcc5d1c0ab0f4d3b176ef9c
Author: Justin Bertram <[email protected]>
AuthorDate: Tue Jun 9 12:19:01 2020 -0500
ARTEMIS-2798 expiration for AMQP msgs not reloaded
---
.../artemis/protocol/amqp/broker/AMQPMessage.java | 1 +
.../integration/client/MessageExpirationTest.java | 57 +++++++++++++++++++++-
2 files changed, 57 insertions(+), 1 deletion(-)
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 073da32..c89ba5c 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -958,6 +958,7 @@ public abstract class AMQPMessage extends RefCountMessage
implements org.apache.
@Override
public final long getExpiration() {
+ ensureMessageDataScanned();
return expiration;
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MessageExpirationTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MessageExpirationTest.java
index f4b4a35..e15abff 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MessageExpirationTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MessageExpirationTest.java
@@ -16,8 +16,15 @@
*/
package org.apache.activemq.artemis.tests.integration.client;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
@@ -29,7 +36,9 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.RandomUtil;
+import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -71,6 +80,50 @@ public class MessageExpirationTest extends ActiveMQTestBase {
}
@Test
+ public void testAmqpJmsReloaded() throws Exception {
+ SimpleString queue = RandomUtil.randomSimpleString();
+ SimpleString expiry = RandomUtil.randomSimpleString();
+
+ server.createQueue(new
QueueConfiguration(queue).setRoutingType(RoutingType.ANYCAST));
+ server.createQueue(new QueueConfiguration(expiry));
+ server.getAddressSettingsRepository().addMatch(queue.toString(), new
AddressSettings().setExpiryAddress(expiry));
+
+ ConnectionFactory cf = new
JmsConnectionFactory("amqp://localhost:61616");
+ Connection connection = cf.createConnection();
+ Session session = connection.createSession();
+ MessageProducer producer =
session.createProducer(session.createQueue(queue.toString()));
+ producer.setTimeToLive(EXPIRATION);
+
+ for (int i = 0; i < 20; i++) {
+ javax.jms.Message message = session.createMessage();
+ producer.send(message);
+ }
+ connection.close();
+ Wait.assertEquals(20L, () ->
server.locateQueue(queue).getMessageCount(), 2000, 100);
+ Wait.assertEquals(0L, () ->
server.locateQueue(expiry).getMessageCount(), 2000, 100);
+
+ server.stop();
+ server.start();
+
+ Thread.sleep(EXPIRATION * 2);
+
+ Wait.assertEquals(0L, () -> server.locateQueue(queue).getMessageCount(),
2000, 100);
+ Wait.assertEquals(20L, () ->
server.locateQueue(expiry).getMessageCount(), 2000, 100);
+
+ connection = cf.createConnection();
+ session = connection.createSession();
+ MessageConsumer consumer =
session.createConsumer(session.createQueue(queue.toString()));
+ connection.start();
+
+ for (int i = 0; i < 20; i++) {
+ javax.jms.Message message2 = consumer.receiveNoWait();
+ Assert.assertNull(message2);
+ }
+
+ consumer.close();
+ }
+
+ @Test
public void testMessageExpiredWithoutExpiryAddressWithExpiryDelayOverride()
throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();
@@ -235,7 +288,9 @@ public class MessageExpirationTest extends ActiveMQTestBase
{
public void setUp() throws Exception {
super.setUp();
- server = createServer(false);
+ server = createServer(true);
+ server.getConfiguration().addAcceptorConfiguration("amqp",
"tcp://127.0.0.1:61616");
+ server.getConfiguration().setMessageExpiryScanPeriod(200);
server.start();
locator = createInVMNonHALocator();
sf = createSessionFactory(locator);