Repository: activemq
Updated Branches:
  refs/heads/master 13c471cc1 -> 002ade79b


https://issues.apache.org/jira/browse/AMQ-5639 - the duplex case needed work. 
All advisories were being acked async in duplex mode, that code needed to be 
more selective to forward advisories that dont terminate at the bridge. Fix and 
test


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/002ade79
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/002ade79
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/002ade79

Branch: refs/heads/master
Commit: 002ade79b01db228377c24438bae246690f57b7d
Parents: 13c471c
Author: gtully <[email protected]>
Authored: Fri Jun 26 14:54:11 2015 +0100
Committer: gtully <[email protected]>
Committed: Fri Jun 26 14:54:29 2015 +0100

----------------------------------------------------------------------
 .../network/DemandForwardingBridgeSupport.java  |  5 +-
 .../activemq/command/NetworkBridgeFilter.java   |  4 +-
 .../usecases/AdvisoryViaNetworkTest.java        | 77 ++++++++++++++++++++
 3 files changed, 81 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/002ade79/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
 
b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 8ba1d98..8e08f95 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -619,8 +619,7 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
                         LOG.trace("{} duplex command type: {}", 
configuration.getBrokerName(), command.getDataStructureType());
                         if (command.isMessage()) {
                             final ActiveMQMessage message = (ActiveMQMessage) 
command;
-                            if 
(AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination())
-                                    || 
AdvisorySupport.isDestinationAdvisoryTopic(message.getDestination())) {
+                            if 
(NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) {
                                 
serviceRemoteConsumerAdvisory(message.getDataStructure());
                                 ackAdvisory(message);
                             } else {
@@ -989,7 +988,7 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
                                 configuration.getBrokerName(), 
remoteBrokerName, (LOG.isTraceEnabled() ? message : message.getMessageId()), 
md.getConsumerId(), message.getDestination(), 
Arrays.toString(message.getBrokerPath()), message
                         });
 
