This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 9233ea2b5b ARTEMIS-5564 Avoid unecessary scans on mirror AckManager
9233ea2b5b is described below

commit 9233ea2b5b9fd476ed859ed773870905f3d61fd3
Author: Clebert Suconic <[email protected]>
AuthorDate: Tue Jul 1 13:04:10 2025 -0400

    ARTEMIS-5564 Avoid unnecessary scans on mirror AckManager
    
    The AckManager should preemptively give up processing when enough records 
have been acked.
    We also shouldn't remove acks unless they were at the beginning of the list.
    And we should avoid scanning pages at all costs (scan as minimal as 
possible).
---
 .../protocol/amqp/connect/mirror/AckManager.java   | 134 +++++--
 .../persistence/impl/journal/codec/AckRetry.java   |  11 +-
 .../mirror/LargeAccumulationTest.java              | 426 +++++++++++++++++++++
 .../src/test/scripts/longrun-parameters.sh         |  13 +
 tests/soak-tests/src/test/scripts/parameters.sh    |  13 +
 5 files changed, 559 insertions(+), 38 deletions(-)

diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
index 0ae57bcea3..01b2698373 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
@@ -28,6 +28,7 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.LongSupplier;
 
 import io.netty.util.collection.LongObjectHashMap;
@@ -109,7 +110,7 @@ public class AckManager implements ActiveMQComponent {
          scheduledComponent = null;
       }
       AckManagerProvider.remove(this.server);
