This is an automated email from the ASF dual-hosted git repository.
cshannon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new 0b85912a71 AMQ-9853 Ensure consumer is closed after all
acknowledgments are processed in ActiveMQJMS2MessageListenerTest (#1643)
0b85912a71 is described below
commit 0b85912a713aae3673a1fb5ab2446cf58353d620
Author: Jean-Louis Monteiro <[email protected]>
AuthorDate: Fri Jan 30 17:10:06 2026 +0100
AMQ-9853 Ensure consumer is closed after all acknowledgments are processed
in ActiveMQJMS2MessageListenerTest (#1643)
ACKs are asynchronous, and closing the consumer too early skips them.
The broker's queue statistics (dequeueCount) are the only reliable
synchronization point to confirm that the ACKs were processed.
---
.../activemq/jms2/ActiveMQJMS2MessageListenerTest.java | 18 ++++++++++--------
1 file changed, 10 insertions(+), 8 deletions(-)
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2MessageListenerTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2MessageListenerTest.java
index c7263cb46c..9edea8b0b7 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2MessageListenerTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2MessageListenerTest.java
@@ -161,17 +161,19 @@ public class ActiveMQJMS2MessageListenerTest extends
ActiveMQJMS2TestBase {
break;
default: break;
}
- jmsConsumer.close();
final Logger logger = LoggerFactory.getLogger(this.getClass());
- assertTrue("Queue should drain in time", Wait.waitFor(new
Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- logger.info("Current Queue size: " +
localQueueViewMBean.getQueueSize() +
- ", dequeue count: " +
localQueueViewMBean.getDequeueCount());
- return localQueueViewMBean.getQueueSize() == 0L &&
localQueueViewMBean.getDequeueCount() >= 2L;
- }
+
+ // Wait for all acknowledgments to be processed BEFORE closing
the consumer.
+ // With CLIENT_ACKNOWLEDGE and INDIVIDUAL_ACKNOWLEDGE, the ack
processing may be
+ // asynchronous and closing the consumer too early can cause
messages to not be dequeued.
+ assertTrue("Queue should drain in time", Wait.waitFor(() -> {
+ logger.info("Current Queue size: " +
localQueueViewMBean.getQueueSize() +
+ ", dequeue count: " +
localQueueViewMBean.getDequeueCount());
+ return localQueueViewMBean.getQueueSize() == 0L &&
localQueueViewMBean.getDequeueCount() >= 2L;
}, 60000L, 200L));
+
+ jmsConsumer.close();
} // Close consumer context
} catch (Exception e) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact