This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit e47d8ea7c15e9dbdf450478b79db948e18f04f31 Author: Clebert Suconic <[email protected]> AuthorDate: Wed May 15 23:26:30 2024 -0400 ARTEMIS-4774 Fixing PageCounters out of sync after AckMnager retries --- .../protocol/amqp/connect/mirror/AckManager.java | 38 ++++--- .../artemis/core/paging/PagingManager.java | 4 + .../core/paging/cursor/PageSubscription.java | 13 +-- .../cursor/impl/PageCounterRebuildManager.java | 11 +- .../cursor/impl/PageSubscriptionCounterImpl.java | 6 +- .../paging/cursor/impl/PageSubscriptionImpl.java | 79 +++---------- .../core/paging/impl/PagingManagerImpl.java | 13 +++ .../core/server/ScheduledDeliveryHandler.java | 8 ++ .../artemis/core/server/impl/QueueImpl.java | 18 ++- .../core/server/impl/QueueMessageMetrics.java | 125 ++++++++++++--------- .../server/impl/ScheduledDeliveryHandlerImpl.java | 20 ++++ .../integration/amqp/connect/AckManagerTest.java | 106 ++++++++++++++++- .../metrics/JournalPendingMessageTest.java | 12 +- .../mirror/SingleMirrorSoakTest.java | 52 ++++++--- .../{parameters.sh => longrun-parameters.sh} | 16 ++- tests/soak-tests/src/test/scripts/parameters.sh | 13 ++- 16 files changed, 336 insertions(+), 198 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java index 51f88edad8..7bae95cad2 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java @@ -202,9 +202,8 @@ public class AckManager implements ActiveMQComponent { // to be used with the same executor as the PagingStore executor - public boolean retryAddress(SimpleString address, LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>> acksToRetry) { + public void retryAddress(SimpleString address, LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>> acksToRetry) { MirrorController previousController = AMQPMirrorControllerTarget.getControllerInUse(); - boolean retriedPaging = false; logger.trace("retrying address {} on server {}", address, server); try { AMQPMirrorControllerTarget.setControllerInUse(disabledAckMirrorController); @@ -219,14 +218,12 @@ public class AckManager implements ActiveMQComponent { logger.trace("Retry stopped while reading page {} on address {} as the outcome is now empty, server={}", pageId, address, server); break; } - Page page = store.usePage(pageId, true, false); + Page page = openPage(store, pageId); if (page == null) { continue; } try { - if (retryPage(acksToRetry, page, key)) { - retriedPaging = true; - } + retryPage(acksToRetry, address, page, key); } finally { page.usageDown(); } @@ -241,7 +238,17 @@ public class AckManager implements ActiveMQComponent { } finally { AMQPMirrorControllerTarget.setControllerInUse(previousController); } - return retriedPaging; + } + + private Page openPage(PagingStore store, long pageID) throws Throwable { + Page page = store.newPageObject(pageID); + if (page.getFile().exists()) { + page.getMessages(); + return page; + } else { + return null; + } + } private void validateExpiredSet(LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>> queuesToRetry) { @@ -265,10 +272,11 @@ public class AckManager implements ActiveMQComponent { } } - private boolean retryPage(LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>> queuesToRetry, + private void retryPage(LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>> queuesToRetry, + SimpleString address, Page page, AckRetry key) throws Exception { - AtomicBoolean retriedPaging = new AtomicBoolean(false); + logger.debug("scanning for acks on page {} on address {}", page.getPageId(), address); TransactionImpl transaction = new TransactionImpl(server.getStorageManager()).setAsync(true); // scan each page for acks page.getMessages().forEach(pagedMessage -> { @@ -297,8 +305,8 @@ public class AckManager implements ActiveMQComponent { if (!subscription.isAcked(pagedMessage)) { PagedReference reference = retries.getContext().getPagingStore().getCursorProvider().newReference(pagedMessage, subscription); try { - subscription.ackTx(transaction, reference); - retriedPaging.set(true); + subscription.ackTx(transaction, reference, false); + subscription.getQueue().postAcknowledge(reference, ackRetry.getReason(), false); } catch (Exception e) { logger.warn(e.getMessage(), e); if (ioCriticalErrorListener != null) { @@ -325,8 +333,6 @@ public class AckManager implements ActiveMQComponent { ioCriticalErrorListener.onIOException(e, e.getMessage(), null); } } - - return retriedPaging.get(); } /** returns true if there are retries ready to be scanned on paging */ @@ -430,8 +436,6 @@ public class AckManager implements ActiveMQComponent { Iterator<Map.Entry<SimpleString, LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>>>> retryIterator; - boolean retriedPaging = false; - MultiStepProgress(HashMap<SimpleString, LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>>> retryList) { this.retryList = retryList; @@ -453,9 +457,7 @@ public class AckManager implements ActiveMQComponent { PagingStore pagingStore = server.getPagingManager().getPageStore(entry.getKey()); pagingStore.execute(() -> { - if (AckManager.this.retryAddress(entry.getKey(), entry.getValue())) { - retriedPaging = true; - } + AckManager.this.retryAddress(entry.getKey(), entry.getValue()); nextStep(); }); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java index faddee07eb..e17fb80334 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java @@ -177,4 +177,8 @@ public interface PagingManager extends ActiveMQComponent, HierarchicalRepository default void forEachTransaction(BiConsumer<Long, PageTransactionInfo> transactionConsumer) { } + default boolean isRebuildingCounters() { + return false; + } + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java index 970c545b86..5cccd69fb7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java @@ -87,10 +87,13 @@ public interface PageSubscription { // for internal (cursor) classes void confirmPosition(PagePosition ref) throws Exception; - void ackTx(Transaction tx, PagedReference position) throws Exception; + void ackTx(Transaction tx, PagedReference position, boolean fromDelivery) throws Exception; + default void ackTx(Transaction tx, PagedReference position) throws Exception { + ackTx(tx, position, true); + } // for internal (cursor) classes - void confirmPosition(Transaction tx, PagePosition position) throws Exception; + void confirmPosition(Transaction tx, PagePosition position, boolean fromDelivery) throws Exception; /** * @return the first page in use or MAX_LONG if none is in use @@ -158,12 +161,6 @@ public interface PageSubscription { */ void onDeletePage(Page deletedPage) throws Exception; - long getDeliveredCount(); - - long getDeliveredSize(); - - void incrementDeliveredSize(long size); - void removePendingDelivery(PagedMessage pagedMessage); ConsumedPage locatePageInfo(long pageNr); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java index e4cac16264..1f60af52af 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java @@ -44,7 +44,6 @@ import java.util.function.BiConsumer; /** this class will copy current data from the Subscriptions, count messages while the server is already active * performing other activity */ -// TODO: Rename this as RebuildManager in a future major version public class PageCounterRebuildManager implements Runnable { private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -90,7 +89,7 @@ public class PageCounterRebuildManager implements Runnable { Page currentPage = store.getCurrentPage(); limitPageId = store.getCurrentWritingPage(); limitMessageNr = currentPage.getNumberOfMessages(); - if (logger.isDebugEnabled()) { + if (logger.isTraceEnabled()) { logger.trace("PageCounterRebuild for {}, Current writing page {} and limit will be {} with lastMessage on last page={}", store.getStoreName(), store.getCurrentWritingPage(), limitPageId, limitMessageNr); } } catch (Exception e) { @@ -139,14 +138,6 @@ public class PageCounterRebuildManager implements Runnable { } } - private synchronized PageSubscriptionCounter getCounter(long queueID) { - CopiedSubscription copiedSubscription = copiedSubscriptionMap.get(queueID); - if (copiedSubscription != null) { - return copiedSubscription.subscriptionCounter; - } else { - return null; - } - } private CopiedSubscription getSubscription(long queueID) { return copiedSubscriptionMap.get(queueID); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java index cb4b9750c7..b77ad7e589 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java @@ -183,10 +183,10 @@ public class PageSubscriptionCounterImpl extends BasePagingCounter { recordedSizeUpdater.set(this, size); valueUpdater.set(this, value); persistentSizeUpdater.set(this, size); - addedUpdater.set(this, size); + addedUpdater.set(this, value); } - private void process(int add, long size) { + private void process(final int add, final long size) { if (logger.isTraceEnabled()) { logger.trace("process subscription={} add={}, size={}", subscriptionID, add, size); } @@ -203,7 +203,7 @@ public class PageSubscriptionCounterImpl extends BasePagingCounter { } if (isRebuilding()) { - recordedValueUpdater.addAndGet(this, value); + recordedValueUpdater.addAndGet(this, add); recordedSizeUpdater.addAndGet(this, size); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java index b043b08ebc..c8e2b70a70 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java @@ -28,7 +28,6 @@ import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -105,9 +104,6 @@ public final class PageSubscriptionImpl implements PageSubscription { private final PageSubscriptionCounter counter; - private final AtomicLong deliveredCount = new AtomicLong(0); - - private final AtomicLong deliveredSize = new AtomicLong(0); PageSubscriptionImpl(final PageCursorProvider cursorProvider, final PagingStore pageStore, @@ -186,7 +182,7 @@ public final class PageSubscriptionImpl implements PageSubscription { if (empty) { return 0; } else { - return counter.getValue() - deliveredCount.get(); + return counter.getValue(); } } @@ -202,7 +198,7 @@ public final class PageSubscriptionImpl implements PageSubscription { } else { //A negative value could happen if an old journal was loaded that didn't have //size metrics for old records - long messageSize = counter.getPersistentSize() - deliveredSize.get(); + long messageSize = counter.getPersistentSize(); return messageSize > 0 ? messageSize : 0; } } @@ -399,29 +395,21 @@ public final class PageSubscriptionImpl implements PageSubscription { } @Override - public void confirmPosition(final Transaction tx, final PagePosition position) throws Exception { + public void confirmPosition(final Transaction tx, final PagePosition position, boolean fromDelivery) throws Exception { // if the cursor is persistent if (persistent) { store.storeCursorAcknowledgeTransactional(tx.getID(), cursorId, position); } - installTXCallback(tx, position); + installTXCallback(tx, position, fromDelivery); } - private void confirmPosition(final Transaction tx, final PagePosition position, final long persistentSize) throws Exception { - // if the cursor is persistent - if (persistent) { - store.storeCursorAcknowledgeTransactional(tx.getID(), cursorId, position); - } - installTXCallback(tx, position, persistentSize); - } - @Override - public void ackTx(final Transaction tx, final PagedReference reference) throws Exception { + public void ackTx(final Transaction tx, final PagedReference reference, boolean fromDelivery) throws Exception { //pre-calculate persistentSize final long persistentSize = getPersistentSize(reference); - confirmPosition(tx, reference.getPagedMessage().newPositionObject(), persistentSize); + confirmPosition(tx, reference.getPagedMessage().newPositionObject(), true); counter.increment(tx, -1, -persistentSize); @@ -584,8 +572,7 @@ public final class PageSubscriptionImpl implements PageSubscription { @Override public void reloadPreparedACK(final Transaction tx, final PagePosition position) { - deliveredCount.incrementAndGet(); - installTXCallback(tx, position); + installTXCallback(tx, position, true); try { counter.increment(tx, -1, -position.getPersistentSize()); @@ -838,16 +825,7 @@ public final class PageSubscriptionImpl implements PageSubscription { return info; } - private void installTXCallback(final Transaction tx, final PagePosition position) { - installTXCallback(tx, position, -1); - } - - /** - * @param tx - * @param position - * @param persistentSize if negative it needs to be calculated on the fly - */ - private void installTXCallback(final Transaction tx, final PagePosition position, final long persistentSize) { + private void installTXCallback(final Transaction tx, final PagePosition position, final boolean fromDelivery) { if (position.getRecordID() >= 0) { // It needs to persist, otherwise the cursor will return to the fist page position tx.setContainsPersistent(); @@ -862,7 +840,7 @@ public final class PageSubscriptionImpl implements PageSubscription { PageCursorTX cursorTX = (PageCursorTX) tx.getProperty(TransactionPropertyIndexes.PAGE_CURSOR_POSITIONS); if (cursorTX == null) { - cursorTX = new PageCursorTX(); + cursorTX = new PageCursorTX(fromDelivery); tx.putProperty(TransactionPropertyIndexes.PAGE_CURSOR_POSITIONS, cursorTX); tx.addOperation(cursorTX); } @@ -1118,6 +1096,12 @@ public final class PageSubscriptionImpl implements PageSubscription { private final class PageCursorTX extends TransactionOperationAbstract { + private boolean fromDelivery; + + PageCursorTX(boolean fromDelivery) { + this.fromDelivery = fromDelivery; + } + private final Map<PageSubscriptionImpl, List<PagePosition>> pendingPositions = new HashMap<>(); private void addPositionConfirmation(final PageSubscriptionImpl cursor, final PagePosition position) { @@ -1140,8 +1124,6 @@ public final class PageSubscriptionImpl implements PageSubscription { for (PagePosition confirmed : positions) { cursor.processACK(confirmed); - cursor.deliveredCount.decrementAndGet(); - cursor.deliveredSize.addAndGet(-confirmed.getPersistentSize()); } } @@ -1435,7 +1417,6 @@ public final class PageSubscriptionImpl implements PageSubscription { @Override public void remove() { - deliveredCount.incrementAndGet(); PagedReference delivery = currentDelivery; if (delivery != null) { PageCursorInfo info = PageSubscriptionImpl.this.getPageInfo(delivery.getPagedMessage().getPageNumber()); @@ -1455,36 +1436,6 @@ public final class PageSubscriptionImpl implements PageSubscription { } } - /** - * @return the deliveredCount - */ - @Override - public long getDeliveredCount() { - return deliveredCount.get(); - } - - /** - * @return the deliveredSize - */ - @Override - public long getDeliveredSize() { - return deliveredSize.get(); - } - - @Override - public void incrementDeliveredSize(long size) { - deliveredSize.addAndGet(size); - } - - private long getPersistentSize(PagedMessage msg) { - try { - return msg != null && msg.getPersistentSize() > 0 ? msg.getPersistentSize() : 0; - } catch (ActiveMQException e) { - logger.warn("Error computing persistent size of message: {}", msg, e); - return 0; - } - } - private long getPersistentSize(PagedReference ref) { try { return ref != null && ref.getPersistentSize() > 0 ? ref.getPersistentSize() : 0; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java index 2a62108e9d..eed6b2afea 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java @@ -479,6 +479,14 @@ public final class PagingManagerImpl implements PagingManager { return started; } + private volatile boolean rebuildingPageCounters; + + + @Override + public boolean isRebuildingCounters() { + return rebuildingPageCounters; + } + @Override public void start() throws Exception { lock(); @@ -589,6 +597,9 @@ public final class PagingManagerImpl implements PagingManager { @Override public Future<Object> rebuildCounters(Set<Long> storedLargeMessages) { + if (rebuildingPageCounters) { + logger.debug("Rebuild page counters is already underway, ignoring call"); + } Map<Long, PageTransactionInfo> transactionsSet = new LongObjectHashMap(); // making a copy transactions.forEach((a, b) -> { @@ -617,6 +628,8 @@ public final class PagingManagerImpl implements PagingManager { FutureTask<Object> task = new FutureTask<>(() -> null); managerExecutor.execute(task); + managerExecutor.execute(() -> rebuildingPageCounters = false); + return task; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ScheduledDeliveryHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ScheduledDeliveryHandler.java index f8a5005676..1967a406f6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ScheduledDeliveryHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ScheduledDeliveryHandler.java @@ -28,12 +28,20 @@ public interface ScheduledDeliveryHandler { int getScheduledCount(); + int getNonPagedScheduledCount(); + long getScheduledSize(); + long getNonPagedScheduledSize(); + int getDurableScheduledCount(); + int getNonPagedDurableScheduledCount(); + long getDurableScheduledSize(); + long getNonPagedDurableScheduledSize(); + MessageReference peekFirstScheduledMessage(); List<MessageReference> getScheduledReferences(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index a2c77fd7af..226ffe02ee 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -1820,11 +1820,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { if (pageSubscription != null) { // messageReferences will have depaged messages which we need to discount from the counter as they are // counted on the pageSubscription as well - long returnValue = (long) pendingMetrics.getMessageCount() + getScheduledCount() + getDeliveringCount() + pageSubscription.getMessageCount(); - if (logger.isTraceEnabled()) { - logger.trace("Queue={}/{} returning getMessageCount \n\treturning {}. \n\tpendingMetrics.getMessageCount() = {}, \n\tgetScheduledCount() = {}, \n\tpageSubscription.getMessageCount()={}, \n\tpageSubscription.getCounter().getValue()={}, \n\tpageSubscription.getDeliveredCount()={}", - name, id, returnValue, pendingMetrics.getMessageCount(), getScheduledCount(), pageSubscription.getMessageCount(), pageSubscription.getCounter().getValue(), - pageSubscription.getDeliveredCount()); + long returnValue = (long) pendingMetrics.getNonPagedMessageCount() + scheduledDeliveryHandler.getNonPagedScheduledCount() + deliveringMetrics.getNonPagedMessageCount() + pageSubscription.getMessageCount(); + if (logger.isDebugEnabled()) { + logger.debug("Queue={}/{} returning getMessageCount \n\treturning {}. \n\tpendingMetrics.getMessageCount() = {}, \n\tgetScheduledCount() = {}, \n\tpageSubscription.getMessageCount()={}, \n\tpageSubscription.getCounter().getValue()={}", + name, id, returnValue, pendingMetrics.getMessageCount(), scheduledDeliveryHandler.getNonPagedScheduledCount(), pageSubscription.getMessageCount(), pageSubscription.getCounter().getValue()); } return returnValue; } else { @@ -1837,7 +1836,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { if (pageSubscription != null) { // messageReferences will have depaged messages which we need to discount from the counter as they are // counted on the pageSubscription as well - return pendingMetrics.getPersistentSize() + getScheduledSize() + getDeliveringSize() + pageSubscription.getPersistentSize(); + return pendingMetrics.getNonPagedPersistentSize() + scheduledDeliveryHandler.getNonPagedScheduledSize() + deliveringMetrics.getNonPagedPersistentSize() + pageSubscription.getPersistentSize(); } else { return pendingMetrics.getPersistentSize() + getScheduledSize() + getDeliveringSize(); } @@ -1847,7 +1846,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { public long getDurableMessageCount() { if (isDurable()) { if (pageSubscription != null) { - return (long) pendingMetrics.getDurableMessageCount() + getDurableScheduledCount() + getDurableDeliveringCount() + pageSubscription.getMessageCount(); + return (long) pendingMetrics.getNonPagedDurableMessageCount() + scheduledDeliveryHandler.getNonPagedDurableScheduledCount() + deliveringMetrics.getNonPagedDurableMessageCount() + pageSubscription.getMessageCount(); } else { return (long) pendingMetrics.getDurableMessageCount() + getDurableScheduledCount() + getDurableDeliveringCount(); } @@ -1859,7 +1858,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { public long getDurablePersistentSize() { if (isDurable()) { if (pageSubscription != null) { - return pendingMetrics.getDurablePersistentSize() + getDurableScheduledSize() + getDurableDeliveringSize() + pageSubscription.getPersistentSize(); + return pendingMetrics.getDurablePersistentSize() + scheduledDeliveryHandler.getNonPagedDurableScheduledSize() + deliveringMetrics.getNonPagedDurablePersistentSize() + pageSubscription.getPersistentSize(); } else { return pendingMetrics.getDurablePersistentSize() + getDurableScheduledSize() + getDurableDeliveringSize(); } @@ -3496,9 +3495,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } addTail(reference, false); pageIterator.remove(); - - //We have to increment this here instead of in the iterator so we have access to the reference from next() - pageSubscription.incrementDeliveredSize(getPersistentSize(reference)); } if (logger.isDebugEnabled()) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueMessageMetrics.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueMessageMetrics.java index 20e68d40cc..28f7599be9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueMessageMetrics.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueMessageMetrics.java @@ -35,23 +35,43 @@ public class QueueMessageMetrics { private static final AtomicIntegerFieldUpdater<QueueMessageMetrics> COUNT_UPDATER = AtomicIntegerFieldUpdater.newUpdater(QueueMessageMetrics.class, "messageCount"); + private static final AtomicIntegerFieldUpdater<QueueMessageMetrics> COUNT_UPDATER_PAGED = + AtomicIntegerFieldUpdater.newUpdater(QueueMessageMetrics.class, "messageCountPaged"); + private static final AtomicIntegerFieldUpdater<QueueMessageMetrics> DURABLE_COUNT_UPDATER = AtomicIntegerFieldUpdater.newUpdater(QueueMessageMetrics.class, "durableMessageCount"); + private static final AtomicIntegerFieldUpdater<QueueMessageMetrics> DURABLE_COUNT_UPDATER_PAGED = + AtomicIntegerFieldUpdater.newUpdater(QueueMessageMetrics.class, "durableMessageCountPaged"); + private static final AtomicLongFieldUpdater<QueueMessageMetrics> SIZE_UPDATER = AtomicLongFieldUpdater.newUpdater(QueueMessageMetrics.class, "persistentSize"); + private static final AtomicLongFieldUpdater<QueueMessageMetrics> SIZE_UPDATER_PAGED = + AtomicLongFieldUpdater.newUpdater(QueueMessageMetrics.class, "persistentSizePaged"); + private static final AtomicLongFieldUpdater<QueueMessageMetrics> DURABLE_SIZE_UPDATER = AtomicLongFieldUpdater.newUpdater(QueueMessageMetrics.class, "durablePersistentSize"); + private static final AtomicLongFieldUpdater<QueueMessageMetrics> DURABLE_SIZE_UPDATER_PAGED = + AtomicLongFieldUpdater.newUpdater(QueueMessageMetrics.class, "durablePersistentSizePaged"); + private volatile int messageCount; + private volatile int messageCountPaged; + private volatile long persistentSize; + private volatile long persistentSizePaged; + private volatile int durableMessageCount; + private volatile int durableMessageCountPaged; + private volatile long durablePersistentSize; + private volatile long durablePersistentSizePaged; + private final Queue queue; private final String name; @@ -64,89 +84,99 @@ public class QueueMessageMetrics { public void incrementMetrics(final MessageReference reference) { long size = getPersistentSize(reference); - COUNT_UPDATER.incrementAndGet(this); - if (logger.isDebugEnabled()) { - logger.debug("{} increment messageCount to {}: {}", this, messageCount, reference); - } - SIZE_UPDATER.addAndGet(this, size); - if (queue.isDurable() && reference.isDurable()) { - DURABLE_COUNT_UPDATER.incrementAndGet(this); - DURABLE_SIZE_UPDATER.addAndGet(this, size); + if (reference.isPaged()) { + COUNT_UPDATER_PAGED.incrementAndGet(this); + if (logger.isDebugEnabled()) { + logger.debug("{} paged messageCountPaged to {}: {}", this, messageCountPaged, reference); + } + SIZE_UPDATER_PAGED.addAndGet(this, size); + if (queue.isDurable() && reference.isDurable()) { + DURABLE_COUNT_UPDATER_PAGED.incrementAndGet(this); + DURABLE_SIZE_UPDATER_PAGED.addAndGet(this, size); + } + } else { + COUNT_UPDATER.incrementAndGet(this); + if (logger.isDebugEnabled()) { + logger.debug("{} increment messageCount to {}: {}", this, messageCount, reference); + } + SIZE_UPDATER.addAndGet(this, size); + if (queue.isDurable() && reference.isDurable()) { + DURABLE_COUNT_UPDATER.incrementAndGet(this); + DURABLE_SIZE_UPDATER.addAndGet(this, size); + } } } public void decrementMetrics(final MessageReference reference) { long size = -getPersistentSize(reference); - COUNT_UPDATER.decrementAndGet(this); - if (logger.isDebugEnabled()) { - logger.debug("{} decrement messageCount to {}: {}", this, messageCount, reference); - } - SIZE_UPDATER.addAndGet(this, size); - if (queue.isDurable() && reference.isDurable()) { - DURABLE_COUNT_UPDATER.decrementAndGet(this); - DURABLE_SIZE_UPDATER.addAndGet(this, size); + if (reference.isPaged()) { + COUNT_UPDATER_PAGED.decrementAndGet(this); + if (logger.isDebugEnabled()) { + logger.debug("{} decrement messageCount to {}: {}", this, messageCountPaged, reference); + } + SIZE_UPDATER_PAGED.addAndGet(this, size); + if (queue.isDurable() && reference.isDurable()) { + DURABLE_COUNT_UPDATER_PAGED.decrementAndGet(this); + DURABLE_SIZE_UPDATER_PAGED.addAndGet(this, size); + } + } else { + COUNT_UPDATER.decrementAndGet(this); + if (logger.isDebugEnabled()) { + logger.debug("{} decrement messageCount to {}: {}", this, messageCount, reference); + } + SIZE_UPDATER.addAndGet(this, size); + if (queue.isDurable() && reference.isDurable()) { + DURABLE_COUNT_UPDATER.decrementAndGet(this); + DURABLE_SIZE_UPDATER.addAndGet(this, size); + } } } - - /** - * @return the messageCount - */ - public int getMessageCount() { + public int getNonPagedMessageCount() { return messageCount; } /** - * @param messageCount the messageCount to set + * @return the messageCount */ - public void setMessageCount(int messageCount) { - this.messageCount = messageCount; + public int getMessageCount() { + return messageCount + messageCountPaged; } - /** * @return the persistentSize */ public long getPersistentSize() { + return persistentSize + persistentSizePaged; + } + + public long getNonPagedPersistentSize() { return persistentSize; } - /** - * @param persistentSize the persistentSize to set - */ - public void setPersistentSize(long persistentSize) { - this.persistentSize = persistentSize; + public long getNonPagedDurablePersistentSize() { + return durablePersistentSize; } /** * @return the durableMessageCount */ public int getDurableMessageCount() { - return durableMessageCount; + return durableMessageCount + durableMessageCountPaged; } - /** - * @param durableMessageCount the durableMessageCount to set - */ - public void setDurableMessageCount(int durableMessageCount) { - this.durableMessageCount = durableMessageCount; + public int getNonPagedDurableMessageCount() { + return durableMessageCount; } /** * @return the durablePersistentSize */ public long getDurablePersistentSize() { - return durablePersistentSize; - } - - /** - * @param durablePersistentSize the durablePersistentSize to set - */ - public void setDurablePersistentSize(long durablePersistentSize) { - this.durablePersistentSize = durablePersistentSize; + return durablePersistentSize + durablePersistentSizePaged; } - private long getPersistentSize(final MessageReference reference) { + private static long getPersistentSize(final MessageReference reference) { long size = 0; try { @@ -158,9 +188,4 @@ public class QueueMessageMetrics { return size; } - @Override - public String toString() { - return "QueuePendingMessageMetrics[queue=" + queue.getName() + ", name=" + name + "]"; - } - } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java index b8e1418a79..1e848bc793 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java @@ -96,21 +96,41 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler { return metrics.getMessageCount(); } + @Override + public int getNonPagedScheduledCount() { + return metrics.getNonPagedMessageCount(); + } + @Override public int getDurableScheduledCount() { return metrics.getDurableMessageCount(); } + @Override + public int getNonPagedDurableScheduledCount() { + return metrics.getNonPagedDurableMessageCount(); + } + @Override public long getScheduledSize() { return metrics.getPersistentSize(); } + @Override + public long getNonPagedScheduledSize() { + return metrics.getNonPagedPersistentSize(); + } + @Override public long getDurableScheduledSize() { return metrics.getDurablePersistentSize(); } + @Override + public long getNonPagedDurableScheduledSize() { + return metrics.getNonPagedPersistentSize(); + } + @Override public List<MessageReference> getScheduledReferences() { List<MessageReference> refs = new LinkedList<>(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java index 077764b4e4..ddcd76c1a4 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java @@ -76,7 +76,7 @@ public class AckManagerTest extends ActiveMQTestBase { super.setUp(); server1 = createServer(true, createDefaultConfig(0, true), 100024, -1, -1, -1); - server1.getConfiguration().addAddressSetting(SNF_NAME, new AddressSettings().setMaxSizeBytes(-1).setMaxSizeMessages(-1)); + server1.getConfiguration().addAddressSetting(SNF_NAME, new AddressSettings().setMaxSizeBytes(-1).setMaxSizeMessages(-1).setMaxReadPageMessages(20)); server1.getConfiguration().getAcceptorConfigurations().clear(); server1.getConfiguration().addAcceptorConfiguration("server", "tcp://localhost:61616"); server1.start(); @@ -289,6 +289,110 @@ public class AckManagerTest extends ActiveMQTestBase { } + + @Test + public void testRetryFromPaging() throws Throwable { + + String protocol = "AMQP"; + + SimpleString TOPIC_NAME = SimpleString.toSimpleString("tp" + RandomUtil.randomString()); + + server1.addAddressInfo(new AddressInfo(TOPIC_NAME).addRoutingType(RoutingType.MULTICAST)); + + ConnectionFactory connectionFactory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616"); + + // creating 5 subscriptions + for (int i = 0; i < 2; i++) { + try (Connection connection = connectionFactory.createConnection()) { + connection.setClientID("c" + i); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic(TOPIC_NAME.toString()); + session.createDurableSubscriber(topic, "s" + i); + } + } + + int numberOfMessages = 15000; + int numberOfAcksC0 = 100; + int numberOfAcksC1 = 14999; + + String c0s0Name = "c0.s0"; + String c1s1Name = "c1.s1"; + + final Queue c0s0 = server1.locateQueue(c0s0Name); + Assert.assertNotNull(c0s0); + final Queue c1s1 = server1.locateQueue(c1s1Name); + Assert.assertNotNull(c1s1); + + PagingStore store = server1.getPagingManager().getPageStore(TOPIC_NAME); + store.startPaging(); + + try (Connection connection = connectionFactory.createConnection()) { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Topic topic = session.createTopic(TOPIC_NAME.toString()); + MessageProducer producer = session.createProducer(topic); + + for (int i = 0; i < numberOfMessages; i++) { + Message m = session.createTextMessage("hello " + i); + m.setIntProperty("i", i); + producer.send(m); + if ((i + 1) % 100 == 0) { + c1s1.pause(); + c0s0.pause(); + session.commit(); + } + } + session.commit(); + } + + ReferenceIDSupplier referenceIDSupplier = new ReferenceIDSupplier(server1); + + { + AckManager ackManager = AckManagerProvider.getManager(server1); + ackManager.stop(); + + AtomicInteger counter = new AtomicInteger(0); + + for (long pageID = store.getFirstPage(); pageID <= store.getCurrentWritingPage(); pageID++) { + Page page = store.usePage(pageID); + try { + page.getMessages().forEach(pagedMessage -> { + int increment = counter.incrementAndGet(); + if (increment <= numberOfAcksC0) { + ackManager.addRetry(referenceIDSupplier.getServerID(pagedMessage.getMessage()), c0s0, referenceIDSupplier.getID(pagedMessage.getMessage()), AckReason.NORMAL); + } + if (increment <= numberOfAcksC1) { + ackManager.addRetry(referenceIDSupplier.getServerID(pagedMessage.getMessage()), c1s1, referenceIDSupplier.getID(pagedMessage.getMessage()), AckReason.NORMAL); + } + }); + } finally { + page.usageDown(); + } + } + } + + server1.stop(); + + server1.start(); + + + Queue c0s0AfterRestart = server1.locateQueue(c0s0Name); + Assert.assertNotNull(c0s0AfterRestart); + Queue c1s1AfterRestart = server1.locateQueue(c1s1Name); + Assert.assertNotNull(c1s1AfterRestart); + + Wait.assertEquals(numberOfMessages - numberOfAcksC1, c1s1AfterRestart::getMessageCount, 10_000); + Wait.assertEquals(numberOfAcksC1, c1s1AfterRestart::getMessagesAcknowledged, 10_000); + Wait.assertEquals(numberOfMessages - numberOfAcksC0, c0s0AfterRestart::getMessageCount, 10_000); + Wait.assertEquals(numberOfAcksC0, c0s0AfterRestart::getMessagesAcknowledged, 10_000); + + server1.stop(); + + Assert.assertEquals(0, AckManagerProvider.getSize()); + } + + + + private int getCounter(byte typeRecord, HashMap<Integer, AtomicInteger> values) { AtomicInteger value = values.get((int) typeRecord); if (value == null) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/metrics/JournalPendingMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/metrics/JournalPendingMessageTest.java index 26328fd102..073c805056 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/metrics/JournalPendingMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/metrics/JournalPendingMessageTest.java @@ -185,8 +185,8 @@ public class JournalPendingMessageTest extends AbstractPersistentStatTestSupport AtomicLong publishedMessageSize = new AtomicLong(); - publishTestQueueMessages(200, DeliveryMode.NON_PERSISTENT, publishedMessageSize); - verifyPendingStats(defaultQueueName, 200, publishedMessageSize.get()); + publishTestQueueMessages(10, DeliveryMode.NON_PERSISTENT, publishedMessageSize); + verifyPendingStats(defaultQueueName, 10, publishedMessageSize.get()); verifyPendingDurableStats(defaultQueueName, 0, 0); } @@ -196,10 +196,10 @@ public class JournalPendingMessageTest extends AbstractPersistentStatTestSupport AtomicLong publishedNonPersistentMessageSize = new AtomicLong(); AtomicLong publishedMessageSize = new AtomicLong(); - publishTestQueueMessages(100, DeliveryMode.PERSISTENT, publishedMessageSize); - publishTestQueueMessages(100, DeliveryMode.NON_PERSISTENT, publishedNonPersistentMessageSize); - verifyPendingStats(defaultQueueName, 200, publishedMessageSize.get() + publishedNonPersistentMessageSize.get()); - verifyPendingDurableStats(defaultQueueName, 100, publishedMessageSize.get()); + publishTestQueueMessages(5, DeliveryMode.PERSISTENT, publishedMessageSize); + publishTestQueueMessages(10, DeliveryMode.NON_PERSISTENT, publishedNonPersistentMessageSize); + verifyPendingStats(defaultQueueName, 15, publishedMessageSize.get() + publishedNonPersistentMessageSize.get()); + verifyPendingDurableStats(defaultQueueName, 5, publishedMessageSize.get()); } @Test diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/SingleMirrorSoakTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/SingleMirrorSoakTest.java index 196bbc4767..fd75f66c9f 100644 --- a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/SingleMirrorSoakTest.java +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/SingleMirrorSoakTest.java @@ -29,6 +29,7 @@ import java.io.StringWriter; import java.lang.invoke.MethodHandles; import java.util.HashMap; import java.util.Properties; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -61,11 +62,16 @@ public class SingleMirrorSoakTest extends SoakTestBase { // Set this to true and log4j will be configured with some relevant log.trace for the AckManager at the server's private static final boolean TRACE_LOGS = Boolean.parseBoolean(TestParameters.testProperty(TEST_NAME, "TRACE_LOGS", "false")); - private static final int NUMBER_MESSAGES = TestParameters.testProperty(TEST_NAME, "NUMBER_MESSAGES", 2_500); + private static final int NUMBER_MESSAGES = TestParameters.testProperty(TEST_NAME, "NUMBER_MESSAGES", 2_000); + + // By default consuming 90% of the messages + private static final int NUMBER_MESSAGES_RECEIVE = TestParameters.testProperty(TEST_NAME, "NUMBER_MESSAGES_RECEIVE", 1_800); private static final int RECEIVE_COMMIT = TestParameters.testProperty(TEST_NAME, "RECEIVE_COMMIT", 100); private static final int SEND_COMMIT = TestParameters.testProperty(TEST_NAME, "SEND_COMMIT", 100); - private static final int KILL_INTERNAL = TestParameters.testProperty(TEST_NAME, "KILL_INTERVAL", 500); - private static final int SNF_TIMEOUT = TestParameters.testProperty(TEST_NAME, "SNF_TIMEOUT", 60_000); + + // If -1 means to never kill the target broker + private static final int KILL_INTERVAL = TestParameters.testProperty(TEST_NAME, "KILL_INTERVAL", 1_000); + private static final int SNF_TIMEOUT = TestParameters.testProperty(TEST_NAME, "SNF_TIMEOUT", 300_000); private static final int GENERAL_WAIT_TIMEOUT = TestParameters.testProperty(TEST_NAME, "GENERAL_TIMEOUT", 10_000); /* @@ -134,14 +140,14 @@ public class SingleMirrorSoakTest extends SoakTestBase { brokerProperties.put("AMQPConnections." + connectionName + ".type", AMQPBrokerConnectionAddressType.MIRROR.toString()); brokerProperties.put("AMQPConnections." + connectionName + ".connectionElements.mirror.sync", "false"); brokerProperties.put("largeMessageSync", "false"); - brokerProperties.put("mirrorAckManagerPageAttempts", "10"); - brokerProperties.put("mirrorAckManagerRetryDelay", "1000"); + //brokerProperties.put("mirrorAckManagerPageAttempts", "20"); + //brokerProperties.put("mirrorAckManagerRetryDelay", "100"); if (paging) { brokerProperties.put("addressSettings.#.maxSizeMessages", "1"); - brokerProperties.put("addressSettings.#.maxReadPageMessages", "1000"); + brokerProperties.put("addressSettings.#.maxReadPageMessages", "2000"); brokerProperties.put("addressSettings.#.maxReadPageBytes", "-1"); - brokerProperties.put("addressSettings.#.prefetchPageMessages", "100"); + brokerProperties.put("addressSettings.#.prefetchPageMessages", "500"); // un-comment this line if you want to rather use the work around without the fix on the PostOfficeImpl // brokerProperties.put("addressSettings.#.iDCacheSize", "1000"); } @@ -162,6 +168,12 @@ public class SingleMirrorSoakTest extends SoakTestBase { + "logger.ack.level=TRACE\n" + "logger.config.name=org.apache.activemq.artemis.core.config.impl.ConfigurationImpl\n" + "logger.config.level=TRACE\n" + + "logger.counter.name=org.apache.activemq.artemis.core.paging.cursor.impl.PageSubscriptionCounterImpl\n" + + "logger.counter.level=DEBUG\n" + + "logger.queue.name=org.apache.activemq.artemis.core.server.impl.QueueImpl\n" + + "logger.queue.level=DEBUG\n" + + "logger.rebuild.name=org.apache.activemq.artemis.core.paging.cursor.impl.PageCounterRebuildManager\n" + + "logger.rebuild.level=DEBUG\n" + "appender.console.filter.threshold.type = ThresholdFilter\n" + "appender.console.filter.threshold.level = info")); } @@ -187,7 +199,7 @@ public class SingleMirrorSoakTest extends SoakTestBase { startServers(); - Assert.assertTrue(KILL_INTERNAL > SEND_COMMIT); + Assert.assertTrue(KILL_INTERVAL > SEND_COMMIT || KILL_INTERVAL < 0); String clientIDA = "nodeA"; String clientIDB = "nodeB"; @@ -212,18 +224,23 @@ public class SingleMirrorSoakTest extends SoakTestBase { ExecutorService executorService = Executors.newFixedThreadPool(3); runAfter(executorService::shutdownNow); + CountDownLatch consumerDone = new CountDownLatch(2); executorService.execute(() -> { try { - consume(connectionFactoryDC1A, clientIDA, subscriptionID, 0, NUMBER_MESSAGES, true, false, RECEIVE_COMMIT); + consume(connectionFactoryDC1A, clientIDA, subscriptionID, 0, NUMBER_MESSAGES_RECEIVE, false, false, RECEIVE_COMMIT); } catch (Exception e) { logger.warn(e.getMessage(), e); + } finally { + consumerDone.countDown(); } }); executorService.execute(() -> { try { - consume(connectionFactoryDC1A, clientIDB, subscriptionID, 0, NUMBER_MESSAGES, true, false, RECEIVE_COMMIT); + consume(connectionFactoryDC1A, clientIDB, subscriptionID, 0, NUMBER_MESSAGES_RECEIVE, false, false, RECEIVE_COMMIT); } catch (Exception e) { logger.warn(e.getMessage(), e); + } finally { + consumerDone.countDown(); } }); @@ -243,7 +260,7 @@ public class SingleMirrorSoakTest extends SoakTestBase { logger.info("Sent {} messages", i); session.commit(); } - if (i > 0 && i % KILL_INTERNAL == 0) { + if (KILL_INTERVAL > 0 && i > 0 && i % KILL_INTERVAL == 0) { restartExeuctor.execute(() -> { if (running.get()) { try { @@ -265,12 +282,14 @@ public class SingleMirrorSoakTest extends SoakTestBase { running.set(false); } + consumerDone.await(SNF_TIMEOUT, TimeUnit.MILLISECONDS); + Wait.assertEquals(0, () -> getCount(managementDC1, snfQueue), SNF_TIMEOUT); Wait.assertEquals(0, () -> getCount(managementDC2, snfQueue), SNF_TIMEOUT); - Wait.assertEquals(0, () -> getCount(managementDC1, clientIDA + "." + subscriptionID), GENERAL_WAIT_TIMEOUT); - Wait.assertEquals(0, () -> getCount(managementDC1, clientIDB + "." + subscriptionID), GENERAL_WAIT_TIMEOUT); - Wait.assertEquals(0, () -> getCount(managementDC2, clientIDA + "." + subscriptionID), GENERAL_WAIT_TIMEOUT); - Wait.assertEquals(0, () -> getCount(managementDC2, clientIDB + "." + subscriptionID), GENERAL_WAIT_TIMEOUT); + Wait.assertEquals(NUMBER_MESSAGES - NUMBER_MESSAGES_RECEIVE, () -> getCount(managementDC1, clientIDA + "." + subscriptionID), GENERAL_WAIT_TIMEOUT); + Wait.assertEquals(NUMBER_MESSAGES - NUMBER_MESSAGES_RECEIVE, () -> getCount(managementDC1, clientIDB + "." + subscriptionID), GENERAL_WAIT_TIMEOUT); + Wait.assertEquals(NUMBER_MESSAGES - NUMBER_MESSAGES_RECEIVE, () -> getCount(managementDC2, clientIDA + "." + subscriptionID), GENERAL_WAIT_TIMEOUT); + Wait.assertEquals(NUMBER_MESSAGES - NUMBER_MESSAGES_RECEIVE, () -> getCount(managementDC2, clientIDB + "." + subscriptionID), GENERAL_WAIT_TIMEOUT); destroyServers(); @@ -337,9 +356,10 @@ public class SingleMirrorSoakTest extends SoakTestBase { } } - public long getCount(SimpleManagement simpleManagement, String queue) throws Exception { + public long getCount(SimpleManagement simpleManagement, String queue) { try { long value = simpleManagement.getMessageCountOnQueue(queue); + logger.info("Queue {} count = {}", queue, value); return value; } catch (Exception e) { logger.warn(e.getMessage(), e); diff --git a/tests/soak-tests/src/test/scripts/parameters.sh b/tests/soak-tests/src/test/scripts/longrun-parameters.sh similarity index 91% copy from tests/soak-tests/src/test/scripts/parameters.sh copy to tests/soak-tests/src/test/scripts/longrun-parameters.sh index ddbe3aa2e6..5ab2705f9a 100755 --- a/tests/soak-tests/src/test/scripts/parameters.sh +++ b/tests/soak-tests/src/test/scripts/longrun-parameters.sh @@ -16,7 +16,8 @@ # specific language governing permissions and limitations # under the License. -# this script contains a suggest set of variables to run the soak tests. +# use this script for larger parameters, possibly when using larger machines + ## Generic variable: # Some tests will support saving the producer's state before consumption. If you set this variable these tests will hold a zip file and recover it approprieatedly. @@ -144,8 +145,11 @@ export TEST_CLUSTER_NOTIFICATIONS_CONTINUITY_NUMBER_OF_QUEUES=200 export TEST_CLUSTER_NOTIFICATIONS_CONTINUITY_NUMBER_OF_WORKERS=10 export TEST_SINGLE_MIRROR_SOAK_TRACE_LOGS=false -export TEST_SINGLE_MIRROR_SOAK_NUMBER_MESSAGES=2500 -export TEST_SINGLE_MIRROR_SOAK_RECEIVE_COMMIT=100 -export TEST_SINGLE_MIRROR_SOAK_SEND_COMMIT=100 -export TEST_SINGLE_MIRROR_SOAK_KILL_INTERVAL=500 -export TEST_SINGLE_MIRROR_SOAK_SNF_TIMEOUT=60000 \ No newline at end of file +export TEST_SINGLE_MIRROR_SOAK_NUMBER_MESSAGES=500000 +export TEST_SINGLE_MIRROR_SOAK_NUMBER_MESSAGES_RECEIVE=400000 +export TEST_SINGLE_MIRROR_SOAK_RECEIVE_COMMIT=500 +export TEST_SINGLE_MIRROR_SOAK_SEND_COMMIT=1000 +export TEST_SINGLE_MIRROR_SOAK_KILL_INTERVAL=50000 +export TEST_SINGLE_MIRROR_SOAK_SNF_TIMEOUT=60000000 +export TEST_SINGLE_MIRROR_SOAK_GENERAL_TIMEOUT=100000 +export TEST_SINGLE_MIRROR_SOAK_CONSUMER_PROCESSING_TIME=10 diff --git a/tests/soak-tests/src/test/scripts/parameters.sh b/tests/soak-tests/src/test/scripts/parameters.sh index ddbe3aa2e6..f53e46b6e3 100755 --- a/tests/soak-tests/src/test/scripts/parameters.sh +++ b/tests/soak-tests/src/test/scripts/parameters.sh @@ -144,8 +144,11 @@ export TEST_CLUSTER_NOTIFICATIONS_CONTINUITY_NUMBER_OF_QUEUES=200 export TEST_CLUSTER_NOTIFICATIONS_CONTINUITY_NUMBER_OF_WORKERS=10 export TEST_SINGLE_MIRROR_SOAK_TRACE_LOGS=false -export TEST_SINGLE_MIRROR_SOAK_NUMBER_MESSAGES=2500 -export TEST_SINGLE_MIRROR_SOAK_RECEIVE_COMMIT=100 -export TEST_SINGLE_MIRROR_SOAK_SEND_COMMIT=100 -export TEST_SINGLE_MIRROR_SOAK_KILL_INTERVAL=500 -export TEST_SINGLE_MIRROR_SOAK_SNF_TIMEOUT=60000 \ No newline at end of file +export TEST_SINGLE_MIRROR_SOAK_NUMBER_MESSAGES=50000 +export TEST_SINGLE_MIRROR_SOAK_NUMBER_MESSAGES_RECEIVE=40000 +export TEST_SINGLE_MIRROR_SOAK_RECEIVE_COMMIT=500 +export TEST_SINGLE_MIRROR_SOAK_SEND_COMMIT=1000 +export TEST_SINGLE_MIRROR_SOAK_KILL_INTERVAL=10000 +export TEST_SINGLE_MIRROR_SOAK_SNF_TIMEOUT=60000 +export TEST_SINGLE_MIRROR_SOAK_GENERAL_TIMEOUT=10000 +export TEST_SINGLE_MIRROR_SOAK_CONSUMER_PROCESSING_TIME=10
