This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new 901956d4dd AMQ-9262 - Fix network subscriptions for composite 
consumers (#1014)
901956d4dd is described below

commit 901956d4ddb6a0ea9fe5fedf39732117ab68f087
Author: Christopher L. Shannon <christopher.l.shan...@gmail.com>
AuthorDate: Wed Jun 7 07:18:18 2023 -0400

    AMQ-9262 - Fix network subscriptions for composite consumers (#1014)
    
    This fixes network subscriptions that are generated on demand when a
    consumer uses composite destinations. Before this fix, conduit
    subscriptions didn't work correctly. This fix now splits up the
    composite destination and generates correct demand for each of the
    individual destinations.
---
 .../network/DemandForwardingBridgeSupport.java     |  99 ++++-
 .../CompositeConsumerNetworkBridgeTest.java        | 435 +++++++++++++++++++++
 .../network/DurableSyncNetworkBridgeTest.java      |  15 +-
 .../network/DynamicNetworkTestSupport.java         |  61 ++-
 .../network/ForceDurableNetworkBridgeTest.java     |   9 +-
 5 files changed, 567 insertions(+), 52 deletions(-)

diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
 
b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 28d136fe84..57afc85d11 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -24,6 +24,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -136,7 +137,9 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
     protected ActiveMQDestination[] durableDestinations;
     protected final ConcurrentMap<ConsumerId, DemandSubscription> 
subscriptionMapByLocalId = new ConcurrentHashMap<>();
     protected final ConcurrentMap<ConsumerId, DemandSubscription> 
subscriptionMapByRemoteId = new ConcurrentHashMap<>();
-    protected final Set<ConsumerId> forcedDurableRemoteId = 
Collections.newSetFromMap(new ConcurrentHashMap<ConsumerId, Boolean>());
+    protected final Set<ConsumerId> forcedDurableRemoteId = 
Collections.newSetFromMap(new ConcurrentHashMap<>());
+    protected final ConcurrentMap<ConsumerId, Set<ConsumerId>> 
compositeConsumerIds = new ConcurrentHashMap<>();
+    protected final ConcurrentMap<SubscriptionInfo, Set<SubscriptionInfo>> 
compositeSubscriptions = new ConcurrentHashMap<>();
     protected final BrokerId localBrokerPath[] = new BrokerId[]{null};
     protected final CountDownLatch startedLatch = new CountDownLatch(2);
     protected final CountDownLatch localStartedLatch = new CountDownLatch(1);
@@ -1015,6 +1018,18 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
 
         } else if (data.getClass() == RemoveInfo.class) {
             ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
+
+            // If we have an entry in compositeConsumerIds then this consumer 
was a
+            // composite consumer and we need to remove the entries in the set 
and
+            // not the consumer id we received here
+            final Set<ConsumerId> compositeIds = 
compositeConsumerIds.remove(id);
+            if (compositeIds != null) {
+                for (ConsumerId compositeId : compositeIds) {
+                    serviceRemoteConsumerAdvisory(new RemoveInfo(compositeId));
+                }
+                return;
+            }
+
             removeDemandSubscription(id);
 
             if (forcedDurableRemoteId.remove(id)) {
@@ -1030,6 +1045,23 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
         } else if (data.getClass() == RemoveSubscriptionInfo.class) {
             final RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) 
data);
             final SubscriptionInfo subscriptionInfo = new 
SubscriptionInfo(info.getClientId(), info.getSubscriptionName());
+
+            // If we have an entry in compositeSubscriptions then this 
consumer was a
+            // composite consumer and we need to remove the entries in the set 
and not
+            // the subscription that we received here
+            final Set<SubscriptionInfo> compositeSubs =
+                this.compositeSubscriptions.remove(subscriptionInfo);
+            if (compositeSubs != null) {
+                for (SubscriptionInfo compositeSub : compositeSubs) {
+                    RemoveSubscriptionInfo remove = new 
RemoveSubscriptionInfo();
+                    remove.setClientId(compositeSub.getClientId());
+                    
remove.setSubscriptionName(compositeSub.getSubscriptionName());
+                    
remove.setConnectionId(this.localConnectionInfo.getConnectionId());
+                    serviceRemoteConsumerAdvisory(remove);
+                }
+                return;
+            }
+
             final boolean proxyBridgeSub = 
isProxyBridgeSubscription(subscriptionInfo.getClientId(),
                     subscriptionInfo.getSubscriptionName());
             for (Iterator<DemandSubscription> i = 
subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
@@ -1415,6 +1447,12 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
     }
 
     protected void addConsumerInfo(final ConsumerInfo consumerInfo) throws 
IOException {
+        // Check if this was processed and split into new consumers for 
composite dests
+        if (splitCompositeConsumer(consumerInfo)) {
+            // If true we don't want to continue processing the original 
consumer info
+            return;
+        }
+
         ConsumerInfo info = consumerInfo.copy();
         addRemoteBrokerToBrokerPath(info);
         DemandSubscription sub = createDemandSubscription(info);
@@ -1443,6 +1481,65 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
         }
     }
 
