This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 1f0ec50058e64e79dd5f1bfc9b4f1abab3f515cd
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed Mar 5 14:51:04 2025 -0500

    ARTEMIS-5338 AckManager flow control
    
    Say there are too many records in the JournalHashMap. In that case
    the sender should pace sending more data. The target should limit
    credits when the configured max pending records is reached.
---
 .../amqp/broker/ProtonProtocolManager.java         |  11 ++
 .../connect/mirror/AMQPMirrorControllerTarget.java |  32 ++++-
 .../protocol/amqp/connect/mirror/AckManager.java   |  33 ++++-
 .../amqp/connect/mirror/AckManagerProvider.java    |   3 +
 .../amqp/proton/ProtonAbstractReceiver.java        |   6 +-
 .../org/apache/activemq/artemis/utils/Wait.java    |   5 +
 .../integration/amqp/connect/AckManagerTest.java   | 154 ++++++++++++++++++++-
 .../mirror/AccumulatedInPageSoakTest.java          |   5 +
 8 files changed, 240 insertions(+), 9 deletions(-)

diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
index 52d767c2df..af3398f243 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
@@ -112,6 +112,8 @@ public class ProtonProtocolManager extends 
AbstractProtocolManager<AMQPMessage,
 
    private boolean directDeliver = true;
 
