[AMQ-6643] refine fix to allow wildcard subs to non wildcard subscription queues, enable simple wildcard sub to drain all subscription queues
(cherry picked from commit a67c75a9e15c9957aedc0bc8c4aa89952a4c5ea0) Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/3b6ba778 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/3b6ba778 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/3b6ba778 Branch: refs/heads/activemq-5.15.x Commit: 3b6ba778bdfa1b469d16b4b42f4556ffc1180118 Parents: 035baf1 Author: gtully <gary.tu...@gmail.com> Authored: Tue Aug 8 16:38:11 2017 +0100 Committer: Timothy Bish <tabish...@gmail.com> Committed: Thu Sep 7 12:13:40 2017 -0400 ---------------------------------------------------------------------- .../region/virtual/MappedQueueFilter.java | 5 +++-- .../activemq/network/DrainBridgeTest.java | 22 +++++++++++++++----- 2 files changed, 20 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/3b6ba778/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java index db02490..490bf7b 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java @@ -48,8 +48,9 @@ public class MappedQueueFilter extends DestinationFilter { // recover messages for first consumer only boolean noSubs = getConsumers().isEmpty(); - // for virtual consumer wildcard dests, only subscribe to exact match to ensure no duplicates - if (sub.getActiveMQDestination().compareTo(next.getActiveMQDestination()) == 0) { + // for virtual consumer wildcard dests, only subscribe to exact match or non wildcard dests to ensure no duplicates + int match = sub.getActiveMQDestination().compareTo(next.getActiveMQDestination()); + if (match == 0 || (!next.getActiveMQDestination().isPattern() && match == 1)) { super.addSubscription(context, sub); } if (noSubs && !getConsumers().isEmpty()) { http://git-wip-us.apache.org/repos/asf/activemq/blob/3b6ba778/activemq-unit-tests/src/test/java/org/apache/activemq/network/DrainBridgeTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DrainBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DrainBridgeTest.java index 510b540..c30b44d 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DrainBridgeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DrainBridgeTest.java @@ -22,9 +22,11 @@ import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.util.Wait; import javax.jms.Connection; +import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; @@ -63,19 +65,21 @@ public class DrainBridgeTest { System.out.println("Local count: " + drainingBroker.getAdminView().getTotalMessageCount() + ", target count:" + target.getAdminView().getTotalMessageCount()); - assertEquals("local messages", 20, drainingBroker.getAdminView().getTotalMessageCount()); + assertEquals("local messages", 22, drainingBroker.getAdminView().getTotalMessageCount()); assertEquals("no remote messages", 0, target.getAdminView().getTotalMessageCount()); Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisified() throws Exception { + System.out.println("Local count: " + drainingBroker.getAdminView().getTotalMessageCount() + ", target count:" + target.getAdminView().getTotalMessageCount()); + return drainingBroker.getAdminView().getTotalMessageCount() == 0l; } }); assertEquals("no local messages", 0, drainingBroker.getAdminView().getTotalMessageCount()); - assertEquals("remote messages", 20, target.getAdminView().getTotalMessageCount()); - + assertEquals("remote messages", 22, target.getAdminView().getTotalMessageCount()); + assertEquals("number of queues match", drainingBroker.getAdminView().getQueues().length, target.getAdminView().getQueues().length); drainingBroker.stop(); target.stop(); } @@ -99,10 +103,18 @@ public class DrainBridgeTest { conn.start(); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); TextMessage msg = session.createTextMessage("This is a message."); - MessageProducer producer = session.createProducer(new ActiveMQQueue("Q.Foo,Bar")); + MessageProducer producer = session.createProducer(null); + ActiveMQQueue queue = new ActiveMQQueue("Q.Foo,Bar"); for (int i = 0; i < 10; i++) { - producer.send(msg); + producer.send(queue, msg); } + + // add virtual topic consumer Q + MessageConsumer messageConsumerA = session.createConsumer(new ActiveMQQueue("Consumer.A.VirtualTopic.Y")); + MessageConsumer messageConsumeB = session.createConsumer(new ActiveMQQueue("Consumer.B.VirtualTopic.Y")); + + producer.send(new ActiveMQTopic("VirtualTopic.Y"), msg); + conn.close(); broker.stop(); }