This is an automated email from the ASF dual-hosted git repository.
cshannon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new a987c04962 AMQ-9852 Ensure advisory consumer receives producer
advisory before creation to prevent race conditions (#1641)
a987c04962 is described below
commit a987c04962438bef3fd94a385a9cc1c82f49c1f6
Author: Jean-Louis Monteiro <[email protected]>
AuthorDate: Fri Jan 30 16:14:50 2026 +0100
AMQ-9852 Ensure advisory consumer receives producer advisory before
creation to prevent race conditions (#1641)
---
.../apache/activemq/advisory/AdvisoryBroker.java | 2 ++
.../broker/advisory/AdvisoryBrokerTest.java | 26 +++++++++++++---------
2 files changed, 18 insertions(+), 10 deletions(-)
diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
index 38cd49bab0..fb797a6793 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
@@ -367,6 +367,8 @@ public class AdvisoryBroker extends BrokerFilter {
} finally {
consumersLock.writeLock().unlock();
}
+ // Fire advisory outside the lock to avoid potential deadlocks
+ // (fireConsumerAdvisory calls next.send() which can acquire other locks)
if (!dest.isTemporary() || destinations.containsKey(dest)) {
fireConsumerAdvisory(context, dest, topic, info.createRemoveCommand());
}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryBrokerTest.java
index fdf6cf04f1..4f9e6dd3e4 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryBrokerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryBrokerTest.java
@@ -135,12 +135,12 @@ public class AdvisoryBrokerTest extends BrokerTestSupport {
ActiveMQDestination queue = new ActiveMQQueue("test");
ActiveMQDestination destination = AdvisorySupport.getConsumerAdvisoryTopic(queue);
-
+
// Setup a first connection
StubConnection connection1 = createConnection();
ConnectionInfo connectionInfo1 = createConnectionInfo();
SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
-
+
connection1.send(connectionInfo1);
connection1.send(sessionInfo1);
@@ -149,12 +149,15 @@ public class AdvisoryBrokerTest extends BrokerTestSupport {
ConnectionInfo connectionInfo2 = createConnectionInfo();
SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, queue);
- consumerInfo2.setPrefetchSize(100);
+ consumerInfo2.setPrefetchSize(100);
connection2.send(connectionInfo2);
connection2.send(sessionInfo2);
- connection2.send(consumerInfo2);
-
- // We should get an advisory of the previous consumer.
+ // Use request() to ensure consumer is fully registered and its "new consumer"
+ // advisory is fired before we create the advisory consumer. This prevents a race
+ // where the advisory consumer could receive both a replay AND the broadcast advisory.
+ connection2.request(consumerInfo2);
+
+ // We should get an advisory of the previous consumer.
ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
consumerInfo1.setPrefetchSize(100);
connection1.send(consumerInfo1);
@@ -246,12 +249,15 @@ public class AdvisoryBrokerTest extends BrokerTestSupport {
SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
producerInfo2.setDestination(queue);
-
+
connection2.send(connectionInfo2);
connection2.send(sessionInfo2);
- connection2.send(producerInfo2);
-
- // Create the advisory consumer.. it should see the previous producer
+ // Use request() to ensure producer is fully registered and its "new producer"
+ // advisory is fired before we create the advisory consumer. This prevents a race
+ // where the advisory consumer could receive both a replay AND the broadcast advisory.
+ connection2.request(producerInfo2);
+
+ // Create the advisory consumer.. it should see the previous producer
ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
consumerInfo1.setPrefetchSize(100);
connection1.send(consumerInfo1);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact