This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new a987c04962 AMQ-9852 Ensure advisory consumer receives producer advisory before creation to prevent race conditions (#1641)
a987c04962 is described below

commit a987c04962438bef3fd94a385a9cc1c82f49c1f6
Author: Jean-Louis Monteiro <[email protected]>
AuthorDate: Fri Jan 30 16:14:50 2026 +0100

    AMQ-9852 Ensure advisory consumer receives producer advisory before creation to prevent race conditions (#1641)
---
 .../apache/activemq/advisory/AdvisoryBroker.java   |  2 ++
 .../broker/advisory/AdvisoryBrokerTest.java        | 26 +++++++++++++---------
 2 files changed, 18 insertions(+), 10 deletions(-)

diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
index 38cd49bab0..fb797a6793 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
@@ -367,6 +367,8 @@ public class AdvisoryBroker extends BrokerFilter {
             } finally {
                 consumersLock.writeLock().unlock();
             }
+            // Fire advisory outside the lock to avoid potential deadlocks
+            // (fireConsumerAdvisory calls next.send() which can acquire other locks)
             if (!dest.isTemporary() || destinations.containsKey(dest)) {
                 fireConsumerAdvisory(context, dest, topic, info.createRemoveCommand());
             }
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryBrokerTest.java
index fdf6cf04f1..4f9e6dd3e4 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryBrokerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryBrokerTest.java
@@ -135,12 +135,12 @@ public class AdvisoryBrokerTest extends BrokerTestSupport {
 
         ActiveMQDestination queue = new ActiveMQQueue("test");
         ActiveMQDestination destination = AdvisorySupport.getConsumerAdvisoryTopic(queue);
-        
+
         // Setup a first connection
         StubConnection connection1 = createConnection();
         ConnectionInfo connectionInfo1 = createConnectionInfo();
         SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
-        
+
         connection1.send(connectionInfo1);
         connection1.send(sessionInfo1);
 
@@ -149,12 +149,15 @@ public class AdvisoryBrokerTest extends BrokerTestSupport {
         ConnectionInfo connectionInfo2 = createConnectionInfo();
         SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
         ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, queue);
-        consumerInfo2.setPrefetchSize(100);        
+        consumerInfo2.setPrefetchSize(100);
         connection2.send(connectionInfo2);
         connection2.send(sessionInfo2);
-        connection2.send(consumerInfo2);
-        
-        // We should get an advisory of the previous consumer.        
+        // Use request() to ensure consumer is fully registered and its "new consumer"
+        // advisory is fired before we create the advisory consumer. This prevents a race
+        // where the advisory consumer could receive both a replay AND the broadcast advisory.
+        connection2.request(consumerInfo2);
+
+        // We should get an advisory of the previous consumer.
         ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
         consumerInfo1.setPrefetchSize(100);
         connection1.send(consumerInfo1);
@@ -246,12 +249,15 @@ public class AdvisoryBrokerTest extends BrokerTestSupport {
         SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
         ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
         producerInfo2.setDestination(queue);
-        
+
         connection2.send(connectionInfo2);
         connection2.send(sessionInfo2);
-        connection2.send(producerInfo2);
-        
-        // Create the advisory consumer.. it should see the previous producer  
      
+        // Use request() to ensure producer is fully registered and its "new producer"
+        // advisory is fired before we create the advisory consumer. This prevents a race
+        // where the advisory consumer could receive both a replay AND the broadcast advisory.
+        connection2.request(producerInfo2);
+
+        // Create the advisory consumer.. it should see the previous producer
         ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
         consumerInfo1.setPrefetchSize(100);
         connection1.send(consumerInfo1);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to