+    // Generate new consumers for each destination that is part of a composite 
destination list for a consumer
+    private boolean splitCompositeConsumer(final ConsumerInfo consumerInfo) 
throws IOException {
+        // If not a composite destination or if an advisory topic then return 
false
+        // So we process normally and don't split
+        if (!consumerInfo.getDestination().isComposite() ||
+            AdvisorySupport.isAdvisoryTopic(consumerInfo.getDestination())) {
+            return false;
+        }
+
+        // At this point this is a composite destination and not an advisory 
topic. The destination
+        // will be split into individual destinations to create demand so that 
conduit subscriptions
+        // and durable subscriptions work correctly
+
+        // Handle duplicates, don't need to create again if we already have an 
entry
+        // Just return true so we stop processing
+        if (!isDuplicateSuppressionOff(consumerInfo) && 
compositeConsumerIds.containsKey(
+            consumerInfo.getConsumerId())) {
+            return true;
+        }
+
+        // Get a set to store mapped consumer Ids for each individual 
destination in the composite list
+        // and (if applicable) a set for subscriptions for durables
+        final Set<ConsumerId> consumerIds = 
compositeConsumerIds.computeIfAbsent(
+            consumerInfo.getConsumerId(),
+            k -> Collections.newSetFromMap(new ConcurrentHashMap<>()));
+        final Set<SubscriptionInfo> subscriptions = Optional.ofNullable(
+            consumerInfo.getSubscriptionName()).map(
+            subName -> compositeSubscriptions.computeIfAbsent(
+                new SubscriptionInfo(consumerInfo.getClientId(),
+                    consumerInfo.getSubscriptionName()),
+                k -> Collections.newSetFromMap(new 
ConcurrentHashMap<>()))).orElse(null);
+
+        // Split and go through each destination that is part of the composite 
list and process
+        for (ActiveMQDestination individualDest : consumerInfo.getDestination()
+            .getCompositeDestinations()) {
+            // Create a new consumer info with the individual destinations and
+            // generate new consumer Ids for each and add to the consumerIds 
set
+            final ConsumerInfo info = consumerInfo.copy();
+            info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),
+                consumerIdGenerator.getNextSequenceId()));
+            info.setDestination(individualDest);
+            consumerIds.add(info.getConsumerId());
+
+            // If there is a subscription name (durable) then generate a new 
one for the dest
+            // and add to the subscriptions set
+            Optional.ofNullable(subscriptions).ifPresent(
+                subs -> {
+                    info.setSubscriptionName(
+                        consumerInfo.getSubscriptionName() + 
individualDest.getPhysicalName());
+                    subs.add(
+                        new SubscriptionInfo(info.getClientId(), 
info.getSubscriptionName()));
+                });
+
+            // Continue on and process the new consumer Info
+            addConsumerInfo(info);
+        }
+        return true;
+    }
+
     private void undoMapRegistration(DemandSubscription sub) {
         subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
         subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/CompositeConsumerNetworkBridgeTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/CompositeConsumerNetworkBridgeTest.java
new file mode 100644
index 0000000000..cfdc2fd32d
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/CompositeConsumerNetworkBridgeTest.java
@@ -0,0 +1,435 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TopicSubscriber;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test class to verify composite consumers correctly create demand
+ * with a network of brokers, especially conduit subs
+ * See AMQ-9262
+ */
+@RunWith(Parameterized.class)
+public class CompositeConsumerNetworkBridgeTest extends 
DynamicNetworkTestSupport {
+
+    protected static final Logger LOG = 
LoggerFactory.getLogger(CompositeConsumerNetworkBridgeTest.class);
+
+    private final static String testTopic1 = "test.composite.topic.1";
+    private final static String testTopic2 = "test.composite.topic.2";
+    private final static String testQueue1 = "test.composite.queue.1";
+    private final static String testQueue2 = "test.composite.queue.2";
+    private BrokerService broker1;
+    private BrokerService broker2;
+    private Session session1;
+    private Session session2;
+    private final FLOW flow;
+    private final static List<ActiveMQTopic> topics = List.of(
+        new ActiveMQTopic(testTopic1), new ActiveMQTopic(testTopic2));
+    private final static List<ActiveMQQueue> queues = List.of(
+        new ActiveMQQueue(testQueue1), new ActiveMQQueue(testQueue2));
+
+    @Parameters
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+                {FLOW.FORWARD},
+                {FLOW.REVERSE}
+        });
+    }
+
+    public CompositeConsumerNetworkBridgeTest(final FLOW flow) {
+        this.flow = flow;
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        doTearDown();
+    }
+
+    /**
+     * Test a composite durable subscription
+     */
+    @Test
+    public void testCompositeDurableSubscriber() throws Exception {
+        setUp();
+        final ActiveMQTopic compositeTopic = new ActiveMQTopic(testTopic1 + 
"," + testTopic2);
+
+        // Create durable sub on composite destination
+        // Will create a composite consumer on the local broker but
+        // should create 2 consumers on the remote
+        TopicSubscriber durSub = 
session1.createDurableSubscriber(compositeTopic, subName);
+        assertConsumersCount(broker1, compositeTopic, 1);
+
+        // The remote broker should create two durable subs instead of 1
+        // Should be 1 durable on each of the topics that are part of the 
composite
+        assertConsumersCount(broker2, compositeTopic, 0);
+        assertNCDurableSubsCount(broker2, compositeTopic, 0);
+        for (ActiveMQTopic topic : topics) {
+            assertConsumersCount(broker2, topic, 1);
+            assertNCDurableSubsCount(broker2, topic, 1);
+        }
+        assertCompositeMapCounts(1, 1);
+
+        durSub.close();
+        Thread.sleep(1000);
+        removeSubscription(broker1, subName);
+
+        //Verify cleanup
+        for (ActiveMQTopic topic : topics) {
+            assertConsumersCount(broker2, topic, 0);
+            assertNCDurableSubsCount(broker2, topic, 0);
+        }
+        assertCompositeMapCounts(0, 0);
+    }
+
+    /**
+     * Test a composite durable subscription and normal subscription
+     */
+    @Test
+    public void testCompositeAndNormalDurableSub() throws Exception {
+        setUp();
+        final ActiveMQTopic compositeTopic = new ActiveMQTopic(testTopic1 + 
"," + testTopic2);
+
+        // create composite sub and a sub on one of the individual topics
+        TopicSubscriber durSub1 = 
session1.createDurableSubscriber(compositeTopic, subName + "1");
+        TopicSubscriber durSub2 = 
session1.createDurableSubscriber(topics.get(0), subName + "2");
+
+        // Should split the composite and create network subs on individual 
topics
+        for (ActiveMQTopic topic : topics) {
+            assertNCDurableSubsCount(broker2, topic, 1);
+        }
+        assertNCDurableSubsCount(broker2, compositeTopic, 0);
+        // Only 1 sub is composite so should just have 1 map entry
+        assertCompositeMapCounts(1, 1);
+
+        // Verify message received
+        MessageProducer producer = session2.createProducer(topics.get(0));
+        producer.send(session2.createTextMessage("test"));
+        assertNotNull(durSub1.receive(1000));
+        assertNotNull(durSub2.receive(1000));
+
+        durSub1.close();
+        durSub2.close();;
+
+        Thread.sleep(1000);
+        removeSubscription(broker1, subName + "1");
+        removeSubscription(broker1, subName + "2");
+        assertCompositeMapCounts(0, 0);
+    }
+
+
+    /**
+     * Test two topic subscriptions that match
+     */
+    @Test
+    public void testTopicCompositeSubs() throws Exception {
+        setUp();
+        final ActiveMQTopic compositeTopic = new ActiveMQTopic(testTopic1 + 
"," + testTopic2);
+
+        // Create two identical subscriptions on a composite topic
+        MessageConsumer sub1 = session1.createConsumer(compositeTopic);
+        MessageConsumer sub2 = session1.createConsumer(compositeTopic);
+        for (ActiveMQTopic topic : topics) {
+            // Verify the local broker has two subs on each individual topic
+            assertConsumersCount(broker1, topic, 2);
+            // Verify that conduit subscription works correctly now
+            // and only 1 sub on each topic. This used to be broken before 
AMQ-9262
+            // and would create two subscriptions even though conduit was true
+            assertConsumersCount(broker2, topic, 1);
+        }
+        assertCompositeMapCounts(2, 0);
+
+        MessageProducer producer = session2.createProducer(topics.get(0));
+        producer.send(session2.createTextMessage("test"));
+
+        assertNotNull(sub1.receive(1000));
+        assertNotNull(sub2.receive(1000));
+
+        sub1.close();
+        sub2.close();
+
+        assertCompositeMapCounts(0, 0);
+    }
+
+    /**
+     * Test two queue composite subscriptions that match
+     */
+    @Test
+    public void testCompositeQueueSubs() throws Exception {
+        setUp();
+        final ActiveMQQueue compositeQueue = new ActiveMQQueue(testQueue1 + 
"," + testQueue2);
+
+        // Create two matching composite queue subs to test conduit subs
+        MessageConsumer sub1 = session1.createConsumer(compositeQueue);
+        MessageConsumer sub2 = session1.createConsumer(compositeQueue);
+        for (ActiveMQDestination queue : queues) {
+            assertConsumersCount(broker1, queue, 2);
+            // Verify conduit subs now work correctly, this used to be 2
+            // which was wrong as conduit is true and is fixed as of AMQ-9262
+            assertConsumersCount(broker2, queue, 1);
+        }
+        assertCompositeMapCounts(2, 0);
+
+        MessageProducer producer = session2.createProducer(queues.get(0));
+        producer.send(session2.createTextMessage("test"));
+
+        // Make sure one of the queue receivers gets the message
+        assertTrue(sub1.receive(1000) != null
+            || sub2.receive(1000) != null);
+
+        sub1.close();
+        sub2.close();
+        assertCompositeMapCounts(0, 0);
+    }
+
+    /**
+     * Test a composite queue and normal queue sub
+     */
+    @Test
+    public void testCompositeAndNormalQueueSubs() throws Exception {
+        setUp();
+        final ActiveMQQueue compositeQueue = new ActiveMQQueue(testQueue1 + 
"," + testQueue2);
+
+        // Create one composite queue sub and one normal sub on testQueue2
+        MessageConsumer sub1 = session1.createConsumer(compositeQueue);
+        MessageConsumer sub2 = session1.createConsumer(new 
ActiveMQQueue(testQueue2));
+
+        assertConsumersCount(broker1, queues.get(0), 1);
+        assertConsumersCount(broker1, queues.get(1), 2);
+        for (ActiveMQDestination queue : queues) {
+            assertConsumersCount(broker2, queue, 1);
+        }
+        // Only 1 sub is a composite sub
+        assertCompositeMapCounts(1, 0);
+
+        MessageProducer producer = session2.createProducer(queues.get(0));
+        producer.send(session2.createTextMessage("test"));
+
+        // Make sure message received by sub1
+        assertNotNull(sub1.receive(1000));
+
+        sub1.close();
+        sub2.close();
+        assertCompositeMapCounts(0, 0);
+    }
+
+    /**
+     * Test two matching durable composite subs
+     *
+     * This test used to fail with an exception as the bridge would
+     * try and create a duplicate network durable with the same client id
+     * and sub and would error
+     */
+    @Test
+    public void testCompositeTwoDurableSubscribers() throws Exception {
+        setUp();
+        final ActiveMQTopic compositeTopic = new ActiveMQTopic(testTopic1 + 
"," + testTopic2);
+
+        TopicSubscriber durSub1 = 
session1.createDurableSubscriber(compositeTopic, subName + "1");
+        TopicSubscriber durSub2 = 
session1.createDurableSubscriber(compositeTopic, subName + "2");
+        assertConsumersCount(broker1, compositeTopic, 2);
+
+        assertConsumersCount(broker2, compositeTopic, 0);
+        assertNCDurableSubsCount(broker2, compositeTopic, 0);
+        for (ActiveMQTopic topic : topics) {
+            assertConsumersCount(broker2, topic, 1);
+            assertNCDurableSubsCount(broker2, topic, 1);
+        }
+        assertCompositeMapCounts(2, 2);
+
+        durSub1.close();
+        Thread.sleep(1000);
+        removeSubscription(broker1, subName + "1");
+
+        for (ActiveMQTopic topic : topics) {
+            assertConsumersCount(broker2, topic, 1);
+            assertNCDurableSubsCount(broker2, topic, 1);
+        }
+
+        durSub2.close();
+        Thread.sleep(1000);
+        removeSubscription(broker1, subName + "2");
+
+        for (ActiveMQTopic topic : topics) {
+            assertConsumersCount(broker2, topic, 0);
+            assertNCDurableSubsCount(broker2, topic, 0);
+        }
+
+        assertCompositeMapCounts(0, 0);
+    }
+
+
+    private void setUp() throws Exception {
+        doSetUp(tempFolder.newFolder(), tempFolder.newFolder());
+    }
+
+    protected void doSetUp(File localDataDir, File remoteDataDir) throws 
Exception {
+        doSetUpRemoteBroker(remoteDataDir);
+        doSetUpLocalBroker(localDataDir);
+        //Give time for advisories to propagate
+        Thread.sleep(1000);
+    }
+
+    protected void doSetUpLocalBroker(File dataDir) throws Exception {
+        localBroker = createLocalBroker(dataDir);
+        localBroker.setDeleteAllMessagesOnStartup(true);
+        localBroker.start();
+        localBroker.waitUntilStarted();
+        URI localURI = localBroker.getVmConnectorURI();
+        ActiveMQConnectionFactory fac = new 
ActiveMQConnectionFactory(localURI);
+        fac.setAlwaysSyncSend(true);
+        fac.setDispatchAsync(false);
+        localConnection = fac.createConnection();
+        localConnection.setClientID("clientId");
+        localConnection.start();
+
+        Wait.waitFor(() -> 
localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1, 10000, 
500);
+        localSession = localConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+        if (flow.equals(FLOW.FORWARD)) {
+            broker1 = localBroker;
+            session1 = localSession;
+        } else {
+            broker2 = localBroker;
+            session2 = localSession;
+        }
+    }
+
+    protected void doSetUpRemoteBroker(File dataDir) throws Exception {
+        remoteBroker = createRemoteBroker(dataDir);
+        remoteBroker.setDeleteAllMessagesOnStartup(true);
+        remoteBroker.start();
+        remoteBroker.waitUntilStarted();
+        URI remoteURI = remoteBroker.getVmConnectorURI();
+        ActiveMQConnectionFactory fac = new 
ActiveMQConnectionFactory(remoteURI);
+        remoteConnection = fac.createConnection();
+        remoteConnection.setClientID("clientId");
+        remoteConnection.start();
+        remoteSession = remoteConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+        if (flow.equals(FLOW.FORWARD)) {
+            broker2 = remoteBroker;
+            session2 = remoteSession;
+        } else {
+            broker1 = remoteBroker;
+            session1 = remoteSession;
+        }
+    }
+
+    protected BrokerService createLocalBroker(File dataDir) throws Exception {
+
+        BrokerService brokerService = new BrokerService();
+        brokerService.setMonitorConnectionSplits(true);
+        brokerService.setDataDirectoryFile(dataDir);
+        brokerService.setBrokerName("localBroker");
+        brokerService.addNetworkConnector(configureLocalNetworkConnector());
+        brokerService.addConnector("tcp://localhost:0");
+        brokerService.setDestinations(new ActiveMQDestination[] {
+                new ActiveMQTopic(testTopic1),
+                new ActiveMQTopic(testTopic2),
+                new ActiveMQQueue(testQueue1),
+                new ActiveMQQueue(testQueue2)});
+
+        return brokerService;
+    }
+
+    protected NetworkConnector configureLocalNetworkConnector() throws 
Exception {
+
+        List<TransportConnector> transportConnectors = 
remoteBroker.getTransportConnectors();
+        URI remoteURI = transportConnectors.get(0).getConnectUri();
+        String uri = "static:(" + remoteURI + ")";
+        NetworkConnector connector = new DiscoveryNetworkConnector(new 
URI(uri));
+        connector.setName("networkConnector");
+        connector.setDynamicOnly(false);
+        connector.setDecreaseNetworkConsumerPriority(false);
+        connector.setConduitSubscriptions(true);
+        connector.setDuplex(true);
+        connector.setStaticBridge(false);
+        ArrayList<ActiveMQDestination> dynamicIncludedDestinations = new 
ArrayList<>();
+        dynamicIncludedDestinations.addAll(List.of(new 
ActiveMQTopic("test.composite.topic.>"),
+            new ActiveMQQueue("test.composite.queue.>")));
+        
connector.setDynamicallyIncludedDestinations(dynamicIncludedDestinations);
+        return connector;
+    }
+
+
+    protected BrokerService createRemoteBroker(File dataDir) throws Exception {
+        BrokerService brokerService = new BrokerService();
+        brokerService.setBrokerName("remoteBroker");
+        brokerService.setUseJmx(false);
+        brokerService.setDataDirectoryFile(dataDir);
+        brokerService.addConnector("tcp://localhost:0");
+        brokerService.setDestinations(new ActiveMQDestination[] {
+                new ActiveMQTopic(testTopic1),
+                new ActiveMQTopic(testTopic2),
+                new ActiveMQQueue(testQueue1),
+                new ActiveMQQueue(testQueue2)});
+
+        return brokerService;
+    }
+
+    protected void assertCompositeMapCounts(int compositeConsumerIdsSize, int 
compositeSubSize)
+        throws Exception {
+        DurableConduitBridge bridge = findBridge();
+        assertTrue( Wait.waitFor(() -> compositeConsumerIdsSize == 
bridge.compositeConsumerIds.size(), 5000, 500));
+        assertTrue( Wait.waitFor(() -> compositeSubSize == 
bridge.compositeSubscriptions.size(), 5000, 500));
+    }
+
+    protected DurableConduitBridge findBridge() throws Exception {
+        if (flow.equals(FLOW.FORWARD)) {
+            return findBridge(remoteBroker);
+        } else {
+            return findBridge(localBroker);
+        }
+    }
+
+    protected DurableConduitBridge findBridge(BrokerService broker) throws 
Exception {
+        final NetworkBridge bridge;
+        if (broker.getNetworkConnectors().size() > 0) {
+            assertTrue(Wait.waitFor(() -> 
broker.getNetworkConnectors().get(0).activeBridges().size() == 1, 5000, 500));
+            bridge = 
broker.getNetworkConnectors().get(0).activeBridges().iterator().next();
+        } else {
+            bridge = 
findDuplexBridge(broker.getTransportConnectorByScheme("tcp"));
+        }
+        return (DurableConduitBridge)bridge;
+    }
+}
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
index 7e56bb2112..aa36e26fba 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
@@ -75,7 +75,6 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
     private boolean forceDurable = false;
     private boolean useVirtualDestSubs = false;
     private byte remoteBrokerWireFormatVersion = CommandTypes.PROTOCOL_VERSION;
