[ 
https://issues.apache.org/jira/browse/AMQ-6579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vasco Veloso updated AMQ-6579:
------------------------------
    Description: 
When a TopicSubscription is configured with a limit on the number of pending 
messages, it will try to eagerly evict expired messages before dispatching them.

{code:title=TopicSubscription.java:169}
if (!matched.isEmpty() && matched.size() > max) {
  removeExpiredMessages();
}
{code}

When {{TopicSubscription#removeExpiredMessages}} detects an expired message, it 
will remove it but will increment the counter of dispatched messages as well.

{code:title=TopicSubscription.java:235}
if (node.isExpired()) {
  matched.remove();
  getSubscriptionStatistics().getDispatched().increment();
  node.decrementReferenceCount();
  if (broker.isExpired(node)) {
    ((Destination) node.getRegionDestination()).getDestinationStatistics().getExpired().increment();
    broker.messageExpired(getContext(), node, this);
  }
  break;
}
{code}

However, this inflates the result of {{getDispatchedQueueSize()}} and therefore 
affects {{isFull()}}: both now count a message as dispatched when it has 
actually been dropped.

In the worst-case scenario, slow consumers will no longer receive messages 
because they are reported as "full" when in fact they have nothing to process.

Am I correct in concluding that expired messages must not count towards the 
dispatched value?

I have made a quick change, removing the increment, and things look good so 
far. However, I am worried that I may be missing some side effect or 
specification detail.
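
To illustrate the effect, here is a minimal, self-contained sketch. It is not 
ActiveMQ code: the class {{ExpiredDispatchModel}}, its fields and the 
{{countExpiredAsDispatched}} flag are hypothetical. It also assumes that the 
dispatched queue size is the dispatched count minus the dequeued count, and 
that a subscription is full once that size reaches the prefetch size.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

/** Simplified, hypothetical model of the counters discussed above; not ActiveMQ code. */
class ExpiredDispatchModel {
    // Pending messages; each entry records only whether the message has expired.
    private final Deque<Boolean> matched = new ArrayDeque<>();
    private final int prefetchSize;
    // true models the behaviour reported here, false models the quick change.
    private final boolean countExpiredAsDispatched;
    private long dispatched;
    private long dequeued; // never incremented: nothing is delivered or acknowledged

    ExpiredDispatchModel(int prefetchSize, boolean countExpiredAsDispatched) {
        this.prefetchSize = prefetchSize;
        this.countExpiredAsDispatched = countExpiredAsDispatched;
    }

    void addPending(boolean expired) {
        matched.add(expired);
    }

    /** Drops the first expired pending message, if any, like the snippet above. */
    void removeExpiredMessages() {
        for (Iterator<Boolean> it = matched.iterator(); it.hasNext();) {
            if (it.next()) {
                it.remove();
                if (countExpiredAsDispatched) {
                    dispatched++; // the increment under discussion
                }
                break;
            }
        }
    }

    // Assumption: in-flight messages = dispatched - dequeued.
    long getDispatchedQueueSize() {
        return dispatched - dequeued;
    }

    // Assumption: full once the in-flight count reaches the prefetch size.
    boolean isFull() {
        return getDispatchedQueueSize() >= prefetchSize;
    }

    public static void main(String[] args) {
        ExpiredDispatchModel current = new ExpiredDispatchModel(2, true);
        ExpiredDispatchModel patched = new ExpiredDispatchModel(2, false);
        for (ExpiredDispatchModel m : new ExpiredDispatchModel[] {current, patched}) {
            // Two messages expire while pending and are evicted one at a time.
            m.addPending(true);
            m.addPending(true);
            m.removeExpiredMessages();
            m.removeExpiredMessages();
        }
        System.out.println("current: full=" + current.isFull()); // full=true
        System.out.println("patched: full=" + patched.isFull()); // full=false
    }
}
```

In this model nothing was ever delivered to the consumer, yet with the 
increment in place the subscription reports itself as full after two evictions. 
Whether the real broker relies on that increment elsewhere is exactly the open 
question above.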

  was:
When a TopicSubscription is configured with a limit on the number of pending 
messages, it will try to eagerly evict expired messages before dispatching them.

{code:title=TopicSubscription.java:169}
if (!matched.isEmpty() && matched.size() > max) {
  removeExpiredMessages();
}
{code}

When {{TopicSubscription#removeExpiredMessages}} detects an expired message, it 
will remove it but will increment the counter of dispatched messages as well.

{code:title=TopicSubscription.java:169}
if (node.isExpired()) {
  matched.remove();
  getSubscriptionStatistics().getDispatched().increment();
  node.decrementReferenceCount();
  if (broker.isExpired(node)) {
    ((Destination) node.getRegionDestination()).getDestinationStatistics().getExpired().increment();
    broker.messageExpired(getContext(), node, this);
  }
  break;
}
{code}

However this has the side effect of affecting the result of 
{{getDispatchedQueueSize()}} and therefore {{isFull()}}. These counters will 
now reflect a new dispatched message that has actually been dropped.

In the worst case scenario slow consumers will no longer receive messages 
because they are "full" when in fact they have nothing to process.

Am I correct in concluding that expired messages must not count towards the 
dispatched value?

I have made a quick change, removing the increment, and things look good so 
far. However I am worried that I may be missing some side effect or 
specification detail.


> Expired messages counting as Dispatched on TopicSubscription
> ------------------------------------------------------------
>
>                 Key: AMQ-6579
>                 URL: https://issues.apache.org/jira/browse/AMQ-6579
>             Project: ActiveMQ
>          Issue Type: Bug
>    Affects Versions: 5.15.0, 5.14.3
>            Reporter: Vasco Veloso