+   private int mirrorMaxPendingAcks = 10_000;
+
    private final AMQPRoutingHandler routingHandler;
 
    /*
@@ -164,6 +166,15 @@ public class ProtonProtocolManager extends 
AbstractProtocolManager<AMQPMessage,
       return amqpMinLargeMessageSize;
    }
 
+   public int getMirrorMaxPendingAcks() {
+      return mirrorMaxPendingAcks;
+   }
+
+   public ProtonProtocolManager setMirrorMaxPendingAcks(int maxPendingAcks) {
+      this.mirrorMaxPendingAcks = maxPendingAcks;
+      return this;
+   }
+
    public ProtonProtocolManager setAmqpMinLargeMessageSize(int 
amqpMinLargeMessageSize) {
       this.amqpMinLargeMessageSize = amqpMinLargeMessageSize;
       return this;
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
index be0895289d..059d3e9122 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
@@ -100,6 +100,27 @@ public class AMQPMirrorControllerTarget extends 
ProtonAbstractReceiver implement
       return CONTROLLER_THREAD_LOCAL.get();
    }
 
+
+   @Override
+   public boolean isBusy() {
+      return getAckManager().size() > getMirrorMaxPendingAcks();
+   }
+
+   public int getMirrorMaxPendingAcks() {
+      try {
+         return connection.getProtocolManager().getMirrorMaxPendingAcks();
+      } catch (Throwable e) {
+         // It shouldn't happen, but if it did we just log it
+         logger.warn(e.getMessage(), e);
+         return 0;
+      }
+   }
+
+   public void verifyCredits() {
+      connection.runNow(creditTopUpRunner);
+   }
+
+
    /**
     * Objects of this class can be used by either transaction or by 
OperationContext. It is important that when you're
     * using the transactions you clear any references to the operation 
context. Don't use transaction and
@@ -473,14 +494,17 @@ public class AMQPMirrorControllerTarget extends 
ProtonAbstractReceiver implement
          logger.trace("performAck (nodeID={}, messageID={}), targetQueue={})", 
nodeID, messageID, targetQueue.getName());
       }
 
+      getAckManager().ack(nodeID, targetQueue, messageID, reason, true);
+
+      
OperationContextImpl.getContext().executeOnCompletion(ackMessageOperation, 
OperationConsistencyLevel.FULL);
+   }
+
+   private AckManager getAckManager() {
       if (ackManager == null) {
          ackManager = AckManagerProvider.getManager(server);
          ackManager.registerMirror(this);
       }
-
-      ackManager.ack(nodeID, targetQueue, messageID, reason, true);
-
-      
OperationContextImpl.getContext().executeOnCompletion(ackMessageOperation, 
OperationConsistencyLevel.FULL);
+      return ackManager;
    }
 
    /**
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
index 0faa078a20..5e3b4b744f 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
@@ -28,6 +28,7 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.function.LongSupplier;
 
 import io.netty.util.collection.LongObjectHashMap;
@@ -78,6 +79,13 @@ public class AckManager implements ActiveMQComponent {
    volatile MultiStepProgress progress;
    ActiveMQScheduledComponent scheduledComponent;
 
+   private volatile int size;
+   private static final AtomicIntegerFieldUpdater<AckManager> sizeUpdater = 
AtomicIntegerFieldUpdater.newUpdater(AckManager.class, "size");
+
+   public int size() {
+      return sizeUpdater.get(this);
+   }
+
    public AckManager(ActiveMQServer server) {
       assert server != null && server.getConfiguration() != null;
       this.server = server;
@@ -92,6 +100,7 @@ public class AckManager implements ActiveMQComponent {
 
    public void reload(RecordInfo recordInfo) {
       journalHashMapProvider.reload(recordInfo);
+      sizeUpdater.incrementAndGet(this);
    }
 
    @Override
@@ -104,6 +113,13 @@ public class AckManager implements ActiveMQComponent {
       logger.debug("Stopping ackmanager on server {}", server);
    }
 
+   public synchronized void pause() {
+      if (scheduledComponent != null) {
+         scheduledComponent.stop();
+         scheduledComponent = null;
+      }
+   }
+
    @Override
    public synchronized boolean isStarted() {
       return scheduledComponent != null && scheduledComponent.isStarted();
@@ -188,6 +204,11 @@ public class AckManager implements ActiveMQComponent {
       targetCopy.forEach(AMQPMirrorControllerTarget::flush);
    }
 
+   private void checkFlowControlMirrorTargets() {
+      List<AMQPMirrorControllerTarget> targetCopy = copyTargets();
+      targetCopy.forEach(AMQPMirrorControllerTarget::verifyCredits);
+   }
+
    private synchronized List<AMQPMirrorControllerTarget> copyTargets() {
       return new ArrayList<>(mirrorControllerTargets);
    }
@@ -267,6 +288,8 @@ public class AckManager implements ActiveMQComponent {
             logger.trace("Page Scan not required for address {}", address);
          }
 
+         checkFlowControlMirrorTargets();
+
       } catch (Throwable e) {
          logger.warn(e.getMessage(), e);
       } finally {
@@ -299,7 +322,9 @@ public class AckManager implements ActiveMQComponent {
                if (logger.isDebugEnabled()) {
                   logger.debug("Retried {} {} times, giving up on the entry 
now. Configured Page Attempts={}", retry, retry.getPageAttempts(), 
configuration.getMirrorAckManagerPageAttempts());
                }
-               retries.remove(retry);
+               if (retries.remove(retry) != null) {
+                  sizeUpdater.decrementAndGet(AckManager.this);
+               }
             } else {
                if (logger.isDebugEnabled()) {
                   logger.debug("Retry {} attempted {} times on paging, 
Configuration Page Attempts={}", retry, retry.getPageAttempts(), 
configuration.getMirrorAckManagerPageAttempts());
@@ -353,7 +378,9 @@ public class AckManager implements ActiveMQComponent {
                            }
                         }
                      }
-                     retries.remove(ackRetry, transaction.getID());
+                     if (retries.remove(ackRetry, transaction.getID()) != 
null) {
+                        sizeUpdater.decrementAndGet(AckManager.this);
+                     }
                      transaction.setContainsPersistent();
                      logger.trace("retry performed ok, ackRetry={} for 
message={} on queue", ackRetry, pagedMessage);
                   }
@@ -389,6 +416,7 @@ public class AckManager implements ActiveMQComponent {
             if (ack(retry.getNodeID(), queue, retry.getMessageID(), 
retry.getReason(), false)) {
                logger.trace("Removing retry {} as the retry went ok", retry);
                queueRetries.remove(retry);
+               sizeUpdater.decrementAndGet(this);
             } else {
                int retried = retry.attemptedQueue();
                if (logger.isTraceEnabled()) {
@@ -410,6 +438,7 @@ public class AckManager implements ActiveMQComponent {
       }
       AckRetry retry = new AckRetry(nodeID, messageID, reason);
       journalHashMapProvider.getMap(queue.getID(), queue).put(retry, retry);
+      sizeUpdater.incrementAndGet(this);
       if (scheduledComponent != null) {
          // we set the retry delay again in case it was changed.
          
scheduledComponent.setPeriod(configuration.getMirrorAckManagerRetryDelay());
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManagerProvider.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManagerProvider.java
index aaf77641a6..4b570e5433 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManagerProvider.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManagerProvider.java
@@ -49,6 +49,9 @@ public class AckManagerProvider {
    }
 
    public static AckManager getManager(ActiveMQServer server) {
+      if (server == null) {
+         throw new NullPointerException("server is null");
+      }
       synchronized (managerHashMap) {
          AckManager ackManager = managerHashMap.get(server);
          if (ackManager != null) {
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java
index 794b9055b4..a5acddbc06 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java
@@ -175,6 +175,10 @@ public abstract class ProtonAbstractReceiver extends 
ProtonInitializable impleme
       return state == ReceiverState.STARTED;
    }
 
+   public boolean isBusy() {
+      return false;
+   }
+
    public boolean isStopping() {
       return state == ReceiverState.STOPPING;
    }
@@ -276,7 +280,7 @@ public abstract class ProtonAbstractReceiver extends 
ProtonInitializable impleme
          if (connection.isHandler()) {
             connection.requireInHandler();
 
-            if (context.isStarted()) {
+            if (context.isStarted() && !context.isBusy()) {
                final int pending = context.pendingSettles;
 
                if (isBellowThreshold(receiver.getCredit(), pending, 
threshold)) {
diff --git 
a/artemis-unit-test-support/src/main/java/org/apache/activemq/artemis/utils/Wait.java
 
b/artemis-unit-test-support/src/main/java/org/apache/activemq/artemis/utils/Wait.java
index 0864f5a8bd..dd287670d8 100644
--- 
a/artemis-unit-test-support/src/main/java/org/apache/activemq/artemis/utils/Wait.java
+++ 
b/artemis-unit-test-support/src/main/java/org/apache/activemq/artemis/utils/Wait.java
@@ -172,6 +172,11 @@ public class Wait {
       assertTrue(failureMessage, condition, duration, SLEEP_MILLIS);
    }
 
+   public static <T> T assertNotNull(Supplier<T> supplier, final long 
duration, final long sleep) throws Exception {
+      Assertions.assertTrue(waitFor(() -> supplier.get() != null, duration, 
sleep));
+      return supplier.get();
+   }
+
    public static void assertTrue(Condition condition, final long duration, 
final long sleep) {
       assertTrue(DEFAULT_FAILURE_MESSAGE, condition, duration, sleep);
    }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java
index 650104fd77..a185b9c4a2 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java
@@ -26,10 +26,13 @@ import javax.jms.Topic;
 import javax.jms.TopicSubscriber;
 
 import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.journal.collections.JournalHashMap;
@@ -43,9 +46,11 @@ import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.impl.AckReason;
 import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.impl.QueueImpl;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
 import 
org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
+import 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource;
 import 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget;
 import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager;
 import 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManagerProvider;
@@ -57,17 +62,27 @@ import 
org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.tests.util.CFUtil;
 import org.apache.activemq.artemis.utils.RandomUtil;
 import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.Symbol;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_DESTINATION;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNotSame;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class AckManagerTest extends ActiveMQTestBase {
 
@@ -86,11 +101,11 @@ public class AckManagerTest extends ActiveMQTestBase {
       server1.getConfiguration().addAddressSetting(SNF_NAME, new 
AddressSettings().setMaxSizeBytes(-1).setMaxSizeMessages(-1).setMaxReadPageMessages(20));
       server1.getConfiguration().getAcceptorConfigurations().clear();
       server1.getConfiguration().addAcceptorConfiguration("server", 
"tcp://localhost:61616");
-      server1.start();
    }
 
    @Test
    public void testDirectACK() throws Throwable {
+      server1.start();
 
       String protocol = "AMQP";
 
@@ -289,9 +304,9 @@ public class AckManagerTest extends ActiveMQTestBase {
       assertEquals(0, AckManagerProvider.getSize());
    }
 
-
    @Test
    public void testLogUnack() throws Throwable {
+      server1.start();
       String protocol = "AMQP";
 
       SimpleString TOPIC_NAME = SimpleString.of("tp" + 
RandomUtil.randomUUIDString());
@@ -340,6 +355,7 @@ public class AckManagerTest extends ActiveMQTestBase {
 
    @Test
    public void testRetryFromPaging() throws Throwable {
+      server1.start();
 
       String protocol = "AMQP";
 
@@ -440,6 +456,140 @@ public class AckManagerTest extends ActiveMQTestBase {
 
 
 
+   @Test
+   public void testFlowControlOnPendingAcks() throws Throwable {
+
+      server1.getConfiguration().getAcceptorConfigurations().clear();
+      server1.getConfiguration().addAcceptorConfiguration("server", 
"tcp://localhost:61616?mirrorMaxPendingAcks=100&amqpCredits=100");
+      server1.start();
+
+      String protocol = "AMQP";
+
+      SimpleString QUEUE_NAME = SimpleString.of("queue_" + 
RandomUtil.randomUUIDString());
+
+      Queue testQueue = 
server1.createQueue(QueueConfiguration.of(QUEUE_NAME).setRoutingType(RoutingType.ANYCAST));
+
+      ConnectionFactory connectionFactory = 
CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
+
+      // First step... adding messages to a queue // 50% paging 50% queue
+      try (Connection connection = connectionFactory.createConnection()) {
+         Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+         javax.jms.Queue jmsQueue = session.createQueue(QUEUE_NAME.toString());
+         MessageProducer producer = session.createProducer(jmsQueue);
+         for (int i = 0; i < 100; i++) {
+            if (i == 50) {
+               session.commit();
+               testQueue.getPagingStore().startPaging();
+            }
+            producer.send(session.createTextMessage("hello there " + i));
+         }
+         session.commit();
+      }
+
+      Wait.assertEquals(100, testQueue::getMessageCount);
+
+      AckManager ackManager = AckManagerProvider.getManager(server1);
+      assertTrue(ackManager.isStarted());
+      ackManager.pause();
+      assertFalse(ackManager.isStarted());
+      assertSame(ackManager, AckManagerProvider.getManager(server1));
+
+      // adding fake retries to flood the manager beyond capacity
+      addFakeRetries(ackManager, testQueue, 1000);
+
+      AmqpClient client = new AmqpClient(new URI("tcp://localhost:61616"), 
null, null);
+      AmqpConnection connection = client.connect();
+      runAfter(connection::close);
+      AmqpSession session = connection.createSession();
+
+      Map<Symbol, Object> properties = new HashMap<>();
+      properties.put(AMQPMirrorControllerSource.BROKER_ID, "whatever");
+
+      // this is simulating a mirror connection...
+      // we play with a direct sender here to make sure flow control is 
working as expected when the records are beyond capacity.
+      AmqpSender sender = session.createSender(QueueImpl.MIRROR_ADDRESS, true, 
new Symbol[]{Symbol.getSymbol("amq.mirror")}, new 
Symbol[]{Symbol.getSymbol("amq.mirror")}, properties);
+
+      AMQPMirrorControllerTarget mirrorControllerTarget = 
Wait.assertNotNull(() -> locateMirrorTarget(server1), 5000, 100);
+      assertEquals(100, 
mirrorControllerTarget.getConnection().getProtocolManager().getMirrorMaxPendingAcks());
+      assertTrue(mirrorControllerTarget.isBusy());
+      // first connection it should be beyond flow control capacity, we should 
not have any credits here now
+      assertEquals(0, sender.getEndpoint().getCredit());
+      ackManager.start();
+
+      // manager resumed and the records will be eventually removed, we should 
be back to capacity
+      Wait.assertEquals(100, () -> sender.getEndpoint().getCredit(), 5000, 
100);
+      ackManager.pause();
+
+      addFakeRetries(ackManager, testQueue, 1000);
+      assertEquals(1000, ackManager.size());
+
+      // we should be able to send 100 messages
+      for (int i = 0; i < 100; i++) {
+         AmqpMessage message = new AmqpMessage();
+         message.setAddress(testQueue.getAddress().toString());
+         message.setText("hello again " + i);
+         message.setDeliveryAnnotation(INTERNAL_ID.toString(), 
server1.getStorageManager().generateID());
+         message.setDeliveryAnnotation(INTERNAL_DESTINATION.toString(), 
testQueue.getAddress().toString());
+         sender.send(message);
+      }
+      // we should not get any credits
+      assertEquals(0, sender.getEndpoint().getCredit());
+
+      Wait.assertEquals(200, testQueue::getMessageCount);
+
+      ackManager.start();
+      // after restart, we should eventually get replenished on credits
+      Wait.assertEquals(100, () -> sender.getEndpoint().getCredit(), 5000, 
100);
+
+      Wait.assertEquals(200L, testQueue::getMessageCount, 5000, 100);
+
+      AtomicInteger acked = new AtomicInteger(0);
+      ackManager.pause();
+
+      // Adding real deletes, we should still flow control credits
+      testQueue.forEach(ref -> {
+         long messageID = ref.getMessageID();
+
+         Long internalID = (Long) 
ref.getMessage().getAnnotation(AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY);
+         String nodeId = (String) 
ref.getMessage().getAnnotation(AMQPMirrorControllerSource.BROKER_ID_SIMPLE_STRING);
+         if (internalID != null) {
+            messageID = internalID.longValue();
+         }
+         ackManager.addRetry(nodeId, testQueue, messageID, AckReason.NORMAL);
+         acked.incrementAndGet();
+      });
+
+      assertEquals(200, acked.get());
+      ackManager.start();
+
+      // Adding hot data... we should be able to flow credits during that
+      for (int i = 0; i < 100; i++) {
+         AmqpMessage message = new AmqpMessage();
+         message.setAddress(testQueue.getAddress().toString());
+         message.setText("one of the last 100");
+         message.setDeliveryAnnotation(INTERNAL_ID.toString(), 
server1.getStorageManager().generateID());
+         message.setDeliveryAnnotation(INTERNAL_DESTINATION.toString(), 
testQueue.getAddress().toString());
+         sender.send(message);
+      }
+      Wait.assertTrue(() -> sender.getEndpoint().getCredit() > 0, 5000, 100);
+
+      Wait.assertEquals(100L, testQueue::getMessageCount, 5000, 100);
+
+      ackManager.stop();
+
+      connection.close();
+
+      server1.stop();
+
+      assertEquals(0, AckManagerProvider.getSize());
+   }
+
+   private void addFakeRetries(AckManager ackManager, Queue testQueue, int 
size) {
+      for (int i = 0; i < size; i++) {
+         // adding retries that will never succeed, just to fillup the storage 
hashmap
+         ackManager.addRetry(null, testQueue, 
server1.getStorageManager().generateID(), AckReason.NORMAL);
+      }
+   }
 
    private int getCounter(byte typeRecord, Map<Integer, AtomicInteger> values) 
{
       AtomicInteger value = values.get((int) typeRecord);
diff --git 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/AccumulatedInPageSoakTest.java
 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/AccumulatedInPageSoakTest.java
index 04991e6ef2..ca6cde0225 100644
--- 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/AccumulatedInPageSoakTest.java
+++ 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/AccumulatedInPageSoakTest.java
@@ -38,6 +38,7 @@ import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBroker
 import org.apache.activemq.artemis.tests.soak.SoakTestBase;
 import org.apache.activemq.artemis.tests.util.CFUtil;
 import org.apache.activemq.artemis.util.ServerUtil;
+import org.apache.activemq.artemis.utils.FileUtil;
 import org.apache.activemq.artemis.utils.RandomUtil;
 import org.apache.activemq.artemis.utils.Wait;
 import org.apache.activemq.artemis.cli.commands.helper.HelperCreate;
@@ -109,6 +110,10 @@ public class AccumulatedInPageSoakTest extends 
SoakTestBase {
 
       File brokerPropertiesFile = new File(serverLocation, 
"broker.properties");
       saveProperties(brokerProperties, brokerPropertiesFile);
+
+      File brokerXML = new File(serverLocation, "/etc/broker.xml");
+      // Making sure we flow control mirrorACK on the lower side to make sure 
things are working
+      assertTrue(FileUtil.findReplace(brokerXML, "</acceptor>", 
";mirrorMaxPendingAcks=50</acceptor>"));
    }
 
    @BeforeAll


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to