[
https://issues.apache.org/jira/browse/AMQ-6579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Vasco Veloso updated AMQ-6579:
------------------------------
Description:
When a TopicSubscription is configured with a limit on the number of pending
messages, it will try to eagerly evict expired messages before dispatching them.
{code:title=TopicSubscription.java:169}
if (!matched.isEmpty() && matched.size() > max) {
    removeExpiredMessages();
}
{code}
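As a rough illustration of that trigger, here is a minimal, self-contained Java sketch. It is not the ActiveMQ implementation: Msg, PendingBuffer, add and removeFirstExpired are hypothetical names, and adding the message before checking the limit is an assumption of the sketch. The only behaviour taken from the snippet above is that expiry cleanup runs once the pending list grows past the configured maximum.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

// Hypothetical stand-in for a pending message; not an ActiveMQ type.
final class Msg {
    final String id;
    final long expiresAtMillis;

    Msg(String id, long expiresAtMillis) {
        this.id = id;
        this.expiresAtMillis = expiresAtMillis;
    }

    boolean isExpired(long nowMillis) {
        return nowMillis >= expiresAtMillis;
    }
}

// Hypothetical model of a subscription's pending ("matched") list.
final class PendingBuffer {
    private final Deque<Msg> matched = new ArrayDeque<>();
    private final int max; // configured limit on pending messages

    PendingBuffer(int max) {
        this.max = max;
    }

    // Queue a message; only when over the limit, try to evict an expired one.
    void add(Msg msg, long nowMillis) {
        matched.addLast(msg);
        if (!matched.isEmpty() && matched.size() > max) {
            removeFirstExpired(nowMillis);
        }
    }

    // Remove the first expired message found, if any.
    boolean removeFirstExpired(long nowMillis) {
        for (Iterator<Msg> it = matched.iterator(); it.hasNext(); ) {
            if (it.next().isExpired(nowMillis)) {
                it.remove();
                return true;
            }
        }
        return false;
    }

    int size() {
        return matched.size();
    }
}

class PendingBufferDemo {
    public static void main(String[] args) {
        long now = 1_000L;
        PendingBuffer buf = new PendingBuffer(2);
        buf.add(new Msg("a", 500L), now);   // already expired, but under the limit: kept
        buf.add(new Msg("b", 5_000L), now);
        buf.add(new Msg("c", 5_000L), now); // size 3 > max 2: "a" is evicted
        System.out.println(buf.size());     // prints 2
    }
}
```

Note that in this model an expired message can sit in the pending list indefinitely as long as the list stays at or below the limit; cleanup is only attempted on overflow.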
When {{TopicSubscription#removeExpiredMessages}} detects an expired message, it
removes the message but also increments the counter of dispatched messages.
{code:title=TopicSubscription.java:235}
if (node.isExpired()) {
    matched.remove();
    getSubscriptionStatistics().getDispatched().increment();
    node.decrementReferenceCount();
    if (broker.isExpired(node)) {
        ((Destination) node.getRegionDestination()).getDestinationStatistics().getExpired().increment();
        broker.messageExpired(getContext(), node, this);
    }
    break;
}
{code}
However, this has the side effect of changing the result of
{{getDispatchedQueueSize()}} and therefore {{isFull()}}: both now count a
message as dispatched when it has actually been dropped.
In the worst-case scenario, slow consumers will no longer receive messages
because they are reported as "full" when in fact they have nothing to process.
Am I correct in concluding that expired messages must not count towards the
dispatched value?
I have made a quick change, removing the increment, and things look good so
far. However, I am worried that I may be missing some side effect or
specification detail.
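To make the concern concrete, here is a small Java model of the counters. It is only a sketch under stated assumptions: SubscriptionCounters and its methods are hypothetical names, and it assumes that the dispatched queue size is the dispatched count minus the acknowledged (dequeued) count and that "full" means this size has reached the prefetch size. Neither formula is quoted in this report, so the real implementation may differ.

```java
// Hypothetical model of the subscription counters discussed above; not ActiveMQ code.
final class SubscriptionCounters {
    private final int prefetchSize;
    private final boolean countExpiredAsDispatched;
    private long dispatched;
    private long dequeued;

    SubscriptionCounters(int prefetchSize, boolean countExpiredAsDispatched) {
        this.prefetchSize = prefetchSize;
        this.countExpiredAsDispatched = countExpiredAsDispatched;
    }

    // A message was actually sent to the consumer.
    void onDispatch() {
        dispatched++;
    }

    // The consumer acknowledged a message it had received.
    void onAck() {
        dequeued++;
    }

    // An expired message was evicted from the pending list without being sent.
    void onExpiredEviction() {
        if (countExpiredAsDispatched) {
            // Reported behaviour: the consumer never sees this message,
            // so no acknowledgement will ever balance this increment.
            dispatched++;
        }
    }

    // Assumption: in-flight size is dispatched minus acknowledged.
    long dispatchedQueueSize() {
        return dispatched - dequeued;
    }

    // Assumption: "full" means the in-flight size reached the prefetch size.
    boolean isFull() {
        return dispatchedQueueSize() >= prefetchSize;
    }
}

class ExpiredAccountingDemo {
    public static void main(String[] args) {
        SubscriptionCounters reported = new SubscriptionCounters(3, true);
        SubscriptionCounters changed = new SubscriptionCounters(3, false);

        // Three expired messages are evicted; nothing is ever sent to the consumer.
        for (int i = 0; i < 3; i++) {
            reported.onExpiredEviction();
            changed.onExpiredEviction();
        }

        // prints "reported: size=3 full=true"
        System.out.println("reported: size=" + reported.dispatchedQueueSize()
                + " full=" + reported.isFull());
        // prints "changed: size=0 full=false"
        System.out.println("changed: size=" + changed.dispatchedQueueSize()
                + " full=" + changed.isFull());
    }
}
```

Under these assumptions the drift is permanent: each counted eviction uses up one prefetch slot that no acknowledgement can release, which matches the "full with nothing to process" symptom described above.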
was:
When a TopicSubscription is configured with a limit on the number of pending
messages, it will try to eagerly evict expired messages before dispatching them.
{code:title=TopicSubscription.java:169}
if (!matched.isEmpty() && matched.size() > max) {
    removeExpiredMessages();
}
{code}
When {{TopicSubscription#removeExpiredMessages}} detects an expired message, it
removes the message but also increments the counter of dispatched messages.
{code:title=TopicSubscription.java:169}
if (node.isExpired()) {
    matched.remove();
    getSubscriptionStatistics().getDispatched().increment();
    node.decrementReferenceCount();
    if (broker.isExpired(node)) {
        ((Destination) node.getRegionDestination()).getDestinationStatistics().getExpired().increment();
        broker.messageExpired(getContext(), node, this);
    }
    break;
}
{code}
However, this has the side effect of changing the result of
{{getDispatchedQueueSize()}} and therefore {{isFull()}}: both now count a
message as dispatched when it has actually been dropped.
In the worst-case scenario, slow consumers will no longer receive messages
because they are reported as "full" when in fact they have nothing to process.
Am I correct in concluding that expired messages must not count towards the
dispatched value?
I have made a quick change, removing the increment, and things look good so
far. However, I am worried that I may be missing some side effect or
specification detail.
> Expired messages counting as Dispatched on TopicSubscription
> ------------------------------------------------------------
>
> Key: AMQ-6579
> URL: https://issues.apache.org/jira/browse/AMQ-6579
> Project: ActiveMQ
> Issue Type: Bug
> Affects Versions: 5.15.0, 5.14.3
> Reporter: Vasco Veloso