This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new ec54576323 ARTEMIS-4281 Queue Reaper should not remove non empty
queues on initial check
ec54576323 is described below
commit ec54576323ff819205a4c2e0eaf4e2bc1d8fd710
Author: Clebert Suconic <[email protected]>
AuthorDate: Tue May 16 17:09:16 2023 -0400
ARTEMIS-4281 Queue Reaper should not remove non empty queues on initial
check
---
.../core/postoffice/impl/PostOfficeImpl.java | 4 +-
.../integration/client/PreserveOnRestartTest.java | 104 +++++++++++++++++++++
2 files changed, 107 insertions(+), 1 deletion(-)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 2b397434e2..7b7e68c336 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -1955,7 +1955,9 @@ public class PostOfficeImpl implements PostOffice,
NotificationListener, Binding
getLocalQueues().forEach(queue -> {
AddressSettings settings =
addressSettingsRepository.getMatch(queue.getAddress().toString());
if (!queue.isInternalQueue() && queue.isAutoDelete() &&
QueueManagerImpl.consumerCountCheck(queue) && (initialCheck ||
QueueManagerImpl.delayCheck(queue, settings)) &&
QueueManagerImpl.messageCountCheck(queue) && (initialCheck ||
queueWasUsed(queue, settings))) {
- if (initialCheck || queue.isSwept()) {
+ // we only reap queues on the initialCheck if they are actually
empty
+ boolean validInitialCheck = initialCheck &&
queue.getMessageCount() == 0 && !queue.getPagingStore().isPaging();
+ if (validInitialCheck || queue.isSwept()) {
if (logger.isDebugEnabled()) {
if (initialCheck) {
logger.debug("Removing queue {} during the reload check",
queue.getName());
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PreserveOnRestartTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PreserveOnRestartTest.java
new file mode 100644
index 0000000000..0c0bc679f2
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PreserveOnRestartTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.client;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PreserveOnRestartTest extends ActiveMQTestBase {
+
+
+ private ActiveMQServer server;
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ server = createServer(true, true);
+ server.getConfiguration().setAddressQueueScanPeriod(500);
+ server.getConfiguration().setMessageExpiryScanPeriod(500);
+ server.getConfiguration().addAddressSetting("#", new
AddressSettings().setAutoDeleteAddresses(true).setAutoDeleteAddressesDelay(360000).setAutoDeleteQueuesDelay(360000).setAutoDeleteQueuesMessageCount(-1).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setMaxSizeMessages(20));
+ server.start();
+ }
+
+ @Test
+ public void testRestartQueueNoPaging() throws Exception {
+ testRestartQueue(false);
+ }
+
+ @Test
+ public void testRestartQueuePaging() throws Exception {
+ testRestartQueue(true);
+ }
+
+ public void testRestartQueue(boolean paging) throws Exception {
+ int NUMBER_OF_MESSAGES = paging ? 100 : 10;
+ String queueName = getName();
+ ConnectionFactory factory = CFUtil.createConnectionFactory("CORE",
"tcp://localhost:61616");
+ try (Connection connection = factory.createConnection()) {
+ Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ MessageProducer producer =
session.createProducer(session.createQueue(queueName));
+ for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+ producer.send(session.createTextMessage("hello" + i));
+ }
+ session.commit();
+ }
+ Queue serverQueue = server.locateQueue(queueName);
+ Wait.assertEquals(NUMBER_OF_MESSAGES, serverQueue::getMessageCount);
+
+ server.stop();
+ server.start();
+
+ serverQueue = server.locateQueue(queueName);
+ Wait.assertEquals(NUMBER_OF_MESSAGES, serverQueue::getMessageCount);
+
+ try (Connection connection = factory.createConnection()) {
+ Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ MessageConsumer consumer =
session.createConsumer(session.createQueue(queueName));
+ connection.start();
+ for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+ TextMessage message = (TextMessage)consumer.receive(5000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("hello" + i, message.getText());
+ }
+ Assert.assertNull(consumer.receiveNoWait());
+ session.commit();
+ }
+
+ Wait.assertEquals(0, serverQueue::getMessageCount);
+ Wait.assertFalse(serverQueue.getPagingStore()::isPaging);
+
+ server.stop();
+ server.start();
+ Assert.assertNull(server.locateQueue(queueName));
+ }
+}