-    public static enum FLOW {FORWARD, REVERSE}
 
     private BrokerService broker1;
     private BrokerService broker2;
@@ -139,7 +138,7 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
         assertSubscriptionsCount(broker1, topic, 1);
         assertNCDurableSubsCount(broker2, topic, 1);
 
-        removeSubscription(broker1, topic, subName);
+        removeSubscription(broker1, subName);
 
         assertSubscriptionsCount(broker1, topic, 0);
         assertNCDurableSubsCount(broker2, topic, 0);
@@ -161,7 +160,7 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
         assertSubscriptionsCount(broker1, topic, 1);
         assertNCDurableSubsCount(broker2, topic, 1);
 
-        removeSubscription(broker1, topic, subName);
+        removeSubscription(broker1, subName);
 
         assertSubscriptionsCount(broker1, topic, 0);
         assertNCDurableSubsCount(broker2, topic, 0);
@@ -188,7 +187,7 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
         }
 
         assertSubscriptionsCount(broker1, topic, 1);
-        removeSubscription(broker1, topic, subName);
+        removeSubscription(broker1, subName);
         assertSubscriptionsCount(broker1, topic, 0);
         doTearDown();
 
@@ -217,7 +216,7 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
         includedTopics = "different.topic";
         restartBroker(broker1, false);
         assertSubscriptionsCount(broker1, topic, 1);
