This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 9dd026118e19e9a07b89661412c4ab48a08f73ad
Author: Clebert Suconic <[email protected]>
AuthorDate: Thu Jun 16 22:53:07 2022 -0400

    ARTEMIS-3862 Short lived subscriptiong makes address size inconsistent
---
 .../artemis/core/server/impl/QueueImpl.java        |  46 ++++++
 .../client/RemoveSubscriptionRaceTest.java         | 172 +++++++++++++++++++++
 .../tests/integration/paging/PagingTest.java       |   2 +-
 3 files changed, 219 insertions(+), 1 deletion(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index e17fd6cb92..4068eafbcc 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2910,6 +2910,37 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
       }
    }
 
+   /** This method is to only be used during deliveryAsync when the queue was 
destroyed
+    and the async process left more messages to be delivered
+    This is a race between destroying the queue and async sends that came after
+    the deleteQueue already happened. */
+   private void removeMessagesWhileDelivering() throws Exception {
+      assert queueDestroyed : "Method to be used only when the queue was 
destroyed";
+      Transaction tx = new TransactionImpl(storageManager);
+      int txCount = 0;
+
+      try (LinkedListIterator<MessageReference> iter = iterator()) {
+         while (iter.hasNext()) {
+            MessageReference ref = iter.next();
+
+            if (ref.isPaged()) {
+               // this means the queue is being removed
+               // hence paged references are just going away through
+               // page cleanup
+               continue;
+            }
+            acknowledge(tx, ref, AckReason.KILLED, null);
+            iter.remove();
+            refRemoved(ref);
+            txCount++;
+         }
+
+         if (txCount > 0) {
+            tx.commit();
+         }
+      }
+   }
+
    /**
     * This method will deliver as many messages as possible until all 
consumers are busy or there
     * are no more matching or available messages.
@@ -2960,6 +2991,18 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
 
          synchronized (this) {
 
+            if (queueDestroyed) {
+               if (messageReferences.size() == 0) {
+                  return false;
+               }
+               try {
+                  removeMessagesWhileDelivering();
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
+               }
+               return false;
+            }
+
             // Need to do these checks inside the synchronized
             if (isPaused() || !canDispatch()) {
                return false;
@@ -3109,6 +3152,9 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
    }
 
    private void checkDepage() {
+      if (queueDestroyed) {
+         return;
+      }
       if (pageIterator != null && pageSubscription.isPaging() && 
!depagePending && needsDepage() && pageIterator.tryNext() != 
PageIterator.NextResult.noElements) {
          scheduleDepage(false);
       }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RemoveSubscriptionRaceTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RemoveSubscriptionRaceTest.java
new file mode 100644
index 0000000000..0911d69e5b
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RemoveSubscriptionRaceTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.client;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import java.util.HashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class RemoveSubscriptionRaceTest extends ActiveMQTestBase {
+
+
+   private static final String SUB_NAME = "SubscriptionStressTest";
+
+   ActiveMQServer server;
+
+   @Before
+   public void setServer() throws Exception {
+   }
+
+   @Test
+   public void testCreateSubscriptionCoreNoFiles() throws Exception {
+      internalTest("core", false, 5, 1000);
+   }
+
+   @Test
+   public void testCreateSubscriptionAMQPNoFiles() throws Exception {
+      internalTest("amqp", false, 5, 1000);
+   }
+
+   @Test
+   public void testCreateSubscriptionCoreRealFiles() throws Exception {
+      internalTest("core", true, 2, 200);
+   }
+
+   @Test
+   public void testCreateSubscriptionAMQPRealFiles() throws Exception {
+      internalTest("amqp", true, 2, 200);
+   }
+
+   public void internalTest(String protocol, boolean realFiles, int threads, 
int numberOfMessages) throws Exception {
+      server = createServer(realFiles, true);
+      server.getConfiguration().addAddressConfiguration(new 
CoreAddressConfiguration().setName(SUB_NAME).addRoutingType(RoutingType.MULTICAST));
+      server.getConfiguration().addQueueConfiguration(new 
QueueConfiguration().setName("Sub_1").setAddress(SUB_NAME).setRoutingType(RoutingType.MULTICAST));
+      server.start();
+
+      CountDownLatch runningLatch = new CountDownLatch(threads);
+      AtomicBoolean running = new AtomicBoolean(true);
+      AtomicInteger errors = new AtomicInteger(0);
+
+      ExecutorService executorService = 
Executors.newFixedThreadPool(Math.max(1, threads)); // I'm using the max here, 
because I may set threads=0 while hacking the test
+
+      runAfter(() -> executorService.shutdownNow());
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:61616");
+
+      CyclicBarrier flagStart = new CyclicBarrier(threads + 1);
+
+      for (int i = 0; i < threads; i++) {
+         executorService.execute(() -> {
+            try {
+               flagStart.await(10, TimeUnit.SECONDS);
+               for (int n = 0; n < numberOfMessages && running.get(); n++) {
+                  Connection connection = factory.createConnection();
+                  connection.start();
+                  Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+                  Topic topic = session.createTopic(SUB_NAME);
+                  MessageConsumer consumer = session.createConsumer(topic);
+                  Message message = consumer.receiveNoWait();
+                  if (message != null) {
+                     message.acknowledge();
+                  }
+                  connection.close();
+               }
+            } catch (Throwable e) {
+               e.printStackTrace();
+               errors.incrementAndGet();
+
+            } finally {
+               runningLatch.countDown();
+            }
+         });
+      }
+
+      Connection connection = factory.createConnection();
+      connection.start();
+
+      Queue queue = server.locateQueue("Sub_1");
+      Assert.assertNotNull(queue);
+
+      Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+      Topic topic = session.createTopic(SUB_NAME);
+      MessageProducer producer = session.createProducer(topic);
+      MessageConsumer consumer = 
session.createConsumer(session.createQueue(SUB_NAME + "::" + "Sub_1"));
+
+      flagStart.await(10, TimeUnit.SECONDS);
+      try {
+         for (int i = 0; i < numberOfMessages; i++) {
+            producer.send(session.createTextMessage("a"));
+            Assert.assertNotNull(consumer.receive(5000));
+         }
+         connection.close();
+      } finally {
+         running.set(false);
+         Assert.assertTrue(runningLatch.await(10, TimeUnit.SECONDS));
+      }
+
+      Wait.assertEquals(0, this::countAddMessage, 5000, 100);
+
+      Wait.assertEquals(0L, queue.getPagingStore()::getAddressSize, 2000, 100);
+   }
+
+   int countAddMessage() throws Exception {
+      StorageManager manager = server.getStorageManager();
+
+      if (manager instanceof JournalStorageManager) {
+         JournalStorageManager journalStorageManager = (JournalStorageManager) 
manager;
+         
journalStorageManager.getMessageJournal().scheduleCompactAndBlock(5_000);
+      } else {
+         return 0;
+      }
+
+      HashMap<Integer, AtomicInteger> journalCounts = 
countJournal(server.getConfiguration());
+      AtomicInteger value = journalCounts.get((int) 
JournalRecordIds.ADD_MESSAGE_PROTOCOL);
+      if (value == null) {
+         return 0;
+      }
+      return value.get();
+   }
+}
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
index 6e1549294f..1214df64f1 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
@@ -2738,7 +2738,7 @@ public class PagingTest extends ActiveMQTestBase {
       for (int msgCount = 0; msgCount < numberOfMessages; msgCount++) {
          log.debug("Received " + msgCount);
          msgReceived++;
-         ClientMessage msg = consumer.receiveImmediate();
+         ClientMessage msg = consumer.receive(5000);
          if (msg == null) {
             log.debug("It's null. leaving now");
             sessionConsumer.commit();

Reply via email to