This is an automated email from the ASF dual-hosted git repository.
jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new bd2a7402cb ARTEMIS-5579 Address is unavailable to create queues during
page cleanup
bd2a7402cb is described below
commit bd2a7402cbd8ea591b64507aa0fd7199583a0f70
Author: Clebert Suconic <[email protected]>
AuthorDate: Thu Jul 17 13:04:00 2025 -0400
ARTEMIS-5579 Address is unavailable to create queues during page cleanup
---
.../paging/cursor/impl/PageCursorProviderImpl.java | 83 +++++++--------
.../artemis/tests/db/paging/PagingTest.java | 118 +++++++++++++++++++++
2 files changed, 156 insertions(+), 45 deletions(-)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
index 050f3a1314..c52a127304 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
@@ -84,16 +84,13 @@ public class PageCursorProviderImpl implements
PageCursorProvider {
throw new IllegalStateException("Cursor " + cursorID + " had already
been created");
}
-
PageSubscriptionCounter subscriptionCounter =
createPageCounter(cursorID, persistent);
PageSubscription activeCursor = new PageSubscriptionImpl(this,
pagingStore, storageManager, filter, cursorID, persistent, subscriptionCounter);
-
activeCursors.put(cursorID, activeCursor);
return activeCursor;
}
-
private PageSubscriptionCounter createPageCounter(long cursorID, boolean
persistent) {
return new PageSubscriptionCounterImpl(storageManager, cursorID);
}
@@ -272,7 +269,6 @@ public class PageCursorProviderImpl implements
PageCursorProvider {
// I tried to simplify the locks but each PageStore has its own lock, so
this was the best option
// I found in order to fix
https://issues.apache.org/jira/browse/ARTEMIS-3054
try (ArtemisCloseable readLock = storageManager.closeableReadLock()) {
-
while (true) {
if (pagingStore.writeLock(1_000)) {
break;
@@ -283,57 +279,54 @@ public class PageCursorProviderImpl implements
PageCursorProvider {
logger.trace(">>>> Cleanup {}", this.pagingStore.getAddress());
- synchronized (this) {
- try {
- if (!pagingStore.isStarted()) {
- logger.trace("Paging store is not started");
- return;
- }
+ try {
+ if (!pagingStore.isStarted()) {
+ logger.trace("Paging store is not started");
+ return;
+ }
- if (!pagingStore.isPaging()) {
- logger.trace("Paging Store was not paging, so no reason to
retry the cleanup");
- return;
- }
+ if (!pagingStore.isPaging()) {
+ logger.trace("Paging Store was not paging, so no reason to
retry the cleanup");
+ return;
+ }
- List<PageSubscription> cursorList = cloneSubscriptions();
+ List<PageSubscription> cursorList = cloneSubscriptions();
- long minPage = checkMinPage(cursorList);
- final long firstPage = pagingStore.getFirstPage();
- deliverIfNecessary(cursorList, minPage);
+ long minPage = checkMinPage(cursorList);
+ final long firstPage = pagingStore.getFirstPage();
+ deliverIfNecessary(cursorList, minPage);
- if (logger.isTraceEnabled()) {
- logger.trace("firstPage={}, minPage={},
currentWritingPage={}", firstPage, minPage,
pagingStore.getCurrentWritingPage());
- }
+ if (logger.isTraceEnabled()) {
+ logger.trace("firstPage={}, minPage={}, currentWritingPage={}",
firstPage, minPage, pagingStore.getCurrentWritingPage());
+ }
- // First we cleanup regular streaming, at the beginning of set
of files
- cleanupRegularStream(depagedPages, depagedPagesSet, cursorList,
minPage, firstPage);
+ // First we cleanup regular streaming, at the beginning of set of
files
+ cleanupRegularStream(depagedPages, depagedPagesSet, cursorList,
minPage, firstPage);
- // Then we do some check on eventual pages that can be already
removed but they are away from the streaming
- cleanupMiddleStream(depagedPages, depagedPagesSet, cursorList,
minPage, firstPage);
+ // Then we do some check on eventual pages that can be already
removed but they are away from the streaming
+ cleanupMiddleStream(depagedPages, depagedPagesSet, cursorList,
minPage, firstPage);
- if (pagingStore.isPageFull()) {
- checkClearPageLimit();
- }
+ if (pagingStore.isPageFull()) {
+ checkClearPageLimit();
+ }
- assert pagingStore.getNumberOfPages() >= 0;
+ assert pagingStore.getNumberOfPages() >= 0;
- if (!pagingStore.hasPendingIO() &&
(pagingStore.getNumberOfPages() == 0 || pagingStore.getNumberOfPages() == 1 &&
(pagingStore.getCurrentPage() == null ||
pagingStore.getCurrentPage().getNumberOfMessages() == 0))) {
- logger.trace("StopPaging being called on {}, pending={}",
pagingStore, pagingStore.hasPendingIO());
- pagingStore.stopPaging();
- } else {
- if (logger.isTraceEnabled()) {
- logger.trace("Couldn't cleanup page on address {} as
numberOfPages == {} and currentPage.numberOfMessages = {}",
- pagingStore.getAddress(),
pagingStore.getNumberOfPages(),
pagingStore.getCurrentPage().getNumberOfMessages());
- }
+ if (!pagingStore.hasPendingIO() && (pagingStore.getNumberOfPages()
== 0 || pagingStore.getNumberOfPages() == 1 && (pagingStore.getCurrentPage() ==
null || pagingStore.getCurrentPage().getNumberOfMessages() == 0))) {
+ logger.trace("StopPaging being called on {}, pending={}",
pagingStore, pagingStore.hasPendingIO());
+ pagingStore.stopPaging();
+ } else {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Couldn't cleanup page on address {} as
numberOfPages == {} and currentPage.numberOfMessages = {}",
pagingStore.getAddress(), pagingStore.getNumberOfPages(),
pagingStore.getCurrentPage().getNumberOfMessages());
}
- } catch (Throwable ex) {
-
ActiveMQServerLogger.LOGGER.problemCleaningPageAddress(pagingStore.getAddress(),
ex);
- logger.warn(ex.getMessage(), ex);
- return;
- } finally {
- logger.trace("<<<< Cleanup end on {}",
pagingStore.getAddress());
- pagingStore.writeUnlock();
}
+ } catch (Throwable ex) {
+
ActiveMQServerLogger.LOGGER.problemCleaningPageAddress(pagingStore.getAddress(),
ex);
+ logger.warn(ex.getMessage(), ex);
+ return;
+ } finally {
+ logger.trace("<<<< Cleanup end on {}", pagingStore.getAddress());
+ pagingStore.writeUnlock();
}
}
finishCleanup(depagedPages);
@@ -344,7 +337,7 @@ public class PageCursorProviderImpl implements
PageCursorProvider {
* This cleanup process will calculate the min page for every cursor and
then we remove the pages based on that. if
* we knew ahead all the queues belonging to every page we could remove
this process.
*/
- private void cleanupRegularStream(List<Page> depagedPages,
+ protected void cleanupRegularStream(List<Page> depagedPages,
LongHashSet depagedPagesSet,
List<PageSubscription> cursorList,
long minPage,
diff --git
a/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/PagingTest.java
b/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/PagingTest.java
index 5f8e057849..633086ece1 100644
---
a/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/PagingTest.java
+++
b/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/PagingTest.java
@@ -52,6 +52,7 @@ import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -96,6 +97,7 @@ import
org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import
org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl;
import
org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderTestAccessor;
import org.apache.activemq.artemis.core.paging.cursor.impl.PagePositionImpl;
+import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryDatabase;
import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO;
import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl;
@@ -124,6 +126,7 @@ import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
+import org.apache.activemq.artemis.utils.collections.LongHashSet;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
@@ -3827,6 +3830,121 @@ public class PagingTest extends ParameterDBTestBase {
}
+ // Verifies that a queue can be created on an address while a page cleanup for that address is paused mid-run (ARTEMIS-5579).
+ @TestTemplate
+ public void testCreateQueueWhileCleanup() throws Exception {
+
+ Configuration config = createDefaultInVMConfig();
+
+ CountDownLatch letGoLatch = new CountDownLatch(1); // released by the test to let the parked cleanup finish
+ runAfter(letGoLatch::countDown); // presumably a teardown hook, so a failing test never leaves cleanup parked
+ CountDownLatch startFlag = new CountDownLatch(1); // counted down once cleanup has passed cleanupRegularStream
+ class InterruptedCursorProvider extends PageCursorProviderImpl { // parks the cleanup right after cleanupRegularStream
+
+ InterruptedCursorProvider(PagingStore pagingStore, StorageManager
storageManager) {
+ super(pagingStore, storageManager);
+ }
+
+ @Override
+ protected void cleanupRegularStream(List<Page> depagedPages,
+ LongHashSet depagedPagesSet,
+ List<PageSubscription> cursorList,
+ long minPage,
+ long firstPage) throws Exception {
+ super.cleanupRegularStream(depagedPages, depagedPagesSet,
cursorList, minPage, firstPage); // run the real cleanup step first, then park
+ startFlag.countDown(); // tell the test that cleanup has reached this point
+ for (int i = 0; i < 60; i++) { // wait up to ~60 x 1s for letGoLatch; once released, remaining awaits return at once
+ if (!letGoLatch.await(1, TimeUnit.SECONDS)) {
+ logger.warn("letGoLatch still unreleased");
+ }
+ }
+ }
+ }
+
+ server = new ActiveMQServerImpl(config,
ManagementFactory.getPlatformMBeanServer(), new ActiveMQSecurityManagerImpl()) {
+ @Override
+ protected PagingStoreFactoryNIO getPagingStoreFactory() {
+ return new PagingStoreFactoryNIO(this.getStorageManager(),
this.getConfiguration().getPagingLocation(),
this.getConfiguration().getJournalBufferTimeout_NIO(), this.getScheduledPool(),
this.getExecutorFactory(),
this.getConfiguration().isJournalSyncNonTransactional(), null) {
+ @Override
+ public PageCursorProvider newCursorProvider(PagingStore store,
+ StorageManager
storageManager,
+ AddressSettings
addressSettings,
+ ArtemisExecutor
executor) {
+ return new InterruptedCursorProvider(store, storageManager); // every paging store gets the parking provider
+ }
+ };
+ }
+
+ };
+
+ addServer(server);
+
+ AddressSettings defaultSetting = new
AddressSettings().setPageSizeBytes(PagingTest.PAGE_SIZE).setMaxSizeBytes(0).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setMaxReadPageBytes(-1);
+
+ server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+
+ server.start();
+
+
locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
+
+ sf = createSessionFactory(locator);
+ ClientSession session = sf.createSession(true, true, 0);
+
+ String queue1Name = "Queue1";
+ String queue2Name = "Queue2";
+
+
+ server.addAddressInfo(new
AddressInfo(ADDRESS).addRoutingType(RoutingType.MULTICAST));
+
+
session.createQueue(QueueConfiguration.of(queue1Name).setAddress(ADDRESS).setRoutingType(RoutingType.MULTICAST));
+
+ final Queue queue1 = server.locateQueue(queue1Name);
+
+ queue1.getPageSubscription().getPagingStore().startPaging(); // start paging so the sends below go to paging
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ ClientMessage message;
+
+ for (int i = 0; i < 20; i++) {
+ message = session.createMessage(true);
+
+ ActiveMQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(new byte[100 * 4]);
+
+ message.putIntProperty(SimpleString.of("idi"), i);
+
+ producer.send(message);
+ session.commit();
+ }
+
+ Wait.assertTrue(queue1.getPagingStore()::isPaging);
+ Future<Boolean> doneCleanup =
queue1.getPagingStore().getCursorProvider().scheduleCleanup(); // asynchronous; parks inside cleanupRegularStream
+
+ assertTrue(startFlag.await(5, TimeUnit.SECONDS)); // cleanup is now parked mid-run
+
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ runAfter(executorService::shutdownNow);
+ CountDownLatch created = new CountDownLatch(1);
+ executorService.execute(() -> {
+ try {
+
server.createQueue(QueueConfiguration.of(queue2Name).setAddress(ADDRESS)); // must not wait for the parked cleanup
+ created.countDown();
+ } catch (Exception e) { // only logged: a failure shows up as the created.await timeout below
+ logger.warn(e.getMessage(), e);
+ }
+ });
+
+ assertTrue(created.await(5, TimeUnit.SECONDS)); // queue created while cleanup is still parked
+
+ assertNotNull(server.locateQueue(queue2Name));
+ letGoLatch.countDown(); // release the parked cleanup
+ assertTrue(doneCleanup.get(10, TimeUnit.SECONDS)); // released cleanup must finish and report true
+
+ server.stop();
+ }
+
@TestTemplate
public void testPartialConsume() throws Exception {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact