This is an automated email from the ASF dual-hosted git repository.
brusdev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 673481369f ARTEMIS-4241 paging + FQQN is broken
673481369f is described below
commit 673481369f2a197098b728ef4c55ea16d3faa070
Author: Justin Bertram <[email protected]>
AuthorDate: Thu Apr 13 22:06:48 2023 -0500
ARTEMIS-4241 paging + FQQN is broken
---
.../core/paging/impl/PagingManagerImpl.java | 6 +-
.../tests/integration/paging/PagingTest.java | 74 ++++++++++++++++++++++
2 files changed, 78 insertions(+), 2 deletions(-)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
index 237bb1e944..18d96168db 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
@@ -41,6 +41,7 @@ import
org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.utils.ByteUtil;
+import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.SizeAwareMetric;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.collections.LongHashSet;
@@ -393,7 +394,7 @@ public final class PagingManagerImpl implements
PagingManager {
public void deletePageStore(final SimpleString storeName) throws Exception {
syncLock.readLock().lock();
try {
- PagingStore store = stores.remove(storeName);
+ PagingStore store =
stores.remove(CompositeAddress.extractAddressName(storeName));
if (store != null) {
store.stop();
store.destroy();
@@ -407,7 +408,8 @@ public final class PagingManagerImpl implements
PagingManager {
* This method creates a new store if not exist.
*/
@Override
- public PagingStore getPageStore(final SimpleString storeName) throws
Exception {
+ public PagingStore getPageStore(final SimpleString rawStoreName) throws
Exception {
+ final SimpleString storeName =
CompositeAddress.extractAddressName(rawStoreName);
if (managementAddress != null &&
storeName.startsWith(managementAddress)) {
return null;
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
index 0da8db5dd5..fd48e412fd 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
@@ -115,6 +115,7 @@ import
org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.RetryRule;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.junit.After;
@@ -1393,6 +1394,79 @@ public class PagingTest extends ActiveMQTestBase {
}
}
+ @Test
+ public void testFqqn() throws Exception {
+ final SimpleString queue = RandomUtil.randomSimpleString();
+ SimpleString fqqn = CompositeAddress.toFullyQualified(ADDRESS, queue);
+ boolean persistentMessages = true;
+
+ clearDataRecreateServerDirs();
+
+ Configuration config =
createDefaultInVMConfig().setJournalSyncNonTransactional(false);
+
+ server = createServer(true, config, PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX, -1, -1);
+
+ server.start();
+
+ final int numberOfMessages = 1000;
+
+ locator =
createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
+
+ sf = createSessionFactory(locator);
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ session.createQueue(new
QueueConfiguration(fqqn).setRoutingType(RoutingType.ANYCAST));
+
+ ClientProducer producer = session.createProducer(fqqn);
+
+ ClientMessage message = null;
+
+ byte[] body = new byte[MESSAGE_SIZE];
+
+ for (int i = 0; i < numberOfMessages; i++) {
+ message = session.createMessage(persistentMessages);
+
+ ActiveMQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producer.send(message);
+ if (i % 1000 == 0) {
+ session.commit();
+ }
+ }
+
+ session.commit();
+
+
Wait.assertTrue(server.getPagingManager().getPageStore(ADDRESS)::isPaging,
5000, 100);
+ assertEquals(ADDRESS,
server.getPagingManager().getPageStore(ADDRESS).getAddress());
+
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer(fqqn);
+
+ for (int i = 0; i < numberOfMessages; i++) {
+ message = consumer.receive(5000);
+ assertNotNull(message);
+ message.acknowledge();
+
+ assertEquals(i, message.getIntProperty("id").intValue());
+ if (i % 1000 == 0) {
+ session.commit();
+ }
+ }
+
+ session.commit();
+
+
Wait.assertFalse(server.getPagingManager().getPageStore(ADDRESS)::isPaging,
5000, 100);
+
+ server.getPagingManager().deletePageStore(fqqn);
+
assertFalse(Arrays.asList(server.getPagingManager().getStoreNames()).contains(ADDRESS));
+ }
+
@Test
public void testPurge() throws Exception {
clearDataRecreateServerDirs();