This is an automated email from the ASF dual-hosted git repository.
cshannon pushed a commit to branch activemq-6.1.x
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/activemq-6.1.x by this push:
new 3d8ce8eec1 AMQ-9625 - Prevent queue messages from becoming stuck
3d8ce8eec1 is described below
commit 3d8ce8eec17e40c25441649ca2853fbd1778fb9e
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Wed Nov 20 12:22:57 2024 -0500
AMQ-9625 - Prevent queue messages from becoming stuck
Fixes a race condition bug that can lead to a message being missed on
dispatch and stuck on a Queue until restart when caching and
concurrentStoreAndDispatch are enabled on a Queue and the cache becomes
disabled.
(cherry picked from commit 7f218fe05d67117573329593a712ec420532810d)
---
.../broker/region/cursors/AbstractStoreCursor.java | 49 +++++++++++++++++++++-
1 file changed, 48 insertions(+), 1 deletion(-)
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
index 71a83acaef..30089e3551 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
@@ -255,12 +255,20 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
disableCache = true;
}
- if (disableCache && isCacheEnabled()) {
+ // AMQ-9625 - use this.cacheEnabled directly because the method isCacheEnabled() is overridden
+ // to try to re-enable the cache, which we don't want at this point as we already skipped
+ // adding it to the cache
+ if (disableCache && this.cacheEnabled) {
if (LOG.isTraceEnabled()) {
LOG.trace("{} - disabling cache on add {} {}", this,
node.getMessageId(), node.getMessageId().getFutureOrSequenceLong());
}
syncWithStore(node.getMessage());
setCacheEnabled(false);
+ } else if (!this.cacheEnabled) {
+ // AMQ-9625 - Verify and wait on previous in-flight async messages here if another
+ // thread triggered the cache to be disabled
+ // see the waitForAsyncMessage() method and Jira for more info
+ waitForAsyncMessage(node.getMessage());
}
size++;
return true;
@@ -319,6 +327,11 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
break;
}
+ // AMQ-9625 - If we are disabling the cache and syncing the store then
+ // we need to wait for the task to finish before updating the store batch
+ // see the waitForAsyncMessage() method and Jira for more info
+ waitForAsyncMessage(currentAdd);
+
MessageId candidate = lastCachedIds[ASYNC_ADD];
if (candidate != null) {
// ensure we don't skip current possibly sync add b/c we waited on the future
@@ -530,4 +543,38 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
// Always returns null: this cursor reports no associated Subscription.
public Subscription getSubscription() {
return null;
}
+
+    // AMQ-9625 - If the cache is disabled, check if we need to wait for an async message
+    // to finish its task because the message is not being added to the cache.
+    // Normally, async messages will only be used if the cache is enabled, so most of the time
+    // this check should not find any async messages to wait on if the cache is disabled
+    // and is basically a noop.
+    //
+    // However, while messages are being published, if the memory limit is reached, the first
+    // thread that is adding the message that reaches the limit will disable the cache.
+    // This means there will be 1 or more potentially outstanding in-flight adds that are
+    // queued up as async writes to the store.
+    //
+    // If the cache is disabled, we need to wait for any async message tasks to be
+    // finished, otherwise there is a chance of missing the messages on dispatch
+    // when the queue pages in the next batch because store writes will finish after
+    // the store cursor has already moved ahead, leading to a stuck message.
+    //
+    // If the waiting thread is interrupted, the wait is abandoned and the thread's
+    // interrupt status is restored (rather than swallowed) so that callers further up
+    // the stack can still observe the interrupt.
+    private void waitForAsyncMessage(Message message) {
+        // Note: isRecievedByDFBridge() was repurposed to be used to mark messages that
+        // are added to the store as async
+        if (message.getMessage().isRecievedByDFBridge()) {
+            final Object futureOrLong = message.getMessageId().getFutureOrSequenceLong();
+            if (futureOrLong instanceof Future) {
+                try {
+                    ((Future<?>) futureOrLong).get();
+                } catch (InterruptedException interrupted) {
+                    // By convention the interrupt flag is cleared when InterruptedException
+                    // is thrown; re-assert it instead of silently dropping the interrupt.
+                    Thread.currentThread().interrupt();
+                } catch (Exception exceptionOk) {
+                    // We don't care if we get any other exception (cancelled, execution
+                    // failure, etc.); we just want to ensure the task is finished and not pending.
+                } finally {
+                    LOG.trace("{} - future finished inside waitForAsyncMessage {} {}", this,
+                        message.getMessageId(), futureOrLong);
+                }
+            }
+        }
+    }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact