Repository: activemq
Updated Branches:
  refs/heads/master e16d05436 -> 61da1faa4


https://issues.apache.org/jira/browse/AMQ-5672 Added an option for allowing 
only a single selector for the virtual destination selector cache. also added 
some JMX views into the selector cache that can be used at runtime. includes 
unit tests


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/61da1faa
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/61da1faa
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/61da1faa

Branch: refs/heads/master
Commit: 61da1faa4c4e49191b373049ee36434aaad58897
Parents: e16d054
Author: Christian Posta <[email protected]>
Authored: Thu Mar 12 18:14:09 2015 -0700
Committer: Christian Posta <[email protected]>
Committed: Mon Apr 6 16:12:20 2015 -0700

----------------------------------------------------------------------
 .../activemq/broker/jmx/BrokerMBeanSupport.java |   7 +
 .../VirtualDestinationSelectorCacheView.java    |  49 ++
 ...irtualDestinationSelectorCacheViewMBean.java |  36 +
 .../network/DemandForwardingBridgeSupport.java  |   2 +-
 .../plugin/SubQueueSelectorCacheBroker.java     | 157 ++++-
 .../SubQueueSelectorCacheBrokerPlugin.java      |  35 +-
 .../apache/activemq/util/ProducerThread.java    |  68 +-
 .../activemq/JmsMultipleBrokersTestSupport.java |  21 +
 ...VirtualTopicSelectorAwareForwardingTest.java | 693 +++++++++++++++++++
 .../org/apache/activemq/util/MessageIdList.java |   5 +-
 10 files changed, 1042 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/61da1faa/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerMBeanSupport.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerMBeanSupport.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerMBeanSupport.java
index e7d888d..43254c5 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerMBeanSupport.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerMBeanSupport.java
@@ -161,6 +161,13 @@ public class BrokerMBeanSupport {
         return createNetworkConnectorName(brokerObjectName.toString(), type, 
name);
     }
 
