This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 1d1f03ad5e ARTEMIS-4971 Warning on Unacked messages through mirror 
AckManager
1d1f03ad5e is described below

commit 1d1f03ad5e2644b558b5eb65f737665b24697689
Author: Clebert Suconic <[email protected]>
AuthorDate: Mon Sep 23 17:02:30 2024 -0400

    ARTEMIS-4971 Warning on Unacked messages through mirror AckManager
---
 .../api/config/ActiveMQDefaultConfiguration.java   |  5 +++
 .../protocol/amqp/connect/mirror/AckManager.java   | 28 +++++++-----
 .../amqp/logger/ActiveMQAMQPProtocolLogger.java    |  6 +++
 .../artemis/core/config/Configuration.java         |  9 ++++
 .../core/config/impl/ConfigurationImpl.java        | 13 ++++++
 .../deployers/impl/FileConfigurationParser.java    |  3 ++
 .../resources/schema/artemis-configuration.xsd     |  8 ++++
 .../core/config/impl/FileConfigurationTest.java    |  1 +
 .../resources/ConfigurationTest-full-config.xml    |  1 +
 .../ConfigurationTest-xinclude-config.xml          |  1 +
 .../ConfigurationTest-xinclude-schema-config.xml   |  1 +
 .../integration/amqp/connect/AckManagerTest.java   | 51 ++++++++++++++++++++++
 12 files changed, 117 insertions(+), 10 deletions(-)

diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index d8bf42940d..11de47c758 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -715,6 +715,7 @@ public final class ActiveMQDefaultConfiguration {
 
    private static final int DEFAULT_MIRROR_ACK_MANAGER_RETRY_DELAY = 
Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME + 
".RETRY_DELAY", "100"));;
 
+   private static final boolean DEFAULT_MIRROR_ACK_MANAGER_WARN_UNACKED = 
false;
    private static final boolean DEFAULT_MIRROR_PAGE_TRANSACTION = false;
 
    /**
@@ -1961,6 +1962,10 @@ public final class ActiveMQDefaultConfiguration {
       return DEFAULT_MIRROR_ACK_MANAGER_PAGE_ATTEMPTS;
    }
 
+   public static boolean getMirrorAckManagerWarnUnacked() {
+      return DEFAULT_MIRROR_ACK_MANAGER_WARN_UNACKED;
+   }
+
    public static int getMirrorAckManagerRetryDelay() {
       return DEFAULT_MIRROR_ACK_MANAGER_RETRY_DELAY;
    }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
index 3dc5322136..0ef1a9b649 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
@@ -57,6 +57,7 @@ import 
org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.mirror.MirrorController;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
+import 
org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -78,6 +79,7 @@ public class AckManager implements ActiveMQComponent {
    ActiveMQScheduledComponent scheduledComponent;
 
    public AckManager(ActiveMQServer server) {
+      assert server != null && server.getConfiguration() != null;
       this.server = server;
       this.configuration = server.getConfiguration();
       this.ioCriticalErrorListener = server.getIoCriticalErrorListener();
@@ -260,7 +262,7 @@ public class AckManager implements ActiveMQComponent {
                   page.usageDown();
                }
             }
-            validateExpiredSet(acksToRetry);
+            validateExpiredSet(address, acksToRetry);
          } else {
             logger.trace("Page Scan not required for address {}", address);
          }
@@ -283,16 +285,19 @@ public class AckManager implements ActiveMQComponent {
 
    }
 
-   private void validateExpiredSet(LongObjectHashMap<JournalHashMap<AckRetry, 
AckRetry, Queue>> queuesToRetry) {
-      queuesToRetry.forEach(this::validateExpireSet);
+   private void validateExpiredSet(SimpleString address, 
LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>> queuesToRetry) {
+      queuesToRetry.forEach((q, r) -> this.validateExpireSet(address, q, r));
    }
 
-   private void validateExpireSet(long queueID, JournalHashMap<AckRetry, 
AckRetry, Queue> retries) {
+   private void validateExpireSet(SimpleString address, long queueID, 
JournalHashMap<AckRetry, AckRetry, Queue> retries) {
       for (AckRetry retry : retries.valuesCopy()) {
          if (retry.getQueueAttempts() >= 
configuration.getMirrorAckManagerQueueAttempts()) {
             if (retry.attemptedPage() >= 
configuration.getMirrorAckManagerPageAttempts()) {
+               if (configuration.isMirrorAckManagerWarnUnacked()) {
+                  ActiveMQAMQPProtocolLogger.LOGGER.ackRetryFailed(retry, 
address, queueID);
+               }
                if (logger.isDebugEnabled()) {
-                  logger.debug("Retried {} {} times, giving up on the entry 
now. Configuration Page Attempts={}", retry, retry.getPageAttempts(), 
configuration.getMirrorAckManagerPageAttempts());
+                  logger.debug("Retried {} {} times, giving up on the entry 
now. Configured Page Attempts={}", retry, retry.getPageAttempts(), 
configuration.getMirrorAckManagerPageAttempts());
                }
                retries.remove(retry);
             } else {
@@ -300,6 +305,8 @@ public class AckManager implements ActiveMQComponent {
                   logger.debug("Retry {} attempted {} times on paging, 
Configuration Page Attempts={}", retry, retry.getPageAttempts(), 
configuration.getMirrorAckManagerPageAttempts());
                }
             }
+         } else {
+            logger.debug("Retry {} queue attempted {} times on paging, 
QueueAttempts {} Configuration Page Attempts={}", retry, 
retry.getQueueAttempts(), retry.getPageAttempts(), 
configuration.getMirrorAckManagerPageAttempts());
          }
       }
    }
@@ -418,9 +425,14 @@ public class AckManager implements ActiveMQComponent {
       if (reference == null) {
          if (logger.isDebugEnabled()) {
             logger.debug("ACK Manager could not find reference nodeID={} 
(while localID={}), messageID={} on queue {}, server={}. Adding retry with 
minQueue={}, maxPage={}, delay={}", nodeID, 
referenceIDSupplier.getDefaultNodeID(), messageID, targetQueue.getName(), 
server, configuration.getMirrorAckManagerQueueAttempts(), 
configuration.getMirrorAckManagerPageAttempts(), 
configuration.getMirrorAckManagerRetryDelay());
-            printQueueDebug(targetQueue);
          }
+
          if (allowRetry) {
+            if (configuration != null && 
configuration.isMirrorAckManagerWarnUnacked() && targetQueue.getConsumerCount() 
> 0) {
+               
ActiveMQAMQPProtocolLogger.LOGGER.unackWithConsumer(targetQueue.getConsumerCount(),
 targetQueue.getName(), nodeID, messageID);
+            } else {
+               logger.debug("There are {} consumers on queue {}, what made Ack 
for message with nodeID={}, messageID={} enter a retry list", 
targetQueue.getConsumerCount(), targetQueue.getName(), nodeID, messageID);
+            }
             addRetry(nodeID, targetQueue, messageID, reason);
          }
          return false;
@@ -436,10 +448,6 @@ public class AckManager implements ActiveMQComponent {
       }
    }
 
-   private void printQueueDebug(Queue targetQueue) {
-      logger.debug("... queue {}/{} had {} consumers, {} messages, {} 
scheduled messages, {} delivering messages, paging={}", targetQueue.getID(), 
targetQueue.getName(), targetQueue.getConsumerCount(), 
targetQueue.getMessageCount(), targetQueue.getScheduledCount(), 
targetQueue.getDeliveringCount(), targetQueue.getPagingStore().isPaging());
-   }
-
    private void doACK(Queue targetQueue, MessageReference reference, AckReason 
reason) {
       try {
          switch (reason) {
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolLogger.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolLogger.java
index dc7dbda0ac..02e78a783b 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolLogger.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolLogger.java
@@ -67,4 +67,10 @@ public interface ActiveMQAMQPProtocolLogger {
 
    @LogMessage(id = 111010, value = "Duplicate AckManager node detected. 
Queue={}, ServerID={}, recordID={}", level = LogMessage.Level.WARN)
    void duplicateNodeStoreID(String queue, String serverId, long recordID, 
Exception trace);
+
+   @LogMessage(id = 111011, value = "There are {} consumers on queue {}, what 
made the Ack for message with nodeID={}, messageID={} enter a retry list", 
level = LogMessage.Level.WARN)
+   void unackWithConsumer(int numberOfConsumers, Object queueName, String 
nodeID, long messageID);
+
+   @LogMessage(id = 111012, value = "Acknowledgement retry failed for {} on 
address {}, queueID={}", level = LogMessage.Level.WARN)
+   void ackRetryFailed(Object ackRetryInformation, Object address, long 
queueID);
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
index 1df52bd0bc..17765802b5 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
@@ -1535,4 +1535,13 @@ public interface Configuration {
    boolean isMirrorPageTransaction();
 
    Configuration setMirrorPageTransaction(boolean ignorePageTransactions);
+
+   /**
+    * should log.warn when ack retries failed.
+    * @param warnUnacked
+    * @return
+    */
+   Configuration setMirrorAckManagerWarnUnacked(boolean warnUnacked);
+
+   boolean isMirrorAckManagerWarnUnacked();
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
index 4983527b6b..88b7721099 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
@@ -444,6 +444,8 @@ public class ConfigurationImpl implements Configuration, 
Serializable {
 
    private int mirrorAckManagerPageAttempts = 
ActiveMQDefaultConfiguration.getMirrorAckManagerPageAttempts();
 
+   private boolean mirrorAckManagerWarnUnacked = 
ActiveMQDefaultConfiguration.getMirrorAckManagerWarnUnacked();
+
    private int mirrorAckManagerRetryDelay = 
ActiveMQDefaultConfiguration.getMirrorAckManagerRetryDelay();
 
    private boolean mirrorPageTransaction = 
ActiveMQDefaultConfiguration.getMirrorPageTransaction();
@@ -3385,6 +3387,17 @@ public class ConfigurationImpl implements Configuration, 
Serializable {
       return mirrorAckManagerQueueAttempts;
    }
 
+   @Override
+   public boolean isMirrorAckManagerWarnUnacked() {
+      return mirrorAckManagerWarnUnacked;
+   }
+
+   @Override
+   public ConfigurationImpl setMirrorAckManagerWarnUnacked(boolean 
warnUnacked) {
+      this.mirrorAckManagerWarnUnacked = warnUnacked;
+      return this;
+   }
+
    @Override
    public ConfigurationImpl setMirrorAckManagerQueueAttempts(int 
minQueueAttempts) {
       logger.debug("Setting mirrorAckManagerMinQueueAttempts = {}", 
minQueueAttempts);
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 588e0651fa..6819b307ad 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -386,6 +386,7 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil {
    private static final String MIRROR_ACK_MANAGER_PAGE_ATTEMPTS = 
"mirror-ack-manager-page-attempts";
 
    private static final String MIRROR_ACK_MANAGER_RETRY_DELAY = 
"mirror-ack-manager-retry-delay";
+   private static final String MIRROR_ACK_MANAGER_WARN_UNACKED = 
"mirror-ack-manager-warn-unacked";
 
    private static final String MIRROR_PAGE_TRANSACTION = 
"mirror-page-transaction";
 
@@ -868,6 +869,8 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil {
 
       config.setMirrorAckManagerRetryDelay(getInteger(e, 
MIRROR_ACK_MANAGER_RETRY_DELAY, config.getMirrorAckManagerRetryDelay(), 
GT_ZERO));
 
+      config.setMirrorAckManagerWarnUnacked(getBoolean(e, 
MIRROR_ACK_MANAGER_WARN_UNACKED, config.isMirrorAckManagerWarnUnacked()));
+
       parseAddressSettings(e, config);
 
       parseResourceLimits(e, config);
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd 
b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index e16a7dc9b2..4905b77f98 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -944,6 +944,14 @@
             </xsd:annotation>
          </xsd:element>
 
+         <xsd:element name="mirror-ack-manager-warn-unacked" 
type="xsd:boolean" maxOccurs="1" minOccurs="0" default="false">
+            <xsd:annotation>
+               <xsd:documentation>
+                  Should the system log.warn when an acknowledgment retry 
fails (unacked).
+               </xsd:documentation>
+            </xsd:annotation>
+         </xsd:element>
+
          <xsd:element name="mirror-page-transaction" type="xsd:boolean" 
maxOccurs="1" minOccurs="0" default="false">
             <xsd:annotation>
                <xsd:documentation>
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index 3d8fb9b033..e32527ee31 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -582,6 +582,7 @@ public class FileConfigurationTest extends 
AbstractConfigurationTestBase {
       assertEquals(Integer.valueOf(128), 
conf.getAddressSettings().get("a2").getInitialQueueBufferSize());
 
       assertEquals(111, conf.getMirrorAckManagerQueueAttempts());
+      assertTrue(conf.isMirrorAckManagerWarnUnacked());
       assertEquals(222, conf.getMirrorAckManagerPageAttempts());
       assertEquals(333, conf.getMirrorAckManagerRetryDelay());
       assertTrue(conf.isMirrorPageTransaction());
diff --git 
a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml 
b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index 36d43ad604..f53d5a4844 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -558,6 +558,7 @@
       
<mirror-ack-manager-queue-attempts>111</mirror-ack-manager-queue-attempts>
       <mirror-ack-manager-page-attempts>222</mirror-ack-manager-page-attempts>
       <mirror-ack-manager-retry-delay>333</mirror-ack-manager-retry-delay>
+      <mirror-ack-manager-warn-unacked>true</mirror-ack-manager-warn-unacked>
       <mirror-page-transaction>true</mirror-page-transaction>
 
       <security-settings>
diff --git 
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml 
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
index 689ee7e436..99fac28fca 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
@@ -74,6 +74,7 @@
       
<mirror-ack-manager-queue-attempts>111</mirror-ack-manager-queue-attempts>
       <mirror-ack-manager-page-attempts>222</mirror-ack-manager-page-attempts>
       <mirror-ack-manager-retry-delay>333</mirror-ack-manager-retry-delay>
+      <mirror-ack-manager-warn-unacked>true</mirror-ack-manager-warn-unacked>
       <mirror-page-transaction>true</mirror-page-transaction>
 
       <remoting-incoming-interceptors>
diff --git 
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml
 
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml
index 1c163b4a14..c26879b294 100644
--- 
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml
+++ 
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml
@@ -74,6 +74,7 @@
       
<mirror-ack-manager-queue-attempts>111</mirror-ack-manager-queue-attempts>
       <mirror-ack-manager-page-attempts>222</mirror-ack-manager-page-attempts>
       <mirror-ack-manager-retry-delay>333</mirror-ack-manager-retry-delay>
+      <mirror-ack-manager-warn-unacked>true</mirror-ack-manager-warn-unacked>
       <mirror-page-transaction>true</mirror-page-transaction>
 
 
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java
index 89f81fd898..fc3b53efa3 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java
@@ -18,6 +18,7 @@
 package org.apache.activemq.artemis.tests.integration.amqp.connect;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNotSame;
@@ -45,11 +46,13 @@ import org.apache.activemq.artemis.core.paging.impl.Page;
 import 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
 import 
org.apache.activemq.artemis.core.persistence.impl.journal.codec.AckRetry;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Consumer;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.impl.AckReason;
 import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
 import 
org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
 import 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget;
 import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager;
@@ -64,6 +67,7 @@ import org.apache.activemq.artemis.tests.util.RandomUtil;
 import org.apache.activemq.artemis.tests.util.Wait;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -294,6 +298,53 @@ public class AckManagerTest extends ActiveMQTestBase {
    }
 
 
+   @Test
+   public void testLogUnack() throws Throwable {
+      String protocol = "AMQP";
+
+      SimpleString TOPIC_NAME = SimpleString.of("tp" + 
RandomUtil.randomString());
+
+      server1.addAddressInfo(new 
AddressInfo(TOPIC_NAME).addRoutingType(RoutingType.MULTICAST));
+
+      ConnectionFactory connectionFactory = 
CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
+
+      // creating 5 subscriptions
+      for (int i = 0; i < 5; i++) {
+         try (Connection connection = connectionFactory.createConnection()) {
+            connection.setClientID("c" + i);
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            Topic topic = session.createTopic(TOPIC_NAME.toString());
+            session.createDurableSubscriber(topic, "s" + i);
+         }
+      }
+
+      Queue c1s1 = server1.locateQueue("c1.s1");
+      assertNotNull(c1s1);
+      Queue c2s2 = server1.locateQueue("c2.s2");
+      assertNotNull(c2s2);
+
+      try (AssertionLoggerHandler assertionLoggerHandler = new 
AssertionLoggerHandler()) {
+         server1.getConfiguration().setMirrorAckManagerWarnUnacked(true);
+         c1s1.addConsumer(Mockito.mock(Consumer.class));
+         AckManager ackManager = AckManagerProvider.getManager(server1);
+         ackManager.ack("neverFound", c1s1, 1000, AckReason.NORMAL, true);
+
+         // ID for there are consumers
+         Wait.assertTrue(() -> assertionLoggerHandler.findText("AMQ111011"), 
5000, 100);
+         // ID for give up retry
+         Wait.assertTrue(() -> assertionLoggerHandler.findText("AMQ111012"), 
5000, 100);
+
+         server1.getConfiguration().setMirrorAckManagerWarnUnacked(false);
+         assertionLoggerHandler.clear();
+
+         ackManager.ack("neverFound", c1s1, 1000, AckReason.NORMAL, true);
+
+         // ID for there are consumers
+         assertFalse(assertionLoggerHandler.findText("AMQ111011"));
+         // ID for give up retry
+         assertFalse(assertionLoggerHandler.findText("AMQ111012"));
+      }
+   }
 
    @Test
    public void testRetryFromPaging() throws Throwable {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to