Repository: activemq Updated Branches: refs/heads/master 13c471cc1 -> 002ade79b
https://issues.apache.org/jira/browse/AMQ-5639 - the duplex case needed work. All advisories were being acked async in duplex mode, that code needed to be more selective to forward advisories that dont terminate at the bridge. Fix and test Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/002ade79 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/002ade79 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/002ade79 Branch: refs/heads/master Commit: 002ade79b01db228377c24438bae246690f57b7d Parents: 13c471c Author: gtully <[email protected]> Authored: Fri Jun 26 14:54:11 2015 +0100 Committer: gtully <[email protected]> Committed: Fri Jun 26 14:54:29 2015 +0100 ---------------------------------------------------------------------- .../network/DemandForwardingBridgeSupport.java | 5 +- .../activemq/command/NetworkBridgeFilter.java | 4 +- .../usecases/AdvisoryViaNetworkTest.java | 77 ++++++++++++++++++++ 3 files changed, 81 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/002ade79/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 8ba1d98..8e08f95 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -619,8 +619,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br LOG.trace("{} duplex command type: {}", configuration.getBrokerName(), command.getDataStructureType()); if (command.isMessage()) { final ActiveMQMessage message = (ActiveMQMessage) command; - if (AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination()) - || AdvisorySupport.isDestinationAdvisoryTopic(message.getDestination())) { + if (NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) { serviceRemoteConsumerAdvisory(message.getDataStructure()); ackAdvisory(message); } else { @@ -989,7 +988,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br configuration.getBrokerName(), remoteBrokerName, (LOG.isTraceEnabled() ? message : message.getMessageId()), md.getConsumerId(), message.getDestination(), Arrays.toString(message.getBrokerPath()), message }); - if (isDuplex() && AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) { + if (isDuplex() && NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) { try { // never request b/c they are eventually acked async remoteBroker.oneway(message); http://git-wip-us.apache.org/repos/asf/activemq/blob/002ade79/activemq-client/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java b/activemq-client/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java index af0c09e..245c098 100644 --- a/activemq-client/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java @@ -97,7 +97,7 @@ public class NetworkBridgeFilter implements DataStructure, BooleanExpression { } if (message.isAdvisory()) { - if (consumerInfo != null && consumerInfo.isNetworkSubscription() && advisoryIsInterpretedByNetworkBridge(message)) { + if (consumerInfo != null && consumerInfo.isNetworkSubscription() && isAdvisoryInterpretedByNetworkBridge(message)) { // they will be interpreted by the bridge leading to dup commands if (LOG.isTraceEnabled()) { LOG.trace("not propagating advisory to network sub: " + consumerInfo.getConsumerId() + ", message: "+ message); @@ -124,7 +124,7 @@ public class NetworkBridgeFilter implements DataStructure, BooleanExpression { return true; } - private boolean advisoryIsInterpretedByNetworkBridge(Message message) { + public static boolean isAdvisoryInterpretedByNetworkBridge(Message message) { return AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination()) || AdvisorySupport.isTempDestinationAdvisoryTopic(message.getDestination()); } http://git-wip-us.apache.org/repos/asf/activemq/blob/002ade79/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java index aa7d6ee..ab61709 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java @@ -17,10 +17,16 @@ package org.apache.activemq.usecases; import java.net.URI; +import java.util.Arrays; import javax.jms.MessageConsumer; import org.apache.activemq.JmsMultipleBrokersTestSupport; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.DestinationInterceptor; import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.broker.region.virtual.CompositeTopic; +import org.apache.activemq.broker.region.virtual.VirtualDestination; +import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor; +import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.network.NetworkConnector; @@ -71,6 +77,33 @@ public class AdvisoryViaNetworkTest extends JmsMultipleBrokersTestSupport { } + public void testAdvisoryForwardingDuplexNC() throws Exception { + ActiveMQTopic advisoryTopic = new ActiveMQTopic("ActiveMQ.Advisory.Producer.Topic.FOO"); + + createBroker("A"); + createBroker("B"); + NetworkConnector networkBridge = bridgeBrokers("A", "B"); + networkBridge.addStaticallyIncludedDestination(advisoryTopic); + networkBridge.setDuplex(true); + startAllBrokers(); + verifyPeerBrokerInfo(brokers.get("A"), 1); + + + MessageConsumer consumerA = createConsumer("A", advisoryTopic); + MessageConsumer consumerB = createConsumer("B", advisoryTopic); + + this.sendMessages("A", new ActiveMQTopic("FOO"), 1); + + MessageIdList messagesA = getConsumerMessages("A", consumerA); + MessageIdList messagesB = getConsumerMessages("B", consumerB); + + LOG.info("consumerA = " + messagesA); + LOG.info("consumerB = " + messagesB); + + messagesA.assertMessagesReceived(2); + messagesB.assertMessagesReceived(2); + } + public void testBridgeRelevantAdvisoryNotAvailable() throws Exception { ActiveMQTopic advisoryTopic = new ActiveMQTopic("ActiveMQ.Advisory.Consumer.Topic.FOO"); createBroker("A"); @@ -97,6 +130,50 @@ public class AdvisoryViaNetworkTest extends JmsMultipleBrokersTestSupport { messagesB.assertMessagesReceived(0); } + public void testAdvisoryViaVirtualDest() throws Exception { + ActiveMQQueue advisoryQueue = new ActiveMQQueue("advQ"); + createBroker("A"); + + // convert advisories into advQ that cross the network bridge + CompositeTopic compositeTopic = new CompositeTopic(); + compositeTopic.setName("ActiveMQ.Advisory.Connection"); + compositeTopic.setForwardOnly(false); + compositeTopic.setForwardTo(Arrays.asList(advisoryQueue)); + VirtualDestinationInterceptor virtualDestinationInterceptor = new VirtualDestinationInterceptor(); + virtualDestinationInterceptor.setVirtualDestinations(new VirtualDestination[]{compositeTopic}); + brokers.get("A").broker.setDestinationInterceptors(new DestinationInterceptor[]{virtualDestinationInterceptor}); + + createBroker("B"); + NetworkConnector networkBridge = bridgeBrokers("A", "B"); + networkBridge.setDuplex(true); + networkBridge.setPrefetchSize(1); // so advisories are acked immediately b/c we check inflight count below + + startAllBrokers(); + verifyPeerBrokerInfo(brokers.get("A"), 1); + verifyPeerBrokerInfo(brokers.get("B"), 1); + + MessageConsumer consumerB = createConsumer("B", advisoryQueue); + + // to make a connection on A + createConsumer("A", new ActiveMQTopic("FOO")); + + MessageIdList messagesB = getConsumerMessages("B", consumerB); + + messagesB.waitForMessagesToArrive(2); + + assertTrue("deq and inflight as expected", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + RegionBroker regionBroker = (RegionBroker) brokers.get("A").broker.getRegionBroker(); + LOG.info("A Deq:" + regionBroker.getDestinationStatistics().getDequeues().getCount()); + LOG.info("A Inflight:" + regionBroker.getDestinationStatistics().getInflight().getCount()); + return regionBroker.getDestinationStatistics().getDequeues().getCount() > 2 + && regionBroker.getDestinationStatistics().getInflight().getCount() == 0; + } + })); + + } + private void verifyPeerBrokerInfo(BrokerItem brokerItem, final int max) throws Exception { final BrokerService broker = brokerItem.broker; final RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
