Repository: activemq
Updated Branches:
  refs/heads/master a15626193 -> a67c75a9e


[AMQ-6643] refine fix to allow wildcard subs to non wildcard subscription 
queues, enable simple wildcard sub to drain all subscription queues


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a67c75a9
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a67c75a9
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a67c75a9

Branch: refs/heads/master
Commit: a67c75a9e15c9957aedc0bc8c4aa89952a4c5ea0
Parents: a156261
Author: gtully <gary.tu...@gmail.com>
Authored: Tue Aug 8 16:38:11 2017 +0100
Committer: gtully <gary.tu...@gmail.com>
Committed: Tue Aug 8 16:38:11 2017 +0100

----------------------------------------------------------------------
 .../region/virtual/MappedQueueFilter.java       |  5 +++--
 .../activemq/network/DrainBridgeTest.java       | 22 +++++++++++++++-----
 2 files changed, 20 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/a67c75a9/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java
index db02490..490bf7b 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java
@@ -48,8 +48,9 @@ public class MappedQueueFilter extends DestinationFilter {
         // recover messages for first consumer only
         boolean noSubs = getConsumers().isEmpty();
 
-        // for virtual consumer wildcard dests, only subscribe to exact match 
to ensure no duplicates
-        if 
(sub.getActiveMQDestination().compareTo(next.getActiveMQDestination()) == 0) {
+        // for virtual consumer wildcard dests, only subscribe to exact match 
or non wildcard dests to ensure no duplicates
+        int match = 
sub.getActiveMQDestination().compareTo(next.getActiveMQDestination());
+        if (match == 0 || (!next.getActiveMQDestination().isPattern() && match 
== 1)) {
             super.addSubscription(context, sub);
         }
         if (noSubs && !getConsumers().isEmpty()) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/a67c75a9/activemq-unit-tests/src/test/java/org/apache/activemq/network/DrainBridgeTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DrainBridgeTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DrainBridgeTest.java
index 510b540..c30b44d 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DrainBridgeTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DrainBridgeTest.java
@@ -22,9 +22,11 @@ import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.util.Wait;
 
 import javax.jms.Connection;
+import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
@@ -63,19 +65,21 @@ public class DrainBridgeTest {
 
         System.out.println("Local count: " + 
drainingBroker.getAdminView().getTotalMessageCount() + ", target count:" + 
target.getAdminView().getTotalMessageCount());
 
-        assertEquals("local messages", 20, 
drainingBroker.getAdminView().getTotalMessageCount());
+        assertEquals("local messages", 22, 
drainingBroker.getAdminView().getTotalMessageCount());
         assertEquals("no remote messages", 0, 
target.getAdminView().getTotalMessageCount());
 
         Wait.waitFor(new Wait.Condition() {
             @Override
             public boolean isSatisified() throws Exception {
+                System.out.println("Local count: " + 
drainingBroker.getAdminView().getTotalMessageCount() + ", target count:" + 
target.getAdminView().getTotalMessageCount());
+
                 return drainingBroker.getAdminView().getTotalMessageCount() == 
0l;
             }
         });
 
         assertEquals("no local messages", 0, 
drainingBroker.getAdminView().getTotalMessageCount());
-        assertEquals("remote messages", 20, 
target.getAdminView().getTotalMessageCount());
-
+        assertEquals("remote messages", 22, 
target.getAdminView().getTotalMessageCount());
+        assertEquals("number of queues match", 
drainingBroker.getAdminView().getQueues().length, 
target.getAdminView().getQueues().length);
         drainingBroker.stop();
         target.stop();
     }
@@ -99,10 +103,18 @@ public class DrainBridgeTest {
         conn.start();
         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
         TextMessage msg = session.createTextMessage("This is a message.");
-        MessageProducer producer = session.createProducer(new 
ActiveMQQueue("Q.Foo,Bar"));
+        MessageProducer producer = session.createProducer(null);
+        ActiveMQQueue queue = new ActiveMQQueue("Q.Foo,Bar");
         for (int i = 0; i < 10; i++) {
-            producer.send(msg);
+            producer.send(queue, msg);
         }
+
+        // add virtual topic consumer Q
+        MessageConsumer messageConsumerA = session.createConsumer(new 
ActiveMQQueue("Consumer.A.VirtualTopic.Y"));
+        MessageConsumer messageConsumeB = session.createConsumer(new 
ActiveMQQueue("Consumer.B.VirtualTopic.Y"));
+
+        producer.send(new ActiveMQTopic("VirtualTopic.Y"), msg);
+
         conn.close();
         broker.stop();
     }

Reply via email to