-        removeSubscription(broker1, topic, subName);
+        removeSubscription(broker1, subName);
         assertSubscriptionsCount(broker1, topic, 0);
 
         //Test that on successful reconnection of the bridge that
@@ -310,7 +309,7 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
 
         assertSubscriptionsCount(broker1, topic, 1);
         session1.createDurableSubscriber(topic2, "sub2");
-        removeSubscription(broker1, topic, subName);
+        removeSubscription(broker1, subName);
         assertSubscriptionsCount(broker1, topic, 0);
         assertSubscriptionsCount(broker1, topic2, 1);
 
@@ -376,7 +375,7 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
         //with bridge off, remove 100 subs
         for (int i = 0; i < 10; i++) {
             for (int j = 0; j < 10; j++) {
-                removeSubscription(broker1, new ActiveMQTopic("include.test." 
+ i), subName + i + j);
+                removeSubscription(broker1, subName + i + j);
             }
         }
 
@@ -481,7 +480,7 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
         restartBroker(broker1, false);
 
         assertSubscriptionsCount(broker1, topic, 1);
-        removeSubscription(broker1, topic, subName);
+        removeSubscription(broker1, subName);
         session1.createDurableSubscriber(topic, "sub2").close();
         assertSubscriptionsCount(broker1, topic, 1);
 
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
index aade6d36dd..2d83fb71b7 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
@@ -40,6 +40,7 @@ import 
org.apache.activemq.broker.region.DestinationStatistics;
 import org.apache.activemq.broker.region.DurableTopicSubscription;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.Topic;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.util.SubscriptionKey;
