Repository: activemq
Updated Branches:
  refs/heads/master 2731f04f1 -> 679db08db


[AMQ-6643] Ensure a wildcard virtual topic subscriber is restricted to the
wildcard destination, avoiding duplicate and spurious dispatch. Includes fix and test.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/679db08d
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/679db08d
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/679db08d

Branch: refs/heads/master
Commit: 679db08db3dba27475b9e82c20d3dafeb155631f
Parents: 2731f04
Author: gtully <gary.tu...@gmail.com>
Authored: Tue Apr 4 10:16:00 2017 +0100
Committer: gtully <gary.tu...@gmail.com>
Committed: Tue Apr 4 10:16:48 2017 +0100

----------------------------------------------------------------------
 .../region/virtual/MappedQueueFilter.java       |  54 +++----
 .../activemq/command/ActiveMQDestination.java   |   6 +
 .../mqtt/PahoVirtualTopicMQTTTest.java          |  11 +-
 .../virtual/VirtualTopicWildcardTest.java       | 149 +++++++++++++++++++
 4 files changed, 184 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/679db08d/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java
index e8de910..db02490 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java
@@ -48,34 +48,34 @@ public class MappedQueueFilter extends DestinationFilter {
         // recover messages for first consumer only
         boolean noSubs = getConsumers().isEmpty();
 
-        if (!sub.getActiveMQDestination().isPattern() || 
sub.getActiveMQDestination().equals(next.getActiveMQDestination())) {
+        // for virtual consumer wildcard dests, only subscribe to exact match 
to ensure no duplicates
+        if 
(sub.getActiveMQDestination().compareTo(next.getActiveMQDestination()) == 0) {
             super.addSubscription(context, sub);
-
-            if (noSubs && !getConsumers().isEmpty()) {
-                // new subscription added, recover retroactive messages
-                final RegionBroker regionBroker = (RegionBroker) 
context.getBroker().getAdaptor(RegionBroker.class);
-                final Set<Destination> virtualDests = 
regionBroker.getDestinations(virtualDestination);
-
-                final ActiveMQDestination newDestination = 
sub.getActiveMQDestination();
-                final BaseDestination regionDest = 
getBaseDestination((Destination) 
regionBroker.getDestinations(newDestination).toArray()[0]);
-
-                for (Destination virtualDest : virtualDests) {
-                    if (virtualDest.getActiveMQDestination().isTopic() &&
-                            (virtualDest.isAlwaysRetroactive() || 
sub.getConsumerInfo().isRetroactive())) {
-
-                        Topic topic = (Topic) getBaseDestination(virtualDest);
-                        if (topic != null) {
-                            // re-use browse() to get recovered messages
-                            final Message[] messages = 
topic.getSubscriptionRecoveryPolicy().browse(topic.getActiveMQDestination());
-
-                            // add recovered messages to subscription
-                            for (Message message : messages) {
-                                final Message copy = message.copy();
-                                
copy.setOriginalDestination(message.getDestination());
-                                copy.setDestination(newDestination);
-                                copy.setRegionDestination(regionDest);
-                                sub.addRecoveredMessage(context, 
newDestination.isQueue() ? new IndirectMessageReference(copy) : copy);
-                            }
+        }
+        if (noSubs && !getConsumers().isEmpty()) {
+            // new subscription added, recover retroactive messages
+            final RegionBroker regionBroker = (RegionBroker) 
context.getBroker().getAdaptor(RegionBroker.class);
+            final Set<Destination> virtualDests = 
regionBroker.getDestinations(virtualDestination);
+
+            final ActiveMQDestination newDestination = 
sub.getActiveMQDestination();
+            final BaseDestination regionDest = 
getBaseDestination((Destination) 
regionBroker.getDestinations(newDestination).toArray()[0]);
+
+            for (Destination virtualDest : virtualDests) {
+                if (virtualDest.getActiveMQDestination().isTopic() &&
+                        (virtualDest.isAlwaysRetroactive() || 
sub.getConsumerInfo().isRetroactive())) {
+
+                    Topic topic = (Topic) getBaseDestination(virtualDest);
+                    if (topic != null) {
+                        // re-use browse() to get recovered messages
+                        final Message[] messages = 
topic.getSubscriptionRecoveryPolicy().browse(topic.getActiveMQDestination());
+
+                        // add recovered messages to subscription
+                        for (Message message : messages) {
+                            final Message copy = message.copy();
+                            
copy.setOriginalDestination(message.getDestination());
+                            copy.setDestination(newDestination);
+                            copy.setRegionDestination(regionDest);
+                            sub.addRecoveredMessage(context, 
newDestination.isQueue() ? new IndirectMessageReference(copy) : copy);
                         }
                     }
                 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/679db08d/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQDestination.java
----------------------------------------------------------------------
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQDestination.java
 
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQDestination.java
index 4819a1a..149145d 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQDestination.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQDestination.java
@@ -159,6 +159,12 @@ public abstract class ActiveMQDestination extends 
JNDIBaseStorable implements Da
             return 1;
         } else {
             if (destination.getDestinationType() == 
destination2.getDestinationType()) {
+
+                if (destination.isPattern() && destination2.isPattern() ) {
+                    if 
(destination.getPhysicalName().compareTo(destination2.getPhysicalName()) == 0) {
+                        return 0;
+                    }
+                }
                 if (destination.isPattern()) {
                     DestinationFilter filter = 
DestinationFilter.parseFilter(destination);
                     if (filter.matches(destination2)) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/679db08d/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoVirtualTopicMQTTTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoVirtualTopicMQTTTest.java
 
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoVirtualTopicMQTTTest.java
index 5f58202..55103ac 100644
--- 
a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoVirtualTopicMQTTTest.java
+++ 
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoVirtualTopicMQTTTest.java
@@ -18,7 +18,6 @@ package org.apache.activemq.transport.mqtt;
 
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.RegionBroker;
-import org.apache.activemq.command.ActiveMQQueue;
 import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 import org.junit.Before;
@@ -72,14 +71,8 @@ public class PahoVirtualTopicMQTTTest extends PahoMQTTTest {
 
         RegionBroker regionBroker = (RegionBroker) 
brokerService.getBroker().getAdaptor(RegionBroker.class);
 
-        String[] queues = new 
String[]{"Consumer.client-10:AT_LEAST_ONCE.VirtualTopic.user10.>",
-                
"Consumer.client-10:AT_LEAST_ONCE.VirtualTopic.user10.client-10.>",
-                "Consumer.client-1:AT_LEAST_ONCE.VirtualTopic.user1.>",
-                
"Consumer.client-1:AT_LEAST_ONCE.VirtualTopic.user1.client-1.>"};
-
-        for (String queueName : queues) {
-            Destination queue = 
regionBroker.getQueueRegion().getDestinations(new 
ActiveMQQueue(queueName)).iterator().next();
-            assertEquals("Queue " + queueName + " have more than one 
consumer", 1, queue.getConsumers().size());
+        for (Destination queue : 
regionBroker.getQueueRegion().getDestinationMap().values()) {
+            assertEquals("Queue " + queue.getActiveMQDestination() + " have 
more than one consumer", 1, queue.getConsumers().size());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/679db08d/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicWildcardTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicWildcardTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicWildcardTest.java
new file mode 100644
index 0000000..c256e40
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicWildcardTest.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.virtual;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.DestinationInterceptor;
+import org.apache.activemq.broker.region.virtual.VirtualDestination;
+import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
+import org.apache.activemq.broker.region.virtual.VirtualTopic;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.spring.ConsumerBean;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import static junit.framework.TestCase.assertEquals;
+import static junit.framework.TestCase.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+// https://issues.apache.org/jira/browse/AMQ-6643
+public class VirtualTopicWildcardTest {
+    
+    private static final Logger LOG = 
LoggerFactory.getLogger(VirtualTopicWildcardTest.class);
+
+    protected int total = 3;
+    protected Connection connection;
+    BrokerService brokerService;
+
+    @Before
+    public void init() throws Exception {
+        brokerService = createBroker();
+        brokerService.start();
+        connection = createConnection();
+        connection.start();
+    }
+
+    @After
+    public void afer() throws Exception {
+        connection.close();
+        brokerService.stop();
+    }
+
+    @Test
+    public void testWildcardAndSimpleConsumerShareMessages() throws Exception {
+
+        ConsumerBean messageList1 = new ConsumerBean("1:");
+        ConsumerBean messageList2 = new ConsumerBean("2:");
+        ConsumerBean messageList3 = new ConsumerBean("3:");
+
+        messageList1.setVerbose(true);
+        messageList2.setVerbose(true);
+        messageList3.setVerbose(true);
+
+        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+        Destination producerDestination = new 
ActiveMQTopic("VirtualTopic.TEST.A.IT");
+        Destination destination1 = new 
ActiveMQQueue("Consumer.1.VirtualTopic.TEST.>");
+        Destination destination2 = new 
ActiveMQQueue("Consumer.1.VirtualTopic.TEST.A.IT");
+        Destination destination3 = new 
ActiveMQQueue("Consumer.1.VirtualTopic.TEST.B.IT");
+
+        LOG.info("Sending to: " + producerDestination);
+        LOG.info("Consuming from: " + destination1 + " and " + destination2 + 
", and " + destination3);
+
+        MessageConsumer c1 = session.createConsumer(destination1, null);
+        MessageConsumer c2 = session.createConsumer(destination2, null);
+        // this consumer should get no messages
+        MessageConsumer c3 = session.createConsumer(destination3, null);
+
+        c1.setMessageListener(messageList1);
+        c2.setMessageListener(messageList2);
+        c3.setMessageListener(messageList3);
+
+        // create topic producer
+        MessageProducer producer = session.createProducer(producerDestination);
+        assertNotNull(producer);
+
+        for (int i = 0; i < total; i++) {
+            producer.send(createMessage(session, i));
+        }
+
+        assertMessagesArrived(messageList1, messageList2);
+        assertEquals(0, messageList3.getMessages().size());
+
+    }
+
+    private Message createMessage(Session session, int i) throws JMSException {
+        return session.createTextMessage("val=" + i);
+    }
+
+    private Connection createConnection() throws Exception {
+        ActiveMQConnectionFactory cf = new 
ActiveMQConnectionFactory(brokerService.getVmConnectorURI());
+        cf.setWatchTopicAdvisories(false);
+        return cf.createConnection();
+    }
+
+    protected void assertMessagesArrived(ConsumerBean messageList1, 
ConsumerBean messageList2) {
+        try {
+            assertTrue("expected", Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    LOG.info("One: " + messageList1.getMessages().size() + ", 
Two:" + messageList2.getMessages().size());
+                    return messageList1.getMessages().size() + 
messageList2.getMessages().size() == 2 * total;
+                }
+            }));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    protected BrokerService createBroker() throws Exception {
+
+        BrokerService broker = new BrokerService();
+        broker.setAdvisorySupport(false);
+        broker.setPersistent(false);
+
+        VirtualTopic virtualTopic = new VirtualTopic();
+        VirtualDestinationInterceptor interceptor = new 
VirtualDestinationInterceptor();
+        interceptor.setVirtualDestinations(new 
VirtualDestination[]{virtualTopic});
+        broker.setDestinationInterceptors(new 
DestinationInterceptor[]{interceptor});
+        return broker;
+    }
+}

Reply via email to