Updated Branches: refs/heads/trunk f88043eaf -> afded924f
Fix for AMQ-4899 Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/afded924 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/afded924 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/afded924 Branch: refs/heads/trunk Commit: afded924ffde28c1f491db761774e14c5e954413 Parents: f88043e Author: Kevin Earls <[email protected]> Authored: Fri Feb 7 09:41:50 2014 +0100 Committer: Kevin Earls <[email protected]> Committed: Fri Feb 7 09:41:50 2014 +0100 ---------------------------------------------------------------------- .../SelectorAwareVirtualTopicInterceptor.java | 18 +- .../plugin/SubQueueSelectorCacheBroker.java | 35 ++-- .../org/apache/activemq/bugs/AMQ4899Test.java | 192 +++++++++++++++++++ 3 files changed, 224 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/afded924/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java index 8d58a43..0c19565 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java @@ -16,10 +16,6 @@ */ package org.apache.activemq.broker.region.virtual; -import java.io.IOException; -import java.util.List; -import java.util.Set; - import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.Destination; @@ -35,6 +31,10 @@ import org.apache.activemq.util.LRUCache; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.List; +import java.util.Set; + public class SelectorAwareVirtualTopicInterceptor extends VirtualTopicInterceptor { private static final Logger LOG = LoggerFactory.getLogger(SelectorAwareVirtualTopicInterceptor.class); LRUCache<String,BooleanExpression> expressionCache = new LRUCache<String,BooleanExpression>(); @@ -70,10 +70,9 @@ public class SelectorAwareVirtualTopicInterceptor extends VirtualTopicIntercepto if (sub.matches(message, msgContext)) { matches = true; break; - } } - if (matches == false && subs.size() == 0) { + if (matches == false) { matches = tryMatchingCachedSubs(broker, dest, msgContext); } return matches; @@ -87,11 +86,14 @@ public class SelectorAwareVirtualTopicInterceptor extends VirtualTopicIntercepto final SubQueueSelectorCacheBroker cache = getSubQueueSelectorCacheBrokerPlugin(broker); if (cache != null) { - final String selector = cache.getSelector(dest.getActiveMQDestination().getQualifiedName()); - if (selector != null) { + final Set<String> selectors = cache.getSelector(dest.getActiveMQDestination().getQualifiedName()); + for (String selector : selectors) { try { final BooleanExpression expression = getExpression(selector); matches = expression.matches(msgContext); + if (matches) { + return true; + } } catch (Exception e) { LOG.error(e.getMessage(), e); } http://git-wip-us.apache.org/repos/asf/activemq/blob/afded924/activemq-broker/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBroker.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBroker.java b/activemq-broker/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBroker.java index cfa50f7..af02b54 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBroker.java @@ -16,14 +16,6 @@ */ package org.apache.activemq.plugin; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.util.concurrent.ConcurrentHashMap; - import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.BrokerFilter; import org.apache.activemq.broker.ConnectionContext; @@ -32,6 +24,17 @@ import org.apache.activemq.command.ConsumerInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + /** * A plugin which allows the caching of the selector from a subscription queue. * <p/> @@ -51,7 +54,7 @@ public class SubQueueSelectorCacheBroker extends BrokerFilter implements Runnabl * The subscription's selector cache. We cache compiled expressions keyed * by the target destination. */ - private ConcurrentHashMap<String, String> subSelectorCache = new ConcurrentHashMap<String, String>(); + private ConcurrentHashMap<String, Set<String>> subSelectorCache = new ConcurrentHashMap<String, Set<String>>(); private final File persistFile; @@ -85,7 +88,8 @@ public class SubQueueSelectorCacheBroker extends BrokerFilter implements Runnabl @Override public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { - LOG.debug("Caching consumer selector [{}] on a {}", info.getSelector(), info.getDestination().getQualifiedName()); + String destinationName = info.getDestination().getQualifiedName(); + LOG.debug("Caching consumer selector [{}] on a {}", info.getSelector(), destinationName); String selector = info.getSelector(); // As ConcurrentHashMap doesn't support null values, use always true expression @@ -93,7 +97,12 @@ public class SubQueueSelectorCacheBroker extends BrokerFilter implements Runnabl selector = "TRUE"; } - subSelectorCache.put(info.getDestination().getQualifiedName(), selector); + Set<String> selectors = subSelectorCache.get(destinationName); + if (selectors == null) { + selectors = Collections.synchronizedSet(new HashSet<String>()); + } + selectors.add(selector); + subSelectorCache.put(destinationName, selectors); return super.addConsumer(context, info); } @@ -105,7 +114,7 @@ public class SubQueueSelectorCacheBroker extends BrokerFilter implements Runnabl try { ObjectInputStream in = new ObjectInputStream(fis); try { - subSelectorCache = (ConcurrentHashMap<String, String>) in.readObject(); + subSelectorCache = (ConcurrentHashMap<String, Set<String>>) in.readObject(); } catch (ClassNotFoundException ex) { LOG.error("Invalid selector cache data found. Please remove file.", ex); } finally { @@ -148,7 +157,7 @@ public class SubQueueSelectorCacheBroker extends BrokerFilter implements Runnabl /** * @return The JMS selector for the specified {@code destination} */ - public String getSelector(final String destination) { + public Set<String> getSelector(final String destination) { return subSelectorCache.get(destination); } http://git-wip-us.apache.org/repos/asf/activemq/blob/afded924/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4899Test.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4899Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4899Test.java new file mode 100644 index 0000000..81140ce --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4899Test.java @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.DestinationInterceptor; +import org.apache.activemq.broker.region.virtual.VirtualDestination; +import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor; +import org.apache.activemq.broker.region.virtual.VirtualTopic; +import org.apache.activemq.plugin.SubQueueSelectorCacheBrokerPlugin; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; + +public class AMQ4899Test { + protected static final Logger LOG = LoggerFactory.getLogger(AMQ4899Test.class); + private static final String QUEUE_NAME="AMQ4899TestQueue"; + private static final String CONSUMER_QUEUE="Consumer.Orders.VirtualOrders." + QUEUE_NAME; + private static final String PRODUCER_DESTINATION_NAME = "VirtualOrders." + QUEUE_NAME; + + private static final Integer MESSAGE_LIMIT = 20; + public static final String CONSUMER_A_SELECTOR = "Order < " + 10; + public static String CONSUMER_B_SELECTOR = "Order >= " + 10; + private CountDownLatch consumersStarted = new CountDownLatch(2); + private CountDownLatch consumerAtoConsumeCount= new CountDownLatch(10); + private CountDownLatch consumerBtoConsumeCount = new CountDownLatch(10); + + private BrokerService broker; + + @Before + public void setUp() { + setupBroker("broker://()/localhost?"); + } + + @After + public void tearDown() throws Exception { + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + } + } + + @Test(timeout = 60 * 1000) + public void testVirtualTopicMultipleSelectors() throws Exception{ + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost"); + Connection connection = factory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Queue consumerQueue = session.createQueue(CONSUMER_QUEUE); + + MessageListener listenerA = new AMQ4899Listener("A", consumersStarted, consumerAtoConsumeCount); + MessageConsumer consumerA = session.createConsumer(consumerQueue, CONSUMER_A_SELECTOR); + consumerA.setMessageListener(listenerA); + + MessageListener listenerB = new AMQ4899Listener("B", consumersStarted, consumerBtoConsumeCount); + MessageConsumer consumerB = session.createConsumer(consumerQueue, CONSUMER_B_SELECTOR); + consumerB.setMessageListener(listenerB); + + consumersStarted.await(10, TimeUnit.SECONDS); + assertEquals("Not all consumers started in time", 0, consumersStarted.getCount()); + + Destination producerDestination = session.createTopic(PRODUCER_DESTINATION_NAME); + MessageProducer producer = session.createProducer(producerDestination); + int messageIndex = 0; + for (int i=0; i < MESSAGE_LIMIT; i++) { + if (i==3) { + LOG.debug("Stopping consumerA"); + consumerA.close(); + } + + if (i == 14) { + LOG.debug("Stopping consumer B"); + consumerB.close(); + } + String messageText = "hello " + messageIndex++ + " sent at " + new java.util.Date().toString(); + TextMessage message = session.createTextMessage(messageText); + message.setIntProperty("Order", i); + LOG.debug("Sending message [{}]", messageText); + producer.send(message); + Thread.sleep(100); + } + Thread.sleep(1 * 1000); + + // restart consumerA + LOG.debug("Restarting consumerA"); + consumerA = session.createConsumer(consumerQueue, CONSUMER_A_SELECTOR); + consumerA.setMessageListener(listenerA); + + // restart consumerB + LOG.debug("restarting consumerB"); + consumerB = session.createConsumer(consumerQueue, CONSUMER_B_SELECTOR); + consumerB.setMessageListener(listenerB); + + consumerAtoConsumeCount.await(5, TimeUnit.SECONDS); + consumerBtoConsumeCount.await(5, TimeUnit.SECONDS); + + LOG.debug("Unconsumed messages for consumerA {} consumerB {}", consumerAtoConsumeCount.getCount(), consumerBtoConsumeCount.getCount()); + + assertEquals("Consumer A did not consume all messages", 0, consumerAtoConsumeCount.getCount()); + assertEquals("Consumer B did not consume all messages", 0, consumerBtoConsumeCount.getCount()); + + connection.close(); + } + + /** + * Setup broker with VirtualTopic configured + */ + private void setupBroker(String uri) { + try { + broker = BrokerFactory.createBroker(uri); + + VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor(); + VirtualTopic virtualTopic = new VirtualTopic(); + virtualTopic.setName("VirtualOrders.>"); + virtualTopic.setSelectorAware(true); + VirtualDestination[] virtualDestinations = { virtualTopic }; + interceptor.setVirtualDestinations(virtualDestinations); + broker.setDestinationInterceptors(new DestinationInterceptor[]{interceptor}); + + SubQueueSelectorCacheBrokerPlugin subQueueSelectorCacheBrokerPlugin = new SubQueueSelectorCacheBrokerPlugin(); + BrokerPlugin[] updatedPlugins = {subQueueSelectorCacheBrokerPlugin}; + broker.setPlugins(updatedPlugins); + + broker.start(); + broker.waitUntilStarted(); + } catch (Exception e) { + LOG.error("Failed creating broker", e); + } + } +} + +class AMQ4899Listener implements MessageListener { + Logger LOG = LoggerFactory.getLogger(AMQ4899Listener.class); + CountDownLatch toConsume; + String id; + + public AMQ4899Listener(String id, CountDownLatch started, CountDownLatch toConsume) { + this.id = id; + this.toConsume = toConsume; + started.countDown(); + } + + @Override + public void onMessage(Message message) { + toConsume.countDown(); + try { + if (message instanceof TextMessage) { + TextMessage textMessage = (TextMessage) message; + LOG.debug("Listener {} received [{}]", id, textMessage.getText()); + } else { + LOG.error("Listener {} Expected a TextMessage, got {}", id, message.getClass().getCanonicalName()); + } + } catch (JMSException e) { + LOG.error("Unexpected JMSException in Listener " + id, e); + } + } +}
