Author: rgodfrey
Date: Sun Dec 25 22:55:13 2016
New Revision: 1776037

URL: http://svn.apache.org/viewvc?rev=1776037&view=rev
Log:
QPID-6028 : allow binding destinations to be things other than queues

Added:
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java
   (with props)
Modified:
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeImpl.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeImpl.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeImpl.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
    
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
    
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
    
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
    
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java
    
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
    
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
    
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
    
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
    
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
    
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
    
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
    
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java
    
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
 Sun Dec 25 22:55:13 2016
@@ -24,6 +24,7 @@ import java.security.AccessControlExcept
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -49,9 +50,8 @@ import org.apache.qpid.server.logging.me
 import org.apache.qpid.server.logging.subjects.ExchangeLogSubject;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageDestination;
-import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.MessageSender;
+import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.AbstractConfiguredObject;
 import org.apache.qpid.server.model.Binding;
@@ -66,13 +66,9 @@ import org.apache.qpid.server.model.Publ
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.StateTransition;
-import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.security.SecurityToken;
 import org.apache.qpid.server.security.access.Operation;
-import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.StorableMessageMetaData;
-import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.FixedKeyMapCreator;
 import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
 import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
@@ -86,6 +82,8 @@ public abstract class AbstractExchange<T
 {
     private static final Logger _logger = 
LoggerFactory.getLogger(AbstractExchange.class);
 
+    private static final ThreadLocal<Map<AbstractExchange<?>, Set<String>>> 
CURRENT_ROUTING = new ThreadLocal<>();
+
     private static final FixedKeyMapCreator BIND_ARGUMENTS_CREATOR =
             new FixedKeyMapCreator("bindingKey", "destination", "arguments");
     private static final FixedKeyMapCreator UNBIND_ARGUMENTS_CREATOR =
@@ -469,131 +467,84 @@ public abstract class AbstractExchange<T
     }
 
 
-    final List<? extends BaseQueue> route(final ServerMessage message,
-                                          final String routingAddress,
-                                          final InstanceProperties 
instanceProperties)
-    {
-        _receivedMessageCount.incrementAndGet();
-        _receivedMessageSize.addAndGet(message.getSize());
-        List<? extends BaseQueue> queues = doRoute(message, routingAddress, 
instanceProperties);
-        List<? extends BaseQueue> allQueues = queues;
-
-        boolean deletedQueues = false;
-
-        for(BaseQueue q : allQueues)
-        {
-            if(q.isDeleted())
-            {
-                if(!deletedQueues)
-                {
-                    deletedQueues = true;
-                    queues = new ArrayList<>(allQueues);
-                }
-                _logger.debug("Exchange: {} - attempt to enqueue message onto 
deleted queue {}", getName(), q.getName());
-
-                queues.remove(q);
-            }
-        }
-
-
-        if(!queues.isEmpty())
-        {
-            _routedMessageCount.incrementAndGet();
-            _routedMessageSize.addAndGet(message.getSize());
-        }
-        else
-        {
-            _droppedMessageCount.incrementAndGet();
-            _droppedMessageSize.addAndGet(message.getSize());
-        }
-        return queues;
-    }
-
     @Override
-    public final  <M extends ServerMessage<? extends StorableMessageMetaData>> 
int send(final M message,
-                                                                               
         final String routingAddress,
-                                                                               
         final InstanceProperties instanceProperties,
-                                                                               
         final ServerTransaction txn,
-                                                                               
         final Action<? super MessageInstance> postEnqueueAction)
+    public <M extends ServerMessage<? extends StorableMessageMetaData>> 
RoutingResult<M> route(final M message,
+                                                                               
                final String routingAddress,
+                                                                               
                final InstanceProperties instanceProperties)
     {
         if (_virtualHost.getState() != State.ACTIVE)
         {
             throw new VirtualHostUnavailableException(this._virtualHost);
         }
 
-        List<? extends BaseQueue> queues = route(message, routingAddress, 
instanceProperties);
+        final RoutingResult<M> routingResult = new RoutingResult<>(message);
 
-        if(queues == null || queues.isEmpty())
+        Map<AbstractExchange<?>, Set<String>> currentThreadMap = 
CURRENT_ROUTING.get();
+        boolean topLevel = currentThreadMap == null;
+        try
         {
-            Exchange altExchange = getAlternateExchange();
-            if(altExchange != null)
+            if (topLevel)
+            {
+                currentThreadMap = new HashMap<>();
+                CURRENT_ROUTING.set(currentThreadMap);
+            }
+            Set<String> existingRoutes = currentThreadMap.get(this);
+            if (existingRoutes == null)
+            {
+                currentThreadMap.put(this, 
Collections.singleton(routingAddress));
+            }
+            else if (existingRoutes.contains(routingAddress))
             {
-                return altExchange.send(message, routingAddress, 
instanceProperties, txn, postEnqueueAction);
+                return routingResult;
             }
             else
             {
-                return 0;
+                existingRoutes = new HashSet<>(existingRoutes);
+                existingRoutes.add(routingAddress);
+                currentThreadMap.put(this, existingRoutes);
             }
-        }
-        else
-        {
-            for(BaseQueue q : queues)
+
+            _receivedMessageCount.incrementAndGet();
+            _receivedMessageSize.addAndGet(message.getSize());
+
+            doRoute(message, routingAddress, instanceProperties, 
routingResult);
+
+            if (!routingResult.hasRoutes())
             {
-                if(!message.isResourceAcceptable(q))
+                Exchange altExchange = getAlternateExchange();
+                if (altExchange != null)
                 {
-                    return 0;
+                    routingResult.add(altExchange.route(message, 
routingAddress, instanceProperties));
                 }
             }
-            final BaseQueue[] baseQueues;
 
-            if(message.isReferenced())
+            if (routingResult.hasRoutes())
             {
-                ArrayList<BaseQueue> uniqueQueues = new 
ArrayList<>(queues.size());
-                for(BaseQueue q : queues)
-                {
-                    if(!message.isReferenced(q))
-                    {
-                        uniqueQueues.add(q);
-                    }
-                }
-                baseQueues = uniqueQueues.toArray(new 
BaseQueue[uniqueQueues.size()]);
+                _routedMessageCount.incrementAndGet();
+                _routedMessageSize.addAndGet(message.getSize());
             }
             else
             {
-                baseQueues = queues.toArray(new BaseQueue[queues.size()]);
+                _droppedMessageCount.incrementAndGet();
+                _droppedMessageSize.addAndGet(message.getSize());
             }
 
-            txn.enqueue(queues,message, new ServerTransaction.EnqueueAction()
+            return routingResult;
+        }
+        finally
+        {
+            if(topLevel)
             {
-                MessageReference _reference = message.newReference();
-
-                public void postCommit(MessageEnqueueRecord... records)
-                {
-                    try
-                    {
-                        for(int i = 0; i < baseQueues.length; i++)
-                        {
-                            baseQueues[i].enqueue(message, postEnqueueAction, 
records[i]);
-                        }
-                    }
-                    finally
-                    {
-                        _reference.release();
-                    }
-                }
-
-                public void onRollback()
-                {
-                    _reference.release();
-                }
-            });
-            return queues.size();
+                CURRENT_ROUTING.set(null);
+            }
         }
     }
 
