[ 
https://issues.apache.org/jira/browse/AMQ-9625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christopher L. Shannon updated AMQ-9625:
----------------------------------------
    Description: 
For the last several years I have occasionally seen "stuck" messages on queues 
that will not be dispatched until after a broker restart. The bug looks to be 
the same as described in 
https://issues.apache.org/jira/browse/AMQ-2955 (the root cause was never 
figured out; it was closed because it could not be reproduced). The resulting 
behavior is that KahaDB has the batch cursor set to a point after a message 
that is stored, so that message will never be dispatched. Some work was done 
previously to help with this issue, notably AMQ-3149 (more details in that 
issue and below).

I recently figured out how to reproduce it in a test environment and finally 
tracked down the root cause and a fix. Not surprisingly, there are a few 
things at play here. The bug is a race condition, so it won't be seen unless 
several things hold true and the broker is configured a certain way.
h3. Background:

There are 2 optimizations that the broker uses that are playing into this and 
both must be enabled for the issue to happen.
 # {{useCache=true}}: The normal flow for incoming messages is that they get 
written to the store and then get paged off disk (on the same thread or another 
thread) to be dispatched to consumers. However, there's also a message cache. 
If it is enabled and there's free memory, the message will be added to the 
cache after sending to disk so we don't need to re-read it off disk later 
when dispatching.
 # {{concurrentStoreAndDispatchQueues=true}}: The broker also has an 
optimization for queues where it will try to dispatch incoming messages to 
consumers concurrently while also writing them to disk (async message writes). 
If the consumers are fast enough to ack, we can cancel the disk write, which 
saves disk IO and is an obvious benefit for slow disks. This requires the cache 
to be enabled, as described in AMQ-3149; otherwise we run into problems because 
messages won't be dispatched until the write has finished, so this mode is 
really only useful when the cache is enabled. Furthermore, if the cache is not 
enabled, messages could get stuck if no new messages come in for a while, which 
is another reason this mode has no effect if the cache is off.

The two settings work together. In practice, the message is submitted to the 
store as part of an async task that the store queues up in the background. 
While the task is in the queue, the message is concurrently added to the 
in-memory cache and the broker proceeds to dispatch to consumers, who may 
acknowledge dispatched messages before the disk write is finished if they are 
fast and keeping up. Messages that were already written are removed as normal, 
but if the async task has not finished it gets cancelled, which saves a disk 
write.
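
The flow above can be illustrated with a small, self-contained sketch. It is 
not ActiveMQ code: the class, method and variable names are hypothetical, and 
a single-threaded executor stands in for the store's background task queue. 
It only shows the idea that an ack arriving while the write is still queued 
lets the write be cancelled.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration only; none of these names are ActiveMQ APIs.
class ConcurrentStoreSketch {

    /** Returns true if the ack cancelled the queued write and nothing hit "disk". */
    static boolean ackCancelsPendingWrite() throws Exception {
        ExecutorService storeExecutor = Executors.newSingleThreadExecutor();
        CountDownLatch diskBusy = new CountDownLatch(1);
        AtomicBoolean written = new AtomicBoolean(false);

        // Keep the single store thread busy so the next write stays queued.
        storeExecutor.submit(() -> { diskBusy.await(); return null; });

        // The message is handed to the store as a queued async task ...
        Future<?> pendingWrite = storeExecutor.submit(() -> written.set(true));

        // ... while it is dispatched from the cache. The consumer acks before
        // the write has started, so the write can simply be cancelled.
        boolean cancelled = pendingWrite.cancel(false);

        // Let the store thread drain its queue; the cancelled task never runs.
        diskBusy.countDown();
        storeExecutor.shutdown();
        storeExecutor.awaitTermination(5, TimeUnit.SECONDS);
        return cancelled && !written.get();
    }
}
```

Calling {{ConcurrentStoreSketch.ackCancelsPendingWrite()}} returns {{true}} 
here because the write is still queued when the ack arrives; a write that had 
already completed would instead be removed from the store as normal.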
h3. Bug description:

When the broker runs out of memory to cache messages, the cache has to be 
[disabled|https://github.com/apache/activemq/blob/3400983a22284a28a8989d4b0aaf762090b0911a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java#L258].
 As part of this process the cache has to 
[tell|https://github.com/apache/activemq/blob/3400983a22284a28a8989d4b0aaf762090b0911a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java#L336]
 the store what the last message is that was cached so that when the cache is 
exhausted we can resume paging off disk and dispatching in the correct spot.

The process for disabling the cache starts when the broker attempts to add a 
new incoming message to the cache and detects that memory is full. At that 
point the cache starts disabling itself and syncing to the store: it makes 
sure any previously cached messages that may still be pending a write are 
completed (either acked and cancelled, or written to disk) and then tells the 
store where to resume, which is after the last cached message. When the cache 
is disabled, new writes 
should [no 
longer|https://github.com/apache/activemq/blob/3400983a22284a28a8989d4b0aaf762090b0911a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java#L895]
 be async (AMQ-3149) because we need the messages written to disk in order to 
dispatch them and to prevent stuck/delayed delivery.

In theory, because the store was told the last cached message, the new incoming 
message that triggered the disabling/sync would eventually be paged off disk 
and dispatched. However, there is a race condition: sometimes the new incoming 
message has not completed its write to the store when the queue goes to fill 
the next batch to dispatch (because the write was still async), so it gets 
missed while it is still pending. As described earlier, if the cache is 
disabled, all new store writes should normally be sync and not async, but in 
this case the message that triggered the disabling/sync was still submitted as 
async and never cached. Furthermore, if there are several producers, you can 
have multiple in-flight async messages that were submitted before the cache was 
disabled; by the time they reach the cache, it is seen as disabled and they are 
not added.

The end result is that if the consumers and producers are very fast, you might 
have one or more pending async writes that are skipped by the cache because it 
was disabled, which leads to a situation similar to the one described in 
AMQ-3149. When the cursor goes to page in the next messages, it may not see the 
pending writes that haven't finished (and were not added to the cache), so 
those messages are skipped. By the time an incoming message has finished 
writing to the store, the disk cursor has moved past it, so the message is 
skipped and stays stuck until a broker restart resets the batch.

This can happen each time the cache is enabled and disabled, which is why you 
might see one stuck message or several.
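
The sequence of events can be walked through with a deliberately simplified, 
single-threaded model. This is only a sketch under assumptions: the class, 
fields and sequence numbers are hypothetical and are not ActiveMQ or KahaDB 
APIs, and the interleaving is fixed by hand (a later sync write of msg-4 
completing before the pending async write of msg-3) rather than produced by 
real threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Simplified, hypothetical model of the race; not ActiveMQ code.
class StuckMessageModel {
    // Messages whose store write has finished, keyed by sequence number.
    private final TreeMap<Long, String> disk = new TreeMap<>();
    // The cursor only pages in sequences strictly greater than this value.
    private long cursorPosition;

    // Page in everything on disk after the cursor, then advance the cursor.
    List<String> pageIn() {
        List<String> batch = new ArrayList<>(disk.tailMap(cursorPosition, false).values());
        if (!disk.isEmpty()) {
            cursorPosition = Math.max(cursorPosition, disk.lastKey());
        }
        return batch;
    }

    // Replays the problematic interleaving and returns what the cursor dispatched.
    static List<String> run() {
        StuckMessageModel m = new StuckMessageModel();
        List<String> dispatched = new ArrayList<>();

        // msg-1 and msg-2 were cached and written; they were served from the cache.
        m.disk.put(1L, "msg-1");
        m.disk.put(2L, "msg-2");

        // msg-3 is submitted async (write still pending). Memory is full, so the
        // cache is disabled and msg-3 is never cached. The store is told to
        // resume after the last cached message.
        m.cursorPosition = 2L;

        // The cache is now off, so msg-4 is written synchronously.
        m.disk.put(4L, "msg-4");

        // The cursor pages in: only msg-4 is visible, and the cursor moves to 4.
        dispatched.addAll(m.pageIn());

        // The async write of msg-3 finally completes, behind the cursor.
        m.disk.put(3L, "msg-3");
        dispatched.addAll(m.pageIn());

        return dispatched;
    }
}
```

In this model {{StuckMessageModel.run()}} dispatches only msg-4; msg-3 is on 
disk but behind the cursor, so it is never paged in until the cursor is reset.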
h3. Solution:

The solution is actually rather simple and only a couple of lines of code. When 
the cache is disabled, we just need to ensure that any messages that are in 
flight are treated as sync and not async, so they do not get missed now that 
they will no longer be added to the in-memory cache.

When adding a message to the cache triggers the cache to disable, we just need 
to wait for that message's async task to finish before we set the batch on the 
store, so that the message is visible when the next batch is read and paged 
off disk. Also, many fast producers could cause other messages to be submitted 
as async just before the cache was disabled; by the time they reach the cache 
they get skipped, so we need to wait on those as well.
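
A rough sketch of that idea follows. It is not the actual patch: the names are 
hypothetical, a sorted set stands in for the store, and a 50 ms sleep stands 
in for disk latency. The point is only that waiting on the in-flight write 
futures before the resume point is used makes those writes visible to the 
next page-in.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch of the fix; these names are illustrative, not ActiveMQ APIs.
class WaitForPendingWritesSketch {

    // Returns the sequences the cursor pages in after the cache is disabled.
    static List<Long> disableCacheAndPage() throws Exception {
        // Sequences that have been written to "disk" so far.
        ConcurrentSkipListSet<Long> disk = new ConcurrentSkipListSet<>();
        disk.add(1L);
        disk.add(2L);
        long lastCached = 2L; // resume point handed to the store

        ExecutorService storeExecutor = Executors.newSingleThreadExecutor();
        try {
            // Message 3 was submitted async just before the cache was disabled,
            // so it was never cached and its write is still in flight.
            List<Future<?>> inFlight = new ArrayList<>();
            inFlight.add(storeExecutor.submit(() -> {
                Thread.sleep(50); // simulated disk latency
                disk.add(3L);
                return null;
            }));

            // The fix: block until every in-flight write has completed, which
            // effectively converts those writes to sync.
            for (Future<?> write : inFlight) {
                write.get();
            }

            // Only now page in from the resume point; message 3 is visible.
            return new ArrayList<>(disk.tailSet(lastCached, false));
        } finally {
            storeExecutor.shutdown();
        }
    }
}
```

Without the wait loop, the page-in in this sketch would most likely run before 
the simulated write finishes and return an empty batch, which corresponds to 
the missed message described above.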

Writing a unit test for this would be very difficult due to the race conditions 
and async operations involved. However, these small changes are very simple and 
low risk: the only thing that changes is that we wait for the async writes to 
finish (which converts them to sync) during the small window when the cache is 
toggled off, which prevents the bug.

> Messages can become stuck on Queues
> -----------------------------------
>
>                 Key: AMQ-9625
>                 URL: https://issues.apache.org/jira/browse/AMQ-9625
>             Project: ActiveMQ Classic
>          Issue Type: Bug
>    Affects Versions: 5.18.6, 6.1.4
>            Reporter: Christopher L. Shannon
>            Assignee: Christopher L. Shannon
>            Priority: Major
>             Fix For: 6.2.0, 5.18.7, 6.1.5
>
>          Time Spent: 20m
>  Remaining Estimate: 0h


