Repository: activemq Updated Branches: refs/heads/master e16d05436 -> 61da1faa4
https://issues.apache.org/jira/browse/AMQ-5672 Added an option for allowing only a single selector for the virtual destination selector cache. also added some JMX views into the selector cache that can be used at runtime. includes unit tests Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/61da1faa Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/61da1faa Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/61da1faa Branch: refs/heads/master Commit: 61da1faa4c4e49191b373049ee36434aaad58897 Parents: e16d054 Author: Christian Posta <[email protected]> Authored: Thu Mar 12 18:14:09 2015 -0700 Committer: Christian Posta <[email protected]> Committed: Mon Apr 6 16:12:20 2015 -0700 ---------------------------------------------------------------------- .../activemq/broker/jmx/BrokerMBeanSupport.java | 7 + .../VirtualDestinationSelectorCacheView.java | 49 ++ ...irtualDestinationSelectorCacheViewMBean.java | 36 + .../network/DemandForwardingBridgeSupport.java | 2 +- .../plugin/SubQueueSelectorCacheBroker.java | 157 ++++- .../SubQueueSelectorCacheBrokerPlugin.java | 35 +- .../apache/activemq/util/ProducerThread.java | 68 +- .../activemq/JmsMultipleBrokersTestSupport.java | 21 + ...VirtualTopicSelectorAwareForwardingTest.java | 693 +++++++++++++++++++ .../org/apache/activemq/util/MessageIdList.java | 5 +- 10 files changed, 1042 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/61da1faa/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerMBeanSupport.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerMBeanSupport.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerMBeanSupport.java index e7d888d..43254c5 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerMBeanSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerMBeanSupport.java @@ -161,6 +161,13 @@ public class BrokerMBeanSupport { return createNetworkConnectorName(brokerObjectName.toString(), type, name); } + public static ObjectName createVirtualDestinationSelectorCacheName(ObjectName brokerObjectName, String type, String name) throws MalformedObjectNameException { + String objectNameStr = brokerObjectName.toString(); + objectNameStr += ",service=" + type + ",virtualDestinationSelectoCache="+ JMXSupport.encodeObjectNamePart(name); + ObjectName objectName = new ObjectName(objectNameStr); + return objectName; + } + public static ObjectName createNetworkConnectorName(String brokerObjectName, String type, String name) throws MalformedObjectNameException { String objectNameStr = brokerObjectName; objectNameStr += ",connector=" + type + ",networkConnectorName="+ JMXSupport.encodeObjectNamePart(name); http://git-wip-us.apache.org/repos/asf/activemq/blob/61da1faa/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/VirtualDestinationSelectorCacheView.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/VirtualDestinationSelectorCacheView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/VirtualDestinationSelectorCacheView.java new file mode 100644 index 0000000..6fbb33e --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/VirtualDestinationSelectorCacheView.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.jmx; + +import org.apache.activemq.plugin.SubQueueSelectorCacheBroker; + +import java.util.Set; + +/** + * Created by ceposta + * <a href="http://christianposta.com/blog>http://christianposta.com/blog</a>. + */ +public class VirtualDestinationSelectorCacheView implements VirtualDestinationSelectorCacheViewMBean { + + private final SubQueueSelectorCacheBroker selectorCacheBroker; + + public VirtualDestinationSelectorCacheView(SubQueueSelectorCacheBroker selectorCacheBroker) { + this.selectorCacheBroker = selectorCacheBroker; + } + + @Override + public Set<String> selectorsForDestination(String destinationName) { + return selectorCacheBroker.getSelectorsForDestination(destinationName); + } + + @Override + public boolean deleteSelectorForDestination(String destinationName, String selector) { + return selectorCacheBroker.deleteSelectorForDestination(destinationName, selector); + } + + @Override + public boolean deleteAllSelectorsForDestination(String destinationName) { + return selectorCacheBroker.deleteAllSelectorsForDestination(destinationName); + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/61da1faa/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/VirtualDestinationSelectorCacheViewMBean.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/VirtualDestinationSelectorCacheViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/VirtualDestinationSelectorCacheViewMBean.java new file mode 100644 index 0000000..7490d13 --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/VirtualDestinationSelectorCacheViewMBean.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.jmx; + +import java.util.Set; + +/** + * Created by ceposta + * <a href="http://christianposta.com/blog>http://christianposta.com/blog</a>. + */ +public interface VirtualDestinationSelectorCacheViewMBean { + + @MBeanInfo("Dump raw cache of selectors organized by destination") + public Set<String> selectorsForDestination(String destinationName); + + @MBeanInfo("Delete a selector for a destination. Selector must match what returns from selectorsForDestination operation") + public boolean deleteSelectorForDestination(String destinationName, String selector); + + @MBeanInfo("Dump raw cache of selectors organized by destination") + public boolean deleteAllSelectorsForDestination(String destinationName); + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/61da1faa/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 83eea31..bbf11f0 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -981,7 +981,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } Message message = configureMessage(md); - LOG.debug("bridging ({} -> {}), consumer: {}, destinaition: {}, brokerPath: {}, message: {}", new Object[]{ + LOG.debug("bridging ({} -> {}), consumer: {}, destination: {}, brokerPath: {}, message: {}", new Object[]{ configuration.getBrokerName(), remoteBrokerName, (LOG.isTraceEnabled() ? message : message.getMessageId()), md.getConsumerId(), message.getDestination(), Arrays.toString(message.getBrokerPath()), message }); http://git-wip-us.apache.org/repos/asf/activemq/blob/61da1faa/activemq-broker/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBroker.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBroker.java b/activemq-broker/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBroker.java index af02b54..c6a788f 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBroker.java @@ -16,14 +16,21 @@ */ package org.apache.activemq.plugin; +import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.BrokerFilter; +import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.jmx.AnnotatedMBean; +import org.apache.activemq.broker.jmx.BrokerMBeanSupport; +import org.apache.activemq.broker.jmx.VirtualDestinationSelectorCacheView; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.command.ConsumerInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.management.JMException; +import javax.management.ObjectName; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; @@ -49,6 +56,7 @@ import java.util.concurrent.ConcurrentHashMap; */ public class SubQueueSelectorCacheBroker extends BrokerFilter implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(SubQueueSelectorCacheBroker.class); + public static final String MATCH_EVERYTHING = "TRUE"; /** * The subscription's selector cache. We cache compiled expressions keyed @@ -57,10 +65,14 @@ public class SubQueueSelectorCacheBroker extends BrokerFilter implements Runnabl private ConcurrentHashMap<String, Set<String>> subSelectorCache = new ConcurrentHashMap<String, Set<String>>(); private final File persistFile; + private boolean singleSelectorPerDestination = false; + private boolean ignoreWildcardSelectors = false; + private ObjectName objectName; private boolean running = true; private Thread persistThread; - private static final long MAX_PERSIST_INTERVAL = 600000; + private long persistInterval = MAX_PERSIST_INTERVAL; + public static final long MAX_PERSIST_INTERVAL = 600000; private static final String SELECTOR_CACHE_PERSIST_THREAD_NAME = "SelectorCachePersistThread"; /** @@ -75,6 +87,22 @@ public class SubQueueSelectorCacheBroker extends BrokerFilter implements Runnabl persistThread = new Thread(this, SELECTOR_CACHE_PERSIST_THREAD_NAME); persistThread.start(); + enableJmx(); + } + + private void enableJmx() { + BrokerService broker = getBrokerService(); + if (broker.isUseJmx()) { + VirtualDestinationSelectorCacheView view = new VirtualDestinationSelectorCacheView(this); + try { + objectName = BrokerMBeanSupport.createVirtualDestinationSelectorCacheName(broker.getBrokerObjectName(), "plugin", "virtualDestinationCache"); + LOG.trace("virtualDestinationCacheSelector mbean name; " + objectName.toString()); + AnnotatedMBean.registerMBean(broker.getManagementContext(), view, objectName); + } catch (Exception e) { + LOG.warn("JMX is enabled, but when installing the VirtualDestinationSelectorCache, couldn't install the JMX mbeans. Continuing without installing the mbeans."); + } + + } } @Override @@ -84,29 +112,79 @@ public class SubQueueSelectorCacheBroker extends BrokerFilter implements Runnabl persistThread.interrupt(); persistThread.join(); } //if + unregisterMBeans(); + } + + private void unregisterMBeans() { + BrokerService broker = getBrokerService(); + if (broker.isUseJmx() && this.objectName != null) { + try { + broker.getManagementContext().unregisterMBean(objectName); + } catch (JMException e) { + LOG.warn("Trying uninstall VirtualDestinationSelectorCache; couldn't uninstall mbeans, continuting..."); + } + } } @Override public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { - String destinationName = info.getDestination().getQualifiedName(); - LOG.debug("Caching consumer selector [{}] on a {}", info.getSelector(), destinationName); - String selector = info.getSelector(); + // don't track selectors for advisory topics + if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) { + String destinationName = info.getDestination().getQualifiedName(); + LOG.debug("Caching consumer selector [{}] on '{}'", info.getSelector(), destinationName); - // As ConcurrentHashMap doesn't support null values, use always true expression - if (selector == null) { - selector = "TRUE"; - } + String selector = info.getSelector() == null ? MATCH_EVERYTHING : info.getSelector(); - Set<String> selectors = subSelectorCache.get(destinationName); - if (selectors == null) { - selectors = Collections.synchronizedSet(new HashSet<String>()); - } - selectors.add(selector); - subSelectorCache.put(destinationName, selectors); + if (!(ignoreWildcardSelectors && hasWildcards(selector))) { + + Set<String> selectors = subSelectorCache.get(destinationName); + if (selectors == null) { + selectors = Collections.synchronizedSet(new HashSet<String>()); + } else if (singleSelectorPerDestination && !MATCH_EVERYTHING.equals(selector)) { + // in this case, we allow only ONE selector. But we don't count the catch-all "null/TRUE" selector + // here, we always allow that one. But only one true selector. + boolean containsMatchEverything = selectors.contains(MATCH_EVERYTHING); + selectors.clear(); + // put back the MATCH_EVERYTHING selector + if (containsMatchEverything) { + selectors.add(MATCH_EVERYTHING); + } + } + + LOG.debug("adding new selector: into cache " + selector); + selectors.add(selector); + LOG.debug("current selectors in cache: " + selectors); + subSelectorCache.put(destinationName, selectors); + } + + + } return super.addConsumer(context, info); } + // trivial check for SQL92/selector wildcards + private boolean hasWildcards(String selector) { + return selector.contains("%") || selector.contains("_"); + } + + @Override + public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { + if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) { + + if (singleSelectorPerDestination) { + String destinationName = info.getDestination().getQualifiedName(); + Set<String> selectors = subSelectorCache.get(destinationName); + if (info.getSelector() == null && selectors.size() > 1) { + boolean removed = selectors.remove(MATCH_EVERYTHING); + LOG.debug("A non-selector consumer has dropped. Removing the catchall matching pattern 'TRUE'. Successful? " + removed); + } + } + + } + super.removeConsumer(context, info); + } + private void readCache() { if (persistFile != null && persistFile.exists()) { try { @@ -169,12 +247,61 @@ public class SubQueueSelectorCacheBroker extends BrokerFilter implements Runnabl public void run() { while (running) { try { - Thread.sleep(MAX_PERSIST_INTERVAL); + Thread.sleep(persistInterval); } catch (InterruptedException ex) { } //try persistCache(); } } + + public boolean isSingleSelectorPerDestination() { + return singleSelectorPerDestination; + } + + public void setSingleSelectorPerDestination(boolean singleSelectorPerDestination) { + this.singleSelectorPerDestination = singleSelectorPerDestination; + } + + public Set<String> getSelectorsForDestination(String destinationName) { + if (subSelectorCache.containsKey(destinationName)) { + return new HashSet<String>(subSelectorCache.get(destinationName)); + } + + return Collections.EMPTY_SET; + } + + public long getPersistInterval() { + return persistInterval; + } + + public void setPersistInterval(long persistInterval) { + this.persistInterval = persistInterval; + } + + public boolean deleteSelectorForDestination(String destinationName, String selector) { + if (subSelectorCache.containsKey(destinationName)) { + Set<String> cachedSelectors = subSelectorCache.get(destinationName); + return cachedSelectors.remove(selector); + } + + return false; + } + + public boolean deleteAllSelectorsForDestination(String destinationName) { + if (subSelectorCache.containsKey(destinationName)) { + Set<String> cachedSelectors = subSelectorCache.get(destinationName); + cachedSelectors.clear(); + } + return true; + } + + public boolean isIgnoreWildcardSelectors() { + return ignoreWildcardSelectors; + } + + public void setIgnoreWildcardSelectors(boolean ignoreWildcardSelectors) { + this.ignoreWildcardSelectors = ignoreWildcardSelectors; + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/61da1faa/activemq-broker/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBrokerPlugin.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBrokerPlugin.java b/activemq-broker/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBrokerPlugin.java index 01f5e90..72be2cd 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBrokerPlugin.java +++ b/activemq-broker/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBrokerPlugin.java @@ -21,6 +21,8 @@ import java.io.File; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.BrokerPlugin; +import static org.apache.activemq.plugin.SubQueueSelectorCacheBroker.MAX_PERSIST_INTERVAL; + /** * A plugin which allows the caching of the selector from a subscription queue. * <p/> @@ -36,10 +38,17 @@ public class SubQueueSelectorCacheBrokerPlugin implements BrokerPlugin { private File persistFile; + private boolean singleSelectorPerDestination = false; + private boolean ignoreWildcardSelectors = false; + private long persistInterval = MAX_PERSIST_INTERVAL; @Override public Broker installPlugin(Broker broker) throws Exception { - return new SubQueueSelectorCacheBroker(broker, persistFile); + SubQueueSelectorCacheBroker rc = new SubQueueSelectorCacheBroker(broker, persistFile); + rc.setSingleSelectorPerDestination(singleSelectorPerDestination); + rc.setPersistInterval(persistInterval); + rc.setIgnoreWildcardSelectors(ignoreWildcardSelectors); + return rc; } /** @@ -52,4 +61,28 @@ public class SubQueueSelectorCacheBrokerPlugin implements BrokerPlugin { public File getPersistFile() { return persistFile; } + + public boolean isSingleSelectorPerDestination() { + return singleSelectorPerDestination; + } + + public void setSingleSelectorPerDestination(boolean singleSelectorPerDestination) { + this.singleSelectorPerDestination = singleSelectorPerDestination; + } + + public long getPersistInterval() { + return persistInterval; + } + + public void setPersistInterval(long persistInterval) { + this.persistInterval = persistInterval; + } + + public boolean isIgnoreWildcardSelectors() { + return ignoreWildcardSelectors; + } + + public void setIgnoreWildcardSelectors(boolean ignoreWildcardSelectors) { + this.ignoreWildcardSelectors = ignoreWildcardSelectors; + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/61da1faa/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java b/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java index ffaa735..00422e9 100644 --- a/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java +++ b/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java @@ -23,12 +23,14 @@ import javax.jms.*; import java.io.*; import java.net.URL; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; public class ProducerThread extends Thread { private static final Logger LOG = LoggerFactory.getLogger(ProducerThread.class); int messageCount = 1000; + boolean runIndefinitely = false; Destination destination; protected Session session; int sleep = 0; @@ -40,13 +42,14 @@ public class ProducerThread extends Thread { int transactionBatchSize; int transactions = 0; - int sentCount = 0; + AtomicInteger sentCount = new AtomicInteger(0); String message; String messageText = null; String payloadUrl = null; byte[] payload = null; boolean running = false; CountDownLatch finished; + CountDownLatch paused = new CountDownLatch(0); public ProducerThread(Session session, Destination destination) { @@ -67,18 +70,20 @@ public class ProducerThread extends Thread { LOG.info(threadName + " Started to calculate elapsed time ...\n"); long tStart = System.currentTimeMillis(); - for (sentCount = 0; sentCount < messageCount && running; sentCount++) { - Message message = createMessage(sentCount); - producer.send(message); - LOG.info(threadName + " Sent: " + (message instanceof TextMessage ? ((TextMessage) message).getText() : message.getJMSMessageID())); - - if (transactionBatchSize > 0 && sentCount > 0 && sentCount % transactionBatchSize == 0) { - LOG.info(threadName + " Committing transaction: " + transactions++); - session.commit(); + if (runIndefinitely) { + while (running) { + synchronized (this) { + paused.await(); + } + sendMessage(producer, threadName); + sentCount.incrementAndGet(); } - - if (sleep > 0) { - Thread.sleep(sleep); + }else{ + for (sentCount.set(0); sentCount.get() < messageCount && running; sentCount.incrementAndGet()) { + synchronized (this) { + paused.await(); + } + sendMessage(producer, threadName); } } @@ -104,6 +109,23 @@ public class ProducerThread extends Thread { } } + private void sendMessage(MessageProducer producer, String threadName) throws Exception { + Message message = createMessage(sentCount.get()); + producer.send(message); + if (LOG.isDebugEnabled()) { + LOG.debug(threadName + " Sent: " + (message instanceof TextMessage ? ((TextMessage) message).getText() : message.getJMSMessageID())); + } + + if (transactionBatchSize > 0 && sentCount.get() > 0 && sentCount.get() % transactionBatchSize == 0) { + LOG.info(threadName + " Committing transaction: " + transactions++); + session.commit(); + } + + if (sleep > 0) { + Thread.sleep(sleep); + } + } + private void initPayLoad() { if (messageSize > 0) { payload = new byte[messageSize]; @@ -182,7 +204,7 @@ public class ProducerThread extends Thread { } public int getSentCount() { - return sentCount; + return sentCount.get(); } public boolean isPersistent() { @@ -264,4 +286,24 @@ public class ProducerThread extends Thread { public void setMessage(String message) { this.message = message; } + + public boolean isRunIndefinitely() { + return runIndefinitely; + } + + public void setRunIndefinitely(boolean runIndefinitely) { + this.runIndefinitely = runIndefinitely; + } + + public synchronized void pauseProducer(){ + this.paused = new CountDownLatch(1); + } + + public synchronized void resumeProducer(){ + this.paused.countDown(); + } + + public void resetCounters(){ + this.sentCount.set(0); + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/61da1faa/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java index 1d994b9..cc4a5a8 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java @@ -514,6 +514,23 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport { brokers.clear(); } + public String buildFailoverUriToAllBrokers() { + StringBuilder uriBuilder = new StringBuilder("failover:("); + + int index = 1, size = brokers.size(); + + for (BrokerItem b : brokers.values()) { + uriBuilder.append(b.getConnectionUri()); + if (index < size) { + uriBuilder.append(","); + index++; + } + + } + uriBuilder.append(")"); + return uriBuilder.toString(); + } + // Class to group broker components together public class BrokerItem { public BrokerService broker; @@ -535,6 +552,10 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport { id = new IdGenerator(broker.getBrokerName() + ":"); } + public String getConnectionUri(){ + return broker.getVmConnectorURI().toString(); + } + public Connection createConnection() throws Exception { Connection conn = factory.createConnection(); conn.setClientID(id.generateId()); http://git-wip-us.apache.org/repos/asf/activemq/blob/61da1faa/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualTopicSelectorAwareForwardingTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualTopicSelectorAwareForwardingTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualTopicSelectorAwareForwardingTest.java new file mode 100644 index 0000000..b2ab88e --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualTopicSelectorAwareForwardingTest.java @@ -0,0 +1,693 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usecases; + +import org.apache.activemq.JmsMultipleBrokersTestSupport; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.BrokerMBeanSupport; +import org.apache.activemq.broker.jmx.VirtualDestinationSelectorCacheViewMBean; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.DestinationInterceptor; +import org.apache.activemq.broker.region.virtual.VirtualDestination; +import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor; +import org.apache.activemq.broker.region.virtual.VirtualTopic; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.network.NetworkConnector; +import org.apache.activemq.plugin.SubQueueSelectorCacheBrokerPlugin; +import org.apache.activemq.store.kahadb.KahaDBStore; +import org.apache.activemq.util.MessageIdList; +import org.apache.activemq.util.ProducerThread; +import org.apache.activemq.util.Wait; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.*; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.activemq.TestSupport.getDestination; + +/** + * @author <a href="http://www.christianposta.com/blog">Christian Posta</a> + */ +public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends + JmsMultipleBrokersTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(TwoBrokerVirtualTopicSelectorAwareForwardingTest.class); + + private static final String PERSIST_SELECTOR_CACHE_FILE_BASEPATH = "./target/selectorCache-"; + + public void testJMX() throws Exception { + clearSelectorCacheFiles(); + // borkerA is local and brokerB is remote + bridgeAndConfigureBrokers("BrokerA", "BrokerB"); + startAllBrokers(); + waitForBridgeFormation(); + + createConsumer("BrokerB", createDestination("Consumer.B.VirtualTopic.tempTopic", false), + "foo = 'bar'"); + + + final BrokerService brokerA = brokers.get("BrokerA").broker; + + String testQueue = "queue://Consumer.B.VirtualTopic.tempTopic"; + VirtualDestinationSelectorCacheViewMBean cache = getVirtualDestinationSelectorCacheMBean(brokerA); + Set<String> selectors = cache.selectorsForDestination(testQueue); + + assertEquals(1, selectors.size()); + assertTrue(selectors.contains("foo = 'bar'")); + + boolean removed = cache.deleteSelectorForDestination(testQueue, "foo = 'bar'"); + assertTrue(removed); + + selectors = cache.selectorsForDestination(testQueue); + assertEquals(0, selectors.size()); + + createConsumer("BrokerB", createDestination("Consumer.B.VirtualTopic.tempTopic", false), + "ceposta = 'redhat'"); + + + Wait.waitFor(new Wait.Condition() { + + Destination dest = brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")); + + @Override + public boolean isSatisified() throws Exception { + return dest.getConsumers().size() == 2; + } + }, 500); + + selectors = cache.selectorsForDestination(testQueue); + assertEquals(1, selectors.size()); + cache.deleteAllSelectorsForDestination(testQueue); + selectors = cache.selectorsForDestination(testQueue); + assertEquals(0, selectors.size()); + + } + + public void testMessageLeaks() throws Exception{ + clearSelectorCacheFiles(); + startAllBrokers(); + + final BrokerService brokerA = brokers.get("BrokerA").broker; + + // Create the remote virtual topic consumer with selector + ActiveMQDestination consumerQueue = createDestination("Consumer.B.VirtualTopic.tempTopic", false); + // create it so that the queue is there and messages don't get lost + MessageConsumer consumer1 = createConsumer("BrokerA", consumerQueue, "SYMBOL = 'AAPL'"); + MessageConsumer consumer2 = createConsumer("BrokerA", consumerQueue, "SYMBOL = 'AAPL'"); + + ActiveMQTopic virtualTopic = new ActiveMQTopic("VirtualTopic.tempTopic"); + ProducerThreadTester producerTester = createProducerTester("BrokerA", virtualTopic); + producerTester.setRunIndefinitely(true); + producerTester.setSleep(5); + producerTester.addMessageProperty("AAPL"); + producerTester.addMessageProperty("VIX"); + producerTester.start(); + + int currentCount = producerTester.getSentCount(); + LOG.info(">>>> currently sent: total=" + currentCount + ", AAPL=" + producerTester.getCountForProperty("AAPL") + ", VIX=" + producerTester.getCountForProperty("VIX")); + + // let some messages get sent + Thread.sleep(2000); + + MessageIdList consumer1Messages = getConsumerMessages("BrokerA", consumer1); + consumer1Messages.waitForMessagesToArrive(50, 1000); + + // switch one of the consumers to SYMBOL = 'VIX' + consumer1.close(); + consumer1 = createConsumer("BrokerA", consumerQueue, "SYMBOL = 'VIX'"); + + // wait till new consumer is on board + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) + .getConsumers().size() == 2; + } + }); + + currentCount = producerTester.getSentCount(); + LOG.info(">>>> currently sent: total=" + currentCount + ", AAPL=" + producerTester.getCountForProperty("AAPL") + ", VIX=" + producerTester.getCountForProperty("VIX")); + + // let some messages get sent + Thread.sleep(2000); + + // switch the other consumer to SYMBOL = 'VIX' + consumer2.close(); + consumer2 = createConsumer("BrokerA", consumerQueue, "SYMBOL = 'VIX'"); + + // wait till new consumer is on board + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) + .getConsumers().size() == 2; + } + }); + + currentCount = producerTester.getSentCount(); + LOG.info(">>>> currently sent: total=" + currentCount + ", AAPL=" + producerTester.getCountForProperty("AAPL") + ", VIX=" + producerTester.getCountForProperty("VIX")); + + // let some messages get sent + Thread.sleep(2000); + + currentCount = producerTester.getSentCount(); + LOG.info(">>>> currently sent: total=" + currentCount + ", AAPL=" + producerTester.getCountForProperty("AAPL") + ", VIX=" + producerTester.getCountForProperty("VIX")); + + + // make sure if there are messages that are orphaned in the queue that this number doesn't + // grow... + final long currentDepth = brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) + .getDestinationStatistics().getMessages().getCount(); + + LOG.info(">>>>> Orphaned messages? " + currentDepth); + + // wait 5s to see if we can get a growth in the depth of the queue + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) + .getDestinationStatistics().getMessages().getCount() > currentDepth; + } + }, 5000); + + // stop producers + producerTester.setRunning(false); + producerTester.join(); + + // pause to let consumers catch up + Thread.sleep(1000); + + assertTrue(brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) + .getDestinationStatistics().getMessages().getCount() <= currentDepth); + + + } + + private ProducerThreadTester createProducerTester(String brokerName, javax.jms.Destination destination) throws Exception{ + BrokerItem brokerItem = brokers.get(brokerName); + + Connection conn = brokerItem.createConnection(); + conn.start(); + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + ProducerThreadTester rc = new ProducerThreadTester(sess, destination); + rc.setPersistent(persistentDelivery); + return rc; + } + + public void testSelectorConsumptionWithNoMatchAtHeadOfQueue() throws Exception{ + clearSelectorCacheFiles(); + startAllBrokers(); + + BrokerService brokerA = brokers.get("BrokerA").broker; + + // Create the remote virtual topic consumer with selector + ActiveMQDestination consumerQueue = createDestination("Consumer.B.VirtualTopic.tempTopic", false); + + // create it so that the queue is there and messages don't get lost + MessageConsumer selectingConsumer = establishConsumer("BrokerA", consumerQueue); + + // send messages with NO selection criteria first, and then with a property to be selected + // this should put messages at the head of the queue that don't match selection + ActiveMQTopic virtualTopic = new ActiveMQTopic("VirtualTopic.tempTopic"); + sendMessages("BrokerA", virtualTopic, 1); + + // close the consumer w/out consuming any messages; they'll be marked redelivered + selectingConsumer.close(); + + selectingConsumer = createConsumer("BrokerA", consumerQueue, "foo = 'bar'"); + + sendMessages("BrokerA", virtualTopic, 1, asMap("foo", "bar")); + + + MessageIdList selectingConsumerMessages = getConsumerMessages("BrokerA", selectingConsumer); + selectingConsumerMessages.waitForMessagesToArrive(1, 1000L); + + assertEquals(1, selectingConsumerMessages.getMessageCount()); + selectingConsumerMessages.waitForMessagesToArrive(10, 1000L); + assertEquals(1, selectingConsumerMessages.getMessageCount()); + + // assert broker A stats + assertEquals(1, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) + .getConsumers().size()); + assertEquals(2, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) + .getDestinationStatistics().getEnqueues().getCount()); + assertEquals(1, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) + .getDestinationStatistics().getDequeues().getCount()); + assertEquals(1, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) + .getDestinationStatistics().getMessages().getCount()); + + } + + private MessageConsumer establishConsumer(String broker, ActiveMQDestination consumerQueue) throws Exception{ + BrokerItem item = brokers.get(broker); + Connection c = item.createConnection(); + c.start(); + Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE); + return s.createConsumer(consumerQueue); + } + + public void testSelectorsAndNonSelectors() throws Exception{ + clearSelectorCacheFiles(); + // borkerA is local and brokerB is remote + bridgeAndConfigureBrokers("BrokerA", "BrokerB"); + startAllBrokers(); + waitForBridgeFormation(); + + + final BrokerService brokerA = brokers.get("BrokerA").broker; + final BrokerService brokerB = brokers.get("BrokerB").broker; + + // Create the remote virtual topic consumer with selector + ActiveMQDestination consumerBQueue = createDestination("Consumer.B.VirtualTopic.tempTopic", false); + + MessageConsumer selectingConsumer = createConsumer("BrokerB", consumerBQueue, "foo = 'bar'"); + MessageConsumer nonSelectingConsumer = createConsumer("BrokerB", consumerBQueue); + + // let advisories propogate + Wait.waitFor(new Wait.Condition() { + Destination dest = brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")); + + @Override + public boolean isSatisified() throws Exception { + return dest.getConsumers().size() == 2; + } + }, 500); + + + Destination destination = getDestination(brokerB, consumerBQueue); + assertEquals(2, destination.getConsumers().size()); + + // publisher publishes to this + ActiveMQTopic virtualTopic = new ActiveMQTopic("VirtualTopic.tempTopic"); + sendMessages("BrokerA", virtualTopic, 10, asMap("foo", "bar")); + sendMessages("BrokerA", virtualTopic, 10); + + + MessageIdList selectingConsumerMessages = getConsumerMessages("BrokerB", selectingConsumer); + + + MessageIdList nonSelectingConsumerMessages = getConsumerMessages("BrokerB", nonSelectingConsumer); + + // we only expect half of the messages that get sent with the selector, because they get load balanced + selectingConsumerMessages.waitForMessagesToArrive(5, 1000L); + assertEquals(5, selectingConsumerMessages.getMessageCount()); + + nonSelectingConsumerMessages.waitForMessagesToArrive(15, 1000L); + assertEquals(15, nonSelectingConsumerMessages.getMessageCount()); + + // assert broker A stats + assertEquals(20, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) + .getDestinationStatistics().getEnqueues().getCount()); + assertEquals(20, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) + .getDestinationStatistics().getDequeues().getCount()); + assertEquals(0, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) + .getDestinationStatistics().getMessages().getCount()); + + // assert broker B stats + assertEquals(20, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) + .getDestinationStatistics().getEnqueues().getCount()); + assertEquals(20, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) + .getDestinationStatistics().getDequeues().getCount()); + assertEquals(0, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) + .getDestinationStatistics().getMessages().getCount()); + + + //now let's close the consumer without the selector + nonSelectingConsumer.close(); + + + // let advisories propogate + Wait.waitFor(new Wait.Condition() { + Destination dest = brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")); + + @Override + public boolean isSatisified() throws Exception { + return dest.getConsumers().size() == 1; + } + }, 500); + + // and let's send messages with a selector that doesnt' match + selectingConsumerMessages.flushMessages(); + + sendMessages("BrokerA", virtualTopic, 10, asMap("ceposta", "redhat")); + + selectingConsumerMessages = getConsumerMessages("BrokerB", selectingConsumer); + selectingConsumerMessages.waitForMessagesToArrive(1, 1000L); + assertEquals(0, selectingConsumerMessages.getMessageCount()) ; + + // assert broker A stats + assertEquals(20, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) + .getDestinationStatistics().getEnqueues().getCount()); + assertEquals(20, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) + .getDestinationStatistics().getDequeues().getCount()); + assertEquals(0, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) + .getDestinationStatistics().getMessages().getCount()); + + // assert broker B stats + assertEquals(20, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) + .getDestinationStatistics().getEnqueues().getCount()); + assertEquals(20, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) + .getDestinationStatistics().getDequeues().getCount()); + assertEquals(0, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) + .getDestinationStatistics().getMessages().getCount()); + + // now lets disconect the selecting consumer for a sec and send messages with a selector that DOES match + selectingConsumer.close(); + + // let advisories propogate + Wait.waitFor(new Wait.Condition() { + Destination dest = brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")); + + @Override + public boolean isSatisified() throws Exception { + return dest.getConsumers().size() == 0; + } + }, 500); + + selectingConsumerMessages.flushMessages(); + + sendMessages("BrokerA", virtualTopic, 10, asMap("foo", "bar")); + + + // assert broker A stats + assertEquals(30, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) + .getDestinationStatistics().getEnqueues().getCount()); + assertEquals(20, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) + .getDestinationStatistics().getDequeues().getCount()); + assertEquals(10, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) + .getDestinationStatistics().getMessages().getCount()); + + // assert broker B stats + assertEquals(20, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) + .getDestinationStatistics().getEnqueues().getCount()); + assertEquals(20, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) + .getDestinationStatistics().getDequeues().getCount()); + assertEquals(0, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) + .getDestinationStatistics().getMessages().getCount()); + + selectingConsumer = createConsumer("BrokerB", consumerBQueue, "foo = 'bar'"); + selectingConsumerMessages = getConsumerMessages("BrokerB", selectingConsumer); + selectingConsumerMessages.waitForMessagesToArrive(10); + assertEquals(10, selectingConsumerMessages.getMessageCount()); + + // let advisories propogate + Wait.waitFor(new Wait.Condition() { + Destination dest = brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")); + + @Override + public boolean isSatisified() throws Exception { + return dest.getConsumers().size() == 1; + } + }, 500); + + // assert broker A stats + assertEquals(30, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) + .getDestinationStatistics().getEnqueues().getCount()); + assertEquals(30, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) + .getDestinationStatistics().getDequeues().getCount()); + assertEquals(0, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) + .getDestinationStatistics().getMessages().getCount()); + + // assert broker B stats + assertEquals(30, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) + .getDestinationStatistics().getEnqueues().getCount()); + assertEquals(30, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) + .getDestinationStatistics().getDequeues().getCount()); + assertEquals(0, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) + .getDestinationStatistics().getMessages().getCount()); + + + } + + public VirtualDestinationSelectorCacheViewMBean getVirtualDestinationSelectorCacheMBean(BrokerService broker) + throws MalformedObjectNameException { + ObjectName objectName = BrokerMBeanSupport + .createVirtualDestinationSelectorCacheName(broker.getBrokerObjectName(), "plugin", "virtualDestinationCache"); + return (VirtualDestinationSelectorCacheViewMBean)broker.getManagementContext() + .newProxyInstance(objectName, VirtualDestinationSelectorCacheViewMBean.class, true); + } + + public void testSelectorAwareForwarding() throws Exception { + clearSelectorCacheFiles(); + // borkerA is local and brokerB is remote + bridgeAndConfigureBrokers("BrokerA", "BrokerB"); + startAllBrokers(); + waitForBridgeFormation(); + + final BrokerService brokerB = brokers.get("BrokerB").broker; + final BrokerService brokerA = brokers.get("BrokerA").broker; + + // Create the remote virtual topic consumer with selector + MessageConsumer remoteConsumer = createConsumer("BrokerB", + createDestination("Consumer.B.VirtualTopic.tempTopic", false), + "foo = 'bar'"); + + + // let advisories propogate + Wait.waitFor(new Wait.Condition() { + Destination dest = brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")); + + @Override + public boolean isSatisified() throws Exception { + return dest.getConsumers().size() == 1; + } + }, 500); + + ActiveMQQueue queueB = new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"); + Destination destination = getDestination(brokers.get("BrokerB").broker, queueB); + assertEquals(1, destination.getConsumers().size()); + + ActiveMQTopic virtualTopic = new ActiveMQTopic("VirtualTopic.tempTopic"); + assertNull(getDestination(brokers.get("BrokerA").broker, virtualTopic)); + assertNull(getDestination(brokers.get("BrokerB").broker, virtualTopic)); + + // send two types of messages, one unwanted and the other wanted + sendMessages("BrokerA", virtualTopic, 1, asMap("foo", "bar")); + sendMessages("BrokerA", virtualTopic, 1, asMap("ceposta", "redhat")); + + MessageIdList msgsB = getConsumerMessages("BrokerB", remoteConsumer); + // wait for the wanted one to arrive at the remote consumer + msgsB.waitForMessagesToArrive(1); + + // ensure we don't get any more messages + msgsB.waitForMessagesToArrive(1, 1000); + + // remote consumer should only get one of the messages + assertEquals(1, msgsB.getMessageCount()); + + // and the enqueue count for the remote queue should only be 1 + assertEquals(1, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) + .getDestinationStatistics().getEnqueues().getCount()); + + + // now let's remove the consumer on broker B and recreate it with new selector + remoteConsumer.close(); + + + + + // now let's shut down broker A and clear its persistent selector cache + brokerA.stop(); + brokerA.waitUntilStopped(); + deleteSelectorCacheFile("BrokerA"); + + assertEquals(0, destination.getConsumers().size()); + + remoteConsumer = createConsumer("BrokerB", + createDestination("Consumer.B.VirtualTopic.tempTopic", false), + "ceposta = 'redhat'"); + + + assertEquals(1, destination.getConsumers().size()); + + + // now let's start broker A back up + brokerA.start(true); + brokerA.waitUntilStarted(); + + System.out.println(brokerA.getNetworkConnectors()); + + // give a sec to let advisories propogate + // let advisories propogate + Wait.waitFor(new Wait.Condition() { + Destination dest = brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")); + + @Override + public boolean isSatisified() throws Exception { + return dest.getConsumers().size() == 1; + } + }, 500); + + + // send two types of messages, one unwanted and the other wanted + sendMessages("BrokerA", virtualTopic, 1, asMap("foo", "bar")); + sendMessages("BrokerB", virtualTopic, 1, asMap("foo", "bar")); + sendMessages("BrokerA", virtualTopic, 1, asMap("ceposta", "redhat")); + sendMessages("BrokerB", virtualTopic, 1, asMap("ceposta", "redhat")); + + // lets get messages on consumer B + msgsB = getConsumerMessages("BrokerB", remoteConsumer); + msgsB.waitForMessagesToArrive(2); + + // ensure we don't get any more messages + msgsB.waitForMessagesToArrive(1, 1000); + + + // remote consumer should only get 10 of the messages + assertEquals(2, msgsB.getMessageCount()); + + + // queue should be drained + assertEquals(0, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) + .getDestinationStatistics().getMessages().getCount()); + // and the enqueue count for the remote queue should only be 1 + assertEquals(3, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) + .getDestinationStatistics().getEnqueues().getCount()); + + + } + + private HashMap<String, Object> asMap(String key, Object value) { + HashMap<String, Object> rc = new HashMap<String,Object>(1); + rc.put(key, value); + return rc; + } + + + + private void bridgeAndConfigureBrokers(String local, String remote) + throws Exception { + NetworkConnector bridge = bridgeBrokers(local, remote, false, 1, false); + bridge.setDecreaseNetworkConsumerPriority(true); + bridge.setDuplex(true); + } + + public void setUp() throws Exception { + super.setAutoFail(true); + super.setUp(); + String options = new String( + "?useJmx=false&deleteAllMessagesOnStartup=true"); + createAndConfigureBroker(new URI( + "broker:(tcp://localhost:61616)/BrokerA" + options)); + createAndConfigureBroker(new URI( + "broker:(tcp://localhost:61617)/BrokerB" + options)); + } + + private void clearSelectorCacheFiles() { + String[] brokerNames = new String[]{"BrokerA", "BrokerB"}; + for (String brokerName : brokerNames) { + deleteSelectorCacheFile(brokerName); + } + } + + private void deleteSelectorCacheFile(String brokerName) { + File brokerPersisteFile = new File(PERSIST_SELECTOR_CACHE_FILE_BASEPATH + brokerName); + + if (brokerPersisteFile.exists()) { + brokerPersisteFile.delete(); + } + } + + private BrokerService createAndConfigureBroker(URI uri) throws Exception { + BrokerService broker = createBroker(uri); + broker.setUseJmx(true); + // Make topics "selectorAware" + VirtualTopic virtualTopic = new VirtualTopic(); + virtualTopic.setSelectorAware(true); + VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor(); + interceptor + .setVirtualDestinations(new VirtualDestination[] { virtualTopic }); + broker.setDestinationInterceptors(new DestinationInterceptor[] { interceptor }); + configurePersistenceAdapter(broker); + + SubQueueSelectorCacheBrokerPlugin selectorCacheBrokerPlugin = new SubQueueSelectorCacheBrokerPlugin(); + selectorCacheBrokerPlugin.setSingleSelectorPerDestination(true); + File persisteFile = new File(PERSIST_SELECTOR_CACHE_FILE_BASEPATH + broker.getBrokerName()); + selectorCacheBrokerPlugin.setPersistFile(persisteFile); + broker.setPlugins(new BrokerPlugin[]{selectorCacheBrokerPlugin}); + return broker; + } + + protected void configurePersistenceAdapter(BrokerService broker) + throws IOException { + File dataFileDir = new File("target/test-amq-data/kahadb/" + + broker.getBrokerName()); + KahaDBStore kaha = new KahaDBStore(); + kaha.setDirectory(dataFileDir); + broker.setPersistenceAdapter(kaha); + } + + class ProducerThreadTester extends ProducerThread { + + private Set<String> selectors = new LinkedHashSet<String>(); + private Map<String, AtomicInteger> selectorCounts = new HashMap<String, AtomicInteger>(); + private Random rand = new Random(System.currentTimeMillis()); + + + public ProducerThreadTester(Session session, javax.jms.Destination destination) { + super(session, destination); + } + + @Override + protected Message createMessage(int i) throws Exception { + TextMessage msg = createTextMessage(this.session, "Message-" + i); + if (selectors.size() > 0) { + String value = getRandomKey(); + msg.setStringProperty("SYMBOL", value); + AtomicInteger currentCount = selectorCounts.get(value); + currentCount.incrementAndGet(); + } + + return msg; + } + + @Override + public void resetCounters() { + super.resetCounters(); + for (String key : selectorCounts.keySet()) { + selectorCounts.put(key, new AtomicInteger(0)); + } + } + + private String getRandomKey() { + ArrayList<String> keys = new ArrayList(selectors); + return keys.get(rand.nextInt(keys.size())); + } + + public void addMessageProperty(String value) { + if (!this.selectors.contains(value)) { + selectors.add(value); + selectorCounts.put(value, new AtomicInteger(0)); + } + } + + public int getCountForProperty(String key) { + return selectorCounts.get(key).get(); + } + + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq/blob/61da1faa/activemq-unit-tests/src/test/java/org/apache/activemq/util/MessageIdList.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/util/MessageIdList.java b/activemq-unit-tests/src/test/java/org/apache/activemq/util/MessageIdList.java index 7140a86..fcc7fe6 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/util/MessageIdList.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/util/MessageIdList.java @@ -135,7 +135,10 @@ public class MessageIdList extends Assert implements MessageListener { } } - public void waitForMessagesToArrive(int messageCount) { + public void waitForMessagesToArrive(int messageCount){ + waitForMessagesToArrive(messageCount, maximumDuration); + } + public void waitForMessagesToArrive(int messageCount, long maximumDuration) { LOG.info("Waiting for " + messageCount + " message(s) to arrive"); long start = System.currentTimeMillis();
