This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new 0b85912a71 AMQ-9853 Ensure consumer is closed after all 
acknowledgments are processed in ActiveMQJMS2MessageListenerTest (#1643)
0b85912a71 is described below

commit 0b85912a713aae3673a1fb5ab2446cf58353d620
Author: Jean-Louis Monteiro <[email protected]>
AuthorDate: Fri Jan 30 17:10:06 2026 +0100

    AMQ-9853 Ensure consumer is closed after all acknowledgments are processed 
in ActiveMQJMS2MessageListenerTest (#1643)
    
    ACKs are Async and Closing Consumer Too Early Skips Them.
    The broker's queue statistics (dequeueCount) are the only reliable 
synchronization point to confirm ACKs were processed
---
 .../activemq/jms2/ActiveMQJMS2MessageListenerTest.java | 18 ++++++++++--------
 1 file changed, 10 insertions(+), 8 deletions(-)

diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2MessageListenerTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2MessageListenerTest.java
index c7263cb46c..9edea8b0b7 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2MessageListenerTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2MessageListenerTest.java
@@ -161,17 +161,19 @@ public class ActiveMQJMS2MessageListenerTest extends 
ActiveMQJMS2TestBase {
                     break;
                 default: break;
                 }
-                jmsConsumer.close();
 
                 final Logger logger = LoggerFactory.getLogger(this.getClass());
-                assertTrue("Queue should drain in time", Wait.waitFor(new 
Wait.Condition() {
-                    @Override
-                    public boolean isSatisified() throws Exception {
-                        logger.info("Current Queue size: " + 
localQueueViewMBean.getQueueSize() +
-                                ", dequeue count: " + 
localQueueViewMBean.getDequeueCount());
-                        return localQueueViewMBean.getQueueSize() == 0L && 
localQueueViewMBean.getDequeueCount() >= 2L;
-                    }
+
+                // Wait for all acknowledgments to be processed BEFORE closing 
the consumer.
+                // With CLIENT_ACKNOWLEDGE and INDIVIDUAL_ACKNOWLEDGE, the ack 
processing may be
+                // asynchronous and closing the consumer too early can cause 
messages to not be dequeued.
+                assertTrue("Queue should drain in time", Wait.waitFor(() -> {
+                    logger.info("Current Queue size: " + 
localQueueViewMBean.getQueueSize() +
+                            ", dequeue count: " + 
localQueueViewMBean.getDequeueCount());
+                    return localQueueViewMBean.getQueueSize() == 0L && 
localQueueViewMBean.getDequeueCount() >= 2L;
                 }, 60000L, 200L));
+
+                jmsConsumer.close();
             } // Close consumer context
 
         } catch (Exception e) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to