@@ -50,6 +51,7 @@ import org.junit.rules.TemporaryFolder;
 
 
 public abstract class DynamicNetworkTestSupport {
+    public enum FLOW {FORWARD, REVERSE};
 
     protected Connection localConnection;
     protected Connection remoteConnection;
@@ -92,14 +94,10 @@ public abstract class DynamicNetworkTestSupport {
         }
     }
 
-
     protected void assertBridgeStarted() throws Exception {
-        assertTrue(Wait.waitFor(new Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1;
-            }
-        }, 10000, 500));
+        assertTrue(Wait.waitFor(
+            () -> localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1,
+            10000, 500));
     }
 
     protected RemoveSubscriptionInfo getRemoveSubscriptionInfo(final 
ConnectionContext context,
@@ -113,24 +111,16 @@ public abstract class DynamicNetworkTestSupport {
     }
 
     protected void waitForConsumerCount(final DestinationStatistics 
destinationStatistics, final int count) throws Exception {
-        assertTrue(Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                //should only be 1 for the composite destination creation
-                return count == 
destinationStatistics.getConsumers().getCount();
-            }
+        assertTrue(Wait.waitFor(() -> {
+            //should only be 1 for the composite destination creation
+            return count == destinationStatistics.getConsumers().getCount();
         }));
     }
 
     protected void waitForDispatchFromLocalBroker(final DestinationStatistics 
destinationStatistics, final int count) throws Exception {
-        assertTrue(Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return count == destinationStatistics.getDequeues().getCount() &&
-                       count == destinationStatistics.getDispatched().getCount() &&
-                       count == destinationStatistics.getForwards().getCount();
-            }
-        }));
+        assertTrue(Wait.waitFor(() -> count == destinationStatistics.getDequeues().getCount() &&
+               count == destinationStatistics.getDispatched().getCount() &&
+               count == destinationStatistics.getForwards().getCount()));
     }
 
     protected void assertLocalBrokerStatistics(final DestinationStatistics 
localStatistics, final int count) {
@@ -145,27 +135,22 @@ public abstract class DynamicNetworkTestSupport {
 
     protected void assertNCDurableSubsCount(final BrokerService brokerService,
             final ActiveMQTopic dest, final int count) throws Exception {
-        assertTrue(Wait.waitFor(new Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return count == getNCDurableSubs(brokerService, dest).size();
-            }
-        }, 10000, 500));
+        assertTrue(Wait.waitFor(() -> count == getNCDurableSubs(brokerService, 
dest).size(),
+            10000, 500));
     }
 
     protected void assertConsumersCount(final BrokerService brokerService,
-            final ActiveMQTopic dest, final int count) throws Exception {
-        assertTrue(Wait.waitFor(new Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return count == getConsumers(brokerService, dest).size();
-            }
-        }, 10000, 500));
+            final ActiveMQDestination dest, final int count) throws Exception {
+        assertTrue(Wait.waitFor(() -> count == getConsumers(brokerService, 
dest).size(),
+            10000, 500));
+        Thread.sleep(1000);
+        // Check one more time after a short pause to make sure the count didn't increase past what we wanted
+        assertEquals(count, getConsumers(brokerService, dest).size());
     }
 
     protected List<Subscription> getConsumers(final BrokerService 
brokerService,
-            final ActiveMQTopic dest) throws Exception {
-        Topic destination = (Topic) brokerService.getDestination(dest);
+            final ActiveMQDestination dest) throws Exception {
+        Destination destination = brokerService.getDestination(dest);
         return destination.getConsumers();
     }
 
