This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch activemq-6.1.x
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/activemq-6.1.x by this push:
     new 182c598df5 AMQ-9689 - Network of Broker durable sync TTL fixes and 
improvements (#1419)
182c598df5 is described below

commit 182c598df544661e0d2e947dafaf1020ca331d6d
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Fri Apr 11 08:14:09 2025 -0400

    AMQ-9689 - Network of Broker durable sync TTL fixes and improvements (#1419)
    
    This commit makes several improvements and fixes for syncing durable
    subscriptions when a network bridge connects.
    
    1) A bug was fixed during durable sync that would cause the clientId
    to not always be included for durables in the subscription list which
    could cause a loop to be created as the other broker would not be able
    to tell where the network subscription came from.
    
    2) During reactivation when dynamicOnly is false and durable sync is set
    to true, we make sure to include the TTL information (full broker path)
    from the online consumer attached to durables so that TTL info is properly
    propagated so we don't incorrectly create demand. This only works if
    consumers are online, so for TTL > 1 it is still recommended to set
    dynamicOnly to true and allow only online consumers to drive demand.
    
    3) For TTL 1, we can handle sync correctly on restarts even if durables
    are offline and missing consumer TTL info because we know that we should
    ignore proxy durables (bridge durables for other bridges) entirely because
    they will be > 1 hop away.
    
    4) Some other minor improvements were made like filtering everything if
    TTL is 0 and also consolidating logic.
    
    (cherry picked from commit 953737ca082e1ff67cf0b408e7ec72cd89be75ac)
---
 .../network/DemandForwardingBridgeSupport.java     | 125 ++++++------
 .../activemq/network/DurableConduitBridge.java     |   8 +-
 .../network/NetworkBridgeConfiguration.java        |   1 +
 .../apache/activemq/util/NetworkBridgeUtils.java   | 123 ++++++++----
 .../DurableFiveBrokerNetworkBridgeTest.java        | 220 ++++++++++++++++++++-
 5 files changed, 375 insertions(+), 102 deletions(-)

diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
 
b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 57afc85d11..8d16445fb9 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -113,7 +113,7 @@ import org.slf4j.LoggerFactory;
  */
 public abstract class DemandForwardingBridgeSupport implements NetworkBridge, 
BrokerServiceAware {
     private static final Logger LOG = 
LoggerFactory.getLogger(DemandForwardingBridgeSupport.class);
-    protected static final String DURABLE_SUB_PREFIX = "NC-DS_";
+    protected static final String DURABLE_SUB_PREFIX = 
NetworkBridgeConfiguration.DURABLE_SUB_PREFIX;
     protected final Transport localBroker;
     protected final Transport remoteBroker;
     protected IdGenerator idGenerator = new IdGenerator();
@@ -664,23 +664,8 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
         }
     }
 
