This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new 8eae494904 fix(test): fix CompositeConsumerNetworkBridgeTest flakiness 
(#1742)
8eae494904 is described below

commit 8eae494904bce6022b4ad5cf0ea62579e3ecbb36
Author: JB Onofré <[email protected]>
AuthorDate: Mon Mar 9 14:23:45 2026 +0100

    fix(test): fix CompositeConsumerNetworkBridgeTest flakiness (#1742)
    
    Replace Thread.sleep with proper synchronization to eliminate race
    conditions: use assertBridgeStarted() instead of sleeping for advisory
    propagation, reorder assertions to confirm individual topic subs before
    checking composite has none, and retry removeSubscription via
    Wait.waitFor instead of a fixed sleep after closing durable subscribers.
---
 .../CompositeConsumerNetworkBridgeTest.java        | 41 +++++++++++++---------
 1 file changed, 25 insertions(+), 16 deletions(-)

diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/CompositeConsumerNetworkBridgeTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/CompositeConsumerNetworkBridgeTest.java
index 17b0447dd4..77ac2a5f83 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/CompositeConsumerNetworkBridgeTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/CompositeConsumerNetworkBridgeTest.java
@@ -101,17 +101,18 @@ public class CompositeConsumerNetworkBridgeTest extends 
DynamicNetworkTestSuppor
 
         // The remote broker should create two durable subs instead of 1
         // Should be 1 durable on each of the topics that are part of the 
composite
-        assertConsumersCount(broker2, compositeTopic, 0);
-        assertNCDurableSubsCount(broker2, compositeTopic, 0);
+        // First confirm individual topic subs are created (wait for bridge to 
finish processing)
         for (ActiveMQTopic topic : topics) {
             assertConsumersCount(broker2, topic, 1);
             assertNCDurableSubsCount(broker2, topic, 1);
         }
+        // Then verify no subs were created on the composite topic itself
+        assertConsumersCount(broker2, compositeTopic, 0);
+        assertNCDurableSubsCount(broker2, compositeTopic, 0);
         assertCompositeMapCounts(1, 1);
 
         durSub.close();
-        Thread.sleep(1000);
-        removeSubscription(broker1, subName);
+        waitAndRemoveSubscription(broker1, subName);
 
         //Verify cleanup
         for (ActiveMQTopic topic : topics) {
@@ -148,11 +149,10 @@ public class CompositeConsumerNetworkBridgeTest extends 
DynamicNetworkTestSuppor
         assertNotNull(durSub2.receive(1000));
 
         durSub1.close();
-        durSub2.close();;
+        durSub2.close();
 
-        Thread.sleep(1000);
-        removeSubscription(broker1, subName + "1");
-        removeSubscription(broker1, subName + "2");
+        waitAndRemoveSubscription(broker1, subName + "1");
+        waitAndRemoveSubscription(broker1, subName + "2");
         assertCompositeMapCounts(0, 0);
     }
 
@@ -268,17 +268,16 @@ public class CompositeConsumerNetworkBridgeTest extends 
DynamicNetworkTestSuppor
         TopicSubscriber durSub2 = 
session1.createDurableSubscriber(compositeTopic, subName + "2");
         assertConsumersCount(broker1, compositeTopic, 2);
 
-        assertConsumersCount(broker2, compositeTopic, 0);
-        assertNCDurableSubsCount(broker2, compositeTopic, 0);
         for (ActiveMQTopic topic : topics) {
             assertConsumersCount(broker2, topic, 1);
             assertNCDurableSubsCount(broker2, topic, 1);
         }
+        assertConsumersCount(broker2, compositeTopic, 0);
+        assertNCDurableSubsCount(broker2, compositeTopic, 0);
         assertCompositeMapCounts(2, 2);
 
         durSub1.close();
-        Thread.sleep(1000);
-        removeSubscription(broker1, subName + "1");
+        waitAndRemoveSubscription(broker1, subName + "1");
 
         for (ActiveMQTopic topic : topics) {
             assertConsumersCount(broker2, topic, 1);
@@ -286,8 +285,7 @@ public class CompositeConsumerNetworkBridgeTest extends 
DynamicNetworkTestSuppor
         }
 
         durSub2.close();
-        Thread.sleep(1000);
-        removeSubscription(broker1, subName + "2");
+        waitAndRemoveSubscription(broker1, subName + "2");
 
         for (ActiveMQTopic topic : topics) {
             assertConsumersCount(broker2, topic, 0);
@@ -305,8 +303,7 @@ public class CompositeConsumerNetworkBridgeTest extends 
DynamicNetworkTestSuppor
     protected void doSetUp(File localDataDir, File remoteDataDir) throws 
Exception {
         doSetUpRemoteBroker(remoteDataDir);
         doSetUpLocalBroker(localDataDir);
-        //Give time for advisories to propagate
-        Thread.sleep(1000);
+        assertBridgeStarted();
     }
 
     protected void doSetUpLocalBroker(File dataDir) throws Exception {
@@ -414,6 +411,18 @@ public class CompositeConsumerNetworkBridgeTest extends 
DynamicNetworkTestSuppor
         assertTrue( Wait.waitFor(() -> compositeSubSize == 
bridge.compositeSubscriptions.size(), 5000, 500));
     }
 
+    private void waitAndRemoveSubscription(BrokerService broker, String 
subName) throws Exception {
+        assertTrue("Subscription " + subName + " should be removable",
+            Wait.waitFor(() -> {
+                try {
+                    removeSubscription(broker, subName);
+                    return true;
+                } catch (Exception e) {
+                    return false;
+                }
+            }, 10000, 500));
+    }
+
     protected DurableConduitBridge findBridge() throws Exception {
         if (flow.equals(FLOW.FORWARD)) {
             return findBridge(remoteBroker);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to