@@ -208,8 +193,8 @@ public abstract class DynamicNetworkTestSupport {
         return subs;
     }
 
-    protected void removeSubscription(final BrokerService brokerService, final 
ActiveMQTopic topic,
-            final String subName) throws Exception {
+    protected void removeSubscription(final BrokerService brokerService,
+        final String subName) throws Exception {
         final RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
         info.setClientId(clientId);
         info.setSubscriptionName(subName);
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/ForceDurableNetworkBridgeTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/ForceDurableNetworkBridgeTest.java
index 678935bc1a..a93420e32f 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/ForceDurableNetworkBridgeTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/ForceDurableNetworkBridgeTest.java
@@ -51,7 +51,6 @@ public class ForceDurableNetworkBridgeTest extends 
DynamicNetworkTestSupport {
     protected String testTopicName2 = "include.nonforced.bar";
     protected String staticTopic = "include.static.bar";
     protected String staticTopic2 = "include.static.nonforced.bar";
-    public static enum FLOW {FORWARD, REVERSE};
     private BrokerService broker1;
     private BrokerService broker2;
     private Session session1;
@@ -126,7 +125,7 @@ public class ForceDurableNetworkBridgeTest extends 
DynamicNetworkTestSupport {
         //Remove the sub
         durSub.close();
         Thread.sleep(1000);
-        removeSubscription(broker1, topic, subName);
+        removeSubscription(broker1, subName);
 
         //The durable should be gone even though there is a consumer left
         //since we are not forcing durable subs
@@ -186,7 +185,7 @@ public class ForceDurableNetworkBridgeTest extends 
DynamicNetworkTestSupport {
         Thread.sleep(1000);
         assertNCDurableSubsCount(broker2, topic, 1);
 
-        removeSubscription(broker1, topic, subName);
+        removeSubscription(broker1, subName);
         assertNCDurableSubsCount(broker2, topic, 0);
     }
 
@@ -201,7 +200,7 @@ public class ForceDurableNetworkBridgeTest extends 
DynamicNetworkTestSupport {
         Thread.sleep(1000);
         assertNCDurableSubsCount(broker2, topic, 1);
 
-        removeSubscription(broker1, topic, subName);
+        removeSubscription(broker1, subName);
         Thread.sleep(1000);
         assertConsumersCount(broker2, topic, 1);
         assertNCDurableSubsCount(broker2, topic, 1);
@@ -225,7 +224,7 @@ public class ForceDurableNetworkBridgeTest extends 
DynamicNetworkTestSupport {
         assertConsumersCount(broker2, topic, 1);
         assertNCDurableSubsCount(broker2, topic, 1);
 
-        removeSubscription(broker1, topic, subName);
+        removeSubscription(broker1, subName);
         assertConsumersCount(broker2, topic, 0);
         assertNCDurableSubsCount(broker2, topic, 0);
     }


Reply via email to