This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit e47d8ea7c15e9dbdf450478b79db948e18f04f31
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed May 15 23:26:30 2024 -0400

    ARTEMIS-4774 Fixing PageCounters out of sync after AckManager retries
---
 .../protocol/amqp/connect/mirror/AckManager.java   |  38 ++++---
 .../artemis/core/paging/PagingManager.java         |   4 +
 .../core/paging/cursor/PageSubscription.java       |  13 +--
 .../cursor/impl/PageCounterRebuildManager.java     |  11 +-
 .../cursor/impl/PageSubscriptionCounterImpl.java   |   6 +-
 .../paging/cursor/impl/PageSubscriptionImpl.java   |  79 +++----------
 .../core/paging/impl/PagingManagerImpl.java        |  13 +++
 .../core/server/ScheduledDeliveryHandler.java      |   8 ++
 .../artemis/core/server/impl/QueueImpl.java        |  18 ++-
 .../core/server/impl/QueueMessageMetrics.java      | 125 ++++++++++++---------
 .../server/impl/ScheduledDeliveryHandlerImpl.java  |  20 ++++
 .../integration/amqp/connect/AckManagerTest.java   | 106 ++++++++++++++++-
 .../metrics/JournalPendingMessageTest.java         |  12 +-
 .../mirror/SingleMirrorSoakTest.java               |  52 ++++++---
 .../{parameters.sh => longrun-parameters.sh}       |  16 ++-
 tests/soak-tests/src/test/scripts/parameters.sh    |  13 ++-
 16 files changed, 336 insertions(+), 198 deletions(-)

diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
index 51f88edad8..7bae95cad2 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
@@ -202,9 +202,8 @@ public class AckManager implements ActiveMQComponent {
 
 
    // to be used with the same executor as the PagingStore executor
-   public boolean retryAddress(SimpleString address, 
LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>> acksToRetry) {
+   public void retryAddress(SimpleString address, 
LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>> acksToRetry) {
       MirrorController previousController = 
AMQPMirrorControllerTarget.getControllerInUse();
-      boolean retriedPaging = false;
       logger.trace("retrying address {} on server {}", address, server);
       try {
          
AMQPMirrorControllerTarget.setControllerInUse(disabledAckMirrorController);
@@ -219,14 +218,12 @@ public class AckManager implements ActiveMQComponent {
                   logger.trace("Retry stopped while reading page {} on address 
{} as the outcome is now empty, server={}", pageId, address, server);
                   break;
                }
-               Page page = store.usePage(pageId, true, false);
+               Page page = openPage(store, pageId);
                if (page == null) {
                   continue;
                }
                try {
-                  if (retryPage(acksToRetry, page, key)) {
-                     retriedPaging = true;
-                  }
+                  retryPage(acksToRetry, address, page, key);
                } finally {
                   page.usageDown();
                }
@@ -241,7 +238,17 @@ public class AckManager implements ActiveMQComponent {
       } finally {
          AMQPMirrorControllerTarget.setControllerInUse(previousController);
       }
-      return retriedPaging;
+   }
+
+   private Page openPage(PagingStore store, long pageID) throws Throwable {
+      Page page = store.newPageObject(pageID);
+      if (page.getFile().exists()) {
+         page.getMessages();
+         return page;
+      } else {
+         return null;
+      }
+
    }
 
    private void validateExpiredSet(LongObjectHashMap<JournalHashMap<AckRetry, 
AckRetry, Queue>> queuesToRetry) {
@@ -265,10 +272,11 @@ public class AckManager implements ActiveMQComponent {
       }
    }
 
-   private boolean retryPage(LongObjectHashMap<JournalHashMap<AckRetry, 
AckRetry, Queue>> queuesToRetry,
+   private void retryPage(LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, 
Queue>> queuesToRetry,
+                          SimpleString address,
                           Page page,
                           AckRetry key) throws Exception {
-      AtomicBoolean retriedPaging = new AtomicBoolean(false);
+      logger.debug("scanning for acks on page {} on address {}", 
page.getPageId(), address);
       TransactionImpl transaction = new 
TransactionImpl(server.getStorageManager()).setAsync(true);
       // scan each page for acks
       page.getMessages().forEach(pagedMessage -> {
@@ -297,8 +305,8 @@ public class AckManager implements ActiveMQComponent {
                      if (!subscription.isAcked(pagedMessage)) {
                         PagedReference reference = 
retries.getContext().getPagingStore().getCursorProvider().newReference(pagedMessage,
 subscription);
                         try {
-                           subscription.ackTx(transaction, reference);
-                           retriedPaging.set(true);
+                           subscription.ackTx(transaction, reference, false);
+                           subscription.getQueue().postAcknowledge(reference, 
ackRetry.getReason(), false);
                         } catch (Exception e) {
                            logger.warn(e.getMessage(), e);
                            if (ioCriticalErrorListener != null) {
@@ -325,8 +333,6 @@ public class AckManager implements ActiveMQComponent {
             ioCriticalErrorListener.onIOException(e, e.getMessage(), null);
          }
       }
-
-      return retriedPaging.get();
    }
 
    /** returns true if there are retries ready to be scanned on paging */
@@ -430,8 +436,6 @@ public class AckManager implements ActiveMQComponent {
 
       Iterator<Map.Entry<SimpleString, 
LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>>>> retryIterator;
 
-      boolean retriedPaging = false;
-
 
       MultiStepProgress(HashMap<SimpleString, 
LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>>> retryList) {
          this.retryList = retryList;
@@ -453,9 +457,7 @@ public class AckManager implements ActiveMQComponent {
 
                PagingStore pagingStore = 
server.getPagingManager().getPageStore(entry.getKey());
                pagingStore.execute(() -> {
-                  if (AckManager.this.retryAddress(entry.getKey(), 
entry.getValue())) {
-                     retriedPaging = true;
-                  }
+                  AckManager.this.retryAddress(entry.getKey(), 
entry.getValue());
                   nextStep();
                });
             }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java
index faddee07eb..e17fb80334 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java
@@ -177,4 +177,8 @@ public interface PagingManager extends ActiveMQComponent, 
HierarchicalRepository
    default void forEachTransaction(BiConsumer<Long, PageTransactionInfo> 
transactionConsumer) {
    }
 
+   default boolean isRebuildingCounters() {
+      return false;
+   }
+
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
index 970c545b86..5cccd69fb7 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
@@ -87,10 +87,13 @@ public interface PageSubscription {
    // for internal (cursor) classes
    void confirmPosition(PagePosition ref) throws Exception;
 
-   void ackTx(Transaction tx, PagedReference position) throws Exception;
+   void ackTx(Transaction tx, PagedReference position, boolean fromDelivery) 
throws Exception;
 
+   default void ackTx(Transaction tx, PagedReference position) throws 
Exception {
+      ackTx(tx, position, true);
+   }
    // for internal (cursor) classes
-   void confirmPosition(Transaction tx, PagePosition position) throws 
Exception;
+   void confirmPosition(Transaction tx, PagePosition position, boolean 
fromDelivery) throws Exception;
 
       /**
        * @return the first page in use or MAX_LONG if none is in use
@@ -158,12 +161,6 @@ public interface PageSubscription {
     */
    void onDeletePage(Page deletedPage) throws Exception;
 
-   long getDeliveredCount();
-
-   long getDeliveredSize();
-
-   void incrementDeliveredSize(long size);
-
    void removePendingDelivery(PagedMessage pagedMessage);
 
    ConsumedPage locatePageInfo(long pageNr);
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java
index e4cac16264..1f60af52af 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java
@@ -44,7 +44,6 @@ import java.util.function.BiConsumer;
 
 /** this class will copy current data from the Subscriptions, count messages 
while the server is already active
  * performing other activity */
-// TODO: Rename this as RebuildManager in a future major version
 public class PageCounterRebuildManager implements Runnable {
 
    private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -90,7 +89,7 @@ public class PageCounterRebuildManager implements Runnable {
             Page currentPage = store.getCurrentPage();
             limitPageId = store.getCurrentWritingPage();
             limitMessageNr = currentPage.getNumberOfMessages();
-            if (logger.isDebugEnabled()) {
+            if (logger.isTraceEnabled()) {
                logger.trace("PageCounterRebuild for {}, Current writing page 
{} and limit will be {} with lastMessage on last page={}", 
store.getStoreName(), store.getCurrentWritingPage(), limitPageId, 
limitMessageNr);
             }
          } catch (Exception e) {
@@ -139,14 +138,6 @@ public class PageCounterRebuildManager implements Runnable 
{
       }
    }
 
-   private synchronized PageSubscriptionCounter getCounter(long queueID) {
-      CopiedSubscription copiedSubscription = 
copiedSubscriptionMap.get(queueID);
-      if (copiedSubscription != null) {
-         return copiedSubscription.subscriptionCounter;
-      } else {
-         return null;
-      }
-   }
 
    private CopiedSubscription getSubscription(long queueID) {
       return copiedSubscriptionMap.get(queueID);
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
index cb4b9750c7..b77ad7e589 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
@@ -183,10 +183,10 @@ public class PageSubscriptionCounterImpl extends 
BasePagingCounter {
       recordedSizeUpdater.set(this, size);
       valueUpdater.set(this, value);
       persistentSizeUpdater.set(this, size);
-      addedUpdater.set(this, size);
+      addedUpdater.set(this, value);
    }
 
-   private void process(int add, long size) {
+   private void process(final int add, final long size) {
       if (logger.isTraceEnabled()) {
          logger.trace("process subscription={} add={}, size={}", 
subscriptionID, add, size);
       }
@@ -203,7 +203,7 @@ public class PageSubscriptionCounterImpl extends 
BasePagingCounter {
       }
 
       if (isRebuilding()) {
-         recordedValueUpdater.addAndGet(this, value);
+         recordedValueUpdater.addAndGet(this, add);
          recordedSizeUpdater.addAndGet(this, size);
       }
    }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
index b043b08ebc..c8e2b70a70 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
@@ -28,7 +28,6 @@ import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 
@@ -105,9 +104,6 @@ public final class PageSubscriptionImpl implements 
PageSubscription {
 
    private final PageSubscriptionCounter counter;
 
-   private final AtomicLong deliveredCount = new AtomicLong(0);
-
-   private final AtomicLong deliveredSize = new AtomicLong(0);
 
    PageSubscriptionImpl(final PageCursorProvider cursorProvider,
                         final PagingStore pageStore,
@@ -186,7 +182,7 @@ public final class PageSubscriptionImpl implements 
PageSubscription {
       if (empty) {
          return 0;
       } else {
-         return counter.getValue() - deliveredCount.get();
+         return counter.getValue();
       }
    }
 
@@ -202,7 +198,7 @@ public final class PageSubscriptionImpl implements 
PageSubscription {
       } else {
          //A negative value could happen if an old journal was loaded that 
didn't have
          //size metrics for old records
-         long messageSize = counter.getPersistentSize() - deliveredSize.get();
+         long messageSize = counter.getPersistentSize();
          return messageSize > 0 ? messageSize : 0;
       }
    }
@@ -399,29 +395,21 @@ public final class PageSubscriptionImpl implements 
PageSubscription {
    }
 
    @Override
-   public void confirmPosition(final Transaction tx, final PagePosition 
position) throws Exception {
+   public void confirmPosition(final Transaction tx, final PagePosition 
position, boolean fromDelivery) throws Exception {
       // if the cursor is persistent
       if (persistent) {
          store.storeCursorAcknowledgeTransactional(tx.getID(), cursorId, 
position);
       }
-      installTXCallback(tx, position);
+      installTXCallback(tx, position, fromDelivery);
 
    }
 
-   private void confirmPosition(final Transaction tx, final PagePosition 
position, final long persistentSize) throws Exception {
-      // if the cursor is persistent
-      if (persistent) {
-         store.storeCursorAcknowledgeTransactional(tx.getID(), cursorId, 
position);
-      }
-      installTXCallback(tx, position, persistentSize);
-   }
-
    @Override
-   public void ackTx(final Transaction tx, final PagedReference reference) 
throws Exception {
+   public void ackTx(final Transaction tx, final PagedReference reference, 
boolean fromDelivery) throws Exception {
       //pre-calculate persistentSize
       final long persistentSize = getPersistentSize(reference);
 
-      confirmPosition(tx, reference.getPagedMessage().newPositionObject(), 
persistentSize);
+      confirmPosition(tx, reference.getPagedMessage().newPositionObject(), 
true);
 
       counter.increment(tx, -1, -persistentSize);
 
@@ -584,8 +572,7 @@ public final class PageSubscriptionImpl implements 
PageSubscription {
 
    @Override
    public void reloadPreparedACK(final Transaction tx, final PagePosition 
position) {
-      deliveredCount.incrementAndGet();
-      installTXCallback(tx, position);
+      installTXCallback(tx, position, true);
 
       try {
          counter.increment(tx, -1, -position.getPersistentSize());
@@ -838,16 +825,7 @@ public final class PageSubscriptionImpl implements 
PageSubscription {
       return info;
    }
 
-   private void installTXCallback(final Transaction tx, final PagePosition 
position) {
-      installTXCallback(tx, position, -1);
-   }
-
-   /**
-    * @param tx
-    * @param position
-    * @param persistentSize if negative it needs to be calculated on the fly
-    */
-   private void installTXCallback(final Transaction tx, final PagePosition 
position, final long persistentSize) {
+   private void installTXCallback(final Transaction tx, final PagePosition 
position, final boolean fromDelivery) {
       if (position.getRecordID() >= 0) {
          // It needs to persist, otherwise the cursor will return to the fist 
page position
          tx.setContainsPersistent();
@@ -862,7 +840,7 @@ public final class PageSubscriptionImpl implements 
PageSubscription {
          PageCursorTX cursorTX = (PageCursorTX) 
tx.getProperty(TransactionPropertyIndexes.PAGE_CURSOR_POSITIONS);
 
          if (cursorTX == null) {
-            cursorTX = new PageCursorTX();
+            cursorTX = new PageCursorTX(fromDelivery);
             tx.putProperty(TransactionPropertyIndexes.PAGE_CURSOR_POSITIONS, 
cursorTX);
             tx.addOperation(cursorTX);
          }
@@ -1118,6 +1096,12 @@ public final class PageSubscriptionImpl implements 
PageSubscription {
 
    private final class PageCursorTX extends TransactionOperationAbstract {
 
+      private boolean fromDelivery;
+
+      PageCursorTX(boolean fromDelivery) {
+         this.fromDelivery = fromDelivery;
+      }
+
       private final Map<PageSubscriptionImpl, List<PagePosition>> 
pendingPositions = new HashMap<>();
 
       private void addPositionConfirmation(final PageSubscriptionImpl cursor, 
final PagePosition position) {
@@ -1140,8 +1124,6 @@ public final class PageSubscriptionImpl implements 
PageSubscription {
 
             for (PagePosition confirmed : positions) {
                cursor.processACK(confirmed);
-               cursor.deliveredCount.decrementAndGet();
-               cursor.deliveredSize.addAndGet(-confirmed.getPersistentSize());
             }
 
          }
@@ -1435,7 +1417,6 @@ public final class PageSubscriptionImpl implements 
PageSubscription {
 
       @Override
       public void remove() {
-         deliveredCount.incrementAndGet();
          PagedReference delivery = currentDelivery;
          if (delivery != null) {
             PageCursorInfo info = 
PageSubscriptionImpl.this.getPageInfo(delivery.getPagedMessage().getPageNumber());
@@ -1455,36 +1436,6 @@ public final class PageSubscriptionImpl implements 
PageSubscription {
       }
    }
 
-   /**
-    * @return the deliveredCount
-    */
-   @Override
-   public long getDeliveredCount() {
-      return deliveredCount.get();
-   }
-
-   /**
-    * @return the deliveredSize
-    */
-   @Override
-   public long getDeliveredSize() {
-      return deliveredSize.get();
-   }
-
-   @Override
-   public void incrementDeliveredSize(long size) {
-      deliveredSize.addAndGet(size);
-   }
-
-   private long getPersistentSize(PagedMessage msg) {
-      try {
-         return msg != null && msg.getPersistentSize() > 0 ? 
msg.getPersistentSize() : 0;
-      } catch (ActiveMQException e) {
-         logger.warn("Error computing persistent size of message: {}", msg, e);
-         return 0;
-      }
-   }
-
    private long getPersistentSize(PagedReference ref) {
       try {
          return ref != null && ref.getPersistentSize() > 0 ? 
ref.getPersistentSize() : 0;
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
index 2a62108e9d..eed6b2afea 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
@@ -479,6 +479,14 @@ public final class PagingManagerImpl implements 
PagingManager {
       return started;
    }
 
+   private volatile boolean rebuildingPageCounters;
+
+
+   @Override
+   public boolean isRebuildingCounters() {
+      return rebuildingPageCounters;
+   }
+
    @Override
    public void start() throws Exception {
       lock();
@@ -589,6 +597,9 @@ public final class PagingManagerImpl implements 
PagingManager {
 
    @Override
    public Future<Object> rebuildCounters(Set<Long> storedLargeMessages) {
+      if (rebuildingPageCounters) {
+         logger.debug("Rebuild page counters is already underway, ignoring 
call");
+      }
       Map<Long, PageTransactionInfo> transactionsSet = new LongObjectHashMap();
       // making a copy
       transactions.forEach((a, b) -> {
@@ -617,6 +628,8 @@ public final class PagingManagerImpl implements 
PagingManager {
       FutureTask<Object> task = new FutureTask<>(() -> null);
       managerExecutor.execute(task);
 
+      managerExecutor.execute(() -> rebuildingPageCounters = false);
+
       return task;
    }
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ScheduledDeliveryHandler.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ScheduledDeliveryHandler.java
index f8a5005676..1967a406f6 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ScheduledDeliveryHandler.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ScheduledDeliveryHandler.java
@@ -28,12 +28,20 @@ public interface ScheduledDeliveryHandler {
 
    int getScheduledCount();
 
+   int getNonPagedScheduledCount();
+
    long getScheduledSize();
 
+   long getNonPagedScheduledSize();
+
    int getDurableScheduledCount();
 
+   int getNonPagedDurableScheduledCount();
+
    long getDurableScheduledSize();
 
+   long getNonPagedDurableScheduledSize();
+
    MessageReference peekFirstScheduledMessage();
 
    List<MessageReference> getScheduledReferences();
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index a2c77fd7af..226ffe02ee 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1820,11 +1820,10 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
       if (pageSubscription != null) {
          // messageReferences will have depaged messages which we need to 
discount from the counter as they are
          // counted on the pageSubscription as well
-         long returnValue = (long) pendingMetrics.getMessageCount() + 
getScheduledCount() + getDeliveringCount() + pageSubscription.getMessageCount();
-         if (logger.isTraceEnabled()) {
-            logger.trace("Queue={}/{} returning getMessageCount \n\treturning 
{}. \n\tpendingMetrics.getMessageCount() = {}, \n\tgetScheduledCount() = {}, 
\n\tpageSubscription.getMessageCount()={}, 
\n\tpageSubscription.getCounter().getValue()={}, 
\n\tpageSubscription.getDeliveredCount()={}",
-                        name, id, returnValue, 
pendingMetrics.getMessageCount(), getScheduledCount(), 
pageSubscription.getMessageCount(), pageSubscription.getCounter().getValue(),
-                        pageSubscription.getDeliveredCount());
+         long returnValue = (long) pendingMetrics.getNonPagedMessageCount() + 
scheduledDeliveryHandler.getNonPagedScheduledCount() + 
deliveringMetrics.getNonPagedMessageCount() + 
pageSubscription.getMessageCount();
+         if (logger.isDebugEnabled()) {
+            logger.debug("Queue={}/{} returning getMessageCount \n\treturning 
{}. \n\tpendingMetrics.getMessageCount() = {}, \n\tgetScheduledCount() = {}, 
\n\tpageSubscription.getMessageCount()={}, 
\n\tpageSubscription.getCounter().getValue()={}",
+                        name, id, returnValue, 
pendingMetrics.getMessageCount(),  
scheduledDeliveryHandler.getNonPagedScheduledCount(), 
pageSubscription.getMessageCount(), pageSubscription.getCounter().getValue());
          }
          return returnValue;
       } else {
@@ -1837,7 +1836,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
       if (pageSubscription != null) {
          // messageReferences will have depaged messages which we need to 
discount from the counter as they are
          // counted on the pageSubscription as well
-         return pendingMetrics.getPersistentSize() + getScheduledSize() + 
getDeliveringSize() + pageSubscription.getPersistentSize();
+         return pendingMetrics.getNonPagedPersistentSize() + 
scheduledDeliveryHandler.getNonPagedScheduledSize() + 
deliveringMetrics.getNonPagedPersistentSize() + 
pageSubscription.getPersistentSize();
       } else {
          return pendingMetrics.getPersistentSize() + getScheduledSize() + 
getDeliveringSize();
       }
@@ -1847,7 +1846,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
    public long getDurableMessageCount() {
       if (isDurable()) {
          if (pageSubscription != null) {
-            return (long) pendingMetrics.getDurableMessageCount() + 
getDurableScheduledCount() + getDurableDeliveringCount() + 
pageSubscription.getMessageCount();
+            return (long) pendingMetrics.getNonPagedDurableMessageCount() + 
scheduledDeliveryHandler.getNonPagedDurableScheduledCount() + 
deliveringMetrics.getNonPagedDurableMessageCount() + 
pageSubscription.getMessageCount();
          } else {
             return (long) pendingMetrics.getDurableMessageCount() + 
getDurableScheduledCount() + getDurableDeliveringCount();
          }
@@ -1859,7 +1858,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
    public long getDurablePersistentSize() {
       if (isDurable()) {
          if (pageSubscription != null) {
-            return pendingMetrics.getDurablePersistentSize() + 
getDurableScheduledSize() + getDurableDeliveringSize() + 
pageSubscription.getPersistentSize();
+            return pendingMetrics.getDurablePersistentSize() + 
scheduledDeliveryHandler.getNonPagedDurableScheduledSize() + 
deliveringMetrics.getNonPagedDurablePersistentSize() + 
pageSubscription.getPersistentSize();
          } else {
             return pendingMetrics.getDurablePersistentSize() + 
getDurableScheduledSize() + getDurableDeliveringSize();
          }
@@ -3496,9 +3495,6 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
             }
             addTail(reference, false);
             pageIterator.remove();
-
-            //We have to increment this here instead of in the iterator so we 
have access to the reference from next()
-            
pageSubscription.incrementDeliveredSize(getPersistentSize(reference));
          }
 
          if (logger.isDebugEnabled()) {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueMessageMetrics.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueMessageMetrics.java
index 20e68d40cc..28f7599be9 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueMessageMetrics.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueMessageMetrics.java
@@ -35,23 +35,43 @@ public class QueueMessageMetrics {
    private static final AtomicIntegerFieldUpdater<QueueMessageMetrics> 
COUNT_UPDATER =
          AtomicIntegerFieldUpdater.newUpdater(QueueMessageMetrics.class, 
"messageCount");
 
+   private static final AtomicIntegerFieldUpdater<QueueMessageMetrics> 
COUNT_UPDATER_PAGED =
+      AtomicIntegerFieldUpdater.newUpdater(QueueMessageMetrics.class, 
"messageCountPaged");
+
    private static final AtomicIntegerFieldUpdater<QueueMessageMetrics> 
DURABLE_COUNT_UPDATER =
          AtomicIntegerFieldUpdater.newUpdater(QueueMessageMetrics.class, 
"durableMessageCount");
 
+   private static final AtomicIntegerFieldUpdater<QueueMessageMetrics> 
DURABLE_COUNT_UPDATER_PAGED =
+      AtomicIntegerFieldUpdater.newUpdater(QueueMessageMetrics.class, 
"durableMessageCountPaged");
+
    private static final AtomicLongFieldUpdater<QueueMessageMetrics> 
SIZE_UPDATER =
          AtomicLongFieldUpdater.newUpdater(QueueMessageMetrics.class, 
"persistentSize");
 
+   private static final AtomicLongFieldUpdater<QueueMessageMetrics> 
SIZE_UPDATER_PAGED =
+      AtomicLongFieldUpdater.newUpdater(QueueMessageMetrics.class, 
"persistentSizePaged");
+
    private static final AtomicLongFieldUpdater<QueueMessageMetrics> 
DURABLE_SIZE_UPDATER =
          AtomicLongFieldUpdater.newUpdater(QueueMessageMetrics.class, 
"durablePersistentSize");
 
+   private static final AtomicLongFieldUpdater<QueueMessageMetrics> 
DURABLE_SIZE_UPDATER_PAGED =
+      AtomicLongFieldUpdater.newUpdater(QueueMessageMetrics.class, 
"durablePersistentSizePaged");
+
    private volatile int messageCount;
 
+   private volatile int messageCountPaged;
+
    private volatile long persistentSize;
 
+   private volatile long persistentSizePaged;
+
    private volatile int durableMessageCount;
 
+   private volatile int durableMessageCountPaged;
+
    private volatile long durablePersistentSize;
 
+   private volatile long durablePersistentSizePaged;
+
    private final Queue queue;
 
    private final String name;
@@ -64,89 +84,99 @@ public class QueueMessageMetrics {
 
    public void incrementMetrics(final MessageReference reference) {
       long size = getPersistentSize(reference);
-      COUNT_UPDATER.incrementAndGet(this);
-      if (logger.isDebugEnabled()) {
-         logger.debug("{} increment messageCount to {}: {}", this, 
messageCount, reference);
-      }
-      SIZE_UPDATER.addAndGet(this, size);
-      if (queue.isDurable() && reference.isDurable()) {
-         DURABLE_COUNT_UPDATER.incrementAndGet(this);
-         DURABLE_SIZE_UPDATER.addAndGet(this, size);
+      if (reference.isPaged()) {
+         COUNT_UPDATER_PAGED.incrementAndGet(this);
+         if (logger.isDebugEnabled()) {
+            logger.debug("{} paged messageCountPaged to {}: {}", this, 
messageCountPaged, reference);
+         }
+         SIZE_UPDATER_PAGED.addAndGet(this, size);
+         if (queue.isDurable() && reference.isDurable()) {
+            DURABLE_COUNT_UPDATER_PAGED.incrementAndGet(this);
+            DURABLE_SIZE_UPDATER_PAGED.addAndGet(this, size);
+         }
+      } else {
+         COUNT_UPDATER.incrementAndGet(this);
+         if (logger.isDebugEnabled()) {
+            logger.debug("{} increment messageCount to {}: {}", this, 
messageCount, reference);
+         }
+         SIZE_UPDATER.addAndGet(this, size);
+         if (queue.isDurable() && reference.isDurable()) {
+            DURABLE_COUNT_UPDATER.incrementAndGet(this);
+            DURABLE_SIZE_UPDATER.addAndGet(this, size);
+         }
       }
    }
 
    public void decrementMetrics(final MessageReference reference) {
       long size = -getPersistentSize(reference);
-      COUNT_UPDATER.decrementAndGet(this);
-      if (logger.isDebugEnabled()) {
-         logger.debug("{} decrement messageCount to {}: {}", this, 
messageCount, reference);
-      }
-      SIZE_UPDATER.addAndGet(this, size);
-      if (queue.isDurable() && reference.isDurable()) {
-         DURABLE_COUNT_UPDATER.decrementAndGet(this);
-         DURABLE_SIZE_UPDATER.addAndGet(this, size);
+      if (reference.isPaged()) {
+         COUNT_UPDATER_PAGED.decrementAndGet(this);
+         if (logger.isDebugEnabled()) {
+            logger.debug("{} decrement messageCount to {}: {}", this, 
messageCountPaged, reference);
+         }
+         SIZE_UPDATER_PAGED.addAndGet(this, size);
+         if (queue.isDurable() && reference.isDurable()) {
+            DURABLE_COUNT_UPDATER_PAGED.decrementAndGet(this);
+            DURABLE_SIZE_UPDATER_PAGED.addAndGet(this, size);
+         }
+      } else {
+         COUNT_UPDATER.decrementAndGet(this);
+         if (logger.isDebugEnabled()) {
+            logger.debug("{} decrement messageCount to {}: {}", this, 
messageCount, reference);
+         }
+         SIZE_UPDATER.addAndGet(this, size);
+         if (queue.isDurable() && reference.isDurable()) {
+            DURABLE_COUNT_UPDATER.decrementAndGet(this);
+            DURABLE_SIZE_UPDATER.addAndGet(this, size);
+         }
       }
    }
 
 
-
-   /**
-    * @return the messageCount
-    */
-   public int getMessageCount() {
+   public int getNonPagedMessageCount() {
       return messageCount;
    }
 
    /**
-    * @param messageCount the messageCount to set
+    * @return the messageCount
     */
-   public void setMessageCount(int messageCount) {
-      this.messageCount = messageCount;
+   public int getMessageCount() {
+      return messageCount + messageCountPaged;
    }
-
    /**
     * @return the persistentSize
     */
    public long getPersistentSize() {
+      return persistentSize + persistentSizePaged;
+   }
+
+   public long getNonPagedPersistentSize() {
       return persistentSize;
    }
 
-   /**
-    * @param persistentSize the persistentSize to set
-    */
-   public void setPersistentSize(long persistentSize) {
-      this.persistentSize = persistentSize;
+   public long getNonPagedDurablePersistentSize() {
+      return durablePersistentSize;
    }
 
    /**
     * @return the durableMessageCount
     */
    public int getDurableMessageCount() {
-      return durableMessageCount;
+      return durableMessageCount + durableMessageCountPaged;
    }
 
-   /**
-    * @param durableMessageCount the durableMessageCount to set
-    */
-   public void setDurableMessageCount(int durableMessageCount) {
-      this.durableMessageCount = durableMessageCount;
+   public int getNonPagedDurableMessageCount() {
+      return durableMessageCount;
    }
 
    /**
     * @return the durablePersistentSize
     */
    public long getDurablePersistentSize() {
-      return durablePersistentSize;
-   }
-
-   /**
-    * @param durablePersistentSize the durablePersistentSize to set
-    */
-   public void setDurablePersistentSize(long durablePersistentSize) {
-      this.durablePersistentSize = durablePersistentSize;
+      return durablePersistentSize + durablePersistentSizePaged;
    }
 
-   private long getPersistentSize(final MessageReference reference) {
+   private static long getPersistentSize(final MessageReference reference) {
       long size = 0;
 
       try {
@@ -158,9 +188,4 @@ public class QueueMessageMetrics {
       return size;
    }
 
-   @Override
-   public String toString() {
-      return "QueuePendingMessageMetrics[queue=" + queue.getName() + ", name=" 
+ name + "]";
-   }
-
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java
index b8e1418a79..1e848bc793 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java
@@ -96,21 +96,41 @@ public class ScheduledDeliveryHandlerImpl implements 
ScheduledDeliveryHandler {
       return metrics.getMessageCount();
    }
 
+   @Override
+   public int getNonPagedScheduledCount() {
+      return metrics.getNonPagedMessageCount();
+   }
+
    @Override
    public int getDurableScheduledCount() {
       return metrics.getDurableMessageCount();
    }
 
+   @Override
+   public int getNonPagedDurableScheduledCount() {
+      return metrics.getNonPagedDurableMessageCount();
+   }
+
    @Override
    public long getScheduledSize() {
       return metrics.getPersistentSize();
    }
 
+   @Override
+   public long getNonPagedScheduledSize() {
+      return metrics.getNonPagedPersistentSize();
+   }
+
    @Override
    public long getDurableScheduledSize() {
       return metrics.getDurablePersistentSize();
    }
 
+   @Override
+   public long getNonPagedDurableScheduledSize() {
+      return metrics.getNonPagedDurablePersistentSize();
+   }
+
    @Override
    public List<MessageReference> getScheduledReferences() {
       List<MessageReference> refs = new LinkedList<>();
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java
index 077764b4e4..ddcd76c1a4 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java
@@ -76,7 +76,7 @@ public class AckManagerTest extends ActiveMQTestBase {
       super.setUp();
 
       server1 = createServer(true, createDefaultConfig(0, true), 100024, -1, 
-1, -1);
-      server1.getConfiguration().addAddressSetting(SNF_NAME, new 
AddressSettings().setMaxSizeBytes(-1).setMaxSizeMessages(-1));
+      server1.getConfiguration().addAddressSetting(SNF_NAME, new 
AddressSettings().setMaxSizeBytes(-1).setMaxSizeMessages(-1).setMaxReadPageMessages(20));
       server1.getConfiguration().getAcceptorConfigurations().clear();
       server1.getConfiguration().addAcceptorConfiguration("server", 
"tcp://localhost:61616");
       server1.start();
@@ -289,6 +289,110 @@ public class AckManagerTest extends ActiveMQTestBase {
    }
 
 
+
+   @Test
+   public void testRetryFromPaging() throws Throwable {
+
+      String protocol = "AMQP";
+
+      SimpleString TOPIC_NAME = SimpleString.toSimpleString("tp" + 
RandomUtil.randomString());
+
+      server1.addAddressInfo(new 
AddressInfo(TOPIC_NAME).addRoutingType(RoutingType.MULTICAST));
+
+      ConnectionFactory connectionFactory = 
CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
+
+      // creating 2 subscriptions
+      for (int i = 0; i < 2; i++) {
+         try (Connection connection = connectionFactory.createConnection()) {
+            connection.setClientID("c" + i);
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            Topic topic = session.createTopic(TOPIC_NAME.toString());
+            session.createDurableSubscriber(topic, "s" + i);
+         }
+      }
+
+      int numberOfMessages = 15000;
+      int numberOfAcksC0 = 100;
+      int numberOfAcksC1 = 14999;
+
+      String c0s0Name = "c0.s0";
+      String c1s1Name = "c1.s1";
+
+      final Queue c0s0 = server1.locateQueue(c0s0Name);
+      Assert.assertNotNull(c0s0);
+      final Queue c1s1 = server1.locateQueue(c1s1Name);
+      Assert.assertNotNull(c1s1);
+
+      PagingStore store = server1.getPagingManager().getPageStore(TOPIC_NAME);
+      store.startPaging();
+
+      try (Connection connection = connectionFactory.createConnection()) {
+         Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+         Topic topic = session.createTopic(TOPIC_NAME.toString());
+         MessageProducer producer = session.createProducer(topic);
+
+         for (int i = 0; i < numberOfMessages; i++) {
+            Message m = session.createTextMessage("hello " + i);
+            m.setIntProperty("i", i);
+            producer.send(m);
+            if ((i + 1) % 100 == 0) {
+               c1s1.pause();
+               c0s0.pause();
+               session.commit();
+            }
+         }
+         session.commit();
+      }
+
+      ReferenceIDSupplier referenceIDSupplier = new 
ReferenceIDSupplier(server1);
+
+      {
+         AckManager ackManager = AckManagerProvider.getManager(server1);
+         ackManager.stop();
+
+         AtomicInteger counter = new AtomicInteger(0);
+
+         for (long pageID = store.getFirstPage(); pageID <= 
store.getCurrentWritingPage(); pageID++) {
+            Page page = store.usePage(pageID);
+            try {
+               page.getMessages().forEach(pagedMessage -> {
+                  int increment = counter.incrementAndGet();
+                  if (increment <= numberOfAcksC0) {
+                     
ackManager.addRetry(referenceIDSupplier.getServerID(pagedMessage.getMessage()), 
c0s0, referenceIDSupplier.getID(pagedMessage.getMessage()), AckReason.NORMAL);
+                  }
+                  if (increment <= numberOfAcksC1) {
+                     
ackManager.addRetry(referenceIDSupplier.getServerID(pagedMessage.getMessage()), 
c1s1, referenceIDSupplier.getID(pagedMessage.getMessage()), AckReason.NORMAL);
+                  }
+               });
+            } finally {
+               page.usageDown();
+            }
+         }
+      }
+
+      server1.stop();
+
+      server1.start();
+
+
+      Queue c0s0AfterRestart = server1.locateQueue(c0s0Name);
+      Assert.assertNotNull(c0s0AfterRestart);
+      Queue c1s1AfterRestart = server1.locateQueue(c1s1Name);
+      Assert.assertNotNull(c1s1AfterRestart);
+
+      Wait.assertEquals(numberOfMessages - numberOfAcksC1, 
c1s1AfterRestart::getMessageCount, 10_000);
+      Wait.assertEquals(numberOfAcksC1, 
c1s1AfterRestart::getMessagesAcknowledged, 10_000);
+      Wait.assertEquals(numberOfMessages - numberOfAcksC0, 
c0s0AfterRestart::getMessageCount, 10_000);
+      Wait.assertEquals(numberOfAcksC0, 
c0s0AfterRestart::getMessagesAcknowledged, 10_000);
+
+      server1.stop();
+
+      Assert.assertEquals(0, AckManagerProvider.getSize());
+   }
+
+
+
+
    private int getCounter(byte typeRecord, HashMap<Integer, AtomicInteger> 
values) {
       AtomicInteger value = values.get((int) typeRecord);
       if (value == null) {
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/metrics/JournalPendingMessageTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/metrics/JournalPendingMessageTest.java
index 26328fd102..073c805056 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/metrics/JournalPendingMessageTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/metrics/JournalPendingMessageTest.java
@@ -185,8 +185,8 @@ public class JournalPendingMessageTest extends 
AbstractPersistentStatTestSupport
 
       AtomicLong publishedMessageSize = new AtomicLong();
 
-      publishTestQueueMessages(200, DeliveryMode.NON_PERSISTENT, 
publishedMessageSize);
-      verifyPendingStats(defaultQueueName, 200, publishedMessageSize.get());
+      publishTestQueueMessages(10, DeliveryMode.NON_PERSISTENT, 
publishedMessageSize);
+      verifyPendingStats(defaultQueueName, 10, publishedMessageSize.get());
       verifyPendingDurableStats(defaultQueueName, 0, 0);
    }
 
@@ -196,10 +196,10 @@ public class JournalPendingMessageTest extends 
AbstractPersistentStatTestSupport
       AtomicLong publishedNonPersistentMessageSize = new AtomicLong();
       AtomicLong publishedMessageSize = new AtomicLong();
 
-      publishTestQueueMessages(100, DeliveryMode.PERSISTENT, 
publishedMessageSize);
-      publishTestQueueMessages(100, DeliveryMode.NON_PERSISTENT, 
publishedNonPersistentMessageSize);
-      verifyPendingStats(defaultQueueName, 200, publishedMessageSize.get() + 
publishedNonPersistentMessageSize.get());
-      verifyPendingDurableStats(defaultQueueName, 100, 
publishedMessageSize.get());
+      publishTestQueueMessages(5, DeliveryMode.PERSISTENT, 
publishedMessageSize);
+      publishTestQueueMessages(10, DeliveryMode.NON_PERSISTENT, 
publishedNonPersistentMessageSize);
+      verifyPendingStats(defaultQueueName, 15, publishedMessageSize.get() + 
publishedNonPersistentMessageSize.get());
+      verifyPendingDurableStats(defaultQueueName, 5, 
publishedMessageSize.get());
    }
 
    @Test
diff --git 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/SingleMirrorSoakTest.java
 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/SingleMirrorSoakTest.java
index 196bbc4767..fd75f66c9f 100644
--- 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/SingleMirrorSoakTest.java
+++ 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/SingleMirrorSoakTest.java
@@ -29,6 +29,7 @@ import java.io.StringWriter;
 import java.lang.invoke.MethodHandles;
 import java.util.HashMap;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -61,11 +62,16 @@ public class SingleMirrorSoakTest extends SoakTestBase {
 
    // Set this to true and log4j will be configured with some relevant 
log.trace for the AckManager at the server's
    private static final boolean TRACE_LOGS = 
Boolean.parseBoolean(TestParameters.testProperty(TEST_NAME, "TRACE_LOGS", 
"false"));
-   private static final int NUMBER_MESSAGES = 
TestParameters.testProperty(TEST_NAME, "NUMBER_MESSAGES", 2_500);
+   private static final int NUMBER_MESSAGES = 
TestParameters.testProperty(TEST_NAME, "NUMBER_MESSAGES", 2_000);
+
+   // By default consuming 90% of the messages
+   private static final int NUMBER_MESSAGES_RECEIVE = 
TestParameters.testProperty(TEST_NAME, "NUMBER_MESSAGES_RECEIVE", 1_800);
    private static final int RECEIVE_COMMIT = 
TestParameters.testProperty(TEST_NAME, "RECEIVE_COMMIT", 100);
    private static final int SEND_COMMIT = 
TestParameters.testProperty(TEST_NAME, "SEND_COMMIT", 100);
-   private static final int KILL_INTERNAL =  
TestParameters.testProperty(TEST_NAME, "KILL_INTERVAL", 500);
-   private static final int SNF_TIMEOUT =  
TestParameters.testProperty(TEST_NAME, "SNF_TIMEOUT", 60_000);
+
+   // A negative value (e.g. -1) means the target broker is never killed
+   private static final int KILL_INTERVAL =  
TestParameters.testProperty(TEST_NAME, "KILL_INTERVAL", 1_000);
+   private static final int SNF_TIMEOUT =  
TestParameters.testProperty(TEST_NAME, "SNF_TIMEOUT", 300_000);
    private static final int GENERAL_WAIT_TIMEOUT =  
TestParameters.testProperty(TEST_NAME, "GENERAL_TIMEOUT", 10_000);
 
    /*
@@ -134,14 +140,14 @@ public class SingleMirrorSoakTest extends SoakTestBase {
       brokerProperties.put("AMQPConnections." + connectionName + ".type", 
AMQPBrokerConnectionAddressType.MIRROR.toString());
       brokerProperties.put("AMQPConnections." + connectionName + 
".connectionElements.mirror.sync", "false");
       brokerProperties.put("largeMessageSync", "false");
-      brokerProperties.put("mirrorAckManagerPageAttempts", "10");
-      brokerProperties.put("mirrorAckManagerRetryDelay", "1000");
+      //brokerProperties.put("mirrorAckManagerPageAttempts", "20");
+      //brokerProperties.put("mirrorAckManagerRetryDelay", "100");
 
       if (paging) {
          brokerProperties.put("addressSettings.#.maxSizeMessages", "1");
-         brokerProperties.put("addressSettings.#.maxReadPageMessages", "1000");
+         brokerProperties.put("addressSettings.#.maxReadPageMessages", "2000");
          brokerProperties.put("addressSettings.#.maxReadPageBytes", "-1");
-         brokerProperties.put("addressSettings.#.prefetchPageMessages", "100");
+         brokerProperties.put("addressSettings.#.prefetchPageMessages", "500");
          // un-comment this line if you want to rather use the work around 
without the fix on the PostOfficeImpl
          // brokerProperties.put("addressSettings.#.iDCacheSize", "1000");
       }
@@ -162,6 +168,12 @@ public class SingleMirrorSoakTest extends SoakTestBase {
             + "logger.ack.level=TRACE\n"
             + 
"logger.config.name=org.apache.activemq.artemis.core.config.impl.ConfigurationImpl\n"
             + "logger.config.level=TRACE\n"
+            + 
"logger.counter.name=org.apache.activemq.artemis.core.paging.cursor.impl.PageSubscriptionCounterImpl\n"
+            + "logger.counter.level=DEBUG\n"
+            + 
"logger.queue.name=org.apache.activemq.artemis.core.server.impl.QueueImpl\n"
+            + "logger.queue.level=DEBUG\n"
+            + 
"logger.rebuild.name=org.apache.activemq.artemis.core.paging.cursor.impl.PageCounterRebuildManager\n"
+            + "logger.rebuild.level=DEBUG\n"
             + "appender.console.filter.threshold.type = ThresholdFilter\n"
             + "appender.console.filter.threshold.level = info"));
       }
@@ -187,7 +199,7 @@ public class SingleMirrorSoakTest extends SoakTestBase {
       startServers();
 
 
-      Assert.assertTrue(KILL_INTERNAL > SEND_COMMIT);
+      Assert.assertTrue(KILL_INTERVAL > SEND_COMMIT || KILL_INTERVAL < 0);
 
       String clientIDA = "nodeA";
       String clientIDB = "nodeB";
@@ -212,18 +224,23 @@ public class SingleMirrorSoakTest extends SoakTestBase {
 
       ExecutorService executorService = Executors.newFixedThreadPool(3);
       runAfter(executorService::shutdownNow);
+      CountDownLatch consumerDone = new CountDownLatch(2);
       executorService.execute(() -> {
          try {
-            consume(connectionFactoryDC1A, clientIDA, subscriptionID, 0, 
NUMBER_MESSAGES, true, false, RECEIVE_COMMIT);
+            consume(connectionFactoryDC1A, clientIDA, subscriptionID, 0, 
NUMBER_MESSAGES_RECEIVE, false, false, RECEIVE_COMMIT);
          } catch (Exception e) {
             logger.warn(e.getMessage(), e);
+         } finally {
+            consumerDone.countDown();
          }
       });
       executorService.execute(() -> {
          try {
-            consume(connectionFactoryDC1A, clientIDB, subscriptionID, 0, 
NUMBER_MESSAGES, true, false, RECEIVE_COMMIT);
+            consume(connectionFactoryDC1A, clientIDB, subscriptionID, 0, 
NUMBER_MESSAGES_RECEIVE, false, false, RECEIVE_COMMIT);
          } catch (Exception e) {
             logger.warn(e.getMessage(), e);
+         } finally {
+            consumerDone.countDown();
          }
       });
 
@@ -243,7 +260,7 @@ public class SingleMirrorSoakTest extends SoakTestBase {
                logger.info("Sent {} messages", i);
                session.commit();
             }
-            if (i > 0 && i % KILL_INTERNAL == 0) {
+            if (KILL_INTERVAL > 0 && i > 0 && i % KILL_INTERVAL == 0) {
                restartExeuctor.execute(() -> {
                   if (running.get()) {
                      try {
@@ -265,12 +282,14 @@ public class SingleMirrorSoakTest extends SoakTestBase {
          running.set(false);
       }
 
+      consumerDone.await(SNF_TIMEOUT, TimeUnit.MILLISECONDS);
+
       Wait.assertEquals(0, () -> getCount(managementDC1, snfQueue), 
SNF_TIMEOUT);
       Wait.assertEquals(0, () -> getCount(managementDC2, snfQueue), 
SNF_TIMEOUT);
-      Wait.assertEquals(0, () -> getCount(managementDC1, clientIDA + "." + 
subscriptionID), GENERAL_WAIT_TIMEOUT);
-      Wait.assertEquals(0, () -> getCount(managementDC1, clientIDB + "." + 
subscriptionID), GENERAL_WAIT_TIMEOUT);
-      Wait.assertEquals(0, () -> getCount(managementDC2, clientIDA + "." + 
subscriptionID), GENERAL_WAIT_TIMEOUT);
-      Wait.assertEquals(0, () -> getCount(managementDC2, clientIDB + "." + 
subscriptionID), GENERAL_WAIT_TIMEOUT);
+      Wait.assertEquals(NUMBER_MESSAGES - NUMBER_MESSAGES_RECEIVE, () -> 
getCount(managementDC1, clientIDA + "." + subscriptionID), 
GENERAL_WAIT_TIMEOUT);
+      Wait.assertEquals(NUMBER_MESSAGES - NUMBER_MESSAGES_RECEIVE, () -> 
getCount(managementDC1, clientIDB + "." + subscriptionID), 
GENERAL_WAIT_TIMEOUT);
+      Wait.assertEquals(NUMBER_MESSAGES - NUMBER_MESSAGES_RECEIVE, () -> 
getCount(managementDC2, clientIDA + "." + subscriptionID), 
GENERAL_WAIT_TIMEOUT);
+      Wait.assertEquals(NUMBER_MESSAGES - NUMBER_MESSAGES_RECEIVE, () -> 
getCount(managementDC2, clientIDB + "." + subscriptionID), 
GENERAL_WAIT_TIMEOUT);
 
       destroyServers();
 
@@ -337,9 +356,10 @@ public class SingleMirrorSoakTest extends SoakTestBase {
       }
    }
 
-   public long getCount(SimpleManagement simpleManagement, String queue) 
throws Exception {
+   public long getCount(SimpleManagement simpleManagement, String queue) {
       try {
          long value = simpleManagement.getMessageCountOnQueue(queue);
+         logger.info("Queue {} count = {}", queue, value);
          return value;
       } catch (Exception e) {
          logger.warn(e.getMessage(), e);
diff --git a/tests/soak-tests/src/test/scripts/parameters.sh 
b/tests/soak-tests/src/test/scripts/longrun-parameters.sh
similarity index 91%
copy from tests/soak-tests/src/test/scripts/parameters.sh
copy to tests/soak-tests/src/test/scripts/longrun-parameters.sh
index ddbe3aa2e6..5ab2705f9a 100755
--- a/tests/soak-tests/src/test/scripts/parameters.sh
+++ b/tests/soak-tests/src/test/scripts/longrun-parameters.sh
@@ -16,7 +16,8 @@
 # specific language governing permissions and limitations
 # under the License.
 
-# this script contains a suggest set of variables to run the soak tests.
+# use this script for larger parameters, possibly when using larger machines
+
 
 ## Generic variable:
 # Some tests will support saving the producer's state before consumption. If 
you set this variable these tests will hold a zip file and recover it 
approprieatedly.
@@ -144,8 +145,11 @@ export 
TEST_CLUSTER_NOTIFICATIONS_CONTINUITY_NUMBER_OF_QUEUES=200
 export TEST_CLUSTER_NOTIFICATIONS_CONTINUITY_NUMBER_OF_WORKERS=10
 
 export TEST_SINGLE_MIRROR_SOAK_TRACE_LOGS=false
-export TEST_SINGLE_MIRROR_SOAK_NUMBER_MESSAGES=2500
-export TEST_SINGLE_MIRROR_SOAK_RECEIVE_COMMIT=100
-export TEST_SINGLE_MIRROR_SOAK_SEND_COMMIT=100
-export TEST_SINGLE_MIRROR_SOAK_KILL_INTERVAL=500
-export TEST_SINGLE_MIRROR_SOAK_SNF_TIMEOUT=60000
\ No newline at end of file
+export TEST_SINGLE_MIRROR_SOAK_NUMBER_MESSAGES=500000
+export TEST_SINGLE_MIRROR_SOAK_NUMBER_MESSAGES_RECEIVE=400000
+export TEST_SINGLE_MIRROR_SOAK_RECEIVE_COMMIT=500
+export TEST_SINGLE_MIRROR_SOAK_SEND_COMMIT=1000
+export TEST_SINGLE_MIRROR_SOAK_KILL_INTERVAL=50000
+export TEST_SINGLE_MIRROR_SOAK_SNF_TIMEOUT=60000000
+export TEST_SINGLE_MIRROR_SOAK_GENERAL_TIMEOUT=100000
+export TEST_SINGLE_MIRROR_SOAK_CONSUMER_PROCESSING_TIME=10
diff --git a/tests/soak-tests/src/test/scripts/parameters.sh 
b/tests/soak-tests/src/test/scripts/parameters.sh
index ddbe3aa2e6..f53e46b6e3 100755
--- a/tests/soak-tests/src/test/scripts/parameters.sh
+++ b/tests/soak-tests/src/test/scripts/parameters.sh
@@ -144,8 +144,11 @@ export 
TEST_CLUSTER_NOTIFICATIONS_CONTINUITY_NUMBER_OF_QUEUES=200
 export TEST_CLUSTER_NOTIFICATIONS_CONTINUITY_NUMBER_OF_WORKERS=10
 
 export TEST_SINGLE_MIRROR_SOAK_TRACE_LOGS=false
-export TEST_SINGLE_MIRROR_SOAK_NUMBER_MESSAGES=2500
-export TEST_SINGLE_MIRROR_SOAK_RECEIVE_COMMIT=100
-export TEST_SINGLE_MIRROR_SOAK_SEND_COMMIT=100
-export TEST_SINGLE_MIRROR_SOAK_KILL_INTERVAL=500
-export TEST_SINGLE_MIRROR_SOAK_SNF_TIMEOUT=60000
\ No newline at end of file
+export TEST_SINGLE_MIRROR_SOAK_NUMBER_MESSAGES=50000
+export TEST_SINGLE_MIRROR_SOAK_NUMBER_MESSAGES_RECEIVE=40000
+export TEST_SINGLE_MIRROR_SOAK_RECEIVE_COMMIT=500
+export TEST_SINGLE_MIRROR_SOAK_SEND_COMMIT=1000
+export TEST_SINGLE_MIRROR_SOAK_KILL_INTERVAL=10000
+export TEST_SINGLE_MIRROR_SOAK_SNF_TIMEOUT=60000
+export TEST_SINGLE_MIRROR_SOAK_GENERAL_TIMEOUT=10000
+export TEST_SINGLE_MIRROR_SOAK_CONSUMER_PROCESSING_TIME=10

Reply via email to