-                        if (isDuplex() && 
AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) {
+                        if (isDuplex() && 
NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) {
                             try {
                                 // never request b/c they are eventually acked 
async
                                 remoteBroker.oneway(message);

http://git-wip-us.apache.org/repos/asf/activemq/blob/002ade79/activemq-client/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java
----------------------------------------------------------------------
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java
 
b/activemq-client/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java
index af0c09e..245c098 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java
@@ -97,7 +97,7 @@ public class NetworkBridgeFilter implements DataStructure, 
BooleanExpression {
         }
 
         if (message.isAdvisory()) {
-            if (consumerInfo != null && consumerInfo.isNetworkSubscription() 
&& advisoryIsInterpretedByNetworkBridge(message)) {
+            if (consumerInfo != null && consumerInfo.isNetworkSubscription() 
&& isAdvisoryInterpretedByNetworkBridge(message)) {
                 // they will be interpreted by the bridge leading to dup 
commands
                 if (LOG.isTraceEnabled()) {
                     LOG.trace("not propagating advisory to network sub: " + 
consumerInfo.getConsumerId() + ", message: "+ message);
@@ -124,7 +124,7 @@ public class NetworkBridgeFilter implements DataStructure, 
BooleanExpression {
         return true;
     }
 
-    private boolean advisoryIsInterpretedByNetworkBridge(Message message) {
+    public static boolean isAdvisoryInterpretedByNetworkBridge(Message 
message) {
         return 
AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination()) || 
AdvisorySupport.isTempDestinationAdvisoryTopic(message.getDestination());
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/002ade79/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java
index aa7d6ee..ab61709 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java
@@ -17,10 +17,16 @@
 package org.apache.activemq.usecases;
 
 import java.net.URI;
+import java.util.Arrays;
 import javax.jms.MessageConsumer;
 import org.apache.activemq.JmsMultipleBrokersTestSupport;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.DestinationInterceptor;
 import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.virtual.CompositeTopic;
+import org.apache.activemq.broker.region.virtual.VirtualDestination;
+import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
+import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.network.NetworkConnector;
@@ -71,6 +77,33 @@ public class AdvisoryViaNetworkTest extends 
JmsMultipleBrokersTestSupport {
     }
 
 
+    public void testAdvisoryForwardingDuplexNC() throws Exception {
+        ActiveMQTopic advisoryTopic = new 
ActiveMQTopic("ActiveMQ.Advisory.Producer.Topic.FOO");
+
+        createBroker("A");
+        createBroker("B");
+        NetworkConnector networkBridge = bridgeBrokers("A", "B");
+        networkBridge.addStaticallyIncludedDestination(advisoryTopic);
+        networkBridge.setDuplex(true);
+        startAllBrokers();
+        verifyPeerBrokerInfo(brokers.get("A"), 1);
+
+
+        MessageConsumer consumerA = createConsumer("A", advisoryTopic);
+        MessageConsumer consumerB = createConsumer("B", advisoryTopic);
+
+        this.sendMessages("A", new ActiveMQTopic("FOO"), 1);
+
+        MessageIdList messagesA = getConsumerMessages("A", consumerA);
+        MessageIdList messagesB = getConsumerMessages("B", consumerB);
+
+        LOG.info("consumerA = " + messagesA);
+        LOG.info("consumerB = " + messagesB);
+
+        messagesA.assertMessagesReceived(2);
+        messagesB.assertMessagesReceived(2);
+    }
+
     public void testBridgeRelevantAdvisoryNotAvailable() throws Exception {
         ActiveMQTopic advisoryTopic = new 
ActiveMQTopic("ActiveMQ.Advisory.Consumer.Topic.FOO");
         createBroker("A");
@@ -97,6 +130,50 @@ public class AdvisoryViaNetworkTest extends 
JmsMultipleBrokersTestSupport {
         messagesB.assertMessagesReceived(0);
     }
 
+    public void testAdvisoryViaVirtualDest() throws Exception {
+        ActiveMQQueue advisoryQueue = new ActiveMQQueue("advQ");
+        createBroker("A");
+
+        // convert advisories into advQ that cross the network bridge
+        CompositeTopic compositeTopic = new CompositeTopic();
+        compositeTopic.setName("ActiveMQ.Advisory.Connection");
+        compositeTopic.setForwardOnly(false);
+        compositeTopic.setForwardTo(Arrays.asList(advisoryQueue));
+        VirtualDestinationInterceptor virtualDestinationInterceptor = new 
VirtualDestinationInterceptor();
+        virtualDestinationInterceptor.setVirtualDestinations(new 
VirtualDestination[]{compositeTopic});
+        brokers.get("A").broker.setDestinationInterceptors(new 
DestinationInterceptor[]{virtualDestinationInterceptor});
+
+        createBroker("B");
+        NetworkConnector networkBridge = bridgeBrokers("A", "B");
+        networkBridge.setDuplex(true);
+        networkBridge.setPrefetchSize(1); // so advisories are acked 
immediately b/c we check inflight count below
+
+        startAllBrokers();
+        verifyPeerBrokerInfo(brokers.get("A"), 1);
+        verifyPeerBrokerInfo(brokers.get("B"), 1);
+
+        MessageConsumer consumerB = createConsumer("B", advisoryQueue);
+
+        // to make a connection on A
+        createConsumer("A", new ActiveMQTopic("FOO"));
+
+        MessageIdList messagesB = getConsumerMessages("B", consumerB);
+
+        messagesB.waitForMessagesToArrive(2);
+
+        assertTrue("deq and inflight as expected", Wait.waitFor(new 
Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                RegionBroker regionBroker = (RegionBroker) 
brokers.get("A").broker.getRegionBroker();
+                LOG.info("A Deq:" + 
regionBroker.getDestinationStatistics().getDequeues().getCount());
+                LOG.info("A Inflight:" + 
regionBroker.getDestinationStatistics().getInflight().getCount());
+                return 
regionBroker.getDestinationStatistics().getDequeues().getCount() > 2
+                        && 
regionBroker.getDestinationStatistics().getInflight().getCount() == 0;
+            }
+        }));
+
+    }
+
     private void verifyPeerBrokerInfo(BrokerItem brokerItem, final int max) 
throws Exception {
         final BrokerService broker = brokerItem.broker;
         final RegionBroker regionBroker = (RegionBroker) 
broker.getRegionBroker();

Reply via email to