+    public static ObjectName 
createVirtualDestinationSelectorCacheName(ObjectName brokerObjectName, String 
type, String name) throws MalformedObjectNameException {
+        String objectNameStr = brokerObjectName.toString();
+        objectNameStr += ",service=" + type + 
",virtualDestinationSelectoCache="+ JMXSupport.encodeObjectNamePart(name);
+        ObjectName objectName = new ObjectName(objectNameStr);
+        return objectName;
+    }
+
     public static ObjectName createNetworkConnectorName(String 
brokerObjectName, String type, String name) throws MalformedObjectNameException 
{
         String objectNameStr = brokerObjectName;
         objectNameStr += ",connector=" + type + ",networkConnectorName="+ 
JMXSupport.encodeObjectNamePart(name);

http://git-wip-us.apache.org/repos/asf/activemq/blob/61da1faa/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/VirtualDestinationSelectorCacheView.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/VirtualDestinationSelectorCacheView.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/VirtualDestinationSelectorCacheView.java
new file mode 100644
index 0000000..6fbb33e
--- /dev/null
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/VirtualDestinationSelectorCacheView.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+import org.apache.activemq.plugin.SubQueueSelectorCacheBroker;
+
+import java.util.Set;
+
+/**
+ * Created by ceposta
+ * <a href="http://christianposta.com/blog>http://christianposta.com/blog</a>.
+ */
+public class VirtualDestinationSelectorCacheView implements 
VirtualDestinationSelectorCacheViewMBean {
+
+    private final SubQueueSelectorCacheBroker selectorCacheBroker;
+
+    public VirtualDestinationSelectorCacheView(SubQueueSelectorCacheBroker 
selectorCacheBroker) {
+        this.selectorCacheBroker = selectorCacheBroker;
+    }
+
+    @Override
+    public Set<String> selectorsForDestination(String destinationName) {
+        return selectorCacheBroker.getSelectorsForDestination(destinationName);
+    }
+
+    @Override
+    public boolean deleteSelectorForDestination(String destinationName, String 
selector) {
+        return 
selectorCacheBroker.deleteSelectorForDestination(destinationName, selector);
+    }
+
+    @Override
+    public boolean deleteAllSelectorsForDestination(String destinationName) {
+        return 
selectorCacheBroker.deleteAllSelectorsForDestination(destinationName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/61da1faa/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/VirtualDestinationSelectorCacheViewMBean.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/VirtualDestinationSelectorCacheViewMBean.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/VirtualDestinationSelectorCacheViewMBean.java
new file mode 100644
index 0000000..7490d13
--- /dev/null
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/VirtualDestinationSelectorCacheViewMBean.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+import java.util.Set;
+
+/**
+ * Created by ceposta
+ * <a href="http://christianposta.com/blog>http://christianposta.com/blog</a>.
+ */
+public interface VirtualDestinationSelectorCacheViewMBean {
+
+    @MBeanInfo("Dump raw cache of selectors organized by destination")
+    public Set<String> selectorsForDestination(String destinationName);
+
+    @MBeanInfo("Delete a selector for a destination. Selector must match what 
returns from selectorsForDestination operation")
+    public boolean deleteSelectorForDestination(String destinationName, String 
selector);
+
+    @MBeanInfo("Dump raw cache of selectors organized by destination")
+    public boolean deleteAllSelectorsForDestination(String destinationName);
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/61da1faa/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
 
b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 83eea31..bbf11f0 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -981,7 +981,7 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
                         }
 
                         Message message = configureMessage(md);
-                        LOG.debug("bridging ({} -> {}), consumer: {}, 
destinaition: {}, brokerPath: {}, message: {}", new Object[]{
+                        LOG.debug("bridging ({} -> {}), consumer: {}, 
destination: {}, brokerPath: {}, message: {}", new Object[]{
                                 configuration.getBrokerName(), 
remoteBrokerName, (LOG.isTraceEnabled() ? message : message.getMessageId()), 
md.getConsumerId(), message.getDestination(), 
Arrays.toString(message.getBrokerPath()), message
                         });
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/61da1faa/activemq-broker/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBroker.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBroker.java
 
b/activemq-broker/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBroker.java
index af02b54..c6a788f 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBroker.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBroker.java
@@ -16,14 +16,21 @@
  */
 package org.apache.activemq.plugin;
 
+import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.BrokerFilter;
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.jmx.AnnotatedMBean;
+import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
+import org.apache.activemq.broker.jmx.VirtualDestinationSelectorCacheView;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.command.ConsumerInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.management.JMException;
+import javax.management.ObjectName;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
@@ -49,6 +56,7 @@ import java.util.concurrent.ConcurrentHashMap;
  */
 public class SubQueueSelectorCacheBroker extends BrokerFilter implements 
Runnable {
     private static final Logger LOG = 
LoggerFactory.getLogger(SubQueueSelectorCacheBroker.class);
+    public static final String MATCH_EVERYTHING = "TRUE";
 
     /**
      * The subscription's selector cache. We cache compiled expressions keyed
@@ -57,10 +65,14 @@ public class SubQueueSelectorCacheBroker extends 
BrokerFilter implements Runnabl
     private ConcurrentHashMap<String, Set<String>> subSelectorCache = new 
ConcurrentHashMap<String, Set<String>>();
 
     private final File persistFile;
+    private boolean singleSelectorPerDestination = false;
+    private boolean ignoreWildcardSelectors = false;
+    private ObjectName objectName;
 
     private boolean running = true;
     private Thread persistThread;
-    private static final long MAX_PERSIST_INTERVAL = 600000;
+    private long persistInterval = MAX_PERSIST_INTERVAL;
+    public static final long MAX_PERSIST_INTERVAL = 600000;
     private static final String SELECTOR_CACHE_PERSIST_THREAD_NAME = 
"SelectorCachePersistThread";
 
     /**
@@ -75,6 +87,22 @@ public class SubQueueSelectorCacheBroker extends 
BrokerFilter implements Runnabl
 
         persistThread = new Thread(this, SELECTOR_CACHE_PERSIST_THREAD_NAME);
         persistThread.start();
+        enableJmx();
+    }
+
+    private void enableJmx() {
+        BrokerService broker = getBrokerService();
+        if (broker.isUseJmx()) {
+            VirtualDestinationSelectorCacheView view = new 
VirtualDestinationSelectorCacheView(this);
+            try {
+                objectName = 
BrokerMBeanSupport.createVirtualDestinationSelectorCacheName(broker.getBrokerObjectName(),
 "plugin", "virtualDestinationCache");
+                LOG.trace("virtualDestinationCacheSelector mbean name; " + 
objectName.toString());
+                AnnotatedMBean.registerMBean(broker.getManagementContext(), 
view, objectName);
+            } catch (Exception e) {
+                LOG.warn("JMX is enabled, but when installing the 
VirtualDestinationSelectorCache, couldn't install the JMX mbeans. Continuing 
without installing the mbeans.");
+            }
+
+        }
     }
 
     @Override
@@ -84,29 +112,79 @@ public class SubQueueSelectorCacheBroker extends 
BrokerFilter implements Runnabl
             persistThread.interrupt();
             persistThread.join();
         } //if
+        unregisterMBeans();
+    }
+
+    private void unregisterMBeans() {
+        BrokerService broker = getBrokerService();
+        if (broker.isUseJmx() && this.objectName != null) {
+            try {
+                broker.getManagementContext().unregisterMBean(objectName);
+            } catch (JMException e) {
+                LOG.warn("Trying uninstall VirtualDestinationSelectorCache; 
couldn't uninstall mbeans, continuting...");
+            }
+        }
     }
 
     @Override
     public Subscription addConsumer(ConnectionContext context, ConsumerInfo 
info) throws Exception {
-        String destinationName = info.getDestination().getQualifiedName();
-        LOG.debug("Caching consumer selector [{}] on a {}", 
info.getSelector(), destinationName);
-        String selector = info.getSelector();
+        // don't track selectors for advisory topics
+        if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
+            String destinationName = info.getDestination().getQualifiedName();
+            LOG.debug("Caching consumer selector [{}] on  '{}'", 
info.getSelector(), destinationName);
 
-        // As ConcurrentHashMap doesn't support null values, use always true 
expression
-        if (selector == null) {
-            selector = "TRUE";
-        }
+            String selector = info.getSelector() == null ? MATCH_EVERYTHING : 
info.getSelector();
 
-        Set<String> selectors = subSelectorCache.get(destinationName);
-        if (selectors == null) {
-            selectors = Collections.synchronizedSet(new HashSet<String>());
-        }
-        selectors.add(selector);
-        subSelectorCache.put(destinationName, selectors);
+            if (!(ignoreWildcardSelectors && hasWildcards(selector))) {
+
+                Set<String> selectors = subSelectorCache.get(destinationName);
+                if (selectors == null) {
+                    selectors = Collections.synchronizedSet(new 
HashSet<String>());
+                } else if (singleSelectorPerDestination && 
!MATCH_EVERYTHING.equals(selector)) {
+                    // in this case, we allow only ONE selector. But we don't 
count the catch-all "null/TRUE" selector
+                    // here, we always allow that one. But only one true 
selector.
+                    boolean containsMatchEverything = 
selectors.contains(MATCH_EVERYTHING);
+                    selectors.clear();
 
+                    // put back the MATCH_EVERYTHING selector
+                    if (containsMatchEverything) {
+                        selectors.add(MATCH_EVERYTHING);
+                    }
+                }
+
+                LOG.debug("adding new selector: into cache " + selector);
+                selectors.add(selector);
+                LOG.debug("current selectors in cache: " + selectors);
+                subSelectorCache.put(destinationName, selectors);
+            }
+
+
+        }
         return super.addConsumer(context, info);
     }
 
+    // trivial check for SQL92/selector wildcards
+    private boolean hasWildcards(String selector) {
+        return selector.contains("%") || selector.contains("_");
+    }
+
+    @Override
+    public void removeConsumer(ConnectionContext context, ConsumerInfo info) 
throws Exception {
+        if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
+
+            if (singleSelectorPerDestination) {
+                String destinationName = 
info.getDestination().getQualifiedName();
+                Set<String> selectors = subSelectorCache.get(destinationName);
+                if (info.getSelector() == null && selectors.size() > 1) {
+                    boolean removed = selectors.remove(MATCH_EVERYTHING);
+                    LOG.debug("A non-selector consumer has dropped. Removing 
the catchall matching pattern 'TRUE'. Successful? " + removed);
+                }
+            }
+
+        }
+        super.removeConsumer(context, info);
+    }
+
     private void readCache() {
         if (persistFile != null && persistFile.exists()) {
             try {
@@ -169,12 +247,61 @@ public class SubQueueSelectorCacheBroker extends 
BrokerFilter implements Runnabl
     public void run() {
         while (running) {
             try {
-                Thread.sleep(MAX_PERSIST_INTERVAL);
+                Thread.sleep(persistInterval);
             } catch (InterruptedException ex) {
             } //try
 
             persistCache();
         }
     }
+
+    public boolean isSingleSelectorPerDestination() {
+        return singleSelectorPerDestination;
+    }
+
+    public void setSingleSelectorPerDestination(boolean 
singleSelectorPerDestination) {
+        this.singleSelectorPerDestination = singleSelectorPerDestination;
+    }
+
+    public Set<String> getSelectorsForDestination(String destinationName) {
+        if (subSelectorCache.containsKey(destinationName)) {
+            return new HashSet<String>(subSelectorCache.get(destinationName));
+        }
+
+        return Collections.EMPTY_SET;
+    }
+
+    public long getPersistInterval() {
+        return persistInterval;
+    }
+
+    public void setPersistInterval(long persistInterval) {
+        this.persistInterval = persistInterval;
+    }
+
+    public boolean deleteSelectorForDestination(String destinationName, String 
selector) {
+        if (subSelectorCache.containsKey(destinationName)) {
+            Set<String> cachedSelectors = 
subSelectorCache.get(destinationName);
+            return cachedSelectors.remove(selector);
+        }
+
+        return false;
+    }
+
+    public boolean deleteAllSelectorsForDestination(String destinationName) {
+        if (subSelectorCache.containsKey(destinationName)) {
+            Set<String> cachedSelectors = 
subSelectorCache.get(destinationName);
+            cachedSelectors.clear();
+        }
+        return true;
+    }
+
+    public boolean isIgnoreWildcardSelectors() {
+        return ignoreWildcardSelectors;
+    }
+
+    public void setIgnoreWildcardSelectors(boolean ignoreWildcardSelectors) {
+        this.ignoreWildcardSelectors = ignoreWildcardSelectors;
+    }
 }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/61da1faa/activemq-broker/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBrokerPlugin.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBrokerPlugin.java
 
b/activemq-broker/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBrokerPlugin.java
index 01f5e90..72be2cd 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBrokerPlugin.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBrokerPlugin.java
@@ -21,6 +21,8 @@ import java.io.File;
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.BrokerPlugin;
 
+import static 
org.apache.activemq.plugin.SubQueueSelectorCacheBroker.MAX_PERSIST_INTERVAL;
+
 /**
  * A plugin which allows the caching of the selector from a subscription queue.
  * <p/>
@@ -36,10 +38,17 @@ public class SubQueueSelectorCacheBrokerPlugin implements 
BrokerPlugin {
 
 
     private File persistFile;
+    private boolean singleSelectorPerDestination = false;
+    private boolean ignoreWildcardSelectors = false;
+    private long persistInterval = MAX_PERSIST_INTERVAL;
 
     @Override
     public Broker installPlugin(Broker broker) throws Exception {
-        return new SubQueueSelectorCacheBroker(broker, persistFile);
+        SubQueueSelectorCacheBroker rc = new 
SubQueueSelectorCacheBroker(broker, persistFile);
+        rc.setSingleSelectorPerDestination(singleSelectorPerDestination);
+        rc.setPersistInterval(persistInterval);
+        rc.setIgnoreWildcardSelectors(ignoreWildcardSelectors);
+        return rc;
     }
 
     /**
@@ -52,4 +61,28 @@ public class SubQueueSelectorCacheBrokerPlugin implements 
BrokerPlugin {
     public File getPersistFile() {
         return persistFile;
     }
+
+    public boolean isSingleSelectorPerDestination() {
+        return singleSelectorPerDestination;
+    }
+
+    public void setSingleSelectorPerDestination(boolean 
singleSelectorPerDestination) {
+        this.singleSelectorPerDestination = singleSelectorPerDestination;
+    }
+
+    public long getPersistInterval() {
+        return persistInterval;
+    }
+
+    public void setPersistInterval(long persistInterval) {
+        this.persistInterval = persistInterval;
+    }
+
+    public boolean isIgnoreWildcardSelectors() {
+        return ignoreWildcardSelectors;
+    }
+
+    public void setIgnoreWildcardSelectors(boolean ignoreWildcardSelectors) {
+        this.ignoreWildcardSelectors = ignoreWildcardSelectors;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/61da1faa/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java
----------------------------------------------------------------------
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java 
b/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java
index ffaa735..00422e9 100644
--- a/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java
+++ b/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java
@@ -23,12 +23,14 @@ import javax.jms.*;
 import java.io.*;
 import java.net.URL;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class ProducerThread extends Thread {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ProducerThread.class);
 
     int messageCount = 1000;
+    boolean runIndefinitely = false;
     Destination destination;
     protected Session session;
     int sleep = 0;
@@ -40,13 +42,14 @@ public class ProducerThread extends Thread {
     int transactionBatchSize;
 
     int transactions = 0;
-    int sentCount = 0;
+    AtomicInteger sentCount = new AtomicInteger(0);
     String message;
     String messageText = null;
     String payloadUrl = null;
     byte[] payload = null;
     boolean running = false;
     CountDownLatch finished;
+    CountDownLatch paused = new CountDownLatch(0);
 
 
     public ProducerThread(Session session, Destination destination) {
@@ -67,18 +70,20 @@ public class ProducerThread extends Thread {
             LOG.info(threadName +  " Started to calculate elapsed time ...\n");
             long tStart = System.currentTimeMillis();
 
-            for (sentCount = 0; sentCount < messageCount && running; 
sentCount++) {
-                Message message = createMessage(sentCount);
-                producer.send(message);
-                LOG.info(threadName + " Sent: " + (message instanceof 
TextMessage ? ((TextMessage) message).getText() : message.getJMSMessageID()));
-
-                if (transactionBatchSize > 0 && sentCount > 0 && sentCount % 
transactionBatchSize == 0) {
-                    LOG.info(threadName + " Committing transaction: " + 
transactions++);
-                    session.commit();
+            if (runIndefinitely) {
+                while (running) {
+                    synchronized (this) {
+                        paused.await();
+                    }
+                    sendMessage(producer, threadName);
+                    sentCount.incrementAndGet();
                 }
-
-                if (sleep > 0) {
-                    Thread.sleep(sleep);
+            }else{
+                for (sentCount.set(0); sentCount.get() < messageCount && 
running; sentCount.incrementAndGet()) {
+                    synchronized (this) {
+                        paused.await();
+                    }
+                    sendMessage(producer, threadName);
                 }
             }
 
@@ -104,6 +109,23 @@ public class ProducerThread extends Thread {
         }
     }
 
+    private void sendMessage(MessageProducer producer, String threadName) 
throws Exception {
+        Message message = createMessage(sentCount.get());
+        producer.send(message);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(threadName + " Sent: " + (message instanceof TextMessage 
? ((TextMessage) message).getText() : message.getJMSMessageID()));
+        }
+
+        if (transactionBatchSize > 0 && sentCount.get() > 0 && sentCount.get() 
% transactionBatchSize == 0) {
+            LOG.info(threadName + " Committing transaction: " + 
transactions++);
+            session.commit();
+        }
+
+        if (sleep > 0) {
+            Thread.sleep(sleep);
+        }
+    }
+
     private void initPayLoad() {
         if (messageSize > 0) {
             payload = new byte[messageSize];
@@ -182,7 +204,7 @@ public class ProducerThread extends Thread {
     }
 
     public int getSentCount() {
-        return sentCount;
+        return sentCount.get();
     }
 
     public boolean isPersistent() {
@@ -264,4 +286,24 @@ public class ProducerThread extends Thread {
     public void setMessage(String message) {
         this.message = message;
     }
+
+    public boolean isRunIndefinitely() {
+        return runIndefinitely;
+    }
+
+    public void setRunIndefinitely(boolean runIndefinitely) {
+        this.runIndefinitely = runIndefinitely;
+    }
+
+    public synchronized void pauseProducer(){
+        this.paused = new CountDownLatch(1);
+    }
+
+    public synchronized void resumeProducer(){
+        this.paused.countDown();
+    }
+
+    public void resetCounters(){
+        this.sentCount.set(0);
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/61da1faa/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
index 1d994b9..cc4a5a8 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
@@ -514,6 +514,23 @@ public class JmsMultipleBrokersTestSupport extends 
CombinationTestSupport {
         brokers.clear();
     }
 
+    public String buildFailoverUriToAllBrokers() {
+        StringBuilder uriBuilder = new StringBuilder("failover:(");
+
+        int index = 1, size = brokers.size();
+
+        for (BrokerItem b : brokers.values()) {
+            uriBuilder.append(b.getConnectionUri());
+            if (index < size) {
+                uriBuilder.append(",");
+                index++;
+            }
+
+        }
+        uriBuilder.append(")");
+        return uriBuilder.toString();
+    }
+
     // Class to group broker components together
     public class BrokerItem {
         public BrokerService broker;
@@ -535,6 +552,10 @@ public class JmsMultipleBrokersTestSupport extends 
CombinationTestSupport {
             id = new IdGenerator(broker.getBrokerName() + ":");
         }
 
+        public String getConnectionUri(){
+            return broker.getVmConnectorURI().toString();
+        }
+
         public Connection createConnection() throws Exception {
             Connection conn = factory.createConnection();
             conn.setClientID(id.generateId());

http://git-wip-us.apache.org/repos/asf/activemq/blob/61da1faa/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualTopicSelectorAwareForwardingTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualTopicSelectorAwareForwardingTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualTopicSelectorAwareForwardingTest.java
new file mode 100644
index 0000000..b2ab88e
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualTopicSelectorAwareForwardingTest.java
@@ -0,0 +1,693 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
+import org.apache.activemq.broker.jmx.VirtualDestinationSelectorCacheViewMBean;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.DestinationInterceptor;
+import org.apache.activemq.broker.region.virtual.VirtualDestination;
+import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
+import org.apache.activemq.broker.region.virtual.VirtualTopic;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.plugin.SubQueueSelectorCacheBrokerPlugin;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+import org.apache.activemq.util.MessageIdList;
+import org.apache.activemq.util.ProducerThread;
+import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.activemq.TestSupport.getDestination;
+
+/**
+ * @author <a href="http://www.christianposta.com/blog";>Christian Posta</a>
+ */
+public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
+        JmsMultipleBrokersTestSupport {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(TwoBrokerVirtualTopicSelectorAwareForwardingTest.class);
+
+    private static final String PERSIST_SELECTOR_CACHE_FILE_BASEPATH = 
"./target/selectorCache-";
+
+    public void testJMX() throws Exception {
+        clearSelectorCacheFiles();
+        // borkerA is local and brokerB is remote
+        bridgeAndConfigureBrokers("BrokerA", "BrokerB");
+        startAllBrokers();
+        waitForBridgeFormation();
+
+        createConsumer("BrokerB", 
createDestination("Consumer.B.VirtualTopic.tempTopic", false),
+                "foo = 'bar'");
+
+
+        final BrokerService brokerA = brokers.get("BrokerA").broker;
+
+        String testQueue = "queue://Consumer.B.VirtualTopic.tempTopic";
+        VirtualDestinationSelectorCacheViewMBean cache = 
getVirtualDestinationSelectorCacheMBean(brokerA);
+        Set<String> selectors = cache.selectorsForDestination(testQueue);
+
+        assertEquals(1, selectors.size());
+        assertTrue(selectors.contains("foo = 'bar'"));
+
+        boolean removed = cache.deleteSelectorForDestination(testQueue, "foo = 
'bar'");
+        assertTrue(removed);
+
+        selectors = cache.selectorsForDestination(testQueue);
+        assertEquals(0, selectors.size());
+
+        createConsumer("BrokerB", 
createDestination("Consumer.B.VirtualTopic.tempTopic", false),
+                "ceposta = 'redhat'");
+
+
+        Wait.waitFor(new Wait.Condition() {
+
+            Destination dest = brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return dest.getConsumers().size() == 2;
+            }
+        }, 500);
+
+        selectors = cache.selectorsForDestination(testQueue);
+        assertEquals(1, selectors.size());
+        cache.deleteAllSelectorsForDestination(testQueue);
+        selectors = cache.selectorsForDestination(testQueue);
+        assertEquals(0, selectors.size());
+
+    }
+
+    public void testMessageLeaks() throws Exception{
+        clearSelectorCacheFiles();
+        startAllBrokers();
+
+        final BrokerService brokerA = brokers.get("BrokerA").broker;
+
+        // Create the remote virtual topic consumer with selector
+        ActiveMQDestination consumerQueue = 
createDestination("Consumer.B.VirtualTopic.tempTopic", false);
+        // create it so that the queue is there and messages don't get lost
+        MessageConsumer consumer1 = createConsumer("BrokerA", consumerQueue, 
"SYMBOL = 'AAPL'");
+        MessageConsumer consumer2 = createConsumer("BrokerA", consumerQueue, 
"SYMBOL = 'AAPL'");
+
+        ActiveMQTopic virtualTopic = new 
ActiveMQTopic("VirtualTopic.tempTopic");
+        ProducerThreadTester producerTester = createProducerTester("BrokerA", 
virtualTopic);
+        producerTester.setRunIndefinitely(true);
+        producerTester.setSleep(5);
+        producerTester.addMessageProperty("AAPL");
+        producerTester.addMessageProperty("VIX");
+        producerTester.start();
+
+        int currentCount = producerTester.getSentCount();
+        LOG.info(">>>> currently sent: total=" + currentCount + ", AAPL=" + 
producerTester.getCountForProperty("AAPL") + ", VIX=" + 
producerTester.getCountForProperty("VIX"));
+
+        // let some messages get sent
+        Thread.sleep(2000);
+
+        MessageIdList consumer1Messages = getConsumerMessages("BrokerA", 
consumer1);
+        consumer1Messages.waitForMessagesToArrive(50, 1000);
+
+        // switch one of the consumers to SYMBOL = 'VIX'
+        consumer1.close();
+        consumer1 = createConsumer("BrokerA", consumerQueue, "SYMBOL = 'VIX'");
+
+        // wait till new consumer is on board
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
+                        .getConsumers().size() == 2;
+            }
+        });
+
+        currentCount = producerTester.getSentCount();
+        LOG.info(">>>> currently sent: total=" + currentCount + ", AAPL=" + 
producerTester.getCountForProperty("AAPL") + ", VIX=" + 
producerTester.getCountForProperty("VIX"));
+
+        // let some messages get sent
+        Thread.sleep(2000);
+
+        // switch the other consumer to SYMBOL = 'VIX'
+        consumer2.close();
+        consumer2 = createConsumer("BrokerA", consumerQueue, "SYMBOL = 'VIX'");
+
+        // wait till new consumer is on board
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
+                        .getConsumers().size() == 2;
+            }
+        });
+
+        currentCount = producerTester.getSentCount();
+        LOG.info(">>>> currently sent: total=" + currentCount + ", AAPL=" + 
producerTester.getCountForProperty("AAPL") + ", VIX=" + 
producerTester.getCountForProperty("VIX"));
+
+        // let some messages get sent
+        Thread.sleep(2000);
+
+        currentCount = producerTester.getSentCount();
+        LOG.info(">>>> currently sent: total=" + currentCount + ", AAPL=" + 
producerTester.getCountForProperty("AAPL") + ", VIX=" + 
producerTester.getCountForProperty("VIX"));
+
+
+        // make sure if there are messages that are orphaned in the queue that 
this number doesn't
+        // grow...
+        final long currentDepth = brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
+                .getDestinationStatistics().getMessages().getCount();
+
+        LOG.info(">>>>> Orphaned messages? " + currentDepth);
+
+        // wait 5s to see if we can get a growth in the depth of the queue
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
+                        .getDestinationStatistics().getMessages().getCount() > 
currentDepth;
+            }
+        }, 5000);
+
+        // stop producers
+        producerTester.setRunning(false);
+        producerTester.join();
+
+        // pause to let consumers catch up
+        Thread.sleep(1000);
+
+        assertTrue(brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
+                .getDestinationStatistics().getMessages().getCount() <= 
currentDepth);
+
+
+    }
+
+    private ProducerThreadTester createProducerTester(String brokerName, 
javax.jms.Destination destination) throws Exception{
+        BrokerItem brokerItem = brokers.get(brokerName);
+
+        Connection conn = brokerItem.createConnection();
+        conn.start();
+        Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        ProducerThreadTester rc = new ProducerThreadTester(sess, destination);
+        rc.setPersistent(persistentDelivery);
+        return rc;
+    }
+
+    public void testSelectorConsumptionWithNoMatchAtHeadOfQueue() throws 
Exception{
+        clearSelectorCacheFiles();
+        startAllBrokers();
+
+        BrokerService brokerA = brokers.get("BrokerA").broker;
+
+        // Create the remote virtual topic consumer with selector
+        ActiveMQDestination consumerQueue = 
createDestination("Consumer.B.VirtualTopic.tempTopic", false);
+
+        // create it so that the queue is there and messages don't get lost
+        MessageConsumer selectingConsumer = establishConsumer("BrokerA", 
consumerQueue);
+
+        // send messages with NO selection criteria first, and then with a 
property to be selected
+        // this should put messages at the head of the queue that don't match 
selection
+        ActiveMQTopic virtualTopic = new 
ActiveMQTopic("VirtualTopic.tempTopic");
+        sendMessages("BrokerA", virtualTopic, 1);
+
+        // close the consumer w/out consuming any messages; they'll be marked 
redelivered
+        selectingConsumer.close();
+
+        selectingConsumer = createConsumer("BrokerA", consumerQueue, "foo = 
'bar'");
+
+        sendMessages("BrokerA", virtualTopic, 1, asMap("foo", "bar"));
+
+
+        MessageIdList selectingConsumerMessages = 
getConsumerMessages("BrokerA", selectingConsumer);
+        selectingConsumerMessages.waitForMessagesToArrive(1, 1000L);
+
+        assertEquals(1, selectingConsumerMessages.getMessageCount());
+        selectingConsumerMessages.waitForMessagesToArrive(10, 1000L);
+        assertEquals(1, selectingConsumerMessages.getMessageCount());
+
+        // assert broker A stats
+        assertEquals(1, brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
+                .getConsumers().size());
+        assertEquals(2, brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
+                .getDestinationStatistics().getEnqueues().getCount());
+        assertEquals(1, brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
+                .getDestinationStatistics().getDequeues().getCount());
+        assertEquals(1, brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
+                .getDestinationStatistics().getMessages().getCount());
+
+    }
+
+    private MessageConsumer establishConsumer(String broker, 
ActiveMQDestination consumerQueue) throws Exception{
+        BrokerItem item = brokers.get(broker);
+        Connection c = item.createConnection();
+        c.start();
+        Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        return s.createConsumer(consumerQueue);
+    }
+
+    public void testSelectorsAndNonSelectors() throws Exception{
+        clearSelectorCacheFiles();
+        // borkerA is local and brokerB is remote
+        bridgeAndConfigureBrokers("BrokerA", "BrokerB");
+        startAllBrokers();
+        waitForBridgeFormation();
+
+
+        final BrokerService brokerA = brokers.get("BrokerA").broker;
+        final BrokerService brokerB = brokers.get("BrokerB").broker;
+
+        // Create the remote virtual topic consumer with selector
+        ActiveMQDestination consumerBQueue = 
createDestination("Consumer.B.VirtualTopic.tempTopic", false);
+
+        MessageConsumer selectingConsumer = createConsumer("BrokerB", 
consumerBQueue, "foo = 'bar'");
+        MessageConsumer nonSelectingConsumer = createConsumer("BrokerB", 
consumerBQueue);
+
+        // let advisories propogate
+        Wait.waitFor(new Wait.Condition() {
+            Destination dest = brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return dest.getConsumers().size() == 2;
+            }
+        }, 500);
+
+
+        Destination destination = getDestination(brokerB, consumerBQueue);
+        assertEquals(2, destination.getConsumers().size());
+
+        // publisher publishes to this
+        ActiveMQTopic virtualTopic = new 
ActiveMQTopic("VirtualTopic.tempTopic");
+        sendMessages("BrokerA", virtualTopic, 10, asMap("foo", "bar"));
+        sendMessages("BrokerA", virtualTopic, 10);
+
+
+        MessageIdList selectingConsumerMessages = 
getConsumerMessages("BrokerB", selectingConsumer);
+
+
+        MessageIdList nonSelectingConsumerMessages = 
getConsumerMessages("BrokerB", nonSelectingConsumer);
+
+        // we only expect half of the messages that get sent with the 
selector, because they get load balanced
+        selectingConsumerMessages.waitForMessagesToArrive(5, 1000L);
+        assertEquals(5, selectingConsumerMessages.getMessageCount());
+
+        nonSelectingConsumerMessages.waitForMessagesToArrive(15, 1000L);
+        assertEquals(15, nonSelectingConsumerMessages.getMessageCount());
+
+        // assert broker A stats
+        assertEquals(20, brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
+                .getDestinationStatistics().getEnqueues().getCount());
+        assertEquals(20, brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
+                .getDestinationStatistics().getDequeues().getCount());
+        assertEquals(0, brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
+                .getDestinationStatistics().getMessages().getCount());
+
+        // assert broker B stats
+        assertEquals(20, brokerB.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
+                .getDestinationStatistics().getEnqueues().getCount());
+        assertEquals(20, brokerB.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
+                .getDestinationStatistics().getDequeues().getCount());
+        assertEquals(0, brokerB.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
+                .getDestinationStatistics().getMessages().getCount());
+
+
+        //now let's close the consumer without the selector
+        nonSelectingConsumer.close();
+
+
+        // let advisories propogate
+        Wait.waitFor(new Wait.Condition() {
+            Destination dest = brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return dest.getConsumers().size() == 1;
+            }
+        }, 500);
+
+        // and let's send messages with a selector that doesnt' match
+        selectingConsumerMessages.flushMessages();
+
+        sendMessages("BrokerA", virtualTopic, 10, asMap("ceposta", "redhat"));
+
+        selectingConsumerMessages = getConsumerMessages("BrokerB", 
selectingConsumer);
+        selectingConsumerMessages.waitForMessagesToArrive(1, 1000L);
+        assertEquals(0, selectingConsumerMessages.getMessageCount()) ;
+
+        // assert broker A stats
+        assertEquals(20, brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
+                .getDestinationStatistics().getEnqueues().getCount());
+        assertEquals(20, brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
+                .getDestinationStatistics().getDequeues().getCount());
+        assertEquals(0, brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
+                .getDestinationStatistics().getMessages().getCount());
+
+        // assert broker B stats
+        assertEquals(20, brokerB.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
+                .getDestinationStatistics().getEnqueues().getCount());
+        assertEquals(20, brokerB.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
+                .getDestinationStatistics().getDequeues().getCount());
+        assertEquals(0, brokerB.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
+                .getDestinationStatistics().getMessages().getCount());
+
+        // now lets disconect the selecting consumer for a sec and send 
messages with a selector that DOES match
+        selectingConsumer.close();
+
+        // let advisories propogate
+        Wait.waitFor(new Wait.Condition() {
+            Destination dest = brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return dest.getConsumers().size() == 0;
+            }
+        }, 500);
+
+        selectingConsumerMessages.flushMessages();
+
+        sendMessages("BrokerA", virtualTopic, 10, asMap("foo", "bar"));
+
+
+        // assert broker A stats
+        assertEquals(30, brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
+                .getDestinationStatistics().getEnqueues().getCount());
+        assertEquals(20, brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
+                .getDestinationStatistics().getDequeues().getCount());
+        assertEquals(10, brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
+                .getDestinationStatistics().getMessages().getCount());
+
+        // assert broker B stats
+        assertEquals(20, brokerB.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
+                .getDestinationStatistics().getEnqueues().getCount());
+        assertEquals(20, brokerB.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
+                .getDestinationStatistics().getDequeues().getCount());
+        assertEquals(0, brokerB.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
+                .getDestinationStatistics().getMessages().getCount());
+
+        selectingConsumer = createConsumer("BrokerB", consumerBQueue, "foo = 
'bar'");
+        selectingConsumerMessages = getConsumerMessages("BrokerB", 
selectingConsumer);
+        selectingConsumerMessages.waitForMessagesToArrive(10);
+        assertEquals(10, selectingConsumerMessages.getMessageCount());
+
+        // let advisories propogate
+        Wait.waitFor(new Wait.Condition() {
+            Destination dest = brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return dest.getConsumers().size() == 1;
+            }
+        }, 500);
+
+        // assert broker A stats
+        assertEquals(30, brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
+                .getDestinationStatistics().getEnqueues().getCount());
+        assertEquals(30, brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
+                .getDestinationStatistics().getDequeues().getCount());
+        assertEquals(0, brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
+                .getDestinationStatistics().getMessages().getCount());
+
+        // assert broker B stats
+        assertEquals(30, brokerB.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
+                .getDestinationStatistics().getEnqueues().getCount());
+        assertEquals(30, brokerB.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
+                .getDestinationStatistics().getDequeues().getCount());
+        assertEquals(0, brokerB.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
+                .getDestinationStatistics().getMessages().getCount());
+
+
+    }
+
+    public VirtualDestinationSelectorCacheViewMBean 
getVirtualDestinationSelectorCacheMBean(BrokerService broker)
+            throws MalformedObjectNameException {
+        ObjectName objectName = BrokerMBeanSupport
+                
.createVirtualDestinationSelectorCacheName(broker.getBrokerObjectName(), 
"plugin", "virtualDestinationCache");
+        return 
(VirtualDestinationSelectorCacheViewMBean)broker.getManagementContext()
+                .newProxyInstance(objectName, 
VirtualDestinationSelectorCacheViewMBean.class, true);
+    }
+
+    public void testSelectorAwareForwarding() throws Exception {
+        clearSelectorCacheFiles();
+        // borkerA is local and brokerB is remote
+        bridgeAndConfigureBrokers("BrokerA", "BrokerB");
+        startAllBrokers();
+        waitForBridgeFormation();
+
+        final BrokerService brokerB = brokers.get("BrokerB").broker;
+        final BrokerService brokerA = brokers.get("BrokerA").broker;
+
+        // Create the remote virtual topic consumer with selector
+        MessageConsumer remoteConsumer = createConsumer("BrokerB",
+                createDestination("Consumer.B.VirtualTopic.tempTopic", false),
+                "foo = 'bar'");
+
+
+        // let advisories propogate
+        Wait.waitFor(new Wait.Condition() {
+            Destination dest = brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return dest.getConsumers().size() == 1;
+            }
+        }, 500);
+
+        ActiveMQQueue queueB = new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic");
+        Destination destination = 
getDestination(brokers.get("BrokerB").broker, queueB);
+        assertEquals(1, destination.getConsumers().size());
+
+        ActiveMQTopic virtualTopic = new 
ActiveMQTopic("VirtualTopic.tempTopic");
+        assertNull(getDestination(brokers.get("BrokerA").broker, 
virtualTopic));
+        assertNull(getDestination(brokers.get("BrokerB").broker, 
virtualTopic));
+
+        // send two types of messages, one unwanted and the other wanted
+        sendMessages("BrokerA", virtualTopic, 1, asMap("foo", "bar"));
+        sendMessages("BrokerA", virtualTopic, 1, asMap("ceposta", "redhat"));
+
+        MessageIdList msgsB = getConsumerMessages("BrokerB", remoteConsumer);
+        // wait for the wanted one to arrive at the remote consumer
+        msgsB.waitForMessagesToArrive(1);
+
+        // ensure we don't get any more messages
+        msgsB.waitForMessagesToArrive(1, 1000);
+
+        // remote consumer should only get one of the messages
+        assertEquals(1, msgsB.getMessageCount());
+
+        // and the enqueue count for the remote queue should only be 1
+        assertEquals(1, brokerB.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
+                .getDestinationStatistics().getEnqueues().getCount());
+
+
+        // now let's remove the consumer on broker B and recreate it with new 
selector
+        remoteConsumer.close();
+
+
+
+
+        // now let's shut down broker A and clear its persistent selector cache
+        brokerA.stop();
+        brokerA.waitUntilStopped();
+        deleteSelectorCacheFile("BrokerA");
+
+        assertEquals(0, destination.getConsumers().size());
+
+        remoteConsumer = createConsumer("BrokerB",
+                createDestination("Consumer.B.VirtualTopic.tempTopic", false),
+                "ceposta = 'redhat'");
+
+
+        assertEquals(1, destination.getConsumers().size());
+
+
+        // now let's start broker A back up
+        brokerA.start(true);
+        brokerA.waitUntilStarted();
+
+        System.out.println(brokerA.getNetworkConnectors());
+
+        // give a sec to let advisories propogate
+        // let advisories propogate
+        Wait.waitFor(new Wait.Condition() {
+            Destination dest = brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return dest.getConsumers().size() == 1;
+            }
+        }, 500);
+
+
+        // send two types of messages, one unwanted and the other wanted
+        sendMessages("BrokerA", virtualTopic, 1, asMap("foo", "bar"));
+        sendMessages("BrokerB", virtualTopic, 1, asMap("foo", "bar"));
+        sendMessages("BrokerA", virtualTopic, 1, asMap("ceposta", "redhat"));
+        sendMessages("BrokerB", virtualTopic, 1, asMap("ceposta", "redhat"));
+
+        // lets get messages on consumer B
+        msgsB = getConsumerMessages("BrokerB", remoteConsumer);
+        msgsB.waitForMessagesToArrive(2);
+
+        // ensure we don't get any more messages
+        msgsB.waitForMessagesToArrive(1, 1000);
+
+
+        // remote consumer should only get 10 of the messages
+        assertEquals(2, msgsB.getMessageCount());
+
+
+        // queue should be drained
+        assertEquals(0, brokerB.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
+                .getDestinationStatistics().getMessages().getCount());
+        // and the enqueue count for the remote queue should only be 1
+        assertEquals(3, brokerB.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
+                .getDestinationStatistics().getEnqueues().getCount());
+
+
+    }
+
+    private HashMap<String, Object> asMap(String key, Object value) {
+        HashMap<String, Object> rc = new HashMap<String,Object>(1);
+        rc.put(key, value);
+        return rc;
+    }
+
+
+
+    private void bridgeAndConfigureBrokers(String local, String remote)
+            throws Exception {
+        NetworkConnector bridge = bridgeBrokers(local, remote, false, 1, 
false);
+        bridge.setDecreaseNetworkConsumerPriority(true);
+        bridge.setDuplex(true);
+    }
+
+    public void setUp() throws Exception {
+        super.setAutoFail(true);
+        super.setUp();
+        String options = new String(
+                "?useJmx=false&deleteAllMessagesOnStartup=true");
+        createAndConfigureBroker(new URI(
+                "broker:(tcp://localhost:61616)/BrokerA" + options));
+        createAndConfigureBroker(new URI(
+                "broker:(tcp://localhost:61617)/BrokerB" + options));
+    }
+
+    private void clearSelectorCacheFiles() {
+        String[] brokerNames = new String[]{"BrokerA", "BrokerB"};
+        for (String brokerName : brokerNames) {
+            deleteSelectorCacheFile(brokerName);
+        }
+    }
+
+    private void deleteSelectorCacheFile(String brokerName) {
+        File brokerPersisteFile = new 
File(PERSIST_SELECTOR_CACHE_FILE_BASEPATH + brokerName);
+
+        if (brokerPersisteFile.exists()) {
+            brokerPersisteFile.delete();
+        }
+    }
+
+    private BrokerService createAndConfigureBroker(URI uri) throws Exception {
+        BrokerService broker = createBroker(uri);
+        broker.setUseJmx(true);
+        // Make topics "selectorAware"
+        VirtualTopic virtualTopic = new VirtualTopic();
+        virtualTopic.setSelectorAware(true);
+        VirtualDestinationInterceptor interceptor = new 
VirtualDestinationInterceptor();
+        interceptor
+                .setVirtualDestinations(new VirtualDestination[] { 
virtualTopic });
+        broker.setDestinationInterceptors(new DestinationInterceptor[] { 
interceptor });
+        configurePersistenceAdapter(broker);
+
+        SubQueueSelectorCacheBrokerPlugin selectorCacheBrokerPlugin = new 
SubQueueSelectorCacheBrokerPlugin();
+        selectorCacheBrokerPlugin.setSingleSelectorPerDestination(true);
+        File persisteFile = new File(PERSIST_SELECTOR_CACHE_FILE_BASEPATH + 
broker.getBrokerName());
+        selectorCacheBrokerPlugin.setPersistFile(persisteFile);
+        broker.setPlugins(new BrokerPlugin[]{selectorCacheBrokerPlugin});
+        return broker;
+    }
+
+    protected void configurePersistenceAdapter(BrokerService broker)
+            throws IOException {
+        File dataFileDir = new File("target/test-amq-data/kahadb/"
+                + broker.getBrokerName());
+        KahaDBStore kaha = new KahaDBStore();
+        kaha.setDirectory(dataFileDir);
+        broker.setPersistenceAdapter(kaha);
+    }
+
+    class ProducerThreadTester extends ProducerThread {
+
+        private Set<String> selectors = new LinkedHashSet<String>();
+        private Map<String, AtomicInteger> selectorCounts = new 
HashMap<String, AtomicInteger>();
+        private Random rand = new Random(System.currentTimeMillis());
+
+
+        public ProducerThreadTester(Session session, javax.jms.Destination 
destination) {
+            super(session, destination);
+        }
+
+        @Override
+        protected Message createMessage(int i) throws Exception {
+            TextMessage msg = createTextMessage(this.session, "Message-" + i);
+            if (selectors.size() > 0) {
+                String value = getRandomKey();
+                msg.setStringProperty("SYMBOL", value);
+                AtomicInteger currentCount = selectorCounts.get(value);
+                currentCount.incrementAndGet();
+            }
+
+            return msg;
+        }
+
+        @Override
+        public void resetCounters() {
+            super.resetCounters();
+            for (String key : selectorCounts.keySet()) {
+                selectorCounts.put(key, new AtomicInteger(0));
+            }
+        }
+
+        private String getRandomKey() {
+            ArrayList<String> keys = new ArrayList(selectors);
+            return keys.get(rand.nextInt(keys.size()));
+        }
+
+        public void addMessageProperty(String value) {
+            if (!this.selectors.contains(value)) {
+                selectors.add(value);
+                selectorCounts.put(value, new AtomicInteger(0));
+            }
+        }
+
+        public int getCountForProperty(String key) {
+            return selectorCounts.get(key).get();
+        }
+
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/61da1faa/activemq-unit-tests/src/test/java/org/apache/activemq/util/MessageIdList.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/util/MessageIdList.java 
b/activemq-unit-tests/src/test/java/org/apache/activemq/util/MessageIdList.java
index 7140a86..fcc7fe6 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/util/MessageIdList.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/util/MessageIdList.java
@@ -135,7 +135,10 @@ public class MessageIdList extends Assert implements 
MessageListener {
         }
     }
 
-    public void waitForMessagesToArrive(int messageCount) {
+    public void waitForMessagesToArrive(int messageCount){
+        waitForMessagesToArrive(messageCount, maximumDuration);
+    }
+    public void waitForMessagesToArrive(int messageCount, long 
maximumDuration) {
         LOG.info("Waiting for " + messageCount + " message(s) to arrive");
 
         long start = System.currentTimeMillis();

Reply via email to