This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch 2.19.x in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit c11a0c000475d2769d9c91df36c56b25a407fda2 Author: Clebert Suconic <[email protected]> AuthorDate: Tue Oct 12 12:03:29 2021 -0400 ARTEMIS-3525 Empty Auto Created queues should be removed on restart (cherry picked from commit 2383aa0125320713b9a753668b203a878c24b2e0) --- .../artemis/core/postoffice/AddressManager.java | 5 ++ .../core/postoffice/impl/PostOfficeImpl.java | 20 +++-- .../core/postoffice/impl/SimpleAddressManager.java | 9 ++- .../postoffice/impl/PostOfficeTestAccessor.java | 2 +- .../tests/integration/client/AutoCreateTest.java | 91 ++++++++++++++++++++++ 5 files changed, 118 insertions(+), 9 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java index f60b9a1..e7720da 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java @@ -85,6 +85,11 @@ public interface AddressManager { boolean checkAutoRemoveAddress(SimpleString address, AddressInfo addressInfo, + AddressSettings settings, + boolean ignoreDelay) throws Exception; + + boolean checkAutoRemoveAddress(SimpleString address, + AddressInfo addressInfo, AddressSettings settings) throws Exception; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index feaa7b1..ba15fc7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -1779,6 +1779,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding @Override public synchronized void startAddressQueueScanner() { + reapAddresses(true); // we need to check for empty auto-created queues before the acceptors are on + // empty auto-created queues and addresses should be removed right away if (addressQueueReaperPeriod > 0) { if (addressQueueReaperRunnable != null) addressQueueReaperRunnable.stop(); @@ -1845,7 +1847,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding @Override public void run() { - reapAddresses(); + reapAddresses(false); } } @@ -1855,12 +1857,16 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding /** To be used by the AddressQueueReaper. * It is also exposed for tests through PostOfficeTestAccessor */ - void reapAddresses() { + void reapAddresses(boolean initialCheck) { getLocalQueues().forEach(queue -> { - if (!queue.isInternalQueue() && QueueManagerImpl.isAutoDelete(queue) && QueueManagerImpl.consumerCountCheck(queue) && QueueManagerImpl.delayCheck(queue) && QueueManagerImpl.messageCountCheck(queue) && queueWasUsed(queue)) { - if (queue.isSwept()) { + if (!queue.isInternalQueue() && QueueManagerImpl.isAutoDelete(queue) && QueueManagerImpl.consumerCountCheck(queue) && (initialCheck || QueueManagerImpl.delayCheck(queue)) && QueueManagerImpl.messageCountCheck(queue) && (initialCheck || queueWasUsed(queue))) { + if (initialCheck || queue.isSwept()) { if (logger.isDebugEnabled()) { - logger.debug("Removing queue " + queue.getName() + " after it being swept twice on reaping process"); + if (initialCheck) { + logger.debug("Removing queue " + queue.getName() + " during the reload check"); + } else { + logger.debug("Removing queue " + queue.getName() + " after it being swept twice on reaping process"); + } } QueueManagerImpl.performAutoDeleteQueue(server, queue); } else { @@ -1878,8 +1884,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding AddressSettings settings = addressSettingsRepository.getMatch(address.toString()); try { - if (addressManager.checkAutoRemoveAddress(address, addressInfo, settings)) { - if (addressInfo.isSwept()) { + if (addressManager.checkAutoRemoveAddress(address, addressInfo, settings, initialCheck)) { + if (initialCheck || addressInfo.isSwept()) { server.autoRemoveAddressInfo(address, null); } else { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java index 5a4a94a..2e92300 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java @@ -369,7 +369,14 @@ public class SimpleAddressManager implements AddressManager { public boolean checkAutoRemoveAddress(SimpleString address, AddressInfo addressInfo, AddressSettings settings) throws Exception { - return settings.isAutoDeleteAddresses() && addressInfo != null && addressInfo.isAutoCreated() && !bindingsFactory.isAddressBound(address) && addressInfo.getBindingRemovedTimestamp() != -1 && (System.currentTimeMillis() - addressInfo.getBindingRemovedTimestamp() >= settings.getAutoDeleteAddressesDelay()); + return checkAutoRemoveAddress(address, addressInfo, settings, false); + } + + @Override + public boolean checkAutoRemoveAddress(SimpleString address, + AddressInfo addressInfo, + AddressSettings settings, boolean ignoreDelay) throws Exception { + return settings.isAutoDeleteAddresses() && addressInfo != null && addressInfo.isAutoCreated() && !bindingsFactory.isAddressBound(address) && (ignoreDelay || addressInfo.getBindingRemovedTimestamp() != -1 && (System.currentTimeMillis() - addressInfo.getBindingRemovedTimestamp() >= settings.getAutoDeleteAddressesDelay())); } @Override diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeTestAccessor.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeTestAccessor.java index 6236de9..9e3ecb0 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeTestAccessor.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeTestAccessor.java @@ -20,7 +20,7 @@ package org.apache.activemq.artemis.core.postoffice.impl; public class PostOfficeTestAccessor { public static void reapAddresses(PostOfficeImpl postOffice) { - postOffice.reapAddresses(); + postOffice.reapAddresses(false); } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateTest.java index fad1357..9a7db67 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateTest.java @@ -22,6 +22,7 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; +import javax.jms.TextMessage; import javax.jms.Topic; import java.util.concurrent.CountDownLatch; @@ -42,6 +43,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.logs.AssertionLoggerHandler; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.tests.util.RandomUtil; import org.apache.activemq.artemis.tests.util.Wait; import org.jboss.logging.Logger; import org.junit.After; @@ -327,6 +329,95 @@ public class AutoCreateTest extends ActiveMQTestBase { } } + @Test + public void testCleanupAfterRebootOpenWire() throws Exception { + testCleanupAfterReboot("OPENWIRE", false); + } + + @Test + public void testCleanupAfterRebootCore() throws Exception { + // there is no need to duplicate the test between usedelay and not. + // doing it in one of the protocols should be enough + testCleanupAfterReboot("CORE", true); + } + + @Test + public void testCleanupAfterRebootAMQP() throws Exception { + testCleanupAfterReboot("AMQP", false); + } + + public void testCleanupAfterReboot(String protocol, boolean useDelay) throws Exception { + + if (useDelay) { + // setting up a delay, to make things a bit more challenging + server.getAddressSettingsRepository().addMatch(getName(), new AddressSettings().setAutoCreateAddresses(true).setAutoDeleteAddressesDelay(TimeUnit.DAYS.toMillis(1)).setAutoDeleteQueuesDelay(TimeUnit.DAYS.toMillis(1))); + } + + AssertionLoggerHandler.startCapture(); + server.getConfiguration().setAddressQueueScanPeriod(-1); // disabling scanner, we will perform it manually + server.start(); + String QUEUE_NAME = getName(); + + ConnectionFactory cf = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616"); + try (Connection connection = cf.createConnection()) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(QUEUE_NAME); + MessageConsumer consumer = session.createConsumer(queue); + connection.start(); + } + + AddressInfo info = server.getPostOffice().getAddressInfo(SimpleString.toSimpleString(QUEUE_NAME)); + Assert.assertNotNull(info); + Assert.assertTrue(info.isAutoCreated()); + + server.stop(); + server.start(); + + Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224113")); + Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224112")); + + AssertionLoggerHandler.clear(); + + String randomString = "random " + RandomUtil.randomString(); + + try (Connection connection = cf.createConnection()) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(QUEUE_NAME); + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage(randomString)); + } + + info = server.getPostOffice().getAddressInfo(SimpleString.toSimpleString(QUEUE_NAME)); + Assert.assertNotNull(info); + Assert.assertTrue(info.isAutoCreated()); + + server.stop(); + server.start(); + + Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224113")); // this time around the queue had messages, it has to exist + Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224112")); + + info = server.getPostOffice().getAddressInfo(SimpleString.toSimpleString(QUEUE_NAME)); + Assert.assertNotNull(info); + Assert.assertTrue(info.isAutoCreated()); + + { // just a namespace + org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(QUEUE_NAME); + Wait.assertEquals(1, serverQueue::getMessageCount); + } + + + try (Connection connection = cf.createConnection()) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + connection.start(); + Queue queue = session.createQueue(QUEUE_NAME); + MessageConsumer consumer = session.createConsumer(queue); + TextMessage message = (TextMessage)consumer.receive(5000); + Assert.assertEquals(randomString, message.getText()); + } + + } + }
