This is an automated email from the ASF dual-hosted git repository.

brusdev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 673481369f ARTEMIS-4241 paging + FQQN is broken
673481369f is described below

commit 673481369f2a197098b728ef4c55ea16d3faa070
Author: Justin Bertram <[email protected]>
AuthorDate: Thu Apr 13 22:06:48 2023 -0500

    ARTEMIS-4241 paging + FQQN is broken
---
 .../core/paging/impl/PagingManagerImpl.java        |  6 +-
 .../tests/integration/paging/PagingTest.java       | 74 ++++++++++++++++++++++
 2 files changed, 78 insertions(+), 2 deletions(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
index 237bb1e944..18d96168db 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
@@ -41,6 +41,7 @@ import 
org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.utils.ByteUtil;
+import org.apache.activemq.artemis.utils.CompositeAddress;
 import org.apache.activemq.artemis.utils.SizeAwareMetric;
 import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
 import org.apache.activemq.artemis.utils.collections.LongHashSet;
@@ -393,7 +394,7 @@ public final class PagingManagerImpl implements 
PagingManager {
    public void deletePageStore(final SimpleString storeName) throws Exception {
       syncLock.readLock().lock();
       try {
-         PagingStore store = stores.remove(storeName);
+         PagingStore store = 
stores.remove(CompositeAddress.extractAddressName(storeName));
          if (store != null) {
             store.stop();
             store.destroy();
@@ -407,7 +408,8 @@ public final class PagingManagerImpl implements 
PagingManager {
     * This method creates a new store if not exist.
     */
    @Override
-   public PagingStore getPageStore(final SimpleString storeName) throws 
Exception {
+   public PagingStore getPageStore(final SimpleString rawStoreName) throws 
Exception {
+      final SimpleString storeName = 
CompositeAddress.extractAddressName(rawStoreName);
       if (managementAddress != null && 
storeName.startsWith(managementAddress)) {
          return null;
       }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
index 0da8db5dd5..fd48e412fd 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
@@ -115,6 +115,7 @@ import 
org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.tests.util.CFUtil;
 import org.apache.activemq.artemis.tests.util.RandomUtil;
 import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.artemis.utils.CompositeAddress;
 import org.apache.activemq.artemis.utils.RetryRule;
 import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
 import org.junit.After;
@@ -1393,6 +1394,79 @@ public class PagingTest extends ActiveMQTestBase {
       }
    }
 
+   @Test
+   public void testFqqn() throws Exception {
+      final SimpleString queue = RandomUtil.randomSimpleString();
+      SimpleString fqqn = CompositeAddress.toFullyQualified(ADDRESS, queue);
+      boolean persistentMessages = true;
+
+      clearDataRecreateServerDirs();
+
+      Configuration config = 
createDefaultInVMConfig().setJournalSyncNonTransactional(false);
+
+      server = createServer(true, config, PagingTest.PAGE_SIZE, 
PagingTest.PAGE_MAX, -1, -1);
+
+      server.start();
+
+      final int numberOfMessages = 1000;
+
+      locator = 
createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
+
+      sf = createSessionFactory(locator);
+
+      ClientSession session = sf.createSession(false, false, false);
+
+      session.createQueue(new 
QueueConfiguration(fqqn).setRoutingType(RoutingType.ANYCAST));
+
+      ClientProducer producer = session.createProducer(fqqn);
+
+      ClientMessage message = null;
+
+      byte[] body = new byte[MESSAGE_SIZE];
+
+      for (int i = 0; i < numberOfMessages; i++) {
+         message = session.createMessage(persistentMessages);
+
+         ActiveMQBuffer bodyLocal = message.getBodyBuffer();
+
+         bodyLocal.writeBytes(body);
+
+         message.putIntProperty(new SimpleString("id"), i);
+
+         producer.send(message);
+         if (i % 1000 == 0) {
+            session.commit();
+         }
+      }
+
+      session.commit();
+
+      
Wait.assertTrue(server.getPagingManager().getPageStore(ADDRESS)::isPaging, 
5000, 100);
+      assertEquals(ADDRESS, 
server.getPagingManager().getPageStore(ADDRESS).getAddress());
+
+      session.start();
+
+      ClientConsumer consumer = session.createConsumer(fqqn);
+
+      for (int i = 0; i < numberOfMessages; i++) {
+         message = consumer.receive(5000);
+         assertNotNull(message);
+         message.acknowledge();
+
+         assertEquals(i, message.getIntProperty("id").intValue());
+         if (i % 1000 == 0) {
+            session.commit();
+         }
+      }
+
+      session.commit();
+
+      
Wait.assertFalse(server.getPagingManager().getPageStore(ADDRESS)::isPaging, 
5000, 100);
+
+      server.getPagingManager().deletePageStore(fqqn);
+      
assertFalse(Arrays.asList(server.getPagingManager().getStoreNames()).contains(ADDRESS));
+   }
+
    @Test
    public void testPurge() throws Exception {
       clearDataRecreateServerDirs();

Reply via email to