This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 766f88c ARTEMIS-2629 ensure queue auto-delete after expiration
new 32829d6 This closes #2992
766f88c is described below
commit 766f88c22af032fdcae08c0f525f73d528b4a162
Author: Justin Bertram <[email protected]>
AuthorDate: Fri Feb 21 17:48:16 2020 -0600
ARTEMIS-2629 ensure queue auto-delete after expiration
---
.../core/postoffice/impl/PostOfficeImpl.java | 6 +++-
.../artemis/core/server/impl/QueueImpl.java | 10 ++++++
.../artemis/core/server/impl/QueueManagerImpl.java | 9 +++--
.../integration/client/AutoDeleteQueueTest.java | 38 +++++++++++++++++++++-
4 files changed, 56 insertions(+), 7 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index bbebfae..f3f0c5d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -1735,7 +1735,7 @@ public class PostOfficeImpl implements PostOffice,
NotificationListener, Binding
@Override
public void run() {
for (Queue queue : getLocalQueues()) {
- if (!queue.isInternalQueue() &&
QueueManagerImpl.isAutoDelete(queue) &&
QueueManagerImpl.consumerCountCheck(queue) &&
QueueManagerImpl.delayCheck(queue) &&
QueueManagerImpl.messageCountCheck(queue)) {
+ if (!queue.isInternalQueue() &&
QueueManagerImpl.isAutoDelete(queue) &&
QueueManagerImpl.consumerCountCheck(queue) &&
QueueManagerImpl.delayCheck(queue) && QueueManagerImpl.messageCountCheck(queue)
&& queueWasUsed(queue)) {
QueueManagerImpl.performAutoDeleteQueue(server, queue);
}
}
@@ -1760,6 +1760,10 @@ public class PostOfficeImpl implements PostOffice,
NotificationListener, Binding
}
}
}
+
+ private boolean queueWasUsed(Queue queue) {
+ return queue.getMessagesExpired() > 0 ||
queue.getMessagesAcknowledged() > 0 || queue.getMessagesKilled() > 0 ||
queue.getConsumerRemovedTimestamp() != -1;
+ }
}
private List<Queue> getLocalQueues() {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 53b701e..9ce735d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1859,6 +1859,9 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
acknowledge(ref, AckReason.EXPIRED, consumer);
}
+ // potentially auto-delete this queue if this expired the last message
+ refCountForConsumers.check();
+
if (server != null && server.hasBrokerMessagePlugins()) {
final SimpleString expiryAddress = messageExpiryAddress;
server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(ref,
expiryAddress, consumer));
@@ -3366,6 +3369,13 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
expiryLogger.addExpiry(address, ref);
}
+ // potentially auto-delete this queue if this expired the last message
+ tx.addOperation(new TransactionOperationAbstract() {
+ @Override
+ public void afterCommit(Transaction tx) {
+ refCountForConsumers.check();
+ }
+ });
}
private class ExpiryLogger extends TransactionOperationAbstract {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java
index b20b537..549686f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java
@@ -35,7 +35,7 @@ public class QueueManagerImpl extends ReferenceCounterUtil
implements QueueManag
//the queue may already have been deleted and this is a result of that
if (queue == null) {
if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
- ActiveMQServerLogger.LOGGER.debug("no queue to delete \"" +
queueName + ".\"");
+ ActiveMQServerLogger.LOGGER.debug("no queue to delete \"" +
queueName + "\".");
}
return;
}
@@ -52,7 +52,7 @@ public class QueueManagerImpl extends ReferenceCounterUtil
implements QueueManag
long messageCount = queue.getMessageCount();
if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
- ActiveMQServerLogger.LOGGER.debug("purging queue \"" +
queue.getName() + ".\" consumerCount = " + consumerCount + "; messageCount = "
+ messageCount);
+ ActiveMQServerLogger.LOGGER.debug("purging queue \"" +
queue.getName() + "\": consumerCount = " + consumerCount + "; messageCount = "
+ messageCount);
}
try {
queue.deleteMatchingReferences(QueueImpl.DEFAULT_FLUSH_LIMIT, null,
AckReason.KILLED);
@@ -65,7 +65,7 @@ public class QueueManagerImpl extends ReferenceCounterUtil
implements QueueManag
SimpleString queueName = queue.getName();
AddressSettings settings =
server.getAddressSettingsRepository().getMatch(queue.getAddress().toString());
if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
- ActiveMQServerLogger.LOGGER.info("deleting auto-created queue \"" + queueName + ".\" consumerCount = " + queue.getConsumerCount() + "; messageCount = " + queue.getMessageCount() + "; isAutoDelete = " + queue.isAutoDelete());
+ ActiveMQServerLogger.LOGGER.debug("deleting auto-created queue \"" + queueName + "\": consumerCount = " + queue.getConsumerCount() + "; messageCount = " + queue.getMessageCount() + "; isAutoDelete = " + queue.isAutoDelete());
}
try {
@@ -84,8 +84,7 @@ public class QueueManagerImpl extends ReferenceCounterUtil
implements QueueManag
}
public static boolean delayCheck(Queue queue) {
- long consumerRemovedTimestamp = queue.getConsumerRemovedTimestamp();
- return consumerRemovedTimestamp != -1 && System.currentTimeMillis() -
consumerRemovedTimestamp >= queue.getAutoDeleteDelay();
+ return System.currentTimeMillis() - queue.getConsumerRemovedTimestamp()
>= queue.getAutoDeleteDelay();
}
public static boolean consumerCountCheck(Queue queue) {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteQueueTest.java
index 8603e7b..5bce2cf 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteQueueTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteQueueTest.java
@@ -17,6 +17,8 @@
package org.apache.activemq.artemis.tests.integration.client;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -42,13 +44,15 @@ public class AutoDeleteQueueTest extends ActiveMQTestBase {
super.setUp();
locator = createInVMNonHALocator();
server = createServer(false);
+ server.getConfiguration().setAddressQueueScanPeriod(500);
+ server.getConfiguration().setMessageExpiryScanPeriod(500);
server.start();
cf = createSessionFactory(locator);
}
@Test
- public void testAutoDeleteAutoCreatedQueue() throws Exception {
+ public void testAutoDeleteAutoCreatedQueueOnLastConsumerClose() throws
Exception {
// auto-delete-queues defaults to true
server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, null,
true, false, false, false, true, 1, false, true);
assertNotNull(server.locateQueue(queueA));
@@ -57,6 +61,30 @@ public class AutoDeleteQueueTest extends ActiveMQTestBase {
}
@Test
+ public void
testAutoDeleteAutoCreatedQueueOnLastMessageRemovedWithoutConsumer() throws
Exception {
+ // auto-delete-queues defaults to true
+ server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, null,
true, false, false, false, true, 1, false, true);
+ assertNotNull(server.locateQueue(queueA));
+ ClientSession session = cf.createSession();
+ ClientProducer producer = session.createProducer(addressA);
+ producer.send(session.createMessage(true));
+ Wait.assertEquals(1, server.locateQueue(queueA)::getMessageCount);
+ server.locateQueue(queueA).deleteAllReferences();
+ Wait.assertTrue(() -> server.locateQueue(queueA) == null, 2000, 100);
+ }
+
+ @Test
+ public void testAutoDeleteAutoCreatedQueueOnLastMessageExpired() throws
Exception {
+ // auto-delete-queues defaults to true
+ server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, null,
true, false, false, false, true, 1, false, true);
+ assertNotNull(server.locateQueue(queueA));
+ ClientSession session = cf.createSession();
+ ClientProducer producer = session.createProducer(addressA);
+ producer.send(session.createMessage(true).setExpiration(System.currentTimeMillis()));
+ Wait.assertTrue(() -> server.locateQueue(queueA) == null, 2000, 100);
+ }
+
+ @Test
public void testNegativeAutoDeleteAutoCreatedQueue() throws Exception {
server.getAddressSettingsRepository().addMatch(addressA.toString(), new
AddressSettings().setAutoDeleteQueues(false));
server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, null,
true, false, false, false, true, 1, false, true);
@@ -64,4 +92,12 @@ public class AutoDeleteQueueTest extends ActiveMQTestBase {
cf.createSession().createConsumer(queueA).close();
assertNotNull(server.locateQueue(queueA));
}
+
+ @Test
+ public void testNegativeAutoDeleteAutoCreatedQueue2() throws Exception {
+ server.getAddressSettingsRepository().addMatch(addressA.toString(), new
AddressSettings());
+ server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, null,
true, false, false, false, true, 1, false, true);
+ assertNotNull(server.locateQueue(queueA));
+ assertFalse(Wait.waitFor(() -> server.locateQueue(queueA) == null, 5000,
100));
+ }
}