-    /**
-     * Checks whether or not this consumer is a direct bridge network 
subscription
-     * @param info
-     * @return
-     */
-    protected boolean isDirectBridgeConsumer(ConsumerInfo info) {
-        return (info.getSubscriptionName() != null && 
info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)) &&
-                (info.getClientId() == null || 
info.getClientId().startsWith(configuration.getName()));
-    }
-
     protected boolean isProxyBridgeSubscription(String clientId, String 
subName) {
-        if (subName != null && clientId != null) {
-            if (subName.startsWith(DURABLE_SUB_PREFIX) && 
!clientId.startsWith(configuration.getName())) {
-                return true;
-            }
-        }
-        return false;
+        return NetworkBridgeUtils.isProxyBridgeSubscription(configuration, 
clientId, subName);
     }
 
     /**
@@ -750,49 +735,61 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
                 } else if (command instanceof BrokerSubscriptionInfo) {
                     final BrokerSubscriptionInfo brokerSubscriptionInfo = 
(BrokerSubscriptionInfo) command;
 
-                    //Start in a new thread so we don't block the transport 
waiting for staticDestinations
-                    syncExecutor.execute(new Runnable() {
-
-                        @Override
-                        public void run() {
-                            try {
-                                staticDestinationsLatch.await();
-                                //Make sure after the countDown of 
staticDestinationsLatch we aren't stopping
-                                if (!disposed.get()) {
-                                    BrokerSubscriptionInfo subInfo = 
brokerSubscriptionInfo;
-                                    LOG.debug("Received Remote 
BrokerSubscriptionInfo on {} from {}",
-                                            brokerService.getBrokerName(), 
subInfo.getBrokerName());
-
-                                    if (configuration.isSyncDurableSubs() && 
configuration.isConduitSubscriptions()
-                                            && !configuration.isDynamicOnly()) 
{
-                                        if (started.get()) {
-                                            if (subInfo.getSubscriptionInfos() 
!= null) {
-                                                for (ConsumerInfo info : 
subInfo.getSubscriptionInfos()) {
-                                                    //re-add any process any 
non-NC consumers that match the
-                                                    
//dynamicallyIncludedDestinations list
-                                                    //Also re-add network 
consumers that are not part of this direct
-                                                    //bridge (proxy of proxy 
bridges)
-                                                    
if((info.getSubscriptionName() == null || !isDirectBridgeConsumer(info)) &&
-                                                            
NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, 
info.getDestination())) {
-                                                        
serviceRemoteConsumerAdvisory(info);
-                                                    }
-                                                }
-                                            }
+                    // Skip the durable sync if any of the following are true:
+                    // 1) if the flag is set to false.
+                    // 2) If dynamicOnly is true, this means to only 
activate when the real
+                    //    consumers come back so we need to skip. This mode is 
useful especially when
+                    //    setting TTL > 1 as the TTL info is tied to consumers
+                    // 3) If conduit subscriptions is disabled we also skip, 
for the same reason we
+                    //    skip when dynamicOnly is true, that we need to let 
consumers entirely drive
+                    //    the creation/removal of subscriptions as each 
consumer gets their own
+                    if (!configuration.isSyncDurableSubs() || 
!configuration.isConduitSubscriptions()
+                        || configuration.isDynamicOnly()) {
+                        return;
+                    }
 
-                                            //After re-added, clean up any 
empty durables
-                                            for (Iterator<DemandSubscription> 
i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
-                                                DemandSubscription ds = 
i.next();
-                                                if 
(NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, 
ds.getLocalInfo().getDestination())) {
-                                                    cleanupDurableSub(ds, i);
-                                                }
-                                            }
+                    //Start in a new thread so we don't block the transport 
waiting for staticDestinations
+                    syncExecutor.execute(() -> {
+                        try {
+                            staticDestinationsLatch.await();
+
+                            //Make sure after the countDown of 
staticDestinationsLatch we aren't stopping
+                            if (!disposed.get() && started.get()) {
+                                final BrokerSubscriptionInfo subInfo = 
brokerSubscriptionInfo;
+                                LOG.debug("Received Remote 
BrokerSubscriptionInfo on {} from {}",
+                                    brokerService.getBrokerName(), 
subInfo.getBrokerName());
+
+                                // Go through the subs sent and see if we can 
add demand
+                                if (subInfo.getSubscriptionInfos() != null) {
+                                    // Re-add and process subscriptions on the 
remote broker to add demand
+                                    for (ConsumerInfo info : 
subInfo.getSubscriptionInfos()) {
+                                        // Brokers filter what is sent, but 
the filtering logic has changed between
+                                        // versions, plus some durables sent 
are only processed for removes so we
+                                        // need to filter what to process for 
adding demand
+                                        if 
(NetworkBridgeUtils.matchesConfigForDurableSync(configuration,
+                                            info.getClientId(), 
info.getSubscriptionName(), info.getDestination())) {
+                                            
serviceRemoteConsumerAdvisory(info);
                                         }
                                     }
                                 }
-                            } catch (Exception e) {
-                                LOG.warn("Error processing 
BrokerSubscriptionInfo: {}", e.getMessage(), e);
-                                LOG.debug(e.getMessage(), e);
+
+                                //After processing demand to add, clean up any 
empty durables
+                                for (Iterator<DemandSubscription> i = 
subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
+                                    DemandSubscription ds = i.next();
+                                    // This filters on destinations to see if 
we should process possible removal
+                                    // based on the bridge configuration 
(included dests, TTL, etc).
+                                    if 
(NetworkBridgeUtils.matchesDestinations(configuration.getDynamicallyIncludedDestinations(),
+                                        ds.getLocalInfo().getDestination())) {
+                                        // Note that this method will further 
check that there are no remote
+                                        // demand that was previously added or 
associated. If there are remote
+                                        // subscriptions tied to the DS, then 
it will not be removed.
+                                        cleanupDurableSub(ds, i);
+                                    }
+                                }
                             }
+                        } catch (Exception e) {
+                            LOG.warn("Error processing BrokerSubscriptionInfo: 
{}", e.getMessage(), e);
+                            LOG.debug(e.getMessage(), e);
                         }
                     });
 
@@ -1427,7 +1424,7 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
         if (dests != null) {
             for (ActiveMQDestination dest : dests) {
                 if (isPermissableDestination(dest)) {
-                    DemandSubscription sub = createDemandSubscription(dest, 
null);
+                    DemandSubscription sub = createDemandSubscription(dest, 
null, null);
                     if (sub != null) {
                         sub.setStaticallyIncluded(true);
                         try {
@@ -1684,7 +1681,8 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
         return result;
     }
 
-    final protected DemandSubscription 
createDemandSubscription(ActiveMQDestination destination, final String 
subscriptionName) {
+    final protected DemandSubscription 
createDemandSubscription(ActiveMQDestination destination, final String 
subscriptionName,
+                                                                BrokerId[] 
brokerPath) {
         ConsumerInfo info = new ConsumerInfo();
         info.setNetworkSubscription(true);
         info.setDestination(destination);
@@ -1694,7 +1692,16 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
         }
 
         // Indicate that this subscription is being made on behalf of the 
remote broker.
-        info.setBrokerPath(new BrokerId[]{remoteBrokerId});
+        // If we have existing brokerPath info then use it, this is important 
to
+        // preserve TTL information
+        if (brokerPath == null || brokerPath.length == 0) {
+            info.setBrokerPath(new BrokerId[]{remoteBrokerId});
+        } else {
+            info.setBrokerPath(brokerPath);
+            if (!contains(brokerPath, remoteBrokerId)) {
+                addRemoteBrokerToBrokerPath(info);
+            }
+        }
 
         // the remote info held by the DemandSubscription holds the original
         // consumerId, the local info get's overwritten
@@ -1778,7 +1785,7 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
         return filterFactory.create(info, getRemoteBrokerPath(), 
configuration.getMessageTTL(), configuration.getConsumerTTL());
     }
 
-    protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws 
IOException {
+    protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) {
         info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(), 
getRemoteBrokerPath()));
     }
 
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
 
b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
index 8d14f7492f..9860bd13e1 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
@@ -74,10 +74,14 @@ public class DurableConduitBridge extends ConduitBridge {
 
                             String candidateSubName = getSubscriberName(dest);
                             for (Subscription subscription : 
topicRegion.getDurableSubscriptions().values()) {
-                                String subName = 
subscription.getConsumerInfo().getSubscriptionName();
+                                ConsumerInfo subInfo = 
subscription.getConsumerInfo();
+                                String subName = subInfo.getSubscriptionName();
                                 String clientId = 
subscription.getContext().getClientId();
                                 if (subName != null && 
subName.equals(candidateSubName) && 
clientId.startsWith(configuration.getName())) {
-                                    DemandSubscription sub = 
createDemandSubscription(dest, subName);
+                                    // Include the brokerPath if it exists so 
that we can handle TTL more correctly
+                                    // This only works if the consumers are 
online as offline consumers are missing TTL
+                                    // For TTL > 1 configurations setting 
dynamicOnly to true may make more sense
+                                    DemandSubscription sub = 
createDemandSubscription(dest, subName, subInfo.getBrokerPath());
                                     if (sub != null) {
                                         
sub.getLocalInfo().setSubscriptionName(getSubscriberName(dest));
                                         sub.setStaticallyIncluded(true);
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
 
b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
index 37e88d0c0a..48a0fdfdca 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
@@ -28,6 +28,7 @@ import org.apache.activemq.command.ConsumerInfo;
  * Configuration for a NetworkBridge
  */
 public class NetworkBridgeConfiguration {
+    public static final String DURABLE_SUB_PREFIX = "NC-DS_";
 
     private boolean conduitSubscriptions = true;
     /**
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/util/NetworkBridgeUtils.java
 
b/activemq-broker/src/main/java/org/apache/activemq/util/NetworkBridgeUtils.java
index 700baf678d..4b39bb2bf7 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/util/NetworkBridgeUtils.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/util/NetworkBridgeUtils.java
@@ -20,7 +20,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
 import org.apache.activemq.advisory.AdvisoryBroker;
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.BrokerService;
@@ -36,6 +35,8 @@ import org.apache.activemq.network.NetworkBridgeConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static 
org.apache.activemq.network.NetworkBridgeConfiguration.DURABLE_SUB_PREFIX;
+
 public class NetworkBridgeUtils {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(NetworkBridgeUtils.class);
@@ -54,31 +55,37 @@ public class NetworkBridgeUtils {
         TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
         Set<ConsumerInfo> subscriptionInfos = new HashSet<>();
 
-        //Add all durable subscriptions to the set that match the network 
config
-        //which currently is just the dynamicallyIncludedDestinations list
-        for (SubscriptionKey key : 
topicRegion.getDurableSubscriptions().keySet()) {
-            DurableTopicSubscription sub = 
topicRegion.getDurableSubscriptions().get(key);
-            if (sub != null && NetworkBridgeUtils.matchesNetworkConfig(config, 
sub.getConsumerInfo().getDestination())) {
+        // Add all durable subscriptions to the set that match the network 
config
+        // which currently is just the dynamicallyIncludedDestinations list
+        for (Map.Entry<SubscriptionKey, DurableTopicSubscription> entry : 
topicRegion.getDurableSubscriptions().entrySet()) {
+            final SubscriptionKey key = entry.getKey();
+            final DurableTopicSubscription sub = entry.getValue();
+            // We must use the key for matchesConfigForDurableSync() because 
the clientId for the ConsumerInfo object
+            // may be null if the subscription is offline, which is why we 
copy the ConsumerInfo below
+            // and set the clientId to match the key. The correct clientId is 
important for the receiving broker
+            // to do proper filtering when TTL is set
+            if (sub != null && 
NetworkBridgeUtils.matchesConfigForDurableSync(config, key.getClientId(),
+                key.getSubscriptionName(), sub.getActiveMQDestination())) {
                 ConsumerInfo ci = sub.getConsumerInfo().copy();
                 ci.setClientId(key.getClientId());
                 subscriptionInfos.add(ci);
             }
         }
 
-        //We also need to iterate over all normal subscriptions and check if 
they are part of
-        //any dynamicallyIncludedDestination that is configured with 
forceDurable to be true
-        //over the network bridge.  If forceDurable is true then we want to 
add the consumer to the set
+        // We also need to iterate over all normal subscriptions and check if 
they are part of
+        // any dynamicallyIncludedDestination that is configured with 
forceDurable to be true
+        // over the network bridge.  If forceDurable is true then we want to 
add the consumer to the set
         for (Subscription sub : topicRegion.getSubscriptions().values()) {
             if (sub != null && 
NetworkBridgeUtils.isForcedDurable(sub.getConsumerInfo(),
-                    config.getDynamicallyIncludedDestinations())) {
+                config.getDynamicallyIncludedDestinations())) {
                 subscriptionInfos.add(sub.getConsumerInfo().copy());
             }
         }
 
         try {
-            //Lastly, if isUseVirtualDestSubs is configured on this broker (to 
fire advisories) and
-            //configured on the network connector (to listen to advisories) 
then also add any virtual
-            //dest subscription to the set if forceDurable is true for its 
destination
+            // Lastly, if isUseVirtualDestSubs is configured on this broker 
(to fire advisories) and
+            // configured on the network connector (to listen to advisories) 
then also add any virtual
+            // dest subscription to the set if forceDurable is true for its 
destination
             AdvisoryBroker ab = (AdvisoryBroker) 
brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
             if (ab != null && brokerService.isUseVirtualDestSubs() && 
config.isUseVirtualDestSubs()) {
                 for (ConsumerInfo info : 
ab.getVirtualDestinationConsumers().keySet()) {
@@ -97,10 +104,9 @@ public class NetworkBridgeUtils {
     }
 
     public static boolean isForcedDurable(final ConsumerInfo info,
-            final List<ActiveMQDestination> dynamicallyIncludedDestinations) {
-        return dynamicallyIncludedDestinations != null
-                ? isForcedDurable(info,
-                        dynamicallyIncludedDestinations.toArray(new 
ActiveMQDestination[0]), null) : false;
+                                          final List<ActiveMQDestination> 
dynamicallyIncludedDestinations) {
+        return dynamicallyIncludedDestinations != null && isForcedDurable(info,
+            dynamicallyIncludedDestinations.toArray(new 
ActiveMQDestination[0]), null);
     }
 
     public static boolean isForcedDurable(final ConsumerInfo info,
@@ -113,7 +119,7 @@ public class NetworkBridgeUtils {
 
         ActiveMQDestination destination = info.getDestination();
         if (AdvisorySupport.isAdvisoryTopic(destination) || 
destination.isTemporary() ||
-                destination.isQueue()) {
+            destination.isQueue()) {
             return false;
         }
 
@@ -128,44 +134,71 @@ public class NetworkBridgeUtils {
         return false;
     }
 
-    public static boolean matchesNetworkConfig(final 
NetworkBridgeConfiguration config,
-            ActiveMQDestination destination) {
-        List<ActiveMQDestination> includedDests = 
config.getDynamicallyIncludedDestinations();
-        if (includedDests != null && includedDests.size() > 0) {
-            for (ActiveMQDestination dest : includedDests) {
-                DestinationFilter inclusionFilter = 
DestinationFilter.parseFilter(dest);
-                if (dest != null && inclusionFilter.matches(destination) && 
dest.getDestinationType() == destination.getDestinationType()) {
-                    return true;
-                }
-            }
+    /**
+     * This method is used to determine which durable subscriptions should be 
sent from
+     * a broker to a remote broker so that the remote broker can process the 
subscriptions
+     * to re-add demand when the bridge is first started during the durable 
sync phase
+     * of a bridge starting. We can cut down on the amount of durables 
sent/processed
+     * based on how the bridge is configured.
+     *
+     * @param config
+     * @param clientId
+     * @param subscriptionName
+     * @param destination
+     * @return
+     */
+    public static boolean matchesConfigForDurableSync(final 
NetworkBridgeConfiguration config,
+        String clientId, String subscriptionName, ActiveMQDestination 
destination) {
+
+        // If consumerTTL was set to 0 then we return false because no demand 
will be
+        // generated over the bridge as the messages will be limited to the 
local broker
+        if (config.getConsumerTTL() == 0) {
+            return false;
         }
 
-        return false;
+        // If this is a remote demand consumer for the current bridge we can 
also skip
+        // This consumer was created by another consumer on the remote broker, 
so we
+        // ignore this consumer for demand, or else we'd end up with a loop
+        if (isDirectBridgeConsumer(config, clientId, subscriptionName)) {
+            return false;
+        }
+
+        // if TTL is set to 1 then we won't ever handle proxy durables as they
+        // are at least 2 hops away so the TTL check would always fail. Proxy 
durables
+        // are subs for other bridges, so we can skip these as well.
+        if (config.getConsumerTTL() == 1 && isProxyBridgeSubscription(config, 
clientId,
+            subscriptionName)) {
+            return false;
+        }
+
+        // Verify the destination matches the dynamically included destination 
list
+        return 
matchesDestinations(config.getDynamicallyIncludedDestinations(), destination);
     }
 
-    public static boolean matchesDestinations(ActiveMQDestination[] dests, 
final ActiveMQDestination destination) {
-        if (dests != null && dests.length > 0) {
-            for (ActiveMQDestination dest : dests) {
-                DestinationFilter inclusionFilter = 
DestinationFilter.parseFilter(dest);
-                if (dest != null && inclusionFilter.matches(destination) && 
dest.getDestinationType() == destination.getDestinationType()) {
+    public static boolean matchesDestination(ActiveMQDestination destFilter, 
ActiveMQDestination destToMatch) {
+        DestinationFilter inclusionFilter = 
DestinationFilter.parseFilter(destFilter);
+        return inclusionFilter.matches(destToMatch) && 
destFilter.getDestinationType() == destToMatch.getDestinationType();
+    }
+
+    public static boolean matchesDestinations(final List<ActiveMQDestination> 
includedDests, final ActiveMQDestination destination) {
+        if (includedDests != null && !includedDests.isEmpty()) {
+            for (ActiveMQDestination dest : includedDests) {
+                if (matchesDestination(dest, destination)) {
                     return true;
                 }
             }
         }
-
         return false;
     }
 
     public static ActiveMQDestination 
findMatchingDestination(ActiveMQDestination[] dests, ActiveMQDestination 
destination) {
-        if (dests != null && dests.length > 0) {
+        if (dests != null) {
             for (ActiveMQDestination dest : dests) {
-                DestinationFilter inclusionFilter = 
DestinationFilter.parseFilter(dest);
-                if (dest != null && inclusionFilter.matches(destination) && 
dest.getDestinationType() == destination.getDestinationType()) {
+                if (matchesDestination(dest, destination)) {
                     return dest;
                 }
             }
         }
-
         return null;
     }
 
@@ -181,4 +214,16 @@ public class NetworkBridgeUtils {
 
         return isForceDurable;
     }
+
+    public static boolean isDirectBridgeConsumer(NetworkBridgeConfiguration 
config, String clientId, String subName) {
+        return (subName != null && subName.startsWith(DURABLE_SUB_PREFIX)) &&
+            (clientId == null || clientId.startsWith(config.getName()));
+    }
+
+    public static boolean isProxyBridgeSubscription(NetworkBridgeConfiguration 
config, String clientId, String subName) {
+        if (subName != null && clientId != null) {
+            return subName.startsWith(DURABLE_SUB_PREFIX) && 
!clientId.startsWith(config.getName());
+        }
+        return false;
+    }
 }
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
index c6b3352771..6d3cc5bc00 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
@@ -25,6 +25,7 @@ import jakarta.jms.Connection;
 import jakarta.jms.MessageConsumer;
 import jakarta.jms.Session;
 
+import junit.framework.AssertionFailedError;
 import org.apache.activemq.JmsMultipleBrokersTestSupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.Destination;
@@ -49,6 +50,11 @@ public class DurableFiveBrokerNetworkBridgeTest extends 
JmsMultipleBrokersTestSu
 
     @Override
     protected NetworkConnector bridgeBrokers(String localBrokerName, String 
remoteBrokerName) throws Exception {
+        return bridgeBrokers(localBrokerName, remoteBrokerName, false, -1);
+    }
+
+    protected NetworkConnector bridgeBrokers(String localBrokerName, String 
remoteBrokerName,
+                                             boolean dynamicOnly, int 
networkTTL) throws Exception {
         NetworkConnector connector = super.bridgeBrokers(localBrokerName, 
remoteBrokerName);
         ArrayList<ActiveMQDestination> includedDestinations = new 
ArrayList<>();
         includedDestinations.add(new 
ActiveMQTopic("TEST.FOO?forceDurable=true"));
@@ -57,7 +63,8 @@ public class DurableFiveBrokerNetworkBridgeTest extends 
JmsMultipleBrokersTestSu
         connector.setDecreaseNetworkConsumerPriority(false);
         connector.setConduitSubscriptions(true);
         connector.setSyncDurableSubs(true);
-        connector.setNetworkTTL(-1);
+        connector.setDynamicOnly(dynamicOnly);
+        connector.setNetworkTTL(networkTTL);
         connector.setClientIdToken("|");
         return connector;
     }
@@ -604,6 +611,203 @@ public class DurableFiveBrokerNetworkBridgeTest extends 
JmsMultipleBrokersTestSu
         assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest2, 0);
     }
 
+    /*
+     * The following tests should work for correct sync/propagation of 
durables even if
+     * TTL is missing on broker restart. This is for TTL: 0, -1, 1, or 4 (same 
number
+     * of network hops in the network)
+     */
+    public void testDurablePropagationSyncTtl0BrokerRestart() throws Exception 
{
+        testDurablePropagation(0, false, true, List.of(0, 0, 0, 0, 0));
+    }
+
+    public void testDurablePropagationSyncTtl1BrokerRestart() throws Exception 
{
+        testDurablePropagation(1, false, true, List.of(0, 1, 0, 1, 0));
+    }
+
+    public void testDurablePropagationSyncTtl4BrokerRestart() throws Exception 
{
+        testDurablePropagation(4, false, true, List.of(1, 2, 2, 2, 1));
+    }
+
+    // This test also tests the fix in 
NetworkBridgeUtils.getBrokerSubscriptionInfo()
+    // where the clientId was missing for the offline durable which could 
cause demand to be created
+    // by mistake and create a loop. Without the clientId the sync could not 
tell the
+    // network durable was for its direct bridge (was created because of a 
local consumer
+    // on its broker) so a loop could be created on restart if the sync 
happened before the
+    // real consumer connected first.
+    public void testDurablePropagationSyncTtlNotSetBrokerRestart() throws 
Exception {
+        testDurablePropagation(-1, false, true, List.of(1, 2, 2, 2, 1));
+    }
+
+    /*
+     * The following tests demonstrate the problem with missing TTL, which is 
only solved
+     * by making dynamicOnly true. On restart, consumers have yet to come back 
online
+     * for offline durables, so we end up missing TTL and create extra demand 
on sync.
+     * TTL is missing on broker restart. This is for TTL > 1 but less than the 
number
+     * of network hops in the network.
+     */
+    public void testDurablePropagationDynamicFalseTtl2BrokerRestartFail() 
throws Exception {
+        try {
+            testDurablePropagation(2, false, true, List.of(0, 1, 2, 1, 0));
+            fail("Exepected to fail");
+        } catch (AssertionFailedError e) {
+            // expected
+        }
+    }
+
+    public void testDurablePropagationDynamicFalseTtl3BrokerRestartFail() 
throws Exception {
+        try {
+            testDurablePropagation(2, false, true, List.of(0, 2, 2, 2, 0));
+            fail("Exepected to fail");
+        } catch (AssertionFailedError e) {
+            // expected
+        }
+    }
+
+    /*
+     * The following tests make sure propagation works correctly with TTL set
+     * when dynamicOnly is false or true, and durable sync enabled.
+     * When false, durables get reactivated and a sync is done and TTL can be
+     * missing for offline durables so this works correctly ONLY if the 
consumers
+     * are online and have correct TTL info. When true, consumers drive 
reactivation
+     * entirely and the sync is skipped.
+     *
+     * For dynamicOnly being false, these tests for TTL > 1 demonstrate the 
improvement in
+     * createDemandSubscription() inside of DemandForwardingBridgeSupport
+     * where we now include the full broker path (all TTL info) if the
+     * consumer is already online that created the subscription. Before,
+     * we just included the remote broker.
+     *
+     * If brokers are restarted/consumers offline, TTL can be missing so it's 
recommended
+     * to set dynamicOnly to true if TTL is > 1 (TTL 1 and TTL -1 can still be 
handled)
+     * to prevent propagation of durables that shouldn't exist and let 
consumers drive
+     * the reactivation.
+     *
+     * The tests keep the consumers online and only restart the connectors and 
not
+     * the brokers so the consumer info and TTL are preserved for the tests.
+     */
+    public void testDurablePropagationDynamicFalseTtl0() throws Exception {
+        testDurablePropagation(0, false, List.of(0, 0, 0, 0, 0));
+    }
+
+    public void testDurablePropagationDynamicFalseTtl1() throws Exception {
+        testDurablePropagation(1, false, List.of(0, 1, 0, 1, 0));
+    }
+
+    public void testDurablePropagationDynamicFalseTtl2() throws Exception {
+        testDurablePropagation(2, false, List.of(0, 1, 2, 1, 0));
+    }
+
+    public void testDurablePropagationDynamicFalseTtl3() throws Exception {
+        testDurablePropagation(3, false, List.of(0, 2, 2, 2, 0));
+    }
+
+    public void testDurablePropagationDynamicFalseTtl4() throws Exception {
+        testDurablePropagation(4, false, List.of(1, 2, 2, 2, 1));
+    }
+
+    public void testDurablePropagationDynamicFalseTtlNotSet() throws Exception 
{
+        testDurablePropagation(-1, false, List.of(1, 2, 2, 2, 1));
+    }
+
+    public void testDurablePropagationDynamicTrueTtl0() throws Exception {
+        testDurablePropagation(0, true, List.of(0, 0, 0, 0, 0));
+    }
+
+    public void testDurablePropagationDynamicTrueTtl1() throws Exception {
+        testDurablePropagation(1, true, List.of(0, 1, 0, 1, 0));
+    }
+
+    public void testDurablePropagationDynamicTrueTtl2() throws Exception {
+        testDurablePropagation(2, true, List.of(0, 1, 2, 1, 0));
+    }
+
+    public void testDurablePropagationDynamicTrueTtl3() throws Exception {
+        testDurablePropagation(3, true, List.of(0, 2, 2, 2, 0));
+    }
+
+    public void testDurablePropagationDynamicTrueTtl4() throws Exception {
+        testDurablePropagation(4, true, List.of(1, 2, 2, 2, 1));
+    }
+
+    public void testDurablePropagationDynamicTrueTtlNotSet() throws Exception {
+        testDurablePropagation(-1, true, List.of(1, 2, 2, 2, 1));
+    }
+
+    private void testDurablePropagation(int ttl, boolean dynamicOnly,
+                                        List<Integer> expected) throws 
Exception {
+        testDurablePropagation(ttl, dynamicOnly, false, expected);
+    }
+
+    private void testDurablePropagation(int ttl, boolean dynamicOnly, boolean 
restartBrokers,
+                                         List<Integer> expected) throws 
Exception {
+        duplex = true;
+
+        // Setup broker networks
+        NetworkConnector nc1 = bridgeBrokers("Broker_A_A", "Broker_B_B", 
dynamicOnly, ttl);
+        NetworkConnector nc2 = bridgeBrokers("Broker_B_B", "Broker_C_C", 
dynamicOnly, ttl);
+        NetworkConnector nc3 = bridgeBrokers("Broker_C_C", "Broker_D_D", 
dynamicOnly, ttl);
+        NetworkConnector nc4 = bridgeBrokers("Broker_D_D", "Broker_E_E", 
dynamicOnly, ttl);
+
+        startAllBrokers();
+        stopNetworkConnectors(nc1, nc2, nc3, nc4);
+
+        // Setup destination
+        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", 
true);
+
+        // Setup consumers
+        Session ses = createSession("Broker_A_A");
+        Session ses2 = createSession("Broker_E_E");
+        MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
+        MessageConsumer clientE = ses2.createDurableSubscriber(dest, "subE");
+        Thread.sleep(1000);
+
+        assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("Broker_E_E").broker, dest, 0);
+
+        startNetworkConnectors(nc1, nc2, nc3, nc4);
+        Thread.sleep(1000);
+
+        // Check that the correct network durables exist
+        assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 
expected.get(0));
+        assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 
expected.get(1));
+        assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 
expected.get(2));
+        assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 
expected.get(3));
+        assertNCDurableSubsCount(brokers.get("Broker_E_E").broker, dest, 
expected.get(4));
+
+        if (restartBrokers) {
+            // go offline and restart to make sure sync works to re-enable and 
doesn't
+            // propagate wrong demand
+            clientA.close();
+            clientE.close();
+            destroyAllBrokers();
+            setUp();
+            brokers.values().forEach(bi -> 
bi.broker.setDeleteAllMessagesOnStartup(false));
+            bridgeBrokers("Broker_A_A", "Broker_B_B", dynamicOnly, ttl);
+            bridgeBrokers("Broker_B_B", "Broker_C_C", dynamicOnly, ttl);
+            bridgeBrokers("Broker_C_C", "Broker_D_D", dynamicOnly, ttl);
+            bridgeBrokers("Broker_D_D", "Broker_E_E", dynamicOnly, ttl);
+            startAllBrokers();
+        } else {
+            // restart just the network connectors but leave the consumers 
online
+            // to test sync works ok. Things should work for all cases both 
dynamicOnly
+            // false and true because TTL info still exists and consumers are 
online
+            stopNetworkConnectors(nc1, nc2, nc3, nc4);
+            Thread.sleep(1000);
+            startNetworkConnectors(nc1, nc2, nc3, nc4);
+            Thread.sleep(1000);
+        }
+
+        // after restarting the bridges, check sync/demand are correct
+        assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 
expected.get(0));
+        assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 
expected.get(1));
+        assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 
expected.get(2));
+        assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 
expected.get(3));
+        assertNCDurableSubsCount(brokers.get("Broker_E_E").broker, dest, 
expected.get(4));
+    }
+
     protected void assertNCDurableSubsCount(final BrokerService brokerService, 
final ActiveMQTopic dest,
             final int count) throws Exception {
         assertTrue(Wait.waitFor(new Condition() {
@@ -628,7 +832,7 @@ public class DurableFiveBrokerNetworkBridgeTest extends 
JmsMultipleBrokersTestSu
         for (SubscriptionKey key : destination.getDurableTopicSubs().keySet()) 
{
             if 
(key.getSubscriptionName().startsWith(DemandForwardingBridge.DURABLE_SUB_PREFIX))
 {
                 DurableTopicSubscription sub = 
destination.getDurableTopicSubs().get(key);
-                if (sub != null) {
+                if (sub != null && sub.isActive()) {
                     subs.add(sub);
                 }
             }
@@ -657,6 +861,18 @@ public class DurableFiveBrokerNetworkBridgeTest extends 
JmsMultipleBrokersTestSu
         broker.setDataDirectory("target" + File.separator + "test-data" + 
File.separator + "DurableFiveBrokerNetworkBridgeTest");
     }
 
+    protected void startNetworkConnectors(NetworkConnector... connectors) 
throws Exception {
+        for (NetworkConnector connector : connectors) {
+            connector.start();
+        }
+    }
+
+    protected void stopNetworkConnectors(NetworkConnector... connectors) 
throws Exception {
+        for (NetworkConnector connector : connectors) {
+            connector.stop();
+        }
+    }
+
     protected Session createSession(String broker) throws Exception {
         Connection con = createConnection(broker);
         con.start();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact



Reply via email to