-    protected abstract List<? extends BaseQueue> doRoute(final ServerMessage 
message,
-                                                         final String 
routingAddress,
-                                                         final 
InstanceProperties instanceProperties);
+
+    protected abstract <M extends ServerMessage<? extends 
StorableMessageMetaData>> void doRoute(final M message,
+                                    final String routingAddress,
+                                    final InstanceProperties 
instanceProperties,
+                                    final RoutingResult<M> result);
 
     @Override
     public boolean bind(final String destination,

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java
 Sun Dec 25 22:55:13 2016
@@ -24,8 +24,8 @@ import java.util.Map;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageDestination;
-import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageSender;
+import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.Exchange;
@@ -37,8 +37,6 @@ import org.apache.qpid.server.security.R
 import org.apache.qpid.server.security.SecurityToken;
 import org.apache.qpid.server.security.access.Operation;
 import org.apache.qpid.server.store.StorableMessageMetaData;
-import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 
 public class DefaultDestination implements MessageDestination, 
PermissionedObject
@@ -94,43 +92,43 @@ public class DefaultDestination implemen
     }
 
 
-    public final  <M extends ServerMessage<? extends StorableMessageMetaData>> 
int send(final M message,
-                                                                               
         String routingAddress,
-                                                                               
         final InstanceProperties instanceProperties,
-                                                                               
         final ServerTransaction txn,
-                                                                               
         final Action<? super MessageInstance> postEnqueueAction)
+    @Override
+    public <M extends ServerMessage<? extends StorableMessageMetaData>> 
RoutingResult<M> route(M message,
+                                                                               
                String routingAddress,
+                                                                               
                InstanceProperties instanceProperties)
     {
-        if(routingAddress == null || routingAddress.trim().equals(""))
-        {
-            return 0;
-        }
-        final MessageDestination dest = 
_virtualHost.getAttainedMessageDestination(routingAddress);
-        if(dest == null)
+        RoutingResult<M> result = new RoutingResult<>(message);
+
+        if (routingAddress != null && !routingAddress.trim().equals(""))
         {
-            routingAddress = _virtualHost.getLocalAddress(routingAddress);
-            if(routingAddress.contains("/") && !routingAddress.startsWith("/"))
+            final MessageDestination dest = 
_virtualHost.getAttainedMessageDestination(routingAddress);
+            if (dest == null)
             {
-                String[] parts = routingAddress.split("/",2);
-                Exchange<?> exchange = 
_virtualHost.getAttainedChildFromAddress(Exchange.class, parts[0]);
-                if(exchange != null)
+                routingAddress = _virtualHost.getLocalAddress(routingAddress);
+                if (routingAddress.contains("/") && 
!routingAddress.startsWith("/"))
                 {
-                    return exchange.send(message, parts[1], 
instanceProperties, txn, postEnqueueAction);
+                    String[] parts = routingAddress.split("/", 2);
+                    Exchange<?> exchange = 
_virtualHost.getAttainedChildFromAddress(Exchange.class, parts[0]);
+                    if (exchange != null)
+                    {
+                        result.add(exchange.route(message, parts[1], 
instanceProperties));
+                    }
                 }
-            }
-            else if(!routingAddress.contains("/"))
-            {
-                Exchange<?> exchange = 
_virtualHost.getAttainedChildFromAddress(Exchange.class, routingAddress);
-                if(exchange != null)
+                else if (!routingAddress.contains("/"))
                 {
-                    return exchange.send(message, "", instanceProperties, txn, 
postEnqueueAction);
+                    Exchange<?> exchange = 
_virtualHost.getAttainedChildFromAddress(Exchange.class, routingAddress);
+                    if (exchange != null)
+                    {
+                        result.add(exchange.route(message, "", 
instanceProperties));
+                    }
                 }
             }
-            return 0;
-        }
-        else
-        {
-            return dest.send(message, routingAddress, instanceProperties, txn, 
postEnqueueAction);
+            else
+            {
+                result.add(dest.route(message, routingAddress, 
instanceProperties));
+            }
         }
+        return result;
     }
 
     @Override

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeImpl.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeImpl.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeImpl.java
 Sun Dec 25 22:55:13 2016
@@ -20,11 +20,9 @@
  */
 package org.apache.qpid.server.exchange;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -39,10 +37,11 @@ import org.apache.qpid.server.filter.Fil
 import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
 import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 
 public class DirectExchangeImpl extends AbstractExchange<DirectExchangeImpl> 
implements DirectExchange<DirectExchangeImpl>
@@ -52,8 +51,8 @@ public class DirectExchangeImpl extends
 
     private final class BindingSet
     {
-        private final Set<BaseQueue> _unfilteredQueues;
-        private final Map<BaseQueue, FilterManager> _filteredQueues;
+        private final Set<MessageDestination> _unfilteredQueues;
+        private final Map<MessageDestination, FilterManager> _filteredQueues;
 
         public BindingSet()
         {
@@ -61,14 +60,14 @@ public class DirectExchangeImpl extends
             _filteredQueues = Collections.emptyMap();
         }
 
-        private BindingSet(final Set<BaseQueue> unfilteredQueues,
-                           final Map<BaseQueue, FilterManager> filteredQueues)
+        private BindingSet(final Set<MessageDestination> unfilteredQueues,
+                           final Map<MessageDestination, FilterManager> 
filteredQueues)
         {
             _unfilteredQueues = unfilteredQueues;
             _filteredQueues = filteredQueues;
         }
 
-        public Set<BaseQueue> getUnfilteredQueues()
+        public Set<MessageDestination> getUnfilteredQueues()
         {
             return _unfilteredQueues;
         }
@@ -83,7 +82,7 @@ public class DirectExchangeImpl extends
             return _unfilteredQueues.isEmpty() && _filteredQueues.isEmpty();
         }
 
-        public Map<BaseQueue,FilterManager> getFilteredQueues()
+        public Map<MessageDestination,FilterManager> getFilteredQueues()
         {
             return _filteredQueues;
         }
