This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new ec54576323 ARTEMIS-4281 Queue Reaper should not remove non empty queues on initial check
ec54576323 is described below

commit ec54576323ff819205a4c2e0eaf4e2bc1d8fd710
Author: Clebert Suconic <[email protected]>
AuthorDate: Tue May 16 17:09:16 2023 -0400

    ARTEMIS-4281 Queue Reaper should not remove non empty queues on initial check
---
 .../core/postoffice/impl/PostOfficeImpl.java       |   4 +-
 .../integration/client/PreserveOnRestartTest.java  | 104 +++++++++++++++++++++
 2 files changed, 107 insertions(+), 1 deletion(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 2b397434e2..7b7e68c336 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -1955,7 +1955,9 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
       getLocalQueues().forEach(queue -> {
          AddressSettings settings = 
addressSettingsRepository.getMatch(queue.getAddress().toString());
          if (!queue.isInternalQueue() && queue.isAutoDelete() && 
QueueManagerImpl.consumerCountCheck(queue) && (initialCheck || 
QueueManagerImpl.delayCheck(queue, settings)) && 
QueueManagerImpl.messageCountCheck(queue) && (initialCheck || 
queueWasUsed(queue, settings))) {
-            if (initialCheck || queue.isSwept()) {
+            // we only reap queues on the initialCheck if they are actually 
empty
+            boolean validInitialCheck = initialCheck && 
queue.getMessageCount() == 0 && !queue.getPagingStore().isPaging();
+            if (validInitialCheck || queue.isSwept()) {
                if (logger.isDebugEnabled()) {
                   if (initialCheck) {
                      logger.debug("Removing queue {} during the reload check", 
queue.getName());
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PreserveOnRestartTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PreserveOnRestartTest.java
new file mode 100644
index 0000000000..0c0bc679f2
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PreserveOnRestartTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.client;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PreserveOnRestartTest extends ActiveMQTestBase {
+
+
+   private ActiveMQServer server;
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      server = createServer(true, true);
+      server.getConfiguration().setAddressQueueScanPeriod(500);
+      server.getConfiguration().setMessageExpiryScanPeriod(500);
+      server.getConfiguration().addAddressSetting("#", new 
AddressSettings().setAutoDeleteAddresses(true).setAutoDeleteAddressesDelay(360000).setAutoDeleteQueuesDelay(360000).setAutoDeleteQueuesMessageCount(-1).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setMaxSizeMessages(20));
+      server.start();
+   }
+
+   @Test
+   public void testRestartQueueNoPaging() throws Exception {
+      testRestartQueue(false);
+   }
+
+   @Test
+   public void testRestartQueuePaging() throws Exception {
+      testRestartQueue(true);
+   }
+
+   public void testRestartQueue(boolean paging) throws Exception {
+      int NUMBER_OF_MESSAGES = paging ? 100 : 10;
+      String queueName = getName();
+      ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", 
"tcp://localhost:61616");
+      try (Connection connection = factory.createConnection()) {
+         Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+         MessageProducer producer = 
session.createProducer(session.createQueue(queueName));
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            producer.send(session.createTextMessage("hello" + i));
+         }
+         session.commit();
+      }
+      Queue serverQueue = server.locateQueue(queueName);
+      Wait.assertEquals(NUMBER_OF_MESSAGES, serverQueue::getMessageCount);
+
+      server.stop();
+      server.start();
+
+      serverQueue = server.locateQueue(queueName);
+      Wait.assertEquals(NUMBER_OF_MESSAGES, serverQueue::getMessageCount);
+
+      try (Connection connection = factory.createConnection()) {
+         Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+         MessageConsumer consumer = 
session.createConsumer(session.createQueue(queueName));
+         connection.start();
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            TextMessage message = (TextMessage)consumer.receive(5000);
+            Assert.assertNotNull(message);
+            Assert.assertEquals("hello" + i, message.getText());
+         }
+         Assert.assertNull(consumer.receiveNoWait());
+         session.commit();
+      }
+
+      Wait.assertEquals(0, serverQueue::getMessageCount);
+      Wait.assertFalse(serverQueue.getPagingStore()::isPaging);
+
+      server.stop();
+      server.start();
+      Assert.assertNull(server.locateQueue(queueName));
+   }
+}

Reply via email to