This is an automated email from the ASF dual-hosted git repository.
cshannon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new 953737ca08 AMQ-9689 - Network of Broker durable sync TTL fixes and
improvements (#1419)
953737ca08 is described below
commit 953737ca082e1ff67cf0b408e7ec72cd89be75ac
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Fri Apr 11 08:14:09 2025 -0400
AMQ-9689 - Network of Broker durable sync TTL fixes and improvements (#1419)
This commit makes several improvements and fixes for syncing durable
subscriptions when a network bridge connects.
1) A bug was fixed during durable sync that would cause the clientId
to not always be included for durables in the subscription list which
could cause a loop to be created as the other broker would not be able
to tell where the network subscription came from.
2) During reactivation when dynamicOnly is false and durable sync is set
to true, we make sure to include the TTL information (full broker path)
from the online consumer attached to durables so that TTL info is properly
propagated so we don't incorrectly create demand. Thisonly works if
consumers are online, so for TTL > 1 it is still recommended to set
dynamicOnly to true and allow only online consumers drive demand.
3) For TTL 1, we can handle sync correctly on restarts even if durables
are offline and missing consumer TTL info because we know that we should
ignore proxy durables (bridge durables for other bridges) entirely because
they will be > 1 hop away.
4) Some other minor improvements were made like filtering everything if
TTL is 0 and also consolidating logic.
---
.../network/DemandForwardingBridgeSupport.java | 125 ++++++------
.../activemq/network/DurableConduitBridge.java | 8 +-
.../network/NetworkBridgeConfiguration.java | 1 +
.../apache/activemq/util/NetworkBridgeUtils.java | 123 ++++++++----
.../DurableFiveBrokerNetworkBridgeTest.java | 220 ++++++++++++++++++++-
5 files changed, 375 insertions(+), 102 deletions(-)
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 57afc85d11..8d16445fb9 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -113,7 +113,7 @@ import org.slf4j.LoggerFactory;
*/
public abstract class DemandForwardingBridgeSupport implements NetworkBridge,
BrokerServiceAware {
private static final Logger LOG =
LoggerFactory.getLogger(DemandForwardingBridgeSupport.class);
- protected static final String DURABLE_SUB_PREFIX = "NC-DS_";
+ protected static final String DURABLE_SUB_PREFIX =
NetworkBridgeConfiguration.DURABLE_SUB_PREFIX;
protected final Transport localBroker;
protected final Transport remoteBroker;
protected IdGenerator idGenerator = new IdGenerator();
@@ -664,23 +664,8 @@ public abstract class DemandForwardingBridgeSupport
implements NetworkBridge, Br
}
}
- /**
- * Checks whether or not this consumer is a direct bridge network
subscription
- * @param info
- * @return
- */
- protected boolean isDirectBridgeConsumer(ConsumerInfo info) {
- return (info.getSubscriptionName() != null &&
info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)) &&
- (info.getClientId() == null ||
info.getClientId().startsWith(configuration.getName()));
- }
-
protected boolean isProxyBridgeSubscription(String clientId, String
subName) {
- if (subName != null && clientId != null) {
- if (subName.startsWith(DURABLE_SUB_PREFIX) &&
!clientId.startsWith(configuration.getName())) {
- return true;
- }
- }
- return false;
+ return NetworkBridgeUtils.isProxyBridgeSubscription(configuration,
clientId, subName);
}
/**
@@ -750,49 +735,61 @@ public abstract class DemandForwardingBridgeSupport
implements NetworkBridge, Br
} else if (command instanceof BrokerSubscriptionInfo) {
final BrokerSubscriptionInfo brokerSubscriptionInfo =
(BrokerSubscriptionInfo) command;
- //Start in a new thread so we don't block the transport
waiting for staticDestinations
- syncExecutor.execute(new Runnable() {
-
- @Override
- public void run() {
- try {
- staticDestinationsLatch.await();
- //Make sure after the countDown of
staticDestinationsLatch we aren't stopping
- if (!disposed.get()) {
- BrokerSubscriptionInfo subInfo =
brokerSubscriptionInfo;
- LOG.debug("Received Remote
BrokerSubscriptionInfo on {} from {}",
- brokerService.getBrokerName(),
subInfo.getBrokerName());
-
- if (configuration.isSyncDurableSubs() &&
configuration.isConduitSubscriptions()
- && !configuration.isDynamicOnly())
{
- if (started.get()) {
- if (subInfo.getSubscriptionInfos()
!= null) {
- for (ConsumerInfo info :
subInfo.getSubscriptionInfos()) {
- //re-add any process any
non-NC consumers that match the
-
//dynamicallyIncludedDestinations list
- //Also re-add network
consumers that are not part of this direct
- //bridge (proxy of proxy
bridges)
-
if((info.getSubscriptionName() == null || !isDirectBridgeConsumer(info)) &&
-
NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations,
info.getDestination())) {
-
serviceRemoteConsumerAdvisory(info);
- }
- }
- }
+ // Skip the durable sync if any of the following are true:
+ // 1) if the flag is set to false.
+ // 2) If dynamicOnly is true, this means means to only
activate when the real
+ // consumers come back so we need to skip. This mode is
useful espeically when
+ // setting TTL > 1 as the TTL info is tied to consumers
+ // 3) If conduit subscriptions is disable we also skip,
for the same reason we
+ // skip when dynamicOnly is true, that we need to let
consumers entirely drive
+ // the creation/removal of subscriptions as each
consumer gets their own
+ if (!configuration.isSyncDurableSubs() ||
!configuration.isConduitSubscriptions()
+ || configuration.isDynamicOnly()) {
+ return;
+ }
- //After re-added, clean up any
empty durables
- for (Iterator<DemandSubscription>
i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
- DemandSubscription ds =
i.next();
- if
(NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations,
ds.getLocalInfo().getDestination())) {
- cleanupDurableSub(ds, i);
- }
- }
+ //Start in a new thread so we don't block the transport
waiting for staticDestinations
+ syncExecutor.execute(() -> {
+ try {
+ staticDestinationsLatch.await();
+
+ //Make sure after the countDown of
staticDestinationsLatch we aren't stopping
+ if (!disposed.get() && started.get()) {
+ final BrokerSubscriptionInfo subInfo =
brokerSubscriptionInfo;
+ LOG.debug("Received Remote
BrokerSubscriptionInfo on {} from {}",
+ brokerService.getBrokerName(),
subInfo.getBrokerName());
+
+ // Go through and subs sent and see if we can
add demand
+ if (subInfo.getSubscriptionInfos() != null) {
+ // Re-add and process subscriptions on the
remote broker to add demand
+ for (ConsumerInfo info :
subInfo.getSubscriptionInfos()) {
+ // Brokers filter what is sent, but
the filtering logic has changed between
+ // versions, plus some durables sent
are only processed for removes so we
+ // need to filter what to process for
adding demand
+ if
(NetworkBridgeUtils.matchesConfigForDurableSync(configuration,
+ info.getClientId(),
info.getSubscriptionName(), info.getDestination())) {
+
serviceRemoteConsumerAdvisory(info);
}
}
}
- } catch (Exception e) {
- LOG.warn("Error processing
BrokerSubscriptionInfo: {}", e.getMessage(), e);
- LOG.debug(e.getMessage(), e);
+
+ //After processing demand to add, clean up any
empty durables
+ for (Iterator<DemandSubscription> i =
subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
+ DemandSubscription ds = i.next();
+ // This filters on destinations to see if
we should process possible removal
+ // based on the bridge configuration
(included dests, TTL, etc).
+ if
(NetworkBridgeUtils.matchesDestinations(configuration.getDynamicallyIncludedDestinations(),
+ ds.getLocalInfo().getDestination())) {
+ // Note that this method will further
check that there are no remote
+ // demand that was previously added or
associated. If there are remote
+ // subscriptions tied to the DS, then
it will not be removed.
+ cleanupDurableSub(ds, i);
+ }
+ }
}
+ } catch (Exception e) {
+ LOG.warn("Error processing BrokerSubscriptionInfo:
{}", e.getMessage(), e);
+ LOG.debug(e.getMessage(), e);
}
});
@@ -1427,7 +1424,7 @@ public abstract class DemandForwardingBridgeSupport
implements NetworkBridge, Br
if (dests != null) {
for (ActiveMQDestination dest : dests) {
if (isPermissableDestination(dest)) {
- DemandSubscription sub = createDemandSubscription(dest,
null);
+ DemandSubscription sub = createDemandSubscription(dest,
null, null);
if (sub != null) {
sub.setStaticallyIncluded(true);
try {
@@ -1684,7 +1681,8 @@ public abstract class DemandForwardingBridgeSupport
implements NetworkBridge, Br
return result;
}
- final protected DemandSubscription
createDemandSubscription(ActiveMQDestination destination, final String
subscriptionName) {
+ final protected DemandSubscription
createDemandSubscription(ActiveMQDestination destination, final String
subscriptionName,
+ BrokerId[]
brokerPath) {
ConsumerInfo info = new ConsumerInfo();
info.setNetworkSubscription(true);
info.setDestination(destination);
@@ -1694,7 +1692,16 @@ public abstract class DemandForwardingBridgeSupport
implements NetworkBridge, Br
}
// Indicate that this subscription is being made on behalf of the
remote broker.
- info.setBrokerPath(new BrokerId[]{remoteBrokerId});
+ // If we have existing brokerPath info then use it, this is important
to
+ // preserve TTL information
+ if (brokerPath == null || brokerPath.length == 0) {
+ info.setBrokerPath(new BrokerId[]{remoteBrokerId});
+ } else {
+ info.setBrokerPath(brokerPath);
+ if (!contains(brokerPath, remoteBrokerId)) {
+ addRemoteBrokerToBrokerPath(info);
+ }
+ }
// the remote info held by the DemandSubscription holds the original
// consumerId, the local info get's overwritten
@@ -1778,7 +1785,7 @@ public abstract class DemandForwardingBridgeSupport
implements NetworkBridge, Br
return filterFactory.create(info, getRemoteBrokerPath(),
configuration.getMessageTTL(), configuration.getConsumerTTL());
}
- protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws
IOException {
+ protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) {
info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(),
getRemoteBrokerPath()));
}
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
index 8d14f7492f..9860bd13e1 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
@@ -74,10 +74,14 @@ public class DurableConduitBridge extends ConduitBridge {
String candidateSubName = getSubscriberName(dest);
for (Subscription subscription :
topicRegion.getDurableSubscriptions().values()) {
- String subName =
subscription.getConsumerInfo().getSubscriptionName();
+ ConsumerInfo subInfo =
subscription.getConsumerInfo();
+ String subName = subInfo.getSubscriptionName();
String clientId =
subscription.getContext().getClientId();
if (subName != null &&
subName.equals(candidateSubName) &&
clientId.startsWith(configuration.getName())) {
- DemandSubscription sub =
createDemandSubscription(dest, subName);
+ // Include the brokerPath if it exists so
that we can handle TTL more correctly
+ // This only works if the consumers are
online as offline consumers are missing TTL
+ // For TTL > 1 configurations setting
dynamicOnly to true may make more sense
+ DemandSubscription sub =
createDemandSubscription(dest, subName, subInfo.getBrokerPath());
if (sub != null) {
sub.getLocalInfo().setSubscriptionName(getSubscriberName(dest));
sub.setStaticallyIncluded(true);
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
index 87ad022606..911fb0bbe4 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
@@ -28,6 +28,7 @@ import org.apache.activemq.command.ConsumerInfo;
* Configuration for a NetworkBridge
*/
public class NetworkBridgeConfiguration {
+ public static final String DURABLE_SUB_PREFIX = "NC-DS_";
private boolean conduitSubscriptions = true;
/**
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/util/NetworkBridgeUtils.java
b/activemq-broker/src/main/java/org/apache/activemq/util/NetworkBridgeUtils.java
index 700baf678d..4b39bb2bf7 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/util/NetworkBridgeUtils.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/util/NetworkBridgeUtils.java
@@ -20,7 +20,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-
import org.apache.activemq.advisory.AdvisoryBroker;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
@@ -36,6 +35,8 @@ import org.apache.activemq.network.NetworkBridgeConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static
org.apache.activemq.network.NetworkBridgeConfiguration.DURABLE_SUB_PREFIX;
+
public class NetworkBridgeUtils {
private static final Logger LOG =
LoggerFactory.getLogger(NetworkBridgeUtils.class);
@@ -54,31 +55,37 @@ public class NetworkBridgeUtils {
TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
Set<ConsumerInfo> subscriptionInfos = new HashSet<>();
- //Add all durable subscriptions to the set that match the network
config
- //which currently is just the dynamicallyIncludedDestinations list
- for (SubscriptionKey key :
topicRegion.getDurableSubscriptions().keySet()) {
- DurableTopicSubscription sub =
topicRegion.getDurableSubscriptions().get(key);
- if (sub != null && NetworkBridgeUtils.matchesNetworkConfig(config,
sub.getConsumerInfo().getDestination())) {
+ // Add all durable subscriptions to the set that match the network
config
+ // which currently is just the dynamicallyIncludedDestinations list
+ for (Map.Entry<SubscriptionKey, DurableTopicSubscription> entry :
topicRegion.getDurableSubscriptions().entrySet()) {
+ final SubscriptionKey key = entry.getKey();
+ final DurableTopicSubscription sub = entry.getValue();
+ // We must use the key for matchesConfigForDurableSync() because
the clientId for the ConsumerInfo object
+ // may be null if the subscription is offline, which is why we
copy the ConsumerInfo below
+ // and set the clientId to match the key. The correct clientId is
important for the receiving broker
+ // to do proper filtering when TTL is set
+ if (sub != null &&
NetworkBridgeUtils.matchesConfigForDurableSync(config, key.getClientId(),
+ key.getSubscriptionName(), sub.getActiveMQDestination())) {
ConsumerInfo ci = sub.getConsumerInfo().copy();
ci.setClientId(key.getClientId());
subscriptionInfos.add(ci);
}
}
- //We also need to iterate over all normal subscriptions and check if
they are part of
- //any dynamicallyIncludedDestination that is configured with
forceDurable to be true
- //over the network bridge. If forceDurable is true then we want to
add the consumer to the set
+ // We also need to iterate over all normal subscriptions and check if
they are part of
+ // any dynamicallyIncludedDestination that is configured with
forceDurable to be true
+ // over the network bridge. If forceDurable is true then we want to
add the consumer to the set
for (Subscription sub : topicRegion.getSubscriptions().values()) {
if (sub != null &&
NetworkBridgeUtils.isForcedDurable(sub.getConsumerInfo(),
- config.getDynamicallyIncludedDestinations())) {
+ config.getDynamicallyIncludedDestinations())) {
subscriptionInfos.add(sub.getConsumerInfo().copy());
}
}
try {
- //Lastly, if isUseVirtualDestSubs is configured on this broker (to
fire advisories) and
- //configured on the network connector (to listen to advisories)
then also add any virtual
- //dest subscription to the set if forceDurable is true for its
destination
+ // Lastly, if isUseVirtualDestSubs is configured on this broker
(to fire advisories) and
+ // configured on the network connector (to listen to advisories)
then also add any virtual
+ // dest subscription to the set if forceDurable is true for its
destination
AdvisoryBroker ab = (AdvisoryBroker)
brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
if (ab != null && brokerService.isUseVirtualDestSubs() &&
config.isUseVirtualDestSubs()) {
for (ConsumerInfo info :
ab.getVirtualDestinationConsumers().keySet()) {
@@ -97,10 +104,9 @@ public class NetworkBridgeUtils {
}
public static boolean isForcedDurable(final ConsumerInfo info,
- final List<ActiveMQDestination> dynamicallyIncludedDestinations) {
- return dynamicallyIncludedDestinations != null
- ? isForcedDurable(info,
- dynamicallyIncludedDestinations.toArray(new
ActiveMQDestination[0]), null) : false;
+ final List<ActiveMQDestination>
dynamicallyIncludedDestinations) {
+ return dynamicallyIncludedDestinations != null && isForcedDurable(info,
+ dynamicallyIncludedDestinations.toArray(new
ActiveMQDestination[0]), null);
}
public static boolean isForcedDurable(final ConsumerInfo info,
@@ -113,7 +119,7 @@ public class NetworkBridgeUtils {
ActiveMQDestination destination = info.getDestination();
if (AdvisorySupport.isAdvisoryTopic(destination) ||
destination.isTemporary() ||
- destination.isQueue()) {
+ destination.isQueue()) {
return false;
}
@@ -128,44 +134,71 @@ public class NetworkBridgeUtils {
return false;
}
- public static boolean matchesNetworkConfig(final
NetworkBridgeConfiguration config,
- ActiveMQDestination destination) {
- List<ActiveMQDestination> includedDests =
config.getDynamicallyIncludedDestinations();
- if (includedDests != null && includedDests.size() > 0) {
- for (ActiveMQDestination dest : includedDests) {
- DestinationFilter inclusionFilter =
DestinationFilter.parseFilter(dest);
- if (dest != null && inclusionFilter.matches(destination) &&
dest.getDestinationType() == destination.getDestinationType()) {
- return true;
- }
- }
+ /**
+ * This method is used to determine which durable subscriptions should be
sent from
+ * a broker to a remote broker so that the remote broker can process the
subscriptions
+ * to re-add demand when the bridge is first started during the durable
sync phase
+ * of a bridge starting. We can cut down on the amount of durables
sent/processed
+ * based on how the bridge is configured.
+ *
+ * @param config
+ * @param clientId
+ * @param subscriptionName
+ * @param destination
+ * @return
+ */
+ public static boolean matchesConfigForDurableSync(final
NetworkBridgeConfiguration config,
+ String clientId, String subscriptionName, ActiveMQDestination
destination) {
+
+ // If consumerTTL was set to 0 then we return false because no demand
will be
+ // generated over the bridge as the messages will be limited to the
local broker
+ if (config.getConsumerTTL() == 0) {
+ return false;
}
- return false;
+ // If this is a remote demand consumer for the current bridge we can
also skip
+ // This consumer was created by another consumer on the remote broker,
so we
+ // ignore this consumer for demand, or else we'd end up with a loop
+ if (isDirectBridgeConsumer(config, clientId, subscriptionName)) {
+ return false;
+ }
+
+ // if TTL is set to 1 then we won't ever handle proxy durables as they
+ // are at least 2 hops away so the TTL check would always fail. Proxy
durables
+ // are subs for other bridges, so we can skip these as well.
+ if (config.getConsumerTTL() == 1 && isProxyBridgeSubscription(config,
clientId,
+ subscriptionName)) {
+ return false;
+ }
+
+ // Verify the destination matches the dynamically included destination
list
+ return
matchesDestinations(config.getDynamicallyIncludedDestinations(), destination);
}
- public static boolean matchesDestinations(ActiveMQDestination[] dests,
final ActiveMQDestination destination) {
- if (dests != null && dests.length > 0) {
- for (ActiveMQDestination dest : dests) {
- DestinationFilter inclusionFilter =
DestinationFilter.parseFilter(dest);
- if (dest != null && inclusionFilter.matches(destination) &&
dest.getDestinationType() == destination.getDestinationType()) {
+ public static boolean matchesDestination(ActiveMQDestination destFilter,
ActiveMQDestination destToMatch) {
+ DestinationFilter inclusionFilter =
DestinationFilter.parseFilter(destFilter);
+ return inclusionFilter.matches(destToMatch) &&
destFilter.getDestinationType() == destToMatch.getDestinationType();
+ }
+
+ public static boolean matchesDestinations(final List<ActiveMQDestination>
includedDests, final ActiveMQDestination destination) {
+ if (includedDests != null && !includedDests.isEmpty()) {
+ for (ActiveMQDestination dest : includedDests) {
+ if (matchesDestination(dest, destination)) {
return true;
}
}
}
-
return false;
}
public static ActiveMQDestination
findMatchingDestination(ActiveMQDestination[] dests, ActiveMQDestination
destination) {
- if (dests != null && dests.length > 0) {
+ if (dests != null) {
for (ActiveMQDestination dest : dests) {
- DestinationFilter inclusionFilter =
DestinationFilter.parseFilter(dest);
- if (dest != null && inclusionFilter.matches(destination) &&
dest.getDestinationType() == destination.getDestinationType()) {
+ if (matchesDestination(dest, destination)) {
return dest;
}
}
}
-
return null;
}
@@ -181,4 +214,16 @@ public class NetworkBridgeUtils {
return isForceDurable;
}
+
+ public static boolean isDirectBridgeConsumer(NetworkBridgeConfiguration
config, String clientId, String subName) {
+ return (subName != null && subName.startsWith(DURABLE_SUB_PREFIX)) &&
+ (clientId == null || clientId.startsWith(config.getName()));
+ }
+
+ public static boolean isProxyBridgeSubscription(NetworkBridgeConfiguration
config, String clientId, String subName) {
+ if (subName != null && clientId != null) {
+ return subName.startsWith(DURABLE_SUB_PREFIX) &&
!clientId.startsWith(config.getName());
+ }
+ return false;
+ }
}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
index c6b3352771..6d3cc5bc00 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
@@ -25,6 +25,7 @@ import jakarta.jms.Connection;
import jakarta.jms.MessageConsumer;
import jakarta.jms.Session;
+import junit.framework.AssertionFailedError;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Destination;
@@ -49,6 +50,11 @@ public class DurableFiveBrokerNetworkBridgeTest extends
JmsMultipleBrokersTestSu
@Override
protected NetworkConnector bridgeBrokers(String localBrokerName, String
remoteBrokerName) throws Exception {
+ return bridgeBrokers(localBrokerName, remoteBrokerName, false, -1);
+ }
+
+ protected NetworkConnector bridgeBrokers(String localBrokerName, String
remoteBrokerName,
+ boolean dynamicOnly, int
networkTTL) throws Exception {
NetworkConnector connector = super.bridgeBrokers(localBrokerName,
remoteBrokerName);
ArrayList<ActiveMQDestination> includedDestinations = new
ArrayList<>();
includedDestinations.add(new
ActiveMQTopic("TEST.FOO?forceDurable=true"));
@@ -57,7 +63,8 @@ public class DurableFiveBrokerNetworkBridgeTest extends
JmsMultipleBrokersTestSu
connector.setDecreaseNetworkConsumerPriority(false);
connector.setConduitSubscriptions(true);
connector.setSyncDurableSubs(true);
- connector.setNetworkTTL(-1);
+ connector.setDynamicOnly(dynamicOnly);
+ connector.setNetworkTTL(networkTTL);
connector.setClientIdToken("|");
return connector;
}
@@ -604,6 +611,203 @@ public class DurableFiveBrokerNetworkBridgeTest extends
JmsMultipleBrokersTestSu
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest2, 0);
}
+ /*
+ * The following tests should work for correct sync/propagation of
durables even if
+ * TTL is missing on broker restart. This is for TTL: 0, -1, 1, or 4 (same
number
+ * of network hops in the network)
+ */
+ public void testDurablePropagationSyncTtl0BrokerRestart() throws Exception
{
+ testDurablePropagation(0, false, true, List.of(0, 0, 0, 0, 0));
+ }
+
+ public void testDurablePropagationSyncTtl1BrokerRestart() throws Exception
{
+ testDurablePropagation(1, false, true, List.of(0, 1, 0, 1, 0));
+ }
+
+ public void testDurablePropagationSyncTtl4BrokerRestart() throws Exception
{
+ testDurablePropagation(4, false, true, List.of(1, 2, 2, 2, 1));
+ }
+
+ // This test also tests the fix in
NetworkBridgeUtils.getBrokerSubscriptionInfo()
+ // where the clientId was missing for the offline durable which could
cause demand to be created
+ // by mistake and create a loop. Without the clientId the sync could not
tell the
+ // network durable was for its direct bridge (was created because of a
local consumer
+ // on its broker) so a loop could be created on restart if the sync
happened before the
+ // real consumer connected first.
+ public void testDurablePropagationSyncTtlNotSetBrokerRestart() throws
Exception {
+ testDurablePropagation(-1, false, true, List.of(1, 2, 2, 2, 1));
+ }
+
+ /*
+ * The following test demonstrates the problem with missing TTL and is
only solved
+ * by making dynamicOnly true, on restart consumers have yet to come back
online
+ * for offline durables so we end up missing TTL and create extra demand
on sync.
+ * TTL is missing on broker restart. This is for TTL > 1 but less than
same number
+ * of network hops in the network
+ */
+ public void testDurablePropagationDynamicFalseTtl2BrokerRestartFail()
throws Exception {
+ try {
+ testDurablePropagation(2, false, true, List.of(0, 1, 2, 1, 0));
+ fail("Exepected to fail");
+ } catch (AssertionFailedError e) {
+ // expected
+ }
+ }
+
+ public void testDurablePropagationDynamicFalseTtl3BrokerRestartFail()
throws Exception {
+ try {
+ testDurablePropagation(2, false, true, List.of(0, 2, 2, 2, 0));
+ fail("Exepected to fail");
+ } catch (AssertionFailedError e) {
+ // expected
+ }
+ }
+
+ /*
+ * The following tests make sure propagation works correctly with TTL set
+ * when dynamicOnly is false or true, and durable sync enabled.
+ * When false, durables get reactivated and a sync is done and TTL can be
+ * missing for offline durables so this works correctly ONLY if the
consumers
+ * are online and have correct TTL info. When true, consumers drive
reactivation
+ * entirely and the sync is skipped.
+ *
+ * For dynamicOnly being false, these tests for TTL > 1 demonstrate the
improvement in
+ * createDemandSubscription() inside of DemandForwardingBridgeSupport
+ * where we now include the full broker path (all TTL info) if the
+ * consumer is already online that created the subscription. Before,
+ * we just included the remote broker.
+ *
+ * If brokers are restarted/consumers offline, TTL can be missing so it's
recommended
+ * to set dynamicOnly to true if TTL is > 1 (TTL 1 and TTL -1 can still be
handled)
+ * to prevent propagation of durables that shouldn't exist and let
consumers drive
+ * the reactivation.
+ *
+ * The tests keep the consumers online and only restart the connectors and
not
+ * the brokers so the consumer info and TTL are preserved for the tests.
+ */
+ public void testDurablePropagationDynamicFalseTtl0() throws Exception {
+ testDurablePropagation(0, false, List.of(0, 0, 0, 0, 0));
+ }
+
+ public void testDurablePropagationDynamicFalseTtl1() throws Exception {
+ testDurablePropagation(1, false, List.of(0, 1, 0, 1, 0));
+ }
+
+ public void testDurablePropagationDynamicFalseTtl2() throws Exception {
+ testDurablePropagation(2, false, List.of(0, 1, 2, 1, 0));
+ }
+
+ public void testDurablePropagationDynamicFalseTtl3() throws Exception {
+ testDurablePropagation(3, false, List.of(0, 2, 2, 2, 0));
+ }
+
+ public void testDurablePropagationDynamicFalseTtl4() throws Exception {
+ testDurablePropagation(4, false, List.of(1, 2, 2, 2, 1));
+ }
+
+ public void testDurablePropagationDynamicFalseTtlNotSet() throws Exception
{
+ testDurablePropagation(-1, false, List.of(1, 2, 2, 2, 1));
+ }
+
+ public void testDurablePropagationDynamicTrueTtl0() throws Exception {
+ testDurablePropagation(0, true, List.of(0, 0, 0, 0, 0));
+ }
+
+ public void testDurablePropagationDynamicTrueTtl1() throws Exception {
+ testDurablePropagation(1, true, List.of(0, 1, 0, 1, 0));
+ }
+
+ public void testDurablePropagationDynamicTrueTtl2() throws Exception {
+ testDurablePropagation(2, true, List.of(0, 1, 2, 1, 0));
+ }
+
+ public void testDurablePropagationDynamicTrueTtl3() throws Exception {
+ testDurablePropagation(3, true, List.of(0, 2, 2, 2, 0));
+ }
+
+ public void testDurablePropagationDynamicTrueTtl4() throws Exception {
+ testDurablePropagation(4, true, List.of(1, 2, 2, 2, 1));
+ }
+
+ public void testDurablePropagationDynamicTrueTtlNotSet() throws Exception {
+ testDurablePropagation(-1, true, List.of(1, 2, 2, 2, 1));
+ }
+
+ private void testDurablePropagation(int ttl, boolean dynamicOnly,
+ List<Integer> expected) throws
Exception {
+ testDurablePropagation(ttl, dynamicOnly, false, expected);
+ }
+
+ private void testDurablePropagation(int ttl, boolean dynamicOnly, boolean
restartBrokers,
+ List<Integer> expected) throws
Exception {
+ duplex = true;
+
+ // Setup broker networks
+ NetworkConnector nc1 = bridgeBrokers("Broker_A_A", "Broker_B_B",
dynamicOnly, ttl);
+ NetworkConnector nc2 = bridgeBrokers("Broker_B_B", "Broker_C_C",
dynamicOnly, ttl);
+ NetworkConnector nc3 = bridgeBrokers("Broker_C_C", "Broker_D_D",
dynamicOnly, ttl);
+ NetworkConnector nc4 = bridgeBrokers("Broker_D_D", "Broker_E_E",
dynamicOnly, ttl);
+
+ startAllBrokers();
+ stopNetworkConnectors(nc1, nc2, nc3, nc4);
+
+ // Setup destination
+ ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO",
true);
+
+ // Setup consumers
+ Session ses = createSession("Broker_A_A");
+ Session ses2 = createSession("Broker_E_E");
+ MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
+ MessageConsumer clientE = ses2.createDurableSubscriber(dest, "subE");
+ Thread.sleep(1000);
+
+ assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
+ assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0);
+ assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 0);
+ assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 0);
+ assertNCDurableSubsCount(brokers.get("Broker_E_E").broker, dest, 0);
+
+ startNetworkConnectors(nc1, nc2, nc3, nc4);
+ Thread.sleep(1000);
+
+ // Check that the correct network durables exist
+ assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest,
expected.get(0));
+ assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest,
expected.get(1));
+ assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest,
expected.get(2));
+ assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest,
expected.get(3));
+ assertNCDurableSubsCount(brokers.get("Broker_E_E").broker, dest,
expected.get(4));
+
+ if (restartBrokers) {
+ // go offline and restart to make sure sync works to re-enable and
doesn't
+ // propagate wrong demand
+ clientA.close();
+ clientE.close();
+ destroyAllBrokers();
+ setUp();
+ brokers.values().forEach(bi ->
bi.broker.setDeleteAllMessagesOnStartup(false));
+ bridgeBrokers("Broker_A_A", "Broker_B_B", dynamicOnly, ttl);
+ bridgeBrokers("Broker_B_B", "Broker_C_C", dynamicOnly, ttl);
+ bridgeBrokers("Broker_C_C", "Broker_D_D", dynamicOnly, ttl);
+ bridgeBrokers("Broker_D_D", "Broker_E_E", dynamicOnly, ttl);
+ startAllBrokers();
+ } else {
+ // restart just the network connectors but leave the consumers
online
+ // to test sync works ok. Things should work for all cases both
dynamicOnly
+ // false and true because TTL info still exits and consumers are
online
+ stopNetworkConnectors(nc1, nc2, nc3, nc4);
+ Thread.sleep(1000);
+ startNetworkConnectors(nc1, nc2, nc3, nc4);
+ Thread.sleep(1000);
+ }
+
+ // after restarting the bridges, check sync/demand are correct
+ assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest,
expected.get(0));
+ assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest,
expected.get(1));
+ assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest,
expected.get(2));
+ assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest,
expected.get(3));
+ assertNCDurableSubsCount(brokers.get("Broker_E_E").broker, dest,
expected.get(4));
+ }
+
protected void assertNCDurableSubsCount(final BrokerService brokerService,
final ActiveMQTopic dest,
final int count) throws Exception {
assertTrue(Wait.waitFor(new Condition() {
@@ -628,7 +832,7 @@ public class DurableFiveBrokerNetworkBridgeTest extends
JmsMultipleBrokersTestSu
for (SubscriptionKey key : destination.getDurableTopicSubs().keySet())
{
if
(key.getSubscriptionName().startsWith(DemandForwardingBridge.DURABLE_SUB_PREFIX))
{
DurableTopicSubscription sub =
destination.getDurableTopicSubs().get(key);
- if (sub != null) {
+ if (sub != null && sub.isActive()) {
subs.add(sub);
}
}
@@ -657,6 +861,18 @@ public class DurableFiveBrokerNetworkBridgeTest extends
JmsMultipleBrokersTestSu
broker.setDataDirectory("target" + File.separator + "test-data" +
File.separator + "DurableFiveBrokerNetworkBridgeTest");
}
+ protected void startNetworkConnectors(NetworkConnector... connectors)
throws Exception {
+ for (NetworkConnector connector : connectors) {
+ connector.start();
+ }
+ }
+
+ protected void stopNetworkConnectors(NetworkConnector... connectors)
throws Exception {
+ for (NetworkConnector connector : connectors) {
+ connector.stop();
+ }
+ }
+
protected Session createSession(String broker) throws Exception {
Connection con = createConnection(broker);
con.start();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact