[AMQ-6643] refine fix to allow wildcard subs to non wildcard subscription 
queues, enable simple wildcard sub to drain all subscription queues

(cherry picked from commit a67c75a9e15c9957aedc0bc8c4aa89952a4c5ea0)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/3b6ba778
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/3b6ba778
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/3b6ba778

Branch: refs/heads/activemq-5.15.x
Commit: 3b6ba778bdfa1b469d16b4b42f4556ffc1180118
Parents: 035baf1
Author: gtully <gary.tu...@gmail.com>
Authored: Tue Aug 8 16:38:11 2017 +0100
Committer: Timothy Bish <tabish...@gmail.com>
Committed: Thu Sep 7 12:13:40 2017 -0400

----------------------------------------------------------------------
 .../region/virtual/MappedQueueFilter.java       |  5 +++--
 .../activemq/network/DrainBridgeTest.java       | 22 +++++++++++++++-----
 2 files changed, 20 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/3b6ba778/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java
index db02490..490bf7b 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java
@@ -48,8 +48,9 @@ public class MappedQueueFilter extends DestinationFilter {
         // recover messages for first consumer only
         boolean noSubs = getConsumers().isEmpty();
 
-        // for virtual consumer wildcard dests, only subscribe to exact match to ensure no duplicates
-        if (sub.getActiveMQDestination().compareTo(next.getActiveMQDestination()) == 0) {
+        // for virtual consumer wildcard dests, only subscribe to exact match or non wildcard dests to ensure no duplicates
+        int match = sub.getActiveMQDestination().compareTo(next.getActiveMQDestination());
+        if (match == 0 || (!next.getActiveMQDestination().isPattern() && match == 1)) {
             super.addSubscription(context, sub);
         }
         if (noSubs && !getConsumers().isEmpty()) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/3b6ba778/activemq-unit-tests/src/test/java/org/apache/activemq/network/DrainBridgeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DrainBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DrainBridgeTest.java
index 510b540..c30b44d 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DrainBridgeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DrainBridgeTest.java
@@ -22,9 +22,11 @@ import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.util.Wait;
 
 import javax.jms.Connection;
+import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
@@ -63,19 +65,21 @@ public class DrainBridgeTest {
 
         System.out.println("Local count: " + drainingBroker.getAdminView().getTotalMessageCount() + ", target count:" + target.getAdminView().getTotalMessageCount());
 
-        assertEquals("local messages", 20, drainingBroker.getAdminView().getTotalMessageCount());
+        assertEquals("local messages", 22, drainingBroker.getAdminView().getTotalMessageCount());
         assertEquals("no remote messages", 0, target.getAdminView().getTotalMessageCount());
 
         Wait.waitFor(new Wait.Condition() {
             @Override
             public boolean isSatisified() throws Exception {
+                System.out.println("Local count: " + drainingBroker.getAdminView().getTotalMessageCount() + ", target count:" + target.getAdminView().getTotalMessageCount());
+
                 return drainingBroker.getAdminView().getTotalMessageCount() == 0l;
             }
         });
 
         assertEquals("no local messages", 0, drainingBroker.getAdminView().getTotalMessageCount());
-        assertEquals("remote messages", 20, target.getAdminView().getTotalMessageCount());
-
+        assertEquals("remote messages", 22, target.getAdminView().getTotalMessageCount());
+        assertEquals("number of queues match", drainingBroker.getAdminView().getQueues().length, target.getAdminView().getQueues().length);
         drainingBroker.stop();
         target.stop();
     }
@@ -99,10 +103,18 @@ public class DrainBridgeTest {
         conn.start();
         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
         TextMessage msg = session.createTextMessage("This is a message.");
-        MessageProducer producer = session.createProducer(new ActiveMQQueue("Q.Foo,Bar"));
+        MessageProducer producer = session.createProducer(null);
+        ActiveMQQueue queue = new ActiveMQQueue("Q.Foo,Bar");
         for (int i = 0; i < 10; i++) {
-            producer.send(msg);
+            producer.send(queue, msg);
         }
+
+        // add virtual topic consumer Q
+        MessageConsumer messageConsumerA = session.createConsumer(new ActiveMQQueue("Consumer.A.VirtualTopic.Y"));
+        MessageConsumer messageConsumeB = session.createConsumer(new ActiveMQQueue("Consumer.B.VirtualTopic.Y"));
+
+        producer.send(new ActiveMQTopic("VirtualTopic.Y"), msg);
+
         conn.close();
         broker.stop();
     }

Reply via email to