-      logger.debug("Stopping ackmanager on server {}", server);
+      logger.trace("Stopping ackmanager on server {}", server);
    }
 
    public synchronized void pause() {
@@ -126,8 +127,8 @@ public class AckManager implements ActiveMQComponent {
 
    @Override
    public synchronized void start() {
-      if (logger.isDebugEnabled()) {
-         logger.debug("Starting ACKManager on {} with period = {}, 
minQueueAttempts={}, maxPageAttempts={}", server, 
configuration.getMirrorAckManagerRetryDelay(), 
configuration.getMirrorAckManagerQueueAttempts(), 
configuration.getMirrorAckManagerPageAttempts());
+      if (logger.isTraceEnabled()) {
+         logger.trace("Starting ACKManager on {} with period = {}, 
minQueueAttempts={}, maxPageAttempts={}", server, 
configuration.getMirrorAckManagerRetryDelay(), 
configuration.getMirrorAckManagerQueueAttempts(), 
configuration.getMirrorAckManagerPageAttempts());
       }
       if (!isStarted()) {
          scheduledComponent = new 
ActiveMQScheduledComponent(server.getScheduledPool(), 
server.getExecutorFactory().getExecutor(), 
server.getConfiguration().getMirrorAckManagerRetryDelay(), 
server.getConfiguration().getMirrorAckManagerRetryDelay(), 
TimeUnit.MILLISECONDS, true) {
@@ -139,7 +140,7 @@ public class AckManager implements ActiveMQComponent {
          scheduledComponent.start();
          scheduledComponent.delay();
       } else {
-         logger.debug("Starting ignored on server {}", server);
+         logger.trace("Starting ignored on server {}", server);
       }
    }
 
@@ -198,7 +199,7 @@ public class AckManager implements ActiveMQComponent {
    }
 
    private void flushMirrorTargets() {
-      logger.debug("scanning and flushing mirror targets");
+      logger.trace("scanning and flushing mirror targets");
       List<AMQPMirrorControllerTarget> targetCopy = copyTargets();
       targetCopy.forEach(AMQPMirrorControllerTarget::flush);
    }
@@ -241,35 +242,41 @@ public class AckManager implements ActiveMQComponent {
       return retriesByAddress;
    }
 
-   private boolean isEmpty(LongObjectHashMap<JournalHashMap<AckRetry, 
AckRetry, Queue>> queuesToRetry) {
-      AtomicBoolean empty = new AtomicBoolean(true);
-
-      queuesToRetry.forEach((id, journalHashMap) -> {
-         if (!journalHashMap.isEmpty()) {
-            empty.set(false);
+   private boolean isSnapshotComplete(LongObjectHashMap<AtomicInteger> 
pendingSnapshot) {
+      AtomicBoolean complete = new AtomicBoolean(true);
+      pendingSnapshot.forEach((l, count) -> {
+         if (count.get() > 0) {
+            complete.set(false);
          }
       });
 
-      return empty.get();
+      return complete.get();
    }
 
-
-
    // to be used with the same executor as the PagingStore executor
    public void retryAddress(SimpleString address, 
LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>> acksToRetry) {
+
+
+      // This is an optimization:
+      // we peek at how many records we currently have. When we scan all the 
records that were initially input we would
+      // interrupt the scan and start over
+      // to avoid scanning for records that will never match since new ones 
are probably "in the past" now
+      LongObjectHashMap<AtomicInteger> snapshotCount = 
buildCounterSnapshot(acksToRetry);
+
       MirrorController previousController = 
AMQPMirrorControllerTarget.getControllerInUse();
       logger.trace("retrying address {} on server {}", address, server);
       try {
          
AMQPMirrorControllerTarget.setControllerInUse(disabledAckMirrorController);
 
-         if (checkRetriesAndPaging(acksToRetry)) {
+         if (checkRetriesAndPaging(acksToRetry, snapshotCount)) {
             logger.trace("scanning paging for {}", address);
-            AckRetry key = new AckRetry();
 
+            boolean completeSnapshot = false;
             PagingStore store = 
server.getPagingManager().getPageStore(address);
             for (long pageId = store.getFirstPage(); pageId <= 
store.getCurrentWritingPage(); pageId++) {
-               if (isEmpty(acksToRetry)) {
-                  logger.trace("Retry stopped while reading page {} on address 
{} as the outcome is now empty, server={}", pageId, address, server);
+               if (isSnapshotComplete(snapshotCount)) {
+                  completeSnapshot = true;
+                  logger.debug("AckManager Page Scanning complete (done) on 
address {}", address);
                   break;
                }
                Page page = openPage(store, pageId);
@@ -277,12 +284,30 @@ public class AckManager implements ActiveMQComponent {
                   continue;
                }
                try {
-                  retryPage(acksToRetry, address, page, key);
+                  retryPage(snapshotCount, acksToRetry, address, page);
                } finally {
                   page.usageDown();
                }
             }
-            validateExpiredSet(address, acksToRetry);
+            if (!completeSnapshot) {
+               // completeSnapshot == true, it means that every record meant 
to be acked was used,
+               // so there is no need to check the expired set and we bypass 
this check
+               // we used to check this every page scan, but as an 
optimization we removed this unecessary step
+               validateExpiredSet(address, acksToRetry);
+
+               if (logger.isDebugEnabled()) {
+                  logger.debug("Retry page address got to the end of the list 
without still finding a few records to acknowledge");
+                  snapshotCount.forEach((l, c) -> logger.warn("debug {} still 
have {} ack records after the scan is finished", l, c));
+                  acksToRetry.forEach((l, m) -> {
+                     logger.debug("Records on queue {}:", l);
+                     m.forEach((ack1, ack2) -> {
+                        if (ack1.getViewCount() > 0) {
+                           logger.debug("Record {}", ack1);
+                        }
+                     });
+                  });
+               }
+            }
          } else {
             logger.trace("Page Scan not required for address {}", address);
          }
@@ -296,6 +321,20 @@ public class AckManager implements ActiveMQComponent {
       }
    }
 
+   private static LongObjectHashMap<AtomicInteger> 
buildCounterSnapshot(LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, 
Queue>> acksToRetry) {
+      LongObjectHashMap<AtomicInteger> snapshotCount = new 
LongObjectHashMap<>();
+      acksToRetry.forEach((l, map) -> {
+         AtomicInteger recordCount = new AtomicInteger(0);
+         snapshotCount.put(l, recordCount);
+         map.forEach((ack1, ack2) -> {
+            ack2.incrementViewCount();
+            recordCount.incrementAndGet();
+         });
+         logger.trace("Building counter snapshot for Queue {} with {} ack 
elements", l, recordCount);
+      });
+      return snapshotCount;
+   }
+
    private Page openPage(PagingStore store, long pageID) throws Throwable {
       Page page = store.newPageObject(pageID);
       if (page.getFile().exists()) {
@@ -313,7 +352,9 @@ public class AckManager implements ActiveMQComponent {
 
    private void validateExpireSet(SimpleString address, long queueID, 
JournalHashMap<AckRetry, AckRetry, Queue> retries) {
       for (AckRetry retry : retries.valuesCopy()) {
-         if (retry.getQueueAttempts() >= 
configuration.getMirrorAckManagerQueueAttempts()) {
+         // we only remove or configure to be removed if the retry was 
initially seen on the start of the process
+         // this is to avoid a race where an ACK entered the list after the 
scan been through where the element was supposed to be
+         if (retry.getViewCount() > 0 && retry.getQueueAttempts() >= 
configuration.getMirrorAckManagerQueueAttempts()) {
             if (retry.attemptedPage() >= 
configuration.getMirrorAckManagerPageAttempts()) {
                if (configuration.isMirrorAckManagerWarnUnacked()) {
                   ActiveMQAMQPProtocolLogger.LOGGER.ackRetryFailed(retry, 
address, queueID);
@@ -325,20 +366,22 @@ public class AckManager implements ActiveMQComponent {
                   mirrorRegistry.decrementMirrorAckSize();
                }
             } else {
-               if (logger.isDebugEnabled()) {
-                  logger.debug("Retry {} attempted {} times on paging, 
Configuration Page Attempts={}", retry, retry.getPageAttempts(), 
configuration.getMirrorAckManagerPageAttempts());
+               if (logger.isTraceEnabled()) {
+                  logger.trace("Retry {} attempted {} times on paging, 
Configuration Page Attempts={}", retry, retry.getPageAttempts(), 
configuration.getMirrorAckManagerPageAttempts());
                }
             }
          } else {
-            logger.debug("Retry {} queue attempted {} times on paging, 
QueueAttempts {} Configuration Page Attempts={}", retry, 
retry.getQueueAttempts(), retry.getPageAttempts(), 
configuration.getMirrorAckManagerPageAttempts());
+            logger.trace("Retry {} queue attempted {} times on paging, 
QueueAttempts {} Configuration Page Attempts={}", retry, 
retry.getQueueAttempts(), retry.getPageAttempts(), 
configuration.getMirrorAckManagerPageAttempts());
          }
       }
    }
 
-   private void retryPage(LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, 
Queue>> queuesToRetry,
+   private void retryPage(LongObjectHashMap<AtomicInteger> snapshotCount,
+                          LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, 
Queue>> queuesToRetry,
                           SimpleString address,
-                          Page page,
-                          AckRetry key) throws Exception {
+                          Page page) throws Exception {
+
+      AckRetry key = new AckRetry();
       logger.debug("scanning for acks on page {} on address {}", 
page.getPageId(), address);
       TransactionImpl transaction = new 
TransactionImpl(server.getStorageManager()).setAsync(true);
       // scan each page for acks
@@ -346,6 +389,7 @@ public class AckManager implements ActiveMQComponent {
          for (int i = 0; i < pagedMessage.getQueueIDs().length; i++) {
             long queueID = pagedMessage.getQueueIDs()[i];
             JournalHashMap<AckRetry, AckRetry, Queue> retries = 
queuesToRetry.get(queueID);
+            AtomicInteger snapshotOnQueue = snapshotCount.get(queueID);
             if (retries != null) {
                String serverID = 
referenceIDSupplier.getServerID(pagedMessage.getMessage());
                if (serverID == null) {
@@ -353,14 +397,11 @@ public class AckManager implements ActiveMQComponent {
                }
                long id = referenceIDSupplier.getID(pagedMessage.getMessage());
 
-               logger.trace("Looking for retry on serverID={}, id={} on 
server={}", serverID, id, server);
                key.setNodeID(serverID).setMessageID(id);
 
                AckRetry ackRetry = retries.get(key);
 
-               // we first retry messages in the queue first.
-               // this is to avoid messages that are in transit from being 
depaged into the queue
-               if (ackRetry != null && ackRetry.getQueueAttempts() > 
configuration.getMirrorAckManagerQueueAttempts()) {
+               if (ackRetry != null) {
                   Queue queue = retries.getContext();
 
                   if (queue != null) {
@@ -379,13 +420,12 @@ public class AckManager implements ActiveMQComponent {
                      }
                      if (retries.remove(ackRetry, transaction.getID()) != 
null) {
                         mirrorRegistry.decrementMirrorAckSize();
+                        decrementSnapshotCount(ackRetry, snapshotOnQueue);
                      }
                      transaction.setContainsPersistent();
                      logger.trace("retry performed ok, ackRetry={} for 
message={} on queue", ackRetry, pagedMessage);
                   }
                }
-            } else {
-               logger.trace("Retry key={} not found server={}", key, server);
             }
          }
       });
@@ -400,10 +440,28 @@ public class AckManager implements ActiveMQComponent {
       }
    }
 
+   private void decrementSnapshotCount(AckRetry retry, AtomicInteger 
queueSnapshotCount) {
+      // we check the view count as we only decrement the snapshot if the 
record was
+      // in the initial list when we started the scan
+      // otherwise we will of course ack the message since we have the record 
anyways
+      // but we won't discount it from the snapshot counter
+      if (retry.getViewCount() > 0) {
+         if (queueSnapshotCount != null) {
+            queueSnapshotCount.decrementAndGet();
+         }
+      } else {
+         // we count the initial records on the retry list
+         // however more records could still be flowing while the scan is 
being performed
+         // on this case the record entered the scan list, we can actually 
acknowledge the entry, but we should not decrement
+         // the snapshot count as it wasn't part of the initial list
+         logger.trace("Update on snapshot count ignrored for {}", retry);
+      }
+   }
+
    /**
     * {@return {@code true} if there are retries ready to be scanned on paging}
     */
-   private boolean 
checkRetriesAndPaging(LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, 
Queue>> queuesToRetry) {
+   private boolean 
checkRetriesAndPaging(LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, 
Queue>> queuesToRetry, LongObjectHashMap<AtomicInteger> snapshotCount) {
       boolean needScanOnPaging = false;
       Iterator<Map.Entry<Long, JournalHashMap<AckRetry, AckRetry, Queue>>> 
iter = queuesToRetry.entrySet().iterator();
 
@@ -411,11 +469,13 @@ public class AckManager implements ActiveMQComponent {
          Map.Entry<Long, JournalHashMap<AckRetry, AckRetry, Queue>> entry = 
iter.next();
          JournalHashMap<AckRetry, AckRetry, Queue> queueRetries = 
entry.getValue();
          Queue queue = queueRetries.getContext();
+         AtomicInteger queueSnapshotCount = snapshotCount.get(queue.getID());
          for (AckRetry retry : queueRetries.valuesCopy()) {
             if (ack(retry.getNodeID(), queue, retry.getMessageID(), 
retry.getReason(), false)) {
                logger.trace("Removing retry {} as the retry went ok", retry);
                queueRetries.remove(retry);
                mirrorRegistry.decrementMirrorAckSize();
+               decrementSnapshotCount(retry, queueSnapshotCount);
             } else {
                int retried = retry.attemptedQueue();
                if (logger.isTraceEnabled()) {
@@ -453,15 +513,15 @@ public class AckManager implements ActiveMQComponent {
       MessageReference reference = targetQueue.removeWithSuppliedID(nodeID, 
messageID, referenceIDSupplier);
 
       if (reference == null) {
-         if (logger.isDebugEnabled()) {
-            logger.debug("ACK Manager could not find reference nodeID={} 
(while localID={}), messageID={} on queue {}, server={}. Adding retry with 
minQueue={}, maxPage={}, delay={}", nodeID, 
referenceIDSupplier.getDefaultNodeID(), messageID, targetQueue.getName(), 
server, configuration.getMirrorAckManagerQueueAttempts(), 
configuration.getMirrorAckManagerPageAttempts(), 
configuration.getMirrorAckManagerRetryDelay());
+         if (logger.isTraceEnabled()) {
+            logger.trace("ACK Manager could not find reference nodeID={} 
(while localID={}), messageID={} on queue {}, server={}. Adding retry with 
minQueue={}, maxPage={}, delay={}", nodeID, 
referenceIDSupplier.getDefaultNodeID(), messageID, targetQueue.getName(), 
server, configuration.getMirrorAckManagerQueueAttempts(), 
configuration.getMirrorAckManagerPageAttempts(), 
configuration.getMirrorAckManagerRetryDelay());
          }
 
          if (allowRetry) {
             if (configuration != null && 
configuration.isMirrorAckManagerWarnUnacked() && targetQueue.getConsumerCount() 
> 0) {
                
ActiveMQAMQPProtocolLogger.LOGGER.unackWithConsumer(targetQueue.getConsumerCount(),
 targetQueue.getName(), nodeID, messageID);
             } else {
-               logger.debug("There are {} consumers on queue {}, what made Ack 
for message with nodeID={}, messageID={} enter a retry list", 
targetQueue.getConsumerCount(), targetQueue.getName(), nodeID, messageID);
+               logger.trace("There are {} consumers on queue {}, what made Ack 
for message with nodeID={}, messageID={} enter a retry list", 
targetQueue.getConsumerCount(), targetQueue.getName(), nodeID, messageID);
             }
             addRetry(nodeID, targetQueue, messageID, reason);
          }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/AckRetry.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/AckRetry.java
index e3cf4d11bf..9ab9653c57 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/AckRetry.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/AckRetry.java
@@ -32,6 +32,15 @@ public final class AckRetry {
    AckReason reason;
    int pageAttempts;
    int queueAttempts;
+   int viewCount;
+
+   public int getViewCount() {
+      return viewCount;
+   }
+
+   public int incrementViewCount() {
+      return ++viewCount;
+   }
 
    private static Persister persister = new Persister();
 
@@ -41,7 +50,7 @@ public final class AckRetry {
 
    @Override
    public String toString() {
-      return "AckRetry{" + "nodeID='" + nodeID + '\'' + ", messageID=" + 
messageID + ", reason=" + reason + ", pageAttempts=" + pageAttempts + ", 
queueAttempts=" + queueAttempts + '}';
+      return "AckRetry{" + "nodeID='" + nodeID + '\'' + ", messageID=" + 
messageID + ", reason=" + reason + ", pageAttempts=" + pageAttempts + ", 
queueAttempts=" + queueAttempts + ", viewCount=" + viewCount + '}';
    }
 
    public AckRetry() {
diff --git 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/LargeAccumulationTest.java
 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/LargeAccumulationTest.java
new file mode 100644
index 0000000000..612460ab59
--- /dev/null
+++ 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/LargeAccumulationTest.java
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.soak.brokerConnection.mirror;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import java.io.File;
+import java.io.StringWriter;
+import java.lang.invoke.MethodHandles;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.management.SimpleManagement;
+import org.apache.activemq.artemis.cli.commands.helper.HelperCreate;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
+import org.apache.activemq.artemis.tests.soak.SoakTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.util.ServerUtil;
+import org.apache.activemq.artemis.utils.FileUtil;
+import org.apache.activemq.artemis.utils.TestParameters;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class LargeAccumulationTest extends SoakTestBase {
+
+   private static final String TEST_NAME = "LARGE_ACCUMULATION";
+   private static final String AMQP_CREDITS = 
TestParameters.testProperty(TEST_NAME, "AMQP_CREDITS", "1000");
+   private static final String MAX_PENDING_ACKS = 
TestParameters.testProperty(TEST_NAME, "MAX_PENDING_ACKS", "20");
+   private static final boolean NO_WEB = 
Boolean.parseBoolean(TestParameters.testProperty(TEST_NAME, "NO_WEB", "true"));
+   private static final int NUMBER_OF_THREADS = 
Integer.parseInt(TestParameters.testProperty(TEST_NAME, "THREADS", "20"));
+   private static final int NUMBER_OF_SUBSCRIPTIONS = 
Integer.parseInt(TestParameters.testProperty(TEST_NAME, 
"NUMBER_OF_SUBSCRIPTIONS", "2"));
+   private static final int NUMBER_OF_LARGE_MESSAGES = 
Integer.parseInt(TestParameters.testProperty(TEST_NAME, 
"NUMBER_OF_LARGE_MESSAGES", "25"));
+   private static final int SIZE_OF_LARGE_MESSAGE = 
Integer.parseInt(TestParameters.testProperty(TEST_NAME, 
"SIZE_OF_LARGE_MESSAGE", "200000"));
+   private static final int NUMBER_OF_REGULAR_MESSAGES = 
Integer.parseInt(TestParameters.testProperty(TEST_NAME, 
"NUMBER_OF_REGULAR_MESSAGES", "500"));
+   private static final int SIZE_OF_REGULAR_MESSAGE = 
Integer.parseInt(TestParameters.testProperty(TEST_NAME, 
"SIZE_OF_REGULAR_MESSAGE", "30000"));
+   private static final int LARGE_TIMEOUT_MINUTES = 
Integer.parseInt(TestParameters.testProperty(TEST_NAME, 
"LARGE_TIMEOUT_MINUTES", "1"));
+   private static final boolean USE_DEBUG = 
Boolean.parseBoolean(TestParameters.testProperty(TEST_NAME, "DEBUG", "false"));
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private static String body;
+
+   static {
+      StringWriter writer = new StringWriter();
+      while (writer.getBuffer().length() < 10 * 1024) {
+         writer.append("This is a string ..... ");
+      }
+      body = writer.toString();
+   }
+
+   private static final String TOPIC_NAME = "LargeTopic";
+   private static final String QUEUE_NAME = "LargeQueue";
+
+   public static final String DC1_NODE_A = "LargeAccumulationTest/DC1";
+   public static final String DC2_NODE_A = "LargeAccumulationTest/DC2";
+
+   private static final String SNF_QUEUE = "$ACTIVEMQ_ARTEMIS_MIRROR_mirror";
+
+   Process processDC1_node_A;
+   Process processDC2_node_A;
+
+   private static String DC1_NODEA_URI = "tcp://localhost:61616";
+   private static String DC2_NODEA_URI = "tcp://localhost:61618";
+
+   private static void createServer(String serverName,
+                                    String connectionName,
+                                    String mirrorURI,
+                                    int portOffset) throws Exception {
+
+      File serverLocation = getFileServerLocation(serverName);
+      deleteDirectory(serverLocation);
+
+      HelperCreate cliCreateServer = helperCreate();
+      
cliCreateServer.setAllowAnonymous(true).setNoWeb(true).setArtemisInstance(serverLocation);
+      cliCreateServer.setMessageLoadBalancing("ON_DEMAND");
+      cliCreateServer.setClustered(false);
+      cliCreateServer.setNoWeb(NO_WEB);
+      cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor", 
"--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1", "--name", 
DC1_NODE_A);
+      cliCreateServer.addArgs("--addresses", TOPIC_NAME);
+      cliCreateServer.addArgs("--queues", QUEUE_NAME);
+      cliCreateServer.addArgs("--java-memory", "2G");
+      cliCreateServer.setPortOffset(portOffset);
+      cliCreateServer.createServer();
+
+      Properties brokerProperties = new Properties();
+      brokerProperties.put("AMQPConnections." + connectionName + ".uri", 
mirrorURI);
+      brokerProperties.put("AMQPConnections." + connectionName + 
".retryInterval", "1000");
+      brokerProperties.put("AMQPConnections." + connectionName + ".type", 
AMQPBrokerConnectionAddressType.MIRROR.toString());
+      brokerProperties.put("AMQPConnections." + connectionName + 
".connectionElements.mirror.sync", "false");
+      brokerProperties.put("largeMessageSync", "false");
+      brokerProperties.put("pageSyncTimeout", "" + 
TimeUnit.MILLISECONDS.toNanos(1));
+      brokerProperties.put("messageExpiryScanPeriod", "-1");
+
+      brokerProperties.put("addressSettings.#.maxSizeBytes", "100MB");
+      brokerProperties.put("addressSettings.#.maxSizeMessages", "1000");
+      brokerProperties.put("addressSettings.#.addressFullMessagePolicy", 
"PAGING");
+      brokerProperties.put("addressSettings.#.maxReadPageMessages", "15000");
+      brokerProperties.put("addressSettings.#.maxReadPageBytes", "-1");
+      brokerProperties.put("addressSettings.#.prefetchPageMessages", "100");
+      brokerProperties.put("mirrorPageTransaction", "true");
+      brokerProperties.put("mirrorAckManagerQueueAttempts", "1");
+      brokerProperties.put("mirrorAckManagerPageAttempts", "1");
+      
brokerProperties.put("acceptorConfigurations.artemis.extraParams.amqpCredits", 
AMQP_CREDITS);
+      
brokerProperties.put("acceptorConfigurations.artemis.extraParams.mirrorMaxPendingAcks",
 MAX_PENDING_ACKS);  // 200_000
+
+      if (USE_DEBUG) {
+         replaceLogs(serverLocation);
+      }
+
+      File brokerPropertiesFile = new File(serverLocation, 
"broker.properties");
+      saveProperties(brokerProperties, brokerPropertiesFile);
+   }
+
+   private static void replaceLogs(File serverLocation) throws Exception {
+      File log4j = new File(serverLocation, "/etc/log4j2.properties");
+      // usual places to debug while dealing with this test
+      assertTrue(FileUtil.findReplace(log4j, 
"logger.artemis_utils.level=INFO", "logger.artemis_utils.level=INFO\n" +
+         "\n" + 
"logger.db1.name=org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager\n"
+         + "logger.db1.level=DEBUG"));
+   }
+
+
+   @BeforeAll
+   public static void createServers() throws Exception {
+      createServer(DC1_NODE_A, "mirror", DC2_NODEA_URI, 0);
+      createServer(DC2_NODE_A, "mirror", DC1_NODEA_URI, 2);
+   }
+
+   @BeforeEach
+   public void cleanupServers() {
+      cleanupData(DC1_NODE_A);
+      cleanupData(DC2_NODE_A);
+   }
+
+   public CountDownLatch send(Executor executor,
+                              AtomicInteger errors,
+                              int threads,
+                              ConnectionFactory connectionFactory,
+                              int numberOfMessgesPerThread,
+                              int commitInterval,
+                              int sizePerMessage,
+                              Destination destination,
+                              String sendDescription) {
+      CountDownLatch done = new CountDownLatch(threads);
+      AtomicInteger messageSent = new AtomicInteger(0);
+      String body = "a".repeat(sizePerMessage);
+      for (int i = 0; i < threads; i++) {
+         executor.execute(() -> {
+            int commitControl = 0;
+            try (Connection connection = connectionFactory.createConnection()) 
{
+               Session session;
+
+               session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+               MessageProducer producer = session.createProducer(destination);
+               for (int m = 0; m < numberOfMessgesPerThread; m++) {
+                  producer.send(session.createTextMessage(body));
+                  int sent = messageSent.incrementAndGet();
+                  if (commitControl++ % commitInterval == 0) {
+                     session.commit();
+                  }
+                  if (sent % 100 == 0) {
+                     logger.info("message sent {} on {} from {}", sent, 
destination, sendDescription);
+                  }
+               }
+               session.commit();
+            } catch (Exception e) {
+               logger.warn(e.getMessage(), e);
+               errors.incrementAndGet();
+            } finally {
+               done.countDown();
+            }
+         });
+      }
+      return done;
+   }
+
+   public void consume(Executor executor,
+                       AtomicInteger errors,
+                       int threads,
+                       ConnectionFactory connectionFactory,
+                       int numberOfMessgesPerThread,
+                       int commitInterval,
+                       String subscriptionID,
+                       Destination destination,
+                       CountDownLatch done) {
+      AtomicInteger messagesConsumed = new AtomicInteger(0);
+      for (int i = 0; i < threads; i++) {
+         executor.execute(() -> {
+            int commitControl = 0;
+            try (Connection connection = connectionFactory.createConnection()) 
{
+
+               Session session;
+
+               session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+               MessageConsumer consumer;
+
+               if (subscriptionID != null) {
+                  consumer = session.createSharedDurableConsumer((Topic) 
destination, subscriptionID);
+               } else {
+                  consumer = session.createConsumer(destination);
+               }
+
+               connection.start();
+
+               for (int m = 0; m < numberOfMessgesPerThread; m++) {
+                  TextMessage message = (TextMessage) 
consumer.receive(TimeUnit.MINUTES.toMillis(LARGE_TIMEOUT_MINUTES));
+                  assertNotNull(message);
+                  if (commitControl++ % commitInterval == 0) {
+                     session.commit();
+                  }
+                  int consumed = messagesConsumed.incrementAndGet();
+                  if (consumed % 100 == 0) {
+                     logger.info("message consumed {} on {}", consumed, 
destination);
+                  }
+               }
+               session.commit();
+            } catch (Throwable e) {
+               logger.warn(e.getMessage(), e);
+               errors.incrementAndGet();
+            } finally {
+               done.countDown();
+            }
+         });
+      }
+   }
+
   @Test
   public void testLargeAccumulation() throws Exception {

      // Toggles to selectively exercise the topic and/or queue paths of the mirror.
      final boolean useTopic = true;
      final boolean useQueue = true;

      // Shared across all producer/consumer tasks; any task failure increments it.
      AtomicInteger errors = new AtomicInteger(0);

      // producers will have 2 sets of producers (queue and topic)
      // while consumers will have 1 consumer for the queue, and one consumer for each topic subscription
      // and the same is used for both consumers and producers
      ExecutorService service = Executors.newFixedThreadPool(NUMBER_OF_THREADS * (1 + NUMBER_OF_SUBSCRIPTIONS));
      runAfter(service::shutdownNow);

      String protocol = "AMQP";
      startDC1();
      startDC2();

      // NOTE(review): commitInterval is unused — send()/consume() below receive literal intervals (10/100)
      final int commitInterval = 1;

      // Connection factories for both DCs with queuePrefetch=10 on the URL.
      // NOTE(review): only cfs[0] (DC1) is used below; cfs[1] appears unused — confirm intent.
      ConnectionFactory[] cfs = new ConnectionFactory[]{CFUtil.createConnectionFactory(protocol, DC1_NODEA_URI + "?jms.prefetchPolicy.queuePrefetch=10"), CFUtil.createConnectionFactory(protocol, DC2_NODEA_URI + "?jms.prefetchPolicy.queuePrefetch=10")};
      SimpleManagement[] sm = new SimpleManagement[]{new SimpleManagement(DC1_NODEA_URI, null, null), new SimpleManagement(DC2_NODEA_URI, null, null)};

      Queue largeQueue = null;
      Topic largeTopic = null;

      // creating subscriptions and lookup queue names with session.createTopic and session.createQueue
      for (int i = 0; i < NUMBER_OF_SUBSCRIPTIONS; i++) {
         try (Connection connection = cfs[0].createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            if (largeTopic == null) {
               largeTopic = session.createTopic(TOPIC_NAME);
               largeQueue = session.createQueue(QUEUE_NAME);
            }
            // consumer is intentionally unused: creating it registers the shared durable subscription sub_<i>
            MessageConsumer consumer = session.createSharedDurableConsumer(largeTopic, "sub_" + i);
         }
      }

      CountDownLatch doneTopic = null, doneQueue = null;

      // First wave: SIZE_OF_LARGE_MESSAGE-byte messages (presumably above the large-message
      // threshold — TODO confirm against the broker.properties used by these servers).
      if (useTopic) {
         doneTopic = send(service, errors, NUMBER_OF_THREADS, cfs[0], NUMBER_OF_LARGE_MESSAGES, 10, SIZE_OF_LARGE_MESSAGE, largeTopic, "LargeMessageTopic");
      }
      if (useQueue) {
         doneQueue = send(service, errors, NUMBER_OF_THREADS, cfs[0], NUMBER_OF_LARGE_MESSAGES, 10, SIZE_OF_LARGE_MESSAGE, largeQueue, "LargeMessageQueue");
      }

      if (useTopic) {
         assertTrue(doneTopic.await(LARGE_TIMEOUT_MINUTES, TimeUnit.MINUTES));
      }
      if (useQueue) {
         assertTrue(doneQueue.await(LARGE_TIMEOUT_MINUTES, TimeUnit.MINUTES));
      }

      assertEquals(0, errors.get());

      // Second wave: regular-sized messages on the same destinations, larger commit interval (100).
      if (useTopic) {
         doneTopic = send(service, errors, NUMBER_OF_THREADS, cfs[0], NUMBER_OF_REGULAR_MESSAGES, 100, SIZE_OF_REGULAR_MESSAGE, largeTopic, "MediumMessageTopic");
      }
      if (useQueue) {
         doneQueue = send(service, errors, NUMBER_OF_THREADS, cfs[0], NUMBER_OF_REGULAR_MESSAGES, 100, SIZE_OF_REGULAR_MESSAGE, largeQueue, "MediumMessageQueue");
      }

      if (useTopic) {
         assertTrue(doneTopic.await(LARGE_TIMEOUT_MINUTES, TimeUnit.MINUTES));
      }
      if (useQueue) {
         assertTrue(doneQueue.await(LARGE_TIMEOUT_MINUTES, TimeUnit.MINUTES));
      }
      assertEquals(0, errors.get());

      // Both DCs must converge to the full message count (and an empty SNF queue) before draining.
      matchMessageCounts(sm, (long) (NUMBER_OF_LARGE_MESSAGES + NUMBER_OF_REGULAR_MESSAGES) * NUMBER_OF_THREADS, useTopic, useQueue, true);

      // Drain everything through DC1; the mirror has to replicate the acknowledgements to DC2.
      if (useQueue) {
         doneQueue = new CountDownLatch(NUMBER_OF_THREADS);
         consume(service, errors, NUMBER_OF_THREADS, cfs[0], NUMBER_OF_LARGE_MESSAGES + NUMBER_OF_REGULAR_MESSAGES, 100, null, largeQueue, doneQueue);
      }
      if (useTopic) {
         doneTopic = new CountDownLatch(NUMBER_OF_THREADS * NUMBER_OF_SUBSCRIPTIONS);
         for (int i = 0; i < NUMBER_OF_SUBSCRIPTIONS; i++) {
            consume(service, errors, NUMBER_OF_THREADS, cfs[0], NUMBER_OF_LARGE_MESSAGES + NUMBER_OF_REGULAR_MESSAGES, 100, "sub_" + i, largeTopic, doneTopic);
         }
      }

      if (useTopic) {
         assertTrue(doneTopic.await(LARGE_TIMEOUT_MINUTES, TimeUnit.MINUTES));
      }

      if (useQueue) {
         assertTrue(doneQueue.await(LARGE_TIMEOUT_MINUTES, TimeUnit.MINUTES));
      }

      assertEquals(0, errors.get());

      // After draining, both DCs must converge back to zero messages on every queue/subscription.
      matchMessageCounts(sm, 0, useTopic, useQueue, true);
   }
+
+   private boolean matchMessageCounts(SimpleManagement[] sm,
+                                      long numberOfMessages,
+                                      boolean useTopic,
+                                      boolean useQueue,
+                                      boolean useWait) throws Exception {
+      for (SimpleManagement s : sm) {
+         logger.debug("Checking counts on SNF for {}", s.getUri());
+         if (useWait) {
+            Wait.assertEquals((long) 0, () -> 
s.getMessageCountOnQueue(SNF_QUEUE), 
TimeUnit.MINUTES.toMillis(LARGE_TIMEOUT_MINUTES), 100);
+         } else {
+            if (s.getMessageCountOnQueue(SNF_QUEUE) != 0) {
+               return false;
+            }
+         }
+
+         if (useTopic) {
+            for (int i = 0; i < NUMBER_OF_SUBSCRIPTIONS; i++) {
+               String subscriptionName = "sub_" + i + ":global";
+               logger.debug("Checking counts on {} on {}", subscriptionName, 
s.getUri());
+               if (useWait) {
+                  Wait.assertEquals(numberOfMessages, () -> 
s.getMessageCountOnQueue(subscriptionName), 
TimeUnit.MINUTES.toMillis(LARGE_TIMEOUT_MINUTES), 100);
+               } else {
+                  if (s.getMessageCountOnQueue(subscriptionName) != 
numberOfMessages) {
+                     return false;
+                  }
+               }
+            }
+         }
+
+         if (useQueue) {
+            if (useWait) {
+               Wait.assertEquals(numberOfMessages, () -> 
s.getMessageCountOnQueue(QUEUE_NAME), 
TimeUnit.MINUTES.toMillis(LARGE_TIMEOUT_MINUTES), 100);
+            } else {
+               if (s.getMessageCountOnQueue(QUEUE_NAME) != numberOfMessages) {
+                  return false;
+               }
+            }
+         }
+      }
+      return true;
+   }
+
+   int getNumberOfLargeMessages(String serverName) throws Exception {
+      File lmFolder = new File(getServerLocation(serverName) + 
"/data/large-messages");
+      assertTrue(lmFolder.exists());
+      return lmFolder.list().length;
+   }
+
+   private void startDC1() throws Exception {
+      processDC1_node_A = startServer(DC1_NODE_A, -1, -1, new 
File(getServerLocation(DC1_NODE_A), "broker.properties"));
+      ServerUtil.waitForServerToStart(0, 10_000);
+   }
+
+   private void stopDC1() throws Exception {
+      processDC1_node_A.destroyForcibly();
+      assertTrue(processDC1_node_A.waitFor(10, TimeUnit.SECONDS));
+   }
+
+   private void stopDC2() throws Exception {
+      processDC2_node_A.destroyForcibly();
+      assertTrue(processDC2_node_A.waitFor(10, TimeUnit.SECONDS));
+   }
+
+   private void startDC2() throws Exception {
+      processDC2_node_A = startServer(DC2_NODE_A, -1, -1, new 
File(getServerLocation(DC2_NODE_A), "broker.properties"));
+      ServerUtil.waitForServerToStart(2, 10_000);
+   }
+}
\ No newline at end of file
diff --git a/tests/soak-tests/src/test/scripts/longrun-parameters.sh 
b/tests/soak-tests/src/test/scripts/longrun-parameters.sh
index 14d3d2154d..a0f429003f 100755
--- a/tests/soak-tests/src/test/scripts/longrun-parameters.sh
+++ b/tests/soak-tests/src/test/scripts/longrun-parameters.sh
@@ -139,3 +139,16 @@ export TEST_SINGLE_MIRROR_SOAK_KILL_INTERVAL=50000
 export TEST_SINGLE_MIRROR_SOAK_SNF_TIMEOUT=60000000
 export TEST_SINGLE_MIRROR_SOAK_GENERAL_TIMEOUT=100000
 export TEST_SINGLE_MIRROR_SOAK_CONSUMER_PROCESSING_TIME=10
+
+# LargeAccumulationTest parameters (long-run profile)
+export TEST_LARGE_ACCUMULATION_DEBUG=false
+export TEST_LARGE_ACCUMULATION_AMQP_CREDITS=1000
+export TEST_LARGE_ACCUMULATION_MAX_PENDING_ACKS=1000
+export TEST_LARGE_ACCUMULATION_NO_WEB=true
+export TEST_LARGE_ACCUMULATION_THREADS=20
+export TEST_LARGE_ACCUMULATION_NUMBER_OF_SUBSCRIPTIONS=2
+export TEST_LARGE_ACCUMULATION_NUMBER_OF_LARGE_MESSAGES=100
+export TEST_LARGE_ACCUMULATION_SIZE_OF_LARGE_MESSAGE=200000
+export TEST_LARGE_ACCUMULATION_NUMBER_OF_REGULAR_MESSAGES=2500
+export TEST_LARGE_ACCUMULATION_SIZE_OF_REGULAR_MESSAGE=30000
+export TEST_LARGE_ACCUMULATION_LARGE_TIMEOUT_MINUTES=1
diff --git a/tests/soak-tests/src/test/scripts/parameters.sh 
b/tests/soak-tests/src/test/scripts/parameters.sh
index 57a8be7d9a..624654e058 100755
--- a/tests/soak-tests/src/test/scripts/parameters.sh
+++ b/tests/soak-tests/src/test/scripts/parameters.sh
@@ -138,3 +138,16 @@ export TEST_SINGLE_MIRROR_SOAK_KILL_INTERVAL=10000
 export TEST_SINGLE_MIRROR_SOAK_SNF_TIMEOUT=60000
 export TEST_SINGLE_MIRROR_SOAK_GENERAL_TIMEOUT=10000
 export TEST_SINGLE_MIRROR_SOAK_CONSUMER_PROCESSING_TIME=10
+
+# LargeAccumulationTest parameters
+export TEST_LARGE_ACCUMULATION_DEBUG=false
+export TEST_LARGE_ACCUMULATION_AMQP_CREDITS=1000
+export TEST_LARGE_ACCUMULATION_MAX_PENDING_ACKS=1000
+export TEST_LARGE_ACCUMULATION_NO_WEB=true
+export TEST_LARGE_ACCUMULATION_THREADS=20
+export TEST_LARGE_ACCUMULATION_NUMBER_OF_SUBSCRIPTIONS=2
+export TEST_LARGE_ACCUMULATION_NUMBER_OF_LARGE_MESSAGES=25
+export TEST_LARGE_ACCUMULATION_SIZE_OF_LARGE_MESSAGE=200000
+export TEST_LARGE_ACCUMULATION_NUMBER_OF_REGULAR_MESSAGES=500
+export TEST_LARGE_ACCUMULATION_SIZE_OF_REGULAR_MESSAGE=30000
+export TEST_LARGE_ACCUMULATION_LARGE_TIMEOUT_MINUTES=1


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact



Reply via email to