This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch 6.1.x.AMF.x
in repository https://gitbox.apache.org/repos/asf/activemq.git

commit c06969583d9d6340674e332e9dce356fad2a314f
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Wed Nov 20 12:22:57 2024 -0500

    AMQ-9625 - Prevent queue messages from becoming stuck
    
    Fixes a race condition bug that can lead to a message being missed on
    dispatch and stuck on a Queue until restart when caching and
    concurrentStoreAndDispatch are enabled on a Queue and the cache becomes
    disabled.
    
    AMFU-168, AMF-3692 - cherry-pick in fix from FOSS broker
---
 .../broker/region/cursors/AbstractStoreCursor.java | 49 +++++++++++++++++++++-
 1 file changed, 48 insertions(+), 1 deletion(-)

diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
index 26611a2482..c38498cab4 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
@@ -259,12 +259,20 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
             disableCache = true;
         }
 
-        if (disableCache && isCacheEnabled()) {
+        // AMQ-9625 - use this.cacheEnabled directly because the method isCacheEnabled() is overriden
+        // to try to re-enable the cache which we don't want at this point as we already skipped
+        // adding it to the cache
+        if (disableCache && this.cacheEnabled) {
             if (LOG.isTraceEnabled()) {
                 LOG.trace("{} - disabling cache on add {} {}", this, node.getMessageId(), node.getMessageId().getFutureOrSequenceLong());
             }
             syncWithStore(node.getMessage());
             setCacheEnabled(false);
+        } else if (!this.cacheEnabled) {
+            // AMQ-9625 - Verify and wait on previous in flight async messages here if another
+            // thread triggered the cache to be disabled
+            // see the waitForAsyncMessage() method and Jira for more info
+            waitForAsyncMessage(node.getMessage());
         }
         size++;
         return true;
@@ -317,6 +325,11 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
             break;
         }
 
+        // AMQ-9625 - If we are disabling the cache and syncing the store then
+        // we need to wait for task to finish before updating the store batch
+        // see the waitForAsyncMessage() method and Jira for more info
+        waitForAsyncMessage(currentAdd);
+
         MessageId candidate = lastCachedIds[ASYNC_ADD];
         if (candidate != null) {
             // ensure we don't skip current possibly sync add b/c we waited on the future
@@ -565,4 +578,38 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
     public Subscription getSubscription() {
         return null;
     }
+
+    // AMQ-9625 - If the cache is disabled check if we need to wait for an async message
+    // to finish its task because the message is not being added to the cache.
+    // Normally, async messages will only be used if the cache is enabled so most of the time
+    // this check should not find any async messages to wait on if the cache is disabled
+    // and is basically a noop.
+    //
+    // However, while messages are being published, if the memory limit is reached the first
+    // thread that is adding the message that reaches the limit will disable the cache.
+    // This means there will be 1 or more potentially outstanding in flight adds that are
+    // queued up as async writes to the store.
+    //
+    // If the cache is disabled, we need to wait for any async message tasks to be
+    // finished otherwise there is a chance of missing the messages on dispatch
+    // when the queue pages in the next batch because store writes will finish after
+    // the store cursor has already moved ahead leading to a stuck message.
+    private void waitForAsyncMessage(Message node) {
+        // Note: isRecievedByDFBridge() was repurposed to be used to mark messages that
+        // are added to the store as async
+        if (node.getMessage().isRecievedByDFBridge()) {
+            final Object futureOrLong = node.getMessageId().getFutureOrSequenceLong();
+            if (futureOrLong instanceof Future) {
+                try {
+                    ((Future<?>) futureOrLong).get();
+                } catch (Exception exceptionOk) {
+                    // We don't care if we get an exception (cancelled, etc) we just want
+                    // to ensure the task is finished and not pending.
+                } finally {
+                    LOG.trace("{} - future finished inside waitForAsyncMessage {} {}", this,
+                        node.getMessageId(), futureOrLong);
+                }
+            }
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to