Author: rgodfrey
Date: Sun Dec 25 22:55:13 2016
New Revision: 1776037
URL: http://svn.apache.org/viewvc?rev=1776037&view=rev
Log:
QPID-6028: Allow binding destinations to be message destinations other than queues
Added:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java
(with props)
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
Sun Dec 25 22:55:13 2016
@@ -24,6 +24,7 @@ import java.security.AccessControlExcept
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -49,9 +50,8 @@ import org.apache.qpid.server.logging.me
import org.apache.qpid.server.logging.subjects.ExchangeLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageDestination;
-import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.MessageSender;
+import org.apache.qpid.server.message.RoutingResult;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.Binding;
@@ -66,13 +66,9 @@ import org.apache.qpid.server.model.Publ
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.StateTransition;
-import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.security.SecurityToken;
import org.apache.qpid.server.security.access.Operation;
-import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.StorableMessageMetaData;
-import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.FixedKeyMapCreator;
import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
@@ -86,6 +82,8 @@ public abstract class AbstractExchange<T
{
private static final Logger _logger =
LoggerFactory.getLogger(AbstractExchange.class);
+ private static final ThreadLocal<Map<AbstractExchange<?>, Set<String>>>
CURRENT_ROUTING = new ThreadLocal<>();
+
private static final FixedKeyMapCreator BIND_ARGUMENTS_CREATOR =
new FixedKeyMapCreator("bindingKey", "destination", "arguments");
private static final FixedKeyMapCreator UNBIND_ARGUMENTS_CREATOR =
@@ -469,131 +467,84 @@ public abstract class AbstractExchange<T
}
- final List<? extends BaseQueue> route(final ServerMessage message,
- final String routingAddress,
- final InstanceProperties
instanceProperties)
- {
- _receivedMessageCount.incrementAndGet();
- _receivedMessageSize.addAndGet(message.getSize());
- List<? extends BaseQueue> queues = doRoute(message, routingAddress,
instanceProperties);
- List<? extends BaseQueue> allQueues = queues;
-
- boolean deletedQueues = false;
-
- for(BaseQueue q : allQueues)
- {
- if(q.isDeleted())
- {
- if(!deletedQueues)
- {
- deletedQueues = true;
- queues = new ArrayList<>(allQueues);
- }
- _logger.debug("Exchange: {} - attempt to enqueue message onto
deleted queue {}", getName(), q.getName());
-
- queues.remove(q);
- }
- }
-
-
- if(!queues.isEmpty())
- {
- _routedMessageCount.incrementAndGet();
- _routedMessageSize.addAndGet(message.getSize());
- }
- else
- {
- _droppedMessageCount.incrementAndGet();
- _droppedMessageSize.addAndGet(message.getSize());
- }
- return queues;
- }
-
@Override
- public final <M extends ServerMessage<? extends StorableMessageMetaData>>
int send(final M message,
-
final String routingAddress,
-
final InstanceProperties instanceProperties,
-
final ServerTransaction txn,
-
final Action<? super MessageInstance> postEnqueueAction)
+ public <M extends ServerMessage<? extends StorableMessageMetaData>>
RoutingResult<M> route(final M message,
+
final String routingAddress,
+
final InstanceProperties instanceProperties)
{
if (_virtualHost.getState() != State.ACTIVE)
{
throw new VirtualHostUnavailableException(this._virtualHost);
}
- List<? extends BaseQueue> queues = route(message, routingAddress,
instanceProperties);
+ final RoutingResult<M> routingResult = new RoutingResult<>(message);
- if(queues == null || queues.isEmpty())
+ Map<AbstractExchange<?>, Set<String>> currentThreadMap =
CURRENT_ROUTING.get();
+ boolean topLevel = currentThreadMap == null;
+ try
{
- Exchange altExchange = getAlternateExchange();
- if(altExchange != null)
+ if (topLevel)
+ {
+ currentThreadMap = new HashMap<>();
+ CURRENT_ROUTING.set(currentThreadMap);
+ }
+ Set<String> existingRoutes = currentThreadMap.get(this);
+ if (existingRoutes == null)
+ {
+ currentThreadMap.put(this,
Collections.singleton(routingAddress));
+ }
+ else if (existingRoutes.contains(routingAddress))
{
- return altExchange.send(message, routingAddress,
instanceProperties, txn, postEnqueueAction);
+ return routingResult;
}
else
{
- return 0;
+ existingRoutes = new HashSet<>(existingRoutes);
+ existingRoutes.add(routingAddress);
+ currentThreadMap.put(this, existingRoutes);
}
- }
- else
- {
- for(BaseQueue q : queues)
+
+ _receivedMessageCount.incrementAndGet();
+ _receivedMessageSize.addAndGet(message.getSize());
+
+ doRoute(message, routingAddress, instanceProperties,
routingResult);
+
+ if (!routingResult.hasRoutes())
{
- if(!message.isResourceAcceptable(q))
+ Exchange altExchange = getAlternateExchange();
+ if (altExchange != null)
{
- return 0;
+ routingResult.add(altExchange.route(message,
routingAddress, instanceProperties));
}
}
- final BaseQueue[] baseQueues;
- if(message.isReferenced())
+ if (routingResult.hasRoutes())
{
- ArrayList<BaseQueue> uniqueQueues = new
ArrayList<>(queues.size());
- for(BaseQueue q : queues)
- {
- if(!message.isReferenced(q))
- {
- uniqueQueues.add(q);
- }
- }
- baseQueues = uniqueQueues.toArray(new
BaseQueue[uniqueQueues.size()]);
+ _routedMessageCount.incrementAndGet();
+ _routedMessageSize.addAndGet(message.getSize());
}
else
{
- baseQueues = queues.toArray(new BaseQueue[queues.size()]);
+ _droppedMessageCount.incrementAndGet();
+ _droppedMessageSize.addAndGet(message.getSize());
}
- txn.enqueue(queues,message, new ServerTransaction.EnqueueAction()
+ return routingResult;
+ }
+ finally
+ {
+ if(topLevel)
{
- MessageReference _reference = message.newReference();
-
- public void postCommit(MessageEnqueueRecord... records)
- {
- try
- {
- for(int i = 0; i < baseQueues.length; i++)
- {
- baseQueues[i].enqueue(message, postEnqueueAction,
records[i]);
- }
- }
- finally
- {
- _reference.release();
- }
- }
-
- public void onRollback()
- {
- _reference.release();
- }
- });
- return queues.size();
+ CURRENT_ROUTING.set(null);
+ }
}
}
- protected abstract List<? extends BaseQueue> doRoute(final ServerMessage
message,
- final String
routingAddress,
- final
InstanceProperties instanceProperties);
+
+ protected abstract <M extends ServerMessage<? extends
StorableMessageMetaData>> void doRoute(final M message,
+ final String routingAddress,
+ final InstanceProperties
instanceProperties,
+ final RoutingResult<M> result);
@Override
public boolean bind(final String destination,
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java
Sun Dec 25 22:55:13 2016
@@ -24,8 +24,8 @@ import java.util.Map;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageDestination;
-import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageSender;
+import org.apache.qpid.server.message.RoutingResult;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Exchange;
@@ -37,8 +37,6 @@ import org.apache.qpid.server.security.R
import org.apache.qpid.server.security.SecurityToken;
import org.apache.qpid.server.security.access.Operation;
import org.apache.qpid.server.store.StorableMessageMetaData;
-import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
public class DefaultDestination implements MessageDestination,
PermissionedObject
@@ -94,43 +92,43 @@ public class DefaultDestination implemen
}
- public final <M extends ServerMessage<? extends StorableMessageMetaData>>
int send(final M message,
-
String routingAddress,
-
final InstanceProperties instanceProperties,
-
final ServerTransaction txn,
-
final Action<? super MessageInstance> postEnqueueAction)
+ @Override
+ public <M extends ServerMessage<? extends StorableMessageMetaData>>
RoutingResult<M> route(M message,
+
String routingAddress,
+
InstanceProperties instanceProperties)
{
- if(routingAddress == null || routingAddress.trim().equals(""))
- {
- return 0;
- }
- final MessageDestination dest =
_virtualHost.getAttainedMessageDestination(routingAddress);
- if(dest == null)
+ RoutingResult<M> result = new RoutingResult<>(message);
+
+ if (routingAddress != null && !routingAddress.trim().equals(""))
{
- routingAddress = _virtualHost.getLocalAddress(routingAddress);
- if(routingAddress.contains("/") && !routingAddress.startsWith("/"))
+ final MessageDestination dest =
_virtualHost.getAttainedMessageDestination(routingAddress);
+ if (dest == null)
{
- String[] parts = routingAddress.split("/",2);
- Exchange<?> exchange =
_virtualHost.getAttainedChildFromAddress(Exchange.class, parts[0]);
- if(exchange != null)
+ routingAddress = _virtualHost.getLocalAddress(routingAddress);
+ if (routingAddress.contains("/") &&
!routingAddress.startsWith("/"))
{
- return exchange.send(message, parts[1],
instanceProperties, txn, postEnqueueAction);
+ String[] parts = routingAddress.split("/", 2);
+ Exchange<?> exchange =
_virtualHost.getAttainedChildFromAddress(Exchange.class, parts[0]);
+ if (exchange != null)
+ {
+ result.add(exchange.route(message, parts[1],
instanceProperties));
+ }
}
- }
- else if(!routingAddress.contains("/"))
- {
- Exchange<?> exchange =
_virtualHost.getAttainedChildFromAddress(Exchange.class, routingAddress);
- if(exchange != null)
+ else if (!routingAddress.contains("/"))
{
- return exchange.send(message, "", instanceProperties, txn,
postEnqueueAction);
+ Exchange<?> exchange =
_virtualHost.getAttainedChildFromAddress(Exchange.class, routingAddress);
+ if (exchange != null)
+ {
+ result.add(exchange.route(message, "",
instanceProperties));
+ }
}
}
- return 0;
- }
- else
- {
- return dest.send(message, routingAddress, instanceProperties, txn,
postEnqueueAction);
+ else
+ {
+ result.add(dest.route(message, routingAddress,
instanceProperties));
+ }
}
+ return result;
}
@Override
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeImpl.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeImpl.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeImpl.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeImpl.java
Sun Dec 25 22:55:13 2016
@@ -20,11 +20,9 @@
*/
package org.apache.qpid.server.exchange;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -39,10 +37,11 @@ import org.apache.qpid.server.filter.Fil
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.RoutingResult;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
public class DirectExchangeImpl extends AbstractExchange<DirectExchangeImpl>
implements DirectExchange<DirectExchangeImpl>
@@ -52,8 +51,8 @@ public class DirectExchangeImpl extends
private final class BindingSet
{
- private final Set<BaseQueue> _unfilteredQueues;
- private final Map<BaseQueue, FilterManager> _filteredQueues;
+ private final Set<MessageDestination> _unfilteredQueues;
+ private final Map<MessageDestination, FilterManager> _filteredQueues;
public BindingSet()
{
@@ -61,14 +60,14 @@ public class DirectExchangeImpl extends
_filteredQueues = Collections.emptyMap();
}
- private BindingSet(final Set<BaseQueue> unfilteredQueues,
- final Map<BaseQueue, FilterManager> filteredQueues)
+ private BindingSet(final Set<MessageDestination> unfilteredQueues,
+ final Map<MessageDestination, FilterManager>
filteredQueues)
{
_unfilteredQueues = unfilteredQueues;
_filteredQueues = filteredQueues;
}
- public Set<BaseQueue> getUnfilteredQueues()
+ public Set<MessageDestination> getUnfilteredQueues()
{
return _unfilteredQueues;
}
@@ -83,7 +82,7 @@ public class DirectExchangeImpl extends
return _unfilteredQueues.isEmpty() && _filteredQueues.isEmpty();
}
- public Map<BaseQueue,FilterManager> getFilteredQueues()
+ public Map<MessageDestination,FilterManager> getFilteredQueues()
{
return _filteredQueues;
}
@@ -98,8 +97,8 @@ public class DirectExchangeImpl extends
{
try
{
- Set<BaseQueue> unfilteredQueues;
- Map<BaseQueue, FilterManager> filteredQueues;
+ Set<MessageDestination> unfilteredQueues;
+ Map<MessageDestination, FilterManager> filteredQueues;
if (_unfilteredQueues.contains(destination))
{
unfilteredQueues = new HashSet<>(_unfilteredQueues);
@@ -111,7 +110,7 @@ public class DirectExchangeImpl extends
}
filteredQueues = new HashMap<>(_filteredQueues);
- filteredQueues.put((BaseQueue) destination,
+ filteredQueues.put(destination,
FilterSupport.createMessageFilter(arguments, (Queue<?>) destination));
return new
BindingSet(Collections.unmodifiableSet(unfilteredQueues),
Collections.unmodifiableMap(filteredQueues));
@@ -128,8 +127,8 @@ public class DirectExchangeImpl extends
}
else
{
- Set<BaseQueue> unfilteredQueues;
- Map<BaseQueue, FilterManager> filteredQueues;
+ Set<MessageDestination> unfilteredQueues;
+ Map<MessageDestination, FilterManager> filteredQueues;
if (_filteredQueues.containsKey(destination))
{
filteredQueues = new HashMap<>(_filteredQueues);
@@ -141,7 +140,7 @@ public class DirectExchangeImpl extends
}
unfilteredQueues = new HashSet<>(_unfilteredQueues);
- unfilteredQueues.add((BaseQueue)destination);
+ unfilteredQueues.add(destination);
return new
BindingSet(Collections.unmodifiableSet(unfilteredQueues),
Collections.unmodifiableMap(filteredQueues));
@@ -150,8 +149,8 @@ public class DirectExchangeImpl extends
public BindingSet removeBinding(final MessageDestination destination)
{
- Set<BaseQueue> unfilteredQueues;
- Map<BaseQueue, FilterManager> filteredQueues;
+ Set<MessageDestination> unfilteredQueues;
+ Map<MessageDestination, FilterManager> filteredQueues;
if (_unfilteredQueues.contains(destination))
{
unfilteredQueues = new HashSet<>(_unfilteredQueues);
@@ -182,49 +181,43 @@ public class DirectExchangeImpl extends
super(attributes, vhost);
}
+
@Override
- public List<? extends BaseQueue> doRoute(ServerMessage payload,
- final String routingKey,
- final InstanceProperties
instanceProperties)
+ public <M extends ServerMessage<? extends StorableMessageMetaData>> void
doRoute(final M payload,
+
final String routingKey,
+
final InstanceProperties instanceProperties,
+
final RoutingResult<M> result)
{
BindingSet bindings = _bindingsByKey.get(routingKey == null ? "" :
routingKey);
if(bindings != null)
{
- // TODO - remove this garbage generation
- List<BaseQueue> queues = new
ArrayList<>(bindings.getUnfilteredQueues());
+ final Set<MessageDestination> unfilteredQueues =
bindings.getUnfilteredQueues();
+ for(MessageDestination destination : unfilteredQueues)
+ {
+ result.add(destination.route(payload, routingKey,
instanceProperties));
+ }
if(bindings.hasFilteredQueues())
{
- Set<BaseQueue> queuesSet = new HashSet<BaseQueue>(queues);
Filterable filterable =
Filterable.Factory.newInstance(payload, instanceProperties);
- Map<BaseQueue, FilterManager> filteredQueues =
bindings.getFilteredQueues();
- for(Map.Entry<BaseQueue, FilterManager> entry :
filteredQueues.entrySet())
+ Map<MessageDestination, FilterManager> filteredQueues =
bindings.getFilteredQueues();
+ for(Map.Entry<MessageDestination, FilterManager> entry :
filteredQueues.entrySet())
{
- if(!queuesSet.contains(entry.getKey()))
+ if(!unfilteredQueues.contains(entry.getKey()))
{
FilterManager filter = entry.getValue();
if(filter.allAllow(filterable))
{
- queuesSet.add(entry.getKey());
+ result.add(entry.getKey().route(payload,
routingKey, instanceProperties));
}
}
}
- if(queues.size() != queuesSet.size())
- {
- queues = new ArrayList<>(queuesSet);
- }
}
- return queues;
- }
- else
- {
- return Collections.emptyList();
}
-
}
@Override
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeImpl.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeImpl.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeImpl.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeImpl.java
Sun Dec 25 22:55:13 2016
@@ -35,10 +35,11 @@ import org.apache.qpid.server.filter.Fil
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.RoutingResult;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
class FanoutExchangeImpl extends AbstractExchange<FanoutExchangeImpl>
implements FanoutExchange<FanoutExchangeImpl>
@@ -51,15 +52,15 @@ class FanoutExchangeImpl extends Abstrac
{
private final Map<MessageDestination,Integer> _queues;
- private final List<Queue<?>> _unfilteredQueues;
- private final List<Queue<?>> _filteredQueues;
+ private final List<MessageDestination> _unfilteredQueues;
+ private final List<MessageDestination> _filteredQueues;
- private final Map<Queue<?>,Map<BindingIdentifier, FilterManager>>
_filteredBindings;
+ private final Map<MessageDestination,Map<BindingIdentifier,
FilterManager>> _filteredBindings;
public BindingSet(final Map<MessageDestination, Integer> queues,
- final List<Queue<?>> unfilteredQueues,
- final List<Queue<?>> filteredQueues,
- final Map<Queue<?>, Map<BindingIdentifier,
FilterManager>> filteredBindings)
+ final List<MessageDestination> unfilteredQueues,
+ final List<MessageDestination> filteredQueues,
+ final Map<MessageDestination, Map<BindingIdentifier,
FilterManager>> filteredBindings)
{
_queues = queues;
_unfilteredQueues = unfilteredQueues;
@@ -81,19 +82,19 @@ class FanoutExchangeImpl extends Abstrac
{
try
{
- List<Queue<?>> filteredQueues;
+ List<MessageDestination> filteredQueues;
if
(!(_filteredQueues.contains(binding.getDestination())
||
_unfilteredQueues.contains(binding.getDestination())))
{
filteredQueues = new ArrayList<>(_filteredQueues);
- filteredQueues.add((Queue<?>)
binding.getDestination());
+ filteredQueues.add(binding.getDestination());
filteredQueues =
Collections.unmodifiableList(filteredQueues);
}
else
{
filteredQueues = _filteredQueues;
}
- Map<Queue<?>, Map<BindingIdentifier, FilterManager>>
filteredBindings =
+ Map<MessageDestination, Map<BindingIdentifier,
FilterManager>> filteredBindings =
new HashMap<>(_filteredBindings);
Map<BindingIdentifier, FilterManager> bindingsForQueue
=
filteredBindings.get(binding.getDestination());
@@ -107,8 +108,8 @@ class FanoutExchangeImpl extends Abstrac
}
bindingsForQueue.put(binding,
FilterSupport.createMessageFilter(arguments,
-
(Queue<?>) binding.getDestination()));
- filteredBindings.put((Queue<?>)
binding.getDestination(), bindingsForQueue);
+
binding.getDestination()));
+ filteredBindings.put(binding.getDestination(),
bindingsForQueue);
return new BindingSet(_queues, _unfilteredQueues,
filteredQueues, Collections.unmodifiableMap(filteredBindings));
}
catch (AMQInvalidArgumentException e)
@@ -122,8 +123,8 @@ class FanoutExchangeImpl extends Abstrac
else
{
Map<MessageDestination, Integer> queues = new
HashMap<>(_queues);
- List<Queue<?>> unfilteredQueues;
- List<Queue<?>> filteredQueues;
+ List<MessageDestination> unfilteredQueues;
+ List<MessageDestination> filteredQueues;
if (queues.containsKey(binding.getDestination()))
{
queues.put(binding.getDestination(),
queues.get(binding.getDestination()) + 1);
@@ -162,10 +163,10 @@ class FanoutExchangeImpl extends Abstrac
Queue<?> queue = (Queue<?>) binding.getDestination();
if(_filteredBindings.containsKey(queue) &&
_filteredBindings.get(queue).containsKey(binding))
{
- final Map<Queue<?>, Map<BindingIdentifier, FilterManager>>
filteredBindings = new HashMap<>(_filteredBindings);
+ final Map<MessageDestination, Map<BindingIdentifier,
FilterManager>> filteredBindings = new HashMap<>(_filteredBindings);
final Map<BindingIdentifier, FilterManager> bindingsForQueue =
new HashMap<>(filteredBindings.remove(queue));
bindingsForQueue.remove(binding);
- List<Queue<?>> filteredQueues;
+ List<MessageDestination> filteredQueues;
if(bindingsForQueue.isEmpty())
{
filteredQueues = new ArrayList<>(_filteredQueues);
@@ -183,8 +184,8 @@ class FanoutExchangeImpl extends Abstrac
{
Map<MessageDestination, Integer> queues = new
HashMap<>(_queues);
int count = queues.remove(queue);
- List<Queue<?>> unfilteredQueues;
- List<Queue<?>> filteredQueues;
+ List<MessageDestination> unfilteredQueues;
+ List<MessageDestination> filteredQueues;
if(count > 1)
{
queues.put(queue, --count);
@@ -230,28 +231,29 @@ class FanoutExchangeImpl extends Abstrac
}
@Override
- public ArrayList<BaseQueue> doRoute(ServerMessage payload,
- final String routingKey,
- final InstanceProperties
instanceProperties)
+ protected <M extends ServerMessage<? extends StorableMessageMetaData>>
void doRoute(final M message,
+
final String routingAddress,
+
final InstanceProperties instanceProperties,
+
final RoutingResult<M> result)
{
-
BindingSet bindingSet = _bindingSet;
- final ArrayList<BaseQueue> result = new
ArrayList<BaseQueue>(bindingSet._unfilteredQueues);
-
-
- final Map<Queue<?>, Map<BindingIdentifier, FilterManager>>
filteredBindings = bindingSet._filteredBindings;
+ for(MessageDestination destination : bindingSet._unfilteredQueues)
+ {
+ result.add(destination.route(message, routingAddress,
instanceProperties));
+ }
+ final Map<MessageDestination, Map<BindingIdentifier, FilterManager>>
filteredBindings = bindingSet._filteredBindings;
if(!bindingSet._filteredQueues.isEmpty())
{
- for(Queue<?> q : bindingSet._filteredQueues)
+ for(MessageDestination q : bindingSet._filteredQueues)
{
final Map<BindingIdentifier, FilterManager>
bindingMessageFilterMap = filteredBindings.get(q);
- if(!(bindingMessageFilterMap == null || result.contains(q)))
+ if(!(bindingMessageFilterMap == null ||
bindingSet._unfilteredQueues.contains(q)))
{
for(FilterManager filter :
bindingMessageFilterMap.values())
{
-
if(filter.allAllow(Filterable.Factory.newInstance(payload, instanceProperties)))
+
if(filter.allAllow(Filterable.Factory.newInstance(message, instanceProperties)))
{
- result.add(q);
+ result.add(q.route(message, routingAddress,
instanceProperties));
break;
}
}
@@ -261,10 +263,6 @@ class FanoutExchangeImpl extends Abstrac
}
- _logger.debug("Publishing message to queue {}", result);
-
- return result;
-
}
@Override
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeImpl.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeImpl.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeImpl.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeImpl.java
Sun Dec 25 22:55:13 2016
@@ -20,9 +20,7 @@
*/
package org.apache.qpid.server.exchange;
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.LinkedHashSet;
import java.util.ListIterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -35,10 +33,11 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.RoutingResult;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
/**
@@ -86,14 +85,13 @@ public class HeadersExchangeImpl extends
}
@Override
- public ArrayList<BaseQueue> doRoute(ServerMessage payload,
- final String routingKey,
- final InstanceProperties
instanceProperties)
+ public <M extends ServerMessage<? extends StorableMessageMetaData>> void
doRoute(M payload,
+
final String routingKey,
+
final InstanceProperties instanceProperties,
+
RoutingResult<M> routingResult)
{
_logger.debug("Exchange {}: routing message with headers {}",
getName(), payload.getMessageHeader());
- LinkedHashSet<BaseQueue> queues = new LinkedHashSet<BaseQueue>();
-
for (HeadersBinding hb : _bindingHeaderMatchers)
{
if
(hb.matches(Filterable.Factory.newInstance(payload,instanceProperties)))
@@ -106,13 +104,12 @@ public class HeadersExchangeImpl extends
_logger.debug("Exchange " + getName() + ": delivering
message with headers " +
payload.getMessageHeader() + " to " +
b.getDestination().getName());
}
- queues.add((BaseQueue) b.getDestination());
+ routingResult.add(b.getDestination().route(payload,
routingKey, instanceProperties));
}
}
-
- return new ArrayList<>(queues);
}
+
@Override
protected void onBind(final BindingIdentifier binding, Map<String,Object>
arguments)
{
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java
Sun Dec 25 22:55:13 2016
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.exchange;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -39,11 +38,12 @@ import org.apache.qpid.server.filter.AMQ
import org.apache.qpid.server.filter.FilterSupport;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.RoutingResult;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
@@ -68,9 +68,9 @@ class TopicExchangeImpl extends Abstract
protected synchronized void onBindingUpdated(final BindingIdentifier
binding, final Map<String, Object> newArguments)
{
final String bindingKey = binding.getBindingKey();
- Queue<?> queue = (Queue<?>) binding.getDestination();
+ final MessageDestination destination = binding.getDestination();
- _logger.debug("Updating binding of queue {} with routing key {}",
queue.getName(), bindingKey);
+ _logger.debug("Updating binding of queue {} with routing key {}",
destination.getName(), bindingKey);
String routingKey = TopicNormalizer.normalize(bindingKey);
@@ -87,22 +87,22 @@ class TopicExchangeImpl extends Abstract
{
if (FilterSupport.argumentsContainFilter(oldArgs))
{
- result.replaceQueueFilter(queue,
-
FilterSupport.createMessageFilter(oldArgs, queue),
-
FilterSupport.createMessageFilter(newArguments, queue));
+ result.replaceQueueFilter(destination,
+
FilterSupport.createMessageFilter(oldArgs, destination),
+
FilterSupport.createMessageFilter(newArguments, destination));
}
else
{
- result.addFilteredQueue(queue,
FilterSupport.createMessageFilter(newArguments, queue));
- result.removeUnfilteredQueue(queue);
+ result.addFilteredQueue(destination,
FilterSupport.createMessageFilter(newArguments, destination));
+ result.removeUnfilteredQueue(destination);
}
}
else
{
if (FilterSupport.argumentsContainFilter(oldArgs))
{
- result.addUnfilteredQueue(queue);
- result.removeFilteredQueue(queue,
FilterSupport.createMessageFilter(oldArgs, queue));
+ result.addUnfilteredQueue(destination);
+ result.removeFilteredQueue(destination,
FilterSupport.createMessageFilter(oldArgs, destination));
}
else
{
@@ -204,39 +204,25 @@ class TopicExchangeImpl extends Abstract
}
@Override
- public ArrayList<BaseQueue> doRoute(ServerMessage payload,
- final String routingAddress,
- final InstanceProperties
instanceProperties)
+ public <M extends ServerMessage<? extends StorableMessageMetaData>> void
doRoute(M payload,
+
String routingAddress,
+
InstanceProperties instanceProperties,
+
RoutingResult<M> result)
{
-
final String routingKey = routingAddress == null
- ? ""
- : routingAddress;
+ ? ""
+ : routingAddress;
- final Collection<Queue<?>> matchedQueues =
+ final Collection<MessageDestination> matchedQueues =
getMatchedQueues(Filterable.Factory.newInstance(payload,instanceProperties),
routingKey);
- ArrayList<BaseQueue> queues;
-
- if(matchedQueues.getClass() == ArrayList.class)
- {
- queues = (ArrayList) matchedQueues;
- }
- else
- {
- queues = new ArrayList<BaseQueue>();
- queues.addAll(matchedQueues);
- }
-
- if(queues == null || queues.isEmpty())
+ for(MessageDestination queue : matchedQueues)
{
- _logger.info("Message routing key: " + routingAddress + " No
routes.");
+ result.add(queue.route(payload, routingAddress,
instanceProperties));
}
-
- return queues;
-
}
+
private synchronized boolean deregisterQueue(final BindingIdentifier
binding)
{
if(_bindings.containsKey(binding))
@@ -274,7 +260,7 @@ class TopicExchangeImpl extends Abstract
}
}
- private Collection<Queue<?>> getMatchedQueues(Filterable message, String
routingKey)
+ private Collection<MessageDestination> getMatchedQueues(Filterable
message, String routingKey)
{
Collection<TopicMatcherResult> results = _parser.parse(routingKey);
@@ -287,7 +273,7 @@ class TopicExchangeImpl extends Abstract
results.toArray(resultQueues);
return
((TopicExchangeResult)resultQueues[0]).processMessage(message, null);
default:
- Collection<Queue<?>> queues = new HashSet<>();
+ Collection<MessageDestination> queues = new HashSet<>();
for(TopicMatcherResult result : results)
{
TopicExchangeResult res = (TopicExchangeResult)result;
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
Sun Dec 25 22:55:13 2016
@@ -33,22 +33,22 @@ import java.util.concurrent.CopyOnWriteA
import org.apache.qpid.server.exchange.AbstractExchange;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.Filterable;
-import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.message.MessageDestination;
public final class TopicExchangeResult implements TopicMatcherResult
{
private final List<AbstractExchange.BindingIdentifier> _bindings = new
CopyOnWriteArrayList<>();
- private final Map<Queue<?>, Integer> _unfilteredQueues = new
ConcurrentHashMap<>();
- private final ConcurrentMap<Queue<?>, Map<FilterManager,Integer>>
_filteredQueues = new ConcurrentHashMap<>();
- private volatile ArrayList<Queue<?>> _unfilteredQueueList = new
ArrayList<>(0);
+ private final Map<MessageDestination, Integer> _unfilteredQueues = new
ConcurrentHashMap<>();
+ private final ConcurrentMap<MessageDestination,
Map<FilterManager,Integer>> _filteredQueues = new ConcurrentHashMap<>();
+ private volatile ArrayList<MessageDestination> _unfilteredQueueList = new
ArrayList<>(0);
- public void addUnfilteredQueue(Queue<?> queue)
+ public void addUnfilteredQueue(MessageDestination queue)
{
Integer instances = _unfilteredQueues.get(queue);
if(instances == null)
{
_unfilteredQueues.put(queue, 1);
- ArrayList<Queue<?>> newList = new
ArrayList<>(_unfilteredQueueList);
+ ArrayList<MessageDestination> newList = new
ArrayList<>(_unfilteredQueueList);
newList.add(queue);
_unfilteredQueueList = newList;
}
@@ -58,13 +58,13 @@ public final class TopicExchangeResult i
}
}
- public void removeUnfilteredQueue(Queue<?> queue)
+ public void removeUnfilteredQueue(MessageDestination queue)
{
Integer instances = _unfilteredQueues.get(queue);
if(instances == 1)
{
_unfilteredQueues.remove(queue);
- ArrayList<Queue<?>> newList = new
ArrayList<>(_unfilteredQueueList);
+ ArrayList<MessageDestination> newList = new
ArrayList<>(_unfilteredQueueList);
newList.remove(queue);
_unfilteredQueueList = newList;
@@ -76,7 +76,7 @@ public final class TopicExchangeResult i
}
- public Collection<Queue<?>> getUnfilteredQueues()
+ public Collection<MessageDestination> getUnfilteredQueues()
{
return _unfilteredQueues.keySet();
}
@@ -96,7 +96,7 @@ public final class TopicExchangeResult i
return new ArrayList<>(_bindings);
}
- public void addFilteredQueue(Queue<?> queue, FilterManager filter)
+ public void addFilteredQueue(MessageDestination queue, FilterManager
filter)
{
Map<FilterManager,Integer> filters = _filteredQueues.get(queue);
if(filters == null)
@@ -116,7 +116,7 @@ public final class TopicExchangeResult i
}
- public void removeFilteredQueue(Queue<?> queue, FilterManager filter)
+ public void removeFilteredQueue(MessageDestination queue, FilterManager
filter)
{
Map<FilterManager,Integer> filters = _filteredQueues.get(queue);
if(filters != null)
@@ -142,7 +142,7 @@ public final class TopicExchangeResult i
}
- public void replaceQueueFilter(Queue<?> queue,
+ public void replaceQueueFilter(MessageDestination queue,
FilterManager oldFilter,
FilterManager newFilter)
{
@@ -169,7 +169,7 @@ public final class TopicExchangeResult i
_filteredQueues.put(queue,newFilters);
}
- public Collection<Queue<?>> processMessage(Filterable msg,
Collection<Queue<?>> queues)
+ public Collection<MessageDestination> processMessage(Filterable msg,
Collection<MessageDestination> queues)
{
if(queues == null)
{
@@ -179,18 +179,18 @@ public final class TopicExchangeResult i
}
else
{
- queues = new HashSet<Queue<?>>();
+ queues = new HashSet<>();
}
}
else if(!(queues instanceof Set))
{
- queues = new HashSet<Queue<?>>(queues);
+ queues = new HashSet<>(queues);
}
queues.addAll(_unfilteredQueues.keySet());
if(!_filteredQueues.isEmpty())
{
- for(Map.Entry<Queue<?>, Map<FilterManager, Integer>> entry :
_filteredQueues.entrySet())
+ for(Map.Entry<MessageDestination, Map<FilterManager, Integer>>
entry : _filteredQueues.entrySet())
{
if(!queues.contains(entry.getKey()))
{
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
Sun Dec 25 22:55:13 2016
@@ -31,6 +31,7 @@ import org.apache.qpid.common.AMQPFilter
import org.apache.qpid.filter.SelectorParsingException;
import org.apache.qpid.filter.selector.ParseException;
import org.apache.qpid.filter.selector.TokenMgrError;
+import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.plugin.PluggableService;
import org.apache.qpid.server.queue.QueueConsumer;
@@ -94,13 +95,13 @@ public class FilterSupport
&&
((String)args.get(AMQPFilterTypes.JMS_SELECTOR.toString())).trim().length() !=
0;
}
- public static FilterManager createMessageFilter(final Map<String,Object>
args, Queue<?> queue) throws AMQInvalidArgumentException
+ public static FilterManager createMessageFilter(final Map<String,Object>
args, MessageDestination queue) throws AMQInvalidArgumentException
{
FilterManager filterManager = null;
- if(argumentsContainNoLocal(args))
+ if(argumentsContainNoLocal(args) && queue instanceof Queue)
{
filterManager = new FilterManager();
- filterManager.add(AMQPFilterTypes.NO_LOCAL.toString(), new
NoLocalFilter(queue));
+ filterManager.add(AMQPFilterTypes.NO_LOCAL.toString(), new
NoLocalFilter((Queue<?>) queue));
}
if(argumentsContainJMSSelector(args))
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
Sun Dec 25 22:55:13 2016
@@ -27,8 +27,6 @@ import org.apache.qpid.server.model.Name
import org.apache.qpid.server.model.PublishingLink;
import org.apache.qpid.server.security.SecurityToken;
import org.apache.qpid.server.store.StorableMessageMetaData;
-import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.server.util.Action;
public interface MessageDestination extends MessageNode
{
@@ -46,15 +44,10 @@ public interface MessageDestination exte
* @param message the message to be routed
* @param routingAddress
* @param instanceProperties the instance properties
- * @param txn the transaction to enqueue within
- * @param postEnqueueAction action to perform on the result of every
enqueue (may be null)
- * @return the number of queues in which the message was enqueued performed
- */
- <M extends ServerMessage<? extends StorableMessageMetaData>> int send(M
message,
-
final String routingAddress,
-
InstanceProperties instanceProperties,
-
ServerTransaction txn,
-
Action<? super MessageInstance> postEnqueueAction);
+ */
+ <M extends ServerMessage<? extends StorableMessageMetaData>>
RoutingResult<M> route(M message,
+
String routingAddress,
+
InstanceProperties instanceProperties);
boolean isDurable();
Added:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java?rev=1776037&view=auto
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java
(added)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java
Sun Dec 25 22:55:13 2016
@@ -0,0 +1,147 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.message;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+
+/**
+ * Collects the queues a message has been routed to and, on request,
+ * enqueues the message onto them through a {@link ServerTransaction}.
+ *
+ * Queues are held in a set, so routing the same queue more than once
+ * (for example through several bindings) yields a single enqueue.
+ */
+public class RoutingResult<M extends ServerMessage<? extends StorableMessageMetaData>>
+{
+    private static final Logger _logger =
+            LoggerFactory.getLogger(RoutingResult.class);
+
+    private final M _message;
+
+    private final Set<BaseQueue> _queues = new HashSet<>();
+
+    /**
+     * @param message the message this result describes the routing of
+     */
+    public RoutingResult(final M message)
+    {
+        _message = message;
+    }
+
+    /**
+     * Records a queue as a destination. Queues reporting themselves as
+     * deleted are logged at debug level and ignored.
+     *
+     * @param q the queue to add
+     */
+    public void addQueue(BaseQueue q)
+    {
+        if(q.isDeleted())
+        {
+            _logger.debug("Attempt to enqueue message onto deleted queue {}",
+                          q.getName());
+        }
+        else
+        {
+            _queues.add(q);
+        }
+    }
+
+    /**
+     * Records every queue of the collection as a destination, applying
+     * the same deleted-queue check as {@link #addQueue(BaseQueue)}.
+     * The supplied collection is only read, never modified.
+     *
+     * @param queues the queues to add
+     */
+    public void addQueues(Collection<? extends BaseQueue> queues)
+    {
+        for(BaseQueue q : queues)
+        {
+            addQueue(q);
+        }
+    }
+
+    /**
+     * Merges the queues held by another result into this one.
+     *
+     * @param result the result whose queues are to be added
+     */
+    public void add(RoutingResult<M> result)
+    {
+        addQueues(result._queues);
+    }
+
+    /**
+     * Enqueues the message, within the given transaction, onto every
+     * routed queue that does not already reference it.
+     *
+     * Nothing is enqueued and 0 is returned if the message reports any
+     * routed queue as not acceptable.
+     *
+     * @param txn               the transaction to enqueue within
+     * @param postEnqueueAction action passed to each queue's enqueue
+     *                          (may be null)
+     * @return the number of queues handed to the transaction
+     */
+    public int send(ServerTransaction txn,
+                    final Action<? super MessageInstance> postEnqueueAction)
+    {
+        for(BaseQueue q : _queues)
+        {
+            if(!_message.isResourceAcceptable(q))
+            {
+                return 0;
+            }
+        }
+
+        // Build a single snapshot of the queues that will really receive
+        // the message. The same list is given to the transaction, drives
+        // the post-commit loop and supplies the return value, so the
+        // three can never disagree.
+        final boolean alreadyReferenced = _message.isReferenced();
+        final ArrayList<BaseQueue> targets = new ArrayList<>(_queues.size());
+        for(BaseQueue q : _queues)
+        {
+            if(!alreadyReferenced || !_message.isReferenced(q))
+            {
+                targets.add(q);
+            }
+        }
+        final BaseQueue[] baseQueues =
+                targets.toArray(new BaseQueue[targets.size()]);
+
+        txn.enqueue(targets, _message, new ServerTransaction.EnqueueAction()
+        {
+            MessageReference _reference = _message.newReference();
+
+            public void postCommit(MessageEnqueueRecord... records)
+            {
+                try
+                {
+                    // NOTE(review): assumes the transaction supplies one
+                    // record per queue, in the iteration order of the
+                    // collection given to enqueue - confirm against the
+                    // ServerTransaction implementations.
+                    for(int i = 0; i < baseQueues.length; i++)
+                    {
+                        baseQueues[i].enqueue(_message,
+                                              postEnqueueAction,
+                                              records[i]);
+                    }
+                }
+                finally
+                {
+                    _reference.release();
+                }
+            }
+
+            public void onRollback()
+            {
+                _reference.release();
+            }
+        });
+        return baseQueues.length;
+    }
+
+    /**
+     * @return true if at least one queue has been recorded
+     */
+    public boolean hasRoutes()
+    {
+        return !_queues.isEmpty();
+    }
+}
Propchange:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
Sun Dec 25 22:55:13 2016
@@ -91,6 +91,7 @@ import org.apache.qpid.server.message.Me
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.MessageSender;
+import org.apache.qpid.server.message.RoutingResult;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.model.*;
@@ -2567,46 +2568,21 @@ public abstract class AbstractQueue<X ex
_notificationListener = listener == null ? NULL_NOTIFICATION_LISTENER
: listener;
}
- public final <M extends ServerMessage<? extends StorableMessageMetaData>>
int send(final M message,
-
final String routingAddress,
-
final InstanceProperties instanceProperties,
-
final ServerTransaction txn,
-
final Action<? super MessageInstance> postEnqueueAction)
+ @Override
+ public <M extends ServerMessage<? extends StorableMessageMetaData>>
RoutingResult<M> route(final M message,
+
final String routingAddress,
+
final InstanceProperties instanceProperties)
{
if (_virtualHost.getState() != State.ACTIVE)
{
throw new VirtualHostUnavailableException(this._virtualHost);
}
+ RoutingResult<M> result = new RoutingResult<>(message);
if(message.isResourceAcceptable(this) && !message.isReferenced(this))
{
- txn.enqueue(this, message, new ServerTransaction.EnqueueAction()
- {
- MessageReference _reference = message.newReference();
-
- public void postCommit(MessageEnqueueRecord... records)
- {
- try
- {
- AbstractQueue.this.enqueue(message, postEnqueueAction,
records[0]);
- }
- finally
- {
- _reference.release();
- }
- }
-
- public void onRollback()
- {
- _reference.release();
- }
- });
- return 1;
- }
- else
- {
- return 0;
+ result.addQueue(this);
}
-
+ return result;
}
@Override
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
Sun Dec 25 22:55:13 2016
@@ -30,9 +30,5 @@ import org.apache.qpid.server.util.Actio
public interface BaseQueue extends TransactionLogResource
{
void enqueue(ServerMessage message, Action<? super MessageInstance>
action, MessageEnqueueRecord record);
-
- boolean isDurable();
boolean isDeleted();
-
- String getName();
}
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
Sun Dec 25 22:55:13 2016
@@ -36,6 +36,7 @@ import org.apache.qpid.server.message.Me
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.RoutingResult;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
@@ -569,27 +570,23 @@ public abstract class QueueEntryImpl imp
final Queue<?> currentQueue = getQueue();
Exchange<?> alternateExchange = currentQueue.getAlternateExchange();
boolean autocommit = txn == null;
- int enqueues;
if(autocommit)
{
txn = new
LocalTransaction(getQueue().getVirtualHost().getMessageStore());
}
+ RoutingResult result;
if (alternateExchange != null)
{
- enqueues = alternateExchange.send(getMessage(),
-
getMessage().getInitialRoutingAddress(),
- getInstanceProperties(),
- txn,
- action);
+ result = alternateExchange.route(getMessage(),
getMessage().getInitialRoutingAddress(),
+
getInstanceProperties());
}
else
{
- enqueues = 0;
+ result = new RoutingResult<>(getMessage());
}
-
- txn.dequeue(getEnqueueRecord(), new ServerTransaction.Action()
+ txn.addPostTransactionAction(new ServerTransaction.Action()
{
public void postCommit()
{
@@ -601,13 +598,13 @@ public abstract class QueueEntryImpl imp
}
});
+ int enqueues = result.send(txn, null);
if(autocommit)
{
txn.commit();
}
-
- return enqueues;
+ return enqueues;
}
public boolean isQueueDeleted()
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java
Sun Dec 25 22:55:13 2016
@@ -24,9 +24,7 @@ import java.util.UUID;
public interface TransactionLogResource
{
-
String getName();
- public UUID getId();
- //boolean isDurable();
+ UUID getId();
MessageDurability getMessageDurability();
}
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
Sun Dec 25 22:55:13 2016
@@ -259,7 +259,7 @@ public class AsyncAutoCommitTransaction
}
- public void enqueue(List<? extends BaseQueue> queues, EnqueueableMessage
message, EnqueueAction postTransactionAction)
+ public void enqueue(Collection<? extends BaseQueue> queues,
EnqueueableMessage message, EnqueueAction postTransactionAction)
{
Transaction txn = null;
try
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
Sun Dec 25 22:55:13 2016
@@ -185,7 +185,7 @@ public class AutoCommitTransaction imple
}
- public void enqueue(List<? extends BaseQueue> queues, EnqueueableMessage
message, EnqueueAction postTransactionAction)
+ public void enqueue(Collection<? extends BaseQueue> queues,
EnqueueableMessage message, EnqueueAction postTransactionAction)
{
Transaction txn = null;
try
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java
Sun Dec 25 22:55:13 2016
@@ -21,6 +21,8 @@
package org.apache.qpid.server.txn;
+import java.util.Collection;
+
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -29,9 +31,6 @@ import org.apache.qpid.server.store.Mess
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.transport.Xid;
-import java.util.Collection;
-import java.util.List;
-
public class DistributedTransaction implements ServerTransaction
{
@@ -137,7 +136,7 @@ public class DistributedTransaction impl
}
}
- public void enqueue(List<? extends BaseQueue> queues, EnqueueableMessage
message,
+ public void enqueue(Collection<? extends BaseQueue> queues,
EnqueueableMessage message,
final EnqueueAction postTransactionAction)
{
if(_branch != null)
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
Sun Dec 25 22:55:13 2016
@@ -264,7 +264,7 @@ public class LocalTransaction implements
}
}
- public void enqueue(List<? extends BaseQueue> queues, EnqueueableMessage
message, EnqueueAction postTransactionAction)
+ public void enqueue(Collection<? extends BaseQueue> queues,
EnqueueableMessage message, EnqueueAction postTransactionAction)
{
sync();
initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
Sun Dec 25 22:55:13 2016
@@ -21,7 +21,6 @@
package org.apache.qpid.server.txn;
import java.util.Collection;
-import java.util.List;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.message.MessageInstance;
@@ -111,7 +110,7 @@ public interface ServerTransaction
*
* Store operations will result only for a persistent messages on durable
queues.
*/
- void enqueue(List<? extends BaseQueue> queues, EnqueueableMessage message,
EnqueueAction postTransactionAction);
+ void enqueue(Collection<? extends BaseQueue> queues, EnqueueableMessage
message, EnqueueAction postTransactionAction);
/**
* Commit the transaction represented by this object.
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
Sun Dec 25 22:55:13 2016
@@ -89,6 +89,7 @@ import org.apache.qpid.server.message.In
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageNode;
import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.message.RoutingResult;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.model.*;
@@ -801,8 +802,9 @@ public abstract class AbstractVirtualHos
}
}
};
-
- return destination.send(internalMessage, address, instanceProperties,
txn, null);
+ final RoutingResult<InternalMessage> result =
+ destination.route(internalMessage, address,
instanceProperties);
+ return result.send(txn, null);
}
Modified:
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
(original)
+++
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
Sun Dec 25 22:55:13 2016
@@ -20,11 +20,15 @@
*/
package org.apache.qpid.server.exchange;
+import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anySet;
+import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -40,15 +44,22 @@ import org.apache.qpid.server.configurat
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.RoutingResult;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.BrokerModel;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -79,6 +90,7 @@ public class FanoutExchangeTest extends
_virtualHost = mock(QueueManagingVirtualHost.class);
when(_virtualHost.getEventLogger()).thenReturn(new EventLogger());
+ when(_virtualHost.getState()).thenReturn(State.ACTIVE);
when(_virtualHost.getTaskExecutor()).thenReturn(_taskExecutor);
when(_virtualHost.getChildExecutor()).thenReturn(_taskExecutor);
when(_virtualHost.getModel()).thenReturn(BrokerModel.getInstance());
@@ -153,6 +165,9 @@ public class FanoutExchangeTest extends
when(queue.getChildExecutor()).thenReturn(taskExecutor);
when(queue.getParent()).thenReturn(_virtualHost);
when(_virtualHost.getAttainedQueue(eq(name))).thenReturn(queue);
+ RoutingResult result = new RoutingResult(null);
+ result.addQueue(queue);
+
when(queue.route(any(ServerMessage.class),anyString(),any(InstanceProperties.class))).thenReturn(result);
return queue;
}
@@ -165,8 +180,8 @@ public class FanoutExchangeTest extends
_exchange.addBinding("key",queue1, null);
_exchange.addBinding("key",queue2, null);
-
- List<? extends BaseQueue> result = _exchange.route(mockMessage(true),
"", InstanceProperties.EMPTY);
+ List<? extends BaseQueue> result;
+ result = routeToQueues(mockMessage(true), "",
InstanceProperties.EMPTY);
assertEquals("Expected message to be routed to both queues", 2,
result.size());
assertTrue("Expected queue1 to be routed to", result.contains(queue1));
@@ -174,8 +189,7 @@ public class FanoutExchangeTest extends
_exchange.addBinding("key2",queue2,
Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select
= True"));
-
- result = _exchange.route(mockMessage(true), "",
InstanceProperties.EMPTY);
+ result = routeToQueues(mockMessage(true), "",
InstanceProperties.EMPTY);
assertEquals("Expected message to be routed to both queues", 2,
result.size());
assertTrue("Expected queue1 to be routed to", result.contains(queue1));
@@ -183,14 +197,13 @@ public class FanoutExchangeTest extends
_exchange.deleteBinding("key",queue2);
- result = _exchange.route(mockMessage(true), "",
InstanceProperties.EMPTY);
+ result = routeToQueues(mockMessage(true), "",
InstanceProperties.EMPTY);
assertEquals("Expected message to be routed to both queues", 2,
result.size());
assertTrue("Expected queue1 to be routed to", result.contains(queue1));
assertTrue("Expected queue2 to be routed to", result.contains(queue2));
-
- result = _exchange.route(mockMessage(false), "",
InstanceProperties.EMPTY);
+ result = routeToQueues(mockMessage(false), "",
InstanceProperties.EMPTY);
assertEquals("Expected message to be routed to queue1 only", 1,
result.size());
assertTrue("Expected queue1 to be routed to", result.contains(queue1));
@@ -198,8 +211,7 @@ public class FanoutExchangeTest extends
_exchange.addBinding("key",queue2,
Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select
= False"));
-
- result = _exchange.route(mockMessage(false), "",
InstanceProperties.EMPTY);
+ result = routeToQueues(mockMessage(false), "",
InstanceProperties.EMPTY);
assertEquals("Expected message to be routed to both queues", 2,
result.size());
assertTrue("Expected queue1 to be routed to", result.contains(queue1));
assertTrue("Expected queue2 to be routed to", result.contains(queue2));
@@ -207,6 +219,88 @@ public class FanoutExchangeTest extends
}
+ private List<? extends BaseQueue> routeToQueues(final ServerMessage
message,
+ final String
routingAddress,
+ final InstanceProperties
instanceProperties)
+ { // Test helper: routes the message through _exchange and returns the queues handed to the transaction's enqueue methods.
+ RoutingResult result = _exchange.route(message, routingAddress,
instanceProperties);
+ final List<BaseQueue> resultQueues = new ArrayList<>(); // filled by the enqueue callbacks of the anonymous transaction below
+ result.send(new ServerTransaction() // recording stub: only the two enqueue overloads do anything
+ {
+ @Override
+ public long getTransactionStartTime()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getTransactionUpdateTime()
+ {
+ return 0;
+ }
+
+ @Override
+ public void addPostTransactionAction(final Action
postTransactionAction)
+ {
+ // intentionally empty: the action is neither stored nor run
+ }
+
+ @Override
+ public void dequeue(final MessageEnqueueRecord record, final
Action postTransactionAction)
+ {
+ // intentionally empty: dequeues are ignored by this stub
+ }
+
+ @Override
+ public void dequeue(final Collection<MessageInstance> messages,
final Action postTransactionAction)
+ {
+ // intentionally empty: dequeues are ignored by this stub
+ }
+
+ @Override
+ public void enqueue(final TransactionLogResource queue,
+ final EnqueueableMessage message,
+ final EnqueueAction postTransactionAction)
+ {
+ resultQueues.add((BaseQueue) queue); // unchecked downcast: fails with ClassCastException if the resource is not a BaseQueue; postTransactionAction is not invoked
+ }
+
+ @Override
+ public void enqueue(final Collection<? extends BaseQueue> queues,
+ final EnqueueableMessage message,
+ final EnqueueAction postTransactionAction)
+ {
+ resultQueues.addAll(queues); // records every queue of the batch; postTransactionAction is not invoked
+ }
+
+ @Override
+ public void commit()
+ {
+ // intentionally empty: nothing to commit in this stub
+ }
+
+ @Override
+ public void commit(final Runnable immediatePostTransactionAction)
+ {
+ // intentionally empty: the supplied Runnable is not run
+ }
+
+ @Override
+ public void rollback()
+ {
+ // intentionally empty: nothing to roll back in this stub
+ }
+
+ @Override
+ public boolean isTransactional()
+ {
+ return false;
+ }
+ }, null); // second argument to send() is null; NOTE(review): presumably send() calls enqueue once per routed destination - confirm in RoutingResult
+
+ return resultQueues; // the same mutable list the stub appended to, in enqueue-call order
+ }
+
private ServerMessage mockMessage(boolean val)
{
final AMQMessageHeader header = mock(AMQMessageHeader.class);
@@ -225,6 +319,7 @@ public class FanoutExchangeTest extends
});
final ServerMessage serverMessage = mock(ServerMessage.class);
when(serverMessage.getMessageHeader()).thenReturn(header);
+
when(serverMessage.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true);
return serverMessage;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org