@@ -98,8 +97,8 @@ public class DirectExchangeImpl extends
             {
                 try
                 {
-                    Set<BaseQueue> unfilteredQueues;
-                    Map<BaseQueue, FilterManager> filteredQueues;
+                    Set<MessageDestination> unfilteredQueues;
+                    Map<MessageDestination, FilterManager> filteredQueues;
                     if (_unfilteredQueues.contains(destination))
                     {
                         unfilteredQueues = new HashSet<>(_unfilteredQueues);
@@ -111,7 +110,7 @@ public class DirectExchangeImpl extends
                     }
 
                     filteredQueues = new HashMap<>(_filteredQueues);
-                    filteredQueues.put((BaseQueue) destination,
+                    filteredQueues.put(destination,
                                        
FilterSupport.createMessageFilter(arguments, (Queue<?>) destination));
 
                     return new 
BindingSet(Collections.unmodifiableSet(unfilteredQueues), 
Collections.unmodifiableMap(filteredQueues));
@@ -128,8 +127,8 @@ public class DirectExchangeImpl extends
             }
             else
             {
-                Set<BaseQueue> unfilteredQueues;
-                Map<BaseQueue, FilterManager> filteredQueues;
+                Set<MessageDestination> unfilteredQueues;
+                Map<MessageDestination, FilterManager> filteredQueues;
                 if (_filteredQueues.containsKey(destination))
                 {
                     filteredQueues = new HashMap<>(_filteredQueues);
@@ -141,7 +140,7 @@ public class DirectExchangeImpl extends
                 }
 
                 unfilteredQueues = new HashSet<>(_unfilteredQueues);
-                unfilteredQueues.add((BaseQueue)destination);
+                unfilteredQueues.add(destination);
 
                 return new 
BindingSet(Collections.unmodifiableSet(unfilteredQueues), 
Collections.unmodifiableMap(filteredQueues));
 
@@ -150,8 +149,8 @@ public class DirectExchangeImpl extends
 
         public BindingSet removeBinding(final MessageDestination destination)
         {
-            Set<BaseQueue> unfilteredQueues;
-            Map<BaseQueue, FilterManager> filteredQueues;
+            Set<MessageDestination> unfilteredQueues;
+            Map<MessageDestination, FilterManager> filteredQueues;
             if (_unfilteredQueues.contains(destination))
             {
                 unfilteredQueues = new HashSet<>(_unfilteredQueues);
@@ -182,49 +181,43 @@ public class DirectExchangeImpl extends
         super(attributes, vhost);
     }
 
+
     @Override
-    public List<? extends BaseQueue> doRoute(ServerMessage payload,
-                                             final String routingKey,
-                                             final InstanceProperties 
instanceProperties)
+    public  <M extends ServerMessage<? extends StorableMessageMetaData>> void 
doRoute(final M payload,
+                                                                               
       final String routingKey,
+                                                                               
       final InstanceProperties instanceProperties,
+                                                                               
       final RoutingResult<M> result)
     {
 
         BindingSet bindings = _bindingsByKey.get(routingKey == null ? "" : 
routingKey);
 
         if(bindings != null)
         {
-            // TODO - remove this garbage generation
-            List<BaseQueue> queues = new 
ArrayList<>(bindings.getUnfilteredQueues());
+            final Set<MessageDestination> unfilteredQueues = 
bindings.getUnfilteredQueues();
+            for(MessageDestination destination : unfilteredQueues)
+            {
+                result.add(destination.route(payload, routingKey, 
instanceProperties));
+            }
 
             if(bindings.hasFilteredQueues())
             {
-                Set<BaseQueue> queuesSet = new HashSet<BaseQueue>(queues);
                 Filterable filterable = 
Filterable.Factory.newInstance(payload, instanceProperties);
 
-                Map<BaseQueue, FilterManager> filteredQueues = 
bindings.getFilteredQueues();
-                for(Map.Entry<BaseQueue, FilterManager> entry : 
filteredQueues.entrySet())
+                Map<MessageDestination, FilterManager> filteredQueues = 
bindings.getFilteredQueues();
+                for(Map.Entry<MessageDestination, FilterManager> entry : 
filteredQueues.entrySet())
                 {
-                    if(!queuesSet.contains(entry.getKey()))
+                    if(!unfilteredQueues.contains(entry.getKey()))
                     {
                         FilterManager filter = entry.getValue();
                         if(filter.allAllow(filterable))
                         {
-                            queuesSet.add(entry.getKey());
+                            result.add(entry.getKey().route(payload, 
routingKey, instanceProperties));
                         }
                     }
                 }
-                if(queues.size() != queuesSet.size())
-                {
-                    queues = new ArrayList<>(queuesSet);
-                }
             }
-            return queues;
-        }
-        else
-        {
-            return Collections.emptyList();
         }
 
-
     }
 
     @Override

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeImpl.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeImpl.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeImpl.java
 Sun Dec 25 22:55:13 2016
@@ -35,10 +35,11 @@ import org.apache.qpid.server.filter.Fil
 import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
 import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 
 class FanoutExchangeImpl extends AbstractExchange<FanoutExchangeImpl> 
implements FanoutExchange<FanoutExchangeImpl>
@@ -51,15 +52,15 @@ class FanoutExchangeImpl extends Abstrac
     {
         private final Map<MessageDestination,Integer> _queues;
 
-        private final List<Queue<?>> _unfilteredQueues;
-        private final List<Queue<?>> _filteredQueues;
+        private final List<MessageDestination> _unfilteredQueues;
+        private final List<MessageDestination> _filteredQueues;
 
-        private final Map<Queue<?>,Map<BindingIdentifier, FilterManager>> 
_filteredBindings;
+        private final Map<MessageDestination,Map<BindingIdentifier, 
FilterManager>> _filteredBindings;
 
         public BindingSet(final Map<MessageDestination, Integer> queues,
-                          final List<Queue<?>> unfilteredQueues,
-                          final List<Queue<?>> filteredQueues,
-                          final Map<Queue<?>, Map<BindingIdentifier, 
FilterManager>> filteredBindings)
+                          final List<MessageDestination> unfilteredQueues,
+                          final List<MessageDestination> filteredQueues,
+                          final Map<MessageDestination, Map<BindingIdentifier, 
FilterManager>> filteredBindings)
         {
             _queues = queues;
             _unfilteredQueues = unfilteredQueues;
@@ -81,19 +82,19 @@ class FanoutExchangeImpl extends Abstrac
                 {
                     try
                     {
-                        List<Queue<?>> filteredQueues;
+                        List<MessageDestination> filteredQueues;
                         if 
(!(_filteredQueues.contains(binding.getDestination())
                               || 
_unfilteredQueues.contains(binding.getDestination())))
                         {
                             filteredQueues = new ArrayList<>(_filteredQueues);
-                            filteredQueues.add((Queue<?>) 
binding.getDestination());
+                            filteredQueues.add(binding.getDestination());
                             filteredQueues = 
Collections.unmodifiableList(filteredQueues);
                         }
                         else
                         {
                             filteredQueues = _filteredQueues;
                         }
-                        Map<Queue<?>, Map<BindingIdentifier, FilterManager>> 
filteredBindings =
+                        Map<MessageDestination, Map<BindingIdentifier, 
FilterManager>> filteredBindings =
                                 new HashMap<>(_filteredBindings);
                         Map<BindingIdentifier, FilterManager> bindingsForQueue 
=
                                 filteredBindings.get(binding.getDestination());
@@ -107,8 +108,8 @@ class FanoutExchangeImpl extends Abstrac
                         }
                         bindingsForQueue.put(binding,
                                              
FilterSupport.createMessageFilter(arguments,
-                                                                               
(Queue<?>) binding.getDestination()));
-                        filteredBindings.put((Queue<?>) 
binding.getDestination(), bindingsForQueue);
+                                                                               
binding.getDestination()));
+                        filteredBindings.put(binding.getDestination(), 
bindingsForQueue);
                         return new BindingSet(_queues, _unfilteredQueues, 
filteredQueues, Collections.unmodifiableMap(filteredBindings));
                     }
                     catch (AMQInvalidArgumentException e)
@@ -122,8 +123,8 @@ class FanoutExchangeImpl extends Abstrac
                 else
                 {
                     Map<MessageDestination, Integer> queues = new 
HashMap<>(_queues);
-                    List<Queue<?>> unfilteredQueues;
-                    List<Queue<?>> filteredQueues;
+                    List<MessageDestination> unfilteredQueues;
+                    List<MessageDestination> filteredQueues;
                     if (queues.containsKey(binding.getDestination()))
                     {
                         queues.put(binding.getDestination(), 
queues.get(binding.getDestination()) + 1);
@@ -162,10 +163,10 @@ class FanoutExchangeImpl extends Abstrac
             Queue<?> queue = (Queue<?>) binding.getDestination();
             if(_filteredBindings.containsKey(queue) && 
_filteredBindings.get(queue).containsKey(binding))
             {
-                final Map<Queue<?>, Map<BindingIdentifier, FilterManager>> 
filteredBindings = new HashMap<>(_filteredBindings);
+                final Map<MessageDestination, Map<BindingIdentifier, 
FilterManager>> filteredBindings = new HashMap<>(_filteredBindings);
                 final Map<BindingIdentifier, FilterManager> bindingsForQueue = 
new HashMap<>(filteredBindings.remove(queue));
                 bindingsForQueue.remove(binding);
-                List<Queue<?>> filteredQueues;
+                List<MessageDestination> filteredQueues;
                 if(bindingsForQueue.isEmpty())
                 {
                     filteredQueues = new ArrayList<>(_filteredQueues);
@@ -183,8 +184,8 @@ class FanoutExchangeImpl extends Abstrac
             {
                 Map<MessageDestination, Integer> queues = new 
HashMap<>(_queues);
                 int count = queues.remove(queue);
-                List<Queue<?>> unfilteredQueues;
-                List<Queue<?>> filteredQueues;
+                List<MessageDestination> unfilteredQueues;
+                List<MessageDestination> filteredQueues;
                 if(count > 1)
                 {
                     queues.put(queue, --count);
@@ -230,28 +231,29 @@ class FanoutExchangeImpl extends Abstrac
     }
 
     @Override
-    public ArrayList<BaseQueue> doRoute(ServerMessage payload,
-                                        final String routingKey,
-                                        final InstanceProperties 
instanceProperties)
+    protected <M extends ServerMessage<? extends StorableMessageMetaData>> 
void doRoute(final M message,
+                                                                               
         final String routingAddress,
+                                                                               
         final InstanceProperties instanceProperties,
+                                                                               
         final RoutingResult<M> result)
     {
-
         BindingSet bindingSet = _bindingSet;
-        final ArrayList<BaseQueue> result = new 
ArrayList<BaseQueue>(bindingSet._unfilteredQueues);
-
-
-        final Map<Queue<?>, Map<BindingIdentifier, FilterManager>> 
filteredBindings = bindingSet._filteredBindings;
+        for(MessageDestination destination : bindingSet._unfilteredQueues)
+        {
+            result.add(destination.route(message, routingAddress, 
instanceProperties));
+        }
+        final Map<MessageDestination, Map<BindingIdentifier, FilterManager>> 
filteredBindings = bindingSet._filteredBindings;
         if(!bindingSet._filteredQueues.isEmpty())
         {
-            for(Queue<?> q : bindingSet._filteredQueues)
+            for(MessageDestination q : bindingSet._filteredQueues)
             {
                 final Map<BindingIdentifier, FilterManager> 
bindingMessageFilterMap = filteredBindings.get(q);
-                if(!(bindingMessageFilterMap == null || result.contains(q)))
+                if(!(bindingMessageFilterMap == null || 
bindingSet._unfilteredQueues.contains(q)))
                 {
                     for(FilterManager filter : 
bindingMessageFilterMap.values())
                     {
-                        
if(filter.allAllow(Filterable.Factory.newInstance(payload, instanceProperties)))
+                        
if(filter.allAllow(Filterable.Factory.newInstance(message, instanceProperties)))
                         {
-                            result.add(q);
+                            result.add(q.route(message, routingAddress, 
instanceProperties));
                             break;
                         }
                     }
@@ -261,10 +263,6 @@ class FanoutExchangeImpl extends Abstrac
         }
 
 
-        _logger.debug("Publishing message to queue {}", result);
-
-        return result;
-
     }
 
     @Override

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeImpl.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeImpl.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeImpl.java
 Sun Dec 25 22:55:13 2016
@@ -20,9 +20,7 @@
  */
 package org.apache.qpid.server.exchange;
 
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.LinkedHashSet;
 import java.util.ListIterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -35,10 +33,11 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
 import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 
 /**
@@ -86,14 +85,13 @@ public class HeadersExchangeImpl extends
     }
 
     @Override
-    public ArrayList<BaseQueue> doRoute(ServerMessage payload,
-                                        final String routingKey,
-                                        final InstanceProperties 
instanceProperties)
+    public <M extends ServerMessage<? extends StorableMessageMetaData>> void 
doRoute(M payload,
+                                                                               
      final String routingKey,
+                                                                               
      final InstanceProperties instanceProperties,
+                                                                               
      RoutingResult<M> routingResult)
     {
         _logger.debug("Exchange {}: routing message with headers {}", 
getName(), payload.getMessageHeader());
 
-        LinkedHashSet<BaseQueue> queues = new LinkedHashSet<BaseQueue>();
-
         for (HeadersBinding hb : _bindingHeaderMatchers)
         {
             if 
(hb.matches(Filterable.Factory.newInstance(payload,instanceProperties)))
@@ -106,13 +104,12 @@ public class HeadersExchangeImpl extends
                     _logger.debug("Exchange " + getName() + ": delivering 
message with headers " +
                                   payload.getMessageHeader() + " to " + 
b.getDestination().getName());
                 }
-                queues.add((BaseQueue) b.getDestination());
+                routingResult.add(b.getDestination().route(payload, 
routingKey, instanceProperties));
             }
         }
-
-        return new ArrayList<>(queues);
     }
 
+
     @Override
     protected void onBind(final BindingIdentifier binding, Map<String,Object> 
arguments)
     {

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java
 Sun Dec 25 22:55:13 2016
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.server.exchange;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -39,11 +38,12 @@ import org.apache.qpid.server.filter.AMQ
 import org.apache.qpid.server.filter.FilterSupport;
 import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.Binding;
 import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
 import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 
@@ -68,9 +68,9 @@ class TopicExchangeImpl extends Abstract
     protected synchronized void onBindingUpdated(final BindingIdentifier 
binding, final Map<String, Object> newArguments)
     {
         final String bindingKey = binding.getBindingKey();
-        Queue<?> queue = (Queue<?>) binding.getDestination();
+        final MessageDestination destination = binding.getDestination();
 
-        _logger.debug("Updating binding of queue {} with routing key {}", 
queue.getName(), bindingKey);
+        _logger.debug("Updating binding of queue {} with routing key {}", 
destination.getName(), bindingKey);
 
 
         String routingKey = TopicNormalizer.normalize(bindingKey);
@@ -87,22 +87,22 @@ class TopicExchangeImpl extends Abstract
                 {
                     if (FilterSupport.argumentsContainFilter(oldArgs))
                     {
-                        result.replaceQueueFilter(queue,
-                                                  
FilterSupport.createMessageFilter(oldArgs, queue),
-                                                  
FilterSupport.createMessageFilter(newArguments, queue));
+                        result.replaceQueueFilter(destination,
+                                                  
FilterSupport.createMessageFilter(oldArgs, destination),
+                                                  
FilterSupport.createMessageFilter(newArguments, destination));
                     }
                     else
                     {
-                        result.addFilteredQueue(queue, 
FilterSupport.createMessageFilter(newArguments, queue));
-                        result.removeUnfilteredQueue(queue);
+                        result.addFilteredQueue(destination, 
FilterSupport.createMessageFilter(newArguments, destination));
+                        result.removeUnfilteredQueue(destination);
                     }
                 }
                 else
                 {
                     if (FilterSupport.argumentsContainFilter(oldArgs))
                     {
-                        result.addUnfilteredQueue(queue);
-                        result.removeFilteredQueue(queue, 
FilterSupport.createMessageFilter(oldArgs, queue));
+                        result.addUnfilteredQueue(destination);
+                        result.removeFilteredQueue(destination, 
FilterSupport.createMessageFilter(oldArgs, destination));
                     }
                     else
                     {
@@ -204,39 +204,25 @@ class TopicExchangeImpl extends Abstract
     }
 
     @Override
-    public ArrayList<BaseQueue> doRoute(ServerMessage payload,
-                                        final String routingAddress,
-                                        final InstanceProperties 
instanceProperties)
+    public <M extends ServerMessage<? extends StorableMessageMetaData>> void 
doRoute(M payload,
+                                                                               
      String routingAddress,
+                                                                               
      InstanceProperties instanceProperties,
+                                                                               
      RoutingResult<M> result)
     {
-
         final String routingKey = routingAddress == null
-                                          ? ""
-                                          : routingAddress;
+                ? ""
+                : routingAddress;
 
-        final Collection<Queue<?>> matchedQueues =
+        final Collection<MessageDestination> matchedQueues =
                 
getMatchedQueues(Filterable.Factory.newInstance(payload,instanceProperties), 
routingKey);
 
-        ArrayList<BaseQueue> queues;
-
-        if(matchedQueues.getClass() == ArrayList.class)
-        {
-            queues = (ArrayList) matchedQueues;
-        }
-        else
-        {
-            queues = new ArrayList<BaseQueue>();
-            queues.addAll(matchedQueues);
-        }
-
-        if(queues == null || queues.isEmpty())
+        for(MessageDestination queue : matchedQueues)
         {
-            _logger.info("Message routing key: " + routingAddress + " No 
routes.");
+            result.add(queue.route(payload, routingAddress, 
instanceProperties));
         }
-
-        return queues;
-
     }
 
+
     private synchronized boolean deregisterQueue(final BindingIdentifier 
binding)
     {
         if(_bindings.containsKey(binding))
@@ -274,7 +260,7 @@ class TopicExchangeImpl extends Abstract
         }
     }
 
-    private Collection<Queue<?>> getMatchedQueues(Filterable message, String 
routingKey)
+    private Collection<MessageDestination> getMatchedQueues(Filterable 
message, String routingKey)
     {
 
         Collection<TopicMatcherResult> results = _parser.parse(routingKey);
@@ -287,7 +273,7 @@ class TopicExchangeImpl extends Abstract
                 results.toArray(resultQueues);
                 return 
((TopicExchangeResult)resultQueues[0]).processMessage(message, null);
             default:
-                Collection<Queue<?>> queues = new HashSet<>();
+                Collection<MessageDestination> queues = new HashSet<>();
                 for(TopicMatcherResult result : results)
                 {
                     TopicExchangeResult res = (TopicExchangeResult)result;

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
 Sun Dec 25 22:55:13 2016
@@ -33,22 +33,22 @@ import java.util.concurrent.CopyOnWriteA
 import org.apache.qpid.server.exchange.AbstractExchange;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.Filterable;
-import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.message.MessageDestination;
 
 public final class TopicExchangeResult implements TopicMatcherResult
 {
     private final List<AbstractExchange.BindingIdentifier> _bindings = new 
CopyOnWriteArrayList<>();
-    private final Map<Queue<?>, Integer> _unfilteredQueues = new 
ConcurrentHashMap<>();
-    private final ConcurrentMap<Queue<?>, Map<FilterManager,Integer>> 
_filteredQueues = new ConcurrentHashMap<>();
-    private volatile ArrayList<Queue<?>> _unfilteredQueueList = new 
ArrayList<>(0);
+    private final Map<MessageDestination, Integer> _unfilteredQueues = new 
ConcurrentHashMap<>();
+    private final ConcurrentMap<MessageDestination, 
Map<FilterManager,Integer>> _filteredQueues = new ConcurrentHashMap<>();
+    private volatile ArrayList<MessageDestination> _unfilteredQueueList = new 
ArrayList<>(0);
 
-    public void addUnfilteredQueue(Queue<?> queue)
+    public void addUnfilteredQueue(MessageDestination queue)
     {
         Integer instances = _unfilteredQueues.get(queue);
         if(instances == null)
         {
             _unfilteredQueues.put(queue, 1);
-            ArrayList<Queue<?>> newList = new 
ArrayList<>(_unfilteredQueueList);
+            ArrayList<MessageDestination> newList = new 
ArrayList<>(_unfilteredQueueList);
             newList.add(queue);
             _unfilteredQueueList = newList;
         }
@@ -58,13 +58,13 @@ public final class TopicExchangeResult i
         }
     }
 
-    public void removeUnfilteredQueue(Queue<?> queue)
+    public void removeUnfilteredQueue(MessageDestination queue)
     {
         Integer instances = _unfilteredQueues.get(queue);
         if(instances == 1)
         {
             _unfilteredQueues.remove(queue);
-            ArrayList<Queue<?>> newList = new 
ArrayList<>(_unfilteredQueueList);
+            ArrayList<MessageDestination> newList = new 
ArrayList<>(_unfilteredQueueList);
             newList.remove(queue);
             _unfilteredQueueList = newList;
 
@@ -76,7 +76,7 @@ public final class TopicExchangeResult i
 
     }
 
-    public Collection<Queue<?>> getUnfilteredQueues()
+    public Collection<MessageDestination> getUnfilteredQueues()
     {
         return _unfilteredQueues.keySet();
     }
@@ -96,7 +96,7 @@ public final class TopicExchangeResult i
         return new ArrayList<>(_bindings);
     }
 
-    public void addFilteredQueue(Queue<?> queue, FilterManager filter)
+    public void addFilteredQueue(MessageDestination queue, FilterManager 
filter)
     {
         Map<FilterManager,Integer> filters = _filteredQueues.get(queue);
         if(filters == null)
@@ -116,7 +116,7 @@ public final class TopicExchangeResult i
 
     }
 
-    public void removeFilteredQueue(Queue<?> queue, FilterManager filter)
+    public void removeFilteredQueue(MessageDestination queue, FilterManager 
filter)
     {
         Map<FilterManager,Integer> filters = _filteredQueues.get(queue);
         if(filters != null)
@@ -142,7 +142,7 @@ public final class TopicExchangeResult i
 
     }
 
-    public void replaceQueueFilter(Queue<?> queue,
+    public void replaceQueueFilter(MessageDestination queue,
                                    FilterManager oldFilter,
                                    FilterManager newFilter)
     {
@@ -169,7 +169,7 @@ public final class TopicExchangeResult i
         _filteredQueues.put(queue,newFilters);
     }
 
-    public Collection<Queue<?>> processMessage(Filterable msg, 
Collection<Queue<?>> queues)
+    public Collection<MessageDestination> processMessage(Filterable msg, 
Collection<MessageDestination> queues)
     {
         if(queues == null)
         {
@@ -179,18 +179,18 @@ public final class TopicExchangeResult i
             }
             else
             {
-                queues = new HashSet<Queue<?>>();
+                queues = new HashSet<>();
             }
         }
         else if(!(queues instanceof Set))
         {
-            queues = new HashSet<Queue<?>>(queues);
+            queues = new HashSet<>(queues);
         }
 
         queues.addAll(_unfilteredQueues.keySet());
         if(!_filteredQueues.isEmpty())
         {
-            for(Map.Entry<Queue<?>, Map<FilterManager, Integer>> entry : 
_filteredQueues.entrySet())
+            for(Map.Entry<MessageDestination, Map<FilterManager, Integer>> 
entry : _filteredQueues.entrySet())
             {
                 if(!queues.contains(entry.getKey()))
                 {

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
 Sun Dec 25 22:55:13 2016
@@ -31,6 +31,7 @@ import org.apache.qpid.common.AMQPFilter
 import org.apache.qpid.filter.SelectorParsingException;
 import org.apache.qpid.filter.selector.ParseException;
 import org.apache.qpid.filter.selector.TokenMgrError;
+import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.plugin.PluggableService;
 import org.apache.qpid.server.queue.QueueConsumer;
@@ -94,13 +95,13 @@ public class FilterSupport
                        && 
((String)args.get(AMQPFilterTypes.JMS_SELECTOR.toString())).trim().length() != 
0;
     }
 
-    public static FilterManager createMessageFilter(final Map<String,Object> 
args, Queue<?> queue) throws AMQInvalidArgumentException
+    public static FilterManager createMessageFilter(final Map<String,Object> 
args, MessageDestination queue) throws AMQInvalidArgumentException
     {
         FilterManager filterManager = null;
-        if(argumentsContainNoLocal(args))
+        if(argumentsContainNoLocal(args) && queue instanceof Queue)
         {
             filterManager = new FilterManager();
-            filterManager.add(AMQPFilterTypes.NO_LOCAL.toString(), new 
NoLocalFilter(queue));
+            filterManager.add(AMQPFilterTypes.NO_LOCAL.toString(), new 
NoLocalFilter((Queue<?>) queue));
         }
 
         if(argumentsContainJMSSelector(args))

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
 Sun Dec 25 22:55:13 2016
@@ -27,8 +27,6 @@ import org.apache.qpid.server.model.Name
 import org.apache.qpid.server.model.PublishingLink;
 import org.apache.qpid.server.security.SecurityToken;
 import org.apache.qpid.server.store.StorableMessageMetaData;
-import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.server.util.Action;
 
 public interface MessageDestination extends MessageNode
 {
@@ -46,15 +44,10 @@ public interface MessageDestination exte
      * @param message the message to be routed
      * @param routingAddress
      * @param instanceProperties the instance properties
-     * @param txn the transaction to enqueue within
-     * @param postEnqueueAction action to perform on the result of every 
enqueue (may be null)
-     * @return the number of queues in which the message was enqueued performed
-     */
-    <M extends ServerMessage<? extends StorableMessageMetaData>> int send(M 
message,
-                                                                          
final String routingAddress,
-                                                                          
InstanceProperties instanceProperties,
-                                                                          
ServerTransaction txn,
-                                                                          
Action<? super MessageInstance> postEnqueueAction);
+    */
+    <M extends ServerMessage<? extends StorableMessageMetaData>> 
RoutingResult<M> route(M message,
+                                                                               
         String routingAddress,
+                                                                               
         InstanceProperties instanceProperties);
 
     boolean isDurable();
 

Added: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java?rev=1776037&view=auto
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java
 (added)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java
 Sun Dec 25 22:55:13 2016
@@ -0,0 +1,147 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.message;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+
+public class RoutingResult<M extends ServerMessage<? extends 
StorableMessageMetaData>>
+{
+    private static final Logger _logger = 
LoggerFactory.getLogger(RoutingResult.class);
+
+    private final M _message;
+
+    private final Set<BaseQueue> _queues = new HashSet<>();
+
+    public RoutingResult(final M message)
+    {
+        _message = message;
+    }
+
+    public void addQueue(BaseQueue q)
+    {
+        if(q.isDeleted())
+        {
+            _logger.debug("Attempt to enqueue message onto deleted queue {}",  
q.getName());
+        }
+        else
+        {
+            _queues.add(q);
+        }
+    }
+
+    public void addQueues(Collection<? extends BaseQueue> queues)
+    {
+        boolean deletedQueues = false;
+        for(BaseQueue q : queues)
+        {
+            if(q.isDeleted())
+            {
+                if(!deletedQueues)
+                {
+                    deletedQueues = true;
+                    queues = new ArrayList<>(queues);
+                }
+                _logger.debug("Attempt to enqueue message onto deleted queue 
{}",  q.getName());
+
+                queues.remove(q);
+            }
+        }
+
+        _queues.addAll(queues);
+    }
+
+    public void add(RoutingResult<M> result)
+    {
+        addQueues(result._queues);
+    }
+
+    public int send(ServerTransaction txn,
+                    final Action<? super MessageInstance> postEnqueueAction)
+    {
+        for(BaseQueue q : _queues)
+        {
+            if(!_message.isResourceAcceptable(q))
+            {
+                return 0;
+            }
+        }
+        final BaseQueue[] baseQueues;
+
+        if(_message.isReferenced())
+        {
+            ArrayList<BaseQueue> uniqueQueues = new 
ArrayList<>(_queues.size());
+            for(BaseQueue q : _queues)
+            {
+                if(!_message.isReferenced(q))
+                {
+                    uniqueQueues.add(q);
+                }
+            }
+            baseQueues = uniqueQueues.toArray(new 
BaseQueue[uniqueQueues.size()]);
+        }
+        else
+        {
+            baseQueues = _queues.toArray(new BaseQueue[_queues.size()]);
+        }
+        txn.enqueue(_queues, _message, new ServerTransaction.EnqueueAction()
+        {
+            MessageReference _reference = _message.newReference();
+
+            public void postCommit(MessageEnqueueRecord... records)
+            {
+                try
+                {
+                    for(int i = 0; i < baseQueues.length; i++)
+                    {
+                        baseQueues[i].enqueue(_message, postEnqueueAction, 
records[i]);
+                    }
+                }
+                finally
+                {
+                    _reference.release();
+                }
+            }
+
+            public void onRollback()
+            {
+                _reference.release();
+            }
+        });
+        return _queues.size();
+    }
+
+    public boolean hasRoutes()
+    {
+        return !_queues.isEmpty();
+    }
+}

Propchange: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
 Sun Dec 25 22:55:13 2016
@@ -91,6 +91,7 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.MessageSender;
+import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.message.internal.InternalMessage;
 import org.apache.qpid.server.model.*;
@@ -2567,46 +2568,21 @@ public abstract class AbstractQueue<X ex
         _notificationListener = listener == null ? NULL_NOTIFICATION_LISTENER 
: listener;
     }
 
-    public final  <M extends ServerMessage<? extends StorableMessageMetaData>> 
int send(final M message,
-                                                                               
         final String routingAddress,
-                                                                               
         final InstanceProperties instanceProperties,
-                                                                               
         final ServerTransaction txn,
-                                                                               
         final Action<? super MessageInstance> postEnqueueAction)
+    @Override
+    public <M extends ServerMessage<? extends StorableMessageMetaData>> 
RoutingResult<M> route(final M message,
+                                                                               
                final String routingAddress,
+                                                                               
                final InstanceProperties instanceProperties)
     {
         if (_virtualHost.getState() != State.ACTIVE)
         {
             throw new VirtualHostUnavailableException(this._virtualHost);
         }
+        RoutingResult<M> result = new RoutingResult<>(message);
         if(message.isResourceAcceptable(this) && !message.isReferenced(this))
         {
-            txn.enqueue(this, message, new ServerTransaction.EnqueueAction()
-            {
-                MessageReference _reference = message.newReference();
-
-                public void postCommit(MessageEnqueueRecord... records)
-                {
-                    try
-                    {
-                        AbstractQueue.this.enqueue(message, postEnqueueAction, 
records[0]);
-                    }
-                    finally
-                    {
-                        _reference.release();
-                    }
-                }
-
-                public void onRollback()
-                {
-                    _reference.release();
-                }
-            });
-            return 1;
-        }
-        else
-        {
-            return 0;
+            result.addQueue(this);
         }
-
+        return result;
     }
 
     @Override

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
 Sun Dec 25 22:55:13 2016
@@ -30,9 +30,5 @@ import org.apache.qpid.server.util.Actio
 public interface BaseQueue extends TransactionLogResource
 {
     void enqueue(ServerMessage message, Action<? super MessageInstance> 
action, MessageEnqueueRecord record);
-
-    boolean isDurable();
     boolean isDeleted();
-
-    String getName();
 }

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
 Sun Dec 25 22:55:13 2016
@@ -36,6 +36,7 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.Queue;
@@ -569,27 +570,23 @@ public abstract class QueueEntryImpl imp
         final Queue<?> currentQueue = getQueue();
         Exchange<?> alternateExchange = currentQueue.getAlternateExchange();
         boolean autocommit =  txn == null;
-        int enqueues;
 
         if(autocommit)
         {
             txn = new 
LocalTransaction(getQueue().getVirtualHost().getMessageStore());
         }
 
+        RoutingResult result;
         if (alternateExchange != null)
         {
-            enqueues = alternateExchange.send(getMessage(),
-                                              
getMessage().getInitialRoutingAddress(),
-                                              getInstanceProperties(),
-                                              txn,
-                                              action);
+            result = alternateExchange.route(getMessage(), 
getMessage().getInitialRoutingAddress(),
+                                                           
getInstanceProperties());
         }
         else
         {
-            enqueues = 0;
+            result = new RoutingResult<>(getMessage());
         }
-
-        txn.dequeue(getEnqueueRecord(), new ServerTransaction.Action()
+        txn.addPostTransactionAction(new ServerTransaction.Action()
         {
             public void postCommit()
             {
@@ -601,13 +598,13 @@ public abstract class QueueEntryImpl imp
 
             }
         });
+        int enqueues = result.send(txn, null);
 
         if(autocommit)
         {
             txn.commit();
         }
-
-        return enqueues;
+        return  enqueues;
     }
 
     public boolean isQueueDeleted()

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java
 Sun Dec 25 22:55:13 2016
@@ -24,9 +24,7 @@ import java.util.UUID;
 
 public interface TransactionLogResource
 {
-
     String getName();
-    public UUID getId();
-    //boolean isDurable();
+    UUID getId();
     MessageDurability getMessageDurability();
 }

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
 Sun Dec 25 22:55:13 2016
@@ -259,7 +259,7 @@ public class AsyncAutoCommitTransaction
 
     }
 
-    public void enqueue(List<? extends BaseQueue> queues, EnqueueableMessage 
message, EnqueueAction postTransactionAction)
+    public void enqueue(Collection<? extends BaseQueue> queues, 
EnqueueableMessage message, EnqueueAction postTransactionAction)
     {
         Transaction txn = null;
         try

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
 Sun Dec 25 22:55:13 2016
@@ -185,7 +185,7 @@ public class AutoCommitTransaction imple
 
     }
 
-    public void enqueue(List<? extends BaseQueue> queues, EnqueueableMessage 
message, EnqueueAction postTransactionAction)
+    public void enqueue(Collection<? extends BaseQueue> queues, 
EnqueueableMessage message, EnqueueAction postTransactionAction)
     {
         Transaction txn = null;
         try

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java
 Sun Dec 25 22:55:13 2016
@@ -21,6 +21,8 @@
 
 package org.apache.qpid.server.txn;
 
+import java.util.Collection;
+
 import org.apache.qpid.server.message.EnqueueableMessage;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -29,9 +31,6 @@ import org.apache.qpid.server.store.Mess
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.transport.Xid;
 
-import java.util.Collection;
-import java.util.List;
-
 public class DistributedTransaction implements ServerTransaction
 {
 
@@ -137,7 +136,7 @@ public class DistributedTransaction impl
         }
     }
 
-    public void enqueue(List<? extends BaseQueue> queues, EnqueueableMessage 
message,
+    public void enqueue(Collection<? extends BaseQueue> queues, 
EnqueueableMessage message,
                         final EnqueueAction postTransactionAction)
     {
         if(_branch != null)

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
 Sun Dec 25 22:55:13 2016
@@ -264,7 +264,7 @@ public class LocalTransaction implements
         }
     }
 
-    public void enqueue(List<? extends BaseQueue> queues, EnqueueableMessage 
message, EnqueueAction postTransactionAction)
+    public void enqueue(Collection<? extends BaseQueue> queues, 
EnqueueableMessage message, EnqueueAction postTransactionAction)
     {
         sync();
         initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
 Sun Dec 25 22:55:13 2016
@@ -21,7 +21,6 @@
 package org.apache.qpid.server.txn;
 
 import java.util.Collection;
-import java.util.List;
 
 import org.apache.qpid.server.message.EnqueueableMessage;
 import org.apache.qpid.server.message.MessageInstance;
@@ -111,7 +110,7 @@ public interface ServerTransaction
      * 
      * Store operations will result only for a persistent messages on durable 
queues.
      */
-    void enqueue(List<? extends BaseQueue> queues, EnqueueableMessage message, 
EnqueueAction postTransactionAction);
+    void enqueue(Collection<? extends BaseQueue> queues, EnqueueableMessage 
message, EnqueueAction postTransactionAction);
 
     /** 
      * Commit the transaction represented by this object.

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
 Sun Dec 25 22:55:13 2016
@@ -89,6 +89,7 @@ import org.apache.qpid.server.message.In
 import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.MessageNode;
 import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.message.internal.InternalMessage;
 import org.apache.qpid.server.model.*;
@@ -801,8 +802,9 @@ public abstract class AbstractVirtualHos
                         }
                     }
                 };
-
-        return destination.send(internalMessage, address, instanceProperties, 
txn, null);
+        final RoutingResult<InternalMessage> result =
+                destination.route(internalMessage, address, 
instanceProperties);
+        return result.send(txn, null);
 
     }
 

Modified: 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
 Sun Dec 25 22:55:13 2016
@@ -20,11 +20,15 @@
  */
 package org.apache.qpid.server.exchange;
 
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anySet;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -40,15 +44,22 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.EnqueueableMessage;
 import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.BrokerModel;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.model.VirtualHostNode;
 import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 import org.apache.qpid.test.utils.QpidTestCase;
 
@@ -79,6 +90,7 @@ public class FanoutExchangeTest extends
         _virtualHost = mock(QueueManagingVirtualHost.class);
 
         when(_virtualHost.getEventLogger()).thenReturn(new EventLogger());
+        when(_virtualHost.getState()).thenReturn(State.ACTIVE);
         when(_virtualHost.getTaskExecutor()).thenReturn(_taskExecutor);
         when(_virtualHost.getChildExecutor()).thenReturn(_taskExecutor);
         when(_virtualHost.getModel()).thenReturn(BrokerModel.getInstance());
@@ -153,6 +165,9 @@ public class FanoutExchangeTest extends
         when(queue.getChildExecutor()).thenReturn(taskExecutor);
         when(queue.getParent()).thenReturn(_virtualHost);
         when(_virtualHost.getAttainedQueue(eq(name))).thenReturn(queue);
+        RoutingResult result = new RoutingResult(null);
+        result.addQueue(queue);
+        
when(queue.route(any(ServerMessage.class),anyString(),any(InstanceProperties.class))).thenReturn(result);
         return queue;
     }
 
@@ -165,8 +180,8 @@ public class FanoutExchangeTest extends
         _exchange.addBinding("key",queue1, null);
         _exchange.addBinding("key",queue2, null);
 
-
-        List<? extends BaseQueue> result = _exchange.route(mockMessage(true), 
"", InstanceProperties.EMPTY);
+        List<? extends BaseQueue> result;
+        result = routeToQueues(mockMessage(true), "", 
InstanceProperties.EMPTY);
 
         assertEquals("Expected message to be routed to both queues", 2, 
result.size());
         assertTrue("Expected queue1 to be routed to", result.contains(queue1));
@@ -174,8 +189,7 @@ public class FanoutExchangeTest extends
 
         _exchange.addBinding("key2",queue2, 
Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select
 = True"));
 
-
-        result = _exchange.route(mockMessage(true), "", 
InstanceProperties.EMPTY);
+        result = routeToQueues(mockMessage(true), "", 
InstanceProperties.EMPTY);
 
         assertEquals("Expected message to be routed to both queues", 2, 
result.size());
         assertTrue("Expected queue1 to be routed to", result.contains(queue1));
@@ -183,14 +197,13 @@ public class FanoutExchangeTest extends
 
         _exchange.deleteBinding("key",queue2);
 
-        result = _exchange.route(mockMessage(true), "", 
InstanceProperties.EMPTY);
+        result = routeToQueues(mockMessage(true), "", 
InstanceProperties.EMPTY);
 
         assertEquals("Expected message to be routed to both queues", 2, 
result.size());
         assertTrue("Expected queue1 to be routed to", result.contains(queue1));
         assertTrue("Expected queue2 to be routed to", result.contains(queue2));
 
-
-        result = _exchange.route(mockMessage(false), "", 
InstanceProperties.EMPTY);
+        result = routeToQueues(mockMessage(false), "", 
InstanceProperties.EMPTY);
 
         assertEquals("Expected message to be routed to queue1 only", 1, 
result.size());
         assertTrue("Expected queue1 to be routed to", result.contains(queue1));
@@ -198,8 +211,7 @@ public class FanoutExchangeTest extends
 
         _exchange.addBinding("key",queue2, 
Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select
 = False"));
 
-
-        result = _exchange.route(mockMessage(false), "", 
InstanceProperties.EMPTY);
+        result = routeToQueues(mockMessage(false), "", 
InstanceProperties.EMPTY);
         assertEquals("Expected message to be routed to both queues", 2, 
result.size());
         assertTrue("Expected queue1 to be routed to", result.contains(queue1));
         assertTrue("Expected queue2 to be routed to", result.contains(queue2));
@@ -207,6 +219,88 @@ public class FanoutExchangeTest extends
 
     }
 
+    private List<? extends BaseQueue> routeToQueues(final ServerMessage 
message,
+                                                    final String 
routingAddress,
+                                                    final InstanceProperties 
instanceProperties)
+    {
+        RoutingResult result = _exchange.route(message, routingAddress, 
instanceProperties);
+        final List<BaseQueue> resultQueues = new ArrayList<>();
+        result.send(new ServerTransaction()
+        {
+            @Override
+            public long getTransactionStartTime()
+            {
+                return 0;
+            }
+
+            @Override
+            public long getTransactionUpdateTime()
+            {
+                return 0;
+            }
+
+            @Override
+            public void addPostTransactionAction(final Action 
postTransactionAction)
+            {
+
+            }
+
+            @Override
+            public void dequeue(final MessageEnqueueRecord record, final 
Action postTransactionAction)
+            {
+
+            }
+
+            @Override
+            public void dequeue(final Collection<MessageInstance> messages, 
final Action postTransactionAction)
+            {
+
+            }
+
+            @Override
+            public void enqueue(final TransactionLogResource queue,
+                                final EnqueueableMessage message,
+                                final EnqueueAction postTransactionAction)
+            {
+                resultQueues.add((BaseQueue) queue);
+            }
+
+            @Override
+            public void enqueue(final Collection<? extends BaseQueue> queues,
+                                final EnqueueableMessage message,
+                                final EnqueueAction postTransactionAction)
+            {
+                resultQueues.addAll(queues);
+            }
+
+            @Override
+            public void commit()
+            {
+
+            }
+
+            @Override
+            public void commit(final Runnable immediatePostTransactionAction)
+            {
+
+            }
+
+            @Override
+            public void rollback()
+            {
+
+            }
+
+            @Override
+            public boolean isTransactional()
+            {
+                return false;
+            }
+        }, null);
+
+        return resultQueues;
+    }
+
     private ServerMessage mockMessage(boolean val)
     {
         final AMQMessageHeader header = mock(AMQMessageHeader.class);
@@ -225,6 +319,7 @@ public class FanoutExchangeTest extends
         });
         final ServerMessage serverMessage = mock(ServerMessage.class);
         when(serverMessage.getMessageHeader()).thenReturn(header);
+        
when(serverMessage.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true);
         return serverMessage;
     }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to